Flink自定義函數

一、UDF 核心原理

Flink 自定義函數(UDF)是擴展 Table API/SQL 能力的核心機制,允許將自定義邏輯嵌入查詢。其設計遵循以下原則:

1. 函數類型體系

類型輸入輸出關系核心用途
標量函數(ScalarFunction)0~N 個標量 → 1 個標量字段轉換、值計算
表值函數(TableFunction)0~N 個標量 → 多行多列數據拆分、關聯外部數據
聚合函數(AggregateFunction)多行標量 → 1 個標量自定義聚合(如加權平均)
表值聚合函數(TableAggregateFunction)多行標量 → 多行多列分組TopN、分桶統計等
異步表值函數異步查詢外部系統 → 多行多列高效關聯外部數據庫/API

2. 類型系統

  • 標量/表值函數使用新數據類型系統(基于DataTypes
  • 聚合函數仍使用舊類型系統(基于TypeInformation
  • 類型推導:默認通過反射獲取,復雜場景可通過@DataTypeHint@FunctionHint注解顯式指定

3. 執行邏輯

  • 核心是求值方法(如eval()accumulate()),定義數據處理邏輯
  • 生命周期:open()初始化 → 求值方法調用 → close()資源清理
  • 確定性:通過isDeterministic()聲明是否返回確定結果(影響優化策略)

二、快速上手實戰

1. 標量函數(ScalarFunction)

作用:對輸入標量做轉換計算(如字符串處理、格式轉換)

實現步驟

  1. 繼承ScalarFunction,實現eval()方法
    public class HashFunction extends ScalarFunction {// 輸入任意類型,返回哈希值public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
    }
    
  2. 注冊與調用
    // 注冊
    tableEnv.createTemporarySystemFunction("HashFunc", HashFunction.class);
    // Table API 調用
    table.select(call("HashFunc", $("field")));
    // SQL 調用
    tableEnv.sqlQuery("SELECT HashFunc(field) FROM t");
    

2. 表值函數(TableFunction)

作用:將單行輸入拆分為多行輸出(如字符串按分隔符拆分)

實現步驟

  1. 繼承TableFunction<T>,通過collect()輸出結果
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>"))
    public class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {collect(Row.of(s, s.length())); // 輸出每行數據}}
    }
    
  2. 注冊與調用
    tableEnv.createTemporarySystemFunction("SplitFunc", SplitFunction.class);
    // 關聯查詢(LATERAL JOIN)
    tableEnv.sqlQuery("""SELECT t.id, s.word, s.len FROM t, LATERAL TABLE(SplitFunc(t.content)) AS s(word, len)
    """);
    

3. 聚合函數(AggregateFunction)

作用:多行數據聚合為單個值(如自定義平均值、求和邏輯)

實現步驟

  1. 定義累加器(存儲中間結果)
    public class WeightedAvgAccum {public long sum = 0;   // 加權和public int count = 0; // 權重總和
    }
    
  2. 繼承AggregateFunction,實現核心方法
    public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }// 累加邏輯public void accumulate(WeightedAvgAccum acc, long value, int weight) {acc.sum += value * weight;acc.count += weight;}// 最終結果計算@Overridepublic Long getValue(WeightedAvgAccum acc) {return acc.count == 0 ? null : acc.sum / acc.count;}
    }
    
  3. 注冊與調用
    tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
    tableEnv.sqlQuery("""SELECT user, WeightedAvg(score, weight) FROM scores GROUP BY user
    """);
    

4. 表值聚合函數(TableAggregateFunction)

作用:多行數據聚合為多行結果(如分組取TopN)

實現步驟

  1. 定義累加器(存儲中間狀態)
    public class Top2Accum {public int first;  // 第一名public int second; // 第二名
    }
    
  2. 繼承TableAggregateFunction,實現核心方法
    public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}// 累加邏輯public void accumulate(Top2Accum acc, int value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 輸出結果public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {out.collect(Tuple2.of(acc.first, 1));out.collect(Tuple2.of(acc.second, 2));}
    }
    
  3. 注冊與調用
    tableEnv.createTemporarySystemFunction("Top2", Top2.class);
    // Table API 調用(SQL暫不支持)
    table.groupBy($("group")).flatAggregate(call(Top2.class, $("value")).as("val", "rank")).select($("group"), $("val"), $("rank"));
    

三、關鍵技巧

  1. 類型注解:復雜類型用@DataTypeHint指定,例如:

    @DataTypeHint("DECIMAL(12, 3)") // 聲明 decimal 精度
    public BigDecimal eval(double a) { ... }
    
  2. 命名參數:通過@ArgumentHint指定參數名,支持 SQL 中按名傳參:

    public String eval(@ArgumentHint(name = "content") String s,@ArgumentHint(name = "begin") int b
    ) { ... }
    // SQL 調用:SELECT func(content => 'abc', begin => 1)
    
  3. 確定性聲明:非確定性函數(如隨機數、當前時間)需重寫:

    @Override
    public boolean isDeterministic() { return false; }
    

