Flink 本地單機/Standalone集群/YARN模式集群搭建

準備工作

本文簡述FlinkLinux中安裝步驟,和示例程序的運行。需要安裝JDK1.8及以上版本。

下載地址:下載Flink的二進制包

在這里插入圖片描述
點進去后,選擇如下鏈接:
在這里插入圖片描述
解壓flink-1.10.1-bin-scala_2.12.tgz,我這里解壓到soft目錄

[root@hadoop1 softpackage]# tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C ../soft/

單節點安裝

解壓后進入Flinkbin目錄執行如下腳本即可

 [root@hadoop1 bin]# ./start-cluster.sh Starting cluster.Starting standalonesession daemon on host hadoop1.Starting taskexecutor daemon on host hadoop1.

進入Flink頁面看看,如果沒有修改配置中的端口,默認是8081
在這里插入圖片描述## 集群安裝

集群安裝分為以下幾步:(注意:hadoopx都是我配置了/etc/hosts域名的)bin
【1】將hadoop1中解壓的Flink分發到其他機器上,同時我也配置了免密登錄SSH(也可以手動復制low)。

[root@hadoop1 soft]# xsync flink-1.10.1

執行完后,我們就可以在hadoop2hadoop3中看到flink
在這里插入圖片描述
【2】選擇hadoop1作為master節點,然后修改所有機器conf/flink-conf.yaml(修改hadoop1分發即可)jobmanager.rpc.address密鑰以指向您的主節點。您還應該通過設置jobmanager.heap.size和taskmanager.memory.process.size鍵來定義允許Flink在每個節點上分配的最大主內存量。這些值以MB為單位。如果某些工作節點有更多的主內存要分配給Flink系統,則可以通過在這些特定節點上設置 taskmanager.memory.process.size或taskmanager.memory.flink.sizeconf / flink-conf.yaml中覆蓋默認值。

jobmanager.rpc.address = master主機名

【3】修改masterconf/slaves提供集群中所有節點的列表,這些列表將用作工作節點。我的是hadoop2hadoop3。類似于HDFS配置,編輯文件conf / slaves并輸入每個輔助節點的IP /主機名。每個工作節點稍后都將運行TaskManager

hadoop2
hadoop3

以上示例說明了具有三個節點(主機名hadoop1作為masterhadoop2hadoop3作為worker)的設置,并顯示了配置文件的內容。Flink目錄必須在同一路徑下的每個工作線程上都可用。您可以使用共享的NFS(網絡文件系統)目錄,也可以將整個Flink目錄復制到每個工作節點。特別是:
1、每個JobManager的可用內存量jobmanager.heap.size
2、每個TaskManager的可用內存量(taskmanager.memory.process.size并查看內存設置指南);
3、每臺計算機可用的CPU數(taskmanager.numberOfTaskSlots);
4、集群中的CPU總數(parallelism.default);
5、臨時目錄(io.tmp.dirs);
【4】在master上啟動集群(第一行)以及執行結果。下面的腳本在本地節點上啟動JobManager,并通過SSH連接到slaves文件中列出的所有輔助節點,以在每個節點上啟動TaskManager。現在,您的 Flink系統已啟動并正在運行。現在,在本地節點上運行的JobManager將在配置的RPC端口上接受作業。要停止Flink,還有一個stop-cluster.sh腳本。

 [root@hadoop1 flink-1.10.1]# bin/start-cluster.sh Starting cluster.Starting standalonesession daemon on host hadoop1.Starting taskexecutor daemon on host hadoop2.Starting taskexecutor daemon on host hadoop3.

【5】Flink界面展示 :進入8081端口,例如:http://hadoop1:8081/ 或者通過jps命令查看服務也可行。
在這里插入圖片描述Standalone集群架構展示:client客戶端提交任務給JobManagerJobManager負責Flink集群計算資源管理,并分發任務給TaskManager執行,TaskManager定期向JobManager匯報狀態。
在這里插入圖片描述

運行 flink示例程序

