37、Flink 的窗口函數(Window Functions)詳解

窗口函數(Window Functions)
a)概述

定義了 window assigner 之后,需要指定當窗口觸發之后,如何計算每個窗口中的數據, 即 window function。

窗口函數有三種:ReduceFunctionAggregateFunctionProcessWindowFunction

  • 前兩者執行更高效,因為 Flink 可以在每條數據到達窗口后進行增量聚合(incrementally aggregate);
  • ProcessWindowFunction 會得到能夠遍歷當前窗口內所有數據的 Iterable,以及關于這個窗口的 meta-information。

使用 ProcessWindowFunction 的窗口轉換操作沒有其它兩種函數高效,因為 Flink 在窗口觸發前必須緩存里面的所有數據; ProcessWindowFunction 可以與 ReduceFunctionAggregateFunction 合并來提高效率,既可以增量聚合窗口內的數據,又可以從 ProcessWindowFunction 接收窗口的 metadata。

b)ReduceFunction

ReduceFunction 指定兩條輸入數據如何合并起來產生一條輸出數據,輸入和輸出數據的類型必須相同

Flink 使用 ReduceFunction 對窗口中的數據進行增量聚合。

示例:對窗口內元組的第二個屬性求和。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});
c)AggregateFunction

ReduceFunctionAggregateFunction 的特殊情況; AggregateFunction 接收三個參數:輸入數據的類型(IN)、累加器的類型(ACC)和輸出數據的類型(OUT)。

輸入數據的類型是輸入流的元素類型,AggregateFunction 接口有如下幾個方法: 把每一條元素加進累加器、創建初始累加器、合并兩個累加器、從累加器中提取輸出(OUT 類型)。

ReduceFunction 相同,Flink 會在輸入數據到達窗口時直接進行增量聚合。

示例:計算窗口內所有元素第二個屬性的平均值。

private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());
d)ProcessWindowFunction

ProcessWindowFunction 具備 Iterable 能獲取窗口內所有的元素 ,以及用來獲取時間和狀態信息的 Context 對象,比其他窗口函數更加靈活;ProcessWindowFunction 的靈活性是以性能和資源消耗為代價的, 因為窗口中的數據無法被增量聚合,而需要在窗口觸發前緩存所有數據。

ProcessWindowFunction 的函數簽名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.** @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key,Context context,Iterable<IN> elements,Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/*** The context holding window metadata.*/public abstract class Context implements java.io.Serializable {/*** Returns the window that is being evaluated.*/public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up* by implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/*** State accessor for per-key global state.*/public abstract KeyedStateStore globalState();}}

key 參數由 keyBy() 中指定的 KeySelector 選出;如果是給出 key 在 tuple 中的 index 或用屬性名的字符串形式指定 key,這個 key 的類型將總是 Tuple, 并且需要手動將它轉換為正確大小的 tuple 才能提取 key。

示例:使用 ProcessWindowFunction 對窗口中的元素計數,并且將窗口本身的信息一同輸出。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}
e)增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以與 ReduceFunctionAggregateFunction 搭配使用, 使其能夠在數據到達窗口的時候進行增量聚合,當窗口關閉時,ProcessWindowFunction 將會得到聚合的結果;即實現了增量聚合窗口的元素并且從 ProcessWindowFunction 中獲得窗口的元數據。

使用 ReduceFunction 增量聚合

示例:將 ReduceFunctionProcessWindowFunction 組合,返回窗口中的最小元素和窗口的開始時間。

DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}

使用 AggregateFunction 增量聚合

示例:將 AggregateFunction 與 ProcessWindowFunction 組合,計算平均值并與窗口對應的 key 一同輸出。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate(), new MyProcessWindowFunction());// Function definitionsprivate static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {public void process(String key,Context context,Iterable<Double> averages,Collector<Tuple2<String, Double>> out) {Double average = averages.iterator().next();out.collect(new Tuple2<>(key, average));}
}
f)在 ProcessWindowFunction 中使用 per-window state

除了訪問 keyed state,ProcessWindowFunction 還可以使用作用域僅為“當前正在處理的窗口”的 keyed state

per-window 中的 window 對應某個 key 的窗口實例:比如 以 user-id xyz 為 key,從 12:00 到 13:00 的時間窗口,具體情況取決于窗口的定義,根據具體的 key 和時間段會產生諸多不同的窗口實例。

Per-window state 如果處理有 1000 種不同 key 的事件,并且目前所有事件都處于 [12:00, 13:00) 時間窗口內,那么將會得到 1000 個窗口實例, 且每個實例都有自己的 keyed per-window state。

