linux 部署 flink 1.15.1 并提交作業

下載 1.15.1

https://flink.apache.org/downloads.html#apache-flink-1151

部署模式分類

  • 會話模式
  • 應用模式
  • 單作業模式
1、會話模式

先啟動一個集群,保持一個會話,然后通過客戶端提交作業,所有作業都在一個會話執行;

會話模式適合規模小、執行時間短的大量作業;

2、應用模式

前兩種模式應用代碼都是在客戶端運行,然后由客戶端提交給jobmanager的,這種方式的弊端是:需要占用大量網絡帶寬,去下載依賴和把二進制數據發送給jobmanager,將會加重客戶端資源消耗。
所以Application Mode的解決辦法是:不需要客戶端,直接把應用提交到jobmanager上運行,這意味著要為每個提交的應用單獨啟動一個jobmanager,也就是創建一個集群,

jobmanager執行完自己的應用將會關閉

應用模式與單作業模式,都是提交作業之后才創建集群;單作業模式是通過客戶端來提交的,客戶端解析出的每一個作業對應一個集群;而應用模式下,是直接由 JobManager 執行應用程序的,即使應用包含了多個作業,也只創建一個集群。此模式用的比較少,

3、單作業模式

為每個作業啟動一個集群,只要客戶端提交了一個作業,就為這個作業啟動一個單獨的集群,這個集群只為這個作業提供服務;其

一、獨立會話模式(Standalone)-部署

flink只支持linux部署

1、解壓

tar -zvxf flink-1.15.1-bin-scala_2.12.tgz

2、修改配置文件

vim conf/flink-conf.yaml 
# 修改以下內容
jobmanager.rpc.address: 192.168.31.250 # 選擇當前主機的ip地址,如果是云服務器,使用外網ip
# JobManager將綁定到的主機接口,默認值為 localhost 禁止外部訪問,設為0.0.0.0表示允許外部訪問,設置錯誤的話 Available Task Slots 會顯示0
jobmanager.bind-host: 0.0.0.0
# 任務插槽數量,相當于使用多少個線程來執行流
taskmanager.numberOfTaskSlots: 2 
parallelism.default: 1web.submit.enable: true
# 指定TaskManager主機的地址,單機部署的話,用localhost即可
taskmanager.host: 192.168.31.250
# web前端展示的端口,自己設置
rest.port: 8081   
# 客戶端應該用來連接到服務器的地址。注意:僅當高可用性配置為 NONE 時才考慮此選項
rest.address: 192.168.31.250
# 允許外部ip訪問的地址,默認情況下是localhost,只能內部訪問,改為0.0.0.0允許所有外部ip訪問
rest.bind-address: 0.0.0.0

3、修改master文件,

vim conf/masters # 填寫主節點的ip地址,如果是云服務器,使用外網ip
192.168.31.250:8081

4、修改 workers 文件

vim  conf/workers# 添加 taskManager 節點的ip地址列表,如果是單節點,只填寫主節點ip地址即可
192.168.31.250
192.168.31.251
192.168.31.252

5、、啟動

bin/start-cluster.sh

啟動成功后,命令行會顯示如下信息

[root@dev-server bin]# ./start-cluster.sh 
Starting cluster.  # 啟動集群
Starting standalonesession daemon on host dev-server.  # 啟動會話模式的 作業調度器 jobmanager
Starting taskexecutor daemon on host dev-server. # 啟動任務管理器

通過jps命令可以看到已經啟動的flink

[root@dev-server bin]# jps
3010991 TaskManagerRunner   # 任務調度器 taskManager
3010438 StandaloneSessionClusterEntrypoint   # 會話模式的節點
3023395 Jps

說明:

  1. JobManager 的啟動代碼:standalonesession,實現類是:StandaloneSessionClusterEntrypoint
  2. TaskManager 的啟動代碼:taskexecutor,實現類是:TaskManagerRunner

6、、訪問ui界面

http://192.168.31.250:8081

7、、停止flink

bin/stop-cluster.sh

二、提交作業

1、編寫作業代碼
新建maven項目,pom.xml 加入flink的依賴

<properties><java.version>1.8</java.version><scala-binary-version>2.12</scala-binary-version><flink-version>1.13.0</flink-version><slf4j-version>1.7.30</slf4j-version></properties><dependencies>
<!--        flink 依賴--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency>
<!--        flink 客戶端,主要做一些管理相關的工作,如果不需要,就不需要導入此依賴--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency><!--        日志相關依賴--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies>

