Flink-DataStreamAPI-執行模式

一、概覽

DataStream API支持不同的運行時執行模式,我們可以根據用例的要求和作業的特征進行選擇。

  • STREAMING執行模式:被稱為“經典”執行模式為,主要用于需要持續增量處理并且預計無限期保持在線的無界作業
  • BATCH執行模式:類似于MapReduce的批處理框架,主要用于已知固定輸入且不連續運行的有界作業。
  • AUTOMATIC執行模式:交給Flink自己決斷,如果所有源都有界,Flink將選擇BATCH,否則選擇STREAMING

Flink對流和批次處理作業的統一方法意味著,無論配置的執行模式如何,在有界輸入上執行的DataStream應用程序都將產生相同的最終結果。請務必注意最終的含義:在STREAMING模式下執行的作業可能會產生增量更新(想想數據庫中的upserts),而BATCH作業最終只會產生一個最終結果。如果解釋正確,最終結果將是相同的,但到達那里的方式可能不同

當啟用BATCH執行,我們允許Flink應用額外的優化,只有當我們知道我們的輸入是有界的時候,我們才能這樣做。例如,除了允許更有效的任務調度和故障恢復行為的不同洗牌實現之外,還可以使用不同的連接/聚合策略。

二、我什么時候可以/應該使用BATCH執行模式?

只有有界的作業/Flink程序才能使用BATCH執行模式。有界是數據源的一個屬性,它告訴我們來自該源的所有輸入在執行之前是否已知,或者新數據是否會無限期地出現。反過來,如果一個作業的所有源都有界,則該作業是有界的,否則是無界的。

STREAMING執行模式可以用于有界和無界作業

根據經驗,當程序有界時,應該使用BATCH執行模式,因為這會更有效。當程序無界時,則必須使用STREAMING執行模式,因為只有這種模式足夠通用,能夠處理連續的數據流。

三、配置BATCH執行模式

執行模式可以通過execution.runtime-mode設置進行配置。有三個可能的值:STREAMING、BATCH、AUTOMATIC

這可以通過bin/flink run ...的命令行參數進行配置,或者在創建/配置StreamExecutionEnvironment時以編程方式進行配置。例如:

命令行:

bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>

代碼:?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

建議不要在程序中設置運行時模式,而是在提交應用程序時使用命令行設置。保持應用程序代碼無配置允許更大的靈活性,因為同一應用程序可以在任何執行模式下執行

四、執行行為

1、任務調度和網絡洗牌

Flink作業由在數據流圖中連接在一起的不同操作組成。系統決定如何安排這些操作在不同進程/機器(TaskManager)上的執行,以及如何在它們之間洗牌(發送)數據。

可以使用稱為鏈接的功能將多個操作/運算符鏈接在一起。Flink認為作為調度單元的一組一個或多個(鏈接的)運算符稱為任務。術語子任務通常用于指代在多個TaskManager上并行運行的任務的各個實例,但我們在這里只使用術語任務。

任務調度和網絡洗牌在BATCH和STREAMING執行模式下的工作方式不同。主要是因為我們知道我們的輸入數據在BATCH執行模式下是有界的,這允許Flink使用更有效的數據結構和算法。

我們將用這個例子來解釋任務調度和網絡傳輸的區別:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.fromElements(...);source.name("source").map(...).name("map1").map(...).name("map2").rebalance().map(...).name("map3").map(...).name("map4").keyBy((value) -> value).map(...).name("map5").map(...).name("map6").sinkTo(...).name("sink");

和Spark一樣,數據之間關系是1對1、多對1關系的,Flink通常不會在它們之間插入網絡洗牌。例如:map(),?flatMap(), ?filter()。諸如keyBy()或re平衡()之類的操作需要在任務的不同并行實例之間洗牌數據。這會導致網絡洗牌。

對于上面的示例,Flink將操作組合為如下任務:

  • Task1:?source,?map1, map2
  • Task2:?map3,?map4
  • Task3:?map5,?map6, sink

Task1和Task2以及Task2和Task3之間有一個網絡洗牌:

STREAMING執行模式

在流式執行模式下,所有任務都需要一直在線/運行。這允許Flink立即通過整個管道處理新記錄,這是我們連續和低延遲流處理所需要的。這也意味著分配給作業的TaskManager需要有足夠的資源來同時運行所有任務。

