前言
Apache Kafka 和 Apache Flink 的結合,為構建實時流處理應用提供了一套強大的解決方案[1]。Kafka 作為高吞吐量、低延遲的分布式消息隊列,負責數據的采集、緩沖和分發;而 Flink 則是功能強大的流處理引擎,負責對數據進行實時計算和分析。兩者相輔相成,優勢互補,共同構成了實時流處理應用的堅實基礎。
其中 Flink Kafka Source 成為了連接 Kafka 與 Flink 的橋梁, 為 Apache Flink 提供了從 Apache Kafka 讀取數據流的功能。它作為 Flink 數據輸入的起點,負責高效、可靠地將 Kafka Topic 中的消息數據接入 Flink 流處理程序,為后續的實時計算、分析和處理提供數據基礎。
值得一提的是,AutoMQ 作為 Apache Kafka 的社區分叉項目,對其存儲層進行了重新設計與實現,但是完整保留了 Apache Kafka 計算層的代碼。對于 Apache Kafka 具有 100% 的兼容性。這意味著在 Flink 生態系統中,專為 Kafka 開發的 Flink Kafka Source/Sink 可以與 AutoMQ 完全兼容。
Flink Source 接口重構動機
從 Flink 1.12 開始,基于 new source API(FLIP-27)[2]和 new sink API (FLIP-143)[3]開發的 KafkaSource
和 KafkaSink
是推薦的 Kafka 連接器。 FlinkKafkaConsumer
和 FlinkKafkaProducer
則已被棄用。
在 FLIP-27: Refactor Source Interface 中旨在解決當前 streaming source 接口(SourceFunction)中的諸多問題與缺點,并同時統一批處理和 streaming APIs 之間的 source 接口。
在 FLIP-27 中,具體闡述 SourceFunction 中存在的問題,總結下來,可以分為如下:
-
批處理和流處理的 Source 實現不一致: Flink 為批處理和流處理提供了不同的 Source 接口,導致代碼重復,維護困難。
-
邏輯耦合: “work discovery”(例如,發現 Kafka 的分區或文件系統的 Split )和實際讀取數據的邏輯在
SourceFunction
接口和DataStream API
中混合在一起,導致實現復雜,例如 Kafka 和 Kinesis 的 Source 實現。 -
缺乏對分區/ Split 的顯式支持: 當前接口沒有明確表示分區或 Split 的概念。這使得難以以獨立于 Source 的方式實現某些功能,例如事件時間對齊、每個分區的 watermark、動態 Split 分配和工作竊取。例如,Kafka 和 Kinesis 消費者都支持每個分區的 watermark,但截至 Flink 1.8.1,只有 Kinesis 消費者支持事件時間對齊(選擇性地從 Split 讀取數據,以確保事件時間均勻地推進)。
-
Checkpoint 鎖的問題:
-
SourceFunction 持有 checkpoint 鎖,導致實現必須確保在鎖下進行元素發送和狀態更新,限制了 Flink 對鎖的優化空間。
-
鎖不是公平鎖,在鎖競爭激烈的情況下,某些線程(例如 checkpoint 線程)可能無法及時獲取鎖。
-
當前的鎖機制也阻礙了基于無鎖 Actor/Mailbox 模型的 operator 實現。
-
-
缺乏統一線程模型: 每個 Source 都需要自己實現復雜的線程模型,導致開發和測試新 Source 變得困難。
重構后的 KafkaSource
核心抽象
Split:Flink 中的可追蹤數據單元
在 Flink 中,記錄分片 (Record Split) 是指一個具有唯一標識符的有序記錄集合,它代表了數據源中的一段連續數據。記錄分片是 Flink 進行并行處理、容錯恢復和狀態管理的基本單元。
分片的定義靈活可變,以 Kafka 為例:
-
分片可以是一個完整的分區。
-
分片也可以是分區內的一部分,例如 offset 100 到 200 的記錄。
同時以 Kafka 為例,來解釋 Split 的特征:
-
有序的記錄集合: 分片中的記錄是有序的,例如按照 Kafka 中的 offset 排序。
-
唯一標識符: 每個分片都有一個唯一的 ID,用于區分不同的分片,例如 Topic-PartitionId。
-
進度可追蹤: Flink 會記錄每個分片的處理進度,以便在發生故障時進行恢復,例如某個分區的消費位點。
Split Enumerator:Flink 數據讀取的指揮官
Flink 中的記錄分片枚舉器 (Split Enumerator) 負責管理和分配數據源中的記錄分片給 Source Reader 讀取數據,它在 Flink 數據讀取過程中扮演著“指揮官”的角色。
主要職責:
-
發現記錄分片 (Split Discovery):
-
定期掃描外部數據源,例如 Kafka、文件系統等,檢測新增的記錄分片。
-
例如,Kafka 的 Split Enumerator 會監聽 topic 的分區變化,并在新增分區時創建新的分片。
-
-
分配記錄分片 (Split Assignment):
-
將發現的記錄分片分配給 Source Reader 進行讀取。
-
協調多個 Source Reader 之間的分片分配,盡量保證負載均衡。
-
監控 Source Reader 的處理進度,動態調整分片分配,例如將部分分片從過載的 Reader 轉移到空閑的 Reader。
-
-
協調 Source Reader:
-
控制 Source Reader 的讀取速度,避免個別 Reader 讀取過快或過慢,影響整體的 watermark 推進和數據處理進度。
-
處理 Source Reader 的故障,例如將故障 Reader 負責的分片重新分配給其他 Reader。
-
Source Reader:Flink 數據讀取的執行者
Source Reader 是 Flink 中真正執行數據讀取操作的組件,它負責從 Split Enumerator 分配的記錄分片中讀取數據,并將數據傳遞給下游算子進行處理。
主要職責:
-
從記錄分片讀取數據:
-
根據 Split Enumerator 分配的記錄分片信息,連接到外部數據源。
-
從指定位置開始讀取數據,例如從 Kafka 的指定 offset 開始消費數據。
-
持續讀取數據,直到分片結束或者收到停止信號。
-
-
事件時間水印處理:
-
從讀取的記錄中提取事件時間信息。
-
根據事件時間生成水印 (Watermark),并將其發送到下游算子,用于處理亂序數據和事件時間窗口。
-
-
數據反序列化:
- 將從外部數據源讀取的原始數據(例如字節流)反序列化成 Flink 內部可以處理的數據結構(例如 DataStream 中的元素)。
-
數據發送:
- 將反序列化后的數據發送給下游算子進行處理。
將 Work Discovery 與 Reading 分離
將 Source 的功能拆分為兩個主要組件:
-
SplitEnumerator( Split 枚舉器):
-
負責發現和分配 Split (splits),例如文件、Kafka 分區等。
-
可以在 JobManager 或 TaskManager 上運行。
-
-
Reader(讀取器):
-
負責從分配的 Split 中讀取實際數據。
-
包含了當前 Source 接口的大部分功能。
-
可以按順序讀取一系列有界 Split ,也可以并行讀取多個(無界) Split 。
-
之前 FlinkKafkaConsumerBase [4]的設計中,集中了 kafka partition 發現邏輯(KafkaPartitionDiscoverer)、數據讀取邏輯(KafkaFetcher)、基于阻塞隊列實現的生產者消費者模型等等。整體設計相對來說代碼復雜,難以維護和擴展。
@Override
public void run(SourceContext<T> sourceContext) throws Exception {// ... (省略部分初始化代碼)// ... (省略部分邏輯)this.kafkaFetcher =createFetcher(// ... (省略部分參數));// ... (省略部分邏輯)// 根據是否開啟分區發現機制,選擇不同的執行路徑if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {// 直接運行數據讀取循環kafkaFetcher.runFetchLoop(); } else {// 運行包含分區發現邏輯的代碼runWithPartitionDiscovery(); }
}
在該思路下就可以分離并設計為:
KafkaSourceEnumerator:
-
發現分區: 定期或一次性地發現 Kafka 主題中的所有分區。
-
初始化分區: 獲取每個分區的起始偏移量和結束偏移量。
-
分配分區: 將分區分配給不同的 Source Reader,并管理分區的分配狀態
KafkaSourceReader 負責從分配的 Kafka 分區中讀取數據,并處理 checkpoint 相關的邏輯。
-
接收并處理 SplitEnumerator 分配的分區
-
處理讀取到的數據
-
處理 checkpoint
將 “Work Discovery” 和數據讀取邏輯分離,提高了代碼的模塊化和可重用性。例如,可以為不同的分區發現策略實現不同的 SplitEnumerator,而無需修改 Reader 的代碼
KafkaSourceEnumerator
SourceCoordinator 啟動
-
當 Flink 作業啟動時,會為每個 Kafka Source 任務創建一個
SourceCoordinator
實例。 -
SourceCoordinator
的start()
方法會被調用,開始執行以下操作:-
如果是第一次啟動(非從 Checkpoint 恢復),則調用
source.createEnumerator()
創建一個KafkaSourceEnumerator
實例。 -
調用
enumerator.start()
啟動KafkaSourceEnumerator
。
-
KafkaSourceEnumerator 啟動
-
KafkaSourceEnumerator
的start()
方法會被調用:-
初始化 Kafka 消費者和 Kafka 管理客戶端。
-
根據配置決定分區發現模式(周期性或單次)。
-
異步調用
discoverAndInitializePartitionSplit()
方法進行初始分區發現。
-
分區發現與初始化
-
discoverAndInitializePartitionSplit()
方法執行以下操作:-
獲取 Kafka 分區變化信息。
-
獲取新增分區的起始和終止偏移量(針對有限制的流)。
-
為每個新增分區創建
KafkaPartitionSplit
對象。 -
將新增分片添加到待分配列表 (
pendingPartitionSplitAssignment
) 中。 -
調用
assignPendingPartitionSplits()
方法分配分片。
-
分片分配
-
assignPendingPartitionSplits()
方法執行以下操作:-
將待分配分片分配給可用的 Source Reader。
-
如果禁用了周期性分區發現,則在初始分片分配完成后,向 Source Reader 發送
NoMoreSplitsEvent
事件。
-
Enumerator-Reader 通信機制
在 Flink 新的 Source 設計中,SplitEnumerator 和 SourceReader 是兩個獨立的組件,分別負責 Split 管理和數據讀取。然而,在實際應用中,這兩個組件之間 often 需要進行通信,例如在 Kafka Source 場景下:
-
KafkaSourceReader 需要請求 KafkaSplitEnumerator 進行 KafkaSourceReader 注冊
-
KafkaSplitEnumerator 需要通知 KafkaSourceReader 有新的 KafkaPartitionSplit 需要讀取。
通用通信機制:
為了滿足 SplitEnumerator 和 SourceReader 之間的通信需求,Flink 引入了一種通用的消息傳遞機制,其核心是 SourceEvent
接口。
-
SourceEvent
: 定義了 SplitEnumerator 和 SourceReader 之間傳遞的消息類型。 -
OperatorEvent
:是在 OperatorCoordinator 和 Operator 之間傳遞消息的接口。
消息傳遞鏈條:
-
OperatorEventGateway: 接收
OperatorEvent
,并添加OperatorID
信息。 -
TaskOperatorEventGateway: 接收來自
OperatorEventGateway
的事件,添加ExecutionAttemptID
信息,并將其轉發給JobMasterOperatorEventGateway
。 -
JobMasterOperatorEventGateway: Task Manager 與 JobManager 之間的 RPC 接口,負責將事件最終發送到 JobManager 上的 OperatorCoordinator。
public interface JobMasterOperatorEventGateway {CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task,OperatorID operatorID,SerializedValue<OperatorEvent> event);}
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
...void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception;
...
}
對于 SourceCoordinator 來說,handleOperatorEvent 內到處理邏輯如下:
-
RequestSplitEvent
: 請求分配新的 Split ,調用enumerator.handleSplitRequest()
處理。 -
SourceEventWrapper
: 來自 SourceReader 的事件,調用enumerator.handleSourceEvent()
處理。 -
ReaderRegistrationEvent
: Reader 注冊事件,調用handleReaderRegistrationEvent()
處理。 -
其他事件類型: 拋出異常,表示無法識別該事件類型。
(在實際實現當中,OperatorEvent
有時也可以直接傳遞到 SourceReader/SplitEnumerator,而不需要在轉換為SourceEvent
)
對于 SourceOperator 來說,handleOperatorEvent 內到處理邏輯如下:
-
AddSplitEvent
: 新增 Split 事件,表示SplitEnumerator
分配了新的 Split 給該SourceReader
。 -
SourceEventWrapper
: 調用sourceReader.handleSourceEvents()
將事件傳遞給SourceReader
處理。 -
NoMoreSplitsEvent
: 沒有更多 Split 事件,表示SplitEnumerator
已經分配完所有 Split 。
KafkaSourceReader
Reader 接口與線程模型
Flink 新 Source API 中的 SourceReader
接口,它負責從 Source Split 中讀取數據,并與 SplitEnumerator
進行交互。SourceReader
接口代碼如下:
public interface SourceReader<T, SplitT extends SourceSplit>extends AutoCloseable, CheckpointListener {void start();InputStatus pollNext(ReaderOutput<T> output) throws Exception;CompletableFuture<Void> isAvailable();void addSplits(List<SplitT> splits);void notifyNoMoreSplits();default void handleSourceEvents(SourceEvent sourceEvent) {}List<SplitT> snapshotState(long checkpointId);@Overridedefault void notifyCheckpointComplete(long checkpointId) throws Exception {}}
SourceReader 被設計為無鎖的、非阻塞的接口,以支持 Actor/Mailbox/Dispatcher 風格的 operator 實現。所有方法都在同一個線程中調用,因此實現者無需處理并發問題。
-
SourceReader
使用異步的方式讀取數據,并通過isAvailable()
方法通知運行時數據是否可讀。 -
pollNext
可以非阻塞地讀取下一條記錄,并將記錄發送到ReaderOutput
。 返回一個InputStatus
枚舉值,表示讀取狀態,例如MORE_AVAILABLE
(還有更多數據)、END_OF_INPUT
(數據讀取完畢) 等。
高層抽象簡化 Source Reader 實現
-
底層的
SourceReader
接口非常通用,但實現起來比較復雜,尤其是對于像 Kafka 或 Kinesis 這樣需要處理多路復用和并發讀取的 Source 來說。 -
大多數連接器使用的 I/O 庫都是阻塞式的,需要創建額外的 I/O 線程才能實現非阻塞讀取。
因此在此 FP 中提出了一個解決方案:
- 高層抽象: 提供更簡單的接口,允許使用阻塞式調用,并封裝了多路復用和事件時間處理等復雜邏輯。
大多數 Reader 屬于以下類別之一:
-
單 Reader 單 splits: 最簡單的類型,例如讀取單個文件。
-
單 Reader 多 splits: 一個 Reader 可以讀取多個 Split ,例如:
- Sequential Single Split 讀取: 單個 IO 線程依次順序讀取各個 Split,例如文件或數據庫查詢結果。
Sequential Single Split
- 多路復用多 splits 讀取: 單個 IO 線程使用多路復用技術讀取多個 Split ,例如 Kafka、Pulsar、Pravega 等。
Multi-split Multiplexed
- 多線程多 splits 讀取: 使用多個線程并發讀取多個 Split ,例如 Kinesis。
Multi-split Multi-threaded
以上分析,抽象如下接口,開發者可根據實際需求選擇不同的高層 Reader 類型,并通過實現簡單的接口來創建自定義的 Source Reader。
public interface SplitReader<E, SplitT extends SourceSplit> {RecordsWithSplitIds<E> fetch() throws InterruptedException;void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);void wakeUp();
}
-
fetch()
: 從 Split 中讀取數據,返回一個RecordsWithSplitIds
對象,包含讀取到的記錄和對應的 Split ID。 -
handleSplitsChanges()
: 處理 Split 的變化,例如新增 Split 或移除 Split。 -
wakeUp()
: 喚醒阻塞的fetch()
操作,例如在有新的 Split 可用時。
public interface RecordEmitter<E, T, SplitStateT> {void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
emitRecord
: 負責將SplitReader
讀取的原始記錄 (E
) 轉換為最終的記錄類型 (T
)
SourceReaderBase:提供了 SourceReader 的基礎實現,封裝了事件隊列、 Split 狀態管理、SplitFetcher 管理等通用邏輯
Split 分配流程:
-
SplitEnumerator 分配 Split :
SplitEnumerator
發現新的 Split ,并將它們分配給對應的SourceReader
。 -
SourceReader 接收 Split :
SourceReader
收到新的 Split 后,會進行初始化 state,隨后調用SplitFetcherManager
的addSplits()
方法。 -
SplitFetcherManager 獲取或創建 SplitFetcher,將 Splits 添加到 SplitFetcher
-
SplitFetcher 將
AddSplitsTask
添加到任務隊列,喚醒 SplitFetcher 的工作線程 -
AddSplitsTask 通知 SplitReader 處理 SplitsChanges
-
SplitReader 更新被分配的 Split
Source 數據獲取流程:
-
SplitReader 讀取數據:
SplitReader
從 Split 中讀取數據,并將數據封裝成RecordsWithSplitIds
對象返回給SourceReader
。 -
SourceReader 處理數據:
SourceReader
遍歷RecordsWithSplitIds
中的每條記錄,并根據記錄所屬的 Split ID 獲取對應的SplitState
。 -
調用 RecordEmitter 處理記錄:
SourceReader
將記錄和SplitState
傳遞給RecordEmitter
進行處理。 -
RecordEmitter 處理記錄:
-
將原始記錄類型 (
E
) 轉換為最終的記錄類型 (T
)。 -
更新
SplitState
,例如記錄讀取進度等信息。 -
將處理后的記錄加入到
SourceOutput
。
-
Checkpoint 和 Failover 流程
Flink 的容錯機制依賴于 檢查點 (Checkpoint),它會定期生成數據流的快照,包括數據源的讀取位置和算子的狀態信息。當發生故障時,Flink 可以從最近的 Checkpoint 恢復,保證 Exactly-Once 語義。
在 Flink Kafka Source 中,KafkaSourceEnumerator
和 KafkaSourceReader
兩個關鍵組件分別就有自己的 Checkpoint 和 Failover 的流程。如圖所示,Flink Kafka Source 通過 Checkpoint 機制記錄數據源的讀取位置和 Source Reader 的狀態信息,并在 Failover 時利用這些信息進行恢復,保證數據不會丟失或重復處理。
總結
Apache Flink 與消息隊列的結合是構建實時流處理應用的強大方案。本文首先介紹了 Flink 與 Kafka 的集成,并深入探討了 Flink Kafka Source 的重構,以解決原有設計上的不足。
Flink Kafka Source 的重構主要包括:
-
引入 Split Enumerator 和 Source Reader,實現 “Work Discovery” 與 Reading 的分離,提高代碼模塊化和可重用性。
-
通過 Source Event 機制實現 Enumerator 和 Reader 之間的異步通信,提高代碼可維護性。
-
提供 SplitReader 和 RecordEmitter 等高層抽象,提供 SourceReaderBase 的實現,使得 Kafka Source 可以只需專注于 SplitReader 和 RecordEmitter 的實現。
重構后的 Flink Kafka Source 通過 Checkpoint 機制記錄數據源讀取位置和 Source Reader 狀態信息,保證 Exactly-Once 語義。
然而,傳統的 Shared Nothing 架構消息隊列(如 Kafka)在面對海量數據和高并發場景時,存在存儲成本高、運維復雜、擴縮容困難等挑戰。
AutoMQ 作為新一代云原生消息隊列,采用 Shared Storage 架構和基于對象存儲的低成本存儲,并與 Kafka 100% 兼容。未來,AutoMQ 與 Flink 的結合將為云原生實時流處理應用帶來以下優勢:
-
更低的成本: 尤其在處理冷數據時,成本優勢更加明顯。
-
更高的彈性: 支持集群自動擴縮容和流量自平衡,靈活應對業務變化,保證系統穩定運行。
-
更簡化的運維: Shared Storage 架構簡化了集群部署和運維。
-
與 Kafka 生態的無縫銜接: 方便企業平滑遷移。
AutoMQ 與 Flink 的結合將成為未來云原生實時流處理應用的重要發展方向,為企業提供更低成本、更高效、更便捷的流處理解決方案。
[1]: Apache Kafka (including Kafka Streams) + Apache Flink = Match Made in Heaven
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[3]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[4]: https://github.com/apache/flink/blob/b1e7b892cc9241f568150135b8bcf7bcd9f0c125/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L757-L830