批處理示例:提交Flink的批處理examples程序:也可以在頁面中進行提交,但是作為一名NB的程序員就使用命令

 [root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar

執行上面的命令后,就會顯示如下信息,這是Flink提供的examples下的批處理例子程序,統計單詞個數。

[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 99f4c579947a66884ec269ddf5f5b0ed
Program execution finished
Job with JobID 99f4c579947a66884ec269ddf5f5b0ed has finished.
Job Runtime: 795 ms
Accumulator Results:
- b70332353f355cf0464b0eba21f61075 (java.util.ArrayList) [170 elements](a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
......

得到結果,這里統計的是默認的數據集,可以通過--input --output指定輸入輸出。我們可以在頁面中查看運行的情況:
在這里插入圖片描述流處理示例:啟動nc服務器:

[root@hadoop1 flink-1.10.1]# nc -lk 9000

提交Flink的批處理examples程序:

[root@hadoop1 flink-1.10.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname hadoop1  --port 9000

這是Flink提供的examples下的流處理例子程序,接收socket數據傳入,統計單詞個數。在nc端隨意寫入單詞

 [root@hadoop1 flink-1.10.1]# nc -lk 9000gs

進入slave節點(hadoop2hadoop3),進入Flink安裝目錄輸入如下命令,查看實時數據變化

[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out
s : 1: 2
w : 1
d : 1
g : 1
d : 1

停止Flink

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh

Flinkweb中查看運行的job
在這里插入圖片描述

將 JobManager / TaskManager 實例添加到集群(擴展)

您可以使用bin/jobmanager.shbin/taskmanager.sh腳本將JobManagerTaskManager實例添加到正在運行的集群中。添加JobManager(確保在要啟動/停止相應實例的主機上調用這些腳本)

[root@hadoop1 flink-1.10.1]# bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

添加任務管理器

[root@hadoop1 flink-1.10.1]# bin/taskmanager.sh start|start-foreground|stop|stop-all

YARN模式

在企業中,經常需要將Flink集群部署到YARN,因為可以使用YARN來管理所有計算資源。而且Spark程序也可以部署到YARN上。CliFrontend是所有job的入口類,通過解析傳遞的參數(jar包,mainClass等),讀取flink的環境,配置信息等,封裝成PackagedProgram,最終通過ClusterClient提交給Flink集群。Flink運行在YARN上,提供了兩種方式:
第一種使用yarn-session模式來快速提交作業到YARN集群。如下,在Yarn中初始化一個flink集群,開辟指定的資源,以后提交任務都向這里提交,這個flink集群會常駐在Yarn集群中,除非手動停止。共享DispatcherResourceManager,共享資源。有大量的小作業,適合使用這種方式;
在這里插入圖片描述
YarnSessionClusterEntrypointFlinkYarn上的線程。ApplicationMasterJobManagerYarnTaskExecutorRunner負責接收subTask并運行,是TaskManager
【1】修改Hadoopetc/hadoop/yarn-site.xml,添加該配置表示內存超過分配值,是否將任務殺掉。默認為true。運行Flink程序,很容易超過分配的內存。

<property> <name>yarn.nodemanager.vmem-check-enabled</name>    <value>false</value> 
</property> 

【2】 添加環境變量

//查看是否配置HADOOP_CONF_DIR,我這里沒有配置輸出為空
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR//在系統變量中添加 HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# vim /etc/profile
//添加如下內容,wq保存退出
export HADOOP_CONF_DIR=$HADOOP_HOME/conf/
//刷新 /etc/profile
[root@hadoop1 hadoop-2.7.2]# source /etc/profile//重新查看是否配置HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR
/opt/module/hadoop-2.7.2/conf/

【3】啟動HDFSYARN集群。通過jps查看啟動狀況。關閉flink的其他集群。

[root@hadoop1 hadoop-2.7.2]# sbin/start-all.sh
[root@hadoop2 hadoop-2.7.2]# jps
10642 NodeManager
11093 Jps
10838 ResourceManager
10535 DataNode
10168 TaskManagerRunner

【4】將官方指定Pre-bundled Hadoop 2.7.5包放到flinklib目錄下。使用yarn-session模式提交作業
在這里插入圖片描述
使用Flink中的yarn-sessionyarn客戶端),會啟動兩個必要服務JobManagerTaskManagers
客戶端通過yarn-session提交作業;
yarn-session會一直啟動,不停地接收客戶端提交的作用。

-n 表示申請2個容器
-s 表示每個容器啟動多少個slot
-tm 表示每個TaskManager申請800M內存
-nm yarn 的 appName,
-d detached表示以后臺程序方式運行

如下表示啟動一個yarn session集群,每個JM1GTM的內存是1G

[root@hadoop1 flink-1.10.1]# bin/yarn-session.sh -n 2 -jm 1024m -tm 1024m -d

客戶端默認是attach模式,不會退出 。可以ctrl+c退出,然后再通過如下命令連上來。或者啟動的時候用-d則為detached模式

./bin/yarn-session.sh -id application_1594027553009_0001(這個id來自下面hadoop集群)

在這里插入圖片描述Yarn上顯示為Flink session cluster,一致處于運行狀態。
在這里插入圖片描述點擊ApplicationMaster就會進入Flink集群
在這里插入圖片描述啟動命令行中也會顯示如下的JobManager啟動的Web界面

JobManager Web Interface: http://hadoop1:34431

在這里插入圖片描述

然后我們可以通過jps來看下當前的進程,其中YarnSessionClusterEntrypoint就是我們Yarn Session的分布式集群。

[root@hadoop1 flink-1.10.1]# jps
69923 NodeManager
81267 Jps
69394 NameNode
69531 DataNode
80571 FlinkYarnSessionCli
80765 YarnSessionClusterEntrypoint

/tmp下生成了一個文件

Flink應用部署到Flink On Yarn 之 session方式中。

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/WordCount.jar 

在這里插入圖片描述
查看運行結果:
在這里插入圖片描述
Flink On Yarnsession部署方式集群停止:關閉Yarn就會關閉Flink集群。。。

第二種模式:使用Per-JOBYarn分離模式(與當前客戶端無關,當客戶端提交完任務就結束,不用等到Flink應用執行完畢)提交作業:每次提交都會創建一個新的flink集群,任務之間相互獨立,互不影響,方便管理。任務執行完成之后創建的集群也會消失。 直接提交任務給YARN,獨享DispatcherResourceManager。按需要申請資源。適合執行時間較長的大作業。
在這里插入圖片描述
AM啟動類是YarnJobClusterEntrypointYarnTaskExecutorRunner負責接收subTask,就是TaskManager。需要打開hadoopyarn分布式集群。不需要啟動flink分布式集群,它會自動啟動flink分布式集群。

[root@hadoop1 flink-1.10.1]# bin/flink run -m yarn-cluster -d ./examples/streaming/WordCount.jar
2020-07-13 03:21:50,479 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-07-13 03:21:50,479 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2020-07-13 03:21:50,707 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at hadoop2/192.168.52.129:8032
2020-07-13 03:21:50,791 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-07-13 03:21:50,928 WARN  org.apache.flink.yarn.YarnClusterDescriptor                   - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-07-13 03:21:51,001 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2020-07-13 03:21:53,906 INFO  org.apache.flink.yarn.YarnClusterDescriptor

-ynyarncontainer表示TaskManager的個數;
-yquyarnqueue指定yarn的隊列;
-ysyarnslots每一個TaskManager對應的slot個數;

上傳成功之后,我們可以在Hadoop的圖形化界面:http://hadoop2:8088/cluster/apps 中看到當前任務的信息;
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/211543.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/211543.shtml
英文地址,請注明出處:http://en.pswp.cn/news/211543.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OrangePi ZERO2 刷機與啟動

鏡像準備 用讀卡器和Win32Diskimager刷寫鏡像到內存卡&#xff0c;鏡像文件見下面百度云鏈接&#xff1a;https://pan.baidu.com/s/14aKTznc4Jvw4SoFF54JUTg 提取碼&#xff1a;1815 刷寫完畢后插回香橙派 串口登錄 用MobaXterm和USB-TTL進行串口登錄&#xff0c;MobaXterm軟…

談一談網絡協議中的應用層

文章目錄 一&#xff0c;什么是HTTPHTTP的優缺點HTTPS 一&#xff0c;什么是HTTP 我們在通過網絡進行傳輸數據時&#xff0c;我們要保證&#xff0c;我們在發送時構造的數據&#xff0c;在接收時也能夠解析出來&#xff0c;這本質上就是一種協議&#xff0c;是一種應用層協議&…

Spring Cloud + Vue前后端分離-第3章 SpringBoot項目技術整合

Spring Cloud Vue前后端分離-第3章 SpringBoot項目技術整合 3-1 集成持久層框架Mybatis ORM:對象關系映射&#xff0c;Hibernate是全自動ORM&#xff0c;Mybatis是半自動ORM&#xff0c;Mybatis可以操作的花樣更多&#xff0c;是首選的持久層框架 System模塊集成Mybatis框架…

整數分析 C語言xdoj43

問題描述 給出一個整數n&#xff08;0<n<100000000&#xff09;。求出該整數的位數&#xff0c;以及組成該整數的所有數字中的最大數字和最小數字。 輸入說明 輸入一個整數n&#xff08;0<n<100000000&#xff09; 輸出說明 在一行上依次輸出整數n的位…

Linux內核上游提交完整流程及示例

參考博客文章&#xff1a; 向linux內核提交代碼 - 知乎 一、下載Linux內核源碼 通過git下載Linux內核源碼&#xff0c;具體命令如下&#xff1a; git clone git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git 實際命令及結果如下&#xff1a; penghaoDin…

IBM Qiskit量子機器學習速成(六)

量子卷積神經網絡 卷積和池化&#xff1a;卷積神經網絡的必備成分 卷積神經網絡被廣泛應用于圖像和音頻的識別當中&#xff0c;關鍵在于“卷積”操作賦予神經網絡統籌學習數據的能力。 執行卷積操作需要輸入數據與卷積核&#xff0c;卷積核首先與輸入數據左上角對齊&#xf…

【數據庫】簡單連接嵌套查詢

目錄 &#x1f387;簡單查詢 &#x1f387;連接查詢 &#x1f387;嵌套查詢 分析&思考 &#x1f387;簡單查詢 --練習簡單查詢 --select * from classes --select * from student --select * from scores --1.按Schedule表的結構要求用SQL語言創建Schedule表 --字段名…

深度學習之全面了解預訓練模型

在本專欄中&#xff0c;我們將討論預訓練模型。有很多模型可供選擇&#xff0c;因此也有很多考慮事項。 這次的專欄與以往稍有不同。我要回答的問題全部源于 MathWorks 社區論壇&#xff08;ww2.mathworks.cn/matlabcentral/&#xff09;的問題。我會首先總結 MATLAB Answers …

關于Linux Kernel Panic導致重啟的簡單分析步驟

Linux系統Kernel Panic的檢索 如何判斷是否發生Kernel Panic&#xff0c;以下以 CentOS 7.9系統為例 #查看 /var/crash 路徑下是否有生成文件夾&#xff0c;Kernel Panic后會生成文件夾在此路徑表示產生了Kernel Panic ls /var/crash #/var/crash/127.0.0.1-2023-12-04-08\:5…

HarmonyOS應用開發者基礎認證考試(穩過)

判斷題 ??????? 1. Web組件對于所有的網頁都可以使用zoom(factor: number)方法進行縮放。錯誤(False) 2. 每一個自定義組件都有自己的生命周期正確(True) 3. 每調用一次router.pushUrl()方法&#xff0c;默認情況下&#xff0c;頁面棧數量會加1&#xff0c;頁面棧支持的…

linux redis-cluster ipv6方式

配置文件&#xff0c;具體字段的含義&#xff0c;可以參考其他文檔。 1.單個文件的配置信息 redis_36380.conf requirepass Paas_2024port 36380tcp-backlog 511timeout 0tcp-keepalive 300daemonize yessupervised nopidfile /data/paas/apps/aicache-redis/redis_36380.p…

【STM32】TIM定時器編碼器

1 編碼器接口簡介 Encoder Interface 編碼器接口 編碼器接口可接收增量&#xff08;正交&#xff09;編碼器的信號&#xff0c;根據編碼器旋轉產生的正交信號脈沖&#xff0c;自動控制CNT自增或自減&#xff0c;從而指示編碼器的位置、旋轉方向和旋轉速度 接收正交信號&#…

黑豹程序員-EasyExcel實現導出

需求 將業務數據導出到excel中&#xff0c;老牌的可以選擇POI&#xff0c;也有個新的選擇EasyExcel。 有個小坑&#xff0c;客戶要求樣式比較美觀&#xff0c;數字列要求千位符&#xff0c;保留2位小數。 可以用代碼實現但非常繁瑣&#xff0c;用模板就特別方便&#xff0c;模…

C++優秀串口庫

serial::Serial Class Reference #include <serial.h> Data Structures class ScopedReadLockclass ScopedWriteLock Public Member Functions公有成員方法&#xff08;編程用的都在這里了&#xff0c;那些私有的如果不開源一般跟我們沒有關系了&#xff09; Serial …

用chatGPT開發項目:我想的無人的智慧樹網站 流量之神 利用人工智能的算法將人吸引住 GPT4是不是越來越難用了,問一下就要證明一下自己是不是人類

廣度發散&#xff1a;讓AI給出時代或今日或你關注的熱點事件 比如采集新聞頭條&#xff0c;根據內容或標題&#xff0c;以不同的角度&#xff0c;或各種人群的角色&#xff0c;生成50篇簡短的文章。一下就能占傳統的搜索引擎。這是AI最擅長的【千人千面&#xff0c;海量生成】…

【中國海洋大學】操作系統隨堂測試6整理

1. IO系統的層次機構包括&#xff1a;IO硬件、中斷處理程序、&#xff08;&#xff09;程序、設備獨立性軟件、用戶層軟件。 答&#xff1a;設備驅動 2. IO設備和控制器之間的接口包括三種類型的信號&#xff1a;數據信號線、控制信號線和&#xff08;&#xff09;&#xff1…

qt反射基礎

最近研究了一下QT的反射機制&#xff0c; Qt的元對象系統除了提供信號/槽機制的特性之外&#xff0c;它還提供了以下特性: QObject::metaObject() 返回關聯的元對象 QMetaObject::className() 在運行時狀態下返回類名 QObject::inherits() 判斷類的繼承關系 QObject::tr()&…

鴻蒙開發之封裝優化

面向對象開發離不開封裝&#xff0c;將重復的可以復用的代碼封裝起來&#xff0c;提高開發效率。 基于之前的List&#xff0c;對代碼進行封裝。 1、抽取component 將List的頭部抽離出來作為一個新的component。可以創建一個新的ArkTS文件&#xff0c;寫我們的頭部代碼 為了…

代理模式:解析對象間的間接訪問與控制

目錄 引言 理解代理模式 不同類型的代理模式 代理模式的應用場景 代理模式的優缺點 優點 缺點 實際案例&#xff1a;Java中的代理模式應用 結語 引言 代理模式是軟件設計模式中的一種結構型模式&#xff0c;旨在為其他對象提供一種代理以控制對這個對象的訪問。它允許你…

消息隊列使用指南

介紹 消息隊列是一種常用的應用程序間通信方法&#xff0c;可以用來在不同應用程序或組件之間傳遞數據或消息。消息隊列就像一個緩沖區&#xff0c;接收來自發送方的消息&#xff0c;并存儲在隊列中&#xff0c;等待接收方從隊列中取出并處理。 在分布式系統中&#xff0c;消…