網絡洗牌是流水線式的,這意味著記錄立即發送到下游任務,并在網絡層進行一些緩沖。同樣,這是必需的,因為在處理連續的數據流時,沒有自然的時間點(在時間上)可以在任務(或任務管道)之間實現數據。這與BATCH執行模式形成鮮明對比,后者可以實現中間結果。

BATCH執行模式

在BATCH執行模式下,作業的任務可以分成可以一個接一個執行的階段。我們可以這樣做,因為輸入是有界的,因此Flink可以在進入下一個階段之前完全處理管道的一個階段。在上面的示例中,作業將有三個階段,對應于由洗牌屏障分隔的三個任務。

分階段處理需要Flink將任務的中間結果物化到一些非短暫的存儲中,這允許下游任務在上游任務已經脫機后讀取它們,而不是像上面解釋的那樣立即向下游任務發送記錄。這將增加處理的延遲,但也帶來了其他有趣的屬性。一方面,這允許Flink在發生故障時回溯到最新的可用結果,而不是重新啟動整個作業。另一個副作用是BATCH作業可以在更少的資源上執行(就TaskManager的可用槽而言),因為系統可以一個接一個地順序執行任務。

TaskManager將保留中間結果,至少只要下游任務沒有消耗它們。(從技術上講,它們將被保留,直到消耗管道區域產生它們的輸出。)之后,只要空間允許,它們將被保留,以便在失敗的情況下允許上述回溯到早期結果。

StateBackend

在STREAMING模式下,Flink使用StateBackend來控制狀態的存儲方式以及檢查點的工作方式。

在BATCH模式下,配置的StateBackend被忽略。相反,鍵控操作的輸入按鍵分組(使用排序),然后我們依次處理一個鍵的所有記錄。這允許同時只保留一個鍵的狀態。當移動到下一個鍵時,給定鍵的狀態將被丟棄。

處理順序

BATCH和STREAMING執行之間在運算符或用戶定義函數(UDF)中處理記錄的順序可能不同。

在STREAMING模式下,用戶定義的函數不應對傳入記錄的順序做出任何假設。數據一到達就會被處理。

在BATCH執行模式下,有一些操作是Flink保證順序的。排序可以是特定任務調度、網絡洗牌和StateBackend(見上文)的副作用,也可以是系統有意識的選擇。

我們可以區分三種一般類型的輸入:

  • 廣播輸入:來自廣播流的輸入
  • 常規輸入:既不是廣播也不是鍵控的輸入
  • 鍵控輸入:來自KeyedStream的輸入

使用多種輸入類型的函數或運算符將按以下順序處理它們:

  1. 首先處理廣播輸入
  2. 接著處理常規輸入
  3. 最后處理鍵控輸入

對于從多個常規或廣播輸入中使用的函數(例如CoProcessFunction),Flink有權以任何順序處理來自該類型的任何輸入的數據。

對于從多個鍵控輸入中使用的函數(例如KeyedCoProcessFunction),Flink在繼續下一個鍵控輸入之前,會處理來自所有鍵控輸入的單個鍵的所有記錄。

事件時間/水印

在支持事件時間方面,Flink的流運行時建立在事件可能亂序的悲觀假設之上,即時間戳為t的事件可能發生在時間戳為t+1的事件之后。正因為如此,系統永遠無法確定給定時間戳T的時間戳為t<T的元素將來不會再出現。為了在使系統實用的同時攤銷這種亂序對最終結果的影響,在STREAMING模式下,Flink使用了一種稱為水印的啟發式方法。帶有時間戳T的水印表示沒有時間戳為t<T的元素會跟隨。

在BATCH模式下,輸入數據集是預先知道的,不需要這樣的啟發式方法,因為至少可以按時間戳對元素進行排序,以便按時間順序處理。因此在BATCH中,我們可以假設“完美水印”。

鑒于上述情況,在BATCH模式下,我們只需要在與每個鍵關聯的輸入末尾MAX_WATERMARK,或者如果輸入流沒有鍵控,則在輸入末尾。基于此方案,所有注冊的計時器將在時間結束時觸發,用戶定義的WatermarkAssigners或WatermarkGenerator將被忽略。不過,指定WatermarkStrategy仍然很重要,因為它的TimestampAssigner仍將用于為記錄分配時間戳。

