Flink SQL 性能優化實戰

最近我們組在大規模上線Flink SQL作業。首先,在進行跑批量初始化完歷史數據后,剩下的就是消費Kafka歷史數據進行追數了。但是發現某些作業的追數過程十分緩慢,要運行一晚上甚至三四天才能追上最新數據。由于是實時數倉指標計算上線初期,經常驗證作業如果有問題就得重蹈覆轍重新追數,效率很低,于是我開始分析Flink SQL的優化。

問題


insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a

上面這個作業的簡化版SQL,主要就是做一個分組聚合:

  1. 從tableA分組聚合出結果插入tableB
  2. tableA的聯合主鍵是:a,b(但是a的離散度已經很高了)
  3. tableA的Flink表類型為upset-kafka
  4. tableB的Flink表類型為HBase

初步分析


這個作業跑在集群上的job graph如下:

可以看到有三個vertex:

  1. 第一個是TableSourceScan
  2. 第二個是ChangelogNormalize
  3. 第三個是GroupAggregate

TableSourceScan接入tableA表的upsert-kafka流;

ChangelogNormalize對upset-kafka進行撤回語義的解析;

GroupAggregate對撤回流進行分組聚合,然后寫入tableB的HBase;

優化思路1:local/global agg


agg分類:

  • group agg
select count(a) from t group by b
  • over agg
select count(a) over (partition by b order by c) from t
  • window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b

local/global agg:

核心思想與hadoop的combiner是一致的,就是在mapreduce的過程中,在map階段就做一個預聚合,即combine操作。

[圖片上傳失敗…(image-c0ad24-1650075387085)]

帶來的收益是:減少網絡shuffle數據,提升計算引擎的性能。

前提條件:

  1. agg的所有agg function都是mergeable(實現merge方法)
  2. table.optimizer.agg-phase-strategy為AUTO或TWO_PHASE
  3. Stream下,minibatch開啟;Batch下,AUTO會根據cost選擇

解釋說明:

mergeable其實就是能用分治法解決的計算問題,例如sum、count等,而avg就不能用分治法先計算部分元素的avg,再計算最終avg了,結果有時候會出錯。

table.optimizer.agg-phase-strategy:默認為AUTO,意思是引擎盡量做預聚合;TWO_PHASE表示所有聚合操作都做預聚合;ONE_PHASE表示所有聚合都不做預聚合。

minibatch:即開啟微批模式。主要有三個參數:

table.exec.mini-batch.enabled:是否開啟,默認不開啟
table.exec.mini-batch.size:微批的record buffer大小
table.exec.mini-batch.allow-latency:微批的time buffer大小

minibatch的本質就是平衡實時性和吞吐量的刻度尺。

所以,local/global agg一共需要三個參數控制。

驗證


經過對比驗證,在這個SQL場景下的效率提升很小。

local/global agg降低了第二個vertex即ChangelogNormalize的sent records的數據量,而并沒有使得第一個vertex的數據處理效率有顯著提升。

所以,這個作業的瓶頸并不在vertex間, 而在于第一個vertex的處理數據效率。

優化思路二:調大并行度


這個思路的關鍵在于source upsert-kafka的分區數,這是制約吞吐量的瓶頸。因為在upsert-kafka中,每個partition最多被一個Flink線程讀取。

增加了10倍的并行度,source分區也增加10倍后,作業周轉時間縮短了將近一半。

優化思路三:RocksDB性能調優


仔細分析這個SQL作業,是對一個聯合主鍵的字段做group by,那么state一定會非常大。

經過在對這個表在數倉中的數據進行分析,發現這個字段的離散度幾乎接近于主鍵的離散度。

而進行group by必然要根據每一條upsert kafka的數據去查驗在flink statebackend中物化的source table中該字段值的分布情況,這應該是才是瓶頸所在!

沿著這個思路,開始分析Flink的statebackend機制。

這里我們簡單回顧一下Flink statebackend(后面再做專題總結):

