flink寫doris時的優化

1.概念

doris并擅長高頻小量數據的導入

因為doris每一次數據導入都會在be節點上生成數據文件;如果高頻導入小量數據,就會在存儲層產生大量的小文件(必然會影響到后續的查詢效率,也會對系統產生更多的compaction操作壓力)

而flink是實時不斷地往doris中插入數據,所以很容易出現上述問題;

怎么辦?有兩個辦法:

  1. 在flink中先做一些按時間開窗后的輕度聚合,降低寫入的數據量(在先flink端處理,后續的數據量變少了)
  2. 可以適當調大checkpoint間隔(10分鐘),降低插入頻率(原因是flink在做完checkpoint才往下游寫數據)

方案1:開窗輕度聚合

1.例子

例子:
-- 分鐘級聚合
CREATE TABLE doris_sink (window_start TIMESTAMP(3),total_count BIGINT,sum_value DECIMAL(16,2)
) WITH ('connector' = 'doris','table.identifier' = 'db.table','sink.batch.size' = '5000', 'sink.batch.interval' = '60s'
);INSERT INTO doris_sink
SELECTTUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,COUNT(*) AS total_count,SUM(value) AS sum_value
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

優化效果??(示例):

時間窗口原始數據量聚合后數據量寫入壓縮比
1秒10000條/s10000條/s1:1
10秒10000條/s1000條/10s10:1
1分鐘10000條/s167條/min60:1

?在flink端部分聚合,再寫入doris,數據量變小了,效率自然提高

2.合適的使用場景:

場景特征適用性技術實現要點收益
高并發寫入(>1萬條/秒)?滾動窗口聚合 + 計數窗口降頻減少 90% 小文件,避免 -235 錯誤

1

亞秒級查詢需求?預計算指標 + 結果表寫入查詢延遲從秒級降至毫秒級

3

多源數據關聯?窗口內多流 Join + 聚合避免 Doris 復雜查詢,節省 30% CPU

5

精確統計需求?需寫入原始明細數據-

?(1)高并發寫入場景

當上游數據源(如 Kafka)的寫入并發量極高(例如每秒 10 萬條以上)時,直接寫入 Doris 可能導致以下問題:

  1. ??小文件過多??:頻繁寫入會產生大量小文件,觸發 Doris 的版本合并(Compaction)壓力,可能引發錯誤。
  2. ??資源消耗大??:高頻寫入導致 Doris BE 節點的 CPU 和 I/O 資源被 Compaction 任務占用,影響查詢性能。

??解決方案??:
在 Flink 中通過 ??滾動窗口(如 5 秒窗口)?? 或 ??計數窗口(如每 1000 條)?? 對數據進行預聚合,將多條數據合并為一條統計結果后再寫入 Doris。例如:

DataStream<Event> stream = ...;
stream.keyBy(Event::getKey).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滾動窗口.aggregate(new AvgAggregator()) // 聚合邏輯(如計算均值).addSink(new DorisSink());對應到sql 直接開窗5s

此方式可將寫入頻率降低 10 倍以上,減少 Doris 的寫入壓力

(2)低延遲查詢需求場景

當業務需要??亞秒級查詢響應??(如實時大屏、風控決策)時,直接寫入原始數據可能導致:

  1. ??查詢性能下降??:原始數據量大,Doris 需實時聚合計算,增加查詢耗時;
  2. ??存儲成本高??:原始明細數據占用大量存儲空間。

解決方案??:
在 Flink 中按時間窗口(如 1 分鐘)預計算關鍵指標(如 PV、UV、GMV),僅將聚合結果寫入 Doris。例如:

  • ??原始數據??:用戶點擊事件(每秒 10 萬條) → ??聚合后??:每分鐘 PV 統計值(每秒 1 條)。
    此方式可提升 Doris 查詢效率,同時節省存儲資源

(3)數據預處理與清洗場景

當原始數據存在以下特征時,適合在 Flink 端聚合:

  1. ??冗余數據多??:如重復日志、無效埋點;
  2. ??關聯計算需求??:需跨數據源關聯(如用戶行為數據與訂單數據)。

??解決方案??:
通過 Flink 窗口函數實現:

  • ??去重??:使用?WindowFunction?過濾重復數據;
  • ??關聯計算??:在窗口內完成多流 Join,輸出關聯結果。
    例如,在 10 秒窗口內關聯用戶點擊與加購行為,輸出轉化率指標,避免 Doris 中復雜的多表關聯查詢

(4)資源受限場景

