hamed sahami
hamed sahami
خواندن ۲ دقیقه·۱ سال پیش

Python ETL

سلام دوستان و مخاطبین عزیزم
نمونه کد این نوشته برای انتقال اطلاعات از پایگاه داده PostgreSQL به پایگاه داده MS SQL با زبان پایتون با سادگی زیاد تهیه شده است. در این کد تلاش کردم ضمن حفظ سادگی بار خیلی کمی بروی منبع اطلاعاتی با خرد کردن اطلاعات به تعداد رکوردهای محدود پنج هزارتایی تقسیم کنم تا مدت تراکنش و Lock شدن جدول منبع به میزان زیادی کاهش پیدا کنه، علاوه بر اون سعی کردم تو کتابخانه پاندا برای ذخیره اطلاعات در سمت پایگاه داده مقصد از روش BulkInsert استفاده کنم تا ذخیره کردن اطلاعات هم با سرعت قابل قبولی صورت بگیره.

دلتون شاد کدتون سلامت
;)


import psycopg2

import pyodbc

import pandas as pd

from sqlalchemy import create_engine

# PostgreSQL connection details

pg_db = "****"

pg_user = "**********"

pg_password = "***************"

pg_host = "**********"

pg_port = "**********"

# MSSQL connection details

mssql_driver = "{ODBC Driver 17 for SQL Server}"

mssql_server = "*****************"

mssql_db = "*****"

mssql_user = "***********"

mssql_password = "*************"

table_name = '*****************'

# Connect to PostgreSQL database

with psycopg2.connect(database=pg_db, user=pg_user, password=pg_password, host=pg_host, port=pg_port,

connect_timeout=0) as pg_conn:

# Connect to MSSQL database

conn_str = f"DRIVER={mssql_driver};SERVER={mssql_server};DATABASE={mssql_db};UID={mssql_user};PWD={mssql_password}"

engine = create_engine(f"mssql+pyodbc:///?odbc_connect={conn_str}",fast_executemany=True)

# ...

# Fetch data from PostgreSQL table in chunks and insert into MSSQL table

command = """

SELECT *

FROM """+table_name+"""

"""

df = pd.read_sql_query(command, pg_conn, chunksize=5000)

# Establish a connection to the MSSQL database

with pyodbc.connect(conn_str) as mssql_conn:

# Execute the DELETE query

cursor = mssql_conn.cursor()

cursor.execute(""" truncate table temp_"""+table_name)

mssql_conn.commit()

# Iterate over the chunks and insert into MSSQL table

for chunk in df:

chunk.to_sql('temp_'+table_name, con=engine, if_exists='append', index=False )

print('Data inserted successfully')

# ...

# Execute the DELETE query

cursor.execute("""

DELETE FROM """+table_name+"""

WHERE id IN (SELECT id FROM temp_"""+table_name+""");

INSERT INTO """+table_name+"""

SELECT * FROM temp_"""+table_name+""" with(tablock);

""")

print('Data transfered from temp')

# Commit the changes

mssql_conn.commit()

# Close connections

pg_conn.close()

engine.dispose()



sql serverزبان پایتون
https://www.linkedin.com/in/hamedsahami/
شاید از این پست‌ها خوشتان بیاید