粗糧廠的基于spark的通用olap之間的同步工具項目
- 1 項目背景
- 2 項目實現
- 2.1 實現原理
- 2.2 細節要點
- 3 抽樣說明
- 4 項目運行狀態
- 4.1 運行速度
- 4.2 項目吞吐
- 4.3 穩定性
說的比較簡單,有需要的可以留言,我不斷補充完善
1 項目背景
我們公司內部的需要一款,能在不同的olap之間做數據傳遞與拷貝,例如 iceberg到doris,到mysql,甚至到kafka的,這么一個數據同步工具,要盡可能簡單,盡可能維護容易。所以有了這么一個項目的誕生,目前可以實現,通過一條簡短的shell命令,實現不同數據庫與存儲之間的數據拷貝。
目前這套方案,在公司內部已經部署4個數據團隊,服務對象產品+數據研發 超過100人,直接使用使用的業務對象,超過 4000人。
2 項目實現
2.1 實現原理
目前的實現是通過spark來實現,分為兩個部分:
- 1 寫入同步信息:同步任務記錄,是一個很簡單的,通過shell 傳參,調用一個spark任務,執行一個簡單的,數據插入動作,將 數據源表,目標,持有人,過濾或想要保留的字段,數據篩選項等一些信息,傳入spark任務中,并將數據寫入一個mysql中保存。其中源表與目標表,通過 catalog__database__schema__tablename的方式保存,并維護了一套catalog,通過前綴就可以知道數據在哪個引擎的哪個表中。
- 2 讀取同步信息:一個常駐的,死循環的spark任務,會定期遍歷mysql,會篩選出目前符合條件的,未過期的,同步任務,使用ExecutionContext 和 Future ,來并發執行同步任務,通過源信息,與反射 ,維護一個連接的配置項,來做隔離,保證數據傳入時,不會涉及隱私信息
2.2 細節要點
- 并發部分,可以通過【讀取同步信息】 任務部分啟動時,動態傳參,來控制數據流量
- 在任務中,維護了3個列表,分別保證,同一個任務只會執行一次,同一個目標表,同一時間只有一個任務在寫入,任務執行超過配置時間,會自動殺死,并允許新的任務調起,這樣就可以保證不會觸發目標的鎖,并控制重復提交
- 通過對不同傳入參數解析,對于每個目標引擎單獨部署獨立的同步任務,做到資源隔離
- spark任務 每個并發執行有做到很好的異常捕獲,發生問題時,可以調用報警接口,發送信息到持有人飛書中;對于常駐的 【讀取同步信息】整體任務監控,做到2天殺死重新啟動,并每5分鐘pid判活,保證任務的執行中
- 任務監控與判斷,對目標數據與原始數據做數據量校驗,對數據過程中的日志做接受,掃描錯誤日志等,保證要給
3 抽樣說明
這里抽樣說明一下 ,iceberg 同步數據到hologres 時的要點,其實整體的使用都相同,不過在開發的時候,可以根據不同的引擎做不同的細節調整 : 例入hologres
- 使用spark-connector-hologres的連接器寫入數據,連接器會先在hologres引擎中創建臨時表,數據寫入完成后,再做insert overwrite動作,因為分布式存儲的問題,所以就需要在代碼里手動執行set hg_experimental_force_sync_replay = on; 來保證元數據在不同節點的同步
- 使用hologres連接器,對原始數據量做判斷,超過1千萬的,執行serverless,也就是后被隱藏能源!
- 增加1次的任務重試,減少因為元數據不同步導致的表不存在的bug
- 目標數據是視圖的方式,也有分區表,可以在代碼中做判斷并刷新視圖,保證數據插入可以兼容
總的來說,可以根據不同的業務目標庫與使用方法,做單獨的優化迭代,保證到每次的同步都是最優的選項
4 項目運行狀態
還是以iceberg 到 hologres 為例,某個實例的spark資源情況為 180個 Executor,每個4G,16G的DM,參數配置為:
--conf spark.sql.catalog.iceberg_zjyprc_hadoop.cache-enabled=false
--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=30
--conf spark.network.timeout=180000
--conf spark.slow.shuffle.fetch.time.blacklist.threshold=60000
--conf spark.speculation=false
--conf spark.excludeOnFailure.enabled=false
--conf spark.task.maxFailures=1
4.1 運行速度
目前 5000w條數級別的數據量,大概需要 16-17分鐘,而且這里面有一半的時間時因為hologres連接器在內部重新shuffle,如果目標是mysql之類的,速度會提高至少一半
4.2 項目吞吐
目前每日同步 9000張表,總數據量大概 1-2T左右,基本可以滿足業務需求
4.3 穩定性
通過上述的監控與定期重啟,配合計算引擎的升級,同步迭代工具的使用,例如hologres 支持了insert overwrite 命令,可以實現寫cpu打滿也不會影響讀的使用,同步迭代最新版本,可以保證業務的高可用。