map
map是大家非常熟悉的大數據操作算子,主要用于將數據流中的數據進行轉換,形成新的數據流。簡單來說,就是一個“一一映射”,消費一個元素就產出一個元素。
我們只需要基于DataStream調用map()方法就可以進行轉換處理。方法需要傳入的參數是接口MapFunction的實現;返回值類型還是DataStream,不過泛型(流中的元素類型)可能改變。
public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:傳入匿名類,實現MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:傳入MapFunction的實現類// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}
面代碼中,MapFunction實現類的泛型類型,與輸入數據類型和輸出數據的類型有關。在實現MapFunction接口的時候,需要指定兩個泛型,分別是輸入事件和輸出事件的類型,還需要重寫一個map()方法,定義從一個輸入事件轉換為另一個輸出事件的具體邏輯。
Filter
filter轉換操作,顧名思義是對數據流執行一個過濾,通過一個布爾條件表達式設置過濾條件,對于每一個流內元素進行判斷,若為true則元素正常輸出,若為false則元素被過濾掉。
進行filter轉換之后的新數據流的數據類型與原數據流是相同的。filter轉換需要傳入的參數需要實現FilterFunction接口,而FilterFunction內要實現filter()方法,就相當于一個返回布爾類型的條件表達式。
public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:傳入匿名類實現FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}).print();// 方式二:傳入FilterFunction實現類// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}
FlatMap
flatMap操作又稱為扁平映射,主要是將數據流中的整體(一般是集合類型)拆分成一個一個的個體使用。消費一個元素,可以產生0到多個元素。flatMap可以認為是“扁平化”(flatten)和“映射”(map)兩步操作的結合,也就是先按照某種規則對數據進行打散拆分,同map一樣,flatMap也可以使用Lambda表達式或者FlatMapFunction接口實現類的方式來進行傳參,返回值類型取決于所傳參數的具體邏輯,可以與原數據流相同,也可以不同。
案例需求:如果輸入的數據是sensor_1,只打印vc;如果輸入的數據是sensor_2,既打印ts又打印vc。
public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}/*** TODO flatmap: 一進多出(包含0出)* 對于s1的數據,一進一出* 對于s2的數據,一進2出* 對于s3的數據,一進0出(類似于過濾的效果)** map怎么控制一進一出:* =》 使用 return** flatmap怎么控制的一進多出* =》 通過 Collector來輸出, 調用幾次就輸出幾條***/public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}