56、Flink DataStream 的管理執行配置詳解

1)概述
1.執行配置

StreamExecutionEnvironment 包含了 ExecutionConfig,它允許在運行時設置作業特定的配置值。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

以下是可用的配置選項:(默認為粗體)

  • setClosureCleanerLevel()。closure cleaner 的級別默認設置為 ClosureCleanerLevel.RECURSIVE。closure cleaner 刪除 Flink 程序中對匿名 function 的調用類的不必要引用。禁用 closure cleaner 后,用戶的匿名 function 可能正引用一些不可序列化的調用類。這將導致序列化器出現異常。可設置的值是: NONE:完全禁用 closure cleaner ,TOP_LEVEL:只清理頂級類而不遞歸到字段中,RECURSIVE:遞歸清理所有字段。
  • getParallelism() / setParallelism(int parallelism)。為作業設置默認的并行度。
  • getMaxParallelism() / setMaxParallelism(int parallelism)。為作業設置默認的最大并行度。此設置決定最大并行度并指定動態縮放的上限。
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)。設置失敗任務重新執行的次數。值為零會有效地禁用容錯。-1 表示使用系統默認值(在配置中定義)。該配置已棄用,請改用重啟策略。
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)。設置系統在作業失敗后重新執行之前等待的延遲(以毫秒為單位)。在 TaskManagers 上成功停止所有任務后,開始計算延遲,一旦延遲過去,任務會被重新啟動。該配置已被棄用,請改用重啟策略 。
  • getExecutionMode() / setExecutionMode()。默認的執行模式是 PIPELINED。設置執行模式以執行程序。執行模式定義了數據交換是以批處理方式還是以流方式執行。
  • enableForceKryo() / disableForceKryo。默認情況下不強制使用 Kryo。強制 GenericTypeInformation 對 POJO 使用 Kryo 序列化器,即使可以將它們作為 POJO 進行分析。在某些情況下,應該優先啟用該配置。例如,當 Flink 的內部序列化器無法正確處理 POJO 時。
  • enableForceAvro() / disableForceAvro()。默認情況下不強制使用 Avro。強制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 來序列化 Avro 的 POJO。
  • enableObjectReuse() / disableObjectReuse()。默認情況下,Flink 中不重用對象。啟用對象重用模式會指示運行時重用用戶對象以獲得更好的性能。注意可能會導致bug。
  • getGlobalJobParameters() / setGlobalJobParameters()。此方法允許用戶將自定義對象設置為作業的全局配置。由于 ExecutionConfig 可在所有用戶定義的 function 中訪問,因此這是一種使配置在作業中全局可用的簡單方法。
  • addDefaultKryoSerializer(Class type, Serializer serializer)。為指定的類型注冊 Kryo 序列化器實例。
  • addDefaultKryoSerializer(Class type, Class> serializerClass)。為指定的類型注冊 Kryo 序列化器的類。
  • registerTypeWithKryoSerializer(Class type, Serializer serializer)。使用 Kryo 注冊指定類型并為其指定序列化器。通過使用 Kryo 注冊類型,該類型的序列化將更加高效。
  • registerKryoType(Class type)。如果類型最終被 Kryo 序列化,那么它將在 Kryo 中注冊,以確保只有標記(整數 ID)被寫入。如果一個類型沒有在 Kryo 注冊,它的全限定類名將在每個實例中被序列化,從而導致更高的 I/O 成本。
  • registerPojoType(Class type)。將指定的類型注冊到序列化棧中。如果該類型最終被序列化為 POJO,那么該類型將注冊到 POJO 序列化器中。如果該類型最終被 Kryo 序列化,那么它將在 Kryo 中注冊,以確保只有標記被寫入。如果一個類型沒有在 Kryo 注冊,它的全限定類名將在每個實例中被序列化,從而導致更高的I/O成本。

注意:用 registerKryoType() 注冊的類型對 Flink 的 Kryo 序列化器實例來說是不可用的。

  • disableAutoTypeRegistration()。自動類型注冊在默認情況下是啟用的。自動類型注冊是將用戶代碼使用的所有類型(包括子類型)注冊到 Kryo 和 POJO 序列化器。
  • setTaskCancellationInterval(long interval)。設置嘗試連續取消正在運行任務的等待時間間隔(以毫秒為單位)。當一個任務被取消時,會創建一個新的線程,如果任務線程在一定時間內沒有終止,新線程就會定期調用任務線程上的 interrupt() 方法。這個參數是指連續調用 interrupt() 的時間間隔,默認設置為 30000 毫秒,或 30秒

