一、概覽
DataStream API支持不同的運行時執行模式,我們可以根據用例的要求和作業的特征進行選擇。
STREAMING
執行模式:被稱為“經典”執行模式為,主要用于需要持續增量處理并且預計無限期保持在線的無界作業BATCH
執行模式:類似于MapReduce的批處理框架,主要用于已知固定輸入且不連續運行的有界作業。- AUTOMATIC執行模式:交給Flink自己決斷,如果所有源都有界,Flink將選擇BATCH,否則選擇STREAMING
Flink對流和批次處理作業的統一方法意味著,無論配置的執行模式如何,在有界輸入上執行的DataStream應用程序都將產生相同的最終結果。請務必注意最終的含義:在STREAMING
模式下執行的作業可能會產生增量更新(想想數據庫中的upserts),而BATCH
作業最終只會產生一個最終結果。如果解釋正確,最終結果將是相同的,但到達那里的方式可能不同
當啟用BATCH
執行,我們允許Flink應用額外的優化,只有當我們知道我們的輸入是有界的時候,我們才能這樣做。例如,除了允許更有效的任務調度和故障恢復行為的不同洗牌實現之外,還可以使用不同的連接/聚合策略。
二、我什么時候可以/應該使用BATCH執行模式?
只有有界的作業/Flink程序才能使用BATCH執行模式。有界是數據源的一個屬性,它告訴我們來自該源的所有輸入在執行之前是否已知,或者新數據是否會無限期地出現。反過來,如果一個作業的所有源都有界,則該作業是有界的,否則是無界的。
STREAMING執行模式可以用于有界和無界作業
根據經驗,當程序有界時,應該使用BATCH執行模式,因為這會更有效。當程序無界時,則必須使用STREAMING執行模式,因為只有這種模式足夠通用,能夠處理連續的數據流。
三、配置BATCH執行模式
執行模式可以通過execution.runtime-mode
設置進行配置。有三個可能的值:STREAMING、BATCH、AUTOMATIC
這可以通過bin/flink run ...
的命令行參數進行配置,或者在創建/配置StreamExecutionEnvironment
時以編程方式進行配置。例如:
命令行:
bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
代碼:?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建議不要在程序中設置運行時模式,而是在提交應用程序時使用命令行設置。保持應用程序代碼無配置允許更大的靈活性,因為同一應用程序可以在任何執行模式下執行
四、執行行為
1、任務調度和網絡洗牌
Flink作業由在數據流圖中連接在一起的不同操作組成。系統決定如何安排這些操作在不同進程/機器(TaskManager)上的執行,以及如何在它們之間洗牌(發送)數據。
可以使用稱為鏈接的功能將多個操作/運算符鏈接在一起。Flink認為作為調度單元的一組一個或多個(鏈接的)運算符稱為任務。術語子任務通常用于指代在多個TaskManager上并行運行的任務的各個實例,但我們在這里只使用術語任務。
任務調度和網絡洗牌在BATCH和STREAMING執行模式下的工作方式不同。主要是因為我們知道我們的輸入數據在BATCH執行模式下是有界的,這允許Flink使用更有效的數據結構和算法。
我們將用這個例子來解釋任務調度和網絡傳輸的區別:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.fromElements(...);source.name("source").map(...).name("map1").map(...).name("map2").rebalance().map(...).name("map3").map(...).name("map4").keyBy((value) -> value).map(...).name("map5").map(...).name("map6").sinkTo(...).name("sink");
和Spark一樣,數據之間關系是1對1、多對1關系的,Flink通常不會在它們之間插入網絡洗牌。例如:map()
,?flatMap()
, ?filter()。
諸如keyBy()或re平衡()之類的操作需要在任務的不同并行實例之間洗牌數據。這會導致網絡洗牌。
對于上面的示例,Flink將操作組合為如下任務:
- Task1:?
source
,?map1
,map2
- Task2:?
map3
,?map4
- Task3:?
map5
,?map6
,sink
Task1和Task2以及Task2和Task3之間有一個網絡洗牌:
STREAMING執行模式
在流式執行模式下,所有任務都需要一直在線/運行。這允許Flink立即通過整個管道處理新記錄,這是我們連續和低延遲流處理所需要的。這也意味著分配給作業的TaskManager需要有足夠的資源來同時運行所有任務。
網絡洗牌是流水線式的,這意味著記錄立即發送到下游任務,并在網絡層進行一些緩沖。同樣,這是必需的,因為在處理連續的數據流時,沒有自然的時間點(在時間上)可以在任務(或任務管道)之間實現數據。這與BATCH執行模式形成鮮明對比,后者可以實現中間結果。
BATCH執行模式
在BATCH執行模式下,作業的任務可以分成可以一個接一個執行的階段。我們可以這樣做,因為輸入是有界的,因此Flink可以在進入下一個階段之前完全處理管道的一個階段。在上面的示例中,作業將有三個階段,對應于由洗牌屏障分隔的三個任務。
分階段處理需要Flink將任務的中間結果物化到一些非短暫的存儲中,這允許下游任務在上游任務已經脫機后讀取它們,而不是像上面解釋的那樣立即向下游任務發送記錄。這將增加處理的延遲,但也帶來了其他有趣的屬性。一方面,這允許Flink在發生故障時回溯到最新的可用結果,而不是重新啟動整個作業。另一個副作用是BATCH作業可以在更少的資源上執行(就TaskManager的可用槽而言),因為系統可以一個接一個地順序執行任務。
TaskManager將保留中間結果,至少只要下游任務沒有消耗它們。(從技術上講,它們將被保留,直到消耗管道區域產生它們的輸出。)之后,只要空間允許,它們將被保留,以便在失敗的情況下允許上述回溯到早期結果。
StateBackend
在STREAMING模式下,Flink使用StateBackend來控制狀態的存儲方式以及檢查點的工作方式。
在BATCH模式下,配置的StateBackend被忽略。相反,鍵控操作的輸入按鍵分組(使用排序),然后我們依次處理一個鍵的所有記錄。這允許同時只保留一個鍵的狀態。當移動到下一個鍵時,給定鍵的狀態將被丟棄。
處理順序
BATCH和STREAMING執行之間在運算符或用戶定義函數(UDF)中處理記錄的順序可能不同。
在STREAMING模式下,用戶定義的函數不應對傳入記錄的順序做出任何假設。數據一到達就會被處理。
在BATCH執行模式下,有一些操作是Flink保證順序的。排序可以是特定任務調度、網絡洗牌和StateBackend(見上文)的副作用,也可以是系統有意識的選擇。
我們可以區分三種一般類型的輸入:
- 廣播輸入:來自廣播流的輸入
- 常規輸入:既不是廣播也不是鍵控的輸入
- 鍵控輸入:來自KeyedStream的輸入
使用多種輸入類型的函數或運算符將按以下順序處理它們:
- 首先處理廣播輸入
- 接著處理常規輸入
- 最后處理鍵控輸入
對于從多個常規或廣播輸入中使用的函數(例如CoProcessFunction),Flink有權以任何順序處理來自該類型的任何輸入的數據。
對于從多個鍵控輸入中使用的函數(例如KeyedCoProcessFunction),Flink在繼續下一個鍵控輸入之前,會處理來自所有鍵控輸入的單個鍵的所有記錄。
事件時間/水印
在支持事件時間方面,Flink的流運行時建立在事件可能亂序的悲觀假設之上,即時間戳為t的事件可能發生在時間戳為t+1的事件之后。正因為如此,系統永遠無法確定給定時間戳T的時間戳為t<T的元素將來不會再出現。為了在使系統實用的同時攤銷這種亂序對最終結果的影響,在STREAMING模式下,Flink使用了一種稱為水印的啟發式方法。帶有時間戳T的水印表示沒有時間戳為t<T的元素會跟隨。
在BATCH模式下,輸入數據集是預先知道的,不需要這樣的啟發式方法,因為至少可以按時間戳對元素進行排序,以便按時間順序處理。因此在BATCH中,我們可以假設“完美水印”。
鑒于上述情況,在BATCH模式下,我們只需要在與每個鍵關聯的輸入末尾MAX_WATERMARK,或者如果輸入流沒有鍵控,則在輸入末尾。基于此方案,所有注冊的計時器將在時間結束時觸發,用戶定義的WatermarkAssigners或WatermarkGenerator將被忽略。不過,指定WatermarkStrategy仍然很重要,因為它的TimestampAssigner仍將用于為記錄分配時間戳。
處理時間
處理時間是機器上處理記錄的掛鐘時間,在該記錄正在被處理的特定實例中。根據這個定義,我們看到基于流轉時長的計算結果是不可重現的。這是因為處理兩次的同一記錄將有兩個不同的時間戳。
盡管如此,在流轉時長模式下使用流轉時長還是很有用的。原因與流轉管道經常實時攝取其無界輸入有關,因此事件時間和流轉時長之間存在相關性。此外,由于上述原因,在流轉模式下,事件時間中的1h通常可以在流轉時長或掛鐘時間中接近1h。因此,使用流轉時長可以用于早期(不完整)觸發,從而給出預期結果的提示。
在輸入數據集是靜態的并且事先已知的批處理世界中不存在這種相關性。因此,在BATCH模式下,我們允許用戶請求當前流轉時長并注冊流轉時長計時器,但是,就像事件時間一樣,所有計時器都將在輸入結束時觸發
從概念上講,我們可以想象流轉時長在作業執行期間不會提前,我們快進到處理整個輸入的時間結束。
故障恢復
在STREAMING執行模式下,Flink使用檢查點進行故障恢復。也可以通過狀態快照進行容錯的更介紹性部分。
故障恢復檢查點的特點之一是Flink將在發生故障時從檢查點重新啟動所有正在運行的任務。這可能比我們在BATCH模式下必須做的事情更昂貴(如下所述),這也是如果您的作業允許,您應該使用BATCH執行模式的原因之一。
在BATCH執行模式下,Flink將嘗試并回溯到中間結果仍然可用的先前處理階段。潛在地,只有失敗的任務(或它們在圖中的前身)必須重新啟動,與從檢查點重新啟動所有任務相比,這可以提高處理效率和作業的整體流轉時長。
2、重要參考因素
與經典的STREAMING執行模式相比,在BATCH模式下,某些功能可能無法按預期工作。某些功能的工作方式略有不同,而其他功能不受支持。
BATCH模式下的行為改變:
????????Rolling”操作(例如reduce()或sum())會為以STREAMING模式到達的每條新記錄發出增量更新。在BATCH模式下,這些操作不是“滾動”。它們只發出最終結果。
BATCH模式下不支持:
????????檢查點和任何依賴于檢查點的操作都不起作用
自定義運算符應該小心實現,否則它們可能會行為不當。
檢查點
如上所述,批處理程序的故障恢復不使用檢查點。想想Spark是如何做的呢?利用RDD的血統,按stage來進行失敗重試的,因為每個stage最后都會落盤。
重要的是要記住,因為沒有檢查點,某些功能,如Checkpoint Listener,因此Kafka的EXACTLY_ONCE模式或File Sink的OnCheckpointRollingPolicy將不起作用。
您仍然可以使用所有狀態原語,只是用于故障恢復的機制會有所不同。
編寫自定義運算符
注意:自定義運算符是Apache Flink的高級使用模式。對于大多數用例,請考慮改用(keyed-)進程函數。
在編寫自定義運算符時,記住對BATCH執行模式所做的假設非常重要。否則,適用于流式傳輸模式的運算符可能會在BATCH模式下產生錯誤的結果。運算符永遠不會限定為特定鍵,這意味著他們會看到Flink試圖利用的BATCH處理的某些屬性。
首先,您不應該在運算符中緩存最后看到的水印。在BATCH模式下,我們逐個鍵處理記錄。因此,水印將在每個鍵之間從MAX_VALUE切換到MIN_VALUE。您不應該假設水印在運算符中總是升序的。出于同樣的原因,計時器將首先按鍵順序觸發,然后按每個鍵內的時間戳順序觸發。此外,不支持手動更改鍵的操作。
?------------------------------------------------------------------------------------------------------------------------------
大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:
2025年人工智能、數字媒體技術與社會計算國際學術會議
https://ais.cn/u/byAVfu
第二屆邊緣計算與并行、分布式計算國際學術會議(ECPDC 2025)
https://ais.cn/u/77FJ3u
2025人工智能與計算機網絡技術國際學術會議(ICAICN 2025)
https://ais.cn/u/jUfAVz
2025年數據挖掘與項目管理國際研討會
https://ais.cn/u/nIbMvm