Flink之遲到的數據

遲到數據的處理

  1. 推遲水位線推進: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  2. 設置窗口延遲關閉:.allowedLateness(Time.seconds(3))
  3. 使用側流接收遲到的數據: .sideOutputLateData(lateData)
public class Flink12_LateDataCorrect {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] fields = line.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 水位線延遲2秒.withTimestampAssigner((event, ts) -> event.getTs()));ds.print("input");OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));//new OutputTag<WordCount>("late"){}SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(event -> new WordCountWithTs(event.getUrl(), 1 , event.getTs())).keyBy(WordCountWithTs::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5))  // 窗口延遲5秒關閉.sideOutputLateData(lateOutputTag) // 捕獲到側輸出流.aggregate(new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {@Overridepublic UrlViewCount createAccumulator() {return new UrlViewCount();}@Overridepublic UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());return accumulator;}@Overridepublic UrlViewCount getResult(UrlViewCount accumulator) {return accumulator;}@Overridepublic UrlViewCount merge(UrlViewCount a, UrlViewCount b) {return null;}},new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {UrlViewCount urlViewCount = elements.iterator().next();//補充urlurlViewCount.setUrl(key);//補充窗口信息urlViewCount.setWindowStart(context.window().getStart());urlViewCount.setWindowEnd(context.window().getEnd());// 寫出out.collect(urlViewCount);}});urlViewCountDs.print("window") ;//TODO 將窗口的計算結果寫出到Mysql的表中, 有則更新,無則插入/*窗口觸發計算輸出的結果,該部分數據寫出到mysql表中執行插入操作,后續遲到的數據,如果窗口進行了延遲, 窗口還能正常對數據進行計算, 該部分數據寫出到mysql執行更新操作。建表語句:CREATE TABLE `url_view_count` (`url` VARCHAR(100) NOT NULL  ,`cnt` BIGINT NOT NULL,`window_start` BIGINT NOT NULL,`window_end` BIGINT NOT NULL,PRIMARY KEY (url, window_start, window_end )  -- 聯合主鍵) ENGINE=INNODB DEFAULT CHARSET=utf8*/SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink("replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());urlViewCountDs.addSink(jdbcSink) ;//捕獲側輸出流SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);lateData.print("late");//TODO 將側輸出流中的數據,寫出到mysql中的表中,需要對mysql中已經存在的數據進行修正//轉換結構  WordCountWithTs => UrlViewCount//調用flink計算窗口的方式, 基于當前數據的時間計算對應的窗口SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(wordCountWithTs -> {Long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs()/*數據時間*/, 0L/*偏移*/, 10000L/*窗口大小*/);Long windowEnd = windowStart + 10000L;return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);});// 寫出到mysql中SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink("insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt  ",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());mapDs.addSink(lateJdbcSink) ;try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

withIdleness關鍵字

解決某條流長時間沒有數據,不能推進水位線,導致下游窗口的窗口無法正常計算。

public class Flink12_WithIdleness {public static void main(String[] args) {//1.創建運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默認是最大并行度env.setParallelism(1);SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超過10秒鐘不發送數據,就不等待該數據源的水位線.withIdleness(Duration.ofSeconds(10)));ds1.print("input1");SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超過10秒鐘不發送數據,就不等待該數據源的水位線
//                                .withIdleness(Duration.ofSeconds(10)));ds2.print("input2");ds1.union(ds2).map(event->new WordCount(event.getUrl(),1)).keyBy(WordCount::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("count").print("window");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

基于時間的合流

窗口聯結Window Join

WindowJoin: 在同一個窗口內的相同key的數據才能join成功。

orderDs.join( detailDs ).where( OrderEvent::getOrderId )  // 第一條流用于join的key.equalTo( OrderDetailEvent::getOrderId) // 第二條流用于join的key.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic String join(OrderEvent first, OrderDetailEvent second) throws Exception {// 處理join成功的數據return  first + " -- " + second ;}}).print("windowJoin");

時間聯結intervalJoin

在這里插入圖片描述

IntervalJoin : 以一條流中數據的時間為基準, 設定上界和下界, 形成一個時間范圍, 另外一條流中相同key的數據如果能落到對應的時間范圍內, 即可join成功。

核心代碼:

