1.概念
doris并不擅長高頻、小量數據的導入;
因為doris每一次數據導入都會在be節點上生成數據文件;如果高頻導入小量數據,就會在存儲層產生大量的小文件(必然會影響到后續的查詢效率,也會對系統產生更多的compaction操作壓力)
而flink是實時不斷地往doris中插入數據,所以很容易出現上述問題;
怎么辦?有兩個辦法:
- 在flink中先做一些按時間開窗后的輕度聚合,降低寫入的數據量(在先flink端處理,后續的數據量變少了)
- 可以適當調大checkpoint間隔(10分鐘),降低插入頻率(原因是flink在做完checkpoint才往下游寫數據)
方案1:開窗輕度聚合
1.例子
例子:
-- 分鐘級聚合
CREATE TABLE doris_sink (window_start TIMESTAMP(3),total_count BIGINT,sum_value DECIMAL(16,2)
) WITH ('connector' = 'doris','table.identifier' = 'db.table','sink.batch.size' = '5000', 'sink.batch.interval' = '60s'
);INSERT INTO doris_sink
SELECTTUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,COUNT(*) AS total_count,SUM(value) AS sum_value
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
優化效果??(示例):
時間窗口 | 原始數據量 | 聚合后數據量 | 寫入壓縮比 |
---|---|---|---|
1秒 | 10000條/s | 10000條/s | 1:1 |
10秒 | 10000條/s | 1000條/10s | 10:1 |
1分鐘 | 10000條/s | 167條/min | 60:1 |
?在flink端部分聚合,再寫入doris,數據量變小了,效率自然提高
2.合適的使用場景:
場景特征 | 適用性 | 技術實現要點 | 收益 |
---|---|---|---|
高并發寫入(>1萬條/秒) | ? | 滾動窗口聚合 + 計數窗口降頻 | 減少 90% 小文件,避免 -235 錯誤 1 |
亞秒級查詢需求 | ? | 預計算指標 + 結果表寫入 | 查詢延遲從秒級降至毫秒級 3 |
多源數據關聯 | ? | 窗口內多流 Join + 聚合 | 避免 Doris 復雜查詢,節省 30% CPU 5 |
精確統計需求 | ? | 需寫入原始明細數據 | - |
?(1)高并發寫入場景
當上游數據源(如 Kafka)的寫入并發量極高(例如每秒 10 萬條以上)時,直接寫入 Doris 可能導致以下問題:
- ??小文件過多??:頻繁寫入會產生大量小文件,觸發 Doris 的版本合并(Compaction)壓力,可能引發錯誤。
- ??資源消耗大??:高頻寫入導致 Doris BE 節點的 CPU 和 I/O 資源被 Compaction 任務占用,影響查詢性能。
??解決方案??:
在 Flink 中通過 ??滾動窗口(如 5 秒窗口)?? 或 ??計數窗口(如每 1000 條)?? 對數據進行預聚合,將多條數據合并為一條統計結果后再寫入 Doris。例如:
DataStream<Event> stream = ...;
stream.keyBy(Event::getKey).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滾動窗口.aggregate(new AvgAggregator()) // 聚合邏輯(如計算均值).addSink(new DorisSink());對應到sql 直接開窗5s
此方式可將寫入頻率降低 10 倍以上,減少 Doris 的寫入壓力
(2)低延遲查詢需求場景
當業務需要??亞秒級查詢響應??(如實時大屏、風控決策)時,直接寫入原始數據可能導致:
- ??查詢性能下降??:原始數據量大,Doris 需實時聚合計算,增加查詢耗時;
- ??存儲成本高??:原始明細數據占用大量存儲空間。
解決方案??:
在 Flink 中按時間窗口(如 1 分鐘)預計算關鍵指標(如 PV、UV、GMV),僅將聚合結果寫入 Doris。例如:
- ??原始數據??:用戶點擊事件(每秒 10 萬條) → ??聚合后??:每分鐘 PV 統計值(每秒 1 條)。
此方式可提升 Doris 查詢效率,同時節省存儲資源
(3)數據預處理與清洗場景
當原始數據存在以下特征時,適合在 Flink 端聚合:
- ??冗余數據多??:如重復日志、無效埋點;
- ??關聯計算需求??:需跨數據源關聯(如用戶行為數據與訂單數據)。
??解決方案??:
通過 Flink 窗口函數實現:
- ??去重??:使用?
WindowFunction
?過濾重復數據; - ??關聯計算??:在窗口內完成多流 Join,輸出關聯結果。
例如,在 10 秒窗口內關聯用戶點擊與加購行為,輸出轉化率指標,避免 Doris 中復雜的多表關聯查詢
(4)資源受限場景
當 Doris 集群資源(CPU、內存、磁盤)有限時,可通過以下方式優化:
- ??降低寫入量??:聚合后數據量減少 50%~90%,降低 Doris 存儲和 Compaction 壓力;
- ??延長 Compaction 周期??:通過減少小文件數量,允許 Doris 合并任務更高效調度。
??參數調優建議??:
- Flink Checkpoint 間隔:從 5 秒調整為 30 秒~1 分鐘,減少事務提交頻率;
- Doris Compaction 參數:調低?
cumulative_size_based_promotion_min_size_mbytes
(默認 64MB → 8MB),加速小文件合并;
方案2:調大 Checkpoint 間隔
生產環境測試數據??:
Checkpoint間隔 | 吞吐量(events/s) | 寫入延遲(ms) | CPU利用率 |
---|---|---|---|
1分鐘 | 12,000 | 50-100 | 75% |
5分鐘 | 28,000 | 30-80 | 65% |
10分鐘 | 35,000 | 20-60 | 58% |
考一個對checkpoint的理解:flink是在做完checkpoint才往下游寫數據?,比如說checkpoint的時間是1分鐘,豈不是延遲就是一分鐘?
結論:??數據處理和狀態快照是解耦的??。調整 Checkpoint 間隔只會影響故障恢復時可能丟失的數據量(Recovery Time Objective),??不會增加數據處理的固有延遲?;?
具體例子(以第 N 分鐘為例):
-
??0:00.000??
- 用戶A點擊商品X → Kafka 生產事件
- Flink 立即消費并處理,PV計數器+1 → 實時寫入 Doris
-
??0:00.500??
- CheckpointCoordinator 觸發新一輪 Checkpoint
- Source 算子注入 Barrier 到數據流(特殊標記,不影響正常數據處理)
-
??0:00.501-0:02.000??
- Barrier 隨數據流向下游傳播
- PV統計算子 ??邊處理新事件?? 邊接收 Barrier:
- Doris 持續收到?
PV=100 → 101 → 102...
?的寫入請求
-
??0:03.000??
- 所有算子完成狀態快照(耗時約2秒)
- 快照存儲到 HDFS(異步執行,不阻塞主線程)
-
??0:06.000??
- Checkpoint 確認完成,JM 記錄元數據;
正常情況:
- 用戶點擊后 ??500ms?? 內即可在 Doris 查詢到最新 PV(實際延遲僅網絡+計算耗時)
- Checkpoint 過程持續 ??6秒??,但期間 Doris 收到 ??60次?? 數據寫入(每秒10次);
故障要恢復情況:
假設在 ??0:50?? 發生故障:
- 從最近 Checkpoint(0:00 開始,0:06 完成)恢復
- 狀態回滾到 PV=100(Checkpoint 時的快照值)
- ??但 Doris 實際已寫入 PV=150??
- Flink 通過事務機制保證最終 PV=150 + 恢復后新數據 的精確一次語義
這時候聰明的你又發現:
Doris 實際已寫入 PV=150??,相當于以及寫入到下游的doris,是怎么讓數據回滾的???
原因:Flink 在故障恢復時保證 Doris 已寫入的 PV=150 數據不會導致重復計算,核心是通過??兩階段提交(2PC)機制??與??事務性寫入??實現的,所以可以回滾數據
Checkpoint 與事務的階段性控制?:
Flink 的 Checkpoint 過程與 Sink 的事務提交嚴格綁定,整個過程分為 ??預提交(pre-commit)?? 和 ??正式提交(commit)?? 兩個階段
-
??預提交階段??(Checkpoint 進行中)
- Flink Sink 將計算結果(如 PV=100→150 的增量)寫入 Doris 的??臨時存儲位置??(如臨時表或事務日志),但??未對外可見??。
- 此時 Doris 的 PV=150 ??僅處于預提交狀態??,未實際生效。
-
??正式提交階段??(Checkpoint 確認完成)
- 當 JobManager 收到所有算子的 Checkpoint 完成確認后,才會通知 Sink ??提交事務??。
- Doris 將臨時數據??原子性替換為正式數據??(如重命名臨時文件或更新可見標志)
故障恢復時的回滾邏輯??
假設故障發生在 ??0:50??(Checkpoint 未完成):
-
??未完成的 Checkpoint 事務回滾??
- Flink 從最近成功的 Checkpoint(PV=100)恢復狀態。
- 同時,Doris 中處于預提交狀態的 PV=150 ??會被自動清理??(如刪除臨時表或撤銷事務日志)。
-
??數據重放與冪等性保障??
- Flink 會從 Source 端(如 Kafka)??重放 Checkpoint 后的數據??(0:06→0:50 的數據)。
- Doris Sink 在寫入時通過??事務 ID 或唯一鍵??實現冪等性,確保相同數據多次寫入不會重復累加;
疑問:針對數據回滾的場景,doris能查詢到 PV=150的數據嗎
Doris 默認的隔離級別保證查詢只能看到已提交的數據,所以查看不到PV=150的數據
其他調優手段:
1、開啟 MiniBatch 聚合
table.exec.mini-batch.enabled = true
table.exec.mini-batch.size = 5000
2、配置 Doris 批量寫入
sink.batch.size = 5000
sink.max-retries = 5 --最大可重試5次
3、異步 Compaction 調優
ALTER TABLE doris_table SET ("compaction_policy" = "time_series");