通過 getRuntimeContext() 方法在 Rich* function 中訪問到的 RuntimeContext 也允許在所有用戶定義的 function 中訪問 ExecutionConfig

2.程序打包和分布式運行
a)打包程序

為了能夠通過命令行或 web 界面執行打包的 JAR 文件,程序必須使用通過 StreamExecutionEnvironment.getExecutionEnvironment() 獲取的 environment。當 JAR 被提交到命令行或 web 界面后,該 environment 會扮演集群環境的角色。如果調用 Flink 程序的方式與上述接口不同,該 environment 會扮演本地環境的角色。

打包程序只要簡單地將所有相關的類導出為 JAR 文件,JAR 文件的 manifest 必須指向包含程序入口點(擁有公共 main 方法)的類。實現的最簡單方法是將 main-class 寫入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。main-class 屬性與 Java 虛擬機通過指令 java -jar pathToTheJarFile 執行 JAR 文件時尋找 main 方法的類是相同的。

大多數 IDE 提供了在導出 JAR 文件時自動包含該屬性的功能。

b)總結

調用打包后程序的完整流程包括兩步:

  • 搜索 JAR 文件 manifest 中的 main-classprogram-class 屬性。如果兩個屬性同時存在,program-class 屬性會優先于 main-class 屬性。對于 JAR manifest 中兩個屬性都不存在的情況,命令行和 web 界面支持手動傳入入口點類名參數。
  • 系統接著調用該類的 main 方法。
3.并行執行
a)概述

一個 Flink 程序由多個任務 task 組成(轉換/算子、數據源和數據接收器)。一個 task 包括多個并行執行的實例,且每一個實例都處理 task 輸入數據的一個子集。一個 task 的并行實例數被稱為該 task 的 并行度 (parallelism)。

使用 savepoints 時,應該考慮設置最大并行度。當作業從一個 savepoint 恢復時,可以改變特定算子或者整個程序的并行度,并且此設置會限定整個程序的并行度的上限。由于在 Flink 內部將狀態劃分為了 key-groups,且性能所限不能無限制地增加 key-groups,因此設定最大并行度是有必要的。

b)設置并行度

算子層面

單個算子、數據源和數據接收器的并行度可以通過調用 setParallelism()方法來指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).setParallelism(5);wordCounts.print();env.execute("Word Count Example");

執行環境層次

Flink 程序運行在執行環境的上下文中。執行環境為所有執行的算子、數據源、數據接收器 (data sink) 定義了一個默認的并行度。可以顯式配置算子層次的并行度去覆蓋執行環境的并行度。

可以通過調用 setParallelism() 方法指定執行環境的默認并行度。如果想以并行度3來執行所有的算子、數據源和數據接收器。可以在執行環境上設置默認并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();env.execute("Word Count Example");

客戶端層次

將作業提交到 Flink 時可在客戶端設定其并行度。客戶端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一種典型的客戶端。

在 CLI 客戶端中,可以通過 -p 參數指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通過如下方式指定并行度:

try {PackagedProgram program = new PackagedProgram(file, args);InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");Configuration config = new Configuration();Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());// set the parallelism to 10 hereclient.run(program, 10, true);} catch (ProgramInvocationException e) {e.printStackTrace();
}

系統層次

可以通過設置 Flink 配置文件中的 parallelism.default 參數,在系統層次來指定所有執行環境的默認并行度。

c)設置最大并行度

最大并行度可以在所有設置并行度的地方進行設定(客戶端和系統層次除外)。與調用 setParallelism() 方法修改并行度相似,可以通過調用 setMaxParallelism() 方法來設定最大并行度。

默認的最大并行度等于將 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于該值的一個整型值,并且這個整型值是 2 的冪次方,注意默認最大并行度下限為 128,上限為 32768

為最大并行度設置一個非常大的值將會降低性能,因為一些 state backends 需要維持內部的數據結構,而這些數據結構將會隨著 key-groups 的數目而擴張(key-group 是狀態重新分配的最小單元)。

