Flink DataStream API詳解(二)

一、引言

咱兩書接上回,上一篇文章主要介紹了DataStream API一些基本的使用,主要是針對單數據流的場景下,但是在實際的流處理場景中,常常需要對多個數據流進行合并、拆分等操作,以滿足復雜的業務需求。Flink 的 DataStream API 提供了一系列強大的多流轉換算子,如 union、connect 和 split 等,下面我們來詳細了解一下它們的功能和用法。

二、多流轉換

2.1 union 算子

union 算子的功能非常直接,就是將多個類型相同的 DataStream 合并成一個新的 DataStream 。它適用于需要將多個來源相同相似的數據合并到一起進行統一處理的場景。例如,在電商場景中,我們可能有來自不同地區的訂單流,希望將它們合并起來進行整體的銷售統計分析。?

下面通過一個簡單的代碼示例來展示 union 算子的使用:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionExample {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義兩個數據流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<Integer> stream2 = env.fromElements(4, 5, 6);// 使用union合并兩個數據流DataStream<Integer> unionStream = stream1.union(stream2);// 打印合并后的數據流unionStream.print();// 執行任務env.execute("Union Example");}
}

?在這個示例中,我們首先創建了兩個包含整數的數據流 stream1 和 stream2,然后使用 union 算子將它們合并成一個新的數據流 unionStream 。最后,通過 print 方法將合并后的數據流輸出到控制臺。合并后的數據特點是,新數據流包含了原始兩個數據流中的所有元素,元素的順序按照它們在原始數據流中的順序依次排列。

2.2 connect 算子

connect 算子與 union 算子不同,它主要用于連接兩個類型可以不同的數據流,并將它們合并成一個 ConnectedStreams 對象。這個對象允許我們在后續處理中分別對兩個流的數據進行操作,從而保留流之間的差異。這種特性在需要對不同類型的數據進行關聯處理,但又要保持數據類型獨立性的場景中非常有用。例如,在一個監控系統中,我們可能有一個數據流表示設備的狀態信息(如溫度、濕度等數值型數據),另一個數據流表示設備的日志信息(字符串類型),我們可以使用 connect 算子將這兩個流連接起來,以便在后續處理中綜合分析設備狀態和日志。?

以下是使用 connect 算子的代碼示例:

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class ConnectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義兩個不同類型的數據流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<String> stream2 = env.fromElements("a", "b", "c");// 使用connect連接兩個數據流ConnectedStreams<Integer, String> connectedStreams = stream1.connect(stream2);// 對連接后的流進行處理DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer value) {return "Integer: " + value;}@Overridepublic Object map2(String value) {return "String: " + value;}});resultStream.print();env.execute("Connect Example");}
}

在這個示例中,我們創建了一個整數類型的數據流 stream1 和一個字符串類型的數據流 stream2 。通過 connect 算子將它們連接成 ConnectedStreams 對象,然后使用 CoMapFunction 對連接后的流進行處理。CoMapFunction 包含兩個方法 map1 和 map2 ,分別用于處理來自不同流的數據。最終,將處理結果輸出到控制臺。

2.3 split 算子

split 算子的作用與合并相反,它用于將一個 DataStream 根據某些條件拆分成多個 DataStream 。在實際應用中,我們常常需要根據數據的不同特征對數據流進行分類處理,split 算子就可以幫助我們實現這一需求。比如,在一個電商訂單處理系統中,我們可以根據訂單金額的大小將訂單流拆分成小額訂單流和大額訂單流,以便對不同金額范圍的訂單進行不同的處理策略。?

下面是使用 split 算子的代碼實現:

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.List;public class SplitExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含整數的數據流DataStream<Integer> stream = env.fromElements(1, 20, 3, 40, 5, 60);// 使用split算子拆分數據流SplitStream<Integer> splitStream = stream.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {List<String> output = new ArrayList<>();if (value > 10) {output.add("large");} else {output.add("small");}return output;}});// 獲取拆分后的兩個數據流DataStream<Integer> largeStream = splitStream.select("large");DataStream<Integer> smallStream = splitStream.select("small");// 打印拆分后的數據流largeStream.print("Large Stream: ");smallStream.print("Small Stream: ");env.execute("Split Example");}
}

在這個示例中,我們定義了一個包含整數的數據流 stream 。通過 split 算子和自定義的 OutputSelector,根據數值大小將數據流拆分成兩個子流:largeStream 包含大于 10 的整數,smallStream 包含小于等于 10 的整數。最后,分別將這兩個子流輸出到控制臺,并加上相應的標識以便區分。

三、數據下沉(Sink)

