Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)

一、什么是增量聚合函數

在Flink Window中定義了窗口分配器,我們只是知道了數據屬于哪個窗口,可以將數據收集起來了;至于收集起來到底要做什么,其實還完全沒有頭緒,這也就是窗口函數所需要做的事情。所以在窗口分配器之后,我們還要再接上一個定義窗口如何進行計算的操作,這就是所謂的“窗口函數”(window functions)。
窗口可以將數據收集起來,最基本的處理操作當然就是基于窗口內的數據進行聚合。
我們可以每來一個數據就在之前結果上聚合一次,這就是“增量聚合”。
典型的增量聚合函數有兩個:ReduceFunction 和 AggregateFunction。
在這里插入圖片描述

二、ReduceFunction

源碼解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

實際案例
在Flink中,使用socket模擬實時的數據流DataStream,通過定義一個滾動窗口,窗口的大小為10s,按照id分區,使用reduce聚合函數實現value的累加統計

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// 注意這里為什么返回的是KeyedStream(建控流/分區流),而不是DataStreamKeyedStream<WaterSensor, String> keyedStream = streamSource// 使用map函數將輸入的string轉為一個WaterSensor類.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {// 這里寫的很詳細,如何把string轉為的WaterSensor類String[] strings = s.split(",");String id = strings[0];Long ts = Long.valueOf(strings[1]);Integer vc = Integer.valueOf(strings[2]);WaterSensor waterSensor = new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分區(提問:KeyBy是如何實現分區的?).keyBy(new KeySelector<WaterSensor, String>() {// 也可以直接使用lamda表達式更簡單@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(WindowFunctions)* .window()方法需要傳入一個窗口分配器,它指明了窗口的類型* */SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream// 設置滾動窗口的大小(10秒).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函數實現增量聚合函數ReduceFunction.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println("調用reduce方法,之前的結果:" + waterSensor1 + ",現在來的數據:" + waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();}
}

啟動Flink程序,啟動nc,模擬輸入

nc -lk 8888
# 00-10秒輸入
a,11111,1
# 11-20秒輸入
a,11111,2
a,22222,3
# 21-30秒輸入
a,11111,4

查看控制臺打印結果

WaterSensor{id='a', ts=11111, vc=1}
調用reduce方法,之前的結果:WaterSensor{id='a', ts=11111, vc=2},現在來的數據:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在這里插入圖片描述

三、AggregateFunction

雖然ReduceFunction 可以解決大多數歸約聚合的問題,但是我們通過上述案例可以發現:這個接口有一個限制,就是聚合狀態的類型、輸出結果的類型都必須和輸入數據類型一樣。
Flink Window API 中的 aggregate 就突破了這個限制,可以定義更加靈活的窗口聚合操作。這個方法需要傳入一個 AggregateFunction 的實現類作為參數。AggregateFunction 可以看作是 ReduceFunction 的通用版本,這里有三種類型:輸入類型(IN)、累加器類型(ACC)和輸出類型(OUT)。輸入類型 IN 就是輸入流中元素的數據類型;累加器類型 ACC 則是我們進行聚合的中間狀態類型;而輸出類型當然就是最終計算結果的類型了。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

接口中有四個方法:
1.createAccumulator()
創建一個累加器,這就是為聚合創建了一個初始狀態,每個聚合任務只會調用一次。
2.add()
將輸入的元素添加到累加器中。
3.getResult()
從累加器中提取聚合的輸出結果。
4.merge()
合并兩個累加器,并將合并后的狀態作為一個累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先調用 createAccumulator()為任務初始化一個狀態(累加器);而后每來一個數據就調用一次 add()方法,對數據進行聚合,得到的結果保存在狀態中;等到了窗口需要輸出時,再調用 getResult()方法得到計算結果。很明顯,與 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于輸入、中間狀態、輸出的類型可以不同,使得應用更加靈活方便。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/207188.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/207188.shtml
英文地址,請注明出處:http://en.pswp.cn/news/207188.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

聽GPT 講Rust源代碼--src/tools(9)