四、常見問題

  • 注冊方式:臨時注冊(createTemporarySystemFunction)僅當前會話有效,永久注冊需結合 Catalog
  • 權限控制:UDF 可訪問外部資源(如數據庫連接),需確保執行環境有對應權限
  • 性能優化:聚合函數盡量實現merge()方法,支持兩階段聚合優化

通過上述步驟,可快速實現各類自定義邏輯,擴展 Flink 處理能力。核心是理解不同函數的輸入輸出關系,以及累加器(聚合函數)的設計邏輯。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/88554.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/88554.shtml
英文地址,請注明出處:http://en.pswp.cn/web/88554.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【AI學習】大模型微調實踐

參加了書生?浦語&#xff08;InternLM&#xff09;端側小模型論文分類微調練習打榜賽 具體的實踐教程在&#xff1a; https://aicarrier.feishu.cn/wiki/D7kZw9Nx4iMyDnkpL0Gc5giNn5g 折騰了十多天&#xff0c;各種嘗試&#xff0c;AB榜單終于進入了前十都&#xff0c;累死 …

ElementUI:高效優雅的Vue.js組件庫

Hi&#xff0c;我是布蘭妮甜 &#xff01;在當今快節奏的前端開發領域&#xff0c;選擇一個功能強大、設計優雅且易于使用的UI組件庫至關重要。ElementUI作為基于Vue.js的知名組件庫&#xff0c;憑借其豐富的組件體系、一致的設計語言和出色的開發體驗&#xff0c;已成為眾多企…

Java Stream流介紹及使用指南

背景在Java 8之前&#xff0c;處理集合數據&#xff08;如List, Set, Map&#xff09;通常意味著編寫冗長的、以操作為中心的代碼&#xff1a;創建迭代器、使用for或while循環遍歷元素、在循環體內進行條件判斷和操作、收集結果。這種方式雖然有效&#xff0c;但不夠簡潔、可讀…

JDK 1.7 vs JDK 1.8

JDK版本比較 Java平臺的兩次重大飛躍&#xff1a;JDK 7的穩定優化與JDK 8的革命性創新引言&#xff1a;Java的進化之路Java作為企業級開發的支柱語言&#xff0c;其版本更新直接影響著全球數百萬開發者。JDK 1.7&#xff08;2011年發布&#xff09;和JDK 1.8&#xff08;2014年…

張量與維度

3x4x5的張量&#xff1a; x torch.tensor([[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15], [16, 17, 18, 19, 20]], [[21, 22, 23, 24, 25], …

智慧菜場系統(源碼+文檔+講解+演示)

引言 在數字化浪潮的推動下&#xff0c;傳統菜市場也在尋求創新與變革。智慧菜場系統作為一種新型的菜市場管理工具&#xff0c;通過數字化手段優化菜市場的全流程&#xff0c;提高運營效率&#xff0c;增強消費者體驗&#xff0c;提升市場管理質量。本文將詳細介紹智慧菜場系統…

【GESP】C++一級真題 luogu-B4355 [GESP202506 一級] 值日

GESP C一級&#xff0c;2025年6月真題&#xff0c;基礎運算和循環語句&#xff0c;難度★☆☆☆☆。 題目題解詳見&#xff1a;【GESP】C一級真題 luogu-B4355 [GESP202506 一級] 值日 | OneCoder 【GESP】C一級真題 luogu-B4355 [GESP202506 一級] 值日 | OneCoderGESP C一級…

【Linux應用】Ubuntu20.04 aarch64開發板一鍵安裝ROS2(清華源)

【Linux應用】Ubuntu20.04 aarch64開發板一鍵安裝ROS2&#xff08;清華源&#xff09; 文章目錄相關資料更改UTF8執行更新一鍵安裝ROS2驗證配置環境變量附錄&#xff1a;開發板快速上手&#xff1a;鏡像燒錄、串口shell、外設掛載、WiFi配置、SSH連接、文件交互&#xff08;RAD…

【HDLBits習題 2】Circuit - Sequential Logic(4)More Circuits

1. Rule90&#xff08;Rule 90&#xff09;方法1&#xff1a;module top_module (output reg [511:0] q,input clk,input load,input [511:0] data ); integer i;always (posedge clk) beginif (load 1b1) beginq < data;end else beginfor (i0; i<$bits(q);…

基于mysqlfrm工具解析mysql數據結構文件frm表結構和數據庫版本信息

