Flink2.0學習筆記:Stream API 常用轉換算子

EC0720/FLINKTASK-TEST-STREAM/demo at master · stevensu1/EC0720

先看測試效果:控制臺

測試效果:監控服務端

主要的轉換算子包括:

轉換算子 filter:過濾包含“Flink”的輸入

轉換算子 map: 將每行數據前添加“Processed: ”并轉為大寫

轉換算子 flatMap: 將每行數據拆分為單詞

轉換算子 sum/keyBy: 按單詞分組并計數

轉換算子 reduce: 規約合并單詞

轉換算子 union: 合并兩個數據流

主要的代碼:

package com.example;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;public class App {public static void main(String[] args) {try {// 創建本地配置Configuration conf = new Configuration();// Web UI 配置conf.setString("rest.bind-port", "8081"); // 設置Web UI端口conf.setString("rest.bind-address", "0.0.0.0"); // 綁定所有網絡接口conf.setString("rest.address", "localhost"); // 設置Web UI地址conf.setString("rest.enable", "true"); // 啟用REST服務conf.setString("web.submit.enable", "true"); // 允許通過Web UI提交作業conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // 設置上傳目錄conf.setString("web.access-control-allow-origin", "*"); // 允許跨域訪問// 使用配置創建支持Web UI的執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 設置為流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 基本配置env.setParallelism(1); // 設置并行度為1env.disableOperatorChaining(); // 禁用算子鏈,使執行更清晰// 禁用檢查點,因為是簡單的演示程序env.getCheckpointConfig().disableCheckpointing();// 創建周期性的數據源DataStream<String> text = env.socketTextStream("localhost", 9999) // 從socket讀取數據.name("source-strings").setParallelism(1);// 轉換算子 filter:過濾包含“Flink”的輸入text.filter(line -> line.contains("Flink")).name("filter-flink-strings").setParallelism(1).map(String::toUpperCase).name("uppercase-mapper").setParallelism(1).print().name("printer");// 轉換算子 map: 將每行數據前添加“Processed: ”并轉為大寫text.map(line -> "Processed: " + line.toUpperCase()).name("map-processed-strings").setParallelism(1).print().name("printer-processed");// 轉換算子 flatMap: 將每行數據拆分為單詞text.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) {for (String word : line.split(" ")) {out.collect(word);}}}).name("flatmap-split-words").setParallelism(1).print().name("printer-split-words");// 轉換算子 keyBy: 按單詞分組并計數text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}).name("flatmap-split-words").setParallelism(1).keyBy(tuple -> tuple.f0) // 按單詞分組.sum(1) // 計算每個單詞的出現次數.print().name("printer-word-count");// 轉換算子 reduce: 規約合并單詞text.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) {for (String word : line.split(" ")) {out.collect(word);}}}).name("flatmap-split-words").setParallelism(1).keyBy(word -> word) // 按單詞分組.reduce((word1, word2) -> word1 + ", " + word2) // 合并單詞.print().name("printer-word-reduce");// 轉換算子 union: 合并兩個數據流DataStream<String> anotherText = env.fromSequence(1, Long.MAX_VALUE) // 持續生成數據.map(i -> {try {Thread.sleep(3000); // 每3秒生成一條消息return "Stream2> Auto Message " + i + ": Hello Flink";} catch (InterruptedException e) {return "Stream2> Error occurred";}}).name("source-another-strings").setParallelism(1);// 將兩個流合并并處理text.map(str -> "Stream1> " + str) // 為第一個流添加前綴.union(anotherText) // 合并兩個數據流.filter(str -> str.contains(":")) // 過濾掉不符合格式的數據.map(str -> {String[] parts = str.split(">");return String.format("%-8s | %s",parts[0].trim() + ">", // 對齊源標識parts[1].trim()); // 消息內容}).print().name("printer-union");// 執行任務env.execute("Flink Streaming Java API Hello");} catch (Exception e) {System.err.println("任務執行失敗:" + e.getMessage());e.printStackTrace();}}
}

關于監控服務端集成:REST API |Apache Flink

在引入? 本地執行UI支持 的依賴后

    <!-- 本地執行UI支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency>

還需要在構建環境時指定 支持Web UI的執行環境

            // 創建本地配置Configuration conf = new Configuration();// Web UI 配置conf.setString("rest.bind-port", "8081"); // 設置Web UI端口conf.setString("rest.bind-address", "0.0.0.0"); // 綁定所有網絡接口conf.setString("rest.address", "localhost"); // 設置Web UI地址conf.setString("rest.enable", "true"); // 啟用REST服務conf.setString("web.submit.enable", "true"); // 允許通過Web UI提交作業conf.setString("web.upload.dir", System.getProperty("java.io.tmpdir")); // 設置上傳目錄conf.setString("web.access-control-allow-origin", "*"); // 允許跨域訪問// 使用配置創建支持Web UI的執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

同時設置為無界處理 :

          // 設置為流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

不然以批處理模式的話,程序執行完就會終止Web UI環境,從而無法進入Web UI界面。不過通常都是打包后發布到專門的fink監控服務器執行任務。

下面是完整依賴:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>demo</artifactId><version>1</version><name>demo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><flink.version>2.0.0</flink.version></properties><dependencies><!--flink-streaming-java--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- 本地執行UI支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- 日志支持 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.6</version></dependency><!-- 測試依賴 --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>com.example.App</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins>
</build></project>

關于本地windows?ncat服務器搭建,ncat 是 Nmap 軟件包的一部分,所以我們需要安裝 Nmap:

  1. 從官方網站下載 Nmap 安裝程序:

    • 訪問?https://nmap.org/download.html
    • 下載 "Latest stable release self-installer" 的 Windows 版本
    • 通常文件名類似 "nmap-7.94-setup.exe"
    • 打開新的 PowerShell 窗口(以使環境變量生效)
    • 運行以下命令來啟動 ncat 服務器:ncat -l 9999

然后在另一個窗口中運行 Flink 程序:跑起來監聽9999端口后,就可以在PowerShell 窗口輸入對應的內容回車 作為程序的輸入了

cd FLINKTASK-TEST-STREAM/demo
mvn clean package
java -jar target/demo-1.jar

關于更多概念,我也在持續學習實踐中,比如flink內存模型等,希望大家多支持。

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916559.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916559.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916559.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一、Python環境、Jupyter與Pycharm

安裝Python由于RAG項目中所需要的Python版本必須高于3.8&#xff0c;經過篩選&#xff0c;最終選擇了3.10.11這個版本py --version Python 3.10.11安裝過程略過&#xff0c;但對于幾個基礎的命令作個筆記記錄where python找到python啟動器的位置D:\>where python C:\Users\x…

Flink CEP 動態模板與規則動態修改實踐完全手冊

1. Flink CEP:從靜態規則到動態江湖 Flink 的復雜事件處理(CEP)庫就像一個武功高強的俠客,能從數據流中精準捕獲特定模式,堪稱流處理界的“降龍十八掌”。但問題來了:傳統 CEP 規則通常是寫死在代碼里的,就像刻在石碑上的武功秘籍,改起來費勁不說,還得重啟應用,簡直…

vue3.2 + echarts5.6 + ant-design-vue 3.x 實現自定義 echarts 圖例

文章目錄概要技術細節效果概要 需求需要實現圖例移入顯示描述說明 故實現自定義圖例 技術細節 <template><div class"custom-legend"><divv-for"item in legends":key"item.name"class"legend-item":class"{ i…

【2025年7月25日】TrollStore巨魔商店恢復在線安裝

就在今日7月25日&#xff0c;TrollStore的在線安裝功能再次變得可用&#xff0c;這對于許多iPhone用戶來說無疑是個喜訊。在經歷了近三個月的中斷后&#xff0c;巨魔商店的企業證書意外的到來了&#xff0c;使得用戶能夠重新采用在線安裝的方式&#xff01; 在線安裝地址在文…

【05】C#入門到精通——C# 面向對象、類、靜態變量static、類與類之間的調用

文章目錄1 引入例子2 創建類2.1 類的訪問屬性2.2 英雄 特點類2.3 英雄信息打印3 靜態變量static4 類 調用 類4.1 非靜態 成員函數4.2 靜態 成員函數1 引入例子 比如游戲中 描述英雄的角色&#xff0c; 我們可以像下面這樣&#xff0c;給每一個英雄特點及擁有技能分別定義變量…

單片機的硬件結構

單片機的硬件結構 一、課程導入 在上一節課《認識單片機》中&#xff0c;我們知道單片機就像一個超級迷你的工廠&#xff0c;有著類似工廠的各個組成部分。而這個 “迷你工廠” 能正常運轉&#xff0c;離不開其內部嚴謹的硬件結構。就像一座大廈&#xff0c;只有基礎結構穩固且…

multiprocessing模塊使用方法(二)

spawn_main是Python multiprocessing模塊的核心內部函數&#xff0c;用于實現spawn啟動方法的子進程初始化。以下結合代碼Demo詳細說明其使用方法和推薦場景。一、spawn_main的功能與定位核心作用&#xff1a; 在spawn模式下啟動子進程&#xff0c;負責進程間通信管道的建立和資…

編程與數學 03-002 計算機網絡 07_路由算法

編程與數學 03-002 計算機網絡 07_路由算法一、靜態路由算法&#xff08;一&#xff09;手工配置路由表的方法&#xff08;二&#xff09;靜態路由的優缺點二、動態路由算法原理&#xff08;一&#xff09;距離矢量算法&#xff08;如貝爾曼 - 福特算法&#xff09;&#xff08…

使用Python,OpenCV計算跑圖的圖像彩色度

使用Python&#xff0c;OpenCV計算跑圖的圖像彩色度 這篇博客將介紹如何計算跑圖里最鮮艷的top25圖片和最灰暗的top25圖片并顯示色彩彩色度值展示。 效果圖 以下分別是最鮮艷top25和最灰暗top25對比效果圖&#xff1a; 最鮮艷top25效果圖&#xff1a; 最灰暗top25效果圖…

LeetCode 60:排列序列

LeetCode 60&#xff1a;排列序列問題定義與核心挑戰 給定整數 n 和 k&#xff0c;返回集合 {1,2,...,n} 的第 k 個字典序排列。直接生成所有排列再遍歷到第 k 個的方法&#xff08;時間復雜度 O(n!)&#xff09;會因 n≥10 時階乘爆炸而超時&#xff0c;因此需要 數學推導 貪…

亞遠景-傳統功能安全VS AI安全:ISO 8800填補的標準空白與實施難點

一、為什么需要ISO 8800&#xff1a;傳統安全標準的“盲區”傳統功能安全&#xff08;ISO 26262&#xff09;? 假設&#xff1a;系統行為可被完整規格化&#xff0c;失效模式可枚舉&#xff0c;風險可用概率-危害矩陣量化。? 盲區&#xff1a;對“設計意圖正確&#xff0c;但…

菜鳥教程 R語言基礎運算 注釋 和數據類型

菜鳥教程 R語言基礎運算 注釋 和數據類型 1.注釋 注釋主要用于一段代碼的解析&#xff0c;可以讓閱讀者更易理解&#xff0c;編程語言的注釋會被編譯器忽略掉&#xff0c;且不會影響代碼的執行。 一般編程語言的注釋分為單行注釋與多行注釋&#xff0c;但是 R 語言只支持單行注…

華為云ELB(彈性負載均衡)持續報異常

華為云ELB&#xff08;彈性負載均衡&#xff09;持續報異常&#xff0c;需結合實例類型&#xff08;共享型/獨享型&#xff09;和異常代碼進行針對性排查。以下是分步排查建議&#xff1a;一、根據實例類型排查網絡配置共享型實例 安全組規則&#xff1a;檢查后端服務器安全組是…

《R for Data Science (2e)》免費中文翻譯 (第2章) --- Workflow: basics

寫在前面 本系列推文為《R for Data Science (2)》的中文翻譯版本。所有內容都通過開源免費的方式上傳至Github&#xff0c;歡迎大家參與貢獻&#xff0c;詳細信息見&#xff1a; Books-zh-cn 項目介紹&#xff1a; Books-zh-cn&#xff1a;開源免費的中文書籍社區 r4ds-zh-cn …

開源深度學習新寵:Burn框架助您無憂高效建模

在日新月異的人工智能世界里&#xff0c;各類深度學習框架如雨后春筍般涌現&#xff0c;而Burn&#xff0c;作為新一代的深度學習框架&#xff0c;以其不妥協的靈活性、高效性和可移植性嶄露頭角。本文將深入探討Burn的核心功能、應用場景及具體使用方法&#xff0c;幫助您更好…

基于深度學習的圖像分割:使用DeepLabv3實現高效分割

前言 圖像分割是計算機視覺領域中的一個重要任務&#xff0c;其目標是將圖像中的每個像素分配到不同的類別中。近年來&#xff0c;深度學習技術&#xff0c;尤其是卷積神經網絡&#xff08;CNN&#xff09;&#xff0c;在圖像分割任務中取得了顯著的進展。DeepLabv3是一種高效的…

如何高效合并音視頻文件(時間短消耗資源少)(二)

英語字幕 1 00:00:06,480 --> 00:00:08,400 Good morning. We have a banger for you2 00:00:08,400 --> 00:00:09,840 today. We&amp;#39;re going to launch chatbt3 00:00:09,840 --> 00:00:11,519 agent. But before jumping into that, I&amp;#39;d4 00…

內網后滲透攻擊過程(實驗環境)--4、權限維持(2)

用途限制聲明&#xff0c;本文僅用于網絡安全技術研究、教育與知識分享。文中涉及的滲透測試方法與工具&#xff0c;嚴禁用于未經授權的網絡攻擊、數據竊取或任何違法活動。任何因不當使用本文內容導致的法律后果&#xff0c;作者及發布平臺不承擔任何責任。滲透測試涉及復雜技…

CentOS 9 配置國內 YUM 源

1.備份 sudo mv /etc/yum.repos.d/centos.repo /etc/yum.repos.d/centos.repo.backup sudo mv /etc/yum.repos.d/centos-addons.repo /etc/yum.repos.d/centos-addons.repo.backup2.創建新文件 vi /etc/yum.repos.d/centos.repo[baseos] nameCentOS Stream $releasever - BaseO…

【算法】遞歸、搜索與回溯算法入門

文章目錄遞歸什么是遞歸為什么會用到遞歸如何理解遞歸如何寫好一個遞歸搜索 vs 深度優先遍歷 vs 深度優先搜索 vs 寬度&#xff08;廣度&#xff09;優先遍歷 vs 寬度&#xff08;廣度&#xff09;優先搜索 vs 暴搜深度優先遍歷 vs 深度優先搜索&#xff08;dfs&#xff09;寬度…