當 Doris 集群資源(CPU、內存、磁盤)有限時,可通過以下方式優化:

  1. ??降低寫入量??:聚合后數據量減少 50%~90%,降低 Doris 存儲和 Compaction 壓力;
  2. ??延長 Compaction 周期??:通過減少小文件數量,允許 Doris 合并任務更高效調度。

??參數調優建議??:

  • Flink Checkpoint 間隔:從 5 秒調整為 30 秒~1 分鐘,減少事務提交頻率;
  • Doris Compaction 參數:調低?cumulative_size_based_promotion_min_size_mbytes(默認 64MB → 8MB),加速小文件合并;

方案2:調大 Checkpoint 間隔

生產環境測試數據??:

Checkpoint間隔吞吐量(events/s)寫入延遲(ms)CPU利用率
1分鐘12,00050-10075%
5分鐘28,00030-8065%
10分鐘35,00020-6058%

考一個對checkpoint的理解:flink是在做完checkpoint才往下游寫數據?,比如說checkpoint的時間是1分鐘,豈不是延遲就是一分鐘?

結論:??數據處理和狀態快照是解耦的??。調整 Checkpoint 間隔只會影響故障恢復時可能丟失的數據量(Recovery Time Objective),??不會增加數據處理的固有延遲?;?

具體例子(以第 N 分鐘為例):

  1. ??0:00.000??

    • 用戶A點擊商品X → Kafka 生產事件
    • Flink 立即消費并處理,PV計數器+1 → 實時寫入 Doris

  2. ??0:00.500??

    • CheckpointCoordinator 觸發新一輪 Checkpoint
    • Source 算子注入 Barrier 到數據流(特殊標記,不影響正常數據處理)

  3. ??0:00.501-0:02.000??

    • Barrier 隨數據流向下游傳播
    • PV統計算子 ??邊處理新事件?? 邊接收 Barrier:
    • Doris 持續收到?PV=100 → 101 → 102...?的寫入請求
  4. ??0:03.000??

    • 所有算子完成狀態快照(耗時約2秒)
    • 快照存儲到 HDFS(異步執行,不阻塞主線程)
  5. ??0:06.000??

    • Checkpoint 確認完成,JM 記錄元數據;

正常情況:

  • 用戶點擊后 ??500ms?? 內即可在 Doris 查詢到最新 PV(實際延遲僅網絡+計算耗時)
  • Checkpoint 過程持續 ??6秒??,但期間 Doris 收到 ??60次?? 數據寫入(每秒10次);

故障要恢復情況:

假設在 ??0:50?? 發生故障:

  • 從最近 Checkpoint(0:00 開始,0:06 完成)恢復
  • 狀態回滾到 PV=100(Checkpoint 時的快照值)
  • ??但 Doris 實際已寫入 PV=150??
  • Flink 通過事務機制保證最終 PV=150 + 恢復后新數據 的精確一次語義

這時候聰明的你又發現:

Doris 實際已寫入 PV=150??,相當于以及寫入到下游的doris,是怎么讓數據回滾的???

原因:Flink 在故障恢復時保證 Doris 已寫入的 PV=150 數據不會導致重復計算,核心是通過??兩階段提交(2PC)機制??與??事務性寫入??實現的,所以可以回滾數據

Checkpoint 與事務的階段性控制?:

Flink 的 Checkpoint 過程與 Sink 的事務提交嚴格綁定,整個過程分為 ??預提交(pre-commit)?? 和 ??正式提交(commit)?? 兩個階段

  1. ??預提交階段??(Checkpoint 進行中)

    • Flink Sink 將計算結果(如 PV=100→150 的增量)寫入 Doris 的??臨時存儲位置??(如臨時表或事務日志),但??未對外可見??。
    • 此時 Doris 的 PV=150 ??僅處于預提交狀態??,未實際生效。
  2. ??正式提交階段??(Checkpoint 確認完成)

    • 當 JobManager 收到所有算子的 Checkpoint 完成確認后,才會通知 Sink ??提交事務??。
    • Doris 將臨時數據??原子性替換為正式數據??(如重命名臨時文件或更新可見標志)

故障恢復時的回滾邏輯??

假設故障發生在 ??0:50??(Checkpoint 未完成):

  1. ??未完成的 Checkpoint 事務回滾??

    • Flink 從最近成功的 Checkpoint(PV=100)恢復狀態。
    • 同時,Doris 中處于預提交狀態的 PV=150 ??會被自動清理??(如刪除臨時表或撤銷事務日志)。
  2. ??數據重放與冪等性保障??

    • Flink 會從 Source 端(如 Kafka)??重放 Checkpoint 后的數據??(0:06→0:50 的數據)。
    • Doris Sink 在寫入時通過??事務 ID 或唯一鍵??實現冪等性,確保相同數據多次寫入不會重復累加;

