1)概述
1.執行配置
StreamExecutionEnvironment
包含了 ExecutionConfig
,它允許在運行時設置作業特定的配置值。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
以下是可用的配置選項:(默認為粗體)
setClosureCleanerLevel()
。closure cleaner 的級別默認設置為ClosureCleanerLevel.RECURSIVE
。closure cleaner 刪除 Flink 程序中對匿名 function 的調用類的不必要引用。禁用 closure cleaner 后,用戶的匿名 function 可能正引用一些不可序列化的調用類。這將導致序列化器出現異常。可設置的值是:NONE
:完全禁用 closure cleaner ,TOP_LEVEL
:只清理頂級類而不遞歸到字段中,RECURSIVE
:遞歸清理所有字段。getParallelism()
/setParallelism(int parallelism)
。為作業設置默認的并行度。getMaxParallelism()
/setMaxParallelism(int parallelism)
。為作業設置默認的最大并行度。此設置決定最大并行度并指定動態縮放的上限。getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
。設置失敗任務重新執行的次數。值為零會有效地禁用容錯。-1
表示使用系統默認值(在配置中定義)。該配置已棄用,請改用重啟策略。getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
。設置系統在作業失敗后重新執行之前等待的延遲(以毫秒為單位)。在 TaskManagers 上成功停止所有任務后,開始計算延遲,一旦延遲過去,任務會被重新啟動。該配置已被棄用,請改用重啟策略 。getExecutionMode()
/setExecutionMode()
。默認的執行模式是 PIPELINED。設置執行模式以執行程序。執行模式定義了數據交換是以批處理方式還是以流方式執行。enableForceKryo()
/disableForceKryo
。默認情況下不強制使用 Kryo。強制 GenericTypeInformation 對 POJO 使用 Kryo 序列化器,即使可以將它們作為 POJO 進行分析。在某些情況下,應該優先啟用該配置。例如,當 Flink 的內部序列化器無法正確處理 POJO 時。enableForceAvro()
/disableForceAvro()
。默認情況下不強制使用 Avro。強制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 來序列化 Avro 的 POJO。enableObjectReuse()
/disableObjectReuse()
。默認情況下,Flink 中不重用對象。啟用對象重用模式會指示運行時重用用戶對象以獲得更好的性能。注意可能會導致bug。getGlobalJobParameters()
/setGlobalJobParameters()
。此方法允許用戶將自定義對象設置為作業的全局配置。由于ExecutionConfig
可在所有用戶定義的 function 中訪問,因此這是一種使配置在作業中全局可用的簡單方法。addDefaultKryoSerializer(Class type, Serializer serializer)
。為指定的類型注冊 Kryo 序列化器實例。addDefaultKryoSerializer(Class type, Class> serializerClass)
。為指定的類型注冊 Kryo 序列化器的類。registerTypeWithKryoSerializer(Class type, Serializer serializer)
。使用 Kryo 注冊指定類型并為其指定序列化器。通過使用 Kryo 注冊類型,該類型的序列化將更加高效。registerKryoType(Class type)
。如果類型最終被 Kryo 序列化,那么它將在 Kryo 中注冊,以確保只有標記(整數 ID)被寫入。如果一個類型沒有在 Kryo 注冊,它的全限定類名將在每個實例中被序列化,從而導致更高的 I/O 成本。registerPojoType(Class type)
。將指定的類型注冊到序列化棧中。如果該類型最終被序列化為 POJO,那么該類型將注冊到 POJO 序列化器中。如果該類型最終被 Kryo 序列化,那么它將在 Kryo 中注冊,以確保只有標記被寫入。如果一個類型沒有在 Kryo 注冊,它的全限定類名將在每個實例中被序列化,從而導致更高的I/O成本。
注意:用 registerKryoType()
注冊的類型對 Flink 的 Kryo 序列化器實例來說是不可用的。
disableAutoTypeRegistration()
。自動類型注冊在默認情況下是啟用的。自動類型注冊是將用戶代碼使用的所有類型(包括子類型)注冊到 Kryo 和 POJO 序列化器。setTaskCancellationInterval(long interval)
。設置嘗試連續取消正在運行任務的等待時間間隔(以毫秒為單位)。當一個任務被取消時,會創建一個新的線程,如果任務線程在一定時間內沒有終止,新線程就會定期調用任務線程上的interrupt()
方法。這個參數是指連續調用interrupt()
的時間間隔,默認設置為 30000 毫秒,或 30秒 。
通過 getRuntimeContext()
方法在 Rich*
function 中訪問到的 RuntimeContext
也允許在所有用戶定義的 function 中訪問 ExecutionConfig
。
2.程序打包和分布式運行
a)打包程序
為了能夠通過命令行或 web 界面執行打包的 JAR 文件,程序必須使用通過 StreamExecutionEnvironment.getExecutionEnvironment()
獲取的 environment。當 JAR 被提交到命令行或 web 界面后,該 environment 會扮演集群環境的角色。如果調用 Flink 程序的方式與上述接口不同,該 environment 會扮演本地環境的角色。
打包程序只要簡單地將所有相關的類導出為 JAR 文件,JAR 文件的 manifest 必須指向包含程序入口點(擁有公共 main
方法)的類。實現的最簡單方法是將 main-class 寫入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram
)。main-class 屬性與 Java 虛擬機通過指令 java -jar pathToTheJarFile
執行 JAR 文件時尋找 main 方法的類是相同的。
大多數 IDE 提供了在導出 JAR 文件時自動包含該屬性的功能。
b)總結
調用打包后程序的完整流程包括兩步:
- 搜索 JAR 文件 manifest 中的 main-class 或 program-class 屬性。如果兩個屬性同時存在,program-class 屬性會優先于 main-class 屬性。對于 JAR manifest 中兩個屬性都不存在的情況,命令行和 web 界面支持手動傳入入口點類名參數。
- 系統接著調用該類的 main 方法。
3.并行執行
a)概述
一個 Flink 程序由多個任務 task 組成(轉換/算子、數據源和數據接收器)。一個 task 包括多個并行執行的實例,且每一個實例都處理 task 輸入數據的一個子集。一個 task 的并行實例數被稱為該 task 的 并行度 (parallelism)。
使用 savepoints 時,應該考慮設置最大并行度。當作業從一個 savepoint 恢復時,可以改變特定算子或者整個程序的并行度,并且此設置會限定整個程序的并行度的上限。由于在 Flink 內部將狀態劃分為了 key-groups,且性能所限不能無限制地增加 key-groups,因此設定最大并行度是有必要的。
b)設置并行度
算子層面
單個算子、數據源和數據接收器的并行度可以通過調用 setParallelism()
方法來指定。如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).setParallelism(5);wordCounts.print();env.execute("Word Count Example");
執行環境層次
Flink 程序運行在執行環境的上下文中。執行環境為所有執行的算子、數據源、數據接收器 (data sink) 定義了一個默認的并行度。可以顯式配置算子層次的并行度去覆蓋執行環境的并行度。
可以通過調用 setParallelism()
方法指定執行環境的默認并行度。如果想以并行度3
來執行所有的算子、數據源和數據接收器。可以在執行環境上設置默認并行度,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();env.execute("Word Count Example");
客戶端層次
將作業提交到 Flink 時可在客戶端設定其并行度。客戶端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一種典型的客戶端。
在 CLI 客戶端中,可以通過 -p
參數指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Java/Scala 程序中,可以通過如下方式指定并行度:
try {PackagedProgram program = new PackagedProgram(file, args);InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");Configuration config = new Configuration();Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());// set the parallelism to 10 hereclient.run(program, 10, true);} catch (ProgramInvocationException e) {e.printStackTrace();
}
系統層次
可以通過設置 Flink 配置文件中的 parallelism.default
參數,在系統層次來指定所有執行環境的默認并行度。
c)設置最大并行度
最大并行度可以在所有設置并行度的地方進行設定(客戶端和系統層次除外)。與調用 setParallelism()
方法修改并行度相似,可以通過調用 setMaxParallelism()
方法來設定最大并行度。
默認的最大并行度等于將 operatorParallelism + (operatorParallelism / 2)
值四舍五入到大于等于該值的一個整型值,并且這個整型值是 2
的冪次方,注意默認最大并行度下限為 128
,上限為 32768
。
為最大并行度設置一個非常大的值將會降低性能,因為一些 state backends 需要維持內部的數據結構,而這些數據結構將會隨著 key-groups 的數目而擴張(key-group 是狀態重新分配的最小單元)。
從之前的作業恢復時,改變該作業的最大并發度將會導致狀態不兼容。