MongoDB to PostgreSQL ETL Process

Sıla Kazan
5 min read · Dec 21, 2024


In the world of data engineering, data transfer is often a complex and challenging process. Data frequently comes from various sources and needs to be transferred to different target systems. In this post, I will walk you through the steps of creating an ETL (Extract, Transform, Load) process by transferring data from MongoDB to PostgreSQL.

Step 1: Project Objective

First, my objective was to transfer the “shipment” data from the MongoDB database to PostgreSQL. Identifying the required tools and technologies was a critical step for the success of the project. Throughout the project, I performed various transformations to convert the flexible structure of MongoDB into the powerful relational structure of PostgreSQL.

Step 2: Tools and Technologies

For this ETL process, I used the following tools and technologies:

  • MongoDB: The source database of the data.
  • PostgreSQL: The target database for the data.
  • Python: For data processing and connection tasks.
  • Pymongo and Psycopg2: Python libraries to communicate with MongoDB and PostgreSQL.

Step 3: Connecting to MongoDB

To connect to MongoDB, I used the pymongo library. In this step, I established the connection and inserted some sample shipment data.

Example code to insert data into MongoDB:

from pymongo import MongoClient
import random
from datetime import datetime, timezone

# MongoDB connection
client = MongoClient("mongodb://localhost:27017")
db = client["shipments"]
collection = db["shipment_data"]

# Data generator
def generate_data():
    for _ in range(10):  # 10 shipments
        shipment = {
            "shipment_id": f"SHIP{random.randint(1000, 9999)}",
            "date": datetime.now(timezone.utc),
            "parcels": [f"PARCEL{random.randint(100, 999)}" for _ in range(5)],
            "barcodes": [f"BARCODE{random.randint(1000, 9999)}" for _ in range(5)],
            "address": {
                "street": f"Street {random.randint(1, 100)}",
                "city": "Istanbul",
                "zip_code": f"{random.randint(10000, 99999)}",
            },
        }
        collection.insert_one(shipment)
    print("Data successfully inserted into MongoDB.")

# Read the data back from MongoDB and print it
def print_data():
    shipments = collection.find()  # fetch all documents
    for shipment in shipments:
        print(shipment)


if __name__ == "__main__":
    generate_data()
    print_data()

Code output:

{'_id': ObjectId('67674500c3eebcddd2a70569'), 'shipment_id': 'SHIP4966', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 94000), 'parcels': ['PARCEL278', 'PARCEL610', 'PARCEL896', 'PARCEL772', 'PARCEL264'], 'barcodes': ['BARCODE8233', 'BARCODE5985', 'BARCODE3660', 'BARCODE4201', 'BARCODE2229'], 'address': {'street': 'Street 31', 'city': 'Istanbul', 'zip_code': '81416'}}
{'_id': ObjectId('67674500c3eebcddd2a7056a'), 'shipment_id': 'SHIP8268', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 112000), 'parcels': ['PARCEL399', 'PARCEL430', 'PARCEL956', 'PARCEL207', 'PARCEL216'], 'barcodes': ['BARCODE6083', 'BARCODE6166', 'BARCODE1968', 'BARCODE3623', 'BARCODE5516'], 'address': {'street': 'Street 89', 'city': 'Istanbul', 'zip_code': '86501'}}
{'_id': ObjectId('67674500c3eebcddd2a7056b'), 'shipment_id': 'SHIP5316', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 112000), 'parcels': ['PARCEL417', 'PARCEL836', 'PARCEL564', 'PARCEL785', 'PARCEL706'], 'barcodes': ['BARCODE1964', 'BARCODE1354', 'BARCODE7876', 'BARCODE5365', 'BARCODE2773'], 'address': {'street': 'Street 45', 'city': 'Istanbul', 'zip_code': '16512'}}
{'_id': ObjectId('67674500c3eebcddd2a7056c'), 'shipment_id': 'SHIP2533', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 113000), 'parcels': ['PARCEL924', 'PARCEL190', 'PARCEL386', 'PARCEL557', 'PARCEL617'], 'barcodes': ['BARCODE7720', 'BARCODE3661', 'BARCODE8292', 'BARCODE2049', 'BARCODE9219'], 'address': {'street': 'Street 53', 'city': 'Istanbul', 'zip_code': '29725'}}
{'_id': ObjectId('67674500c3eebcddd2a7056d'), 'shipment_id': 'SHIP8012', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 113000), 'parcels': ['PARCEL742', 'PARCEL632', 'PARCEL290', 'PARCEL583', 'PARCEL271'], 'barcodes': ['BARCODE1214', 'BARCODE9911', 'BARCODE1422', 'BARCODE1544', 'BARCODE5078'], 'address': {'street': 'Street 82', 'city': 'Istanbul', 'zip_code': '75669'}}
{'_id': ObjectId('67674500c3eebcddd2a7056e'), 'shipment_id': 'SHIP7193', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 113000), 'parcels': ['PARCEL516', 'PARCEL760', 'PARCEL682', 'PARCEL667', 'PARCEL454'], 'barcodes': ['BARCODE9352', 'BARCODE5576', 'BARCODE3169', 'BARCODE4200', 'BARCODE3075'], 'address': {'street': 'Street 70', 'city': 'Istanbul', 'zip_code': '63509'}}
{'_id': ObjectId('67674500c3eebcddd2a7056f'), 'shipment_id': 'SHIP3650', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 114000), 'parcels': ['PARCEL486', 'PARCEL700', 'PARCEL644', 'PARCEL281', 'PARCEL394'], 'barcodes': ['BARCODE5418', 'BARCODE3007', 'BARCODE5924', 'BARCODE6832', 'BARCODE7603'], 'address': {'street': 'Street 22', 'city': 'Istanbul', 'zip_code': '27500'}}
{'_id': ObjectId('67674500c3eebcddd2a70570'), 'shipment_id': 'SHIP7924', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 114000), 'parcels': ['PARCEL397', 'PARCEL144', 'PARCEL681', 'PARCEL528', 'PARCEL576'], 'barcodes': ['BARCODE9050', 'BARCODE4096', 'BARCODE6654', 'BARCODE9905', 'BARCODE3535'], 'address': {'street': 'Street 63', 'city': 'Istanbul', 'zip_code': '97045'}}
{'_id': ObjectId('67674500c3eebcddd2a70571'), 'shipment_id': 'SHIP1735', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 114000), 'parcels': ['PARCEL172', 'PARCEL279', 'PARCEL804', 'PARCEL142', 'PARCEL907'], 'barcodes': ['BARCODE3021', 'BARCODE1241', 'BARCODE7891', 'BARCODE3129', 'BARCODE6532'], 'address': {'street': 'Street 69', 'city': 'Istanbul', 'zip_code': '29061'}}
{'_id': ObjectId('67674500c3eebcddd2a70572'), 'shipment_id': 'SHIP6319', 'date': datetime.datetime(2024, 12, 21, 22, 45, 20, 115000), 'parcels': ['PARCEL136', 'PARCEL363', 'PARCEL763', 'PARCEL753', 'PARCEL421'], 'barcodes': ['BARCODE6587', 'BARCODE1276', 'BARCODE1385', 'BARCODE3150', 'BARCODE2151'], 'address': {'street': 'Street 78', 'city': 'Istanbul', 'zip_code': '83397'}}

Step 4: Preparing the PostgreSQL Database

Before transferring data to PostgreSQL, I needed to set up the PostgreSQL database and the required tables. At this point, I used the psycopg2 library to establish the database connection and create the tables.

Example code to create a PostgreSQL database and table:

from pymongo import MongoClient
import psycopg2
from psycopg2 import extensions
from pytz import timezone
from datetime import datetime

# MongoDB connection
client = MongoClient("mongodb://localhost:27017")
db = client["shipments"]
collection = db["shipment_data"]

# Create the PostgreSQL database
def create_new_database():
    try:
        # Connect to PostgreSQL
        temp_conn = psycopg2.connect(
            dbname="postgres",
            user="postgres",
            password="postgres",
            host="localhost",
            port="5432"
        )
        temp_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        temp_cursor = temp_conn.cursor()

        # Create the new database
        temp_cursor.execute("CREATE DATABASE shipments_3;")
        print("New database 'shipments_3' created.")

        temp_cursor.close()
        temp_conn.close()
    except psycopg2.errors.DuplicateDatabase:
        print("Database 'shipments_3' already exists; it was not recreated.")
    except Exception as e:
        print(f"An error occurred while creating the new database: {e}")

# PostgreSQL connection and table creation
def setup_tables():
    try:
        conn = psycopg2.connect(
            dbname="shipments_3",
            user="postgres",
            password="postgres",
            host="localhost",
            port="5432"
        )
        cursor = conn.cursor()

        print("PostgreSQL connection successful")

        # Create the PostgreSQL tables
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS shipments (
                shipment_id VARCHAR PRIMARY KEY,
                date TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS parcels (
                parcel_id SERIAL PRIMARY KEY,
                shipment_id VARCHAR NOT NULL,
                barcode VARCHAR UNIQUE,  -- UNIQUE constraint on barcode
                FOREIGN KEY (shipment_id) REFERENCES shipments(shipment_id)
            );
            CREATE TABLE IF NOT EXISTS addresses (
                address_id SERIAL PRIMARY KEY,
                shipment_id VARCHAR NOT NULL,
                street VARCHAR,
                city VARCHAR,
                zip_code VARCHAR,
                FOREIGN KEY (shipment_id) REFERENCES shipments(shipment_id)
            );
        """)
        conn.commit()
        print("Tables created successfully.")
        return conn, cursor
    except psycopg2.OperationalError as e:
        print(f"An error occurred while connecting to PostgreSQL: {e}")
        raise


if __name__ == "__main__":
    create_new_database()  # create the new database
    conn, cursor = setup_tables()  # create the tables
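To make the mapping concrete: each nested MongoDB document is split into rows for the three tables above. The following pure-Python sketch illustrates that split (the function name flatten_shipment is mine, not part of the project):

```python
from datetime import datetime, timezone

def flatten_shipment(doc):
    """Split one nested MongoDB shipment document into row tuples
    matching the shipments, parcels and addresses tables."""
    shipment_row = (doc["shipment_id"], doc["date"])
    # one parcels row per barcode, each pointing back to the shipment
    parcel_rows = [(doc["shipment_id"], barcode) for barcode in doc["barcodes"]]
    addr = doc["address"]
    address_row = (doc["shipment_id"], addr["street"], addr["city"], addr["zip_code"])
    return shipment_row, parcel_rows, address_row

# Example document in the shape produced by generate_data()
doc = {
    "shipment_id": "SHIP1234",
    "date": datetime(2024, 12, 21, tzinfo=timezone.utc),
    "parcels": ["PARCEL100", "PARCEL101"],
    "barcodes": ["BARCODE1000", "BARCODE1001"],
    "address": {"street": "Street 1", "city": "Istanbul", "zip_code": "34000"},
}
shipment_row, parcel_rows, address_row = flatten_shipment(doc)
print(shipment_row[0])   # SHIP1234
print(len(parcel_rows))  # 2
```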

Step 5: Data Transfer (ETL Process)

The transfer of data from MongoDB to PostgreSQL was the most critical part of the ETL process. In this phase, I took the necessary precautions to ensure the accuracy of the data and to prevent conflicts with existing records. While transferring the data to PostgreSQL, I implemented the following validation and transfer logic:


# Transfer from MongoDB to PostgreSQL
def etl_process(cursor, conn):
    for doc in collection.find():
        try:
            if validate_data(doc):
                shipment_id = doc["shipment_id"]
                date = doc["date"].astimezone(timezone("Europe/Istanbul"))

                # Duplicate check
                if not check_duplicate(cursor, shipment_id):
                    # Insert the shipment record
                    cursor.execute(
                        "INSERT INTO shipments (shipment_id, date) VALUES (%s, %s)",
                        (shipment_id, date)
                    )
                else:
                    print(f"Duplicate shipment_id {shipment_id} found; record not inserted.")
                    continue

                # Insert the parcel records
                for barcode in doc["barcodes"]:
                    if not check_duplicate(cursor, shipment_id, barcode):
                        cursor.execute(
                            "INSERT INTO parcels (shipment_id, barcode) VALUES (%s, %s)",
                            (shipment_id, barcode)
                        )
                    else:
                        print(f"Duplicate barcode {barcode} found; parcel record not inserted.")

                # Insert the address record
                address = doc["address"]
                cursor.execute(
                    "INSERT INTO addresses (shipment_id, street, city, zip_code) VALUES (%s, %s, %s, %s)",
                    (shipment_id, address["street"], address["city"], address["zip_code"])
                )
        except ValueError as e:
            print(f"Data validation error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

    conn.commit()
    print("Data successfully transferred to PostgreSQL.")

Step 6: Results and Visualization

After completing the data transfer, I wanted to visualize the data in PostgreSQL to verify the success of the process. In this phase, I confirmed that the data was correctly transferred and stored without issues in the PostgreSQL database.

def print_postgresql_data(cursor):
    print("\n--- Shipment Data ---")
    cursor.execute("SELECT * FROM shipments;")
    shipments = cursor.fetchall()
    for shipment in shipments:
        print(shipment)
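Beyond printing the shipments table, a join can confirm that each parcel was linked to the right shipment. A hypothetical verification helper (the query shape is my assumption; it works with any DB-API cursor, such as the psycopg2 cursor above):

```python
def print_shipment_parcels(cursor):
    """Print each shipment together with its parcel barcodes."""
    cursor.execute("""
        SELECT s.shipment_id, p.barcode
        FROM shipments s
        JOIN parcels p ON p.shipment_id = s.shipment_id
        ORDER BY s.shipment_id, p.barcode;
    """)
    rows = cursor.fetchall()
    for shipment_id, barcode in rows:
        print(f"{shipment_id}: {barcode}")
    return rows
```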

Step 7: Conclusion

Transferring data from MongoDB to PostgreSQL was a complex process, but also a highly educational one: I learned many new techniques and methods along the way. In this post, I have explained how I implemented the ETL process between MongoDB and PostgreSQL and how I overcame the challenges at each step.

Thank you :)

The project's GitHub repository:

https://github.com/silakazan/Mong_to_postgres/tree/main