由 Flink 管理的 keyed state 是一種分片的鍵/值存儲,每個 keyed state 的工作副本都保存在負責該鍵的 taskmanager 本地中。另外,Operator state 也保存在機器節點本地。Flink 定期獲取所有狀態的快照,并將這些快照復制到持久化的位置,例如分布式文件系統。

如果發生故障,Flink 可以恢復應用程序的完整狀態并繼續處理,就如同沒有出現過異常。

Flink 管理的狀態存儲在 state backend 中。Flink 有兩種 state backend 的實現 – 一種基于 RocksDB 內嵌 key/value 存儲將其工作狀態保存在磁盤上的,另一種基于堆的 state backend,將其工作狀態保存在 Java 的堆內存中。這種基于堆的 state backend 有兩種類型:FsStateBackend,將其狀態快照持久化到分布式文件系統;MemoryStateBackend,它使用 JobManager 的堆保存狀態快照。

當使用基于堆的 state backend 保存狀態時,訪問和更新涉及在堆上讀寫對象。但是對于保存在 RocksDBStateBackend 中的對象,訪問和更新涉及序列化和反序列化,所以會有更大的開銷。但 RocksDB 的狀態量僅受本地磁盤大小的限制。還要注意,只有 RocksDBStateBackend 能夠進行增量快照,這對于具有大量變化緩慢狀態的應用程序來說是大有裨益的。

所有這些 state backends 都能夠異步執行快照,這意味著它們可以在不妨礙正在進行的流處理的情況下執行快照。

我們的線上一般采用的是RocksDB作為狀態后端,checkpoint dir采用hdfs文件系統。其實我個人覺得這個應該根據作業的特性進行選擇,根據我個人的經驗以及知識沉淀,選擇的主要因素是作業的state大小及對處理數據性能的要求:

  • RocksDBStateBackend可以突破內存的限制,rocksDB的數據邏輯結構和redis相似,但是數據的物理存儲結構又和hbase相似,繼承自levelDB的LSM樹思想,缺點是性能太低
  • 而FsStateBackend是在做snapshot的時候才將內存的state持久化到遠端,速度接近于內存狀態
  • MemoryStateBackend是純內存的,一般只用做調試。

但是由于這個大狀態作業追數速度實在太慢,我甚至想過:

在追數的時候用FsStateBackend,并配置大內存,且把managed memory調成0,同時將ck的周期設置的很大,基本上不做ck,追上后savepoint。再把狀態后端換成RocksDB,并且從FSSatebackend的savepoint處恢復,但是發現1.13才支持savepoint切換statebackend類型。

只剩下調優RocksDB一條路了。根據之前對HBase的LSM原理的理解,進行知識遷移,馬上對RocksDB有了一定的認識。在HBase中調優效果最明顯無乎:

blockcache讀緩存、memStore寫緩存、增加布隆過濾器、提升compact效率

沿著這個思路,再查閱了一番RocksDB資料后,決定先對如下參數進行調優:

  • state.backend.rocksdb.block.cache-size
state.backend.rocksdb.block.blocksize

Block 塊是 RocksDB 保存在磁盤中的 SST 文件的基本單位,它包含了一系列列有序的 Key 和 Value 集合,可以設置固定的大小。

但是,通過增加 Block Size,會顯著增加讀放大(Read Amplification)效應,令讀取數據時,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小沒有變,就會?大減少 Cache 中可存放的 Block 數。如果 Cache 中還存處理索引和過濾?等內容,那么可放置的數據塊數目就會更少,可能需要更多的磁盤 IO 操作,找到數據就更更慢了,此時讀取性能會大幅下降。反之,如果減小BlockSize,會讓讀的性能有不少提升,但是寫性能會下降,?而且對 SSD 壽命也不利。

因此我的調優經驗是,如果需要增加 Block Size 的大小來提升讀寫性能,請務必一并增加 Block Cache Size 的大小,這樣才可以取得比較好的讀寫性能。Block Cache,緩存清除算法?用的是 LRU(Least Recently Used)。

驗證


測試對比后發現,原本半天左右完成的作業只需要一到兩個小時即可追上數據!