process() 接收到的 Context 對象中有兩個方法允許訪問以下兩種 state:

  • globalState(),訪問全局的 keyed state
  • windowState(), 訪問作用域僅限于當前窗口的 keyed state

如果可能將一個 window 觸發多次(比如當遲到數據會再次觸發窗口計算, 或自定義了根據推測提前觸發窗口的 trigger),那么這個功能將非常有用,這時可能需要在 per-window state 中儲存關于之前觸發的信息或觸發的總次數。

當使用窗口狀態時,一定記得在刪除窗口時清除這些狀態,應該定義在 clear() 方法中

WindowFunction(已過時)

在某些可以使用 ProcessWindowFunction 的地方,也可以使用 WindowFunction;它是舊版的 ProcessWindowFunction,只能提供更少的環境信息且缺少一些高級的功能,比如 per-window state。

WindowFunction 的函數簽名如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.** @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

可以像下例這樣使用:

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/17075.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/17075.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/17075.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

嵌入式學習記錄5.27(c++基礎1)

目錄 一.C和C的區別 二.輸入輸出流類 2.1輸出cout 2.2輸入cin 三.命名空間 2.1使用命名空間中的標識符 2.2命名空間中聲明函數 2.3命名沖突問題 2.4匿名空間 2.5命名空間添加&#xff0c;嵌套&#xff0c;重命名 四.字符串的使用 4.1string類 4.2C風格和C風格字符串…

LeetCode27.移除元素

題目鏈接&#xff1a; 27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 思路分析&#xff1a;同樣屬于經典的雙指針移動問題&#xff0c;要掌握固定的思路即可。 算法分析&#xff1a;這個題目可以這樣處理&#xff0c;我們把所有非val 的元素都向前移動&#xff0c;把…

Java面試八股之線程池是怎么實現的

線程池是怎么實現的 線程池是一種基于池化技術的線程管理方式&#xff0c;通過預先創建一定數量的線程并保持在池中待命&#xff0c;從而在有任務來臨時能夠快速分配線程處理任務&#xff0c;而無需頻繁創建和銷毀線程&#xff0c;以此達到提升系統性能、減少資源消耗的目的。…

推薦《從零開始大模型開發與微調》

大模型是深度學習是當前AI和NLP研究與產業中最重要的方向之一。 本書用PyTorch 2.0作為學習大模型的基本框架&#xff0c;以ChatGLM為例詳細講解大模型的基本理論、算法、程序實現、應用實戰以及微調技術&#xff0c;為讀者揭示大模型開發技術。 《從零開始大模型開發與微調&…

兩個數組的交集-力扣

想到的解法是使用兩個哈希表&#xff0c;s1用來統計nums1中出現過的數字&#xff0c;然后遍歷nums2數組&#xff0c;當能夠在s1中查找到nums2的元素時&#xff0c;將這個元素添加到s2中&#xff0c;最后遍歷s2&#xff0c;將其中的元素添加到返回數組中。 但最開始寫時&#xf…

外星人存在與否......----小話外星人(1)

前一段時間&#xff0c;看了好多關于UFO、外星人、宇宙、遠古外星人的視頻和電子書&#xff0c;最后發現&#xff0c;這樣的東西還是不要看多為好&#xff0c;搞得好像這些是真的似的&#xff0c;有時睡覺會被意外驚醒&#xff0c;想多了...... 1、外星人存在嗎 不管有多少UFO的…

Windows10映射網絡驅動器之后不顯示映射盤

目錄 背景解決步驟1、按 Windows R 打開運行2、打開注冊表編輯器3、 System上新建-- DWORD(32bit)4、對新建的文件重命名5、將EnableLinkedConnections的數值改為16、退出注冊表編輯器&#xff0c;重啟系統。 知識擴展斷開連接備份注冊表 背景 目前有一臺NAS服務器,和一臺lin…

Vuex 頁面刷新數據丟失怎么解決

當Vuex中的數據在頁面刷新后丟失時&#xff0c;這通常是因為Vuex的狀態數據是保存在運行內存中的&#xff0c;頁面刷新會導致Vue實例重新加載&#xff0c;進而Vuex中的數據被重置為初始狀態。為了解決這個問題&#xff0c;可以采取以下幾種方法&#xff1a; 1. 使用瀏覽器的本…

工廠模式的三種實現方式

文章目錄 1.引出工廠模式具體需求 2.傳統模式1.類圖2.目錄結構3.pizzastore 用于設計pizza1.Pizza.java 抽象的Pizza類型2.CheesePizaa.java CheesePizaa3.GreekPizza.java GreekPizza 4.order 用于訂購和制作pizza1.OrderPizza.java 制作pizza2.PizzaStore.java 訂購pizza 5.優…

