搭建Flink分布式集群

1. 基礎環境:

1.1 安裝JDK

本次使用?jdk-11.0.26_linux-x64_bin.tar.gz

解壓縮

tar -zxvf jdk-11.0.26_linux-x64_bin.tar.gz -C /usr/local/java/

?配置環境變量:

vi /etc/profileJAVA_HOME=/usr/local/java/jdk-11.0.26
CLASSPATH=.:${JAVA_HOME}/lib:$CLASSPATH
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASS_PATH PATH

讓環境變量生效:

source /etc/profile 

如果沒生效就重啟服務器

1.2 ssh免密碼登錄

集群內節點之間免密登錄

2. 搭建Flink分布式集群

1. 下載

版本:flink-2.0.0-bin-scala_2.12.tgz
地址: https://www.apache.org/dyn/closer.lua/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz

2. 安裝

通過虛擬機設置共享文件夾將需要的安裝包復制到linux虛擬機中 localhost1。虛擬機的共享盤在 /mnt/hgfs/。 將共享盤安裝包復制到 存在目標路徑/opt/software/

解壓縮

cd /opt/software/
tar -zxvf flink-2.0.0-bin-scala_2.12.tgz -C /usr/local/applications/

3,修改FLINK配置

修改 /conf/config.yaml 文件

at localhost1

jobmanager:bind-host: 0.0.0.0rpc:address: localhost1port: 6123taskmanager:bind-host: 0.0.0.0host: localhost1

at localhost2

jobmanager:bind-host: 0.0.0.0rpc:address: localhost1port: 6123taskmanager:bind-host: 0.0.0.0host: localhost2

at localhost3

jobmanager:bind-host: 0.0.0.0rpc:address: localhost1port: 6123taskmanager:bind-host: 0.0.0.0host: localhost3

修改 /conf/masters文件

localhost1:8081

修改 /conf/workers文件

localhost1
localhost2
localhost3

修改 /conf/zoo.cfg 文件 (可以不改)

server.1=localhost1:2888:3888
server.2=localhost2:2888:3888
server.3=localhost3:2888:3888

4. 將Spark軟件分發到集群

先關閉防火墻

systemctl stop firewalldsystemctl disable firewalld

將Flink分發到localhost2 和 localhost3

scp -r flink-2.0.0 root@localhost2:/usr/local/applications/flink-2.0.0
scp -r flink-2.0.0 root@localhost3:/usr/local/applications/flink-2.0.0

5, 啟動集群

[root@localhost1 flink-2.0.0]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost1.
Starting taskexecutor daemon on host localhost1.
Starting taskexecutor daemon on host localhost2.
Starting taskexecutor daemon on host localhost3.

6, 查看WEB頁面

http://localhost1:8081/#/overview

3, Flink 開發

3.1?單詞統計案例

Socket 模擬實時發送單詞,使用 Flink 實時接收數據,并且對 數據進行聚合統計,并且把計算結果打印出來。

?創建一個Java項目 導入Flink依賴

     <properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>2.0.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

創建WordCount 類

package com.neilparker;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment =  StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = executionEnvironment.socketTextStream("localhost1",7777,"\n");SingleOutputStreamOperator<Tuple2<String, Long>> dataStream = source.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>(){@Overridepublic void flatMap (String string, Collector<Tuple2<String, Long>> collector) {String[] splits = string.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}}).keyBy(value -> value.f0).sum(1);dataStream.print();executionEnvironment.execute("wordcount batch process");}}

啟動nc 命令 模擬一個 Socket Server ,

然后運行java 代碼,

然后再nc 命令行發送數據

然后就可以看到nc 命令行如下:

[root@localhost1 ~]# nc -lp 7777
hello neil hello jack
hello mike hello walker
hello sun

Java代碼控制臺看到單詞統計結果:

5> (hello,1)
15> (neil,1)
14> (jack,1)
5> (hello,2)
4> (mike,1)
9> (walker,1)
5> (hello,3)
5> (hello,4)
15> (sun,1)
5> (hello,5)

3.2 提交代碼到Flink集群中

先在pom文件中添加 打包插件
 <build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

然后maven package

先啟動nc命令

[root@localhost1 flink-2.0.0]# nc -lp 7777

然后到Flink UI 頁面提交jar包

然后就看到job正常運行起來了

然后區nc 命令行 輸入一些單詞