2、編寫java代碼

package com.demo;/*** @author yexd*/import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @title: 無界流處理* @Author yexd* @Date: 2022/8/7 20:10* @Version 1.0*/
public class UnboundedStreamWord {static String ip = "192.168.31.250";static int port = 9879;/*** 先將文件中的每一行進行分詞,然后統計每個單詞出現的次數* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 讀取網絡流,在linux系統輸入命令 : nc -lk  8888  后,就可以進行通訊了,-lk表示保持當前的連接并持續監聽8888端口DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip,port);// 將每行數據根據空格切割后進行分詞,轉換成二元組, FlatMapOperator<輸入的數據類型, 輸出的數據類型>SingleOutputStreamOperator<Tuple2<String, Long>> operator = stringDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {// 將每行進行切割String[] words = line.split(" ");for (String word : words) {// 將每個單詞轉換成二元組進行輸出,其中第一個 word 表示單詞本身, 1L表示每個單詞出現的次數,后面會用這個次數來進行統計單詞出現的總數out.collect(Tuple2.of(word, 1L));}});// 返回分詞后的結果,FlatMapOperator<輸入的數據類型, 輸出的數據類型>SingleOutputStreamOperator<Tuple2<String, Long>> returns = operator.returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照分詞進行分組,keyBy 參數中的 f0 表示根據第幾個字段進行分組(從0開始), 很明顯,Tuple2的第一個字段是String類型,也就是剛剛分好詞后的單詞KeyedStream<Tuple2<String, Long>, Object> tuple2UnsortedGrouping = returns.keyBy(data -> data.f0);// 分組內進行聚合統計,sum 中的參數1 表示根據第幾個屬性進行統計,Tuple2<String, Long> 很明顯第二個屬性是Long,在上面我們將這個屬性都置為1了,所以會進行統計SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);// 打印sum.print();// 啟動執行executionEnvironment.execute();/**打印結果:4> (123,1)5> (hello,1)15> (456,1)5> (hello,2)4> (123,2)5> (hello,3)說明: 大于號前面的數字表示 線程的編號,表示使用不同的線程進行處理,也就是并行流*/}
}

3、打包,通過以下命令將項目打成 jar 包

maven clean package

3、添加作業
在頁面中選擇 Submit New Job -> Add New ,

選擇剛剛打好的jar包

上傳后點擊jar的名稱,有些信息需要填寫一下

說明:

  • Entry Class : jar包中 main 方法所在類的全類名
  • Parallelism : 并行度,就是用多線程去執行作業,調成多少就用多少個線程執行作業
  • Program Arguments : 傳入main 方法的參數,多個參數用空格隔開
  • Savepoint Path :保存點路徑,比如你作業執行到一半,但是flink服務器需要重啟,就會先暫停作業,然后將執行到一半的作業保存起來,待重啟后繼續執行,這里配置就是保存的路徑;如果不需要保存,為空就行

4、提交之前的改動
因為在java代碼里面用的無界流處理,也就是說,數據是通過 socket 網絡傳輸的,如果不先啟動監聽的話,現在盲目提交就會導致報錯,而我的代碼里監聽了 192.168.31.250 的 9879端口, 所以需要在 192.168.31.250 的服務器上輸入以下命令來監聽 9879 的端口

# -lk表示保持當前的連接并持續監聽9879端口
nc -lk  9879

5、提交

以下是我的配置,然后點擊 Submit 就可以提交了

提交后 一次點擊左邊的菜單欄 Jobs -> Running Jobs ,就可以可以看到剛剛提交的任務了,點進去看看

說明:

  • 綠色的RUNNING 表示正在運行中,如果是紅色的字體,就表示有錯誤
  • RUNNING旁邊綠色的 2 表示并行度,表示有2個線程執行這個作業
  • 底部表格展示的是運行的時長、數據流大小、任務數量等信息
  • Cancel Job : 可通過此按鈕來停止作業

6、往flink發送消息
剛剛啟動了 linux 監聽了 9879 端口,發送了2條信息

然后依次點擊 TaskManager -> 任務id

最后點擊 Stout 就可以看到輸入的內容了

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/91011.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/91011.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/91011.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis數據量過大的隱患:查詢會變慢嗎?如何避免?