【Redis】 關于列表類型

文章目錄 &#x1f343;前言&#x1f340;常見操作命令介紹&#x1f6a9;lpush&#x1f6a9;lpushx&#x1f6a9;rpush&#x1f6a9;rpushx&#x1f6a9;lrange&#x1f6a9;lpop&#x1f6a9;rpop&#x1f6a9;lindex&#x1f6a9;linsert&#x1f6a9;llen&#x1f6a9;lrem&…

“按摩”科技?

都說A股股民是特別善于學習的&#xff0c;這不市場又現新概念——“按摩科技”&#xff0c;成立僅6年&#xff0c;把上門按摩干到35億營收也是沒誰了&#xff0c;現在號稱有1000萬用戶&#xff0c;3萬家入駐商戶數的按摩平臺&#xff0c;難道就憑借2.5萬名女技師&#xff0c;活…

【Django】中間件實現鉤子函數預處理和后處理,局部裝飾視圖函數

在app文件夾里新建middleware.py繼承MiddlewareMixin&#xff0c; 編寫中間件類&#xff0c;重寫process_request、process_response鉤子函數 from django.http import HttpRequest, HttpResponse from django.utils.decorators import decorator_from_middleware from django…

關于pytest中用例名稱使用中文亂碼的解決

場景&#xff1a;使用pytest.mark.parametrize裝飾器為用例自定義名稱時&#xff0c;運行顯示亂碼。如下圖所示&#xff1a; 解決方案&#xff1a; 1.在根目錄 pytest.ini中增加一行代碼 [pytest] disable_test_id_escaping_and_forfeit_all_rights_to_community_supportTrue…

NAT 網絡轉換

NAT(Network Address Translation) 網絡地址轉換 0x01 NAT 簡介 為什么要使用 NAT IPv4 網絡地址緊缺&#xff0c;從而出現了私有網段&#xff0c;來補充地址&#xff0c;但私有網段不課訪問 internet 所以出現了 NAT 地址轉換&#xff0c;將私有地址&#xff0c;轉換為公網 I…

一口氣看完es(上)

此系列博客分為上中下3篇&#xff1a;上篇是關于es的概念和對數據的增刪改操作&#xff0c;中篇是對數據的查詢、對搜索結果進行處理操作&#xff0c;下篇是介紹怎么在Java代碼中調用和操作es。 基本概念 1、es是什么&#xff1f;有什么作用&#xff1f; es全名是elasticsea…

關于0成本部署個人博客

分享一個文章關于零成本搭建個人博客 參考&#xff1a;‘關于部署博客hexoshokagithub的流程以及問題’ - 關于博客部署 | XiaoYang Guo Welcome to Guo Xiaoyangs personal blog 歡迎來到郭曉陽的個人博客 (1330303.github.io) 這個博主講的流程很全&#xff0c;而且回答也…

智慧管廊巡檢運維解決方案

一、智慧管廊巡檢行業目前存在的挑戰和難題 智慧管廊巡檢行業面臨著運行環境的客觀影響&#xff0c;如地面施工、液體滲漏、通風不佳、內部空間受限等問題。而管廊巡檢機器人系統的出現卻具有重大意義。它能夠有力地保障管廊安全且可靠地運行&#xff0c;在面對火情、災情等緊…

springboot基礎篇(快速入門+要點總結)

目錄 一、SpringBoot簡介 二、創建SpringBoot&#xff08;通過Idea腳手架搭建項目&#xff09; 三、properties配置文件 properties 配置文件說明 ①. properties 基本語法 ②. 讀取配置?件 ③. properties 缺點 2. yml 配置?件說明 ①. yml 基本語法 ②. yml 使用進…

上海AI lab發布MathBench,GPT-4o的數學能力有多強?

大模型數學能力哪家強&#xff1f; 最近&#xff0c;上海AI lab構建了一個全面的多語言數學基準——MathBench。與現有的基準不同的是&#xff0c;MathBench涵蓋從小學、初中、高中、大學不同難度&#xff0c;從基礎算術題到高階微積分、統計學、概率論等豐富類別的數學題目&a…

React項目知識積累(五)

1.dispatch、dev派發 src/models/formStatus.js: const FromStatusModel {namespace: "fromStatus",state: {isDisable: false,},reducers: {saveIsDisable(state, { payload }) {return {...state,...payload,};},}, };export default FromStatusModel; 改變和提…