處理時間

處理時間是機器上處理記錄的掛鐘時間,在該記錄正在被處理的特定實例中。根據這個定義,我們看到基于流轉時長的計算結果是不可重現的。這是因為處理兩次的同一記錄將有兩個不同的時間戳。

盡管如此,在流轉時長模式下使用流轉時長還是很有用的。原因與流轉管道經常實時攝取其無界輸入有關,因此事件時間和流轉時長之間存在相關性。此外,由于上述原因,在流轉模式下,事件時間中的1h通常可以在流轉時長或掛鐘時間中接近1h。因此,使用流轉時長可以用于早期(不完整)觸發,從而給出預期結果的提示。

在輸入數據集是靜態的并且事先已知的批處理世界中不存在這種相關性。因此,在BATCH模式下,我們允許用戶請求當前流轉時長并注冊流轉時長計時器,但是,就像事件時間一樣,所有計時器都將在輸入結束時觸發

從概念上講,我們可以想象流轉時長在作業執行期間不會提前,我們快進到處理整個輸入的時間結束。

故障恢復

在STREAMING執行模式下,Flink使用檢查點進行故障恢復。也可以通過狀態快照進行容錯的更介紹性部分。

故障恢復檢查點的特點之一是Flink將在發生故障時從檢查點重新啟動所有正在運行的任務。這可能比我們在BATCH模式下必須做的事情更昂貴(如下所述),這也是如果您的作業允許,您應該使用BATCH執行模式的原因之一。

在BATCH執行模式下,Flink將嘗試并回溯到中間結果仍然可用的先前處理階段。潛在地,只有失敗的任務(或它們在圖中的前身)必須重新啟動,與從檢查點重新啟動所有任務相比,這可以提高處理效率和作業的整體流轉時長。

2、重要參考因素

與經典的STREAMING執行模式相比,在BATCH模式下,某些功能可能無法按預期工作。某些功能的工作方式略有不同,而其他功能不受支持。

BATCH模式下的行為改變:

????????Rolling”操作(例如reduce()或sum())會為以STREAMING模式到達的每條新記錄發出增量更新。在BATCH模式下,這些操作不是“滾動”。它們只發出最終結果。

BATCH模式下不支持:

????????檢查點和任何依賴于檢查點的操作都不起作用

自定義運算符應該小心實現,否則它們可能會行為不當。

檢查點

如上所述,批處理程序的故障恢復不使用檢查點。想想Spark是如何做的呢?利用RDD的血統,按stage來進行失敗重試的,因為每個stage最后都會落盤。

重要的是要記住,因為沒有檢查點,某些功能,如Checkpoint Listener,因此Kafka的EXACTLY_ONCE模式或File Sink的OnCheckpointRollingPolicy將不起作用。

您仍然可以使用所有狀態原語,只是用于故障恢復的機制會有所不同。

編寫自定義運算符

注意:自定義運算符是Apache Flink的高級使用模式。對于大多數用例,請考慮改用(keyed-)進程函數。

在編寫自定義運算符時,記住對BATCH執行模式所做的假設非常重要。否則,適用于流式傳輸模式的運算符可能會在BATCH模式下產生錯誤的結果。運算符永遠不會限定為特定鍵,這意味著他們會看到Flink試圖利用的BATCH處理的某些屬性。

首先,您不應該在運算符中緩存最后看到的水印。在BATCH模式下,我們逐個鍵處理記錄。因此,水印將在每個鍵之間從MAX_VALUE切換到MIN_VALUE。您不應該假設水印在運算符中總是升序的。出于同樣的原因,計時器將首先按鍵順序觸發,然后按每個鍵內的時間戳順序觸發。此外,不支持手動更改鍵的操作。

?------------------------------------------------------------------------------------------------------------------------------

大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:

2025年人工智能、數字媒體技術與社會計算國際學術會議

https://ais.cn/u/byAVfu

第二屆邊緣計算與并行、分布式計算國際學術會議(ECPDC 2025)

https://ais.cn/u/77FJ3u

2025人工智能與計算機網絡技術國際學術會議(ICAICN 2025)

https://ais.cn/u/jUfAVz

2025年數據挖掘與項目管理國際研討會