從之前的作業恢復時,改變該作業的最大并發度將會導致狀態不兼容。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/40225.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/40225.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/40225.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《企業實戰分享 · 內存溢出分析》

&#x1f4e2; 大家好&#xff0c;我是 【戰神劉玉棟】&#xff0c;有10多年的研發經驗&#xff0c;致力于前后端技術棧的知識沉淀和傳播。 &#x1f497; &#x1f33b; 近期剛轉戰 CSDN&#xff0c;會嚴格把控文章質量&#xff0c;絕不濫竽充數&#xff0c;如需交流&#xff…

用PyQt5打造炫酷界面:深入解析pyqt5-custom-widgets

在PyQt5中&#xff0c;使用自定義小部件可以為應用程序增添更多實用性和時尚感。pyqt5-custom-widgets是一個開源項目&#xff0c;提供了一系列有用且時尚的自定義小部件&#xff0c;如開關按鈕、動畫按鈕等。本文將詳細介紹pyqt5-custom-widgets的安裝和使用方法。 安裝 可以…

權限維持Linux---監控功能Strace后門命令自定義Alias后門

免責聲明:本文僅做技術交流與學習... 目錄 監控功能Strace后門 1、記錄 sshd 明文 監控 篩選查看 2、記錄sshd私鑰 命令自定義Alias后門 1、簡單粗魯實現反彈&#xff1a; 靶機替換命令 攻擊機監聽上線 2.升級(讓命令正常) 將反彈命令進行base64編碼 替換alias命令 …

【Linux】--help,man page , info page

我們知道Linux有很多的命令&#xff0c;那LInux要不要背命令&#xff1f; 答案是背最常用的那些就行了 那有的時候我們想查詢一些命令的詳細用法該怎么辦呢&#xff1f; 這里我給出3種方法 1.--help --help的使用方法很簡單啊 要查詢的命令 --help 我們看個例子 這里我只…

java版企業工程管理系統源碼:全方位的項目管理解決方案

工程管理系統是一款專注于建設工程項目全生命周期管理的軟件。它覆蓋了項目從策劃、設計、施工到竣工的每一個階段&#xff0c;提供全方位的管理功能。系統采用模塊化設計&#xff0c;包括系統管理、系統設置、項目管理、合同管理、預警管理、竣工管理、質量管理、統計報表和工…

李白的酒量之謎

在中國古典文學的璀璨星空中&#xff0c;李白的名字猶如一顆耀眼的星辰&#xff0c;其卓越的文學成就與獨特的人生經歷引得無數后人仰望。特別是“李白斗酒詩百篇”&#xff0c;這句話不僅高度概括了李白的詩歌才華和其對酒精的熱愛&#xff0c;也使得后人對李白的酒量產生了濃…

6月30日功能測試Day10

3.4.4拼團購測試點 功能位置&#xff1a;營銷-----拼團購 后臺優惠促銷列表管理可以添加拼團&#xff0c;查看拼團活動&#xff0c;啟動活動&#xff0c;編輯活動&#xff0c;刪除活動。 可以查看拼團活動中已下單的訂單以狀態 需求分析 功能和添加拼團 商品拼團活動頁 3…

Pytorch中方法對象和屬性,例如size()和shape

文章目錄 方法對象和屬性的基本概念方法對象屬性示例說明總結 常見的方法對象和屬性常見的方法對象常見的屬性總結示例 方法對象和屬性的基本概念 方法對象&#xff08;method object&#xff09;和屬性&#xff08;attribute&#xff09;是面向對象編程中的兩個重要概念。讓我…

python使用pywebview集成vue3和element-plus開發桌面系統框架

隨著web技術越來越成熟&#xff0c;就連QQ的windows客戶端都用web技術來開發&#xff0c;所以在未來&#xff0c;web技術來開發windows桌面軟件也會越來越多&#xff0c;所以在此發展驅動之下&#xff0c;將最近流程的python與web技術相結合&#xff0c;使用vue3和element-plus…

圖像增強 目標檢測 仿射變換 圖像處理 扭曲圖像