File: rust/src/tools/rust-analyzer/crates/ide-assists/src/handlers/apply_demorgan.rs 在Rust源代碼中&#xff0c;apply_demorgan.rs文件位于rust-analyzer工具的ide-assists庫中&#xff0c;其作用是實現一個輔助函數&#xff0c;用于在代碼中應用De Morgan定律的變換。 …

Android : 籃球記分器app _簡單應用

示例圖&#xff1a; 1.導包 在build.gradle 中 加入 // 使用androidx版本庫implementation androidx.lifecycle:lifecycle-extensions:2.1.0-alpha03 2. 開啟dataBinding android{...// 步驟1.開啟data bindingdataBinding {enabled true}...} 3.寫個類繼承 ViewModel pac…

整數與IP地址間的轉換

原理&#xff1a;ip地址的每段可以看成是一個0-255的整數&#xff0c;把每段拆分成一個二進制形式組合起來&#xff0c;然后把這個二進制數轉變成一個長整數。 舉例&#xff1a;一個ip地址為10.0.3.193 每段數字相對應的二進制數 10 00001010 0 00000000 3 00000011 193 110000…

自下而上-存儲全棧(TiDB/RockDB/SPDK/fuse/ceph/NVMe/ext4)存儲技術專家成長路線

數字化時代的到來帶來了大規模數據的產生&#xff0c;各行各業都面臨著數據爆炸的挑戰。 隨著云計算、物聯網、人工智能等新興技術的發展&#xff0c;對存儲技術的需求也越來越多樣化。不同應用場景對存儲的容量、性能、可靠性和成本等方面都有不同的要求。具備存儲技術知識和技…

機器學習-聚類問題

前言 聚類算法又叫做”無監督分類“&#xff0c;目標是通過對無標記訓練樣本來揭示數據的內在性質及 規律&#xff0c;為進一步的數據分析提供基礎。 Kmeans 作為聚類算法的典型代表&#xff0c;Kmeans可以說是最簡單的聚類算法&#xff0c;沒有之一&#xff0c;那她是怎么完…

MySQL為何偏愛B+樹索引

一、MySQL、B樹概念 MySQL是一種關系型數據庫&#xff0c;它使用SQL語言來操作數據。SQL語言可以實現對數據的增刪改查等操作&#xff0c;但是如果數據量很大&#xff0c;那么這些操作的效率就會很低。為了提高效率&#xff0c;MySQL引入了索引的概念。 索引是一種數據結構&am…

人體關鍵點檢測1:人體姿勢估計數據集

人體關鍵點檢測1&#xff1a;人體姿勢估計數據集 目錄 人體關鍵點檢測1&#xff1a;人體姿勢估計數據集 1.人體姿態估計 2.人體姿勢估計數據集 &#xff08;1&#xff09;COCO數據集 &#xff08;2&#xff09;MPII數據集 &#xff08;3&#xff09;Human3.6M &#xf…

PostgreSQL 主鍵和唯一鍵的區別

主鍵和唯一鍵的區別 主鍵&#xff08;Primary Key&#xff09;&#xff1a; 主鍵是用于唯一標識表中的每一條記錄的鍵。主鍵必須是唯一的&#xff0c;不允許為空。一個表只能有一個主鍵。主鍵可以由一個或多個字段組成。主鍵的值在整個表中必須是唯一的&#xff0c;用于確保數據…

編譯器:swc 究竟比 babel 快在哪里?

前言 swc 與 babel 都是 JavaScript 編譯器&#xff0c;它們的主要功能是將 ES2015 以及 TypeScript, Flow, JSX 等語法轉換為瀏覽器或環境中的向后兼容的 JavaScript 代碼。 哪里快了&#xff1f; 1. 開發語言的優勢 swc 是用 Rust 語言開發的&#xff0c;而 babel 是用 Java…

MS5228/5248/5268:2.7V 到 5.5V、 12/14/16Bit、內置基準、八通道數模轉換器