https://ais.cn/u/nIbMvm

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72969.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72969.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72969.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

解決VScode 連接不上問題

問題 &#xff1a;VScode 連接不上 解決方案&#xff1a; 1、手動殺死VS Code服務器進程&#xff0c;然后重新嘗試登錄 打開xshell &#xff0c;遠程連接服務器 &#xff0c;查看vscode的進程 &#xff0c;然后全部殺掉 [cxqiZwz9fjj2ssnshikw14avaZ ~]$ ps ajx | grep vsc…

C#類型轉換基本概念

一、基本定義? C# 類型轉換是將數據從一種類型轉換為另一種類型的過程&#xff0c;分為 ?隱式轉換? 和 ?顯式轉換? 兩類?。 強類型語言特性?&#xff1a;C# 要求變量類型在編譯時確定&#xff0c;類型轉換需滿足兼容性或顯式規則?。目的?&#xff1a;處理不同數據類…

使用阿里云操作系統控制臺排查內存溢出

引言 操作系統控制臺是阿里云最新推出的一款智能運維工具&#xff0c;專為提升運維效率、優化服務器管理而設計。它集成了多種運維管理功能&#xff0c;包括操作系統助手、插件管理器以及其他實用工具&#xff0c;為用戶提供一站式的運維解決方案。無論是個人開發者還是企業運…

(C/S)架構、(B/S)架構

客戶機/服務器&#xff08;C/S&#xff09;架構 理論描述&#xff1a; 客戶機/服務器架構是一種網絡架構風格&#xff0c;其中任務被分配給網絡中的不同計算機&#xff0c;以提高效率和靈活性。這種架構由兩部分組成&#xff1a;客戶端&#xff08;Client&#xff09;和服務器&…

混合存儲HDD+SSD機型磁盤陣列,配上SSD緩存功能,性能提升300%

企業日常運行各種文件無處不在&#xff0c;文檔、報告、視頻、應用數據......面對成千上萬的文件&#xff0c;團隊之間需要做到無障礙協作&#xff0c;員工能夠即時快速訪問、共享處理文件。隨著業務增長&#xff0c;數字化辦公不僅需要大容量&#xff0c;快速高效的文件訪問越…

C 語言異常處理方式全面解析

引言? 在 C 語言編程領域&#xff0c;穩健的錯誤處理機制對于保障程序的可靠性、穩定性以及安全性至關重要。異常處理作為錯誤處理的進階形式&#xff0c;雖然并非 C 語言標準庫原生支持的特性&#xff0c;但通過巧妙運用語言特性和編程技巧&#xff0c;開發者能夠實現有效的…

【每日學點HarmonyOS Next知識】狀態欄控制、片段按鈕點擊回調、繪制組件、取消按鈕與輸入框對齊、父調子組件方法

1、HarmonyOS 狀態欄怎么控制顯示于隱藏&#xff0c;設置狀態欄顏色&#xff0c;子顏色等控制&#xff1f; 顯示與隱藏 可以設置沉浸式&#xff0c;隱藏的話可以退出沉靜式&#xff0c;在子窗口打開的頁面 aboutToAppear 方法中設置沉浸式 aboutToAppear(): void {// 設置沉浸…

二級Python通關秘籍:字符串操作符/函數/方法全解析與實戰演練

第一章 字符串基礎概念與運算符速通 1.1 字符串的不可變性特性 在Python中&#xff0c;字符串被設計為immutable類型&#xff0c;任何修改操作都會生成新對象。這一特性直接影響字符串拼接的性能表現&#xff0c;建議使用join()方法代替多次操作。 1.2 基礎操作符全掌握 pyt…

GStreamer —— 2.6、Windows下Qt加載GStreamer庫后運行 - “教程6:媒體格式和Pad功能“(附:完整源碼)

運行效果 簡介 上一個教程演示了GUI 工具包集成(gtk)。本教程介紹媒體格式和Pad功能。Pad Capabilities 是 GStreamer 的一個基本元素&#xff0c;盡管大多數它們不可見&#xff0c;因為框架會處理它們 自然而然。這個有點理論性的教程展示了&#xff1a; ? 什么是 Pad 功能。…

【前綴和與差分 C/C++】洛谷 P8218 求區間和