在流處理應用中,將處理后的結果數據輸出到各種存儲介質是非常重要的一環。Flink 提供了豐富的數據下沉(Sink)操作,支持將數據寫入文件、Kafka、數據庫等多種存儲系統,以滿足不同場景下的數據持久化和后續處理需求。

3.1 寫入文件?

Flink 提供了多種將數據寫入文件的方法,其中常用的是 writeAsText 方法,它將數據流中的元素以文本形式寫入文件。例如,我們可以將處理后的訂單數據寫入文件,以便后續進行數據分析或存檔。?

以下是一個完整的代碼示例,展示如何從流數據到文件寫入的全流程:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("apple", "banana", "cherry");// 將數據流寫入文件,路徑為output.txtstream.writeAsText("output.txt");// 執行任務env.execute("File Sink Example");}
}

在上述代碼中,我們首先創建了一個包含水果名稱的數據流 stream 。然后,使用 writeAsText 方法將數據流中的元素寫入名為 output.txt 的文件中。在實際應用中,需要注意文件寫入模式和配置,writeAsText 默認使用追加模式寫入文件,如果文件已存在,新的數據會追加到文件末尾。還可以通過配置參數來指定寫入模式(如覆蓋模式)、緩沖區大小等。例如,通過設置 env.setBufferTimeout (1000) 可以調整緩沖區的超時時間,當緩沖區數據達到一定時間(這里是 1 秒)或大小限制時,會被寫入文件。

3.2?寫入 Kafka?

Flink 與 Kafka 的集成非常緊密,通過 FlinkKafkaProducer 可以方便地將處理結果寫入 Kafka 主題。這種方式在構建實時數據管道時非常常見,處理后的數據可以被其他系統從 Kafka 中消費,實現數據的共享和進一步處理。其原理是通過配置 Kafka 集群的地址、目標主題等參數,FlinkKafkaProducer 將 Flink 處理后的數據流轉換為 Kafka 可接受的消息格式,并發送到指定的 Kafka 主題中 。?

以下是連接 Kafka、配置參數以及數據寫入 Kafka 的代碼示例:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;public class KafkaSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("message1", "message2", "message3");// 配置Kafka參數Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("acks", "all");// 創建FlinkKafkaProducer,將數據寫入Kafka的test-topic主題FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("test-topic",new SimpleStringSchema(),properties);// 將數據流寫入Kafkastream.addSink(kafkaProducer);// 執行任務env.execute("Kafka Sink Example");}
}

在這個示例中,我們首先創建了一個包含消息的數據流 stream 。然后,配置了 Kafka 的連接參數,包括 Kafka 集群地址(bootstrap.servers)和 acks 參數(這里設置為 "all",表示等待所有副本確認寫入成功,以確保數據的可靠性)。接著,創建了 FlinkKafkaProducer 對象,指定了要寫入的 Kafka 主題(test-topic)和序列化器(SimpleStringSchema,用于將字符串數據轉換為 Kafka 消息格式)。最后,通過 addSink 方法將數據流寫入 Kafka。

3.3 寫入數據庫(以 Redis 為例)?

以 Redis 為例,將 Flink 處理后的數據寫入 Redis 可以實現數據的快速存儲和查詢,適用于對數據讀寫性能要求較高的場景。連接 Redis 數據庫的步驟主要包括引入相關依賴、創建 Jedis 連接池配置以及在 Flink 中自定義 Sink 函數來實現數據寫入。?

以下是自定義 RichSinkFunction 實現數據寫入 Redis 的代碼:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;public class RedisSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("key1:value1", "key2:value2", "key3:value3");// 將數據流寫入Redisstream.addSink(new RedisSink());// 執行任務env.execute("Redis Sink Example");}public static class RedisSink extends RichSinkFunction<String> {private transient Jedis jedis;@Overridepublic void open(org.apache.flink.configuration.Configuration parameters) throws Exception {super.open(parameters);// 創建Jedis連接池配置JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();// 初始化Jedis連接,連接到本地Redis服務jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(String value, Context context) throws Exception {// 按冒號分割字符串,得到鍵值對String[] parts = value.split(":", 2);String key = parts[0];String data = parts[1];// 將數據寫入Redisjedis.set(key, data);}@Overridepublic void close() throws Exception {super.close();// 關閉Jedis連接if (jedis != null) {jedis.close();}}}
}

?在這段代碼中,我們定義了一個自定義的 RichSinkFunction,即 RedisSink 。在 open 方法中,創建了 Jedis 連接池配置,并初始化了 Jedis 連接,連接到本地的 Redis 服務(地址為localhost,端口為 6379)。在 invoke 方法中,對輸入的字符串進行處理,將其按冒號分割為鍵值對,然后使用 jedis.set 方法將數據寫入 Redis。在 close 方法中,關閉 Jedis 連接,釋放資源。通過這種方式,Flink 處理后的數據流中的數據就可以成功寫入 Redis 數據庫。

四、總結

Flink DataStream API 的多流轉換操作,如 union、connect 和 split 等算子,為我們提供了強大的工具,使他們能夠靈活地處理多個數據流之間的復雜關系。通過這些算子,我們可以將不同來源的數據進行合并,對不同類型的數據進行關聯處理,以及根據數據特征進行分類拆分,從而滿足各種復雜業務場景下的流處理需求。?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/88489.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/88489.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/88489.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Unity3D游戲線上崩潰排查指南

前言 排查Unity3D線上游戲崩潰是個系統工程&#xff0c;需要結合工具鏈、日志分析和版本管理。以下是詳細的排查指南和關鍵步驟&#xff1a; 對惹&#xff0c;這里有一個游戲開發交流小組&#xff0c;希望大家可以點擊進來一起交流一下開發經驗呀&#xff01; 一、崩潰信息收…

DPDK性能優化實踐:系統級性能調優的方法論與實戰(一套通用的方法論)

性能優化的挑戰與現實困境 在高性能網絡處理領域&#xff0c;性能優化往往被視為一門“玄學”而非科學。許多開發者在面對性能瓶頸時&#xff0c;要么盲目追求單一指標的極致優化&#xff0c;要么采用"試錯法"進行零散的局部調優&#xff0c;結果往往是投入大量精力卻…

Docker的/var/lib/docker/目錄占用100%的處理方法

文章目錄 一、問題描述 二、解決措施 三、可能遇到的問題 問題1、問題描述&#xff1a;執行 sudo systemctl stop docker 命令時&#xff0c;提示 Warning: Stopping docker.service, but it can still be activated by: docker.socket 問題2、問題描述&#xff1a;執行 s…

【UE教程/進階】Slate鏈式編輯原理

目錄鏈式編輯操作" . "操作" "操作" [ ] "鏈式編輯 SNew().&#xfeff;[] 操作" . " SLATE_ARGUMENT(ArgType, ArgName) 宏 調用宏 SLATE_PRIVATE_ARGUMENT_VARIABLE(ArgType, ArgName) &#xff0c;在FArgument結構體中添加了變量…

將手工建模模型(fbx、obj)轉換為3dtiles的免費工具!

文章目錄1、工具下載2、使用說明3、詳細說明命令行格式示例命令參數說明4、源碼地址1、工具下載 百度網盤下載鏈接 選擇最新版本下載即可&#xff0c;支持Linux和Windows系統 2、使用說明 1&#xff09;按住鍵盤winr鍵&#xff0c;在彈出的窗口中輸入cmd 2&#xff09;點擊…

FreeRTOS源碼學習之內核初始化

目錄 前言 一、主函數內容 二、osKernelInitialize ()內核初始化函數內容 三、IS_IRQ()宏定義中斷檢測函數內容 四、如果這篇文章能幫助到你&#xff0c;請點個贊鼓勵一下吧ξ( ?&#xff1e;??)~ 前言 使用STM32CubeMX添加FreeRTOS進入工程之后&#xff0c;會自動在ma…

Docker—— 鏡像構建原因

在現代軟件開發和運維中&#xff0c;Docker已成為一種非常流行的工具&#xff0c;它通過容器化應用程序來簡化部署過程。然而&#xff0c;默認的官方鏡像往往只能滿足基礎需求&#xff0c;無法涵蓋所有特定項目的具體要求。原因說明系統級改動無法通過 volume 實現修改用戶、刪…

鋰電池自動化生產線的現狀與發展

鋰電池自動化生產線的概述鋰電池自動化生產線是指采用自動化設備和控制系統&#xff0c;實現鋰電池從原材料到成品的全流程自動化生產過程。隨著新能源產業的快速發展&#xff0c;鋰電池作為重要的儲能元件&#xff0c;其生產制造技術也在不斷進步。自動化生產線通過減少人工干…

java底層的native和沙箱安全機制

沙箱安全機制沙箱&#xff08;Sandbox&#xff09;安全機制是一種將程序或代碼運行在隔離環境中的安全技術&#xff0c;旨在限制其對系統資源&#xff08;如文件系統、網絡、內存、其他進程等&#xff09;的訪問權限&#xff0c;從而降低潛在惡意代碼帶來的風險。其核心思想是“…