 orderDs.keyBy(OrderEvent::getOrderId).intervalJoin(detailDs.keyBy( OrderDetailEvent::getOrderId)).between(Time.seconds(-2) , Time.seconds(2))//.upperBoundExclusive()  排除上邊界值//.lowerBoundExclusive()  排除下邊界值.process(new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {//處理join成功的數據out.collect( left + " -- " + right );}}).print("IntervalJoin");

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/214202.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/214202.shtml
英文地址,請注明出處:http://en.pswp.cn/news/214202.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

力扣編程題算法初階之雙指針算法+代碼分析

目錄 第一題&#xff1a;復寫零 第二題&#xff1a;快樂數&#xff1a; 第三題&#xff1a;盛水最多的容器 第四題&#xff1a;有效三角形的個數 第一題&#xff1a;復寫零 力扣&#xff08;LeetCode&#xff09;官網 - 全球極客摯愛的技術成長平臺 思路&#xff1a; 上期…

【SpringBoot教程】SpringBoot 統一異常處理(附核心工具類-ErrorInfoBuilder)

作者簡介&#xff1a;大家好&#xff0c;我是擼代碼的羊駝&#xff0c;前阿里巴巴架構師&#xff0c;現某互聯網公司CTO 聯系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我進群&#xff0c;大家一起學習&#xff0c;一起進步&#xff0c;一起對抗…

曲線分板機主軸有何特點?如何選擇合適的曲線分板機主軸?

在現代工業領域&#xff0c;分板機主軸作為重要的機械部件&#xff0c;其性能和質量對于生產效率和產品質量具有至關重要的影響。而在這其中&#xff0c;曲線分板機主軸則因為其獨特的優勢而被廣泛應用于PCB電路板的切割和分板。面對市場上眾多的曲線分板機主軸品牌&#xff0c…

【深度學習】loss與梯度與交叉熵的關系

問的GPT3.5 模型訓練時loss與梯度的關系&#xff1f; 在深度學習模型訓練過程中&#xff0c;loss&#xff08;損失函數&#xff09;與梯度&#xff08;gradient&#xff09;之間存在密切關系。損失函數衡量模型在給定輸入上的預測輸出與實際輸出之間的差距&#xff0c;而梯度則…

Leetcode 2958. Length of Longest Subarray With at Most K Frequency

Leetcode 2958. Length of Longest Subarray With at Most K Frequency 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;2958. Length of Longest Subarray With at Most K Frequency 1. 解題思路 這一題思路上其實也很簡單&#xff0c;就是一個滑動窗口的思路&#xff0c;遍歷…

前端知識(十三)——JavaScript監聽按鍵,禁止F12,禁止右鍵,禁止保存網頁【Ctrl+s】等操作

禁止右鍵 document.oncontextmenu new Function("event.returnValuefalse;") //禁用右鍵禁止按鍵 // 監聽按鍵 document.onkeydown function () {// f12if (window.event && window.event.keyCode 123) {alert("F12被禁用");event.keyCode 0…

RNN循環神經網絡python實現

import collections import math import re import random import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2ldef read_txt():# 讀取文本數據with open(./A Study in Drowning.txt, r, encodingutf-8) as f:# 讀取每一行l…

軟件測試之缺陷管理

一、軟件缺陷的基本概念 1、軟件缺陷的基本概念主要分為&#xff1a;缺陷、故障、失效這三種。 &#xff08;1&#xff09;缺陷&#xff08;defect&#xff09;&#xff1a;存在于軟件之中的偏差&#xff0c;可被激活&#xff0c;以靜態的形式存在于軟件內部&#xff0c;相當…

【隱馬爾可夫模型】隱馬爾可夫模型的觀測序列概率計算算法及例題詳解

【隱馬爾可夫模型】用前向算法計算觀測序列概率P&#xff08;O&#xff5c;λ&#xff09;??????? 【隱馬爾可夫模型】用后向算法計算觀測序列概率P&#xff08;O&#xff5c;λ&#xff09; 隱馬爾可夫模型是關于時序的概率模型&#xff0c;描述由一個隱藏的馬爾可夫鏈…

Elbie勒索病毒:最新變種.elbie襲擊了您的計算機?

引言&#xff1a; 在數字時代&#xff0c;.Elbie勒索病毒的威脅越發突出&#xff0c;對個人和組織的數據安全構成了巨大挑戰。本文將深入介紹.Elbie勒索病毒的特征&#xff0c;有效的數據恢復方法&#xff0c;以及一系列預防措施&#xff0c;幫助您更好地保護數字資產。當面對…

線性規劃-單純形法推導

這里寫目錄標題 線性規劃例子啤酒廠問題圖解法 單純形法數學推導將問題標準化并轉為矩陣形式開始推導 實例圖解法單純形法 線性規劃例子 啤酒廠問題 每日銷售上限&#xff1a;100箱啤酒營業時間&#xff1a;14小時生產1箱生啤需1小時生產1箱黑啤需2小時生啤售價&#xff1a;2…

從零開發短視頻電商 AWS OpenSearch Service開發環境申請以及Java客戶端介紹

文章目錄 創建域1.創建域2.輸入配置部署選項數據節點網絡精細訪問控制訪問策略 獲取域端點數據如何插入到OpenSearch ServiceJava連接OpenSearch Servicespring-data-opensearchelasticsearch-rest-high-level-clientopensearch-rest-clientopensearch-java 因為是開發測試使用…

[Linux] nginx的location和rewrite

一、Nginx常用的正則表達式 符號作用^匹配輸入字符串的起始位置$ 匹配輸入字符串的結束位置*匹配前面的字符零次或多次。如“ol*”能匹配“o”及“ol”、“oll” 匹配前面的字符一次或多次。如“ol”能匹配“ol”及“oll”、“olll”&#xff0c;但不能匹配“o”?匹配前面的字…

Vue3 setup 頁面跳轉監聽路由變化調整頁面訪問位置

頁面跳轉后頁面還是停留在上一個頁面的位置&#xff0c;沒有回到頂部 解決 1、router中路由守衛中統一添加 router.beforeEach(async (to, from, next) > {window.scrollTo(0, 0);next(); }); 2、頁面中監聽頁面變化 <script setup> import { ref, onMounted, wat…

@Autowired 找不到Bean的問題

排查思路 檢查包掃描&#xff1a;查詢的Bean是否被spring掃描裝配到檢查該Bean上是否配上注解&#xff08;Service/Component/Repository…&#xff09;如果使用第三方&#xff0c;檢查相關依賴是否已經安裝到當前項目 Autowired和Resource的區別 Autowired 是spring提供的注…

圖像清晰度 和像素、分辨率、鏡頭的關系

關于圖像清晰度的幾個知識點分享。 知識點 清晰度 清晰度指影像上各細部影紋及其邊界的清晰程度。清晰度&#xff0c;一般是從錄像機角度出發&#xff0c;通過看重放圖像的清晰程度來比較圖像質量&#xff0c;所以常用清晰度一詞。 而攝像機一般使用分解力一詞來衡量它“分解被…

linux通過命令切換用戶

在Linux中&#xff0c;你可以使用su&#xff08;substitute user或switch user&#xff09;命令來切換用戶。這個命令允許你臨時或永久地以另一個用戶的身份運行命令。以下是基本的用法&#xff1a; 基本切換到另一個用戶&#xff08;需要密碼&#xff09;&#xff1a;su [用戶…

APIFox:打造高效便捷的API管理工具

隨著互聯網技術的不斷發展&#xff0c;API&#xff08;應用程序接口&#xff09;已經成為了企業間數據交互的重要方式。然而&#xff0c;API的管理和維護卻成為了開發者們面臨的一大挑戰。為了解決這一問題&#xff0c;APIFox應運而生&#xff0c;它是一款專為API管理而生的工具…

【力扣100】189.輪轉數組

添加鏈接描述 class Solution:def rotate(self, nums: List[int], k: int) -> None:"""Do not return anything, modify nums in-place instead."""# 思路&#xff1a;三次數組翻轉nlen(nums)kk%nnums[:] nums[-k:] nums[:-k]思路就是&…

數據科學實踐:探索數據驅動的決策

寫在前面 你是否曾經困擾于如何從海量的數據中提取有價值的信息?你是否想過如何利用數據來指導你的決策,讓你的決策更加科學和精確?如果你有這樣的困擾和疑問,那么你來對了地方。這篇文章將引導你走進數據科學的世界,探索數據驅動的決策。 1.數據科學的基本原則 在我們…