「Flink」算子主要方法介紹

背景:

上期文章主要講了Flink項目搭建的一些方法,其中對于數據流的處理很大一部分是通過算子來進行計算和處理的,算子也是Flink中功能非常龐大,且很重要的一部分。

算子介紹:

算子在Flink的開發者文檔中是這樣介紹的:通過算子能將一個或多個 DataStream 轉換成新的 DataStream,在應用程序中可以將多個數據轉換算子合并成一個復雜的數據流拓撲。這簡單總結就有點類似于Flink的一些API,來對數據流進行操作處理。

算子介紹目錄:

主要介紹幾個在日常開發中,比較常用的幾個算子方法:

1.FlatMap

2.Filter

3.Window

4.join

5.coGroup

1.FlatMap

flatMap是輸入一個元素同時產生零個、一個或多個元素。通常在日常開發中用于對于數據流的初步處理和合并,將數據流轉換成我們希望輸入的數據格式

方法舉例:

dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});

日常使用舉例:

/// 將binglog獲取的dataChangInfo格式轉換成OrderInfo業務格式
dataStream1.flatMap(new FlatMapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic void flatMap(DataChangeInfo dataChangeInfo, Collector<OrderInfo> collector) throws Exception {OrderInfo orderInfo = JSONObject.parseObject(dataChangeInfo.getAfterData(), OrderInfo.class);log.info("訂單數據:{}", orderInfo);collector.collect(orderInfo);}
});
2.Filter

對數據流進行過濾操作,將一些臟數據或者我們不希望流入的數據進行排除處理
使用舉例:

/// 篩選出訂單狀態小于等于40的訂單數據
orderInfoSingleOutputStream.filter(new FilterFunction<OrderInfo>() {@Overridepublic boolean filter(OrderInfo orderInfo) throws Exception {if (orderInfo.getStatus() <= 40){return true;}return false;}
});
3.Window

Window 根據某些特征(例如,最近 5 秒內到達的數據)對每個 key Stream 中的數據進行分組。就類似于上期文章所講述的窗口,具體介紹可以查看上期文章「Flink」Flink項目搭建方法介紹;

/// 先通過keyby設置主鍵
/// 然后設置一個以事件時間為標定,設一個5秒的窗口
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))); 
4.Join

根據指定的 key 和窗口 join 兩個數據流。
這個方法通常用在兩個數據流需要通過某個key值進行合并的時候,比如訂單主表和訂單副表需要通過orderId進行數據合并的時候,進行數據處理。

方法舉例:

dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new JoinFunction () {...});

日常使用舉例:

DataStream<OrderOutputInfo> outputInfoDataStream = orderInfoSingleOutputStream.join(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic OrderOutputInfo join(OrderInfo orderInfo, OrderCodeInfo orderCodeInfo) throws Exception {OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());log.info("輸出數據:{}", orderOutputInfo);return orderOutputInfo;}});

在這里插入圖片描述

通過斷點,其實可以發現,數據并不是按照一批一批進行輸出的,而是根據key,進行一條一條的輸出的,這個需要注意寫入庫的方法,以免對數據庫寫入產生較大的壓力。
然后該方法會發現一個弊端,那就是如果不在事件窗口期輸入的,那么無法匹配到對應的數據行,那么就會出現數據無法輸出,數據丟失的情況,使用outside,官方推薦的側輸出,也無法有效輸出,這時候比較推薦下面這個方法Cogroup,可以通過自定義的方法進行對未匹配的數據進行輸出報錯;

5.CoGroup

根據指定的 key 和窗口將兩個數據流組合在一起。
CoGroup和Join是個類似的方法,但是CoGroup的數據處理方法里面可以有迭代器,然后在實際數據處理過程中可以通過判斷迭代器,從而實現對于未匹配成功的訂單進行打印輸出。

方法舉例:

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new CoGroupFunction () {...});

日常使用舉例:

orderInfoSingleOutputStream.coGroup(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic void coGroup(Iterable<OrderInfo> iterable, Iterable<OrderCodeInfo> iterable1, Collector<OrderOutputInfo> collector) throws Exception {if(iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderInfo orderInfo = iterable.iterator().next();OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("匹配成功的訂單ID:" + orderInfo.getId() + "  訂單創建時間:" + orderInfo.getCreate_time() + "  status " + orderInfo.getStatus());System.out.println("=============================");OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());collector.collect(orderOutputInfo);}else if(iterable.iterator().hasNext() && !iterable1.iterator().hasNext()){OrderInfo order = iterable.iterator().next();System.out.println("訂單未找到匹配的訂單-----------Code:"+ order.getId());} else  if(!iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("未找到匹配的Code訂單-----------Code:" + orderCodeInfo.getId() );}}});

