背景:
最近在做實時數據的抽取工作,利用FLinkCDC實時抽取目標庫Oracle的數據到Doris中,但是在抽取的過程中,會導致目標庫的生產庫數據庫非常卡頓,為了避免對生產環境的數據庫造成影響,對生產環境的數據庫利用OGG技術做了備庫,從備庫中利用FlinkCDC抽取數據到Doris,但是在抽取的過程中出現了同樣的錯誤,導致備庫數據庫卡頓,數據異常,數據庫宕機,至于這些原因,懷疑我們當時的方案出了嚴重的問題
第一次發方案:
?直接利用FLINKCDC抽取Oralce的binglog日志,這種方案就是在Dinky中啟動的每一個任務都會去Oracle的源端數據庫中讀取binglog日志,從而有大量的進程和線程出現,導致cpu和內存無限上升(剛開始cpu是8核,后面升級到了32核,問題還是出現)
架構圖如下:
改進后的方案:
Oracle CDC數據表主要用于獲取Oracle 數據,并可以實時同步數據表中的修改,經常用在復雜的計算場景。例如,作為一張維表和其他數據表做Join操作。在使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL數據庫會啟動多個連接,對MySQL服務器和網絡造成很大的壓力。
為了緩解對上游Oracle 數據庫的壓力,Flink實時計算已提供Oracle 整庫同步到Kafka的能力,通過引入Kafka作為中間層,利用OGG將數據推送到Kafka,然后FLink從Kafka獲取數據,這樣減少了源端數據庫的壓力
架構圖如下:
基本操作如下:
CREATE TEMPORARY TABLE tempOrder (`key_order_id` BIGINT NOT NULL,`value_product` STRING,PRIMARY KEY (key_order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'order','properties.bootstrap.servers' = 'xxxx','key.format' = 'json','key.fields-prefix' = 'key_','value.format' = 'json','value.fields-prefix' = 'value_','value.fields-include' = 'EXCEPT_KEY','value.json.infer-schema.flatten-nested-columns.enable' = 'false','value.json.infer-schema.primitive-as-string' = 'false'
);
?
利用這種方案從而減少了源端數據庫的壓力
常見問題:1. 源端庫的鏈接數沾滿
? ? ? ? ? ? ? ? ? 2.FlinkCDC 引起的Flink服務器cpu卡頓問題