基于flink的實時數倉解決方案
- 1 背景
- 2 業務模型
- 1 業務框架
- 2 難點痛點
- 3技術選型
- 1 計算引擎
- 2 中間存儲
- 3 查詢引擎
- 4 flink計算架構設計
- 1 純實時架構
- 2 純實時+定期補充離線數據
- 3 純實時+定期刷新過期binlog
- 4 lamdba + 分字段更新 + 歷史過期數據刷新
- 5 痛點解決
- delta join
- merge-engine
- hologres分字段更新
這里主要是做一個記錄畢竟工作了這么多年,想要總結一下,所以就慢慢寫,慢慢更新了,想到哪里寫哪里吧。
1 背景
其實背景很簡單,就是要做實時數倉,總的原因是提高數據的可用性與時效性,所以單純的批處理已經無法滿足業務需求了,甚至有些業務從動作發生,到業務處理,到數倉處理,到展示可能 要求整體流程在5s以內,所以就有了這么一個項目的誕生。
2 業務模型
1 業務框架
汽車領域的業務其實都大差不差,包含生產,銷售,交付等等,所以在博主這里,主要分為以下數據域:線索,銷售,交付,售后 四大部分。
其中有一些數據域的融合,例如 線索到銷售的漏斗轉化,線索到客服之類的融合。
2 難點痛點
其實具體的業務模型,對于開發汽車領域的同學來說,沒什么具體作用,而痛點難點,卻是相同的,例如:
- flink超長周期的,秒級運算如何維護,一個客戶21年留下來聯系方式,今天忽然買了一輛汽車,這個轉化率 如何能夠秒級更新?有兩張表,用戶信息表(uid, tag_id, level),和用戶標簽表(tag_id,tag_name),下游想要一個,能夠獲取全部數據的,全量數據做主流推送,去更新寬表,兩邊數據的時間跨度會有好幾年,這時候數據要怎么使用維護?
- 一張300個字段的寬表,怎么保證實時更新每個字段?
3技術選型
1 計算引擎
這里可選的不多,在滿足秒級更新與吞吐量的情況下,這里首選flink
2 中間存儲
中間存儲指的是,實時數倉中間存儲層,flink內部狀態,各種關聯的維度表等等。
維度表:這里指實時維表,一個任務不停寫入數據,另一個任務把他當做維度表來使用。
考察了 MySQL,iceberg,paimon,hologres 幾個存儲:
- 其中MySQL吞吐量和單表不太夠
- iceberg 延遲太高,寫入后可讀要有一定延遲,達不到業務要求,而且hdfs查詢也慢
- paimon與iceberg差不多,但能好很多
- hologres,性能最好,但價格最高
- kafka 可讀性與永久存儲達不到要求
所以綜合比較,最后選擇了hologres來做中間存儲,消費cdc消費binlog可以達到類似使用kafka的模式
3 查詢引擎
這里主要指服務端查詢數據的存儲,這里用過很多存儲,mysql ,doris,iceberg,hologres,最終目前采用的是iceberg+hologres的方案。
- mysql還是那個樣子,對于實時匯總的明細,例如百萬 ,千萬級明細,多維度匯總展示,性能上有欠缺。
- doris 會有熱點問題,而且需要專門的集群維護,另外服務端使用,經常會關聯維度表展示,這部分性能也不是特別好
- iceberg 便宜,量大,慢,除了慢,似乎沒有不好的地方了
- 所以最終采用的是 hologres+iceberg的模式,直接頁面展示的各種匯總值,明細值,使用的都是hologres存儲,在優化好索引后,千萬級別的數據做聚合也可以做到500ms以內,iceberg提供明細下載能力,詳細數據使用iceberg+spark提供給用戶
4 flink計算架構設計
1 純實時架構
這里可以理解為 flink+kafka 這種方式,我們運行過一段時間,但僅限于比較簡單的業務架構,像是復雜一點的,尤其是歷史累計值那種,就會很被動了,例如上面說的,要統計用戶轉化率,這個轉化率周期可能會跨度好幾年,所以就不太合適這種指標的開發,但是用作etl的動作,提供明細數據是非常合適的
2 純實時+定期補充離線數據
這個其實是第一種方案的升級版本,也是flink+kafka,但是會定期的用spark把離線的數據推送一份到kafka里,也就是kafka的topic里時刻,都會有全部的數據,可能會有重復,但不會少數據,所以這樣用flink計算,無論是累計值,還是明細,都可以在flink state里取到。唯一的區別是什么呢,資源消耗會比較大,因為kafka和flink使用的資源會
3 純實時+定期刷新過期binlog
這種則是第二種方式的升級版,使用的是flink+binlog的方式,如果數據量少,可能用的是mysql,數據量大,可能hologres,用flinkcdc的方式,消費binlog數據,例如 如果flink state是7天的生命周期,每天update原始數據庫的 7天以前產生的數據,把etl時間更新成now,這樣 所有now - 7day以前的數據,都是過期的了,每次刷新的數據量就少了,更新的數據也少了
這個方案也使用過大約1年左右,目前發現 一個flink任務中,總的數據量 不超過20G以上,用這種方式都非常方便,甚至可以說又方便又快。
4 lamdba + 分字段更新 + 歷史過期數據刷新
最后一種,也是正在使用的就是,用目前市面最常用的lamdba的方案 ,也就是實時+離線的方式,每次離線任務執行完,通過更新視圖的方式,離線union實時 ,兩張表 共同提供數據,使用業務時間做切分。
另外使用hologres的分字段更新,一個表10個字段,通過主鍵,多個flink任務共同更新一張hologres表,下游可以直接接binlog就可以一行數據,只要主鍵相同,就能接到好幾個表的數據,而且不會過期,永久存儲。這種方式,適用于上文說的 300個字段,多個數據域通過同一個主鍵來將數據打平一張大寬表。
5 痛點解決
delta join
這里主要是flink2.2中預計上線的一個能力,是雙流 無狀態join,需要雙流都是binlog,也就是有原始的永久存儲,也就是state中沒有狀態保留,每次都loop up join表,兩個超級大表互相關聯,flink只需要提供很小的內存就可以執行,并且不會有狀態過期的問題,對于時間跨度超級大的場景最好用
merge-engine
這里主要是把paimon中的一些能力 應用到了flink 對于 hologres上,這里是對sink的 connector的改造
例如數據插入的時候,會默認保留首條,拋棄后續數據,或者保留最大最小等。
hologres分字段更新
這里是hologres的能力 ,通過相同數據,將不同數據寫入到目標表的同一行里,省去了join的動作,一方面是解耦,另一方面解決的狀態過期的問題