02. Flink 快速上手

02. Flink 快速上手

1、創建項目導入依賴

pom文件:

<properties><flink.version>1.17.0</flink.version>
</properties><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>

2、需求

批處理基本思路:先逐行讀取文本,在根據空格進行單詞拆分,最后再去統計每個單詞出現的頻率。

(1)數據準備

在工程目錄下新建文件夾input,新建文本words.txt。

文件輸入:

hello world
hello flink
hello java

2.1 批處理

代碼編寫(使用DataSet API實現)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class FlinkBatchWords {public static void main(String[] args) throws Exception {// 1、創建執行環境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2、從文件中讀取數據DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、切分、轉換FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value     讀取到的輸入* @param out       返回的內容,Tuple2是一個二元分組,(字符串,個數)。* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 3.1 切分for (String s : value.split(" ")) {// 3.2 將單組轉為二元組Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 將二元組發送給下游out.collect(tuple);}}});// 4、按照 word 分組UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下標為0的參數,也就是二元組的String單詞// 5、各分組聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下標1的元素,即單詞個數// 6、輸出sum.print();}
}

運行結果:

image-20240519130034466

2.2 流處理

2.2.1 有界流

代碼編寫(使用DataStream API實現,讀取文件屬于有界流)

package com.company.onedayflink.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;@Slf4j
public class FlinkStreamWords {public static void main(String[] args) throws Exception {// 1、創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、從文件中讀取數據DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");// 3、處理數據(切換、轉換、分組、聚合)// 3.1 切換、轉換SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {for (String s : value.split(" ")) {// 構建二元組Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 通過采集器向下游發送數據out.collect(tuple);}}});// 3.2 分組, KeySelector<IN, KEY> 中 IN 表示輸入的類型,KEY 表示分組key的類型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元組的第一個元素// 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);  // 1 表示二元組的第二個元素// 4、輸出數據sum.print();// 5、執行env.execute();}
}

執行結果:

2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)

前面的編號是并行度,線程數。

2.2.2 無界流

(1)使用 netcat 監聽7777端口,建立stream流

安裝 netcat

brew install netcat

監聽 7777 端口

nc -lk 7777

(2)代碼編寫(使用DataStream API實現,讀取stream流屬于無界流)

package com.company.onedayflink.demo;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkSteamSocketWords {public static void main(String[] args) throws Exception {// 1、創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、讀取數據(其中hostname 是需要監聽的主機名稱,mac電腦可以在終端使用hostname命令查看)DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);// 3、數據處理(切割、轉換、分組、聚合)SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {// 3.1 切分for (String s : value.split(" ")) {// 3.2 將單組轉為二元組Tuple2<String, Integer> tuple = Tuple2.of(s, 1);// 3.3 將二元組發送給下游out.collect(tuple);}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);// 4、輸出sum.print();// 5、執行env.execute();}
}

(3)測試

在終端發送消息

hello flink
hello world

觀察程序控制臺打印

8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/15587.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/15587.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/15587.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

k8s配置pods滾動發布

背景 采用微服務架構部署的應用&#xff0c;部署方式都要用到容器化部署k8s容器編排&#xff0c;最近我在公司負載的系統也是用的上述架構部署&#xff0c;但是隨著系統的運行&#xff0c;用戶提的需求就會越多&#xff0c;每次更新的話都要停機發布&#xff0c;最用戶側來說就…

【C語言刷題系列】求一個數組中兩個元素a和b的和最接近整數m

&#x1f493; 博客主頁&#xff1a;倔強的石頭的CSDN主頁 &#x1f4dd;Gitee主頁&#xff1a;倔強的石頭的gitee主頁 ? 文章專欄&#xff1a;C語言刷題系列 目錄 一、問題描述 二、解題思路 解題思路&#xff1a; 解題步驟: 三、C語言代碼實現及測試 一、問題描述 給定一…

指北者智能音樂學習機隆重亮相廣州國際樂器展

2024年5月23-26日廣州國際樂器展覽會在廣交會展館B區隆重開幕&#xff0c;本屆展會開設5大展廳、50000平方米的主題展區&#xff0c;吸引了700多家國內外參展商參展&#xff0c;打造集展示、商貿、文化交流、文娛于一體的廣闊平臺。深圳市指北科技有限公司也攜旗下品牌指北者智…

AWS云服務器每月費用高昂,如何優化達到節省目的?

AWS云服務器每月費用可能因不同的使用情況和配置而有所不同。為了優化并節省AWS云服務器的費用&#xff0c;aws的合作伙伴九河云提供了一些建議&#xff1a; &#xff08;1&#xff09;調整實例大小&#xff1a;確保你使用的實例大小與你的工作負載相匹配。實例的容量每增加一倍…

Gopeed的高級用法

Gopeed是一個開源全平臺下載器&#xff0c;具體簡介請參考&#xff1a; “狗屁下載器”&#xff1f;Gopeed - 開源全平臺下載器 (免費輕量 / 比 Aria2 好用 / 遠程下載) - 異次元軟件世界 (iplaysoft.com) 這里主要介紹下自己摸索出來的 Gopeed 的高級做法。 有的網站添加的…

時政|醫療結果互認

背景&#xff08;存在的問題&#xff09; 看同一種病&#xff0c;換一家醫院甚至換一個院區、換一個科室&#xff0c;檢查檢驗還得再來一遍&#xff0c;費錢又費時。開展檢查檢驗結果互認&#xff0c;可以明顯減輕患者就醫負擔。患者不用做重復檢查&#xff0c;也可節約就醫時…

基于JSP/Servlet校園二手交易平臺(二)

目錄 2 開發技術及開發環境 2.1 Java語言簡介 2.2 J2EE技術介紹 2.3 Servlet/JSP技術 2.4 MVC 簡介 2.5 Struts 技術 2.6 Hibernate 技術 2.6.1 應用程序的分層體系結構 2.6.2 Hibernate的應用及API簡介 2.7 開發環境及環境配置 2.7.1 Java/JSP系統環境 2.7.2 JSP環…

D365 SysDictTable\SysDictField

文章目錄 前言一、示例 前言 SysDictField 和 SysDictTable 用于訪問表和字段的元數據信息。 一、示例 循環表&#xff0c;使對應數據源的字段禁止編輯 public void fieldNoAllowEdit(Common _common,formDataSource fds,boolean aE false){TableId tab…

小程序-購物車-基于SKU電商規格組件實現

SKU 概念&#xff1a; 存貨單位&#xff08; Stock Keeping Unit &#xff09;&#xff0c; 庫存 管理的最小可用單元&#xff0c;通常稱為“單品”。 SKU 常見于電商領域&#xff0c;對于前端工程師而言&#xff0c;更多關注 SKU 算法 &#xff0c;基于后端的 SKU 數據…

(二)vForm 動態表單設計器之下拉、選擇

系列文章目錄 &#xff08;一&#xff09;vForm 動態表單設計器之使用 目錄 系列文章目錄 前言 一、后端需提供接口 二、組件配置 總結 前言 動態表單下拉、選擇等組件&#xff0c;大概率要使用數據庫中的數據&#xff0c;那么vForm如何拿到數據庫中的數據呢&#xff1f;跟隨…

僵尸進程、孤兒進程、守護進程

【一】僵尸進程和孤兒進程 【1】引入 我們知道在unix/linux中&#xff0c;正常情況下&#xff0c;子進程是通過父進程創建的&#xff0c;子進程在創建新的進程。 子進程的結束和父進程的運行是一個異步過程,即父進程永遠無法預測子進程 到底什么時候結束。 當一個 進程完成它…

動物合并消除休閑游戲源碼 Animal Merge 益智游戲

一款動物合并消除休閑游戲源碼&#xff0c;Animal Merge是一款引人入勝的益智游戲&#xff0c;玩家的任務是合并方塊&#xff0c;創造出可愛的動物&#xff0c;這些動物的體型會逐漸變大。游戲玩法包括將方塊放到網格上&#xff0c;并戰略性地將它們合并以形成更大的動物形狀。…

作文筆記9 描寫方法

動態描寫&#xff1a; 威尼斯小艇&#xff0c;窗外的風景飛快的后退。 靜態描寫&#xff1a; 牧場之國&#xff0c;牛不再哞哞&#xff0c;馬忘記了踢馬房的擋板。 動靜結合&#xff1a; 火車進站&#xff0c;人聲鼎沸&#xff0c;叫賣聲&#xff0c;廣播聲&#xff0c;人…

【408精華知識】主存相關解題套路大揭秘!

講完了Cache&#xff0c;再來講講主存是怎么考察的&#xff0c;我始終認為&#xff0c;一圖勝千言&#xff0c;所以對于很多部件&#xff0c;我都是通過畫圖進行形象的記憶&#xff0c;那么接下來我們對主存也畫個圖&#xff0c;然后再來詳細解讀其考察套路~ 文章目錄 零、主存…

機器人正逆運動學、動力學概念

1.基本概念 建立機器人的正逆運動學和正逆動力學模型是為了解決不同類型的控制和規劃問題。這些模型幫助工程師和研究人員理解和預測機器人的行為&#xff0c;從而設計出更有效的控制策略和運動規劃。以下是建立這些模型的主要原因和一些應用實例&#xff1a; 正運動學模型 正…

python-pytorch 下批量seq2seq+Bahdanau Attention實現問答1.0.000

python-pytorch 下批量seq2seq+Bahdanau Attention實現簡單問答1.0.000 前言原理看圖數據準備分詞、index2word、word2index、vocab_size輸入模型的數據構造注意力模型decoder的編寫關于損失函數和優化器在預測時完整代碼參考前言 前面實現了 luong的dot 、general、concat注意…

【話題】我眼神的IT行業現狀與未來趨勢

目錄 一、挑戰 教學資源的重新分配 教師角色的轉變 學生學習方式的改變 教育評價體系的挑戰 二、機遇 個性化學習 跨學科學習 國際合作與交流 創新教育模式 三、如何培養下一代IT專業人才 更新教育理念 加強基礎設施建設 整合課程資源 加強實踐教學 培養跨學科…

easy-es EsAutoConfiguration RestHighLevelClient 沒有自動注入配置

我用的easy-es.version 是 2.0.0-beta1&#xff0c;是基于springboot2開發的&#xff0c;自動注入配置的目錄掃描的是META-INF/spring.factories文件&#xff1b;而我使用的框架是springboot3&#xff0c;springboot3掃描的是META-INF/spring/org.springframework.boot.autocon…

【算法刷題day57】Leetcode:739. 每日溫度、496.下一個更大元素 I

文章目錄 Leetcode 739. 每日溫度解題思路代碼總結 Leetcode 496.下一個更大元素 I解題思路代碼總結 草稿圖網站 java的Deque Leetcode 739. 每日溫度 題目&#xff1a;739. 每日溫度 解析&#xff1a;代碼隨想錄解析 解題思路 維護一個單調棧&#xff0c;當新元素大于棧頂&a…

【Linux】TCP協議【中】{確認應答機制/超時重傳機制/連接管理機制}

文章目錄 1.確認應答機制2.超時重傳機制&#xff1a;超時不一定是真超時了3.連接管理機制 1.確認應答機制 TCP協議中的確認應答機制是確保數據可靠傳輸的關鍵部分。以下是該機制的主要步驟和特點的詳細解釋&#xff1a; 數據分段與發送&#xff1a; 發送方將要發送的數據分成一…