flink的窗口

目錄

窗口分類

1.按照驅動類型分類

1. 時間窗口(Time window)

2.計數窗口(Count window)

2.按照窗口分配數據的規則分類

窗口API分類

API調用

窗口分配器器:

窗口函數

增量聚合函數:

全窗口函數

flink sql 窗口函數

窗口 | Apache Flink

窗口分類

1.按照驅動類型分類

1. 時間窗口(Time window)

? ? 時間窗口以時間點定義窗口的開始和結束,因此截取出就是某一段時間的數據。當到達結束時間時窗口不在接受數據,觸發計算輸出結果,并關閉銷毀窗口。

flink有一個專門的類用來表示時間窗口TimeWindow,這個類只有兩個私有屬性;窗口的方法獲取最大時間戳為end-1,因此窗口[start,end)? 左開右閉;

@PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}@Overridepublic long maxTimestamp() {return end - 1;}
2.計數窗口(Count window)

計數窗口是基于元素個數截取,在到達固定個數是就觸發計算并關閉窗口。

3.全局窗口(Global Windows)

是計數窗口的底層實現,窗口分配器由GlobalWindows類提供,需要自定義觸發器實現窗口的計算;

 stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max().aggregate(new AvgPv()).print();查看源代碼,windou函數后見windowStrream時獲取默認的觸發器
@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖區觸發器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 計數窗口底層采用全局窗口加計數器來實現public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}

2.按照窗口分配數據的規則分類

滾動窗口(Tumbling Window):窗口大小固定,窗口沒有重疊;

滑動窗口 (Sliding Window):滑動窗口有重疊,也可以沒有重疊,如果窗口size和滑動size相等,等于滾動窗口;

會話窗口 (Session Window):基于會話對窗口進行分組,與其他兩個不同的是,會話窗口是借用會話窗口的超時失效機觸發窗口計算,當數據到來后會開啟一個窗口,如果在超時時間內有數據陸續到來,窗口不會關閉,反之會關閉;極端情況,如果數據總能在窗口超時時間到達前遠遠不斷的到來,該窗口會一直開啟不會關閉;

全局窗口 (Global Window):比較通用的窗口,該窗口會把數據分配到一個窗口中,窗口為全局有效,會把相同key的數據分配到同一個窗口中,默認不會觸發計算,跟沒有窗口一樣,需要自定義觸發器才能使用;

窗口API分類

窗口大的分類可以分為按鍵分區和非按鍵分區兩種:按鍵分需要經過keyby操作,會把數據進行分發,實現負載均分,可以并行處理更大的數據量。而非按鍵分區窗口,相當于并行度為1,使用上直接調用windowall(),因此一般并不推薦使用;

stream
.keyby(...)  //流按鍵分區
.window(...)  //定義窗口分配器
[.trigger()] //設置出發器
[.evictor()]   //設置移除器
[.allowedLateness()]  // 設置延遲時間
[.sideOutputLateData()]  //設置側輸出流
.reduce/aggregate/fold/apply()  //處理函數
[.getSideOutput()] //獲取側輸出流stream
.windowAll(...)  //定義窗口分配器
[.trigger()] //設置出發器
[.evictor()]   //設置移除器
[.allowedLateness()]  // 設置延遲時間
[.sideOutputLateData()]  //設置側輸出流
.reduce/aggregate/fold/apply()  //處理函數
[.getSideOutput()] //獲取側輸出流

API調用

窗口操作包含兩個重要的概念:窗口分配器(window Assigners)和窗口函數(window function)兩部分;

窗口分配器用于構建窗口,確定窗口類型,確定數據劃分哪一個窗口,窗口函數制定數據的計算規則;

窗口分配器器:

作用:窗口分配器用來劃分窗口屬于哪一個窗口;

窗口按照時間可以劃分為:滾動、滑動和session,三種類型窗口;

窗口計數劃分:滾動和滑動兩種類型;

  eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函數

窗口函數按照計算特點可以分為增量計算和全量計算;

增量聚合函數:數據到達后立即計算,窗口只保存中間結果。效率高,性能好,但不夠靈活。

全量聚合函數:緩存窗口的所有元素,觸發后統一計算,效率低,但計算靈活。

增量聚合函數:

數據進入窗口會參與計算,窗口結束前只需要保留一個聚合后的狀態值,內存壓力小。

1.規約函數(ReduceFunction):數據保存留一個狀態,輸入類型和輸出類型必須一致,來一條數據會處理將數據合并到狀態中;

 stream.keyBy(r -> r.f0)// 設置滾動事件時間窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定義累加規則,窗口閉合時,向下游發送累加結果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();

sum、max、min等底層都是通過同名AggregateFunction實現(非下面的聚合函數),本質還是實現ReduceFunction結構重寫了reduce方法;

2.聚合函數(AggrateFunction):在規約函數基礎上進行完善。解決輸出和輸入類型必須一致的限制問題。實現應用更靈活;

  // 所有數據設置相同的key,發送到同一個分區統計PV和UV,再相除stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {// 創建累加器return Tuple2.of(new HashSet<String>(), 0L);}@Overridepublic Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {// 屬于本窗口的數據來一條累加一次,并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 窗口閉合時,增量聚合結束,將計算結果發送到下游return (double) accumulator.f1 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {return null;}}
