一.自定義函數
1.Hive自帶了一些函數,比如:max/min等,但是數量有限,自己可以通過自定義UDF來方便的擴展。
2.當Hive提供的內置函數無法滿足你的業務處理需要時,此時就可以考慮使用用戶自定義函數。
3.根據用戶自定義函數類別分為以下三種:
(1)UDF(User-Defined-Function) 一進一出。
(2)UDAF(User-Defined Aggregation Function) 用戶自定義聚合函數,多進一出 。
(3)UDTF(User-Defined Table-Generating Functions) 用戶自定義表生成函數,一進多出。
4.編程步驟
(1)繼承Hive提供的類
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
(2)實現類中的抽象方法
(3)在hive的命令行窗口創建函數
(4) 創建臨時函數
需要把jar包上傳到服務器上面
添加jar。
add jar linux_jar_path
創建function
create temporary function dbname.function_name AS class_name;
刪除函數
drop temporary function if exists dbname.function_name;
(5)創建永久函數
需要把jar包上傳到hdfs上面,創建函數時jar包的位置使用hdfs的地址。
創建function
create function if exists my_udtf as "com.zxl.hive.udf.ExplodeJSONArray" using jar "hdfs://flinkv1:8020/my_function/hive_udtf_funtion.jar";
刪除函數
drop function if exists my_udtf ;
注意:永久函數跟會話沒有關系,創建函數的會話斷了以后,其他會話也可以使用。 永久函數創建的時候,在函數名之前需要自己加上庫名,如果不指定庫名的話,會默認把當前庫的庫名給加上。 永久函數使用的時候,需要在指定的庫里面操作,或者在其他庫里面使用的話加上,庫名.函數名。
二.UDF
官方案例:https://cwiki.apache.org/confluence/display/Hive/HivePlugins#HivePlugins-CreatingCustomUDFs
計算給定基本數據類型的長度
package com.zxl.hive.udf;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;public class MyUDF extends GenericUDF {/** 判斷傳進來的參數的類型和長度* 約定返回的數據類型* */@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {//判斷傳進來的參數的長度if (arguments.length !=1) {throw new UDFArgumentLengthException("please give me only one arg");}//判斷傳進來的參數的類型if (!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){throw new UDFArgumentTypeException(1, "i need primitive type arg");}// 約定返回的數據類型return PrimitiveObjectInspectorFactory.javaIntObjectInspector;}/** 具體解決邏輯* */@Overridepublic Object evaluate(DeferredObject[] arguments) throws HiveException {Object o = arguments[0].get();if(o==null){return 0;}return o.toString().length();}/** 用于獲取解釋的字符串* */@Overridepublic String getDisplayString(String[] strings) {return "";}
}
(1)創建永久函數
注意:因為add jar本身也是臨時生效,所以在創建永久函數的時候,需要制定路徑(并且因為元數據的原因,這個路徑還得是HDFS上的路徑)。
三.UDTF
官網案例:https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
執行步驟
要實現UDTF,我們需要繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,實現initialize, process, close三個方法。
UDTF首先會調用initialize方法,此方法返回UDTF的返回行的信息,返回個數,類型;
初始化完成后,會調用process方法,真正的處理過程在process函數中,在process中,每一次forward()調用產生一行;如果產生多列可以將多個列的值放在一個數組中,然后將該數組傳入到forward()函數;
最后close()方法調用,對需要清理的方法進行清理。
關于HIVE的UDTF自定義函數使用的更多詳細內容請看:
轉載原文鏈接:https://blog.csdn.net/lidongmeng0213/article/details/110877351
下面是json日志解析案例:
package com.zxl.hive.udf;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;import java.util.ArrayList;
import java.util.List;public class ExplodeJSONArray extends GenericUDTF {/*** 初始化方法,里面要做三件事* 1.約束函數傳入參數的個數* 2.約束函數傳入參數的類型* 3.約束函數返回值的類型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {// TODO: 2023/12/6 返回結構體類型。udtf函數,有可能炸開之后形成多列。所以用返回結構體來封裝。屬性名:屬性值。屬性名就是列名;屬性值就是列的類型。//用結構體,來約束函數傳入參數的個數//List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs(); --見名知意,獲取結構體所有屬性的引用 可以看見返回值是個list類型.if(argOIs.getAllStructFieldRefs().size()!=1){ //只要個數不等于1,就拋出異常throw new UDFArgumentLengthException("explode_json_array()函數的參數個數只能為1");}//2.約束函數傳入參數的類型// StructField structField = argOIs.getAllStructFieldRefs().get(0);//只能有一個參數,所以index給0 可以看見,是獲得結構體的屬性//ObjectInspector fieldObjectInspector = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();//獲得屬性的對象檢測器 。通過檢查器我們才能知道是什么類型.String typeName = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName();//我們要確保傳入的類型是stringif(!"string".equals(typeName)){throw new UDFArgumentTypeException(0,"explode_json_array函數的第1個參數的類型只能為String."); //拋出異常}//3.約束函數返回值的類型List<String> fieldNames = new ArrayList<>(); //② 表示我建立了一個String類型的集合。表示存儲的列名List<ObjectInspector> fieldOIs = new ArrayList<>(); //②fieldNames.add("item"); //炸裂之后有個列名,如果不重新as,那這個item就是列名fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //表示item這一列是什么類型.基本數據類型工廠類,獲取了個string類型的檢查器//用一個工廠類獲取StructObjectInspector類型。return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);//①獲取標準結構體檢查器。fieldNames,fieldOI是兩個變量名}//這里是實現主邏輯的方法。首先分析下需求:把json array字符串變成一個json字符串@Overridepublic void process(Object[] args) throws HiveException {//1 獲取函數傳入的jsonarray字符串String jsonArrayStr = args[0].toString(); //我要把jsonArrayStr字符串劃分為一個一個的json,通過字符串這種類型是不好劃分的。不知道如何split切分//2 將jsonArray字符串轉換成jsonArray數組。正常情況下我們要引入依賴,比如fastjson啥的。JSONArray jsonArray = new JSONArray(jsonArrayStr); //通過JSONArray這種類型,我們就比較容易獲得一條條的json字符串//3 得到jsonArray里面的一個個json,并把他們寫出。將actions里面的一個個action寫出for (int i = 0; i < jsonArray.length(); i++) { //普通for循環進行遍歷String jsonStr = jsonArray.getString(i);//前面定義了,要返回String//forward是最后收集數據返回的方法forward(jsonStr);}}@Overridepublic void close() throws HiveException {}
}
注意:UDTF函數不能和其他字段同時出現在select語句中,負責SQL會執行失敗
三.UDAF
官網案例:https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy#GenericUDAFCaseStudy-Writingtheresolver
執行步驟:
編寫自定義函數需要創建三個類:
1.繼承 AbstractGenericUDAFResolver重寫 getEvaluator方法,對傳入的值進行判斷。
2.創建數據緩存區,創建一些變量來進行調用賦值,作為中間值,類似于flink的checkpoints。
3.繼承GenericUDAFEvaluator類重寫方法即可,實現具體邏輯的類。
參考文章:
UDAF重要的類及原理分析(UDAF繼承類的各個方法的用法)
原文鏈接:https://blog.csdn.net/lidongmeng0213/article/details/110869457
Hive之ObjectInspector詳解(UDAF中用到的類型詳解)
原文鏈接:https://blog.csdn.net/weixin_42167895/article/details/108314139
一個類似于SUM的自定義函數:
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;// TODO: 2023/12/9 繼承 AbstractGenericUDAFResolver重寫 getEvaluator方法
public class FieldSum extends AbstractGenericUDAFResolver {@Overridepublic GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {// TODO: 2023/12/9 判斷傳入的參數是否為一個if (info.length != 1) {throw new UDFArgumentLengthException("只能傳入一個參數, 但是現在有 " + info.length + "個參數!");}/*TypeInfoUtils是一個Java類,它提供了一些用于處理Hive數據類型的實用方法。以下是TypeInfoUtils類中的一些方法及其功能:getTypeInfoFromTypeString(String typeString) - 將類型字符串轉換為Hive數據類型信息對象。getStandardJavaObjectInspectorFromTypeInfo(TypeInfo typeInfo) - 從Hive數據類型信息對象中獲取標準Java對象檢查器。isExactNumericType(PrimitiveTypeInfo typeInfo) - 檢查給定的Hive原始數據類型是否為精確數值類型。getCategoryFromTypeString(String typeString) - 從類型字符串中獲取Hive數據類型的類別。getPrimitiveTypeInfoFromPrimitiveWritable(Class<? extends Writable> writableClass) -從Hadoop Writable類中獲取Hive原始數據類型信息對象。*/ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(info[0]);// TODO: 2023/12/9 判斷是不是標準的java Object的primitive類型if (objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentTypeException(0, "Argument type must be PRIMARY. but " +objectInspector.getCategory().name() + " was passed!");}// 如果是標準的java Object的primitive類型,說明可以進行類型轉換PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) objectInspector;// 如果是標準的java Object的primitive類型,判斷是不是INT類型,因為參數只接受INT類型if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) {throw new UDFArgumentTypeException(0, "Argument type must be INT, but " +inputOI.getPrimitiveCategory().name() + " was passed!");}return new FieldSumUDAFEvaluator();}@Overridepublic GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {return super.getEvaluator(info);}
}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;// TODO: 2023/12/9 創建數據緩存區,創建一些變量來進行調用賦值,作為中間值,類似于flink的checkpoints。
public class FieldSumBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {Integer num = 0;// TODO: 2023/12/9 實現變量的get,set方法方便后面賦值,取值public Integer getNum() {return num;}public void setNum(int num) {this.num = num;}// TODO: 2023/12/9 創建累加的方法,方便對變量進行累加public Integer addNum(int aum) {num += aum;return num;}
}
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;// TODO: 2023/12/9 實現具體邏輯的地方 直接繼承GenericUDAFEvaluator類重寫方法即可
public class FieldSumUDAFEvaluator extends GenericUDAFEvaluator {// TODO: 2023/12/9 初始輸入的變量 PrimitiveObjectInspector是Hadoop里面原始數據類別PrimitiveObjectInspector inputNum;PrimitiveObjectInspector middleNum;// TODO: 2023/12/9 最終輸出的變量ObjectInspector outputNum;// TODO: 2023/12/9 最終統計值的變量int sumNum;// TODO: 2023/12/7 Model代表了UDAF在mapreduce的各個階段。//* PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合//* 將會調用iterate()和terminatePartial()//* PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合并map的數據::從部分數據聚合到部分數據聚合://* 將會調用merge() 和 terminatePartial()//* FINAL: mapreduce的reduce階段:從部分數據的聚合到完全聚合//* 將會調用merge()和terminate()//* COMPLETE: 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了:從原始數據直接到完全聚合//* 將會調用 iterate()和terminate()// TODO: 2023/12/7 確定各個階段輸入輸出參數的數據格式ObjectInspectors@Overridepublic ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {super.init(m, parameters);// TODO: 2023/12/9 COMPLETE或者PARTIAL1,輸入的都是數據庫的原始數據所以要確定輸入的數據格式if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {inputNum = (PrimitiveObjectInspector) parameters[0];} else {middleNum = (PrimitiveObjectInspector) parameters[0];}// TODO: 2023/12/9 ObjectInspectorFactory是創建新的ObjectInspector實例的主要方法:一般用于創建集合數據類型。輸出的類型是Integer類型,java類型outputNum = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,ObjectInspectorFactory.ObjectInspectorOptions.JAVA);return outputNum;}// TODO: 2023/12/9 保存數據聚集結果的類@Overridepublic AggregationBuffer getNewAggregationBuffer() throws HiveException {return new FieldSumBuffer();}// TODO: 2023/12/9 重置聚集結果@Overridepublic void reset(AggregationBuffer agg) throws HiveException {//重新賦值為零((FieldSumBuffer) agg).setNum(0);}// TODO: 2023/12/9 map階段,迭代處理輸入sql傳過來的列數據,不斷被調用執行的方法,最終數據都保存在agg中,parameters是新傳入的數據@Overridepublic void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {// TODO: 2023/12/9 判斷如果傳入的值是空的直接返回if (parameters == null || parameters.length < 1) {return;}Object javaObj = inputNum.getPrimitiveJavaObject(parameters[0]);((FieldSumBuffer) agg).addNum(Integer.parseInt(javaObj.toString()));}// TODO: 2023/12/9 map與combiner結束返回結果,得到部分數據聚集結果@Overridepublic Object terminatePartial(AggregationBuffer agg) throws HiveException {return terminate(agg);}// TODO: 2023/12/9 combiner合并map返回的結果,還有reducer合并mapper或combiner返回的結果。@Overridepublic void merge(AggregationBuffer agg, Object partial) throws HiveException {((FieldSumBuffer) agg).addNum((Integer) middleNum.getPrimitiveJavaObject(partial));}// TODO: 2023/12/9 map階段,迭代處理輸入sql傳過來的列數據@Overridepublic Object terminate(AggregationBuffer agg) throws HiveException {Integer num = ((FieldSumBuffer) agg).getNum();return num;}
}
打包上傳,注冊函數進行測試:
可以看到實現了對參數的判斷和參數類型的判斷
執行查詢測試: