表數據同步的斷點續傳
有時候需要將一個表的數據復制到另一個表,循環是常用的方式。當表比較大,執行的時間很長,會有很多因素引起失敗。我希望可以比較簡單的跑數,所以做一個簡單的任務系統。
SQLitre是嵌入式數據庫,這樣腳本可以不必考慮太多依賴,又可以用到數據庫的持久化功能。
1 數據庫初始化
給到一個文件路徑參數,確定持久化的文件
import sqlite3
import time
import randomdef init_db(DB_FILE = "tasks.db"):"""初始化數據庫和表"""conn = sqlite3.connect(DB_FILE)conn.row_factory = sqlite3.Row cur = conn.cursor()cur.execute("""CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,content TEXT, status INT DEFAULT 0 -- 0=未開始, 1=進行中, 2=完成, 3=失敗)""")# 可選:給 status 建索引cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)")cur.execute("CREATE INDEX IF NOT EXISTS idx_name ON tasks(name)")conn.commit()return conn, cur
實際中可以以腳本的file path作為參數(結尾改為.db)
# 建立數據庫
conn, cur = init_db(DB_FILE = './the_script.db')
2 任務初始化
同步任務,可以按照id分為若干區間,每個區間任務額可以稱為lot。
tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['name'] = ['lot_%s' % i for i in range(len(tuple_list)) ]
task_df['content'] = tuple_list
task_df['content'] = task_df['content'].apply(lambda x: json.dumps(x))
task_lod = df2lod(task_df)
# 寫入任務
cur.executemany("INSERT INTO tasks (name, content) VALUES (?,?)", [(t['name'],t['content']) for t in task_lod])
conn.commit()res = cur.execute('select count(*) from tasks').fetchone()
dict(res)
sqlite也可以提供類似 Row Dict的格式,需要
conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()
3 處理一次
每次啟動時,讀出未處理的任務。任務里有tuple參數,根據tuple的起止區間獲取數據后進行處理,如果成功就根據任務名將狀態更新為2,異常則更新為3。這樣每次重復執行這個就可以了,因為數據持久化在文件中,所以即使和服務器斷開連接也沒關系。
def process_one():new_task_sql = 'select name, content, status from tasks where status = 0 limit 1'# new_task = cur.execute(new_task_sql).fetchall()conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()process_code = 1if row:row_dict = dict(row)tem_tuple = json.loads(row_dict['content'])print('>>>> ',row_dict['name'])try:some_tuple = tem_tuple---- DO LOGICcur.execute("UPDATE tasks SET status = 2 WHERE name = ?", (row_dict['name'],))conn.commit()except:cur.execute("UPDATE tasks SET status = 3 WHERE name = ?", (row_dict['name'],))conn.commit()else:process_code = 0return process_code
4 處理直到結束
由于 process_one
在沒有獲取到待處理任務行時會返回0,這個作為結束信號。所以可以給到一個略大的循環次數,當收到結束信號時停止。
for i in range(30000):process_code = process_one()if not process_code:print('無待處理任務')break