全窗口函數

全窗口函數會將進入窗口的數據先進行緩存,然后在窗口關閉時一起計算,緩存數據會占用內存資源,如果一個窗口數據量太大時,可能出現內存溢出的問題;

全窗口函數可以劃分窗口函數(windowFunction)和處理窗口函數(processWindowFunction)兩種;

窗口函數(windowFunction):老版本通用窗口接口,window()后調用apply(),傳入實現windowFunction接口; 缺點是不能獲取上下文信息,也沒有更高級的功能。因為在功能上可以被processWindowFunction全覆蓋,因此主鍵被棄用

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

處理窗口函數(processWindowFunction):是窗口API中最底層通用的窗口函數接口,可以獲取到上問對象(context),實現為調用process方法傳入自定義繼承ProcessWindowFunction類;

input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}

注意:一般增量窗口函數和全量窗口函數可以一起使用,window().aggregate()方法可以傳入兩個函數,第一個采用增量聚合函數,第二個傳入全量函數,這樣數據在進入窗口會觸發增量計算,窗口不會緩存數據。當窗口關閉觸發計算時,結果數據穿度到全量計算,參數Iterable中一般只有一個數據;

aggregate(acct1,acct2)

flink sql 窗口函數

flink sql 窗口也包含常見的滾動窗口、滑動窗口、session窗口,但還有一種累計窗口。

在flink1.13版本后flinksql支持累計窗口CUMULATE,可以實現沒5分鐘觸發一次計算,輸出當天的累計數據,使用樣例

SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/38091.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/38091.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/38091.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL高級-MVCC-原理分析(RC級別)

文章目錄 1、RC隔離級別下&#xff0c;在事務中每一次執行快照讀時生成ReadView2、先來看第一次快照讀具體的讀取過程&#xff1a;3、再來看第二次快照讀具體的讀取過程: 1、RC隔離級別下&#xff0c;在事務中每一次執行快照讀時生成ReadView 我們就來分析事務5中&#xff0c;兩…

VBA代碼解決方案第十五講:如何對單元格區域進行高亮顯示

《VBA代碼解決方案》(版權10028096)這套教程是我最早推出的教程&#xff0c;目前已經是第三版修訂了。這套教程定位于入門后的提高&#xff0c;在學習這套教程過程中&#xff0c;側重點是要理解及掌握我的“積木編程”思想。要靈活運用教程中的實例像搭積木一樣把自己喜歡的代碼…

Java中集中常見的算法

以下是對選擇排序、冒泡排序和插入排序的理解及代碼實現 選擇排序&#xff1a; 理解&#xff1a;它通過不斷地從待排序元素中選擇最小&#xff08;或最大&#xff09;元素&#xff0c;并將其放置在已排序序列的一端。 代碼實現&#xff1a; public class SelectionSort {pu…

final、const、readonly關鍵字在不同語言中代表著什么

一、Java 1.被final修飾的類不能被繼承。 2.被final修飾的方法不能被重寫。 被 final 修飾的類中所有的成員方法都會隱式的定義為 final 方法。 若父類中 final 方法的訪問權限為 private &#xff0c;則子類中不能直接繼承該方法。此時可以在子類中定義相同方法名的函數&…

【操作系統期末速成】EP06 | 學習筆記(基于五道口一只鴨)

文章目錄 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;??????2.1 考點十四&#xff1a;同步互斥的基本概念2.2 考點十五&#xff1a;實現臨界區互斥的基本方法2.3 考點十六&#xff1a;信號量的含義及常用信號量 一、前言&#x1f680;&#x1…

品牌推廣的三個階段與核心內容,一篇文章全掌握!

在競爭激烈的市場環境中&#xff0c;品牌推廣是企業成功的關鍵。精心策劃的推廣策略能夠幫助企業在消費者心中樹立獨特的品牌形象&#xff0c;進而促進銷售增長。 作為一家手工酸奶品牌的創始人&#xff0c;目前全國也復制了100多家門店&#xff0c;我理解的品牌推廣分為3個階…

操作系統概論(二)

