準備工作
本文簡述Flink
在Linux
中安裝步驟,和示例程序的運行。需要安裝JDK1.8
及以上版本。
下載地址:下載Flink
的二進制包
點進去后,選擇如下鏈接:
解壓flink-1.10.1-bin-scala_2.12.tgz
,我這里解壓到soft
目錄
[root@hadoop1 softpackage]# tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C ../soft/
單節點安裝
解壓后進入Flink
的bin
目錄執行如下腳本即可
[root@hadoop1 bin]# ./start-cluster.sh Starting cluster.Starting standalonesession daemon on host hadoop1.Starting taskexecutor daemon on host hadoop1.
進入Flink
頁面看看,如果沒有修改配置中的端口,默認是8081
## 集群安裝
集群安裝分為以下幾步:(注意:hadoopx
都是我配置了/etc/hosts
域名的)bin
【1】將hadoop1
中解壓的Flink
分發到其他機器上,同時我也配置了免密登錄SSH
(也可以手動復制low
)。
[root@hadoop1 soft]# xsync flink-1.10.1
執行完后,我們就可以在hadoop2
和hadoop3
中看到flink
【2】選擇hadoop1
作為master
節點,然后修改所有機器conf/flink-conf.yaml
(修改hadoop1
分發即可)jobmanager.rpc.address
密鑰以指向您的主節點。您還應該通過設置jobmanager.heap.size和taskmanager.memory.process.size
鍵來定義允許Flink
在每個節點上分配的最大主內存量。這些值以MB
為單位。如果某些工作節點有更多的主內存要分配給Flink
系統,則可以通過在這些特定節點上設置 taskmanager.memory.process.size或taskmanager.memory.flink.size
在conf / flink-conf.yaml
中覆蓋默認值。
jobmanager.rpc.address = master主機名
【3】修改master
的conf/slaves
提供集群中所有節點的列表,這些列表將用作工作節點。我的是hadoop2
和hadoop3
。類似于HDFS
配置,編輯文件conf / slaves
并輸入每個輔助節點的IP
/主機名。每個工作節點稍后都將運行TaskManager
。
hadoop2
hadoop3
以上示例說明了具有三個節點(主機名hadoop1
作為master
,hadoop2
和hadoop3
作為worker
)的設置,并顯示了配置文件的內容。Flink
目錄必須在同一路徑下的每個工作線程上都可用。您可以使用共享的NFS
(網絡文件系統)目錄,也可以將整個Flink
目錄復制到每個工作節點。特別是:
1、每個JobManager
的可用內存量jobmanager.heap.size
;
2、每個TaskManager
的可用內存量(taskmanager.memory.process.size
并查看內存設置指南);
3、每臺計算機可用的CPU
數(taskmanager.numberOfTaskSlots
);
4、集群中的CPU
總數(parallelism.default
);
5、臨時目錄(io.tmp.dirs
);
【4】在master
上啟動集群(第一行)以及執行結果。下面的腳本在本地節點上啟動JobManager
,并通過SSH
連接到slaves
文件中列出的所有輔助節點,以在每個節點上啟動TaskManager
。現在,您的 Flink系統已啟動并正在運行。現在,在本地節點上運行的JobManager
將在配置的RPC
端口上接受作業。要停止Flink
,還有一個stop-cluster.sh
腳本。
[root@hadoop1 flink-1.10.1]# bin/start-cluster.sh Starting cluster.Starting standalonesession daemon on host hadoop1.Starting taskexecutor daemon on host hadoop2.Starting taskexecutor daemon on host hadoop3.
【5】Flink
界面展示 :進入8081
端口,例如:http://hadoop1:8081/
或者通過jps
命令查看服務也可行。
Standalone
集群架構展示:client
客戶端提交任務給JobManager
,JobManager
負責Flink
集群計算資源管理,并分發任務給TaskManager
執行,TaskManager
定期向JobManager
匯報狀態。
運行 flink示例程序
批處理示例:提交Flink
的批處理examples
程序:也可以在頁面中進行提交,但是作為一名NB
的程序員就使用命令
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
執行上面的命令后,就會顯示如下信息,這是Flink
提供的examples
下的批處理例子程序,統計單詞個數。
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 99f4c579947a66884ec269ddf5f5b0ed
Program execution finished
Job with JobID 99f4c579947a66884ec269ddf5f5b0ed has finished.
Job Runtime: 795 ms
Accumulator Results:
- b70332353f355cf0464b0eba21f61075 (java.util.ArrayList) [170 elements](a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
......
得到結果,這里統計的是默認的數據集,可以通過--input --output
指定輸入輸出。我們可以在頁面中查看運行的情況:
流處理示例:啟動
nc
服務器:
[root@hadoop1 flink-1.10.1]# nc -lk 9000
提交Flink
的批處理examples
程序:
[root@hadoop1 flink-1.10.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname hadoop1 --port 9000
這是Flink
提供的examples
下的流處理例子程序,接收socket
數據傳入,統計單詞個數。在nc
端隨意寫入單詞
[root@hadoop1 flink-1.10.1]# nc -lk 9000gs
進入slave
節點(hadoop2
,hadoop3
),進入Flink
安裝目錄輸入如下命令,查看實時數據變化
[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out
s : 1: 2
w : 1
d : 1
g : 1
d : 1
停止Flink
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh
在Flink
的web
中查看運行的job
將 JobManager / TaskManager 實例添加到集群(擴展)
您可以使用bin/jobmanager.sh
和bin/taskmanager.sh
腳本將JobManager
和TaskManager
實例添加到正在運行的集群中。添加JobManager
(確保在要啟動/停止相應實例的主機上調用這些腳本)
[root@hadoop1 flink-1.10.1]# bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加任務管理器
[root@hadoop1 flink-1.10.1]# bin/taskmanager.sh start|start-foreground|stop|stop-all
YARN模式
在企業中,經常需要將Flink
集群部署到YARN
,因為可以使用YARN
來管理所有計算資源。而且Spark
程序也可以部署到YARN
上。CliFrontend
是所有job
的入口類,通過解析傳遞的參數(jar
包,mainClass
等),讀取flink
的環境,配置信息等,封裝成PackagedProgram
,最終通過ClusterClient
提交給Flink
集群。Flink
運行在YARN
上,提供了兩種方式:
第一種使用yarn-session
模式來快速提交作業到YARN
集群。如下,在Yarn
中初始化一個flink
集群,開辟指定的資源,以后提交任務都向這里提交,這個flink
集群會常駐在Yarn
集群中,除非手動停止。共享Dispatcher
與ResourceManager
,共享資源。有大量的小作業,適合使用這種方式;
YarnSessionClusterEntrypoint
是Flink
在Yarn
上的線程。ApplicationMaster
是JobManager
。YarnTaskExecutorRunner
負責接收subTask
并運行,是TaskManager
。
【1】修改Hadoop
的etc/hadoop/yarn-site.xml
,添加該配置表示內存超過分配值,是否將任務殺掉。默認為true
。運行Flink
程序,很容易超過分配的內存。
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value>
</property>
【2】 添加環境變量
//查看是否配置HADOOP_CONF_DIR,我這里沒有配置輸出為空
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR//在系統變量中添加 HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# vim /etc/profile
//添加如下內容,wq保存退出
export HADOOP_CONF_DIR=$HADOOP_HOME/conf/
//刷新 /etc/profile
[root@hadoop1 hadoop-2.7.2]# source /etc/profile//重新查看是否配置HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR
/opt/module/hadoop-2.7.2/conf/
【3】啟動HDFS
、YARN
集群。通過jps
查看啟動狀況。關閉flink
的其他集群。
[root@hadoop1 hadoop-2.7.2]# sbin/start-all.sh
[root@hadoop2 hadoop-2.7.2]# jps
10642 NodeManager
11093 Jps
10838 ResourceManager
10535 DataNode
10168 TaskManagerRunner
【4】將官方指定Pre-bundled Hadoop 2.7.5包放到flink
的lib
目錄下。使用yarn-session
模式提交作業
使用Flink
中的yarn-session
(yarn
客戶端),會啟動兩個必要服務JobManager
和TaskManagers
;
客戶端通過yarn-session
提交作業;
yarn-session
會一直啟動,不停地接收客戶端提交的作用。
-n 表示申請2個容器
-s 表示每個容器啟動多少個slot
-tm 表示每個TaskManager申請800M內存
-nm yarn 的 appName,
-d detached表示以后臺程序方式運行
如下表示啟動一個yarn session
集群,每個JM
為1G
,TM
的內存是1G
。
[root@hadoop1 flink-1.10.1]# bin/yarn-session.sh -n 2 -jm 1024m -tm 1024m -d
客戶端默認是attach
模式,不會退出 。可以ctrl+c
退出,然后再通過如下命令連上來。或者啟動的時候用-d
則為detached
模式
./bin/yarn-session.sh -id application_1594027553009_0001(這個id來自下面hadoop集群)
Yarn
上顯示為Flink session cluster
,一致處于運行狀態。
點擊
ApplicationMaster
就會進入Flink
集群
啟動命令行中也會顯示如下的
JobManager
啟動的Web
界面
JobManager Web Interface: http://hadoop1:34431
然后我們可以通過jps
來看下當前的進程,其中YarnSessionClusterEntrypoint
就是我們Yarn Session
的分布式集群。
[root@hadoop1 flink-1.10.1]# jps
69923 NodeManager
81267 Jps
69394 NameNode
69531 DataNode
80571 FlinkYarnSessionCli
80765 YarnSessionClusterEntrypoint
/tmp
下生成了一個文件
將Flink
應用部署到Flink On Yarn 之 session
方式中。
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/WordCount.jar
查看運行結果:
Flink On Yarn
之session
部署方式集群停止:關閉Yarn
就會關閉Flink
集群。。。
第二種模式:使用Per-JOBYarn
分離模式(與當前客戶端無關,當客戶端提交完任務就結束,不用等到Flink
應用執行完畢)提交作業:每次提交都會創建一個新的flink
集群,任務之間相互獨立,互不影響,方便管理。任務執行完成之后創建的集群也會消失。 直接提交任務給YARN
,獨享Dispatcher
與ResourceManager
。按需要申請資源。適合執行時間較長的大作業。
AM
啟動類是YarnJobClusterEntrypoint
。YarnTaskExecutorRunner
負責接收subTask
,就是TaskManager
。需要打開hadoop
和yarn
分布式集群。不需要啟動flink
分布式集群,它會自動啟動flink
分布式集群。
[root@hadoop1 flink-1.10.1]# bin/flink run -m yarn-cluster -d ./examples/streaming/WordCount.jar
2020-07-13 03:21:50,479 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-07-13 03:21:50,479 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2020-07-13 03:21:50,707 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at hadoop2/192.168.52.129:8032
2020-07-13 03:21:50,791 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-07-13 03:21:50,928 WARN org.apache.flink.yarn.YarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-07-13 03:21:51,001 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2020-07-13 03:21:53,906 INFO org.apache.flink.yarn.YarnClusterDescriptor
-yn
:yarncontainer
表示TaskManager
的個數;
-yqu
:yarnqueue
指定yarn
的隊列;
-ys
:yarnslots
每一個TaskManager
對應的slot
個數;
上傳成功之后,我們可以在Hadoop
的圖形化界面:http://hadoop2:8088/cluster/apps 中看到當前任務的信息;