這里使用Linux系統上操作。win上搞了下 python報錯。所以在這里記錄一下推薦大家使用linux系統操作。 安裝mysql utilswget https://downloads.mysql.com/archives/get/p/30/file/mysql-utilities-1.6.5.tar.gztar -xf mysql-utilities-1.6.5.tar.gzcd mysql-utilities-1.6.5py…

【C++ 深入解析 C++ 模板中的「依賴類型」】

深入解析 C 模板中的「依賴類型」 依賴類型是 C 模板編程中的核心概念&#xff0c;特指那些依賴于模板參數的類型。迭代器是依賴類型的常見例子&#xff0c;但遠不止于此。讓我們全面解析這個重要概念&#xff1a; 依賴類型的本質定義 依賴類型是&#xff1a; 在模板中定義直接…

Telnet遠程連接實驗(Cisco)

Telnet遠程連接實驗&#xff08;Cisco&#xff09; 拓撲圖一并實現DHCP服務、HTTP服務、FTP服務。 二層交換機配置&#xff1a; 交換機Switch0配置&#xff1a; vlan 10vlan 20int f0/1switchport mode accessswitchport access vlan 10int f0/2switchport mode accessswitchpo…

C++:非類型模板參數,模板特化以及模板的分離編譯

目錄 一、前言 二、非類型模板參數 三、模板的特化 3.1 類模板特化 3.11 全特化 3.12 偏特化 3.2 函數模板特化 3.3 注意 四、模板的分離編譯 一、前言 前面的文章梳理了模板初階的一些用法&#xff0c;在后面梳理了STL的一些容器的用法后&#xff0c;下面將用到含有S…

【Qt 學習之路】Qt Android開發環境搭建:Ubuntu的Vmware虛擬機中的踩坑實錄

文章目錄1、簡介2、虛擬機內USB設備識別難題2.1、正確連接手機2.2、打開USB相關配置2.3、打開虛擬機中的手機設備3、Gradle下載速度緩慢之困3.1、下載 Gradle 鏡像3.2、安放鏡像位置3.3、修改項目中的gradle路徑1、簡介 許久未曾使用Qt進行Android開發&#xff0c;今日在Ubunt…

MySQL中使用group_concat遇到的問題及解決

在使用group_concat的過程中遇到個問題&#xff0c;這里記錄一下&#xff1a;在MySQL中有個配置參數group_concat_max_len&#xff0c;它會限制使用group_concat返回的最大字符串長度&#xff0c;默認是1024。 查詢group_concat_max_len大小&#xff1a; show variables like…

高性能小型爬蟲語言與代碼示例

高性能小型爬蟲現在有哪幾種新興語言可以選擇。我看到了很多關于爬蟲框架的信息&#xff0c;特別是使用Go語言和Node.js的框架。Go語言方面有Kaola1和Katana2這兩個框架。Kaola被描述為高性能的Go語言爬蟲框架&#xff0c;輕量級且強大&#xff0c;提供靈活配置選項。 Node.js…

【PTA數據結構 | C語言版】在順序表 list 中查找元素 x

本專欄持續輸出數據結構題目集&#xff0c;歡迎訂閱。 文章目錄題目代碼題目 請編寫程序&#xff0c;將 n 個整數存入順序表&#xff0c;對任一給定整數 x&#xff0c;查找其在順序表中的位置。 輸入格式&#xff1a; 輸入首先在第一行給出正整數 n&#xff08;≤10^4 &#…

claude code-- 基于Claude 4 模型的智能編程工具,重塑你的編程體驗

文章目錄0.前言1.安裝nodejs2.使用指南3.快速上手4.總結0.前言 最近的這個claudecode非常的火&#xff0c;因為可能是這個cursoe定價的一些原因吧&#xff0c;我是聽其他的這個大佬說的&#xff0c;因為這個cursor其實我就是最開始的使用用過一下&#xff0c;現在基本上不使用…

HTTP API 身份認證

互聯網系統通常需要根據用戶身份決定是否有資源的訪問權限&#xff0c;這就需要對用戶進行身份認證&#xff08;Authentication&#xff09;&#xff0c;驗證用戶所聲稱的身份。驗證手段通常是驗證只有用戶知道或擁有的東西&#xff0c;比如密碼、手機號、指紋等。 基于瀏覽器…

Python畢業設計232—基于python+Django+vue的圖書管理系統(源代碼+數據庫)

畢設所有選題&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于pythonDjangovue的圖書管理系統(源代碼數據庫)232 一、系統介紹 本項目前后端分離&#xff0c;分為用戶、管理員兩種角色 1、用戶&#xff1a; 注冊、登錄、新聞資訊、圖書信…