2025 - 03 - 09 - 第 72 篇 Author: 鄭龍浩 / 仟濹 【前綴和與差分 C/C】 文章目錄 洛谷 P8218 求區間和題目描述輸入格式輸出格式輸入輸出樣例 #1輸入 #1輸出 #1 說明/提示思路代碼 洛谷 P8218 求區間和 題目描述 給定 n n n 個正整數組成的數列 a 1 , a 2 , ? , a n a_…

初識Bert

在學習Bert之前我們先了解“遞歸神經網絡&#xff08;RNN Recurrent neural network)” 和 “長短期記憶&#xff08;LSTM Long short-term memory)” 我們如果僅僅識別每個字的含義&#xff0c;那么在一句話中沒有相同的字還是可以的但是如果一句話中有相同的字&#xff0c;那…

clickhouse源碼分析

《ClickHouse源碼分析》 當我們談論數據庫時&#xff0c;ClickHouse是一個不容忽視的名字。它是一個用于聯機分析處理&#xff08;OLAP&#xff09;的列式數據庫管理系統&#xff08;DBMS&#xff09;&#xff0c;以其快速的數據查詢能力而聞名。對于想要深入了解這個高效工具…

[網絡爬蟲] 動態網頁抓取 — Selenium 元素定位

&#x1f31f;想系統化學習爬蟲技術&#xff1f;看看這個&#xff1a;[數據抓取] Python 網絡爬蟲 - 學習手冊-CSDN博客 在使用 Selenium 時&#xff0c;往往需要先定位到指定元素&#xff0c;然后再執行相應的操作。例如&#xff0c;再向文本輸入框中輸入文字之前&#xff0c;…

ArcGIS操作:15 計算點的經緯度,并添加到屬性表

注意&#xff1a;需要轉化為地理坐標系 1、打開屬性表&#xff0c;添加字段 2、計算字段&#xff08;以計算緯度為例 !Shape!.centroid.Y ) 3、效果

[項目]基于FreeRTOS的STM32四軸飛行器: 七.遙控器按鍵

基于FreeRTOS的STM32四軸飛行器: 七.遙控器 一.遙控器按鍵搖桿功能說明二.搖桿和按鍵的配置三.按鍵掃描 一.遙控器按鍵搖桿功能說明 兩個手柄四個ADC。 左側手柄&#xff1a; 前后推為飛控油門&#xff0c;左右推為控制飛機偏航角。 右側手柄&#xff1a; 控制飛機飛行方向&a…

Redis 內存淘汰策略深度解析

Redis 作為高性能的內存數據庫&#xff0c;其內存資源的高效管理直接關系到系統的穩定性和性能。當 Redis 的內存使用達到配置的最大值&#xff08;maxmemory&#xff09;時&#xff0c;新的寫入操作將觸發內存淘汰機制&#xff08;Eviction Policy&#xff09;&#xff0c;以釋…

【面試】Java 集合

集合 1、常見的集合有哪些2、說說 List、Set、Queue、Map 四者的區別3、Collection 和 Collections 有什么區別4、Comparable 和 Comparator 的區別5、ArrayList 和 LinkedList 的區別是什么6、ArrayList 和 Vector 的區別是什么7、ArrayList 和 Vector 的擴容機制8、CopyOnWri…

【c++】平移字符串

說明 實現字符串的左移與右移 示例代碼 #include <iostream> #include <string> using namespace std;int main() {string str1 "12345";//左移2位string str2 str1.substr(2) str1.substr(0, 2);cout << str2 << endl;//右移2位&…

密碼學(終極版)

加密 & 解密 備注&#xff1a;密碼學領域不存在完全不能破解的密碼&#xff0c;但是如果一個密碼需要很久很久&#xff0c;例如一萬年才能破解&#xff0c;就認為這個密碼是安全的了。 對稱加密 非對稱加密 公鑰加密、私鑰解密 私鑰簽名、公鑰認證 非對稱的底層原理是…

FreeRTOS任務狀態查詢

一.任務相關API vTaskList&#xff08;&#xff09;&#xff0c;創建一個表格描述每個任務的詳細信息 char biaoge[1000]; //定義一個緩存 vTaskList(biaoge); //將表格存到這緩存中 printf("%s /r/n",biaoge); 1.uxTaskPriorityGet&#xff08;&#xf…