一、引言
咱兩書接上回,上一篇文章主要介紹了DataStream API一些基本的使用,主要是針對單數據流的場景下,但是在實際的流處理場景中,常常需要對多個數據流進行合并、拆分等操作,以滿足復雜的業務需求。Flink 的 DataStream API 提供了一系列強大的多流轉換算子,如 union、connect 和 split 等,下面我們來詳細了解一下它們的功能和用法。
二、多流轉換
2.1 union 算子
union 算子的功能非常直接,就是將多個類型相同的 DataStream 合并成一個新的 DataStream 。它適用于需要將多個來源相同或相似的數據合并到一起進行統一處理的場景。例如,在電商場景中,我們可能有來自不同地區的訂單流,希望將它們合并起來進行整體的銷售統計分析。?
下面通過一個簡單的代碼示例來展示 union 算子的使用:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionExample {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義兩個數據流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<Integer> stream2 = env.fromElements(4, 5, 6);// 使用union合并兩個數據流DataStream<Integer> unionStream = stream1.union(stream2);// 打印合并后的數據流unionStream.print();// 執行任務env.execute("Union Example");}
}
?在這個示例中,我們首先創建了兩個包含整數的數據流 stream1 和 stream2,然后使用 union 算子將它們合并成一個新的數據流 unionStream 。最后,通過 print 方法將合并后的數據流輸出到控制臺。合并后的數據特點是,新數據流包含了原始兩個數據流中的所有元素,元素的順序按照它們在原始數據流中的順序依次排列。
2.2 connect 算子
connect 算子與 union 算子不同,它主要用于連接兩個類型可以不同的數據流,并將它們合并成一個 ConnectedStreams 對象。這個對象允許我們在后續處理中分別對兩個流的數據進行操作,從而保留流之間的差異。這種特性在需要對不同類型的數據進行關聯處理,但又要保持數據類型獨立性的場景中非常有用。例如,在一個監控系統中,我們可能有一個數據流表示設備的狀態信息(如溫度、濕度等數值型數據),另一個數據流表示設備的日志信息(字符串類型),我們可以使用 connect 算子將這兩個流連接起來,以便在后續處理中綜合分析設備狀態和日志。?
以下是使用 connect 算子的代碼示例:
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class ConnectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義兩個不同類型的數據流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<String> stream2 = env.fromElements("a", "b", "c");// 使用connect連接兩個數據流ConnectedStreams<Integer, String> connectedStreams = stream1.connect(stream2);// 對連接后的流進行處理DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer value) {return "Integer: " + value;}@Overridepublic Object map2(String value) {return "String: " + value;}});resultStream.print();env.execute("Connect Example");}
}
在這個示例中,我們創建了一個整數類型的數據流 stream1 和一個字符串類型的數據流 stream2 。通過 connect 算子將它們連接成 ConnectedStreams 對象,然后使用 CoMapFunction 對連接后的流進行處理。CoMapFunction 包含兩個方法 map1 和 map2 ,分別用于處理來自不同流的數據。最終,將處理結果輸出到控制臺。
2.3 split 算子
split 算子的作用與合并相反,它用于將一個 DataStream 根據某些條件拆分成多個 DataStream 。在實際應用中,我們常常需要根據數據的不同特征對數據流進行分類處理,split 算子就可以幫助我們實現這一需求。比如,在一個電商訂單處理系統中,我們可以根據訂單金額的大小將訂單流拆分成小額訂單流和大額訂單流,以便對不同金額范圍的訂單進行不同的處理策略。?
下面是使用 split 算子的代碼實現:
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.List;public class SplitExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含整數的數據流DataStream<Integer> stream = env.fromElements(1, 20, 3, 40, 5, 60);// 使用split算子拆分數據流SplitStream<Integer> splitStream = stream.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {List<String> output = new ArrayList<>();if (value > 10) {output.add("large");} else {output.add("small");}return output;}});// 獲取拆分后的兩個數據流DataStream<Integer> largeStream = splitStream.select("large");DataStream<Integer> smallStream = splitStream.select("small");// 打印拆分后的數據流largeStream.print("Large Stream: ");smallStream.print("Small Stream: ");env.execute("Split Example");}
}
在這個示例中,我們定義了一個包含整數的數據流 stream 。通過 split 算子和自定義的 OutputSelector,根據數值大小將數據流拆分成兩個子流:largeStream 包含大于 10 的整數,smallStream 包含小于等于 10 的整數。最后,分別將這兩個子流輸出到控制臺,并加上相應的標識以便區分。
三、數據下沉(Sink)
在流處理應用中,將處理后的結果數據輸出到各種存儲介質是非常重要的一環。Flink 提供了豐富的數據下沉(Sink)操作,支持將數據寫入文件、Kafka、數據庫等多種存儲系統,以滿足不同場景下的數據持久化和后續處理需求。
3.1 寫入文件?
Flink 提供了多種將數據寫入文件的方法,其中常用的是 writeAsText 方法,它將數據流中的元素以文本形式寫入文件。例如,我們可以將處理后的訂單數據寫入文件,以便后續進行數據分析或存檔。?
以下是一個完整的代碼示例,展示如何從流數據到文件寫入的全流程:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("apple", "banana", "cherry");// 將數據流寫入文件,路徑為output.txtstream.writeAsText("output.txt");// 執行任務env.execute("File Sink Example");}
}
在上述代碼中,我們首先創建了一個包含水果名稱的數據流 stream 。然后,使用 writeAsText 方法將數據流中的元素寫入名為 output.txt 的文件中。在實際應用中,需要注意文件寫入模式和配置,writeAsText 默認使用追加模式寫入文件,如果文件已存在,新的數據會追加到文件末尾。還可以通過配置參數來指定寫入模式(如覆蓋模式)、緩沖區大小等。例如,通過設置 env.setBufferTimeout (1000) 可以調整緩沖區的超時時間,當緩沖區數據達到一定時間(這里是 1 秒)或大小限制時,會被寫入文件。
3.2?寫入 Kafka?
Flink 與 Kafka 的集成非常緊密,通過 FlinkKafkaProducer 可以方便地將處理結果寫入 Kafka 主題。這種方式在構建實時數據管道時非常常見,處理后的數據可以被其他系統從 Kafka 中消費,實現數據的共享和進一步處理。其原理是通過配置 Kafka 集群的地址、目標主題等參數,FlinkKafkaProducer 將 Flink 處理后的數據流轉換為 Kafka 可接受的消息格式,并發送到指定的 Kafka 主題中 。?
以下是連接 Kafka、配置參數以及數據寫入 Kafka 的代碼示例:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;public class KafkaSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("message1", "message2", "message3");// 配置Kafka參數Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("acks", "all");// 創建FlinkKafkaProducer,將數據寫入Kafka的test-topic主題FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("test-topic",new SimpleStringSchema(),properties);// 將數據流寫入Kafkastream.addSink(kafkaProducer);// 執行任務env.execute("Kafka Sink Example");}
}
在這個示例中,我們首先創建了一個包含消息的數據流 stream 。然后,配置了 Kafka 的連接參數,包括 Kafka 集群地址(bootstrap.servers)和 acks 參數(這里設置為 "all",表示等待所有副本確認寫入成功,以確保數據的可靠性)。接著,創建了 FlinkKafkaProducer 對象,指定了要寫入的 Kafka 主題(test-topic)和序列化器(SimpleStringSchema,用于將字符串數據轉換為 Kafka 消息格式)。最后,通過 addSink 方法將數據流寫入 Kafka。
3.3 寫入數據庫(以 Redis 為例)?
以 Redis 為例,將 Flink 處理后的數據寫入 Redis 可以實現數據的快速存儲和查詢,適用于對數據讀寫性能要求較高的場景。連接 Redis 數據庫的步驟主要包括引入相關依賴、創建 Jedis 連接池配置以及在 Flink 中自定義 Sink 函數來實現數據寫入。?
以下是自定義 RichSinkFunction 實現數據寫入 Redis 的代碼:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;public class RedisSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定義一個包含字符串的數據流DataStream<String> stream = env.fromElements("key1:value1", "key2:value2", "key3:value3");// 將數據流寫入Redisstream.addSink(new RedisSink());// 執行任務env.execute("Redis Sink Example");}public static class RedisSink extends RichSinkFunction<String> {private transient Jedis jedis;@Overridepublic void open(org.apache.flink.configuration.Configuration parameters) throws Exception {super.open(parameters);// 創建Jedis連接池配置JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();// 初始化Jedis連接,連接到本地Redis服務jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(String value, Context context) throws Exception {// 按冒號分割字符串,得到鍵值對String[] parts = value.split(":", 2);String key = parts[0];String data = parts[1];// 將數據寫入Redisjedis.set(key, data);}@Overridepublic void close() throws Exception {super.close();// 關閉Jedis連接if (jedis != null) {jedis.close();}}}
}
?在這段代碼中,我們定義了一個自定義的 RichSinkFunction,即 RedisSink 。在 open 方法中,創建了 Jedis 連接池配置,并初始化了 Jedis 連接,連接到本地的 Redis 服務(地址為localhost,端口為 6379)。在 invoke 方法中,對輸入的字符串進行處理,將其按冒號分割為鍵值對,然后使用 jedis.set 方法將數據寫入 Redis。在 close 方法中,關閉 Jedis 連接,釋放資源。通過這種方式,Flink 處理后的數據流中的數據就可以成功寫入 Redis 數據庫。
四、總結
Flink DataStream API 的多流轉換操作,如 union、connect 和 split 等算子,為我們提供了強大的工具,使他們能夠靈活地處理多個數據流之間的復雜關系。通過這些算子,我們可以將不同來源的數據進行合并,對不同類型的數據進行關聯處理,以及根據數據特征進行分類拆分,從而滿足各種復雜業務場景下的流處理需求。?