感悟


性能調優就如同把脈治病,關鍵在于對癥下藥。

前期,要分析當前場景下真正制約性能的瓶頸所在,后期,在癥結處用效果最明顯的方式處理癥結。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/88988.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/88988.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/88988.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

HTML 樹結構(DOM)深入講解教程

一、HTML 樹結構的核心概念 1.1 DOM(文檔對象模型)的定義 DOM(Document Object Model)是 W3C 制定的標準接口,允許程序或腳本(如 JavaScript)動態訪問和更新 HTML/XML 文檔的內容、結構和樣式。…

用鼠標點擊終端窗口的時候出現:0;61;50M0;61;50M0;62;50M0

在做aws webrtc viewer拉流壓測的過程中,我本地打開了多個終端,用于連接EC2實例: 一個終端用于啟動 ‘并發master腳本’、監控master端的cpu、mem;一個終端用于監控master端的帶寬情況;一個終端用于監控viewer端的cpu、…

C++-linux 5.gdb調試工具

GDB調試工具 在C/C開發中,程序運行時的錯誤往往比編譯錯誤更難定位。GDB(GNU Debugger)是Linux環境下最強大的程序調試工具,能夠幫助開發者追蹤程序執行流程、查看變量狀態、定位內存錯誤等。本章將從基礎到進階,全面講…

Update~Read PLC for Chart ~ Log By Shift To be... Alarm AI Machine Learning

上圖~ 持續迭代 1、增加報警彈窗,具體到哪個值,雙邊規格具體是多少 2、實時顯示當前值的統計特征,Max Min AVG ... import tkinter as tk from tkinter import simpledialog import time import threading import queue import logging from datetime import datet…

es的自定義詞典和停用詞