【分享】文件擺渡系統適配醫療場景:安全與效率兼得

根據國家信息安全相關法規要求&#xff0c;醫院為了網絡安全&#xff0c;大多會采用網閘等隔離手段&#xff0c;將網絡隔離為內網和外網&#xff0c;但網絡隔離后&#xff0c;醫院的內外網間仍存在較為頻繁的文件擺渡需求。文件擺渡系統則是可以解決跨網絡或跨安全域文件傳輸中…

vscode 中的 mermaid

一、安裝軟件 Mermaid preview Mermaid support 二、運行命令 創建.md 文件右鍵選擇 ?Open Preview?&#xff08;或按 CtrlShiftV&#xff09; 三、流程圖 注意&#xff1a; 要md 文件要保留 mermaid 1. #mermaid-svg-nchqbvlWePe5KCwJ {font-family:"trebuchet ms"…

微服務引擎 MSE 及云原生 API 網關 2025 年 6 月產品動態

點擊此處&#xff0c;了解微服務引擎 MSE 產品詳情。

【TCP/IP】7. IP 路由

7. IP 路由7. IP 路由概述7.1 直接傳遞與間接傳遞7.2 IP 路由核心機制7.3 路由表7.3.1 路由表的構成7.3.2 信宿地址采用網絡地址的好處7.3.3 下一跳地址的優勢7.3.4 特殊路由表項7.3.5 路由算法7.4 靜態路由7.4.1 特點7.4.2 自治系統&#xff08;AS&#xff09;7.4.3 配置命令7…

xFile:高性能虛擬分布式加密存儲系統——Go

xFile&#xff1a;高性能虛擬分布式加密存儲系統 目錄xFile&#xff1a;高性能虛擬分布式加密存儲系統1 背景介紹2 設計初衷與目標3 項目簡介4 系統架構5 核心優勢1. 真正的分布式塊存儲2. 塊級加密與壓縮&#xff0c;安全高效3. 靈活的索引與元數據管理4. 多用戶與權限體系5. …

時序數據庫:高效處理時間序列數據的核心技術

時序數據庫概述時序數據庫&#xff08;Time Series Database&#xff0c;TSDB&#xff09;是一種專門為存儲、處理和查詢時間序列數據而優化的數據庫系統。隨著物聯網、金融科技、工業互聯網等領域的快速發展&#xff0c;時序數據呈現出爆炸式增長&#xff0c;傳統的關系型數據…

面試官:你再問TCP三次握手,我就要報警了!

CP三次握手和四次揮手&#xff0c;是面試官最愛問的“開場白”之一 別看它基礎&#xff0c;真要講清楚細節&#xff0c;分分鐘讓你冷汗直流&#xff01; 這玩意兒就跟程序員相親一樣&#xff1a; 表面上問的是“你老家哪的” 實際上是在試探你有沒有房、有沒有車、能不能落…

RuoYi+Uniapp(uni-ui)開發商城系統

如果你正在考慮用 RuoYi 和 UniApp&#xff08;uni-ui&#xff09;搭建一套商城系統&#xff0c;那這套組合確實值得好好研究。它整合了 RuoYi 的快速開發能力和 UniApp 的跨平臺特性&#xff0c;在高效開發的同時還能兼顧多端適配的需求。下面從技術架構、功能模塊、開發實踐到…

面試150 二叉樹的最大高度

思路 考慮從遞歸出發&#xff0c;聯想遞歸三部曲&#xff1a;返回什么、傳入的參數是什么、遍歷的方式是什么。此題現在需要我們整個樹&#xff0c;并且需要從根節點出發&#xff0c;因此我們選擇先序遍歷即可。另一張辦法&#xff0c;則是選擇通過隊列實現層次遍歷&#xff0c…

從零實現一個GPT 【React + Express】--- 【2】實現對話流和停止生成

摘要 這是本系列文章的第二篇&#xff0c;開始之前我們先回顧一下上一篇文章的內容&#xff1a; 從零實現一個GPT 【React Express】— 【1】初始化前后端項目&#xff0c;實現模型接入SSE 在這一篇中&#xff0c;我們主要創建了前端工程和后端工程&#xff0c;這里貼一下我…

SEQUENCE在RAC多實例開啟CACHE的NEXTVAL數值亂序問題

問題說明 在多實例環境中可能會出現從Sequence所取出來的nextval是亂序的&#xff0c;比如第二次比第一次所取的數要小但這并不是我們所希望的。當程序邏輯Base on sequence.nextval數值所謂填充字段的大小來排序時&#xff0c;就會產生問題。 實際上就是由于多實例這一特性造成…