?
目錄
前提條件
背景
概念及適用場景
UDF(User-Defined Function)
概念
適用場景
UDAF(User-Defined Aggregate Function)
概念
適用場景
UDTF(User-Defined Table-Generating Function)
概念
適用場景
案例
UDF案例
UDTF案例
UDAF案例
前提條件
- 安裝好Hive,可參考:openEuler24.03 LTS下安裝Hive3
- 具備Java開發環境:JDK8、Maven3、IDEA
背景
Hive 作為大數據領域常用的數據倉庫工具,提供了豐富的內置函數,但在實際業務場景中,內置函數往往無法滿足復雜的計算需求。這時,Hive 的自定義函數就顯得尤為重要。Hive 支持三種類型的自定義函數:UDF、UDAF 和 UDTF,本文分別介紹它們的概念和適用場景,并給出典型案例。
概念及適用場景
UDF(User-Defined Function)
概念
UDF 是最基本的自定義函數類型,用于實現 "單行進,單行出" 的處理邏輯,即對每行數據中的一個或多個輸入值進行計算,返回一個結果值。
適用場景
- 字符串處理(如格式轉換、編碼轉換)
- 數學計算(如自定義計算公式)
- 日期處理(如自定義日期格式解析)
UDAF(User-Defined Aggregate Function)
概念
UDAF 即用戶定義的聚合函數,用于實現 "多行進,一行出" 的處理邏輯,將一組數據經過計算后返回一個匯總結果,類似于 SQL 中的 SUM、COUNT 等內置聚合函數。
適用場景
- 自定義統計指標(如計算中位數、眾數)
- 復雜數據聚合(如分組拼接字符串)
- 多階段聚合計算
UDTF(User-Defined Table-Generating Function)
概念
UDTF 是用戶定義的表生成函數,實現 "單行進,多行出" 的處理邏輯,將一行數據擴展為多行或多列數據。
適用場景
- 字符串拆分(如將逗號分隔的字符串拆分為多行)
- 數組或集合展開(如將 JSON 數組展開為多行記錄)
- 復雜數據結構解析(如解析嵌套 JSON)
案例
UDF案例
需求:
自定義一個UDF實現計算給定基本數據類型的長度,效果如下:
hive(default)> select my_len("abcd");4
1)使用IDEA創建一個Maven工程Hive,工程名稱例如:udf
2)添加依賴
<dependencies><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.3</version></dependency>
</dependencies>
添加依賴后,刷新依賴,如下
3)創建包、創建類
創建包:在src/main/java下創建org.exapmle.hive.udf包
創建類:MyUDF.java
package org.example.hive.udf;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;/*** 我們需計算一個要給定基本數據類型的長度*/
public class MyUDF extends GenericUDF {/*** 判斷傳進來的參數的類型和長度* 約定返回的數據類型*/@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {if (arguments.length !=1) {throw new UDFArgumentLengthException("please give me only one arg");}if (!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){throw new UDFArgumentTypeException(1, "i need primitive type arg");}return PrimitiveObjectInspectorFactory.javaIntObjectInspector;}/*** 解決具體邏輯的*/@Overridepublic Object evaluate(DeferredObject[] arguments) throws HiveException {Object o = arguments[0].get();if(o==null){return 0;}return o.toString().length();}/*** 用于獲取解釋的字符串*/@Overridepublic String getDisplayString(String[] children) {return "";}
}
?4)創建臨時函數
1)打成jar包上傳到Linux /opt/module/hive/datas/myudf.jar
點擊右側的Maven,點開Lifecycle,按Ctrl鍵不放,同時選中clean和package,點擊箭頭指向的三角形圖標運行
?看到BUILD SUCCESS說明打包成功,同時看到jar包所在路徑,如下
將jar包上傳到Linux合適目錄下,例如:/home/liang/testjar
[liang@node2 testjar]$ ls
udf-1.0-SNAPSHOT.jar
(2)將jar包添加到hive的classpath,臨時生效
hive (default)> add jar /home/liang/testjar/udf-1.0-SNAPSHOT.jar;
?(3)創建臨時函數與開發好的java class關聯
hive (default)> create temporary function my_len as "org.exapmle.hive.udf.MyUDF";
注意:創建臨時函數,此時只是在當前會話生效,關閉會話,臨時函數被刪除。如果需要能在其他會話能看到,且關閉會話后,不刪除自定義函數,則需要創建永久函數。
(4)查詢函數
hive (default)> show functions;
...
months_between
murmur_hash
my_len
named_struct
negative
...
Time taken: 0.024 seconds, Fetched: 291 row(s)
看到my_len函數,說明可以使用自定義函數了。?
(5)使用自定義的臨時函數
hive (default)> select my_len("abcd");
結果為
4
(6)刪除臨時函數
使用如下語句或者關閉會話(退出Hive命令行)刪除臨時函數。
hive (default)> drop temporary function my_len;
5)創建永久函數
(1)創建永久函數
把jar包上傳到hdfs
[liang@node2 ~]$ hdfs dfs -put /home/liang/testjar/udf-1.0-SNAPSHOT.jar /
創建永久函數
hive (default)>
create function my_len2 as "org.exapmle.hive.udf.MyUDF" using jar "hdfs://node2:8020/udf-1.0-SNAPSHOT.jar";
操作過程
hive (default)> create function my_len2 as "org.exapmle.hive.udf.MyUDF" using jar "hdfs://node2:8020/udf-1.0-SNAPSHOT.jar";
Added [/tmp/944c050b-e360-48f1-b7b6-93f8fd7e2644_resources/udf-1.0-SNAPSHOT.jar] to class path
Added resources: [hdfs://node2:8020/udf-1.0-SNAPSHOT.jar]
OK
Time taken: 0.212 seconds
查看函數
hive (default)> show functions;
...
dayofweek
decode
default.my_len2
degrees
dense_rank
...
Time taken: 0.019 seconds, Fetched: 291 row(s)
看到永久函數名為庫名.函數名。
注意:永久函數創建的時候,在函數名之前需要自己加上庫名,如果不指定庫名的話,會默認把當前庫的庫名給加上。
退出hive命令行會話,重新進入hive命令行,再次查看函數,還可以看到default.my_len2
hive (default)> show functions;
...
dayofweek
decode
default.my_len2
degrees
dense_rank
...
Time taken: 0.019 seconds, Fetched: 291 row(s)
使用永久函數
hive (default)> select my_len2("abcd");
結果為
4
(3)刪除永久函數
hive (default)> drop function my_len2;
注意:永久函數使用的時候,在其他庫里面使用的話加上,庫名.函數名。
UDTF案例
需求:
將字符串按分隔符分割為多行,例如:將a,b,c
按照,
進行分隔,得到三行
a
b
c
代碼
package org.exapmle.hive.udtf;import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.io.Text;import java.util.Arrays;
import java.util.List;@Description(name = "explode_string",value = "將字符串按分隔符分割為多行",extended = "SELECT explode_string('a,b,c', ',') FROM table_name;"
)
public class ExplodeWordsUDTF extends GenericUDTF {private WritableStringObjectInspector inputOI;private WritableStringObjectInspector separatorOI;private final Object[] forwardObj = new Object[1];private final Text outputText = new Text();@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {// 獲取輸入參數的ObjectInspectorList<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();// 檢查參數數量if (inputFields.size() != 2) {throw new UDFArgumentLengthException("explode_string需要兩個參數: 字符串和分隔符");}// 檢查參數類型ObjectInspector firstOI = inputFields.get(0).getFieldObjectInspector();ObjectInspector secondOI = inputFields.get(1).getFieldObjectInspector();if (!(firstOI instanceof WritableStringObjectInspector) || !(secondOI instanceof WritableStringObjectInspector)) {throw new UDFArgumentLengthException("參數必須是字符串類型");}inputOI = (WritableStringObjectInspector) firstOI;separatorOI = (WritableStringObjectInspector) secondOI;// 定義輸出結構List<String> fieldNames = Arrays.asList("element");List<ObjectInspector> fieldOIs = Arrays.asList(PrimitiveObjectInspectorFactory.writableStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}@Overridepublic void process(Object[] args) throws HiveException {if (args[0] == null) {return;}Text input = inputOI.getPrimitiveWritableObject(args[0]);Text sep = separatorOI.getPrimitiveWritableObject(args[1]);String separator = sep != null ? sep.toString() : ",";String inputStr = input.toString();// 處理空字符串if (inputStr.isEmpty()) {outputText.set("");forwardObj[0] = outputText;forward(forwardObj);return;}// 使用正則表達式分隔字符串String[] elements = inputStr.split(separator, -1);for (String element : elements) {outputText.set(element);forwardObj[0] = outputText;forward(forwardObj);}}@Overridepublic void close() throws HiveException {}
}
打jar包,上傳到Linux
注冊與使用
hive (default)> ADD JAR /home/liang/testjar/udf-1.0-SNAPSHOT.jar;
Added [/home/liang/testjar/udf-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/liang/testjar/udf-1.0-SNAPSHOT.jar]hive (default)> CREATE TEMPORARY FUNCTION explode_string AS 'org.exapmle.hive.udtf.ExplodeStringUDTF';
OK
Time taken: 0.362 secondshive (default)> SELECT explode_string('a,b,c', ',');
OK
element
a
b
c
Time taken: 2.844 seconds, Fetched: 3 row(s)hive (default)> SELECT explode_string('hello,world', ',');
OK
element
hello
world
Time taken: 0.209 seconds, Fetched: 2 row(s)
UDAF案例
需求:
計算加權平均值,加權平均數=(Σ(數值×權重))/Σ權重
例如:
計算學生綜合成績時,若數學(學分4分,成績90)和語文(學分3分,成績80),其中數值為成績,權重為學分,則加權平均成績為 (4×90+3×80)/(4+3)≈85.71(4×90+3×80)/(4+3)≈85.71分。
代碼
package org.exapmle.hive.udaf;import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;@Description(name = "weighted_avg",value = "計算加權平均值",extended = "SELECT weighted_avg(score, credit) FROM grades GROUP BY student_id;"
)
public class WeightedAverageUDAF extends UDAF {public static class WeightedAverageEvaluator implements UDAFEvaluator {// 存儲中間結果private double sumWeightedValues;private double sumWeights;private boolean empty;@Overridepublic void init() {sumWeightedValues = 0;sumWeights = 0;empty = true;}// 處理輸入行public boolean iterate(DoubleWritable value, DoubleWritable weight) {if (value == null || weight == null || weight.get() <= 0) {return true;}sumWeightedValues += value.get() * weight.get();sumWeights += weight.get();empty = false;return true;}// 存儲部分結果的類public static class PartialResult implements Writable {double sumWeightedValues;double sumWeights;@Overridepublic void write(DataOutput out) throws IOException {out.writeDouble(sumWeightedValues);out.writeDouble(sumWeights);}@Overridepublic void readFields(DataInput in) throws IOException {sumWeightedValues = in.readDouble();sumWeights = in.readDouble();}}// 返回部分結果public PartialResult terminatePartial() {if (empty) {return null;}PartialResult result = new PartialResult();result.sumWeightedValues = sumWeightedValues;result.sumWeights = sumWeights;return result;}// 合并部分結果public boolean merge(PartialResult other) {if (other == null) {return true;}sumWeightedValues += other.sumWeightedValues;sumWeights += other.sumWeights;empty = false;return true;}// 返回最終結果public DoubleWritable terminate() {if (empty || sumWeights <= 0) {return null;}return new DoubleWritable(sumWeightedValues / sumWeights);}}
}
打jar包,上傳到Linux
注冊
hive (default)> ADD JAR /home/liang/testjar/udf-1.0-SNAPSHOT.jar;
Added [/home/liang/testjar/udf-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/liang/testjar/udf-1.0-SNAPSHOT.jar]hive (default)> CREATE TEMPORARY FUNCTION weighted_avg AS 'org.exapmle.hive.udaf.WeightedAverageUDAF';
OK
Time taken: 0.043 seconds
使用
WITH grades AS (SELECT 1 AS student_id, 'Math' AS course, 90 AS score, 4 AS creditUNION ALLSELECT 1, 'English', 85, 3UNION ALLSELECT 2, 'Math', 88, 4UNION ALLSELECT 2, 'English', 92, 3
)
SELECT student_id,weighted_avg(score, credit) AS gpa
FROM grades
GROUP BY student_id;
操作過程
hive (default)> WITH grades AS (> SELECT 1 AS student_id, 'Math' AS course, 90 AS score, 4 AS credit> UNION ALL> SELECT 1, 'English', 85, 3> UNION ALL> SELECT 2, 'Math', 88, 4> UNION ALL> SELECT 2, 'English', 92, 3> )> SELECT> student_id,> weighted_avg(score, credit) AS gpa> FROM grades> GROUP BY student_id;
Query ID = liang_20250521165046_6335eb21-2d92-4ae1-b30c-511bcb9a98ab
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:set mapreduce.job.reduces=<number>
Starting Job = job_1747808931389_0001, Tracking URL = http://node3:8088/proxy/application_1747808931389_0001/
Kill Command = /opt/module/hadoop-3.3.4/bin/mapred job -kill job_1747808931389_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2025-05-21 16:51:04,388 Stage-1 map = 0%, reduce = 0%
2025-05-21 16:51:12,756 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.89 sec
2025-05-21 16:51:28,486 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 11.36 sec
MapReduce Total cumulative CPU time: 11 seconds 360 msec
Ended Job = job_1747808931389_0001
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 11.36 sec HDFS Read: 12918 HDFS Write: 151 SUCCESS
Total MapReduce CPU Time Spent: 11 seconds 360 msec
OK
student_id gpa
1 87.85714285714286
2 89.71428571428571
Time taken: 43.272 seconds, Fetched: 2 row(s)
更多Hive自定義函數用法,請參考:Hive官方文檔
完成!enjoy it!