一、Redis數據過多引發的五大隱患&#xff08;附系統交互圖&#xff09; #mermaid-svg-X83bpHUu830QXKUt {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-X83bpHUu830QXKUt .error-icon{fill:#552222;}#mermaid-svg-…

網絡與信息安全有哪些崗位:(3)安全運維工程師

安全運維工程師是企業安全防線的 “日常守護者”&#xff0c;既要確保安全設備與系統的穩定運行&#xff0c;又要實時監控潛在威脅&#xff0c;快速響應并處置安全事件&#xff0c;是連接安全技術與業務運營的關鍵角色。其核心價值在于通過常態化運維&#xff0c;將安全風險控制…

魚皮項目簡易版 RPC 框架開發(三)

本文為筆者閱讀魚皮的項目 《簡易版 RPC 框架開發》的筆記&#xff0c;如果有時間可以直接去看原文&#xff0c; 1. 簡易版 RPC 框架開發 前面的內容可以筆者的前面兩個篇筆記 魚皮項目簡易版 RPC 框架開發&#xff08;一&#xff09; 魚皮項目簡易版 RPC 框架開發&#xff08;…

嵌入式Linux:注冊線程清理處理函數

在 Linux 多線程編程中&#xff0c;線程終止時可以執行特定的清理操作&#xff0c;通過注冊線程清理函數&#xff08;thread cleanup handler&#xff09;來實現。這類似于使用 atexit() 注冊進程終止處理函數。線程清理函數用于在線程退出時執行一些資源釋放或清理工作&#x…

【Git】Linux-ubuntu 22.04 初步認識 -> 安裝 -> 基礎操作

文章目錄Git 初識Git 安裝Linux-centosLinux-ubuntuWindowsGit 基本操作配置 Git認識工作區、暫存區、版本庫添加文件 -- 場景一查看 .git 文件添加文件 -- 場景二修改文件版本回退撤銷修改情況一&#xff1a;對于工作區的代碼&#xff0c;還沒有 add情況二&#xff1a;已經 ad…

輕量級音樂元數據編輯器Metadata Remote

簡介 什么是 Metadata Remote (mdrm) &#xff1f; Metadata Remote 是一個基于 Web 的音頻元數據編輯工具&#xff0c;旨在簡化在無頭服務器&#xff08;即沒有圖形用戶界面的服務器&#xff09;上編輯音頻文件的元數據。用戶只需使用 Docker 和瀏覽器&#xff0c;無需復雜的…

免費使用|共享服務器上線RTX3080(20GB顯存)

共享服務器也上架GPU啦 生物信息學中有很多用到GPU的場景&#xff0c;例如我們分享過的&#xff1a;利用GPU加速TensorFlow、部署本地DeepSeek&#xff0c;空間轉錄組學習手冊合輯加速。因此多種GPU供大家選擇&#xff1a;RTX5090、4080S、5070顯卡上機。為了讓此前的CPU服務器…

搭建DM數據守護集群

1環境與規劃準備3個kylin 10操作系統的虛擬機&#xff0c;規劃IP、端口、安裝目錄等。說明搭建REALTIME歸檔模式、事務一致性的數據守護名稱項初始主庫機器dm1初始備庫機器dm2監視器機器dmmon外部業務IP192.168.23.129192.168.23.130192.168.23.131內部心跳IP192.168.23.129192…

AUTOSAR進階圖解==>AUTOSAR_SRS_OCUDriver

AUTOSAR OCU驅動程序詳解 AUTOSAR標準輸出比較單元驅動程序架構與實現分析目錄 1. 概述 1.1 OCU驅動程序簡介1.2 功能概述 2. OCU驅動程序架構 2.1 架構圖2.2 層次結構 3. OCU驅動程序組件設計 3.1 組件圖3.2 接口定義 4. OCU驅動程序狀態管理 4.1 狀態圖4.2 狀態轉換 5. OCU驅…

InfluxDB 與 HTTP 協議交互進階(一)

引言 在當今數字化時代&#xff0c;數據處理的高效性和準確性成為了眾多領域關注的焦點。InfluxDB 作為一款開源的時序數據庫&#xff0c;憑借其高性能、易擴展等特性&#xff0c;在時間序列數據處理中占據了重要地位。而 HTTP 協議作為互聯網應用層的核心協議之一&#xff0c…

