在上一篇文章中,我們對第一種用戶定義函數(UDF)進行了基礎介紹。接下來,本文將帶您深入了解剩余的兩種UDF函數類型。
文章目錄
- 1. UDAF
- 1.1 簡單UDAF
- 1.2 通用UDAF
- 2. UDTF
- 3. 總結
1. UDAF
1.1 簡單UDAF
第一種方式是 Simple(簡單) 方式,即繼承 org.apache.hadoop.hive.ql.exec.UDAF 類,并在派生類中以靜態內部類的方式實現 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口。這個計算類將負責執行具體的聚合邏輯,具體步驟如下:
a)初始化(init):首先,我們需要實現UDAFEvaluator接口的init方法,用于初始化聚合過程中所需的任何資源或狀態。
b)迭代(iterate):接下來,iterate方法將被用來處理傳入的數據。此方法將逐個接收數據項,并更新聚合狀態。它返回一個布爾值,指示是否繼續迭代或停止。
c)部分終止(terminatePartial):在迭代完成后,terminatePartial方法將被調用。它的作用類似于Hadoop中的Combiner,用于返回一個中間聚合結果,以便在多個任務之間進行合并。
d)合并(merge):merge方法用于接收來自terminatePartial的中間結果,并將其合并以形成更接近最終結果的聚合狀態。此方法同樣返回一個布爾值,指示合并操作是否成功。
e)最終終止(terminate):最后,terminate方法將被用來生成并返回聚合操作的最終結果。
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;// 自定義的UDAF類,用于計算最大值
public class MyMaxUDAF extends UDAF {// 實現UDAFEvaluator接口的靜態內部類static public class MaxIntEvaluator implements UDAFEvaluator {// 存放當前聚合操作過程中的最大值private int mMax;// 用于標記聚合數據集是否為空private boolean mEmpty;// 構造方法,用于執行初始化操作public MaxIntEvaluator() {super();init();}// 初始化方法,用于重置聚合狀態public void init() {// 初始化最大值為0mMax = 0;// 初始化聚合數據集為空mEmpty = true;}// 迭代處理每一行數據。每次調用處理一行記錄public boolean iterate(IntWritable o) {// 檢查傳入的數據是否為nullif (o != null) {// 如果當前聚合數據集為空,則直接將當前值設置為最大值if (mEmpty) {mMax = o.get();mEmpty = false; // 更新狀態,標記聚合數據集不再為空} else {// 聚合數據集不為空時,用當前值和之前的最大值比較,保留較大的那個mMax = Math.max(mMax, o.get());}}return true;}// 輸出Map階段處理結果的方法,返回當前的最大值public IntWritable terminatePartial() {// 如果聚合數據集為空,則返回null;否則,返回當前的最大值return mEmpty ? null : new IntWritable(mMax);}// Combine/Reduce階段,合并處理結果public boolean merge(IntWritable o) {// 通過調用iterate方法進行合并操作return iterate(o);}// 返回最終的聚集函數結果public IntWritable terminate() {// 如果聚合數據集為空,則返回null;否則,返回最終的最大值return mEmpty ? null : new IntWritable(mMax);}}
}
1.2 通用UDAF
編寫簡單的UDAF(用戶定義聚合函數)相對容易,但這種方法由于依賴Java的反射機制,可能會犧牲一些性能,并且它不支持變長參數等高級特性。相比之下,通用UDAF(Generic UDAF)提供了這些高級特性的支持,雖然它的編寫可能不如簡單UDAF那樣直接明了。
Hive社區推崇使用通用UDAF作為最佳實踐,建議采用新的抽象類org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
來替代舊的UDAF接口,并推薦使用org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
抽象類來替換舊的UDAFEvaluator
接口。這種新方法不僅提升了性能,還增加了靈活性,使得UDAF的功能更加強大和多樣化。
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.IntWritable;// 通過繼承AbstractGenericUDAFResolver并使用Description注解來定義一個新的UDAF。
@Description(name = "max_int", value = "_FUNC_(int) - Returns the maximum value of the column")
public class MyMaxUDAF2 extends AbstractGenericUDAFResolver {// 聚合函數的求值器內部類,繼承自GenericUDAFEvaluator。public static class MaxIntEvaluator extends GenericUDAFEvaluator {// 用于存儲輸入參數的ObjectInspector。private PrimitiveObjectInspector inputOI;// 用于存儲聚合結果。private IntWritable result;// 初始化方法,用于設置聚合函數的參數和返回類型。@Overridepublic ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {super.init(m, parameters);// 確認參數是原始類型并初始化inputOI。inputOI = (PrimitiveObjectInspector) parameters[0];// 設置聚合函數的返回類型為可寫的整型。return PrimitiveObjectInspectorFactory.writableIntObjectInspector;}// 創建聚合緩沖區對象的方法。@Overridepublic AggregationBuffer getNewAggregationBuffer() throws HiveException {MaxAggBuffer buffer = new MaxAggBuffer();reset(buffer);return buffer;}// 重置聚合緩沖區對象的方法。@Overridepublic void reset(AggregationBuffer agg) throws HiveException {((MaxAggBuffer) agg).setValue(Integer.MIN_VALUE);}// 迭代方法,用于處理每一行數據。@Overridepublic void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {if (parameters[0] != null) {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 從參數中獲取整數值并更新聚合緩沖區中的最大值。int value = PrimitiveObjectInspectorUtils.getInt(parameters[0], inputOI);if (value > myagg.value) {myagg.setValue(value);}}}// 終止部分聚合的方法,通常返回最終聚合結果。@Overridepublic Object terminatePartial(AggregationBuffer agg) throws HiveException {return terminate(agg);}// 合并部分聚合結果的方法。@Overridepublic void merge(AggregationBuffer agg, Object partial) throws HiveException {if (partial != null) {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 從部分聚合結果中獲取整數值并更新聚合緩沖區中的最大值。int partialValue = PrimitiveObjectInspectorUtils.getInt(partial, inputOI);if (partialValue > myagg.value) {myagg.setValue(partialValue);}}}// 終止方法,用于返回最終聚合結果。@Overridepublic Object terminate(AggregationBuffer agg) throws HiveException {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 創建IntWritable對象并設置聚合結果,然后返回。result = new IntWritable(myagg.value);return result;}// 聚合緩沖區對象的內部類定義,用于存儲聚合過程中的中間狀態。static class MaxAggBuffer implements AggregationBuffer {int value; // 聚合緩沖區中的值// 設置聚合緩沖區中的值void setValue(int val) { value = val; }}}
}
特性/UDAF類型 | 簡單UDAF | 通用UDAF |
---|---|---|
性能 | 依賴反射,性能較低 | 不依賴反射,性能較高 |
參數靈活性 | 不支持變長參數 | 支持變長參數 |
易用性 | 編寫簡單直觀 | 編寫復雜,功能強大 |
推薦使用 | 適合簡單聚合操作 | 適合復雜聚合邏輯和高性能需求 |
接口和抽象類 | 舊的UDAF接口和UDAFEvaluator | 新的AbstractGenericUDAFResolver 和GenericUDAFEvaluator |
功能特性 | 功能有限,實現常見聚合 | 支持復雜迭代邏輯和自定義終止邏輯 |
應用場景 | - 快速開發和原型設計 - 實現基本聚合操作,如求和、平均值 - 對性能要求不高的小型項目 | - 實現復雜的數據分析和處理 - 大數據量處理,需要高性能 - 需要變長參數支持的復雜查詢 - 高級功能實現,如窗口函數、復雜的分組聚合 |
選擇UDAF類型時應根據實際需求和上述特性來決定,以確保既能滿足功能需求,又能獲得較好的性能表現。
2. UDTF
-
繼承GenericUDTF類的步驟:
開發自定義的表生成函數(UDTF)時,首先需要繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF這個抽象類,它為UDTF提供了一個通用的實現框架。 -
實現
initialize()、process()和close()
方法:
為了完成自定義UDTF的功能,需要實現三個核心方法:initialize()用于初始化UDTF,process()用于處理輸入數據并生成輸出,close()用于執行清理操作。-
initialize()
方法的調用與作用:在UDTF的執行過程中,initialize()方法是首先被調用的。它負責初始化UDTF的狀態,并返回關于UDTF返回行的信息,包括返回行的個數和類型。process()
方法的執行:initialize()方法執行完成后,接下來會調用process()方法。該方法是UDTF的核心,負責對輸入參數進行處理。在process()方法中,可以通過調用forward()方法將處理結果逐行返回。close()
方法的清理作用:在UDTF的所有處理工作完成后,最終會調用close()方法。這個方法用于執行必要的清理工作,如釋放資源或關閉文件句柄等,確保UDTF在結束時不會留下任何未處理的事務。
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;
import java.util.List;/*** 自定義一個UDTF,實現將一個由任意分割符分隔的字符串切割成獨立的單詞。**/
public class LineToWordUDTF extends GenericUDTF {// 用于存儲輸出單詞的集合private ArrayList<String> outList = new ArrayList<String>();/*** initialize方法:當GenericUDTF函數初始化時被調用一次,用于執行一些初始化操作。* 包括:* 1. 判斷函數參數個數* 2. 判斷函數參數類型* 3. 確定函數返回值類型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {// 1. 定義輸出數據的列名和類型List<String> fieldNames = new ArrayList<String>();List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();// 2. 添加輸出數據的列名和類型fieldNames.add("lineToWord"); // 輸出列名fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // 輸出列類型// 返回輸出數據的ObjectInspectorreturn ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}/*** process方法:自定義UDTF的核心邏輯實現方法。* 代碼實現步驟可以分為三部分:* 1. 參數接收* 2. 自定義UDTF核心邏輯* 3. 輸出結果*/@Overridepublic void process(Object[] objects) throws HiveException {// 1. 獲取原始數據String arg = objects[0].toString(); // 假設第一個參數為要分割的字符串// 2. 獲取數據傳入的第二個參數,此處為分隔符String splitKey = objects[1].toString(); // 假設第二個參數為分隔符// 3. 將原始數據按照傳入的分隔符進行切分String[] fields = arg.split(splitKey); // 分割字符串// 4. 遍歷切分后的結果,并寫出for(String field : fields) {// 集合為復用的,首先清空集合outList.clear();// 將每個單詞添加至集合outList.add(field);// 將集合內容通過forward方法寫出,這里假設forward方法可以處理集合forward(outList);}}/*** close方法:當沒有其他輸入行時,調用該函數。* 可以進行一些資源關閉處理等最終處理。*/@Overridepublic void close() throws HiveException {// 資源清理邏輯,當前示例中無具體實現}}
3. 總結
本文我們詳細解析了UDAF和UDTF在Hive中的應用。通過實際代碼示例,我們展示了UDAF如何幫助我們深入分析數據,以及UDTF如何簡化復雜的數據轉換任務。
感謝您的閱讀和支持。如果您對UDAF、UDTF或Hive的其他高級功能有疑問,或者想要更深入地討論,歡迎在文章下留言或直接聯系我們。期待我們的下一次分享,一起在大數據的世界里探索新知。
再次感謝,希望您喜歡這次的分享。我們下次見!