from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# 配置數據庫連接(示例為PostgreSQL->MySQL)
SRC_DB_URL = 'postgresql://user:pass@source_host:5432/source_db'
DST_DB_URL = 'mysql+pymysql://user:pass@dest_host:3306/dest_db'
# 創建引擎和會話
src_engine = create_engine(SRC_DB_URL)
dst_engine = create_engine(DST_DB_URL)
SrcSession = sessionmaker(bind=src_engine)
DstSession = sessionmaker(bind=dst_engine)
def migrate_with_raw_sql():
? ? with SrcSession() as src_session, DstSession() as dst_session:
? ? ? ? # 從源數據庫查詢數據(使用原生SQL)
? ? ? ? query = text("SELECT id, name, age FROM users WHERE updated_at > :last_update")
? ? ? ? src_data = src_session.execute(query, {"last_update": "2025-01-01"}).fetchall()
? ? ? ??
? ? ? ? # 構建批量更新SQL語句
? ? ? ? update_sql = text("""
? ? ? ? ? ? UPDATE users?
? ? ? ? ? ? SET name = :name, age = :age?
? ? ? ? ? ? WHERE id = :id
? ? ? ? """)
? ? ? ??
? ? ? ? # 執行批量更新
? ? ? ? for row in src_data:
? ? ? ? ? ? dst_session.execute(
? ? ? ? ? ? ? ? update_sql,?
? ? ? ? ? ? ? ? {"id": row.id, "name": row.name, "age": row.age}
? ? ? ? ? ? )
? ? ? ??
? ? ? ? dst_session.commit()
? ? ? ? print(f"通過原生SQL語句完成 {len(src_data)} 條記錄更新")
if __name__ == '__main__':
? ? migrate_with_raw_sql()
?