1. Base environment
1.1 Install the JDK
This guide uses jdk-11.0.26_linux-x64_bin.tar.gz.
Extract it:
tar -zxvf jdk-11.0.26_linux-x64_bin.tar.gz -C /usr/local/java/
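Configure the environment variables by adding the following lines to /etc/profile:
vi /etc/profile
JAVA_HOME=/usr/local/java/jdk-11.0.26
CLASSPATH=.:${JAVA_HOME}/lib:$CLASSPATH
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH
Apply the environment variables:
source /etc/profile
If the changes do not take effect, reboot the server.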
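1.2 Passwordless SSH login
2. Set up a distributed Flink cluster
1. Download
Version: flink-2.0.0-bin-scala_2.12.tgz
URL: https://www.apache.org/dyn/closer.lua/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz
2. Install
Use the virtual machine's shared-folder setting to copy the required package into the Linux VM localhost1. The VM's shared folder is mounted at /mnt/hgfs/. Copy the package from the shared folder to the existing target path /opt/software/.
Extract it: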
cd /opt/software/
tar -zxvf flink-2.0.0-bin-scala_2.12.tgz -C /usr/local/applications/
3. Modify the Flink configuration
Edit the conf/config.yaml file under the Flink installation directory. Only taskmanager.host differs between the three machines.
On localhost1:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost1
On localhost2:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost2
On localhost3:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost3
Edit the conf/masters file:
localhost1:8081
Edit the conf/workers file:
localhost1
localhost2
localhost3
Edit the conf/zoo.cfg file (optional):
server.1=localhost1:2888:3888
server.2=localhost2:2888:3888
server.3=localhost3:2888:3888
4. Distribute Flink to the cluster
First stop and disable the firewall:
systemctl stop firewalld
systemctl disable firewalld
Copy Flink to localhost2 and localhost3:
scp -r flink-2.0.0 root@localhost2:/usr/local/applications/flink-2.0.0
scp -r flink-2.0.0 root@localhost3:/usr/local/applications/flink-2.0.0
5. Start the cluster
[root@localhost1 flink-2.0.0]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost1.
Starting taskexecutor daemon on host localhost1.
Starting taskexecutor daemon on host localhost2.
Starting taskexecutor daemon on host localhost3.
6. Open the web UI
http://localhost1:8081/#/overview
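Besides opening the page in a browser, the cluster state can be checked programmatically. The following is a minimal sketch, assuming the Flink REST API is served on the same host and port as the web UI and exposes the /overview endpoint; the class name ClusterOverviewCheck and its helper methods are made up for this illustration. It uses only the HTTP client that ships with JDK 11.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of Flink: queries the cluster overview over REST.
class ClusterOverviewCheck {

    // Builds the overview URL for the given JobManager host and REST port.
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    // Sends a GET request and returns the raw JSON body.
    static String fetchOverview(String host, int port) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, port))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected HTTP status: " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Host and port taken from the conf/masters entry used in this guide.
        System.out.println(fetchOverview("localhost1", 8081));
    }
}
```

If the three workers registered correctly, the returned JSON should report the task managers and their slots; a connection error usually points at the firewall or at a JobManager that did not start.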
3. Flink development
3.1 Word count example
Create a Java project and add the Flink dependencies to pom.xml:
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>2.0.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Create the WordCount class:
package com.neilparker;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read newline-delimited text from the socket server on localhost1:7777.
        DataStreamSource<String> source = executionEnvironment.socketTextStream("localhost1", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> dataStream = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String string, Collector<Tuple2<String, Long>> collector) {
                        // Emit (word, 1) for every whitespace-separated token.
                        String[] splits = string.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.f0) // group by the word
                .sum(1);                  // keep a running sum of the count field
        dataStream.print();
        executionEnvironment.execute("wordcount batch process");
    }
}
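One detail of the flatMap above is worth knowing: split("\\s") splits on every single whitespace character, so two consecutive spaces, a leading space, or an empty line produce an empty string that is then counted as a word. The sketch below shows one possible stricter tokenizer in plain Java; the class WordTokenizer and its tokenize method are hypothetical names for this illustration, and using it inside the job is an optional change, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a line into words and drops empty tokens.
class WordTokenizer {

    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        // Trim first so leading whitespace does not yield an empty first token,
        // then split on runs of whitespace instead of single characters.
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // The original pattern keeps an empty token between the two spaces.
        System.out.println("hello  neil".split("\\s").length);
        // The stricter tokenizer returns only the real words.
        System.out.println(tokenize("hello  neil"));
    }
}
```

Inside flatMap, the loop could iterate over such a tokenizer's result instead of the raw split array if empty words should not appear in the counts.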
Start nc to simulate a socket server, then run the Java code, and then send data from the nc command line.
The nc session looks like this:
[root@localhost1 ~]# nc -lp 7777
hello neil hello jack
hello mike hello walker
hello sun
The Java console shows the word counts:
5> (hello,1)
15> (neil,1)
14> (jack,1)
5> (hello,2)
4> (mike,1)
9> (walker,1)
5> (hello,3)
5> (hello,4)
15> (sun,1)
5> (hello,5)
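The output above shows two things: the number before ">" is the index of the parallel subtask that printed the record, and keyBy followed by sum emits an updated running count for every incoming word rather than one final total. The following plain-Java sketch imitates that behavior without Flink so the semantics can be tried in isolation. It is an illustration only: the class RunningWordCountSketch is a made-up name, it runs in a single thread, and it therefore prints records in input order, whereas the real job interleaves output from several subtasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded imitation of keyBy(word).sum(count).
class RunningWordCountSketch {

    static List<String> runningCounts(List<String> lines) {
        Map<String, Long> counts = new HashMap<>(); // per-key state, one entry per word
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Same tokenization as the WordCount job: split on single whitespace characters.
            for (String word : line.split("\\s")) {
                long updated = counts.merge(word, 1L, Long::sum);
                // Every input record produces one output record with the new total.
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "hello neil hello jack",
                "hello mike hello walker",
                "hello sun");
        runningCounts(lines).forEach(System.out::println);
    }
}
```

With the three lines typed into nc earlier, the sketch emits ten records, and the count for hello climbs from 1 to 5 exactly as in the job output.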
3.2 Submit the code to the Flink cluster
Add the maven-shade-plugin to pom.xml so that the build produces a jar:
<build>
    <plugins>
        <!-- Plugin for building the jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Then run mvn package.

Start nc first:
[root@localhost1 flink-2.0.0]# nc -lp 7777
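Then submit the jar from the Flink web UI.
The job should now be shown as running.
Next, go to the nc command line and type some words.
The word counts then appear in the stdout output on the Task Manager page.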