目錄
一、芒果TV實時數倉建設歷程
1.1 階段一:Storm/Flink Java+Spark SQL
1.2 階段二:Flink SQL+Spark SQL
1.3 階段三:Flink SQL+StarRocks
二、自研Flink實時計算調度平臺介紹
2.1?現有痛點
2.2 平臺架構設計
三、Flink SQL實時數倉分層實踐
四、Flink SQL實時數倉生產過程遇到的問題
4.1 多表關聯
4.2?復雜的表處理
4.3?State過大
4.4??Checkpoint 不能順利完成
五、StarRocks選型背景及問題
六、基于Flink SQL+StarRocks實時分析數倉
6.1 明細模型
6.2 主鍵模型
6.3?聚合模型
6.4 物化視圖
七、未來展望
7.1?湖倉一體
7.2?低代碼
? 原文大佬的這篇實時數倉建設案例有借鑒意義,這里摘抄下來用作學習和知識沉淀。
一、芒果TV實時數倉建設歷程
? ? ?芒果TV實時數倉的建設分為三個階段,14-19 年為第一階段,技術選型采用 Storm/Flink Java+Spark SQL。20-22 年上半年為第二階段,技術選型采用 Flink SQL+Spark SQL 。22 年下半年-至今為第三階段,技術選型采用 Flink SQL+ StarRocks。每一次升級都是在原有基礎上進行迭代,以求更全面的功能,更快的速度,能更好的滿足業務方的需求。
1.1 階段一:Storm/Flink Java+Spark SQL
? ? ?芒果 TV 的實時數據處理很早就開始了,最開始用的是 Storm,到了 18 年時,Flink 橫空出世。Flink 的 State狀態與流處理的優勢讓人眼前一亮,所以改用了 Flink 來搭建實時數倉,但當時主要以滿足業務方需求為主,進行煙囪式的開發,基本流程是接上游kafka的數據,使用flink java進行相關業務邏輯處理后,將數據輸出至對象存儲中。然后使用spark sql對數據進行統計等二次加工處理后,再交付客戶使用。此階段優點是利用了Flink的長處,讓數據從源頭到終端更實時化了,滿足了業務方對數據的時效性與業務需求,缺點是一個需求就開發一個功能,沒有進行實時數倉的建設和沉淀。
1.2 階段二:Flink SQL+Spark SQL
? ? 基于上一階段的技術積累與發現的問題,提出了建設實時數倉的新方案。此時Flink sql功能已經初步完善,能滿足搭建數倉各方面的需求,SQL 化相較 Flink Java 也能降低開發、維護等各方面成本,于是選擇 Flink SQL 來搭建實時數倉。此階段對實時數倉進行了分層架構設計,這個后面有詳細講解。
? ?基本流程是接上游 Kafka 數據進行格式化后輸出至 Kafka,下層接到 Kafka 數據進行字段處理、垃圾數據過濾等操作后輸出至 Kafka,最后一層接 Kafka 數據進行維度擴展,然后將數據寫至對象存儲中。再由 Spark SQL 讀取對象存儲中的數據進行統計等處理后,交付客戶使用。
? 此階段的優點是實現了數倉的分層架構設計,對各層數據定義了標準化,實現了各層數據解耦,避免了煙囪式的開發,解決了重復開發等問題,實時數倉逐步走向成熟。缺點是使用Spark? SQL進行后續的統計與匯總時,不夠靈活。需要提前設計好指標,面對客戶多變的需求時,往往不能很及時的響應。
1.3 階段三:Flink SQL+StarRocks
? ? ?隨著實時數倉的建設逐步加深,Spark SQL不夠靈活,處理速度不夠快的弊端越發突出。此時StarRocks進入了我們的視線,其MPP的架構,向量化引擎,多表Join等特性所展現出來在性能、易用性等方面的優勢,都很好的彌補了 Spark SQL 在這塊的不足。于是經調研后決定,在實時數倉中用 StarRocks 替換掉 Spark SQL 。在此階段,前面用 Flink SQL 搭建的實時數倉分層構架并未改變,而下游用 Spark SQL 進行統計分析的相關功能,逐步替換成了用 StarRocks 來做。
? ?之前使用Spark SQL先將數據進行統計與匯總后,將最終結果寫入對象存儲中,而現在是直接用 StarRocks 對明細數據進行匯總,展示到前端頁面中。這么做的好處是能更快、更靈活的滿足業務方的需求,減少了開發工作量,減少了測試、上線等時間。StarRocks 優秀的性能讓即席查詢速度并未變慢,功能更強大,更靈活,交付速度變更快了。
二、自研Flink實時計算調度平臺介紹
2.1?現有痛點
- 原生任務命令復雜,調試麻煩,開發成本比較高;
- 連接器,UDF,Jar任務包等無法管理,調試復雜,經常遇到依賴沖突問題
- 無法做到統一的監控報警以及對資源上的權限管理
- sql開發任務復雜,沒有一個好用的編輯器和代碼管理及保存平臺
- 基礎表、維表、catalog沒有記錄和可視化的平臺
- 多版本和跨云任務無法很好的管理
2.2 平臺架構設計
? 實時Flink調度平臺架構圖:
平臺主要分為三個部分:
(1) Phoenix Web 模塊主要負責面向用戶。
-
集群部署與任務提交。
- 公司各內部業務權限管理
-
UDF,連接器等三方依賴 Jar 包管理。
-
多類型監控報警以及日志管理。
-
SQL 可視化編輯和校驗以及多版本存儲。
-
公司各內部業務權限管理。
(2)?Flink SQL Gateway 和 Flink Jar Gateway 都是基于開源版本修改定制后的服務,支持SQL符合業務場景的解析和校驗,以及Jar任務的提交,支持本地模式。Yarn-per-job 模式和 Application 模式,也支持自動的保存點Savepoint。
-
進行 SQL 的解析和校驗。
-
加載SQL和Jar任務所需要的三方依賴。
-
SQL 任務連接 Catalog 存儲進行關聯和映射。
-
Checkpoint 和 Savepoint 的自動管理和恢復。
-
Jar 類型任務啟動參數的注入。
-
運行時配置的自適應。
-
多類型的提交方式適配。
(3) 混合多云模塊主要負責啟動任務的分發和云之間的信息管理。
三、Flink SQL實時數倉分層實踐
? ?使用Flink SQL 搭建實時數倉時,首要問題是數倉分層架構如何解決,業界內有許多優秀的經驗可以參考,同時也基于我們的情況,最終采用了如下數倉架構:
ODS層:原始日志層,在該層將上游 Binlog 日志、用戶行為日志、外部數據等數據源同步至數倉,對多種數據源,多種格式的數據通過統一UDF函數解析,格式化,最終輸出格式化JSON數據
DW層:數據明細層,在該層主要進行錯誤數據過濾,字段轉義,統一字段名等處理,輸出的數據已能滿足日常基礎分析的使用。
DM層:數據模型層,在該層進行擴維,補充相關的公共信息。再按業務進行分域,輸出的數據
具有更豐富的維度,可以滿足高級分析的數據使用需求。
ST 層:數據應用層,按業務,功能等維度進行匯總,交由給前端頁面進行展現,輸出的數據可交付 Web、App、小程序等功能使用。
四、Flink SQL實時數倉生產過程遇到的問題
? ?在搭建實時數倉時,遇到了不少的問題,下面挑幾個典型的問題講解一下解決思路:
4.1 多表關聯
? ? ?在使用 Flink SQL 搭建實時數倉初期,涉及多表關聯時,有些維表的數據在 Hive 里,有些維表又在 MySQL 中,甚至還有些維表數據在其它 OLAP 中,該選擇何種關聯方式,需要綜合考慮性能 ,功能等方面,總結出如下規則:
- 流表關聯維表(小數據量),使用Lookup Join, 維表數據量在十萬以下時,可使用hive表做維表,因為離線數倉中的維表數據大部分都在 Hive 中,這樣的話就可以直接復用,省去數據導入導出的額外工作,并且性能方面沒有瓶頸,維表小時更新后,Flink SQL 也能讀到最新數據。
- 流表關聯維表(大數據量),使用Lookup Join,維表數據量在十萬-千萬以下時,可用Mysql做維表,此時用 Hive 維表已不能滿足性能需求。可將數據導出至 MySQL 中,利用緩存機制,也能很好的滿足要求。
- 流表關聯流表,使用?Interval Join,通過兩個流表的時間字段來控制關聯范圍,這種關聯方式是目前用的比較多的,使用方式要跟離線比較接近。
4.2?復雜的表處理
4.3?State過大
? ?在兩個流表進行關聯或進行匯總統計時,Flink的機制是會將數據緩存在State中,這就會導致State過大,導致GC頻繁,進而任務失敗。針對這種情況,在研究了 Flink 的內存機制后,得出的解決方案如下:
- 縮短時間范圍,根據業務需求,適當減少關聯時兩條流的時間范圍。
- 調整?Managed Memory 大小,可以調整 Managed Memory 占比,適當的縮小其它內存的使用。
- 設置State的TTL來避免緩存過多的數據
4.4??Checkpoint 不能順利完成
? 任務中頻繁出現?Checkpoint expired before completing異常,在實際生產環境中,發現有任務頻繁的報這個錯誤,這個錯誤指Checkpoint不能順序完成,因為Flink的Checkpoint有Barrier機制來保證數據的Exactly-once 精確一次性語義。如果一批數據處理不完,Checkpoint就完成不了。導致這個錯誤原因有多種,不同的問題也有不同的解答,接下來列舉一下各場景與解決方案:
- Checkpoint 的超時時長設置的太短,導致 Checkpoint 還沒完成就被報了超時,這個問題比較常見。解決方案就是設置長一點,我們一般根據任務類型,會設置 6 秒-2 分鐘不等。
- 任務有被壓,因為一個任務內有多個操作,其中一個操作耗時長影響了整個任務的執行,這個問題比較常見。解決方案是可以從WebUI上找到執行緩慢的Task
-
內存不足,我們在生產環境中一般使用 rocksdb statebackend,默認會保留全量 Checkpoint。而這種情況下,在遇到有關聯、分組統計等使用了 heap statebackend 的任務中,計算的中間結果會緩存到 State中,State的內存默認是總內存的 40%,在這種計算中會不太夠,從而導致頻率的 GC,也影響了 Checkpoint 的執行。解決方案如下:
調大 TaskManager的內存,TaskManager 的內存調大后,其它內存區域也會相應調大。
調大 Managed Memory 的內存占比,就是設置 taskmanager.memory.managed.fraction 這個參數,可根據實際情況來,實際生產中最高可調到 90%。這種方法只調大了 ManagedMemory 一塊,如果內存資源并不是很充裕時,可以用這種方式。
- 改用增量Checkpoint,根據實際情況調整State的TTL時間,并開啟增量Checkpoint,甚至都不用調內存大小,也能解決問題。
五、StarRocks選型背景及問題
? ?在之前的框架中我們是以Flink流式處理引擎完成原始日志的清洗,數據的打寬與輕度聚合,再落地到分布式文件系統或對象存儲,通過離線Spark SQL五分鐘級別的調度批處理,結果會通過Presto等引擎去查詢,這樣的架構在生產環境中漸漸顯露出很多問題:
- 存在重復計算的問題,原始數據會在不同的任務中反復清洗,有的需要多個原始數據的關聯也會反復的清洗,大量浪費了計算資源,代碼和數據流可重用性很差。
-
為了滿足離線批處理歷史累計值和當前 5 分鐘窗口的計算指標,在流量高峰期和當日指標累計到晚上時很可能在 5 分鐘之內無法完成指標的計算,有很大的超時風險,業務會反饋實時指標的延遲。
-
由于離線Spark 批處理在多維組合分析并且又要求實時性情況下,略顯乏力。業務的在線化,催生出很多實時的場景,另一方面運營的精細化和分析的平民化也催生出多維的分析需求,這些場景下需要粒度特別細,維度特別豐富的地層數據,這兩部分的疊加起來就催生出了實時多維分析的場景。這時候我們需要不斷的增加維度組合,增加結果字段,增加計算資源來滿足以上場景,但是還是略顯乏力。
-
在數據時效性日益增加的今天,很多場景下數據的時效性提出了秒級毫秒級的要求,之前5分鐘級別的方式不能滿足業務需求。
-
在之前的實時任務中經常需要在Flink內存中做流和流的Join,由于上游多個數據流的數據到達時間不一致,很難設計合適的window去在計算引擎里面打寬數據,采用Flink Interval Join時多個流的時間間隔太久,狀態數據會非常龐大,啟用mapState之類的狀態計算又過于定制
-
在線上有大型活動或者大型節目時,實時數據量暴增,實時的大批量寫入的情況下,寫入延遲大,寫入效率不高,數據積壓。
-
對于 Flink 清洗或者計算的結果可能需要多個存儲介質中,對于明細數據我們可能會存儲在分布式文件系統或者對象存儲,這時候是 Flink+HDFS,對于業務更新流數據,可能是 Flink CDC+hbase(cassandra或者其他 key-value 數據庫),對于 Flink 產生回撤流數據可能是 Flink+MySQL(redis),對于風控類數據或者傳統的精細化的看版可能是 Flink+ elasticsearch,對于大批量日志數據指標分析可能是Flink+clickhouse,難以統一,資源大量損耗,維護成本同樣高。
? ?總體分析,早期架構以下問題:
- 數據源多樣,維護成本比較高
- 性能不足,寫入延遲大,大促的場景會有數據積壓,交互式查詢體驗較差
- 各個數據源割裂,無法關聯查詢,形成眾多的數據孤島,從開發的角度,每個引擎都需要投入相應的學習開發成本,程序復雜度比較高。
- 實時性要求高,并且開發效率快,代碼或者數據可重復利用性強。
- 實時任務開發沒有同一套標準,各自為戰。
六、基于Flink SQL+StarRocks實時分析數倉
? ? ?基于已經搭建完畢的 Flink SQL 的數倉分層體系,且由 StarRocks2.5X 版本升級到 StarRocks3.0X 存算分離版本并已大規模投入在生產環境中。
? ? 實時和離線湖倉一體的架構圖:
6.1 明細模型
? ? 在大數據生產環境中最常見的日志數據,特點是數據量大,多維度的靈活復雜計算,計算指標多,實時性強,秒級別的高性能查詢,簡單穩定實時流寫入,大表的Join,高基數字符列去重
? ?使用Flink SQL+StarRocks 都能滿足,首先實時平臺上使用Flink SQL快速對實時流日志數據進行清洗,打寬,同時StarRocks提供?Flink-Connector-StarRocks連接器開箱即用,并且支持Exactly-once精準一次性語義和事務支持,底層通過Stream Load低延遲快速導入。
?通過高效簡單地Flink? SQL建表模式,批量百萬級寫入,速度快,同時針對生產環境中單表十億級別以上的數據,在計算多維度用戶訪問次數,和用戶去重數據,能達到秒級別。
6.2 主鍵模型
? ? 對于數倉中的數據變更方式:
- 方式一:某些OLAP數據倉庫數據倉庫提供 Merge on Read模型的更新功能,完成數據變更,例如(clickhouse)。?
? ? ?Merge on Read 模式在寫入時簡單高效,但讀取時會消耗大量的資源在版本合并上,同時由于 merge 算子的存在,使得謂詞無法下推、索引無法使用,嚴重的影響了查詢的性能。? ? ? ? StarRocks 提供了基于 Delete and Insert 模式的主鍵模型,避免了因為版本合并導致的算子無法下推的問題。主鍵模型適合需要對數據進行實時更新的場景,可以更好的解決行級別的更新操作,支撐百萬級別的 TPS,適合MySQL 或其他業務庫同步到StarRocks 的場景。
-
方式二:簡單來說就是創建新分區表,刪除舊的分區表數據,然后批量刷寫過去。
在新的分區中插入修改后的數據,通過分區交換完成數據變更。通過批量刷寫的方式會要重新建表,刪除分區數據,刷寫數據過程繁雜,還可能導致出錯。
? ? 而且通過Flink CDC和StarRocks完美結合可以實現業務庫到OLAP數據倉庫端到端的全量+增量的實時同步,一個任務可以搞定批量和實時的全部問題,并且高效穩定,同時主鍵模型也可以解決Flink中回撤流輸出的問題,支持按條件更新,支持按列更新,這些都是傳統OLAP數據庫很多不兼具的優點。
6.3?聚合模型
? ? ?在實時數倉中還有一種場景,我們不太關心原始的明細數據,多為匯總類查詢,比如 SUM、MAX、MIN 等類型的查詢,舊數據更新不頻繁,只會追加新的數據,這個時候可以考慮使用聚合模型。建表時,支持定義排序鍵和指標列,并為指標列指定聚合函數。當多條數據具有相同的排序鍵時,指標列會進行聚合。在分析統計和匯總數據時,聚合模型能夠減少查詢時所需要處理的數據,提升查詢效率。
? ?針對聚合指標,之前是放在Flink中統計,狀態數據會存在內存中,會導致狀態數據持續增長,斌并且消耗大量資源,將Flink的單純統計修改為Flink SQL + StarRocks聚合模型,Flink這里只需要明細數據進行清洗并導入到 StarRocks,效率非常高且穩定。
? 實際生產環境中,聚合模型主要用來統計用戶觀看時長,點擊量,訂單統計等。
6.4 物化視圖
? ?數據倉庫環境中的應用程序經常基于多個大表執行復雜查詢,通常涉及多表之間數十億行數據的關聯和聚合。要實現這種實時多表關聯并查詢結果的方式,在之前我們可能會把此項內容放在 Flink 實時數倉中去處理,分層處理關聯,合并,統計等任務,最后輸出結果層數據,處理此類查詢通常會大量消耗系統資源和時間,造成極高的查詢成本。
? ? 現在可以考慮使用Flink SQL+StarRocks 的新思路去處理這種大規模的分層計算問題,使得 Flink SQL 這里只需要處理一些簡單清洗任務,把大量重復計算的邏輯下推到StarRocks去執行,多個實時流實時落地 ,在StarRocks可以建立多級物化視圖的建模方式,StarRocks 的物化視圖不僅支持內表和內表關聯,也支持內表和外表關聯。例如:數據分布在MySQL,Hudi,Hive 等都可以通過StarRocks 物化視圖的方式查詢加速,并設定定期刷新規則,從而避免手動調度關聯任務。其中最大的一個特點時,當有新的查詢對已構建了物化視圖的基表進行查詢時,系統自動判斷是否可以復用物化視圖中的預計算結果處理查詢。如果可以復用,系統會直接從相關的物化視圖讀取預計算結果,以避免重復計算消耗系統資源和時間。查詢的頻率越高或查詢語句越復雜,性能增益就會越很明顯。
? ? 實時即未來,StarRocks 在逐漸實現這樣的能力,StarRocks 和 Flink 結合去構建實時數據分析體系的聯合解決方案,將在一定程度上顛覆既有的一些禁錮,形成實時數據分析新范式。
七、未來展望
7.1?湖倉一體
當前芒果 TV 已經實現了流批一體的數倉建設,而未來的重點是湖倉一體的建設。數據湖的特點在于可以存儲各種類型和格式的原始數據,包括結構化數據、半結構化數據和非結構化數據。而數據倉庫則是對數據進行結構化和整理,以滿足特定的業務需求。
? ? ?湖倉一體將數據倉庫和數據湖的特點融合在一起,打造一個統一的數據中心,實現對數據的集中管理。湖倉一體的架構能夠提供更好的安全性、成本效益和開放性,既能夠存儲和管理大量原始數據,又能夠將數據整理成結構化的形式,為分析和查詢提供便利。
? ? ?通過建立湖倉一體,芒果 TV 能夠向公司內部提供更豐富的數據服務,支持業務決策和創新,實現對數據的全面掌控和管理,包括數據的采集、存儲、處理和分析。同時,湖倉一體還能夠支持多種計算引擎和工具的使用,如 Flink、Spark、Hive 等,使得數據處理和分析更加靈活和高效。
7.2?低代碼
? ? ?現在的開發方式是在自研的平臺上寫 SQL 提交任務,這種方式在面對一些清洗場景時,大部分是重復工作,有較大的提升空間。低代碼是時下比較熱門的概念,其在降本增效方面的優勢很大。我們的下一步的計劃是逐步實現低代碼,第一階段是將實時平臺與數據上報平臺進行打通,通過讀取上報平臺里相關元數據,能夠自動生成對應的數據清洗任務,解放生產力,提升工作效率與交付速度。
? ? 低代碼的優勢在于它能夠將開發過程中的重復工作進行自動化和簡化,減少了開發人員的編碼工作量。通過可視化的方式,開發人員可以通過拖拽和配置來完成任務,而無需編寫大量的代碼。這不僅提高了開發效率,還降低了出錯的風險。
? ?總結而言,基于 Flink技術的特點,芒果 TV 在未來的數倉建設中將注重實現湖倉一體的架構,以實現對數據的全面管理和利用。同時,芒果 TV 計劃逐步實現低代碼的開發方式,以提高開發效率和交付速度。
參考文章:
芒果 TV 基于 Flink 的實時數倉建設實踐