Flink基本轉換算子map/filter/flatmap

map

map是大家非常熟悉的大數據操作算子,主要用于將數據流中的數據進行轉換,形成新的數據流。簡單來說,就是一個“一一映射”,消費一個元素就產出一個元素。
在這里插入圖片描述
我們只需要基于DataStream調用map()方法就可以進行轉換處理。方法需要傳入的參數是接口MapFunction的實現;返回值類型還是DataStream,不過泛型(流中的元素類型)可能改變。

public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:傳入匿名類,實現MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:傳入MapFunction的實現類// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}

面代碼中,MapFunction實現類的泛型類型,與輸入數據類型和輸出數據的類型有關。在實現MapFunction接口的時候,需要指定兩個泛型,分別是輸入事件和輸出事件的類型,還需要重寫一個map()方法,定義從一個輸入事件轉換為另一個輸出事件的具體邏輯。

Filter

filter轉換操作,顧名思義是對數據流執行一個過濾,通過一個布爾條件表達式設置過濾條件,對于每一個流內元素進行判斷,若為true則元素正常輸出,若為false則元素被過濾掉。
在這里插入圖片描述
進行filter轉換之后的新數據流的數據類型與原數據流是相同的。filter轉換需要傳入的參數需要實現FilterFunction接口,而FilterFunction內要實現filter()方法,就相當于一個返回布爾類型的條件表達式。

public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:傳入匿名類實現FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}).print();// 方式二:傳入FilterFunction實現類// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}

FlatMap

flatMap操作又稱為扁平映射,主要是將數據流中的整體(一般是集合類型)拆分成一個一個的個體使用。消費一個元素,可以產生0到多個元素。flatMap可以認為是“扁平化”(flatten)和“映射”(map)兩步操作的結合,也就是先按照某種規則對數據進行打散拆分,同map一樣,flatMap也可以使用Lambda表達式或者FlatMapFunction接口實現類的方式來進行傳參,返回值類型取決于所傳參數的具體邏輯,可以與原數據流相同,也可以不同。
在這里插入圖片描述
案例需求:如果輸入的數據是sensor_1,只打印vc;如果輸入的數據是sensor_2,既打印ts又打印vc。

public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}/*** TODO flatmap: 一進多出(包含0出)*      對于s1的數據,一進一出*      對于s2的數據,一進2出*      對于s3的數據,一進0出(類似于過濾的效果)**    map怎么控制一進一出:*      =》 使用 return**    flatmap怎么控制的一進多出*      =》 通過 Collector來輸出, 調用幾次就輸出幾條***/public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/212296.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/212296.shtml
英文地址,請注明出處:http://en.pswp.cn/news/212296.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

案例026:基于微信小程序的原創音樂系統的設計與實現

文末獲取源碼 開發語言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 數據庫&#xff1a;mysql 5.7 開發軟件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序開發軟件&#xff1a;HBuilder X 小程序…

什么是Restful?

Rest簡介 REST是英文representational state transfer(表象性狀態轉變)或者表述性狀態轉移。Rest是web服務的一種架構風格。使用HTTP,URI,XML,JSON,HTML等廣泛流行的標準和協議。輕量級,跨平臺,跨語言的架構設計。它是一種設計風格,不是一種標準,是一種思想。 Rest架構的主要…

java程序定時器

