1.map
特性:接收一個數據,經過處理之后,就返回一個數據
1.1. 源碼分析
- 我們來看看map的源碼
map需要接收一個MapFunction<T,R>的對象,其中泛型T表示傳入的數據類型,R表示經過處理之后輸出的數據類型 - 我們繼續往下點,看看MapFunction<T,R>的源碼
這是一個接口,那么在代碼中,我們就需要實現這個接口
1.2. 案例
那么我們現在要實現一個功能,就是從給一個文件中讀取數據,返回每一行的字符串長度。
我們要讀取的文件內容如下
代碼貼在這里(為了讓打擊不看迷糊,導包什么的我就省略了)
public class TransformTest1_Base {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 將并行度設為1env.setParallelism(1);// 3. 讀取文件夾DataStreamSource<String> inputDataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 將文件夾每一行的數據都返回它的長度// 在這里我們用匿名內部類的方式創建了一個MapFunction對象SingleOutputStreamOperator<Integer> dataStream = inputDataStream.map(new MapFunction<String, Integer>() {// 5. 重寫map方法,參數s是接收到的一個數據,我們只需要返回它的長度就行了。@Overridepublic Integer map(String s) throws Exception {return s.length();}});// 6. 打印輸出dataStream.print();// 7. 啟動執行環境env.execute();}
}
顯示
1.3. 總結
map的使用范圍就是需要對的那個數據進行處理,并且每次返回一個數據的時候,map就比較方便了。
2. flatMap
- 接收一個數據,可以返回多條數據
2.1. 源碼分析
我們發現,它需要傳入一個FlatMapFunction的一個對象
我們繼續點進去,看看FlatMapFunction的源碼,可以發現,FlatMapFunction<T,R>也是一個接口,并且接口里面的方法的返回值是一個Collector,也就是多個值的集合。
2.2. 案例
我們還是讀取那個文件,這次我們要做的處理是,將文件的每一行數據按照逗號隔開,給出代碼:
public class TransformTest2_Base {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設置并行度env.setParallelism(1);// 3. 讀取文件夾DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用匿名內部類的方式重寫FlatMapFuncction,將每行字符按","隔開SingleOutputStreamOperator<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 5. 分割一行字符,獲得對應的字符串數組String[] split = s.split(",");for (String slt : split) {// 6. 將這些數據返回collector.collect(slt);}}});// 7. 打印輸出處理后的數據flatMapStream.print();// 8. 啟動執行環境env.execute();}
}
可以看到執行的結果
3. filter
聽這個名字就知道是個過濾器,用來過濾數據。
3.1. 源碼分析
我們看看filer的源碼,繼承子FilterFunction,可以看到,這次泛型就只有一個值了,因為filter只允許返回的數據<=原來的數據,所以只做過濾,并不能改變數據蕾西,沒必要設置返回的類型
我們繼續點進去,看看FilterFunction的源碼
果不其然,也是一個接口,而里面的filter方法只有一個參數,并且返回的是一個boolean類型,若返回true則var1原樣返回,若返回false,則var1會被過濾掉。
3.2. 案例
我們還是讀取以上文件,這一次我們返回以"sensor_1"開頭的字符串,其余的一律不返回,給出代碼
public class TransformTest3_Base {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用匿名內部類的方式重寫FilterFunctionSingleOutputStreamOperator<String> filterDataStream = dataStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {// 5. 若s以"sensor_1"開頭,則返回truereturn s.startsWith("\"sensor_1\"");}});// 6. 打印處理后的數據filterDataStream.print();// 7. 啟動執行環境env.execute();}
}
4. 分組聚合
- 注意:任何的聚合操作都有默認的分組,聚合是在分組的基礎上進行的。比如,對整體進行求和,那么分組就是整體。所以,在做聚合操作之前,一定要明確是在哪個分組上進行聚合操作
- 注意:聚合操作,本質上是一個多對一(一對一是多對一的特殊情況)的操作。特別注意的是這個’一‘,可以是一個值(mean, sum等),同樣也可以是一個對象(list, set等對象)
4.1. 分組(keyBy)
DataStream → KeyedStream:邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同 key 的元素,在內部以 hash 的形式實現的。
- 分組就是為了聚合操作做準備的,keyBy方法會將數據流按照hash實現,分別放在不同的分區,每個分區都可以進行聚合操作。
- 我們可以用這個性質,計算每一個sensor溫度的最大值,我們為此將文件修改:
分組之后的圖就是所有sensor_1在一個分區里,sensor_6,sensor_7,sensor_10在不同的三個分區,也就是有四個分區,而后三個分區中只有一條數據,所以最大值和最小值都只有一個 - 在flink中,分組操作是由keyBy方法來完成的,我們來看看keyBy的源碼
可以發現,keyBy可以對對象和元組進行聚合。
4.2. 聚合
這些算子可以針對 KeyedStream 的每一個支流做聚合。
? sum():對每個支流求和
? min():對每個支流求最小值
? max():對每個支流求最大值
? minBy()
? maxBy()
我們來看看max()的源碼
這也是傳一個屬性名,也就是求對應的屬性名的最大值。
4.3. 實例演示
public class TransformTest1_RollingAggreation {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> stringDataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用map將每行數據變成一個對象SingleOutputStreamOperator<SensorReading> map = stringDataStreamSource.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 分組操作,以id屬性分組KeyedStream<SensorReading, Tuple> keyedstream = map.keyBy("id");// 6. 聚合操作,求每個分組的溫度最大值SingleOutputStreamOperator<SensorReading> resultStream = keyedstream.max("temperature");// 7. 打印輸出resultStream.print();// 8. 啟動執行環境env.execute();}
}
運行結果
誒,這有人就要問了,不是求每一個分組的溫度最大值么?為什么sensor_1的這個分組所有的數據都有?
答:flink是一個流處理分布式框架,這是一條數據流,每來一個數據就得處理一次,所以輸出的都是當前狀態下的最大值。
4.4. reduce自定義聚合
在實際生產中,不可能讓我們完成這么簡單的操作就行了,所以我們需要更復雜的操作,而reduce就是滿足這個條件,它可以讓我們自定義聚合的方式。
- 我們來看看reduce的源碼
reduce需要傳入的是一個ReduceFunction的對象,我們再來看看ReduceFunction是個什么東西
var1是當前這個分組的狀態,var2是新加入的值,而reduce函數體就是我們要進行的操作,返回一個新的狀態。
到這我就明白了,要是我們向實時獲取最大溫度的話,var1是之前的最大溫度,通過var1和var2的比較就能實現。
4.5. reduce實例
我們這一次要實現一個實時的溫度最大值,也就是返回的數據中的時間戳是當前的。
public class TransformTest1_Reduce {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 通過map將每行數據轉換為一個對象SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按對象的id分組KeyedStream<SensorReading, Tuple> keyStream = map.keyBy("id");// 6. reduce自定義聚合SingleOutputStreamOperator<SensorReading> reduce = keyStream.reduce(new ReduceFunction<SensorReading>() {@Overridepublic SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {// 7. 獲取當前時間為止接收到的最大溫度return new SensorReading(sensorReading.getId(), System.currentTimeMillis(), Math.max(sensorReading.getTemperature(),t1.getTemperature()));}});// 8. 打印輸出reduce.print();// 9. 啟動運行環境env.execute();}
}
這一次的輸出我們就得你好好研究一下了。
從這塊可以發現,我們獲取的都是當前的時間戳,而且時間戳也在改變,這一點很好理解,但是下面這個數據就很詭異了。
- 這兩塊的時間戳為什么沒有改變呢?這需要我們再來看看reduce方法了,reduce方法是傳入兩個參數,第一個是當前的狀態,第二個是新讀取的值,通過方法體的操作返回一個最新的狀態。
- 仔細理解一下這句話,若我剛開始沒有數據的時候,那么哪來的狀態呢?所以reduce把接收到的第一個參數作為狀態,其中sensor_6,7,8這三個分區只有一個數據,所以直接拿來當作狀態。
5. 多流轉換算子
5.1. 分流操作(Split 和 Select)
- Split能將流中的數據按條件貼上標簽,比如我把溫度大于30度的對象貼上一個high標簽,把溫度低于30度的貼上一個low標簽,標簽可以貼多個。那么就把流中的數據,按照標簽分類了(這里并沒有分流)
- Select是按照標簽來分流
- split源碼
可以發現,返回的是一個SplitStream,需要傳入一個選擇器,我們看看OutputSeclector的源碼
傳入value,返回這個value對應的標簽,實現對這個value進行類似"分類"的操作。 - select源碼
只需要接收一個或者多個標簽就能返回包含那個標簽對象的數據流。
5.2. 實例演示
- 我們這一次要把讀取到的數據分成三條流,一條是high(高于30度),一條是low(低于30度),一條是all(所有的數據)。代碼:
public class TransformTest4_MultipleStreams {public static void main(String[] args) throws Exception {// 1. 獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 通過map將每行數據轉換為一個對象SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按條件貼標簽SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");}});// 6. 按標簽選擇,生成不同的數據流DataStream<SensorReading> high = split.select("high");DataStream<SensorReading> low = split.select("low");DataStream<SensorReading> all = split.select("high", "low");high.print("high");low.print("low");all.print("all");env.execute();}
}
5.3. 合流操作Connect 和 CoMap
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數
據流,兩個數據流被 Connect 之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能與 map和 flatMap 一樣,對 ConnectedStreams 中的每一個 Stream 分別進行 map 和 flatMap處理。
類似于一國兩制,看似兩條流合并在了一起,其實內部依舊是按照自己的約定運行,類型并沒有改變。
- connect源碼
將當前調用者的流和參數中的流合并,返回一個ConnectedStreams<T,R>類型
我們再來看看ConnectionStreams<T,R>中的map方法,其中要傳的是一個CoMapFunction<IN1,IN2,R>的對象,最重要的就是這個類,我們來看看這個類
這個CoMapFunction<IN1,IN2,R>和之前的MapFunction不太一樣,這里要重寫的方法有兩個,map1和map2,一個是針對IN1的,一個是針對IN2的,R就是返回類型。
這下全明白了,在這個方法內部,對這兩條流分別操作,合成一條流。
5.4. 實例演示
public class TransformTest5_MultipleStreams {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1. 讀取文件DataStreamSource<String> dataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 2. 將每行數據變成一個對象SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 3. 將數據打上標簽SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");}});// 4. 按照高溫和低溫的標簽分成兩條流DataStream<SensorReading> high = split.select("high");DataStream<SensorReading> low = split.select("low");// 5. 將high流的數據轉換為二元組SingleOutputStreamOperator<Tuple2<String, Double>> tuple2SingleOutputStreamOperator = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());}});// 6. 將tuple2SingleOutputStreamOperator和low連接ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = tuple2SingleOutputStreamOperator.connect(low);// 7. 調用map傳參CoMapFunction將兩條流合并成一條流objectSingleOutputStreamOperatorSingleOutputStreamOperator<Object> objectSingleOutputStreamOperator = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {// 這是處理high流的方法@Overridepublic Object map1(Tuple2<String, Double> value) throws Exception {return new Tuple3<>(value.getField(0), value.getField(1), "temp is too high");}// 這是處理low流的方法@Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2<>(value.getTemperature(), "normal");}});objectSingleOutputStreamOperator.print();env.execute();}
}
5.5. 多條流合并(union)
之前我們只能合并兩條流,那我們要合并多條流呢?這里我們就需要用到union方法。
- Connect 與 Union 區別:
- Union 之前兩個流的類型必須是一樣,Connect 可以不一樣,在之后的 coMap中再去調整成為一樣的。
- Connect 只能操作兩個流,Union 可以操作多個。
若我們給出以下代碼:
high.union(low,all);
那么high,low,all三條流都會合并在一起。