一、UDF 核心原理
Flink 自定義函數(UDF)是擴展 Table API/SQL 能力的核心機制,允許將自定義邏輯嵌入查詢。其設計遵循以下原則:
1. 函數類型體系
類型 | 輸入輸出關系 | 核心用途 |
---|---|---|
標量函數(ScalarFunction) | 0~N 個標量 → 1 個標量 | 字段轉換、值計算 |
表值函數(TableFunction) | 0~N 個標量 → 多行多列 | 數據拆分、關聯外部數據 |
聚合函數(AggregateFunction) | 多行標量 → 1 個標量 | 自定義聚合(如加權平均) |
表值聚合函數(TableAggregateFunction) | 多行標量 → 多行多列 | 分組TopN、分桶統計等 |
異步表值函數 | 異步查詢外部系統 → 多行多列 | 高效關聯外部數據庫/API |
2. 類型系統
- 標量/表值函數使用新數據類型系統(基于
DataTypes
) - 聚合函數仍使用舊類型系統(基于
TypeInformation
) - 類型推導:默認通過反射獲取,復雜場景可通過
@DataTypeHint
或@FunctionHint
注解顯式指定
3. 執行邏輯
- 核心是求值方法(如
eval()
、accumulate()
),定義數據處理邏輯 - 生命周期:
open()
初始化 → 求值方法調用 →close()
資源清理 - 確定性:通過
isDeterministic()
聲明是否返回確定結果(影響優化策略)
二、快速上手實戰
1. 標量函數(ScalarFunction)
作用:對輸入標量做轉換計算(如字符串處理、格式轉換)
實現步驟:
- 繼承
ScalarFunction
,實現eval()
方法public class HashFunction extends ScalarFunction {// 輸入任意類型,返回哈希值public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();} }
- 注冊與調用
// 注冊 tableEnv.createTemporarySystemFunction("HashFunc", HashFunction.class); // Table API 調用 table.select(call("HashFunc", $("field"))); // SQL 調用 tableEnv.sqlQuery("SELECT HashFunc(field) FROM t");
2. 表值函數(TableFunction)
作用:將單行輸入拆分為多行輸出(如字符串按分隔符拆分)
實現步驟:
- 繼承
TableFunction<T>
,通過collect()
輸出結果@FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>")) public class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {collect(Row.of(s, s.length())); // 輸出每行數據}} }
- 注冊與調用
tableEnv.createTemporarySystemFunction("SplitFunc", SplitFunction.class); // 關聯查詢(LATERAL JOIN) tableEnv.sqlQuery("""SELECT t.id, s.word, s.len FROM t, LATERAL TABLE(SplitFunc(t.content)) AS s(word, len) """);
3. 聚合函數(AggregateFunction)
作用:多行數據聚合為單個值(如自定義平均值、求和邏輯)
實現步驟:
- 定義累加器(存儲中間結果)
public class WeightedAvgAccum {public long sum = 0; // 加權和public int count = 0; // 權重總和 }
- 繼承
AggregateFunction
,實現核心方法public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }// 累加邏輯public void accumulate(WeightedAvgAccum acc, long value, int weight) {acc.sum += value * weight;acc.count += weight;}// 最終結果計算@Overridepublic Long getValue(WeightedAvgAccum acc) {return acc.count == 0 ? null : acc.sum / acc.count;} }
- 注冊與調用
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class); tableEnv.sqlQuery("""SELECT user, WeightedAvg(score, weight) FROM scores GROUP BY user """);
4. 表值聚合函數(TableAggregateFunction)
作用:多行數據聚合為多行結果(如分組取TopN)
實現步驟:
- 定義累加器(存儲中間狀態)
public class Top2Accum {public int first; // 第一名public int second; // 第二名 }
- 繼承
TableAggregateFunction
,實現核心方法public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}// 累加邏輯public void accumulate(Top2Accum acc, int value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 輸出結果public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {out.collect(Tuple2.of(acc.first, 1));out.collect(Tuple2.of(acc.second, 2));} }
- 注冊與調用
tableEnv.createTemporarySystemFunction("Top2", Top2.class); // Table API 調用(SQL暫不支持) table.groupBy($("group")).flatAggregate(call(Top2.class, $("value")).as("val", "rank")).select($("group"), $("val"), $("rank"));
三、關鍵技巧
-
類型注解:復雜類型用
@DataTypeHint
指定,例如:@DataTypeHint("DECIMAL(12, 3)") // 聲明 decimal 精度 public BigDecimal eval(double a) { ... }
-
命名參數:通過
@ArgumentHint
指定參數名,支持 SQL 中按名傳參:public String eval(@ArgumentHint(name = "content") String s,@ArgumentHint(name = "begin") int b ) { ... } // SQL 調用:SELECT func(content => 'abc', begin => 1)
-
確定性聲明:非確定性函數(如隨機數、當前時間)需重寫:
@Override public boolean isDeterministic() { return false; }
四、常見問題
- 注冊方式:臨時注冊(
createTemporarySystemFunction
)僅當前會話有效,永久注冊需結合 Catalog - 權限控制:UDF 可訪問外部資源(如數據庫連接),需確保執行環境有對應權限
- 性能優化:聚合函數盡量實現
merge()
方法,支持兩階段聚合優化
通過上述步驟,可快速實現各類自定義邏輯,擴展 Flink 處理能力。核心是理解不同函數的輸入輸出關系,以及累加器(聚合函數)的設計邏輯。