到task manager 頁面就能看到統計結果

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89089.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89089.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89089.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于ssm校園綜合服務系統微信小程序源碼數據庫文檔

摘 要 隨著我國經濟迅速發展&#xff0c;人們對手機的需求越來越大&#xff0c;各種手機軟件也都在被廣泛應用&#xff0c;但是對于手機進行數據信息管理&#xff0c;對于手機的各種軟件也是備受用戶的喜愛&#xff0c;校園綜合服務被用戶普遍使用&#xff0c;為方便用戶能夠可…

桌面小屏幕實戰課程:DesktopScreen 17 HTTPS

飛書文檔http://https://x509p6c8to.feishu.cn/docx/doxcn8qjiNXmw2r3vBEdc7XCBCh 源碼參考&#xff1a; /home/kemp/work/esp/esp-idf/examples/protocols/https_request 源碼下載方式參考&#xff1a; 源碼下載方式 獲取網站ca證書 openssl s_client -showcerts -connec…

uniapp上傳gitee

右鍵點擊項目&#xff0c;選擇git提交&#xff0c;會彈出這樣的彈窗 在Message輸入框里面輸入更新的內容&#xff0c;選擇更新過的文件&#xff0c;然后點擊commit 然后點擊push 后面會讓你填寫gitee的用戶名和密碼 用戶名就是郵箱 密碼就是登錄gitee的密碼

重寫(Override)與重載(Overload)深度解析

在Java面向對象編程中&#xff0c;多態性是一個核心概念&#xff0c;它允許我們以統一的方式處理不同類型的對象。而實現多態性的兩種重要機制便是方法的“重寫”&#xff08;Override&#xff09;與“重載”&#xff08;Overload&#xff09;。透徹理解這兩者之間的區別與聯系…

Go 語言中操作 SQLite

sqlite以其無需安裝和配置&#xff1a;直接使用數據庫文件&#xff0c;無需啟動獨立的數據庫服務進程。 單文件存儲&#xff1a;整個數據庫&#xff08;包括表、索引、數據等&#xff09;存儲在單個跨平臺文件中&#xff0c;便于遷移和備份。 在應對的小型應用軟件中.有著不可…

【硬核數學】2.3 AI的“想象力”:概率深度學習與生成模型《從零構建機器學習、深度學習到LLM的數學認知》

歡迎來到本系列的第八篇文章。在前七章中&#xff0c;我們已經構建了一個強大的深度學習工具箱&#xff1a;我們用張量來處理高維數據&#xff0c;用反向傳播來高效地計算梯度&#xff0c;用梯度下降來優化模型參數。我們訓練出的模型在分類、回歸等任務上表現出色。 但它們有…

華為云Flexus+DeepSeek征文|Dify平臺開發搭建口腔牙科24小時在線問診系統(AI知識庫系統)

引言&#xff1a;為什么需要口腔牙科24小時在線問診系統&#xff1f; 在口腔醫療領域&#xff0c;“時間”是患者最敏感的需求之一——深夜牙齒突發疼痛、周末想提前了解治療方案、異地患者無法及時到院……傳統“工作時間在線”的咨詢模式已無法滿足用戶需求。同時&#xff0…

嵌入式硬件中電容的基本原理與詳解

大家好我們今天重討論點知識點如下: 1.電容在電路中的作用 2.用生活中水缸的例子來比喻電容 3.電容存儲能力原理 4.電容封裝的種類介紹電容種類圖片辨識 5.X 電容的作用介紹 6.Y 電容的作用介紹7.鉭電容的優點及特性 7.鉭電容的缺點及特性 8. 鋁電解電容的優點及特性…

中央空調控制系統深度解析:從原理到智能AIOT運維

——附水冷式系統全電路圖解與技術參數 一、中央空調系統架構與技術演進 1. 兩大主流系統對比 技術趨勢&#xff1a;2023年全球冷水機組市場占比達68%&#xff08;BSRIA數據&#xff09;&#xff0c;其核心優勢在于&#xff1a; - 分區控溫精度&#xff1a;0.5℃&#…

document.write 和 innerHTML、innerText 的區別

document.write 與 innerHTML、innerText 的區別 document.write 直接寫入 HTML 文檔流&#xff0c;若在頁面加載完成后調用會覆蓋整個文檔。常用于動態生成內容&#xff0c;但會破壞現有 DOM 結構&#xff0c;不推薦在現代開發中使用。 document.write("<p>直接寫…

