背景:
上期文章主要講了Flink項目搭建的一些方法,其中對于數據流的處理很大一部分是通過算子來進行計算和處理的,算子也是Flink中功能非常龐大,且很重要的一部分。
算子介紹:
算子在Flink的開發者文檔中是這樣介紹的:通過算子能將一個或多個 DataStream 轉換成新的 DataStream,在應用程序中可以將多個數據轉換算子合并成一個復雜的數據流拓撲。這簡單總結就有點類似于Flink的一些API,來對數據流進行操作處理。
算子介紹目錄:
主要介紹幾個在日常開發中,比較常用的幾個算子方法:
1.FlatMap
2.Filter
3.Window
4.join
5.coGroup
1.FlatMap
flatMap是輸入一個元素同時產生零個、一個或多個元素。通常在日常開發中用于對于數據流的初步處理和合并,將數據流轉換成我們希望輸入的數據格式
方法舉例:
dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});
日常使用舉例:
/// 將binglog獲取的dataChangInfo格式轉換成OrderInfo業務格式
dataStream1.flatMap(new FlatMapFunction<DataChangeInfo, OrderInfo>() {@Overridepublic void flatMap(DataChangeInfo dataChangeInfo, Collector<OrderInfo> collector) throws Exception {OrderInfo orderInfo = JSONObject.parseObject(dataChangeInfo.getAfterData(), OrderInfo.class);log.info("訂單數據:{}", orderInfo);collector.collect(orderInfo);}
});
2.Filter
對數據流進行過濾操作,將一些臟數據或者我們不希望流入的數據進行排除處理
使用舉例:
/// 篩選出訂單狀態小于等于40的訂單數據
orderInfoSingleOutputStream.filter(new FilterFunction<OrderInfo>() {@Overridepublic boolean filter(OrderInfo orderInfo) throws Exception {if (orderInfo.getStatus() <= 40){return true;}return false;}
});
3.Window
Window 根據某些特征(例如,最近 5 秒內到達的數據)對每個 key Stream 中的數據進行分組。就類似于上期文章所講述的窗口,具體介紹可以查看上期文章「Flink」Flink項目搭建方法介紹;
/// 先通過keyby設置主鍵
/// 然后設置一個以事件時間為標定,設一個5秒的窗口
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)));
4.Join
根據指定的 key 和窗口 join 兩個數據流。
這個方法通常用在兩個數據流需要通過某個key值進行合并的時候,比如訂單主表和訂單副表需要通過orderId進行數據合并的時候,進行數據處理。
方法舉例:
dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new JoinFunction () {...});
日常使用舉例:
DataStream<OrderOutputInfo> outputInfoDataStream = orderInfoSingleOutputStream.join(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic OrderOutputInfo join(OrderInfo orderInfo, OrderCodeInfo orderCodeInfo) throws Exception {OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());log.info("輸出數據:{}", orderOutputInfo);return orderOutputInfo;}});
通過斷點,其實可以發現,數據并不是按照一批一批進行輸出的,而是根據key,進行一條一條的輸出的,這個需要注意寫入庫的方法,以免對數據庫寫入產生較大的壓力。
然后該方法會發現一個弊端,那就是如果不在事件窗口期輸入的,那么無法匹配到對應的數據行,那么就會出現數據無法輸出,數據丟失的情況,使用outside,官方推薦的側輸出,也無法有效輸出,這時候比較推薦下面這個方法Cogroup,可以通過自定義的方法進行對未匹配的數據進行輸出報錯;
5.CoGroup
根據指定的 key 和窗口將兩個數據流組合在一起。
CoGroup和Join是個類似的方法,但是CoGroup的數據處理方法里面可以有迭代器,然后在實際數據處理過程中可以通過判斷迭代器,從而實現對于未匹配成功的訂單進行打印輸出。
方法舉例:
dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))).apply (new CoGroupFunction () {...});
日常使用舉例:
orderInfoSingleOutputStream.coGroup(orderCodeInfoSingleOutputStream).where(OrderInfo::getId).equalTo(OrderCodeInfo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupFunction<OrderInfo, OrderCodeInfo, OrderOutputInfo>() {@Overridepublic void coGroup(Iterable<OrderInfo> iterable, Iterable<OrderCodeInfo> iterable1, Collector<OrderOutputInfo> collector) throws Exception {if(iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderInfo orderInfo = iterable.iterator().next();OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("匹配成功的訂單ID:" + orderInfo.getId() + " 訂單創建時間:" + orderInfo.getCreate_time() + " status " + orderInfo.getStatus());System.out.println("=============================");OrderOutputInfo orderOutputInfo = new OrderOutputInfo();orderOutputInfo.setId(orderInfo.getId());orderOutputInfo.setStatus(orderInfo.getStatus());orderOutputInfo.setCode(orderCodeInfo.getCode());orderOutputInfo.setCreate_time(orderInfo.getCreate_time());collector.collect(orderOutputInfo);}else if(iterable.iterator().hasNext() && !iterable1.iterator().hasNext()){OrderInfo order = iterable.iterator().next();System.out.println("訂單未找到匹配的訂單-----------Code:"+ order.getId());} else if(!iterable.iterator().hasNext() && iterable1.iterator().hasNext()){OrderCodeInfo orderCodeInfo = iterable1.iterator().next();System.out.println("未找到匹配的Code訂單-----------Code:" + orderCodeInfo.getId() );}}});
數據輸出日志:
可以看到數據也是一條條匹配后輸出,無法匹配的數據也會在窗口結束后進行輸出展示或告警。
總結:
以上幾個算子方法就是平時日常開發中比較常用且好用的方法,大家可以結合各自的業務場景,進行挑選使用。
相關鏈接
Flink
Flink開發者文檔