數據輸出日志:
可以看到數據也是一條條匹配后輸出,無法匹配的數據也會在窗口結束后進行輸出展示或告警。

在這里插入圖片描述

總結:

以上幾個算子方法就是平時日常開發中比較常用且好用的方法,大家可以結合各自的業務場景,進行挑選使用。

相關鏈接

Flink

Flink開發者文檔

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/88200.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/88200.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/88200.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

3405. 統計恰好有 K 個相等相鄰元素的數組數目

3405. 統計恰好有 K 個相等相鄰元素的數組數目 給你三個整數 n &#xff0c;m &#xff0c;k 。長度為 n 的 好數組 arr 定義如下&#xff1a; arr 中每個元素都在 閉 區間 [1, m] 中。恰好 有 k 個下標 i &#xff08;其中 1 < i < n&#xff09;滿足 arr[i - 1] arr…

Spring AI 項目實戰(十):Spring Boot + AI + DeepSeek 構建智能合同分析技術實踐(附完整源碼)

系列文章 序號文章名稱1Spring AI 項目實戰(一):Spring AI 核心模塊入門2Spring AI 項目實戰(二):Spring Boot + AI + DeepSeek 深度實戰(附完整源碼)3Spring AI 項目實戰(三):Spring Boot + AI + DeepSeek 打造智能客服系統(附完整源碼)4

impala中時間戳轉(DATE)指定格式的字符串

注意i&#xff1a;注意大小寫 timestamp\date–>string SELECT now(),from_timestamp(now(),yyyyMMdd);string->timestamp SELECT 20230710,to_timestamp(20230710,yyyyMMdd);日期加減 select 20231201,from_timestamp(date_add(to_timestamp(20231201,yyyyMMdd),1),…

百度下拉框出詞技術解密:72小時出下拉詞軟件原理分享

如何才能刷下拉詞&#xff1f;這個問題一直是企業做流量時最糾結的問題&#xff0c;百度下拉詞作為百度搜索體驗中的一項智能化功能&#xff0c;極大地方便了用戶快速完成搜索&#xff0c;也成為了企業在搜索引擎優化&#xff08;SEO&#xff09;策略中的重要流量入口。通過研究…

上海人工智能實驗室明珠湖會議首開,解答AI前沿疑問,推進科學智能

在通用人工智能&#xff08;AGI&#xff09;探索如火如荼的當下&#xff0c;如何加速突破&#xff1f;如何凝練關鍵問題、孕育顛覆性創新&#xff1f;2025年6月13日&#xff0c;上海人工智能實驗室主任、首席科學家&#xff0c;清華大學惠妍講席教授周伯文在首屆明珠湖會議&…

BeyondCompare安裝(永久免費使用+全網最詳細版)

一.下載&#xff1a; 官網下載&#xff08;速度較慢&#xff09;&#xff1a; https://www.scootersoftware.com/download.php 阿里云盤&#xff08;不限速&#xff09; https://www.alipan.com/s/WaG1z54BQ2U 二.安裝&#xff08;無腦下一步即可&#xff09; 三.永久免費…

如何用AI開發完整的小程序<7>—讓AI微調UI排版

上一節我們介紹了如何讓AI修改整體UI視覺效果。 不過有時候AI調整的并不理想&#xff0c;一些UI的布局還是需要微調。 比如已經實現的這個開始頁面&#xff0c;我覺得標題太高了&#xff0c;這時候可以自己調&#xff0c;也可以讓AI單獨調&#xff0c;下面詳細介紹。 一、手動…

64-Oracle Redo Log

小伙伴們&#xff0c;關于數據庫的redo log相信大家都操作很多次了,且這是OCM考試必考內容。Oracle Redo Log是一種特殊的日志文件&#xff0c;用于完整地記錄數據庫中所有數據變更的詳細信息。當數據庫執行插如、更新或刪除等更新操作&#xff0c;這些操作并不會立刻寫入數據庫…

hive集群優化和治理常見的問題答案

Hive 集群優化與治理常見問題答案合集 &#x1f42d;1. Q&#xff1a;Hive中如何優化大表Join操作&#xff1f; A&#xff1a; 使用Map Join&#xff08;小表Join大表時&#xff09;避免Reduce階段。啟用自動Map Join&#xff08;設置hive.auto.convert.jointrue&#xff09;…