疑問:針對數據回滾的場景,doris能查詢到 PV=150的數據嗎

Doris 默認的隔離級別保證查詢只能看到已提交的數據,所以查看不到PV=150的數據

其他調優手段:

1、開啟 MiniBatch 聚合

table.exec.mini-batch.enabled = true
table.exec.mini-batch.size = 5000

2、配置 Doris 批量寫入

sink.batch.size = 5000
sink.max-retries = 5 --最大可重試5次

3、異步 Compaction 調優

ALTER TABLE doris_table SET ("compaction_policy" = "time_series");

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/77098.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/77098.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/77098.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ElementNotInteractableException原因及解決辦法

在自動化測試中,ElementNotInteractableException是一個常見的異常,它通常發生在嘗試與網頁上的某個元素進行交互(例如點擊、輸入等操作)時,但由于該元素當前不可交互。這可能由多種原因引起,以下是一些常見的原因及其解決方法: 元素未完全加載 如果嘗試與頁面上的元素交…

如何從 GitHub 鏡像倉庫到極狐GitLab?

最近 GitHub 封禁中國用戶的事情鬧得沸沸揚揚,雖然官方發布的報道說中國用戶被限制登錄是因為配置錯誤導致,已經撤回了更新,中國用戶已經可以正常使用。但是這就像橫在國內開發者和企業頭上的“達摩克利斯之劍”。為了避免 GitHub 不可用而帶來的影響,國內開發者和企業可以…

服務器安裝nacos

1.下載依賴 docker pull nacos/nacos-server:v2.4.3安裝 docker run -d --name nacos-server -p 8848:8848 -e MODEstandalone nacos/nacos-server:v2.4.3把nacos中的data 文件和conf 文件copy到自己服務的文件夾 docker cp nacos-server:/home/nacos/data /home/admin1/…

Matter協議暗戰:蘋果、谷歌、亞馬遜的智能家居霸權爭奪

原文地址&#xff1a;Matter協議暗戰&#xff1a;蘋果、谷歌、亞馬遜的智能家居霸權爭奪 一、Matter 協議&#xff1a;巨頭聯手打造的 “智能家居聯合國” 1.1 從 CHIP 到 Matter&#xff1a;標準統一的十年長跑 智能家居發展多年&#xff0c;卻始終被 “孤島效應” 困擾。各…

軟件設計師2009-2022歷年真題與答案解析(附pdf下載)

軟考在即&#xff0c;現在給大家分享一下軟件設計師2009-2022真題與答案解析 pdf全套&#xff0c;文末提供大家免費下載&#xff0c;大家都知道在軟考備考過程中&#xff0c;擁有一套全面且實用的考試資料對于考生來說至關重要。目錄如下&#xff1a; 歷年真題及詳解2004-2019 …

基于EasyX庫開發的球球大作戰游戲

目錄 球球大作戰 一、開發環境 二、流程圖預覽 三、代碼邏輯 1、初始化時間 2、設置開始界面大小 3、設置開始界面 4、讓玩家選擇速度 5、設置玩家小球、人機小球、食物的屬性 6、一次性把圖繪制到界面里 7、進入死循環 8、移動玩家小球 9、移動人機 10、食物刷新…

aslist和list的區別

?Arrays.asList和List的主要區別在于它們的固定長度和不可變性、與原始數組的關系、性能以及使用場景。 一、固定長度和不可變性 ?Arrays.asList?&#xff1a;通過Arrays.asList方法創建的List是一個固定長度的List&#xff0c;其長度與原始數組相同。這意味著你不能通過添…

大模型預標注和自動化標注在OCR標注場景的應用

OCR&#xff0c;即光學字符識別&#xff0c;簡單來說就是利用光學設備去捕獲圖像并識別文字&#xff0c;最終將圖片中的文字轉換為可編輯和可搜索的文本。在數字化時代&#xff0c;OCR&#xff08;光學字符識別&#xff09;技術作為處理圖像中文字信息的關鍵手段&#xff0c;其…

stm32工程,拷貝到另一臺電腦編譯,錯誤提示頭文件找不到cannot open source input file “core_cm4.h”

