目錄
前言
一、StarRocks 數據導入
二、StarRocks 事務寫入原理
三、InLong 實時寫入StarRocks原理
3.1 InLong概述
3.2?基本原理
3.3?詳細流程
3.3.1 任務寫入數據
3.3.2 任務保存檢查點
3.3.3 任務如何確認保存點成功
3.3.4?任務如何初始化
3.4?Exactly Once 保證
3.4.1 數據不重復保證
3.4.2 數據不丟失保證
四、Stream Load事務接口使用
4.1 事務接口優勢
4.1.1 Exactly-once語義
4.1.2 提升導入性能
4.2 事務接口使用限制
4.3?事務接口使用案例
前言
? ?StarRocks 支持通過 Stream Load 方式實時寫入數據,為進一步提升導入效率,從 2.4 版本實現了新的事務接口,本文闡述Stream Load 事務接口實現原理
?官網文章地址:
使用 Stream Load 事務接口導入 | StarRocks
一、StarRocks 數據導入
? ? ?StarRocks豐富的導入方式為業務在報表推送、實時數據分析、數據湖分析等場景提供了助力。目前支持的四種數據導入方式,分別是 Stream Load, Broker Load, Routine Load,Spark Load。此外,為了支持和Flink、Kafka等其他系統之間實現跨系統的兩階段提交(預提交事務、提交事務),提升高并發Stream Load導入場景下的性能,StarRocks 自 2.4 版本起提供 Stream Load 事務接口。
二、StarRocks 事務寫入原理
? ? StarRocks事務寫入基于典型的兩階段提交事務實現,客戶端使用事務主要包含以下幾個接口:
-
/api/transaction/begin:開啟一個新事務。
- /api/transaction/prepare:預提交當前事務,臨時持久化變更。預提交一個事務后,可以繼續提交或者回滾該事務。在這種機制下,如果在事務預提交成功后StarRocks發生了宕機,仍然可以在系統恢復后繼續執行提交。
- /api/transaction/commit:提交當前事務,持久化變更。
- /api/transaction/rollback:回滾當前事務,回滾變更。
- /api/transaction/load:發送數據,可以使用已有的事務,如果沒有指定事務label,會隨機生成一個label進行數據寫入。
? ? ?ps:事務去重:復用StarRocks現有的label標簽機制,通過標簽綁定事務,實現事務的“至多一次(At-Most-Once)”語義。
? 不同階段對應的StarRocks內部流程如下:
- begin + load 階段
? ?開始數據導入時,客戶端通過begin transaction接口開啟一個新的事務,提交給FE leader中的事務管理模塊,事務管理模塊充當了兩階段提交中的事務管理者,用來管理事務的原子性、事務的回滾等。每一個事務可以設置一個label,StarRocks FE會檢查本次begin transaction 請求的label是否已經存在,如果label在系統中不存在,則會為當前label開啟一個新的事務。begin階段之后可以使用該label對StarRocks進行Stream Load導入,Stream Load返回成功的條件是數據的副本數量超過了tablet數據分片的副本數的一半,剩下的一本由StarRocks的副本機制保證完整寫入。
-
Commit 階段
? ?FE接受commit信息之后,會將事務狀態改成commited。之后事務管理器會向BE節點發送publish version信息,BE收到publish中的版本信息后,會將本地的消息版本改成本次事務對應的版本;同時會向FE上報,表示數據版本已經成功修改,之后FE會將事務狀態改成VISIBLE。此時數據對用戶可見,客戶端執行查詢的時候,會比較版本號,從而解決讀寫版本沖突;
-
Rollback 階段
? ?如果寫入過程或者commit過程失敗,則事務abort,清理事務的任務在BE節點異步執行,將數據導入過程中生成的批次數據標記為不可用,這些數據之后會從BE上被刪除。
? ?總結:
- StarRocks可以通過給數據設置版本控制(rowset version)來解決讀寫沖突。
- StarRocks通過引入FE中的事務管理實現了兩階段導入,保證了導入的原子性。
三、InLong 實時寫入StarRocks原理
3.1 InLong概述
? ? Apache InLong(應龍)?提供自動、安全、可靠和高性能的數據傳輸能力,方便業務快速構建基于流式的數據分析、建模和應用。該模塊闡述 InLong基于事務接口,實現數據實時寫入 StarRocks的技術原理,主要對寫入過程中的精準一次性保證進行闡述。
3.2?基本原理
? ?InLong實時寫入StarRocks如下圖所示,實時寫入通過 Flink實時任務來實現,Flink任務寫入側的具體執行邏輯如下:
- 根據Flink并行度配置生成多個Task執行寫入;
- 每一個Task基于StarRocks提供的Stream Load機制進行寫入,每一個Flink checkpoint周期會使用相同的StarRocks事務label;
- Flink開始做checkpoint時,當前寫入的table以及對應的StarRocks事務label會一并存入到state狀態中;
- Flink寫入算子收到checkpoint完成的消息時,將所有的table對應的事務進行commit,此時數據才會對用戶可見;
3.3?詳細流程
3.3.1 任務寫入數據
? 在寫入數據時,首先不會直接將數據寫入到StarRocks中,而是將每個table對應的數據進行緩存。當批次數據達到一定大小之后才會調用一次刷新flush操作,flush操作包括以下流程:
- 啟動一個事務,每一個Flink checkpoint周期會使用相同的StarRocks事務label,調用/api/transaction/begin
- 使用該label進行數據寫入,調用 /api/transaction/load 實際寫入數據
? 這種寫入流程保證了:
- 每次寫入相同的事務label,提交時可以提交一整個checkpoint周期的所有的數據,單個checkpointh只會提交一次,重復提交StarRocks不會生效。
- 每次寫入都是批次寫入,緩解StarRocks寫入壓力。(內存攢批+flush)
3.3.2 任務保存檢查點
? ?任務保存檢查點的時候會進行以下流程:
- 對目前內存中保存的所有表數據都進行flush,確保內存中所有的數據已經導入到StarRocks,當前數據在StarRocks中不可見
- 對所有的表對應的導入事務,進行prepare調用(預提交事務) ,如果prepare失敗,則表示當前StarRocks不支持該事務的提交,調用abort接口,并失敗重試
- 對于prepare成功的事務,保存在當前flink狀態信息中state
3.3.3 任務如何確認保存點成功
? 當Flink Task收到checkpoint檢查點已經完成的確認信息后,對checkpoint過程中保存的事務信息進行commit,如果commit失敗,則重啟任務。commit成功的事務會在checkpoint中刪除。
3.3.4?任務如何初始化
? ? 當任務啟動時,Task拿到上一個保存點的狀態信息,恢復版本暫時未commit的事務信息,對checkpoint id小于等于當前checkpoint id的事務進行提交。
3.4?Exactly Once 保證
? 要保證流式寫入的 Exactly once語義等同于:需要保證數據的不重復以及不丟失。
? Exactly once語義的實現需要合理的定義checkpoint間隔,優點是在各種異常情況下保障數據不丟失不重復,缺點是數據可見時間取決于checkpoint間隔(flink將所有的table對應的事務進行commit,此時數據才會對用戶可見)
3.4.1 數據不重復保證
? ?基于Flink的流式任務產生數據重復的原因主要是Flink從某一個checkpoint啟動時,重復提交之前已經提交過的數據。InLong實時寫入中,狀態中會記錄本checkpoint下prepare成功的事務id,故障恢復時,會提交該事務id,如果該事務id在之前的流程中被提交過,StarRocks會返回報錯信息表示該事務id已經提交過,該次提交會被忽略,通過這種機制保證了數據的不重復。
3.4.2 數據不丟失保證
? 假設在數據寫入過程中,有部分數據寫入失敗,Flink checkpoint機制會保證任務重啟后從上一個保存點啟動,Source端會從上次保存消費位置開始消費,這樣能夠保證數據的不丟失,之前寫入失敗的數據會在重啟后繼續執行寫入。
四、Stream Load事務接口使用
4.1 事務接口優勢
4.1.1 Exactly-once語義
- 通過“預提交事務”,“提交事務”,方便實現跨系統的兩階段提交。例如配合在Flink實現“精確一次(Exactly-once)”語義。
4.1.2 提升導入性能
? 在通過程序提交Stream Load作業的場景中,Stream Load事務接口允許在一個導入作業中按需合并發送多次小批量的數據后“提交事務”,從而能減少數據導入的版本,提升導入性能。
4.2 事務接口使用限制
? ?事務接口當前具有如下使用限制:
-
只支持單庫單表事務,未來將會支持跨庫多表事務。
-
只支持單客戶端并發數據寫入,未來將會支持多客戶端并發數據寫入。
-
支持在單個事務中多次調用數據寫入接口?
/api/transaction/load
?來寫入數據,但是要求所有?/api/transaction/load
?接口中的參數設置必須保持一致。 -
導入CSV格式的數據時,需要確保每行數據結尾都有行分隔符。
4.3?事務接口使用案例
? 具體使用案例見官網:
使用 Stream Load 事務接口導入 | StarRocks
參考文章:
Apache InLong 實時同步數據到 StarRocks 原理與實踐