文章目錄
- 1. NetworkStack整體架構
- 2. StreamTask內數據流轉過程
NetworkStack提供了高效的網絡I/O和反壓控制
除了各個組件之間進行RPC通信之外,在Flink集群中TaskManager和TaskManager節點之間也會發生數據交換,尤其當用戶提交的作業涉及Task實例運行在不同的TaskManager上時。Task實例之間的數據交換主要借助Flink中的NetworkStack實現。NetworkStack不僅提供了非常高效的網絡I/O,也提供了非常靈活的反壓控制。
?
1. NetworkStack整體架構
通過Netty協議實現的NetworkStack
Flink NetworkStack整體架構在不同的TaskManager之間建立
TCP連接,而TCP連接則主要依賴Netty通信框架實現
。Netty是一個NIO網絡編程框架,可以快速開發高性能、高可靠性的網絡服務器/客戶端程序,能夠極大簡化TCP和UDP等網絡編程。
流程舉例:
TaskManager中會運行多個Task實例,例如在TaskManager 1中運行了Task A-1和Task A-2,在TaskManager 2中運行了Task B-1和Task B-2,Task A中從外部接入數據并處理后,會通過基于Netty構建的TCP連接發送到Task B中繼續進行處理。整個數據傳輸過程主要基于Flink的NetworkStack框架進行。
?
上游數據流轉邏輯:二進制buffer->ResultSubPartition隊列->InputChannel
對于上游的Task A實例來講,經過Operator處理后的數據,最終會通過
RecordWriter組件寫入網絡棧
,即算子輸出的數據并不是直接寫入網絡,而是先將數據元素轉換為二級制Buffer數據,并將Buffer緩存在ResultSubPartition隊列中,接著寫入下游Task對應的InputChannel。在上游的Task中會創建LocalBufferPool為數據元素申請對應Buffer的存儲空間,且上游的Task會創建NettyServer作為網絡連接服務端,并與下游Task內部的NettyClient之間建立網絡連接。
?
?
下游Task數據接收邏輯:InputGate的InputChannel接收->StreamTaskInput取數據并處理(反序列化)->OperatorChain
- 對下游的Task實例來講,會通過InputGate組件接收上游Task發送的數據,在InputGate中包含了多個InputChannel。InputChannel實際上是將Netty中Channel進行封裝,
數量取決于Task的并行度
。- 上游Task的ResultPartition會根據
ChannelSelector選擇需要將數據下發到哪一個InputChannel中,其實現類似Shuffe的數據洗牌操作
。- 在下游的Task實例中可以看出,InputGate中接收到的二進制數據,會轉換為Buffer數據結構并存儲到本地的Buffer隊列中,最后被StreamTaskInput不斷地從隊列中拉取出來并處理。StreamTaskInput會將Buffer數據進行反序列化操作,將Buffer數據轉換為StreamRecord并發送到OperatorChain中繼續處理。
?
2. StreamTask內數據流轉過程
流式作業中OperatorChain轉為StreamTask
在ExecutionGraph調度和執行ExecutionVertex節點的過程中,會將
OperatorChain提交到同一個Task實例
中運行。如果被調度的作業為流式類型,則AbstractInvokable的實現類就為StreamTask。最終StreamTask會被TaskManager中的Task線程觸發執行。
根據數據源不同,StreamTask分為兩種類型:
- 直接從外部源數據讀取數據的SourceStreamTask和SourceReaderStreamTask;
- 支持從網絡或本地獲取數據的OneInputStreamTask和TwoInputStreamTask;
?
以OneInputStreamTask為例,分析從Task層面介紹數據從網絡接入并發送到OperatorChain中進行處理,接著通過Output組建輸出到下游網絡中的過程。
?
OneInputStreamTask包含一個StreamInputProcessor,用于對輸入數據進行處理和輸出。在StreamInputProcessor組件中包含StreamTaskInput、OperatorChain以及DataOutput三個組成部分。
?
task內部數據流轉:StreamTaskNetworkIutput -> StreamTaskNetworkOutput -> OperatorChain中的HeaderOperator -> task實例算子->Output->下游算子...->RecordWriter->網絡
。詳細過程如下:
- StreamTaskInput從Task外部獲取數據。
根據不同的數據來源,StreamTaskInput的實現主要分為從網絡獲取數據的StreamTaskNetworkInput和從外部系統獲取數據的StreamTaskSourceInput。
- DataOutput負責將StreamTaskInput接收的數據
發送到當前Task實例的OperatorChain的HeadOperator
中進行處理。DataOutput主要有StreamTaskNetworkOutput(用于處理StreamTaskNetworkInput接收的數據)和StreamTaskSourceOutput(用于處理StreamTaskSourceInput接收的數據)兩種實現。
- HeaderOperator接收數據,算子開始接收數據并進行處理
OperatorChain負責
將能夠運行在同一個Task實例中的Operator連接起來,然后形成算子鏈
,且算子鏈中HeaderOperator會暴露給StreamTask。當StreamTaskNetworkIutput接收到網絡數據后,就會通過StreamTaskNetworkOutput組件將數據元素發送給OperatorChain中的HeaderOperator進行處理,此時Task實例中的算子就能夠接收數據并進行處理了。
- 上一個算子處理的數據會通過Output組件發送到下一個算子中繼續處理
- 在OperatorChain中,除了具有HeaderOperator之外,還包含了其他算子,這些算子會按照拓撲關系連接到HeaderOperator之后,每個算子之間的數據傳輸通過Output組件相連,即在OperatorChain中,上一個算子處理的數據會通過Output組件發送到下一個算子中繼續處理。注意:DataOutput強調的是從外部接入數據到Task實例后再轉發到HeaderOperator中,Output則更加強調算子鏈內部的數據傳遞。
- Output組件的實現主要有ChainingOutput、BroadcastingOutputCollector、DirectedOutput和RecordWriterOutput等類型,它們最大的區別在于數據下發的方式不同,例如ChainingOutput代表直接向下游算子推送數據。
- RecordWriterOutput中RecordWriter組件將數據發送到網絡
經過算子鏈處理后的數據,需要發送到網絡中供下游的Task實例繼續處理,此時需要通過RecordWriterOutput完成數據的網絡輸出。RecordWriterOutput中包含了RecordWriter組件,用于將數據輸出到網絡中,下游Task實例就能通過StreamTaskInput組件從網絡中獲取數據,并繼續傳遞到Task內部的算子鏈進行處理。
小結:
在StreamTask中接入數據,然后通過OperatorChain進行處理,再通過RecordWriterOutput發送到網絡中,下游Task節點則繼續從網絡中獲取數據并繼續處理,最后組合這些Task節點就形成了整個Flink作業的計算拓撲。
注意:Task節點的數據輸入也可以是本地類型,這種情況主要出現在Task實例被執行在同一臺TaskManager時,數據不需要經過網絡傳輸。
?