NAS遠程訪問新解法:OMV與cpolar的技術協同價值

文章目錄前言1. OMV安裝Cpolar2. 配置FTP公網地址3. OMV FTP 配置4. OMV FTP遠程連接前言 當家庭存儲需求突破本地邊界時&#xff0c;傳統NAS方案往往陷入"連接困境"&#xff1a;復雜的端口轉發配置、高昂的公網IP成本、以及始終存在的安全顧慮…開源解決方案OMV雖然…

vue 渲染 | 不同類型的元素渲染的方式(vue組件/htmlelement/純 html)

省流總結&#xff1a;&#xff08;具體實現見下方&#xff09; vue 組件 ——》<component :is組件名> htmlelement 元素 ——》 ref 、★ v-for ref 或是 ★ vue 的 nextTick 純 html 結構——》v-html 另外&#xff0c;當數據異步加載時&#xff0c;vue3中如何渲…

Charles中文版深度解析,輕松調試API與優化網絡請求

在現代軟件開發過程中&#xff0c;調試API、捕獲HTTP/HTTPS流量以及優化網絡性能是開發者不可避免的挑戰。特別是在處理復雜的網絡請求和驗證API接口的數據傳輸準確性時&#xff0c;開發者需要一款強大且易于使用的工具。Charles抓包工具憑借其功能強大、界面簡潔、易于操作的特…

【CF】Codeforces Round 1039 (Div. 2) E1 (二分答案求中位數)

E1. Submedians (Easy Version)題目&#xff1a;思路&#xff1a;經典不過加了點東西對于求中位數&#xff0c;我們必然要想到二分答案&#xff0c;具體的&#xff0c;對于所有大于等于 x 的數我們令其奉獻為 1&#xff0c;小于的為 -1&#xff0c;如果存在某段區間的奉獻和大于…

ESP32-S3學習筆記<8>:LEDC的應用

ESP32-S3學習筆記&#xff1c;8&#xff1e;&#xff1a;LEDC的應用1. 頭文件包含2. LEDC的配置2.1 配置定時器2.1.1 speed_mode/設置速度模式2.1.2 duty_resolution/設置占空比分辨率2.1.3 timer_num/選擇定時器2.1.4 freq_hz/設定PWM頻率2.1.5 clk_cfg/選擇LEDC的外設時鐘源2…

網絡安全第14集

前言&#xff1a;小迪安全14集&#xff0c;這集重點內容&#xff1a;0、什么是js滲透測試&#xff1f;在javascript中也存在變量和函數&#xff0c;存在可控變量和函數就有可能存在在漏洞&#xff0c;js開發的web應用和php、java開發的區別是&#xff0c;js能看得到的源代碼&am…

代碼隨想錄算法訓練營第三十三天

LeetCode.62 不同路徑 題目鏈接 不同路徑 題解 class Solution {public int uniquePaths(int m, int n) {// dp表示到達ij有多少條路徑int[][] dp new int[110][110];dp[1][1] 1;for(int i 0;i<m;i){dp[i][0] 1;}for(int j 0;j<n;j){dp[0][j] 1;}for(int i 1;i…

銀行回單OCR識別技術原理

銀行回單OCR&#xff08;光學字符識別&#xff09;技術通過結合圖像處理、模式識別和自然語言處理&#xff08;NLP&#xff09;技術&#xff0c;將紙質或電子版銀行回單中的非結構化文本&#xff08;如賬號、金額、日期等&#xff09;轉化為結構化數據。以下是其核心原理和關鍵…

Day22-二叉樹的迭代遍歷

昨天學習了遞歸遍歷&#xff1a;遞歸就是一次次的把參數壓入棧中&#xff0c;然后返回的時候還是上一次遞歸保存的參數。今天學習迭代遍歷。迭代遍歷就是用棧去模擬保存二叉樹的節點&#xff0c;然后依次去遍歷&#xff0c;只不過要注意棧的后入先出的規則。前序遍歷&#xff1…

知識蒸餾 - 通過引入溫度參數T調整 Softmax 的輸出

知識蒸餾 - 通過引入溫度參數T調整 Softmax 的輸出 flyfish import torch import torch.nn.functional as F import matplotlib.pyplot as plt import numpy as np# 設置中文字體支持 plt.rcParams["font.family"] [AR PL UMing CN] # Linux plt.rcParams[axes.uni…