一、單項選擇題(本大題共20小題&#xff0c;每小題1分&#xff0c;共20分) 在每小題列出的四個備選項中只有一個選項是符合題目要求的&#xff0c;請將其代碼填寫在題后的括號內。錯選、多選或未選均無分。 1&#xff0e;操作員接口是操作系統為用戶提供的使用計算機系統的手…

Vitis IDE 艱難切換--從傳統 Vitis GUI 到 2024.1 統一軟件界面

目錄 1. 簡介 2. 界面展示 2.1 啟動 2.2 Flow Navigator 2.1.1 C Simulation Dialog 2.1.2 C Synthesis 2.1.3 C/RTL Co-simulation 2.1.4 Implementation 2.1.5 Package 3. C Synthesis 詳解 3.1 Classic Configuration Settings 3.1.1 config_array_partition 3…

MySQL進階:存儲過程和函數

存儲過程和函數 1. 簡介2. 創建存儲過程使用MySQL工作臺創建存儲過程 3. 刪除存儲過程4. 參數帶默認值的參數參數驗證輸出參數 5. 變量6. 函數7. 其他約定 1. 簡介 存儲過程三大作用&#xff1a; 儲存和管理SQL代碼&#xff08;置于數據庫中&#xff0c;與應用層分離&#xf…

【力扣 28】找出字符串中第一個匹配項的下標 C++題解(字符串匹配)

給你兩個字符串 haystack 和 needle &#xff0c;請你在 haystack 字符串中找出 needle 字符串的第一個匹配項的下標&#xff08;下標從 0 開始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;則返回 -1 。 示例 1&#xff1a; 輸入&#xff1a;haystack “s…

(13)DroneCAN 適配器節點(二)

文章目錄 前言 2 固件 2.1 基于F103 2.2 基于F303 2.3 基于F431 3 ArduPilot固件DroneCAN設置 3.1 f303-通用設置示例 4 DroneCAN適配器節點 前言 這些節點允許現有的 ArduPilot 支持的外圍設備作為 DroneCAN 或 MSP 設備適應 CAN 總線。這也允許擴展自動駕駛儀硬件的…

隨機文本生成器

目錄 開頭程序程序的流程圖程序打印的效果(不必細看&#xff0c;因為字符太多)例1例2例3 結尾 開頭 大家好&#xff0c;我叫這是我58。看&#xff01;這下面有一個程序。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <random> #includ…

快遞物流倉庫管理系統java項目springboot和vue的前后端分離系統java課程設計java畢業設計

文章目錄 快遞物流倉庫管理系統一、項目演示二、項目介紹三、部分功能截圖四、部分代碼展示五、底部獲取項目源碼&#xff08;9.9&#xffe5;帶走&#xff09; 快遞物流倉庫管理系統 一、項目演示 快遞物流倉庫管理系統 二、項目介紹 語言: Java 數據庫&#xff1a;MySQL 前…

寶塔安裝rabbitMQ實戰

服務器環境說明 阿里云服務器、寶塔、centos7 一、下載erlang 原因&#xff1a;RabbitMQ服務端代碼是使用并發式語言Erlang編寫的&#xff0c;安裝Rabbit MQ的前提是安裝Erlang。 下載地址&#xff1a;http://www.erlang.org/downloads 下載對應的版本&…

山東省著名烈士孫善師孫善帥故居布展喜添新篇

人海信息網山東訊&#xff08;張春兄、馮愛云&#xff09; “……他們以鋼鐵般的意志&#xff0c;堅守共產黨員的使命&#xff0c;他們就是濼口九烈士的孫善師孫善帥兄弟&#xff01;”6月28日&#xff0c;對于山東省著名烈士孫善師孫善帥故居來說&#xff0c;又是一個不平凡的…

LabVIEW電壓電流實時監測系統

開發了一種基于LabVIEW和研華&#xff08;Advantech&#xff09;數據采集卡的電壓電流實時監測系統&#xff0c;通過高效的數據采集和處理&#xff0c;為工業和科研用戶提供高精度、實時的電壓電流監測解決方案。系統采用研華USB-4711A數據采集卡&#xff0c;結合LabVIEW編程環…

AI論文速讀 | 2024[KDD]自適應時空圖神經網絡中圖中獎彩票的預訓練識別

題目&#xff1a;Pre-Training Identification of Graph Winning Tickets in Adaptive Spatial-Temporal Graph Neural Networks 作者&#xff1a;Wenying Duan, Tianxiang Fang, Hong Rao, Xiaoxi He 機構&#xff1a;南昌大學&#xff0c;澳門大學 arXiv網址&#xff1a;h…

Python數據分析-股票分析和可視化(深證指數)

一、內容簡介 股市指數作為衡量股市整體表現的重要工具&#xff0c;不僅反映了市場的即時狀態&#xff0c;也提供了經濟健康狀況的關鍵信號。在全球經濟體系中&#xff0c;股市指數被廣泛用于預測經濟活動&#xff0c;評估投資環境&#xff0c;以及制定財政和貨幣政策。在中國…

IEEE JSTSP綜述:從信號處理領域分析視觸覺傳感器的研究

觸覺傳感器是機器人系統的重要組成部分&#xff0c;雖然與視覺相比觸覺具有較小的感知面積&#xff0c;但卻可以提供機器人與物體交互過程中更加真實的物理信息。 視覺觸覺傳感是一種分辨率高、成本低的觸覺感知技術&#xff0c;被廣泛應用于分類、抓取、操作等領域中。近期&a…

R-CNN和YOLO的各自優缺點

R-CNN&#xff08;包括其改進版本如Faster R-CNN和Mask R-CNN&#xff09;與YOLO&#xff08;You Only Look Once&#xff09;是兩種常用的物體檢測算法&#xff0c;它們各自有不同的優缺點&#xff0c;適用于不同的應用場景和需求。 R-CNN 系列 優點&#xff1a; 高精度&am…