Flink Flink中的分流

一、什么是分流

所謂“分流”,就是將一條數據流拆分成完全獨立的兩條、甚至多條流。也就是基于一個DataStream,定義一些篩選條件,將符合條件的數據揀選出來放到對應的流里。
在這里插入圖片描述

二、基于filter算子的簡單實現分流

其實根據條件篩選數據的需求,本身非常容易實現:只要針對同一條流多次獨立調用.filter()方法進行篩選,就可以得到拆分之后的流了。
案例需求:讀取一個整數數字流,將數據流劃分為奇數流和偶數流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 創建Flink上下文執行環境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 設置全局并行度為2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先將輸入流轉為Integer類型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函數分流偶數流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表達式分流奇數流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶數流");ds2.print("奇數流");streamExecutionEnvironment.execute();}
}

執行結果

奇數流:1> 1
偶數流:2> 2
偶數流:1> 2
偶數流:2> 4
奇數流:1> 3
奇數流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

這種實現非常簡單,但代碼顯得有些冗余——我們的處理邏輯對拆分出的三條流其實是一樣的,卻重復寫了三次。而且這段代碼背后的含義,是將原始數據流 stream 復制三份,然后對每一份分別做篩選;這明顯是不夠高效的。我們自然想到,能不能不用復制流,直接用一個算子就把它們都拆分開呢?

三、使用測輸出流

關于處理函數中側輸出流的用法,我們已經在 7.5 節做了詳細介紹。簡單來說,只需要調用上下文 ctx 的.output()方法,就可以輸出任意類型的數據了。而側輸出流的標記和提取,都離不開一個“輸出標簽”(OutputTag),指定了側輸出流的 id 和類型。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/167460.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/167460.shtml
英文地址,請注明出處:http://en.pswp.cn/news/167460.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

面了一個4年經驗的測試工程師,自動化都不會也要15k,我也是醉了····

&#x1f4e2;專注于分享軟件測試干貨內容&#xff0c;歡迎點贊 &#x1f44d; 收藏 ?留言 &#x1f4dd; 如有錯誤敬請指正&#xff01;&#x1f4e2;交流討論&#xff1a;歡迎加入我們一起學習&#xff01;&#x1f4e2;資源分享&#xff1a;耗時200小時精選的「軟件測試」資…

表單考勤簽到作業周期打卡打分評價評分小程序開源版開發

表單考勤簽到作業周期打卡打分評價評分小程序開源版開發 表單打卡評分 表單簽到功能&#xff1a;學生可以通過掃描二維碼或輸入簽到碼進行簽到&#xff0c;方便教師進行考勤管理。 考勤功能&#xff1a;可以記錄學生的出勤情況&#xff0c;并自動生成出勤率和缺勤次數等統計數…

本地緩存與分布式緩存

一、緩存的概念 在服務端編程當中&#xff0c;緩存主要是指將數據庫的數據加載到內存中&#xff0c;之后對該數據的訪問都在內存中完成&#xff0c;從而減少了對數據庫的訪問&#xff0c;解決了高并發場景中數據庫容易成為性能瓶頸的問題&#xff1b;以及基于內存的訪問速度高…

ruoyi-plus-vue部署

安裝虛擬機 部署文檔 安裝docker 安裝docker 安裝docker-compose 可能遇到的錯誤 Failed to deploy ruoyi/ruoyi-server:5.1.0 Dockerfile: ruoyi-admin/Dockerfile: Cant retrieve im age ID from build stream 安裝 vim 命令 yum install vim -y 修改文件 vim /etc/re…

flutter 無法從H5 WebView 訪問攝像頭和錄音權限

AndroidManifest.xml需要在 中添加以下權限&#xff1a; <uses-permission android:name"android.permission.INTERNET"/> <uses-permission android:name"android.permission.CAMERA" /> <uses-permission android:name"android.per…

基于Springboot的冬奧會科普平臺(有報告),Javaee項目,springboot項目。

演示視頻&#xff1a; 基于Springboot的冬奧會科普平臺&#xff08;有報告&#xff09;&#xff0c;Javaee項目&#xff0c;springboot項目。 項目介紹&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三層…

線性表之鏈式表

文章目錄 主要內容一.單鏈表1.頭插法建立單鏈表代碼如下&#xff08;示例&#xff09;: 2.尾插法建立單鏈表代碼如下&#xff08;示例&#xff09;: 3.按序號查找結點值代碼如下&#xff08;示例&#xff09;: 4.按值查找表結點代碼如下&#xff08;示例&#xff09;: 5.插入節…

ELK+kafka+filebeat企業內部日志分析系統

1、組件介紹 1、Elasticsearch&#xff1a; 是一個基于Lucene的搜索服務器。提供搜集、分析、存儲數據三大功能。它提供了一個分布式多用戶能力的全文搜索引擎&#xff0c;基于RESTful web接口。Elasticsearch是用Java開發的&#xff0c;并作為Apache許可條款下的開放源碼發布…

module ‘d2l.torch‘ has no attribute ‘train_ch3‘