C#采集電腦硬件(CPU、GPU、硬盤、內存等)溫度和使用狀況

這是采集出來的Json&#xff0c;部分電腦&#xff08;特別是筆記本&#xff09;無法獲取到&#xff1a; {"HardwareList": [{"Name": "MITX-6999","Type": "主板","Sensors": [],"WmiReport": null}, …

C3新增特性

? 一、選擇器&#xff08;Selectors&#xff09; 1. 屬性選擇器 [attr^value]: 匹配屬性值以特定字符串開頭的元素。[attr$value]: 匹配屬性值以特定字符串結尾的元素。[attr*value]: 匹配屬性值包含特定字符串的元素。 2. 子元素和兄弟元素選擇器 :nth-child(n): 匹配父元…

報錯 @import “~element-ui/packages/theme-chalk/src/index“;

報錯 import "~element-ui/packages/theme-chalk/src/index"; 具體報錯報錯原因 具體報錯 SassError: Can’t find stylesheet to import. import “~element-ui/packages/theme-chalk/src/index”; src\views\login\theme\element-variables.scss 8:9 root stylesh…

ESLint從入門到實戰

引言 作為前端開發者&#xff0c;你是否遇到過這樣的情況&#xff1a;團隊成員寫出的代碼風格各異&#xff0c;有人喜歡用分號&#xff0c;有人不用&#xff1b;有人用雙引號&#xff0c;有人用單引號&#xff1b;代碼評審時總是在糾結這些格式問題而不是業務邏輯&#xff1f;…

vue3實現markdown文檔轉HTML并可更換樣式

vue3實現markdown文檔轉HTML 安裝marked npm install marked<template><!-- 后臺可添加樣式編輯器 --><div class"markdown-editor" :class"{ fullscreen: isFullscreen, preview-mode: isPreviewMode }"><div class"editor-c…

Temu 實時獲取商品動態:一個踩坑后修好的抓數腳本筆記

Temu 作為一個增長迅猛的購物平臺&#xff0c;其商品價格、庫存等信息&#xff0c;對許多做運營分析的小伙伴來說非常有參考價值。 我在寫這個小工具的時候&#xff0c;踩了很多坑&#xff0c;特別記錄下來&#xff0c;希望對你有用。 初版代碼&#xff1a;想當然的“直接來一下…

【軟考高級系統架構論文】論數據分片技術及其應用

論文真題 數據分片就是按照一定的規則,將數據集劃分成相互獨立、 正交的數據子集,然后將數據子集分布到不同的節點上。通過設計合理的數據分片規則,可將系統中的數據分布在不同的物理數據庫中,達到提升應用系統數據處理速度的目的。 請圍繞“論數據分片技術及其應用”論題…

VR飛奪瀘定橋沉浸式歷史再現?

當你戴上 VR 設備開啟這場震撼人心的 VR 飛奪瀘定橋體驗&#xff0c;瞬間就會被拉回到 1935 年那個戰火紛飛的 VR 飛奪瀘定橋的歲月&#xff0c;置身于瀘定橋的西岸 。映入眼簾的是一座由 13 根鐵索組成的瀘定橋&#xff0c;它橫跨在波濤洶涌的大渡河上&#xff0c;橋下江水咆哮…

libwebsockets編譯

#安裝 libwebsocket git clone https://github.com/warmcat/libwebsockets && \ mkdir libwebsockets/build && cd libwebsockets/build && \ cmake -DMAKE_INSTALL_PREFIX:PATH/usr -DCMAKE_C_FLAGS"-fpic" .. && \ make &&…

使用docker部署epg節目單,同時管理自己的直播源

配置 Docker 環境 拉取鏡像并運行&#xff1a; docker run -d \--name php-epg \-v /etc/epg:/htdocs/data \-p 5678:80 \--restart unless-stopped \taksss/php-epg:latest 默認數據目錄為 /etc/epg &#xff0c;根據需要自行修改 默認端口為 5678 &#xff0c;根據需要自行修…

H5新增屬性

? 一、表單相關新增屬性&#xff08;Form Attributes&#xff09; 這些屬性增強了表單功能&#xff0c;提升用戶體驗和前端驗證能力。 1. placeholder 描述&#xff1a;在輸入框為空時顯示提示文本。示例&#xff1a; <input type"text" placeholder"請輸…