EC0720/FLINKTASK-TEST-STREAM/demo at master · stevensu1/EC0720
First, a look at the test results: the console output
Test results: the monitoring server (Web UI)
The main transformation operators are:
- filter: keep only input lines containing "Flink"
- map: prefix each line with "Processed: " and convert it to uppercase
- flatMap: split each line into words
- keyBy/sum: group by word and count occurrences
- reduce: merge the words within each group
- union: merge two data streams
The main code:
package com.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class App {
    public static void main(String[] args) {
        try {
            // Create the local configuration
            Configuration conf = new Configuration();
            // Web UI configuration
            conf.setString("rest.bind-port", "8081");       // Web UI port
            conf.setString("rest.bind-address", "0.0.0.0"); // bind all network interfaces
            conf.setString("rest.address", "localhost");    // Web UI address
            conf.setString("rest.enable", "true");          // enable the REST service
            conf.setString("web.submit.enable", "true");    // allow job submission via the Web UI
            conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // upload directory
            conf.setString("web.access-control-allow-origin", "*"); // allow cross-origin access

            // Create an execution environment with Web UI support
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

            // Set streaming mode
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

            // Basic configuration
            env.setParallelism(1);         // parallelism of 1
            env.disableOperatorChaining(); // disable operator chaining for a clearer execution graph

            // Disable checkpointing; this is a simple demo
            env.getCheckpointConfig().disableCheckpointing();

            // Create the data source
            DataStream<String> text = env.socketTextStream("localhost", 9999) // read from the socket
                    .name("source-strings")
                    .setParallelism(1);

            // Operator filter: keep only lines containing "Flink"
            text.filter(line -> line.contains("Flink"))
                    .name("filter-flink-strings")
                    .setParallelism(1)
                    .map(String::toUpperCase)
                    .name("uppercase-mapper")
                    .setParallelism(1)
                    .print()
                    .name("printer");

            // Operator map: prefix each line with "Processed: " and uppercase it
            text.map(line -> "Processed: " + line.toUpperCase())
                    .name("map-processed-strings")
                    .setParallelism(1)
                    .print()
                    .name("printer-processed");

            // Operator flatMap: split each line into words
            text.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                }
            }).name("flatmap-split-words")
                    .setParallelism(1)
                    .print()
                    .name("printer-split-words");

            // Operators keyBy/sum: group by word and count occurrences
            text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            }).name("flatmap-split-words")
                    .setParallelism(1)
                    .keyBy(tuple -> tuple.f0) // group by word
                    .sum(1)                   // count occurrences of each word
                    .print()
                    .name("printer-word-count");

            // Operator reduce: merge the words within each group
            text.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                }
            }).name("flatmap-split-words")
                    .setParallelism(1)
                    .keyBy(word -> word)                            // group by word
                    .reduce((word1, word2) -> word1 + ", " + word2) // concatenate the words
                    .print()
                    .name("printer-word-reduce");

            // Operator union: merge two data streams
            DataStream<String> anotherText = env.fromSequence(1, Long.MAX_VALUE) // continuously generate data
                    .map(i -> {
                        try {
                            Thread.sleep(3000); // emit one message every 3 seconds
                            return "Stream2> Auto Message " + i + ": Hello Flink";
                        } catch (InterruptedException e) {
                            return "Stream2> Error occurred";
                        }
                    })
                    .name("source-another-strings")
                    .setParallelism(1);

            // Merge the two streams and process the result
            text.map(str -> "Stream1> " + str)        // prefix records of the first stream
                    .union(anotherText)               // merge the two streams
                    .filter(str -> str.contains(":")) // drop records that don't match the format
                    .map(str -> {
                        String[] parts = str.split(">");
                        return String.format("%-8s | %s",
                                parts[0].trim() + ">", // align the source tag
                                parts[1].trim());      // the message body
                    })
                    .print()
                    .name("printer-union");

            // Execute the job
            env.execute("Flink Streaming Java API Hello");
        } catch (Exception e) {
            System.err.println("Job execution failed: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
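If you don't have a socket server handy, the pipeline can also be exercised with a self-contained source. Below is a sketch of my own (not in the repo) that reuses the same fromSequence pattern the union example already uses; the name "source-test-strings" is made up for illustration:

// Hypothetical replacement for the socket source: generates five lines
// locally so the operator pipeline runs without an ncat listener.
DataStream<String> text = env.fromSequence(1, 5)
        .map(i -> "Hello Flink " + i)
        .name("source-test-strings")
        .setParallelism(1);

Note that this source is bounded: the job processes the five lines and then exits, taking the local Web UI down with it, which matters for the Web UI discussion below.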
On integrating with the monitoring server, see: REST API | Apache Flink
After adding the dependency for local Web UI support,
<!-- Local execution UI support -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
you also need to create an execution environment with Web UI support when building the environment:
// Create the local configuration
Configuration conf = new Configuration();
// Web UI configuration
conf.setString("rest.bind-port", "8081");       // Web UI port
conf.setString("rest.bind-address", "0.0.0.0"); // bind all network interfaces
conf.setString("rest.address", "localhost");    // Web UI address
conf.setString("rest.enable", "true");          // enable the REST service
conf.setString("web.submit.enable", "true");    // allow job submission via the Web UI
conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // upload directory
conf.setString("web.access-control-allow-origin", "*"); // allow cross-origin access
// Create an execution environment with Web UI support
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
and set it to unbounded (streaming) execution:
// Set streaming mode
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
Otherwise, in batch mode the program terminates as soon as it finishes, tearing down the Web UI environment before you can open it. In practice, though, jobs are usually packaged and submitted to a dedicated Flink monitoring server for execution.
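For contrast, this is the batch setting the paragraph above warns about, as a minimal sketch (not part of the demo code):

// BATCH mode suits bounded inputs; a local job in this mode runs to
// completion and exits, shutting down the embedded Web UI with it.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

While a streaming job is running, the Web UI should be reachable at http://localhost:8081 (per rest.bind-port above), and the same REST endpoints can be queried directly, for example curl http://localhost:8081/jobs/overview to list jobs and their current states.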
The complete dependencies (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>1</version>
    <name>demo</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <flink.version>2.0.0</flink.version>
    </properties>

    <dependencies>
        <!-- flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Local execution UI support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.6</version>
        </dependency>
        <!-- Test dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>com.example.App</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Setting up a local ncat server on Windows: ncat is part of the Nmap package, so we need to install Nmap:
- Download the Nmap installer from the official website:
  - Visit https://nmap.org/download.html
  - Download the Windows version of the "Latest stable release self-installer"
  - The file is typically named something like "nmap-7.94-setup.exe"
- Open a new PowerShell window (so the updated environment variables take effect)
- Run the following command to start the ncat server: ncat -l 9999
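One optional convenience, beyond what the steps above require: plain ncat -l 9999 exits once its client disconnects, so each restart of the Flink job means relaunching the listener. The keep-open flag avoids that:

ncat -lk 9999

(-k / --keep-open keeps the listener alive across successive connections.)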
Then run the Flink program in another window. Once the listener on port 9999 is up and the job is running, type a line into the PowerShell window and press Enter; each line becomes input to the program:
cd FLINKTASK-TEST-STREAM/demo
mvn clean package
java -jar target/demo-1.jar
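As a rough illustration only (reconstructed from the operator logic above rather than captured output, and the interleaving across the print sinks is nondeterministic), typing Hello Flink into the ncat window should produce console lines along these lines:

HELLO FLINK              (filter branch: contains "Flink", uppercased)
Processed: HELLO FLINK   (map branch)
Hello                    (flatMap branch, one word per line)
Flink
(Hello,1)                (keyBy/sum branch)
(Flink,1)
Hello                    (reduce branch: first value seen for each key)
Flink

The "Stream1> Hello Flink" record is dropped by the union branch's filter because the plain input contains no colon, while the generated "Stream2> | Auto Message N: Hello Flink" lines keep appearing every 3 seconds.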
There are more concepts I am still studying and putting into practice, such as Flink's memory model; I hope you'll keep following along.