解決方法&#xff1a; 方法1&#xff1a; 如果沒有安裝d2l&#xff0c;請安裝 詳細步驟見安裝d2l 方法2&#xff1a; 先卸載舊的版本 pip uninstall d2l再下載新的版本&#xff0c;需要以管理員身份運行下載指令 pip install d2l0.17.5 --user完美解決&#xff01; ????…

創新研報|企業如何在不確定時期突破至新高度?

報告下載地址&#xff1a; 創新研報&#xff5c;BCG 2023最創新企業研究-在不確定時期躍升新高度 創新從未如此重要&#xff0c;領先的企業創新者正在證明這一切。BCG&#xff08;于2005年首次發布年度創新報告&#xff0c;其中列出了全球創新高管最欽佩的50家企業&#xf…

2824. 統計和小于目標的下標對數目 --力扣 --JAVA

題目 給你一個下標從 0 開始長度為 n 的整數數組 nums 和一個整數 target &#xff0c;請你返回滿足 0 < i < j < n 且 nums[i] nums[j] < target 的下標對 (i, j) 的數目。 解題思路 對數組進行排序&#xff0c;可以利用List自帶的sort函數傳遞比較規則(代碼中的…

【MATLAB源碼-第88期】基于matlab的灰狼優化算法(GWO)的柵格路徑規劃,輸出做短路徑圖和適應度曲線

操作環境&#xff1a; MATLAB 2022a 1、算法描述 灰狼優化算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;是一種模仿灰狼捕食行為的優化算法。灰狼是群居動物&#xff0c;有著嚴格的社會等級結構。在灰狼群體中&#xff0c;通常有三個等級&#xff1a;首領&#xff…

數據結構-歸并排序+計數排序

1.歸并排序 基本思想&#xff1a; 歸并排序是建立在歸并操作上的一種有效的排序算法,該算法是采用分治法的一個非常典型的應用。將已有序的子序列合并&#xff0c;得到完全有序的序列&#xff1b;即先使每個子序列有序&#xff0c;再使子序列段間有序。若將兩個有序表合并成一個…

2023年P氣瓶充裝證模擬考試題庫及P氣瓶充裝理論考試試題

題庫來源&#xff1a;安全生產模擬考試一點通公眾號小程序 2023年P氣瓶充裝證模擬考試題庫及P氣瓶充裝理論考試試題是由安全生產模擬考試一點通提供&#xff0c;P氣瓶充裝證模擬考試題庫是根據P氣瓶充裝最新版教材&#xff0c;P氣瓶充裝大綱整理而成&#xff08;含2023年P氣瓶…

pulseaudio是如何測試出音頻延遲的

通常專業的音頻設備生產廠商都有專業的設備來測試精確的音頻鏈路延時。 那么沒有專業設備怎么測試出音頻延遲呢?如下圖,我們可以看到pulseaudio可以測試出硬件音頻延遲。 那么,他是怎么測試出硬件延遲的呢?他的理論依據是什么呢?接下來我帶大伙一起探索一下。 /*占位…

紅隊攻防實戰之從邊界突破到漫游內網(無cs和msf)

也許有一天我們再相逢&#xff0c;睜大眼睛看清楚&#xff0c;我才是英雄。 本文首發于先知社區&#xff0c;原創作者即是本人 本篇文章目錄 網絡拓撲圖&#xff1a; 本次紅隊攻防實戰所需繪制的拓撲圖如下&#xff1a; 邊界突破 訪問網站&#xff1a; http://xxx.xxx.xxx…

leetcode刷題記錄——1991. 找到數組的中間位置

找到數組的中間位置 給你一個下標從 0 開始的整數數組 nums &#xff0c;請你找到 最左邊 的中間位置 middleIndex &#xff08;也就是所有可能中間位置下標最小的一個&#xff09;。 中間位置 middleIndex 是滿足 nums[0] nums[1] … nums[middleIndex-1] nums[middleInd…

數據傳輸的思考

Wi-Fi&#xff1a;Wi-Fi是一種無線網絡技術&#xff0c;可以用于無線互聯網接入、局域網通信和數據傳輸等。Wi-Fi基于IEEE 802.11標準&#xff0c;通過無線信號傳輸數據&#xff0c;提供高速的無線網絡連接。Wi-Fi可用于連接設備與路由器或者設備之間的直接通信&#xff0c;可以…

Linux 排查必看文件

目錄 1. 登錄日志 1.1 /var/log/wtmp 1.2 /var/log/btmp.* 1.3 /var/log/lastlog 1.4 /var/log/faillog 1.5 /var/log/secure 1.6 /var/log/auth.log 2. 系統日志 2.1 /var/log/cron.* 2.2 /var/log/syslog 2.3 /var/log/audit/audit.*log 3. 歷史命令 3.1 ~/…

Docker 中OpenResty下載與使用

1Panel安裝OpenResty 查看到就說明安裝成功 部署項目 在http中添加&#xff1a; server { listen 8001; //端口號 server_name localhost; location / { root /admin; //項目路徑 index index.html index.htm; …