1.背景 在目標檢測中&#xff0c;需要進行圖像增強。這里的代碼模擬了旋轉、扭曲圖像的功能&#xff0c;并且在扭曲的時候&#xff0c;能夠同時把標注的結果也進行扭曲。 這里忽略了讀取xml的過程&#xff0c;假設圖像IMG存在對應的標注框&#xff0c;且坐標為左上、右下兩個…

[C++初階]vector的初步理解

一、標準庫中的vector類 1.vector的介紹 1. vector是表示可變大小數組的序列容器 &#xff0c; 和數組一樣&#xff0c;vector可采用的連續存儲空間來存儲元素。也就是意味著可以采用下標對vector的元素進行訪問&#xff0c;和數組一樣高效。但是又不像數組&#xff0c;它的大…

災后恢復與勒索恢復的區別

災難恢復通常側重于物理基礎設施&#xff0c;如硬盤和網絡設備&#xff0c;而勒索軟件恢復則涉及數據完整性和防范網絡威脅。 BackBox 產品管理副總裁 Amar Ramakrishnan表示&#xff0c;在災難中&#xff0c;企業可能面臨硬件受損等問題&#xff0c;但在網絡安全事件中&#…

Java學習高級一

修飾符 static 類變量的應用場景 成員方法的分類 成員變量的執行原理 成員方法的執行原理 Java之 main 方法 類方法的常見應用場景 代碼塊 設計模式 單例設計模式 餓漢式單例設計模式 懶漢式單例設計模式 繼承 權限修飾符

小紅書 達芬奇:生活問答 AI 機器人

小紅書去年 9 月開始內測的生活問答 AI 機器人&#xff1a;達芬奇&#xff0c;現在可以在小紅書 APP 上用了 得益于小紅書平臺的特性&#xff0c;該助手擅長吃、住、寵、喝、學等等各類生活知識&#xff0c;目前還在搞活動&#xff0c;寫評測筆記最高得 666 元

為什么不能在foreach中刪除元素

文章目錄 快速失敗機制&#xff08;fail-fast&#xff09;for-each刪除元素為什么報錯原因分析邏輯分析 如何正確的刪除元素remove 后 breakfor 循環使用 Iterator 總結 快速失敗機制&#xff08;fail-fast&#xff09; In systems design, a fail-fast system is one which i…

合肥高新區建設世界領先科技園區政策(商務部分)申報獎勵補貼和條件材料、時間指南

一、合肥高新區建設世界領先科技園區政策&#xff08;商務部分&#xff09;申報主體 &#xff08;更多政策可以查看小編主頁方式&#xff09; 工商、稅務、統計關系均在合肥高新區&#xff0c;并在高新區持續經營。申請項目在高新區內實施、且符合政策要求的具有獨立法人資格…

網絡基礎:EIGRP

EIGRP&#xff08;Enhanced Interior Gateway Routing Protocol&#xff09;是由思科開發的一種高級距離矢量路由協議&#xff0c;結合了距離矢量和鏈路狀態路由協議的優點&#xff1b;EIGRP具有快速收斂、高效帶寬利用、負載均衡等特點&#xff0c;適用于各種規模的網絡。EIGR…

python sklearn機械學習-數據預處理

&#x1f308;所屬專欄&#xff1a;【機械學習】?作者主頁&#xff1a; Mr.Zwq??個人簡介&#xff1a;一個正在努力學技術的Python領域創作者&#xff0c;擅長爬蟲&#xff0c;逆向&#xff0c;全棧方向&#xff0c;專注基礎和實戰分享&#xff0c;歡迎咨詢&#xff01; 您…

【設計模式】策略模式(定義 | 特點 | Demo入門講解)

文章目錄 定義策略模式的結構 QuickStart | DemoStep1 | 策略接口Step2 | 策略實現Step3 | 上下文服務類Step4 | 客戶端 策略模式的特點優點缺點 定義 策略模式Strategy是一種行為模式&#xff0c;它能定義一系列算法&#xff0c;并將每種算法分別放入到獨立的類中&#xff0c…

書籍表達式得到期望結果的組成種數

題目 給定一個只由0(假)、1(真)、&(邏輯與)、|(邏輯或)和^(異或)五種組成的字符串express&#xff0c;再給定一個布爾值desired。返回express能有多少種組合方式。可以達到desired的結果。 舉例 express“1^0|0|1”,desiredfalse. 只有1^((0|0)|1)和1^(0|(0|1))的組合可…