下載 1.15.1
https://flink.apache.org/downloads.html#apache-flink-1151
部署模式分類
- 會話模式
- 應用模式
- 單作業模式
1、會話模式
先啟動一個集群,保持一個會話,然后通過客戶端提交作業,所有作業都在一個會話執行;
會話模式適合規模小、執行時間短的大量作業;
2、應用模式
前兩種模式應用代碼都是在客戶端運行,然后由客戶端提交給jobmanager的,這種方式的弊端是:需要占用大量網絡帶寬,去下載依賴和把二進制數據發送給jobmanager,將會加重客戶端資源消耗。
所以Application Mode的解決辦法是:不需要客戶端,直接把應用提交到jobmanager上運行,這意味著要為每個提交的應用單獨啟動一個jobmanager,也就是創建一個集群,
jobmanager執行完自己的應用將會關閉
應用模式與單作業模式,都是提交作業之后才創建集群;單作業模式是通過客戶端來提交的,客戶端解析出的每一個作業對應一個集群;而應用模式下,是直接由 JobManager 執行應用程序的,即使應用包含了多個作業,也只創建一個集群。此模式用的比較少,
3、單作業模式
為每個作業啟動一個集群,只要客戶端提交了一個作業,就為這個作業啟動一個單獨的集群,這個集群只為這個作業提供服務;其
一、獨立會話模式(Standalone)-部署
flink只支持linux部署
1、解壓
tar -zvxf flink-1.15.1-bin-scala_2.12.tgz
2、修改配置文件
vim conf/flink-conf.yaml
# 修改以下內容
jobmanager.rpc.address: 192.168.31.250 # 選擇當前主機的ip地址,如果是云服務器,使用外網ip
# JobManager將綁定到的主機接口,默認值為 localhost 禁止外部訪問,設為0.0.0.0表示允許外部訪問,設置錯誤的話 Available Task Slots 會顯示0
jobmanager.bind-host: 0.0.0.0
# 任務插槽數量,相當于使用多少個線程來執行流
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1web.submit.enable: true
# 指定TaskManager主機的地址,單機部署的話,用localhost即可
taskmanager.host: 192.168.31.250
# web前端展示的端口,自己設置
rest.port: 8081
# 客戶端應該用來連接到服務器的地址。注意:僅當高可用性配置為 NONE 時才考慮此選項
rest.address: 192.168.31.250
# 允許外部ip訪問的地址,默認情況下是localhost,只能內部訪問,改為0.0.0.0允許所有外部ip訪問
rest.bind-address: 0.0.0.0
3、修改master文件,
vim conf/masters # 填寫主節點的ip地址,如果是云服務器,使用外網ip
192.168.31.250:8081
4、修改 workers 文件
vim conf/workers# 添加 taskManager 節點的ip地址列表,如果是單節點,只填寫主節點ip地址即可
192.168.31.250
192.168.31.251
192.168.31.252
5、、啟動
bin/start-cluster.sh
啟動成功后,命令行會顯示如下信息
[root@dev-server bin]# ./start-cluster.sh
Starting cluster. # 啟動集群
Starting standalonesession daemon on host dev-server. # 啟動會話模式的 作業調度器 jobmanager
Starting taskexecutor daemon on host dev-server. # 啟動任務管理器
通過jps命令可以看到已經啟動的flink
[root@dev-server bin]# jps
3010991 TaskManagerRunner # 任務調度器 taskManager
3010438 StandaloneSessionClusterEntrypoint # 會話模式的節點
3023395 Jps
說明:
- JobManager 的啟動代碼:standalonesession,實現類是:StandaloneSessionClusterEntrypoint
- TaskManager 的啟動代碼:taskexecutor,實現類是:TaskManagerRunner
6、、訪問ui界面
http://192.168.31.250:8081
7、、停止flink
bin/stop-cluster.sh
二、提交作業
1、編寫作業代碼
新建maven項目,pom.xml 加入flink的依賴
<properties><java.version>1.8</java.version><scala-binary-version>2.12</scala-binary-version><flink-version>1.13.0</flink-version><slf4j-version>1.7.30</slf4j-version></properties><dependencies>
<!-- flink 依賴--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency>
<!-- flink 客戶端,主要做一些管理相關的工作,如果不需要,就不需要導入此依賴--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency><!-- 日志相關依賴--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies>
2、編寫java代碼
package com.demo;/*** @author yexd*/import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @title: 無界流處理* @Author yexd* @Date: 2022/8/7 20:10* @Version 1.0*/
public class UnboundedStreamWord {static String ip = "192.168.31.250";static int port = 9879;/*** 先將文件中的每一行進行分詞,然后統計每個單詞出現的次數* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 讀取網絡流,在linux系統輸入命令 : nc -lk 8888 后,就可以進行通訊了,-lk表示保持當前的連接并持續監聽8888端口DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip,port);// 將每行數據根據空格切割后進行分詞,轉換成二元組, FlatMapOperator<輸入的數據類型, 輸出的數據類型>SingleOutputStreamOperator<Tuple2<String, Long>> operator = stringDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {// 將每行進行切割String[] words = line.split(" ");for (String word : words) {// 將每個單詞轉換成二元組進行輸出,其中第一個 word 表示單詞本身, 1L表示每個單詞出現的次數,后面會用這個次數來進行統計單詞出現的總數out.collect(Tuple2.of(word, 1L));}});// 返回分詞后的結果,FlatMapOperator<輸入的數據類型, 輸出的數據類型>SingleOutputStreamOperator<Tuple2<String, Long>> returns = operator.returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照分詞進行分組,keyBy 參數中的 f0 表示根據第幾個字段進行分組(從0開始), 很明顯,Tuple2的第一個字段是String類型,也就是剛剛分好詞后的單詞KeyedStream<Tuple2<String, Long>, Object> tuple2UnsortedGrouping = returns.keyBy(data -> data.f0);// 分組內進行聚合統計,sum 中的參數1 表示根據第幾個屬性進行統計,Tuple2<String, Long> 很明顯第二個屬性是Long,在上面我們將這個屬性都置為1了,所以會進行統計SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);// 打印sum.print();// 啟動執行executionEnvironment.execute();/**打印結果:4> (123,1)5> (hello,1)15> (456,1)5> (hello,2)4> (123,2)5> (hello,3)說明: 大于號前面的數字表示 線程的編號,表示使用不同的線程進行處理,也就是并行流*/}
}
3、打包,通過以下命令將項目打成 jar 包
maven clean package
3、添加作業
在頁面中選擇 Submit New Job -> Add New ,
選擇剛剛打好的jar包
上傳后點擊jar的名稱,有些信息需要填寫一下
說明:
- Entry Class : jar包中 main 方法所在類的全類名
- Parallelism : 并行度,就是用多線程去執行作業,調成多少就用多少個線程執行作業
- Program Arguments : 傳入main 方法的參數,多個參數用空格隔開
- Savepoint Path :保存點路徑,比如你作業執行到一半,但是flink服務器需要重啟,就會先暫停作業,然后將執行到一半的作業保存起來,待重啟后繼續執行,這里配置就是保存的路徑;如果不需要保存,為空就行
4、提交之前的改動
因為在java代碼里面用的無界流處理,也就是說,數據是通過 socket 網絡傳輸的,如果不先啟動監聽的話,現在盲目提交就會導致報錯,而我的代碼里監聽了 192.168.31.250 的 9879端口, 所以需要在 192.168.31.250 的服務器上輸入以下命令來監聽 9879 的端口
# -lk表示保持當前的連接并持續監聽9879端口
nc -lk 9879
5、提交
以下是我的配置,然后點擊 Submit 就可以提交了
提交后 一次點擊左邊的菜單欄 Jobs -> Running Jobs ,就可以可以看到剛剛提交的任務了,點進去看看
說明:
- 綠色的RUNNING 表示正在運行中,如果是紅色的字體,就表示有錯誤
- RUNNING旁邊綠色的 2 表示并行度,表示有2個線程執行這個作業
- 底部表格展示的是運行的時長、數據流大小、任務數量等信息
- Cancel Job : 可通過此按鈕來停止作業
6、往flink發送消息
剛剛啟動了 linux 監聽了 9879 端口,發送了2條信息
然后依次點擊 TaskManager -> 任務id
最后點擊 Stout 就可以看到輸入的內容了