1. Download Flink (Downloads | Apache Flink), unpack it, and place it in your IDE working directory. I use version 1.17 as the example.
Notice that recent Flink releases no longer ship the Windows startup script start-cluster.bat, so we launch the cluster through WSL, the Windows Subsystem for Linux that comes with Windows.
Open a terminal and run the following commands in order to install a WSL Linux distribution and the JDK:
wsl --install
wsl.exe -d Ubuntu
sudo apt update
sudo apt install openjdk-11-jdk -y
After that, run wsl.exe -d Ubuntu in the terminal to start WSL. The default distribution is Ubuntu, though you can switch to another one. What matters here is that WSL automatically mounts the Windows drives (C:\ appears as /mnt/c), so a project sitting in a Windows directory can be run from inside WSL.
Then cd all the way into the Flink bin directory to start Flink, but adjust the configuration first:
Before starting, edit Flink's configuration: change every localhost entry to 0.0.0.0, except jobmanager.rpc.address, which must be set to the WSL IP address, e.g. jobmanager.rpc.address: 172.29.145.42, otherwise the cluster's leader election will fail. Once the cluster is up, you can open the Flink Web UI in a Windows browser at the WSL IP.
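The relevant entries live in conf/flink-conf.yaml. A minimal sketch, assuming the example WSL address from above (your file may contain more localhost entries; the rule is to flip them all except the rpc address):

jobmanager.rpc.address: 172.29.145.42
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0

With the configuration in place, start the cluster from the bin directory (the Windows-side path below is illustrative; adjust it to wherever you unpacked Flink):

cd /mnt/c/workspace/flink-1.17.0/bin
./start-cluster.sh

The Web UI listens on port 8081 by default, so in this setup it is reachable at http://172.29.145.42:8081.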
2. Submitting a Flink job
To make testing easy, here is a small program that listens on port 9999 of the local machine (192.168.0.39) and, once a client connects, sends the line "test flink window hallo word" every second. Note that Flink's socket source connects out as a client, so this test program must act as the server.
package org.example.demo01;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PushDataTo9999 {

    // This program runs on the local machine (192.168.0.39); the Flink job connects here.
    private static final int PORT = 9999;
    private static final String DATA = "test flink window hallo word";

    public static void main(String[] args) {
        // Act as the server: Flink's socket source is a client, so we listen and wait for it
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("Listening on port " + PORT + ", waiting for a client...");
            try (Socket socket = serverSocket.accept();
                 OutputStream outputStream = socket.getOutputStream()) {
                System.out.println("Client connected: " + socket.getRemoteSocketAddress());
                // Keep sending data until interrupted or the connection closes
                while (!Thread.currentThread().isInterrupted() && !socket.isClosed()) {
                    // Current system time, appended so every line is distinguishable
                    String currentTime = LocalDateTime.now()
                            .format(DateTimeFormatter.ofPattern("HH:mm:ss"));
                    // Send one timestamped line per second
                    String dataToSend = DATA + " " + currentTime + "\n";
                    outputStream.write(dataToSend.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                    System.out.println("Sent: " + dataToSend.trim());
                    // Wait one second
                    Thread.sleep(1000);
                }
            }
        } catch (IOException | InterruptedException e) {
            System.err.println("Error: " + e.getMessage());
        }
    }
}
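Before involving Flink at all, you can sanity-check the feed from the WSL shell with netcat (assuming the netcat package is installed in the Ubuntu distribution); you should see one timestamped line per second:

nc 192.168.0.39 9999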
The Flink job then connects from the WSL server (172.29.145.42) to port 9999 on the local machine (192.168.0.39) and counts the occurrences of each word in real time. Note that the Windows firewall must not block this connection; turn it off for the test.
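If you would rather not disable the firewall entirely, opening just the test port should suffice. A sketch, run from an elevated Windows prompt (the rule name is arbitrary):

netsh advfirewall firewall add rule name="flink-test-9999" dir=in action=allow protocol=TCP localport=9999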
package org.example.demo01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.util.Collector;

/**
 * Hello world!
 */
public class App {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run in streaming mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Basic setup
        env.setParallelism(1);         // parallelism of 1 keeps the printed output in order
        env.disableOperatorChaining(); // disable operator chaining so the execution graph is easier to read
        // Checkpointing is unnecessary for this simple demo
        env.getCheckpointConfig().disableCheckpointing();

        // Create the socket data source
        // DataStream<String> dataStream = env
        //         .socketTextStream("localhost", 9999) // read data from a socket
        //         .name("source-strings")
        //         .setParallelism(1);
        DataStream<String> dataStream = env
                .addSource(new SocketTextStreamFunction("192.168.0.39", 9999, "\n", 0))
                .name("socket-source");

        // Transformation: split each line into words, key by word, and count
        dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .name("flatmap-split-words")
                .setParallelism(1)
                .keyBy(tuple -> tuple.f0) // group by the word
                .sum(1)                   // running count of each word
                .print()
                .name("printer-word-count");

        // Launch the job
        env.execute("Flink Streaming Java API Hello");
    }
}
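For the job to compile, the pom also needs the Flink dependencies themselves. A minimal sketch for Flink 1.17 (the exact version and the provided scope are assumptions; provided keeps jars the cluster already ships out of the shaded artifact):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>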
Also note that the pom needs Flink's packaging plugin (maven-shade-plugin), so the job is built as a self-contained jar:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Replace this with the main class of your job -->
                                <mainClass>org.example.demo01.App</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
After compiling and packaging with Maven, upload the jar to the Flink server through the Web UI:
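Packaging is a single Maven command; the shaded jar ends up under target/:

mvn clean package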
Click the uploaded jar to open the submission form:
After submitting, the job starts automatically:
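Equivalently, the job can be submitted from the WSL shell with the Flink CLI instead of the Web UI (the jar path below is illustrative; run this from the Flink directory):

./bin/flink run -c org.example.demo01.App /mnt/c/workspace/demo01/target/demo01-1.0-SNAPSHOT.jar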
The job's print() output can be viewed in the Web UI under Task Managers, in the Stdout tab:
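The same output also lands in the TaskManager's .out file on disk. A quick way to follow it, assuming a standalone cluster started from the Flink directory (the file name pattern includes your user and host names):

tail -f log/flink-*-taskexecutor-*.out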