MS5228/MS5248/MS5268 是一款 12/14/16bit 八通道輸出的電壓型 DAC &#xff0c;內部集成上電復位電路、可選內部基準、接口采用四線串口模式&#xff0c; 最高工作頻率可以到 40MHz &#xff0c;可以兼容 SPI 、 QSPI 、 DSP 接口和 Microwire 串口。輸出接到一個 …

IP地址/16或者/24的意義

IP地址/16或者/24的意義 2023-04-26 16:54 獵手家園 閱讀(533) 評論(0) 編輯 收藏 舉報 當創建VPC專有網絡時&#xff0c;許多人會遇到填寫IPv4地址的情況&#xff0c;通常使用的格式是xxx.xxx.xxx.xxx/16或者xxx.xxx.xxx.xxx/24。那么這個斜杠后面的數字代表什么意思呢&#…

<習題集><LeetCode><鏈表><2/19/21/23/24>

目錄 2. 兩數相加 19. 刪除鏈表的倒數第 N 個結點 21. 合并兩個有序鏈表 23. 合并 K 個升序鏈表 24. 兩兩交換鏈表中的節點 2. 兩數相加 https://leetcode.cn/problems/add-two-numbers/ public ListNode addTwoNumbers(ListNode l1, ListNode l2) {//head是cur鏈表頭節點…

pdf轉png的兩種方法

背景:pdf在一般公司,沒有辦公系統,又不是word/wps/Office系統,讀不出來,識別不了,只能將其轉化為圖片png,因此在小公司或者一般公司就需要pdf轉png的功能。本文將詳細展開。 1、fitz庫(也就是PyMuPDF) 直接pip安裝PyMuPDF即可使用,直接使用fitz操作,無需其他庫。 …

Go語言實現深度學習的正向傳播和反向傳播

文章目錄 開發前言開發理論圖解理論數據類型數學函數數據節點統一抽象變量數據節點常量數據節點單目運算封裝雙目運算封裝算子節點統一抽象基礎算子加法算子減法算子乘法算子除法算子指數算子對數算子正切算子正弦算子余弦算子數據流圖正向傳播反向傳播正向訓練反向訓練運行示例…

我的記事本

url uniform resource locator. 統一資源定位符 請求狀態碼 1XX:信息響應 2XX:成功響應 3XX:重定向消息 4XX:客戶端錯誤響應 5XX:服務器端錯誤響應 IP地址分類 本機回環IP地址&#xff1a;127.0.0.1 &#xff5e; 127.255.255.254 局域網IP(私網IP) 192.168.0.0 &am…

船舶機電設備振動數據采集監控系統解決方案

船舶運行中&#xff0c;通常需要通過振動數據采集系統對船舶的各個機電設備運行進行監控&#xff0c;有助于在設備故障時快速預警&#xff0c;進行診斷、分析和維護&#xff0c;保證船舶機電設備正常工作&#xff0c;從而確保工作人員及船舶的安全。 船舶各種機電設備會產生大…

vLLM介紹

簡介 vLLM 工程github地址 Paged attention論文地址 vLLM開發者介紹 Woosuk Kwon vLLM: A high-throughput and memory-efficient inference and serving engine for LLMs. SkyPilot: A framework for easily and cost effectively running machine learning workloads on …

【模型量化】神經網絡量化基礎及代碼學習總結

1 量化的介紹 量化是減少神經網絡計算時間和能耗的最有效的方法之一。在神經網絡量化中&#xff0c;權重和激活張量存儲在比訓練時通常使用的16-bit或32-bit更低的比特精度。當從32-bit降低到8-bit&#xff0c;存儲張量的內存開銷減少了4倍&#xff0c;矩陣乘法的計算成本則二…

ALNS算法中隨機化重要性的評價

文章概述 本研究分析了在海上提貨和交付問題中使用的ALNS元啟發式算法中的隨機化成分。研究者提出了簡單的確定性替代方案&#xff0c;并通過實驗比較了隨機化和確定性成分的性能。結果表明&#xff0c;初始實現的簡單確定性替代方案能夠與隨機化成分的性能相匹配。這項研究為…

IDEA使用git從遠程倉庫獲取項目

將地址填入url中 然后直接clone就行