Hive UDF,就這
摘要:Hive UDF是什麼?有什麼用?怎麼用?什麼原理?本文從UDF使用入手,簡要介紹相關原始碼,UDF從零開始。
本文分享自華為雲社群《Hive UDF,就這》,作者:湯忒撒。
Hive中內建了很多函式,同時支援使用者自行擴充套件,按規則新增後即可在sql執行過程中使用,目前支援UDF、UDTF、UDAF三種類型,一般UDF應用場景較多,本文主要介紹UDF使用,簡要介紹相關原始碼。
UDF,(User Defined Function)使用者自定義函式
UDTF,(User-defined Table Generating Function)自定義表生成函式,一行資料生成多行
UDAF,(User-defined Aggregation Function)使用者自定義聚合函式,多行資料生成一行
1. UDF簡介
UDF包含兩種型別:1、臨時函式僅當前會話中有效,退出後重新連線即無法使用;2、永久函式註冊UDF資訊到MetaStore元資料中,可永久使用。
實現UDF需要繼承特定類UDF或GenericUDF二選一。
- apache.hadoop.hive.ql.exec.UDF,處理並返回基本資料型別,int、string、boolean、double等;
- apache.hadoop.hive.ql.udf.generic.GenericUDF,可處理並返回複雜資料型別,如Map、List、Array等,同時支援巢狀;
2. UDF相關語法
UDF使用需要將編寫的UDF類編譯為jar包新增到Hive中,根據需要建立臨時函式或永久函式。
2.1. resources操作
Hive支援向會話中新增資源,支援檔案、jar、存檔,新增後即可在sql中直接引用,僅當前會話有效,預設讀取本地路徑,支援hdfs等,路徑不加引號。例:add jar /opt/ht/AddUDF.jar;
新增資源
ADD { FILE[S] | JAR[S] | ARCHIVE[S] } <filepath1> [<filepath2>]*
檢視資源
LIST { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
刪除資源
DELETE { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
2.2. 臨時函式
僅當前會話有效,不支援指定資料庫,USING路徑需加引號。
CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
2.3. 永久函式
函式資訊入庫,永久有效,USING路徑需加引號。臨時函式與永久函式均可使用USING語句,Hive會自動新增指定檔案到當前環境中,效果與add語句相同,執行後即可list檢視已新增的檔案或jar包。
CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
DROP FUNCTION [IF EXISTS] function_name;
RELOAD (FUNCTIONS|FUNCTION);
2.4. 檢視函式
檢視所有函式,不區分臨時函式與永久函式
show functions;
函式模糊查詢,此處為查詢x開頭的函式
show functions like 'x*';
檢視函式描述
desc function function_name;
檢視函式詳細描述
desc function extended function_name;
3. Description註解
Hive已定義註解型別org.apache.hadoop.hive.ql.exec.Description,用於執行desc function [extended] function_name時介紹函式功能,內建函式與自定義函式用法相同。
【備註】若Description註解名稱與建立UDF時指定名稱不同,以建立UDF時指定名稱為準。
public @interface Description {
//函式簡單介紹
String value() default "_FUNC_ is undocumented";
//函式詳細使用說明
String extended() default "";
//函式名稱
String name() default "";
}
例:Hive內建ceil函式GenericUDFCeil程式碼定義如下,
desc function ceil;
desc function extended ceil;
4. UDF
繼承UDF類必須實現evaluate方法,支援定義多個evaluate方法不同引數列表用於處理不同型別資料,如下
public Text evaluate(Text s)
public int evaluate(Integer s)
…
4.1. UDF示例
實現UDF函式,若字串執行拼接,int型別執行加法運算。
@Description(
name="my_plus",
value="my_plus() - if string, do concat; if integer, do plus",
extended = "Example : \n >select my_plus('a', 'b');\n >ab\n >select my_plus(3, 5);\n >8"
)
public class AddUDF extends UDF {
public String evaluate(String... parameters) {
if (parameters == null || parameters.length == 0) {
return null;
}
StringBuilder sb = new StringBuilder();
for (String param : parameters) {
sb.append(param);
}
return sb.toString();
}
public int evaluate(IntWritable... parameters) {
if (parameters == null || parameters.length == 0) {
return 0;
}
long sum = 0;
for (IntWritable currentNum : parameters) {
sum = Math.addExact(sum, currentNum.get());
}
return (int) sum;
}
}
hdfs dfs -put AddUDF.jar /tmp/ht/
create function my_plus as 'com.huawei.ht.test.AddUDF' using jar 'hdfs:///tmp/ht/AddUDF.jar';
desc function my_plus;
desc function extended my_plus;
UDF新增後記錄在元資料表FUNCS、FUNC_RU表中
4.2. 原始碼淺析
UDF類呼叫入口為方法解析器,預設方法解析器DefaultUDFMethodResolver,執行時由解析器反射獲取UDF類的evaluate方法執行,類程式碼如下:
UDF
public class UDF {
//udf方法解析器
private UDFMethodResolver rslv;
//預設構造器DefaultUDFMethodResolver
public UDF() {
rslv = new DefaultUDFMethodResolver(this.getClass());
}
protected UDF(UDFMethodResolver rslv) {
this.rslv = rslv;
}
public void setResolver(UDFMethodResolver rslv) {
this.rslv = rslv;
}
public UDFMethodResolver getResolver() {
return rslv;
}
public String[] getRequiredJars() {
return null;
}
public String[] getRequiredFiles() {
return null;
}
}
DefaultUDFMethodResolver
public class DefaultUDFMethodResolver implements UDFMethodResolver {
//The class of the UDF.
private final Class<? extends UDF> udfClass;
public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
this.udfClass = udfClass;
}
@Override
public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false, argClasses);
}
}
5. GenericUDF
GenericUDF相比與UDF功能更豐富,支援所有引數型別,引數型別由ObjectInspector封裝;引數Writable類由DeferredObject封裝,使用時簡單型別可直接從Writable獲取,複雜型別可由ObjectInspector解析。
繼承GenericUDF必須實現如下3個介面:
//初始化,ObjectInspector為資料型別封裝類,無實際引數值,返回結果型別
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
return null;
}
//DeferredObject封裝實際引數的對應Writable類
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
return null;
}
//函式資訊
public String getDisplayString(String[] strings) {
return null;
}
5.1. GenericUDF示例
自定義函式實現count函式,支援int與long型別,Hive中無long型別,對應型別為bigint,create function與資料庫儲存與UDF一致,此處不再贅述。
initialize,遍歷ObjectInspector[]檢查每個引數型別,根據引數型別構造ObjectInspectorConverters.Converter,用於將Hive傳遞的引數型別轉換為對應的Writable封裝物件ObjectInspector,供後續統一處理。
evaluate,初始化時已記錄每個引數具體型別,從DeferredObject中獲取物件,根據型別使用對應Converter物件轉換為Writable執行計算。
例:處理int型別,
UDF查詢常量時,DeferredObject中封裝型別為IntWritable;
UDF查詢表字段時,DeferredObject中封裝型別為LazyInteger。
@Description(
name="my_count",
value="my_count(...) - count int or long type numbers",
extended = "Example :\n >select my_count(3, 5);\n >8\n >select my_count(3, 5, 25);\n >33"
)
public class MyCountUDF extends GenericUDF {
private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
private transient ObjectInspectorConverters.Converter intConverter;
private transient ObjectInspectorConverters.Converter longConverter;
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
int length = objectInspectors.length;
inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
for (int i = 0; i < length; i++) {
ObjectInspector currentOI = objectInspectors[i];
ObjectInspector.Category type = currentOI.getCategory();
if (type != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
}
PrimitiveObjectInspector.PrimitiveCategory primitiveType =
((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
inputType[i] = primitiveType;
switch (primitiveType) {
case INT:
if (intConverter == null) {
ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
}
break;
case LONG:
if (longConverter == null) {
ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
}
break;
default:
throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
}
}
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
LongWritable out = new LongWritable();
for (int i = 0; i < deferredObjects.length; i++) {
PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
Object param = deferredObjects[i].get();
switch (type) {
case INT:
Object intObject = intConverter.convert(param);
out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
break;
case LONG:
Object longObject = longConverter.convert(param);
out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
break;
default:
throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
}
}
return out;
}
@Override
public String getDisplayString(String[] strings) {
return "my_count(" + Joiner.on(", ").join(strings) + ")";
}
}
create function my_count as 'com.huawei.ht.test.MyCountUDF' using jar 'hdfs:///tmp/countUDF.jar';
create table test_numeric(i1 int, b1 bigint, b2 bigint, i2 int, i3 int);
insert into table test_numeric values(0, -10, 25, 300, 15), (11, 22, 33, 44, 55);
select *, my_count(*) from test_numeric;
5.2. 原始碼淺析
GenericUDF內部定義了方法呼叫順序,子類實現相應功能即可,呼叫時根據函式名稱從FunctionRegistry中獲取UDF物件,返回執行結果。
Hive中資料型別均使用ObjectInspector封裝,為區分普通型別與負責結構型別,定義列舉Category,共包含PRIMITIVE,LIST,MAP,STRUCT,UNION這5種類型,其中PRIMITIVE表示普通型別(int、long、double等)。
ObjectInspector
public interface ObjectInspector extends Cloneable {
//用於型別名稱
String getTypeName();
//用於獲取ObjectInspector封裝的欄位型別
ObjectInspector.Category getCategory();
public static enum Category {
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
private Category() {
}
}
}
PrimitiveObjectInspector.PrimitiveCategory,基本型別
public static enum PrimitiveCategory {
VOID,
BOOLEAN,
BYTE,
SHORT,
INT,
LONG,
…
}
GenericUDF. initializeAndFoldConstants
呼叫initialize獲取輸出ObjectInspector,若為常量型別,直接evaluate計算結果值。
此方法編譯階段通過AST構造Operator遍歷sql節點時,常量直接計算結果值,其他型別僅執行initialize。
計算表字段時,在MR等任務中,Operator執行時呼叫initialize、evaluate計算結果值(例:SelectOperator)。
public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments) throws UDFArgumentException {
ObjectInspector oi = this.initialize(arguments);
if (this.getRequiredFiles() == null && this.getRequiredJars() == null) {
boolean allConstant = true;
for(int ii = 0; ii < arguments.length; ++ii) {
if (!ObjectInspectorUtils.isConstantObjectInspector(arguments[ii])) {
allConstant = false;
break;
}
}
if (allConstant && !ObjectInspectorUtils.isConstantObjectInspector((ObjectInspector)oi) && FunctionRegistry.isConsistentWithinQuery(this) && ObjectInspectorUtils.supportsConstantObjectInspector((ObjectInspector)oi)) {
GenericUDF.DeferredObject[] argumentValues = new GenericUDF.DeferredJavaObject[arguments.length];
for(int ii = 0; ii < arguments.length; ++ii) {
argumentValues[ii] = new GenericUDF.DeferredJavaObject(((ConstantObjectInspector)arguments[ii]).getWritableConstantValue());
}
try {
Object constantValue = this.evaluate(argumentValues);
oi = ObjectInspectorUtils.getConstantObjectInspector((ObjectInspector)oi, constantValue);
} catch (HiveException var6) {
throw new UDFArgumentException(var6);
}
}
return (ObjectInspector)oi;
} else {
return (ObjectInspector)oi;
}
}
6. UDF相關原始碼
6.1. 運算子
Hive SQL中,“+、-、*、/、=”等運算子都是是UDF函式,在FunctionRegistry中宣告,所有UDF均在編譯階段由AST生成Operator樹時解析,常量直接計算結果值,其他型別僅初始化,獲取輸出型別用於生成Operator樹,後續在Operator真正執行時計算結果值。
static {
HIVE_OPERATORS.addAll(Arrays.asList(
"+", "-", "*", "/", "%", "div", "&", "|", "^", "~",
"and", "or", "not", "!",
"=", "==", "<=>", "!=", "<>", "<", "<=", ">", ">=",
"index"));
}
6.2. 函式型別
Hive中包含BUILTIN, PERSISTENT, TEMPORARY三種函式;
public static enum FunctionType {
BUILTIN, PERSISTENT, TEMPORARY;
}
6.3. FunctionRegistry
Hive的所有UDF均由FunctionRegistry管理,FunctionRegistry僅管理記憶體中的UDF,不操作資料庫。
內建函式都在FunctionRegistry靜態塊中初始化,不在資料庫中記錄;使用者自定義UDF新增、刪除都在HiveServer本地執行,臨時函式在SessionState中處理,永久函式由FunctionTask呼叫FunctionRegistry對應方法處理,載入後FunctionTask負責寫庫。
public final class FunctionRegistry {
…
private static final Registry system = new Registry(true);
static {
system.registerGenericUDF("concat", GenericUDFConcat.class);
system.registerUDF("substr", UDFSubstr.class, false);
…
}
…
public static void registerTemporaryMacro(
String macroName, ExprNodeDesc body, List<String> colNames, List<TypeInfo> colTypes) {
SessionState.getRegistryForWrite().registerMacro(macroName, body, colNames, colTypes);
}
public static FunctionInfo registerPermanentFunction(String functionName,
String className, boolean registerToSession, FunctionResource[] resources) {
return system.registerPermanentFunction(functionName, className, registerToSession, resources);
}
…
}
6.4. GenericUDFBridge
Hive中UDF與GenericUDF實際均以GenericUDF方式處理,通過GenericUDFBridge適配,GenericUDFBridge繼承GenericUDF。
新增UDF時,FunctionRegistry呼叫Registry物件新增UDF,Registry將UDF封裝為GenericUDFBridge儲存到內建中。
Registry
private FunctionInfo registerUDF(String functionName, FunctionType functionType,
Class<? extends UDF> UDFClass, boolean isOperator, String displayName,
FunctionResource... resources) {
validateClass(UDFClass, UDF.class);
FunctionInfo fI = new FunctionInfo(functionType, displayName,
new GenericUDFBridge(displayName, isOperator, UDFClass.getName()), resources);
addFunction(functionName, fI);
return fI;
}
GenericUDFBridge
內部根據引數反射獲取UDF類evaluate方法並適配引數,自動轉化為相應型別,故UDF不需要感知函式本地執行與yarn執行時的具體型別是否一致。
部分程式碼如下:
public GenericUDFBridge(String udfName, boolean isOperator,
String udfClassName) {
this.udfName = udfName;
this.isOperator = isOperator;
this.udfClassName = udfClassName;
}
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
//初始化UDF物件
try {
udf = (UDF)getUdfClassInternal().newInstance();
} catch (Exception e) {
throw new UDFArgumentException(
"Unable to instantiate UDF implementation class " + udfClassName + ": " + e);
}
// Resolve for the method based on argument types
ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>(
arguments.length);
for (ObjectInspector argument : arguments) {
argumentTypeInfos.add(TypeInfoUtils
.getTypeInfoFromObjectInspector(argument));
}
udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos);
udfMethod.setAccessible(true);
// Create parameter converters
conversionHelper = new ConversionHelper(udfMethod, arguments);
// Create the non-deferred realArgument
realArguments = new Object[arguments.length];
// Get the return ObjectInspector.
ObjectInspector returnOI = ObjectInspectorFactory
.getReflectionObjectInspector(udfMethod.getGenericReturnType(),
ObjectInspectorOptions.JAVA);
return returnOI;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == realArguments.length);
// Calculate all the arguments
for (int i = 0; i < realArguments.length; i++) {
realArguments[i] = arguments[i].get();
}
// Call the function,反射執行UDF類evaluate方法
Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper
.convertIfNecessary(realArguments));
// For non-generic UDF, type info isn't available. This poses a problem for Hive Decimal.
// If the returned value is HiveDecimal, we assume maximum precision/scale.
if (result != null && result instanceof HiveDecimalWritable) {
result = HiveDecimalWritable.enforcePrecisionScale
((HiveDecimalWritable) result,
HiveDecimal.SYSTEM_DEFAULT_PRECISION,
HiveDecimal.SYSTEM_DEFAULT_SCALE);
}
return result;
}
6.5. 函式呼叫入口
sql中使用函式時,可能有3處呼叫,不同版本程式碼行數可能不一致,流程類似。
1. 編譯時遍歷語法樹轉換Operator。
TypeCheckProcFactory.getXpathOrFuncExprNodeDesc中根據sql中運算子或UDF名稱生成表示式物件ExprNodeGenericFuncDesc,內部呼叫GenericUDF方法。
2. 啟用常量傳播優化器優化時,ConstantPropagate中遍歷樹過程呼叫;
此優化器預設開啟,可引數控制"hive.optimize.constant.propagation"。
ConstantPropagate優化時遍歷節點,嘗試提前計算常量表達式,由ConstantPropagateProcFactory.evaluateFunction計算UDF。
3. UDF引數不是常量,SQL按計劃執行過程中Operator真正執行時;
Operator真正執行時,由ExprNodeGenericFuncEvaluator. _evaluate處理每行資料,計算UDF結果值。
@Override
protected Object _evaluate(Object row, int version) throws HiveException {
if (isConstant) {
// The output of this UDF is constant, so don't even bother evaluating.
return ((ConstantObjectInspector) outputOI).getWritableConstantValue();
}
rowObject = row;
for (GenericUDF.DeferredObject deferredObject : childrenNeedingPrepare) {
deferredObject.prepare(version);
}
return genericUDF.evaluate(deferredChildren);
}
- DCM:一個能夠改善所有應用資料互動場景的中介軟體新秀
- 手繪圖解java類載入原理
- 關於加密通道規範,你真正用的是TLS,而非SSL
- 程式碼重構,真的只有複雜化一條路嗎?
- 解讀分散式排程平臺Airflow在華為雲MRS中的實踐
- 透過例項demo帶你認識gRPC
- 帶你聚焦GaussDB(DWS)儲存時遊標使用
- 傳統到敏捷的轉型中,誰更適合做Scrum Master?
- 輕鬆解決研發知識管理難題
- Java中觀察者模式與委託,還在傻傻分不清
- 如何使用Python實現影象融合及加法運算?
- 什麼是強化學習?
- 探索開源工作流引擎Azkaban在MRS中的實踐
- GaussDB(DWS) NOT IN優化技術解密:排他分析場景400倍效能提升
- Java中觀察者模式與委託,還在傻傻分不清
- Java中的執行緒到底有哪些安全策略
- 一圖詳解java-class類檔案原理
- Java中的執行緒到底有哪些安全策略
- 擺平各類目標檢測識別AI應用,有它就夠了!
- KeyDB重量釋出6.3.0開源版,華為深度參與貢獻