一、什么是分流
所謂“分流”,就是將一條數據流拆分成完全獨立的兩條、甚至多條流。也就是基于一個DataStream,定義一些篩選條件,將符合條件的數據揀選出來放到對應的流里。
二、基于filter算子的簡單實現分流
其實根據條件篩選數據的需求,本身非常容易實現:只要針對同一條流多次獨立調用.filter()方法進行篩選,就可以得到拆分之后的流了。
案例需求:讀取一個整數數字流,將數據流劃分為奇數流和偶數流。
package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 創建Flink上下文執行環境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 設置全局并行度為2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先將輸入流轉為Integer類型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函數分流偶數流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表達式分流奇數流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶數流");ds2.print("奇數流");streamExecutionEnvironment.execute();}
}
執行結果
奇數流:1> 1
偶數流:2> 2
偶數流:1> 2
偶數流:2> 4
奇數流:1> 3
奇數流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)
這種實現非常簡單,但代碼顯得有些冗余——我們的處理邏輯對拆分出的三條流其實是一樣的,卻重復寫了三次。而且這段代碼背后的含義,是將原始數據流 stream 復制三份,然后對每一份分別做篩選;這明顯是不夠高效的。我們自然想到,能不能不用復制流,直接用一個算子就把它們都拆分開呢?
三、使用測輸出流
關于處理函數中側輸出流的用法,我們已經在 7.5 節做了詳細介紹。簡單來說,只需要調用上下文 ctx 的.output()方法,就可以輸出任意類型的數據了。而側輸出流的標記和提取,都離不開一個“輸出標簽”(OutputTag),指定了側輸出流的 id 和類型。