提示 cannot open source input file “core_cm4.h” ,找不到 [ core_cm4.h ] 這個頭文件 . 于是我在原電腦工程文件里找也沒有找到這個頭文件 接下來查看原電腦keil的頭文件引入配置,發現只引入了工程文件下的頭文件, 那么core_cm4.h到底哪里來的? (到現在我也不清楚怎…

STM32 模塊化開發指南 · 第 2 篇 如何編寫高復用的外設驅動模塊(以 UART 為例)

本文是《STM32 模塊化開發實戰指南》的第 2 篇,聚焦于“串口驅動模塊的設計與封裝”。我們將從一個最基礎的裸機 UART 初始化開始,逐步實現:中斷支持、環形緩沖收發、模塊接口抽象與測試策略,構建一個可移植、可擴展、可復用的 UART 驅動模塊。 一、模塊化 UART 的設計目標…

【NLP 59、大模型應用 —— 字節對編碼 bpe 算法】

目錄 一、詞表的構造問題 二、bpe(byte pair encoding) 壓縮算法 算法步驟 示例&#xff1a; 步驟 1&#xff1a;初始化符號表和頻率統計 步驟 2&#xff1a;統計相鄰符號對的頻率 步驟 3&#xff1a;合并最高頻的符號對 步驟 4&#xff1a;重復合并直至終止條件 三、bpe在NLP中…

TMS320F28P550SJ9學習筆記15:Lin通信SCI模式結構體寄存器

今日初步認識與配置使用Lin通信SCI模式&#xff0c;用結構體寄存器的方式編程 文章提供完整工程下載、測試效果圖 我的單片機平臺是這個&#xff1a; LIN通信引腳&#xff1a; LIN通信PIE中斷&#xff1a; 這個 PIE Vector Table 表在手冊111頁&#xff1a; 這是提到LINa的PI…

linux-設置每次ssh登錄服務器的時候提醒多久需要修改密碼

在 Linux 系統中,你可以通過設置 motd(Message of the Day)或 sshd 配置來在用戶通過 SSH 登錄時提醒他們密碼即將過期。以下是具體步驟: 方法 1: 使用 motd 文件 motd 文件在用戶登錄時顯示,你可以通過腳本動態生成內容,提醒用戶密碼過期時間。 編輯 /etc/motd 文件:…

matlab求和∑函數方程編程?

matlab求和∑函數方程編程&#xff1f; 一 題目&#xff1a;求下列函數方程式的和 二&#xff1a;代碼如下&#xff1a; >> sum_result 0; % 初始化求和變量 for x 1:10 % 設…

electron桌面端開發-打開指定軟件和文件

electron桌面端開發 現在越來越多的軟件開發已經趨向于簡單化&#xff0c;桌面端開發已經不在依賴之前的java、c等主流技術&#xff0c;目前基于node的開發越來越廣泛。功能點也越來越多元化。 文章目錄 electron桌面端開發前言一、打開文件的方式&#xff1f;二、exec使用步驟…

ShenNiusModularity項目源碼學習(17:ShenNius.Admin.Mvc項目分析-2)

ShenNiusModularity項目的后臺管理主頁面如下圖所示&#xff0c;該頁面為ShenNius.Admin.Mvc項目的Views\Home\Index.cshtml&#xff0c;使用的是layuimini后臺模板&#xff08;參考文獻2&#xff09;&#xff0c;在layuimini的GitHub主頁中提供有不同樣式的頁面模版鏈接&#…

SpringBoot 與 Vue3 實現前后端互聯全解析

在當前的互聯網時代&#xff0c;前后端分離架構已經成為構建高效、可維護且易于擴展應用系統的主流方式。本文將詳細介紹如何利用 SpringBoot 與 Vue3 構建一個前后端分離的項目&#xff0c;展示兩者如何通過 RESTful API 實現無縫通信&#xff0c;讓讀者了解從環境搭建、代碼實…

portainer.io篇

Portainer?是一個輕量級的容器管理工具&#xff0c;支持Docker、Kubernetes、Docker Swarm、ACI和Nomad等多種平臺。它提供了一個直觀的Web界面&#xff0c;使用戶能夠輕松地管理和監控容器&#xff0c;包括創建、啟動、停止、刪除容器&#xff0c;以及查看容器的日志和配置信…

Dockerfile 文件常見命令及其作用

Dockerfile 文件包含一系列命令語句&#xff0c;用于定義 Docker 鏡像的內容、配置和構建過程。以下是一些常見的命令及其作用&#xff1a; FROM&#xff1a;指定基礎鏡像&#xff0c;后續的操作都將基于該鏡像進行。例如&#xff0c;FROM python:3.9-slim-buster 表示使用 Pyt…

Android Studio開發知識:從基礎到進階

引言 Android開發作為移動應用開發的主流方向之一&#xff0c;曾吸引了無數開發者投身其中。然而&#xff0c;隨著市場飽和和技術迭代&#xff0c;當前的Android開發就業形勢并不樂觀&#xff0c;競爭日益激烈。盡管如此&#xff0c;掌握扎實的開發技能仍然是脫穎而出的關鍵。本…