Handling Late Data
- Delay watermark advancement:
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- Delay window closing:
.allowedLateness(Time.seconds(3))
- Send data that is still late to a side output stream (all three are combined in the sketch below):
.sideOutputLateData(lateData)
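A minimal sketch of how the three settings fit together on one windowed pipeline. It assumes an existing DataStream<Event> named events whose Event POJO exposes getUrl() and getTs() (names taken from the full example below); the variable names events, withWm, lateTag and result are illustrative only.

// 1. Hold the watermark back by 2 seconds to tolerate mildly out-of-order events
SingleOutputStreamOperator<Event> withWm = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((event, ts) -> event.getTs()));

// 2. + 3. Keep each window open 3 extra seconds after the watermark passes its end,
//         and divert anything that is still later than that to a side output
OutputTag<Event> lateTag = new OutputTag<Event>("late") {};
SingleOutputStreamOperator<Event> result = withWm
        .keyBy(Event::getUrl)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(3))
        .sideOutputLateData(lateTag)
        .reduce((a, b) -> b);   // placeholder window function

result.print("window");
result.getSideOutput(lateTag).print("late");

The full example that follows uses the same pattern and additionally corrects the results already written to MySQL.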
public class Flink12_LateDataCorrect {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(line -> {
                    String[] fields = line.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // hold the watermark back by 2 seconds
                                .withTimestampAssigner((event, ts) -> event.getTs())
                );
        ds.print("input");

        OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));
        // alternatively: new OutputTag<WordCountWithTs>("late"){}

        SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds
                .map(event -> new WordCountWithTs(event.getUrl(), 1, event.getTs()))
                .keyBy(WordCountWithTs::getWord)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(5))   // keep the window open 5 extra seconds
                .sideOutputLateData(lateOutputTag)  // capture data that is still late in the side output
                .aggregate(
                        new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {
                            @Override
                            public UrlViewCount createAccumulator() {
                                return new UrlViewCount();
                            }

                            @Override
                            public UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {
                                accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());
                                return accumulator;
                            }

                            @Override
                            public UrlViewCount getResult(UrlViewCount accumulator) {
                                return accumulator;
                            }

                            @Override
                            public UrlViewCount merge(UrlViewCount a, UrlViewCount b) {
                                return null;
                            }
                        },
                        new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {
                            @Override
                            public void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {
                                UrlViewCount urlViewCount = elements.iterator().next();
                                // fill in the url
                                urlViewCount.setUrl(key);
                                // fill in the window information
                                urlViewCount.setWindowStart(context.window().getStart());
                                urlViewCount.setWindowEnd(context.window().getEnd());
                                // emit the result
                                out.collect(urlViewCount);
                            }
                        }
                );
        urlViewCountDs.print("window");

        // TODO Write the window results to a MySQL table: update the row if it exists, insert it otherwise.
        /*
        Results emitted when the window fires are written to the MySQL table as inserts.
        Late data that arrives while the window is still open (allowedLateness) re-triggers
        the window computation, and that result is written to MySQL as an update.

        DDL:
        CREATE TABLE `url_view_count` (
            `url`          VARCHAR(100) NOT NULL,
            `cnt`          BIGINT NOT NULL,
            `window_start` BIGINT NOT NULL,
            `window_end`   BIGINT NOT NULL,
            PRIMARY KEY (url, window_start, window_end)  -- composite primary key
        ) ENGINE=INNODB DEFAULT CHARSET=utf8
        */
        SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink(
                "replace into url_view_count(url, cnt, window_start, window_end) values (?,?,?,?)",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()
        );
        urlViewCountDs.addSink(jdbcSink);

        // Capture the side output stream
        SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);
        lateData.print("late");

        // TODO Write the side-output data to MySQL, correcting the rows that already exist there.
        // Convert WordCountWithTs => UrlViewCount, using Flink's own window-assignment logic to
        // compute the window that each record's timestamp belongs to.
        SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(wordCountWithTs -> {
            Long windowStart = TimeWindow.getWindowStartWithOffset(
                    wordCountWithTs.getTs() /* record timestamp */,
                    0L                      /* offset */,
                    10000L                  /* window size */);
            Long windowEnd = windowStart + 10000L;
            return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);
        });

        // Write to MySQL
        SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink(
                "insert into url_view_count (url, cnt, window_start, window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()
        );
        mapDs.addSink(lateJdbcSink);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
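The example relies on the POJOs Event, WordCountWithTs and UrlViewCount, which are not shown in this section. Below is a minimal sketch of the shapes the code appears to assume, written with Lombok for brevity; the field names, types and constructor order are inferred from the getters, setters and constructors used above and should be treated as assumptions. Each class would live in its own file.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data @NoArgsConstructor @AllArgsConstructor
public class Event {
    private String user;   // hypothetical field name; only getUrl() and getTs() are called above
    private String url;
    private Long ts;       // event-time timestamp in milliseconds
}

@Data @NoArgsConstructor @AllArgsConstructor
public class WordCountWithTs {
    private String word;
    private Integer count;
    private Long ts;
}

@Data @NoArgsConstructor @AllArgsConstructor
public class UrlViewCount {
    private String url;
    private Long count;
    private Long windowStart;
    private Long windowEnd;
}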
The withIdleness setting
Solves the problem where one stream receives no data for a long time, so its watermark cannot advance and downstream windows cannot fire. After the configured idle period, that stream is marked idle and temporarily excluded from the combined watermark calculation.
public class Flink12_WithIdleness {
    public static void main(String[] args) {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the default parallelism is the number of CPU cores (local environment)
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888)
                .map(line -> {
                    String[] words = line.split(" ");
                    return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner((event, ts) -> event.getTs())
                                // if this source sends no data for more than 10 seconds,
                                // stop waiting for its watermark
                                .withIdleness(Duration.ofSeconds(10))
                );
        ds1.print("input1");

        SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999)
                .map(line -> {
                    String[] words = line.split(" ");
                    return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner((event, ts) -> event.getTs())
                                // if this source sends no data for more than 10 seconds,
                                // stop waiting for its watermark (left disabled here)
                                // .withIdleness(Duration.ofSeconds(10))
                );
        ds2.print("input2");

        ds1.union(ds2)
                .map(event -> new WordCount(event.getUrl(), 1))
                .keyBy(WordCount::getWord)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum("count")
                .print("window");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
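The WordCount type used by the union example is likewise not shown. Based on keyBy(WordCount::getWord) and sum("count"), it presumably looks something like the sketch below (again with Lombok, as in the earlier POJO sketch; the field names are assumptions):

@Data @NoArgsConstructor @AllArgsConstructor
public class WordCount {
    private String word;    // carries the url in this example
    private Integer count;
}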
Time-based Stream Joins
Window Join
Window Join: only elements with the same key that fall into the same window can be joined. The JoinFunction is called once per matching pair; elements with no match in the other stream produce no output (inner-join semantics).
orderDs.join(detailDs)
        .where(OrderEvent::getOrderId)           // join key of the first stream
        .equalTo(OrderDetailEvent::getOrderId)   // join key of the second stream
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new JoinFunction<OrderEvent, OrderDetailEvent, String>() {
            @Override
            public String join(OrderEvent first, OrderDetailEvent second) throws Exception {
                // handle a successfully joined pair
                return first + " -- " + second;
            }
        })
        .print("windowJoin");
Interval Join
Interval Join: using the timestamp of an element in one stream as the reference point, a lower and an upper bound define a time range; an element with the same key from the other stream joins successfully if its timestamp falls inside that range. For example, with between(Time.seconds(-2), Time.seconds(2)), an OrderEvent with timestamp 5 s matches OrderDetailEvents with the same orderId whose timestamps lie in [3 s, 7 s] (both bounds inclusive by default).
Core code:
orderDs.keyBy(OrderEvent::getOrderId)
        .intervalJoin(detailDs.keyBy(OrderDetailEvent::getOrderId))
        .between(Time.seconds(-2), Time.seconds(2))
        // .upperBoundExclusive()  exclude the upper bound
        // .lowerBoundExclusive()  exclude the lower bound
        .process(new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {
            @Override
            public void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {
                // handle a successfully joined pair
                out.collect(left + " -- " + right);
            }
        })
        .print("IntervalJoin");
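Both join examples assume orderDs and detailDs streams of OrderEvent and OrderDetailEvent elements that already have event-time timestamps and watermarks assigned upstream (not shown here). A rough sketch of what those POJOs might look like, with Lombok as above; the field names and types beyond getOrderId() are assumptions:

@Data @NoArgsConstructor @AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private Long ts;       // event-time timestamp in milliseconds
}

@Data @NoArgsConstructor @AllArgsConstructor
public class OrderDetailEvent {
    private String orderId;
    private String skuId;  // hypothetical detail field
    private Long ts;
}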