日志分析與實時監控:Elasticsearch在DevOps中的核心作用

引言 在現代DevOps實踐中&#xff0c;日志分析與實時監控是保障系統穩定性與性能的關鍵。Elasticsearch作為分布式搜索與分析引擎&#xff0c;憑借其高效的索引與查詢能力&#xff0c;成為構建日志管理與監控系統的核心組件。本文將深入探討Elasticsearch在DevOps中的應用&…

Unity Catalog 三大升級:Data+AI 時代的統一治理再進化

在剛剛落幕的 2025 Databricks Data AI Summit 上&#xff0c;Databricks 重磅發布了多項 Lakehouse 相關功能更新。其中&#xff0c;面向數據湖治理場景的統一數據訪問與管理方案 —— Unity Catalog&#xff0c;迎來了三大關鍵升級&#xff1a;全面支持 Apache Iceberg、面向…

電容屏觸摸不靈敏及跳點問題分析

在電容屏的使用過程中&#xff0c;觸摸不靈敏和觸點不精準是極為常見且讓人困擾的問題。這些問題不僅影響用戶的操作體驗&#xff0c;在一些對觸摸精度要求較高的場景&#xff0c;如工業控制、繪圖設計等領域&#xff0c;還可能導致嚴重的后果。下面我們就來深入剖析一下這兩個…

小程序學習筆記:導航、刷新、加載、生命周期

在小程序開發的領域中&#xff0c;掌握視圖與邏輯相關的技能是打造功能完備、用戶體驗良好應用的關鍵。今天&#xff0c;咱們就來深入梳理一下小程序視圖與邏輯的學習要點&#xff0c;并結合代碼示例&#xff0c;讓大家有更直觀的理解。 一、頁面之間的導航跳轉 在小程序里實…

生成樹基礎實驗

以太網交換網絡中為了進行鏈路備份&#xff0c;提高網絡可靠性&#xff0c;通常會使用冗余鏈路。但是使用冗余鏈路會在交換網絡上產生環路&#xff0c;引發廣播風暴以及 MAC地址表不穩定等故障現象&#xff0c;從而導致用戶通信質量較差&#xff0c;甚至通信中斷。 為解決交換…

flex布局實例:把色子放進盒子里

目錄 一、flex布局實例&#xff1a;把色子放進盒子里 1、基礎樣式 二、justify-content 屬性 三、flex-direction 屬性 四、align-items 屬性 五、flex-wrap 屬性 二、flex布局應用到常見場景 非常詳細的講解flex布局&#xff0c;看一看&#xff0c;練一練&#xff01; …

Netty編解碼器

目錄 1、概念 2、Netty提供的編解碼器類型 2.1 解碼器 2.1.1 ByteToMessageDecoder 2.1.2 ReplayingDecoder 2.1.3 MessageToMessageDecoder 2.2 編碼器 2.2.1 MessageToByteEncoder 2.2.2 MessageToMessageEncoder 2.3 編解碼器 2.3.1 ByteToMessageCodec 2.3.2 M…

企業內部安全組網技術解析:安全通道選型、零信任架構與數據合規加密防護

一、引言&#xff1a;企業內部安全組網的重要性 隨著企業數字化轉型的深入以及遠程辦公需求的增加&#xff0c;企業內部異地組網逐漸成為企業信息技術部門關注的重要話題。如何在合規合法的前提下&#xff0c;保障企業內部網絡連接的安全性、穩定性&#xff0c;并有效保護企業…

Windows 4625日志類別解析:未成功的賬戶登錄事件

Windows 4625日志類別解析&#xff1a;未成功的賬戶登錄事件 什么是Windows 4625日志&#xff1f; Windows 4625日志屬于安全日志&#xff08;Security Log&#xff09; 的一部分&#xff0c;記錄系統中未成功的賬戶登錄嘗試&#xff08;Failed Logon&#xff09;。它是追蹤非…

3D看房實現房屋的切換

作為3D看房的補充&#xff0c;在這里&#xff0c;我們講一下如何實現房屋的切換&#xff0c;我這里提供兩種思路&#xff0c; 切換貼圖&#xff0c;切換場景&#xff0c; 接下我們按照較復雜的場景切換來講&#xff0c;切換貼圖也就水到渠成&#xff1a; 初始化場景&#xf…