在 Elasticsearch 中,自定義詞典是優化分詞效果的核心手段,尤其適用于中文或專業領域的文本處理。以下是關于 ES 自定義詞典的完整指南: 為什么需要自定義詞典? 默認分詞不足: ES 自帶的分詞器(如 Standard…

微算法科技技術突破:用于前饋神經網絡的量子算法技術助力神經網絡變革

隨著量子計算和機器學習的迅猛發展,企業界正逐步邁向融合這兩大領域的新時代。在這一背景下,微算法科技(NASDAQ:MLGO)成功研發出一套用于前饋神經網絡的量子算法,突破了傳統神經網絡在訓練和評估中的性能瓶頸。這一創新…

一文讀懂循環神經網絡(RNN)—語言模型+讀取長序列數據(2)

目錄 讀取長序列數據 為什么需要 “讀取長序列數據”? 讀取長序列數據的核心方法 1. 滑動窗口(Sliding Window) 2. 分段截取(Segmentation) 3. 滾動生成(Rolling Generation) 4. 關鍵信息…

Oracle Virtualbox 虛擬機配置靜態IP

Oracle Virtualbox 虛擬機配置靜態IP VirtualBox的網卡,默認都是第一個不能自定義,后續新建的可以自定義。 新建NAT網卡、host主機模式網卡 依次點擊:管理->工具->網絡管理器新建host主機模式網卡 這個網卡的網段自定義,創建…

Linux RAID1 創建與配置實戰指南(mdadm)

Linux RAID1 創建與配置實戰指南(mdadm)一、RAID1 核心價值與實戰目標RAID1(磁盤鏡像) 通過數據冗余提供高可靠性:當單塊硬盤損壞時,數據不丟失支持快速陣列重建讀寫性能略低于單盤(鏡像寫入開銷…

MySQL數據庫----函數

目錄函數1,字符串函數2,數值函數3,日期函數4,流程函數函數 1,字符串函數 MySQL中內置了很多字符串函數 2,數值函數 3,日期函數 4,流程函數

1.2 vue2(組合式API)的語法結構以及外部暴露

vue2 vue3中可以寫vue2的語法&#xff0c;vue2的結構像一個花盆里的根&#xff08;根組件App.vue&#xff09;&#xff0c;根上可以插上不同的枝杈和花朵&#xff08;組件&#xff09;。 組件的結構&#xff1a; // 這里寫邏輯行為 <script lang"ts"> export d…

Swift 解 LeetCode 324:一步步實現擺動排序 II,掌握數組重排的節奏感

文章目錄摘要描述題解答案題解代碼&#xff08;Swift&#xff09;題解代碼分析步驟一&#xff1a;排序數組步驟二&#xff1a;左右指針分段步驟三&#xff1a;按位置交錯插入示例測試及結果示例 1示例 2示例 3&#xff08;邊界情況&#xff09;時間復雜度分析空間復雜度分析總結…

使用SQLMAP的文章管理系統CMS的sql注入滲透測試

SQLMAP注入演示&#xff1a;抓包拿到Cookie:召喚sqlmap&#xff1a;sqlmap -u "http://192.168.1.99:8085/show.php?id34" --cookie "pma_langzh_CN; kbqug_admin_username2621-PL_LxhFjyVe43ZuQvht6MI5q0ZcpRVV5FI0pzQ6XR8; kbqug_siteid2621-PL_LxhFjyVe4yA5…

I3C通信協議核心詳解

一、物理層與電氣特性雙線結構 SCL&#xff08;串行時鐘線&#xff09;&#xff1a;主設備控制&#xff0c;支持 推挽&#xff08;Push-Pull&#xff09;輸出&#xff08;高速模式&#xff09;和 開漏&#xff08;Open-Drain&#xff09;&#xff08;兼容I2C模式&#xff09;。…

Docker搭建Redis哨兵集群

Redis提供了哨兵機制實現主從集群下的故障轉移&#xff0c;其中包含了對主從服務的檢測、自動故障恢復和通知。 1.環境 centos7、redis6.2.4、MobaXterm 目的&#xff1a; 搭建redis的主從同步哨兵集群&#xff08;一主一從三哨兵&#xff09; 2.步驟 1.主從集群的搭建 主從…

暑假Python基礎整理 --異常處理及程序調試

異常概念 在程序運行過程中&#xff0c;經常會遇到各種各樣的錯誤&#xff0c;這些錯誤統稱為“異常”。如下表是Python常見的異常與描述&#xff1a; 異常描述NameError嘗試訪問一個未聲明的變量引發錯誤IndexError索引超出序列范圍引發錯誤IndentationError縮進錯誤ValueErr…

k8s-高級調度(二)

目錄 Taint(污點)與Toleration(容忍) Taint&#xff08;污點&#xff09;&#xff1a;節點的排斥標記 Toleration&#xff08;容忍&#xff09;&#xff1a;Pod的適配聲明 與節點親和性的對比 警戒(cordon)和轉移(drain) Cordon&#xff1a;節點隔離&#xff08;阻止新 Po…

基于OpenCV的深度學習人臉識別系統開發全攻略(DNN+FaceNet核心技術選型)

核心技術選型表 技術組件版本/型號用途OpenCV DNN4.5.5人臉檢測FaceNet (facenet-pytorch)0.5.0人臉特征提取MiniConda最新版Python環境管理PyTorch1.8.0FaceNet運行基礎OpenVINO2021.4模型加速(可選)SSD Caffe模型res10_300x300高精度人臉檢測 一、環境準備與項目搭建 1.1 M…

【AI News | 20250714】每日AI進展

AI Repos 1、All-Model-Chat All Model Chat 是一款為Google Gemini API家族設計的網頁聊天應用&#xff0c;支持多模態輸入&#xff08;圖片、音頻、PDF等&#xff09;和多種模型&#xff08;如Gemini Flash、Imagen&#xff09;。它提供了豐富的自定義功能&#xff0c;包括高…

C 語言(二)

主要包括變量與常量、數據類型、存儲方式、數制轉換以及字符處理等內容一、變量與常量在 C 語言中&#xff0c;變量是用來存儲數據的命名空間&#xff0c;它會在內存中分配地址。例如&#xff1a;int i; i 12345; 其中 i 是變量&#xff0c;12345 是常量。常量表示在程序運行過…