目錄 1.java定時器原生方法 1.java定時器原生方法 實現每天早上8點執行任務的示例代碼 import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class TimeTest{pub…

汽車網絡安全--關于UN R155認證的思考

1.UN R155概述 2020年6月25日,聯合國頒布了全球首個汽車網絡安全強制性法規 -- UN 155,詳細規定了關于評估網絡安全措施的審核條款、制造商和供應商降低網絡安全風險的方法以及實施風險評估的義務等。 法規適用于與信息安全相關的M類(4輪及以上載客汽車)、N類(四輪載貨汽車)…

SpringBoot項目連接Graylog

直接用logback將控制臺輸出的日志發送到graylog上 1.導入logback依賴 <dependency> <groupId>de.siegmar</groupId> <artifactId>logback-gelf</artifactId> <version>1.1.0</version> </dependency> 2.創建logback-spring.x…

淺談低代碼

低代碼開發是近年來迅速崛起的軟件開發方法&#xff0c;讓編寫應用程序變得更快、更簡單。有人說它是美味的膳食&#xff0c;讓開發過程高效而滿足&#xff0c;但也有人質疑它是垃圾食品&#xff0c;缺乏定制性與深度。你認為低代碼到底是美以下方向僅供參考。味的膳食還是垃圾…

SpringBoot - 四種常見定時器

常見實現方案 Scheduled注解&#xff1a;基于注解Timer().schedule創建任務&#xff1a;基于封裝類Timer線程&#xff1a;使用線程直接執行任務即可&#xff0c;可以與thread、線程池、ScheduleTask等配合使用quartz配置定時器&#xff1a;基于spring的quartz框架 Scheduled注…

golang學習筆記——編寫最簡單的命令行工具

編寫最簡單的命令行工具 用戶輸入bufio 使用go語言編寫最簡單的命令行工具 mkdir hello-cli-demo cd hello-cli-demo # 查看環境變量 go envgo mod初始化 go mod init gitcode.com/m打開vscode&#xff0c;創建main.go package mainimport ("fmt""bufio&qu…

RK3568 CIF和ISP的關聯

1. 引言 在本文檔中&#xff0c;我們將介紹RK3568芯片的CIF&#xff08;Camera Interface&#xff09;和ISP&#xff08;Image Signal Processor&#xff09;模塊。這兩個模塊是RK3568芯片的關鍵組成部分&#xff0c;用于圖像采集和處理。 CIF是一個標準接口&#xff0c;用于…

快速測試 3節點的redis sentinel集群宕機2個節點以后是否仍能正常使用

有同事問我&#xff0c;三個redis sentinel節點&#xff0c;宕機兩個節點以后&#xff0c;是否還能夠正常的通過redis sentinel正常訪問redis的數據。我想了想&#xff0c;理論上是可以的&#xff0c;但是我沒試過&#xff0c;今天有時間就測試了一下。搭建環境和測試代碼的過程…

Java并發(十七)----變量的線程安全分析

1、成員變量和靜態變量是否線程安全 如果它們沒有共享&#xff0c;則線程安全 如果它們被共享了&#xff0c;根據它們的狀態是否能夠改變&#xff0c;又分兩種情況 如果只有讀操作&#xff0c;則線程安全 如果有讀寫操作&#xff0c;則這段代碼是臨界區&#xff0c;需要考慮線…

深入了解Python pydash庫

更多資料獲取 &#x1f4da; 個人網站&#xff1a;ipengtao.com 在數據處理和分析領域&#xff0c;Python一直是一種強大的編程語言。然而&#xff0c;在處理大規模數據集和執行復雜操作時&#xff0c;有時候需要更高效的工具。在本文中&#xff0c;我們將深入探討pydash庫&am…

語義分割 簡介及數據集簡介

參考文章 MS COCO數據集介紹以及pycocotools簡單使用-CSDN博客

[MySQL--進階篇]存儲引擎的體系結構、簡介、特點、選擇

前言 ?Hello!這里是歐_aita的博客。 ?今日語錄&#xff1a;不要在乎別人怎么看你&#xff0c;因為他們根本就沒有時間&#xff0c;他們只關心他們自己。 ?個人主頁&#xff1a;歐_aita ψ(._. )>?個人專欄&#xff1a; 數據結構與算法 MySQL數據庫 存儲引擎 前言MySQL體…

代碼隨想錄算法訓練營第四十一天|343. 整數拆分、96.不同的二叉搜索樹

代碼隨想錄算法訓練營第四十一天|343. 整數拆分、96.不同的二叉搜索樹 整數拆分 343. 整數拆分 文章講解&#xff1a;https://programmercarl.com/0343.%E6%95%B4%E6%95%B0%E6%8B%86%E5%88%86.html 題目鏈接&#xff1a;https://leetcode.cn/problems/integer-break/ 視頻講解…

李宏毅gpt個人記錄

參考&#xff1a; 李宏毅機器學習--self-supervised&#xff1a;BERT、GPT、Auto-encoder-CSDN博客 用無標注資料的任務訓練完模型以后&#xff0c;它本身沒有什么用&#xff0c;GPT 1只能夠把一句話補完&#xff0c;可以把 Self-Supervised Learning 的 Model做微微的調整&am…

32.768KHz時鐘RTC晶振精度PPM值及頻差計算

一個數字電路就像一所城市的交通&#xff0c;晶振的作用就是十字路口的信號燈&#xff0c;因此晶振的品質及其電路應用尤其關鍵。數字電路又像生命體&#xff0c;它的運行就像人身體里的血液流通&#xff0c;它不是由單一的某個器件或器件單元構成&#xff0c;而是由多個器件及…

【Spring Boot 源碼學習】ApplicationListener 詳解

Spring Boot 源碼學習系列 ApplicationListener 詳解 引言往期內容主要內容1. 初識 ApplicationListener2. 加載 ApplicationListener3. 響應應用程序事件 總結 引言 書接前文《初識 SpringApplication》&#xff0c;我們從 Spring Boot 的啟動類 SpringApplication 上入手&am…

如何查詢川菜食材配料的API接口

在當今的美食文化中&#xff0c;菜譜不只是一張簡單的食譜&#xff0c;更是了解美食文化和飲食知識的重要途徑。然而&#xff0c;若沒有準確的食材配料&#xff0c;烹制出的每道菜品都將難以達到完美的味道。因此&#xff0c;為了更好地滿足人們對于菜譜和食譜的需求&#xff0…

C語言習題集(026)

//寫一個函數&#xff0c;輸入一個4位數字&#xff0c;要求輸出這4個 //數字字符&#xff0c;但每兩個數字間空一個空格。如輸入 //1990&#xff0c;應輸出"1 9 9 0"。 /* */ //解答&#xff1a; #include<stdio.h> void change(int a) { if(a/10!0) { chang…