1、背景介紹:
MaxCompute 2.0版本升級後,Java UDF支持的數據類型從原來的BIGINT、STRING、DOUBLE、BOOLEAN擴展了更多基本的數據類型,同時還擴展支持了ARRAY、MAP、STRUCT等複雜類型,以及Writable參數。Java UDF使用複雜數據類型的方法,STRUCT對應com.aliyun.odps.data.Struct。com.aliyun.odps.data.Struct從反射看不出Field Name和Field Type,因此須要用@Resolve註解來輔助。即若是須要在UDF中使用STRUCT,要求在UDF Class上也標註上@Resolve註解。可是當咱們Struct類型中的field有不少字段的時候,這個時候須要咱們去手動的添加@Resolve註解就不是那麼的友好。針對這一個問題,咱們能夠使用Hive 中的GenericUDF去實現。MaxCompute 2.0支持Hive風格的UDF,部分Hive UDF、UDTF能夠直接在MaxCompute上使用。
2、複雜數據類型UDF示例
示例定義了一個有三個複雜數據類型的UDF,其中第一個用ARRAY做爲參數,第二個用MAP做爲參數,第三個用STRUCT做爲參數。因爲第三個Overloads用了STRUCT做爲參數或者返回值,所以要求必須對UDF Class添加@Resolve註解,指定STRUCT的具體類型。
1.代碼編寫java
@Resolve("struct<a:bigint>,string->string") public class UdfArray extends UDF { public String evaluate(List<String> vals, Long len) { return vals.get(len.intValue()); } public String evaluate(Map<String,String> map, String key) { return map.get(key); } public String evaluate(Struct struct, String key) { return struct.getFieldValue("a") + key; } }
2.打jar包添加資源sql
add jar UdfArray.jar
3.建立函數apache
create function my_index as 'UdfArray' using 'UdfArray.jar';
4.使用UDF函數數組
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;
3、使用Hive的GenericUDF
這裏咱們使用Struct複雜數據類型做爲示例,主要處理的邏輯是當咱們結構體中兩個字段先後沒有差別時不返回,若是先後有差別將新的字段及其值組成新的結構體返回。示例中Struct的Field爲3個。使用GenericUDF方式能夠解決須要手動添加@Resolve註解。
1.建立一個MaxCompute表數據結構
CREATE TABLE IF NOT EXISTS `tmp_ab_struct_type_1` ( `a1` struct<a:STRING,b:STRING,c:string>, `b1` struct<a:STRING,b:STRING,c:string> );
2.表中數據結構以下ide
insert into table tmp_ab_struct_type_1 SELECT named_struct('a',1,'b',3,'c','2019-12-17 16:27:00'), named_struct('a',5,'b',6,'c','2019-12-18 16:30:00');
查詢數據以下所示:函數
3.編寫GenericUDF處理邏輯
(1)QSC_DEMOO類oop
package com.aliyun.udf.struct; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import java.util.ArrayList; import java.util.List; /** * Created by ljw on 2019-12-17 * Description: */ @SuppressWarnings("Duplicates") public class QSC_DEMOO extends GenericUDF { StructObjectInspector soi1; StructObjectInspector soi2; /** * 避免頻繁Struct對象 */ private PubSimpleStruct resultStruct = new PubSimpleStruct(); private List<? extends StructField> allStructFieldRefs; //1. 這個方法只調用一次,而且在evaluate()方法以前調用。該方法接受的參數是一個arguments數組。該方法檢查接受正確的參數類型和參數個數。 //2. 輸出類型的定義 @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { String error = ""; //檢驗參數個數是否正確 if (arguments.length != 2) { throw new UDFArgumentException("須要兩個參數"); } //判斷參數類型是否正確-struct ObjectInspector.Category arg1 = arguments[0].getCategory(); ObjectInspector.Category arg2 = arguments[1].getCategory(); if (!(arg1.equals(ObjectInspector.Category.STRUCT))) { error += arguments[0].getClass().getSimpleName(); throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" + arg1.name() + "\" " + "is found" + "\n" + error); } if (!(arg2.equals(ObjectInspector.Category.STRUCT))) { error += arguments[1].getClass().getSimpleName(); throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" + arg2.name() + "\" " + "is found" + "\n" + error); } //輸出結構體定義 ArrayList<String> structFieldNames = new ArrayList(); ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList(); soi1 = (StructObjectInspector) arguments[0]; soi2 = (StructObjectInspector) arguments[1]; StructObjectInspector toValid = null; if (soi1 == null) toValid = soi2; else toValid = soi1; //設置返回類型 allStructFieldRefs = toValid.getAllStructFieldRefs(); for (StructField structField : allStructFieldRefs) { structFieldNames.add(structField.getFieldName()); structFieldObjectInspectors.add(structField.getFieldObjectInspector()); } return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors); } //這個方法相似UDF的evaluate()方法。它處理真實的參數,並返回最終結果。 @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { //將hive中的struct類型轉換成com.aliyun.odps.data.Struct, 若是有錯誤,請調試,查看deferredObjects的數據是什麼樣子的 //而後本身進行從新封裝 !!! ArrayList list1 = (ArrayList) deferredObjects[0].get(); ArrayList list2 = (ArrayList) deferredObjects[1].get(); int len = list1.size(); ArrayList fieldNames = new ArrayList<>(); ArrayList fieldValues = new ArrayList<>(); for (int i = 0; i < len ; i++) { if (!list1.get(i).equals(list2.get(i))) { fieldNames.add(allStructFieldRefs.get(i).getFieldName()); fieldValues.add(list2.get(i)); } } if (fieldValues.size() == 0) return null; return fieldValues; } //這個方法用於當實現的GenericUDF出錯的時候,打印出提示信息。而提示信息就是你實現該方法最後返回的字符串。 @Override public String getDisplayString(String[] strings) { return "Usage:" + this.getClass().getName() + "(" + strings[0] + ")"; } }
(2)PubSimpleStruct類測試
package com.aliyun.udf.struct; import com.aliyun.odps.data.Struct; import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import java.util.List; public class PubSimpleStruct implements Struct { private StructTypeInfo typeInfo; private List<Object> fieldValues; public StructTypeInfo getTypeInfo() { return typeInfo; } public void setTypeInfo(StructTypeInfo typeInfo) { this.typeInfo = typeInfo; } public void setFieldValues(List<Object> fieldValues) { this.fieldValues = fieldValues; } public int getFieldCount() { return fieldValues.size(); } public String getFieldName(int index) { return typeInfo.getFieldNames().get(index); } public TypeInfo getFieldTypeInfo(int index) { return typeInfo.getFieldTypeInfos().get(index); } public Object getFieldValue(int index) { return fieldValues.get(index); } public TypeInfo getFieldTypeInfo(String fieldName) { for (int i = 0; i < typeInfo.getFieldCount(); ++i) { if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) { return typeInfo.getFieldTypeInfos().get(i); } } return null; } public Object getFieldValue(String fieldName) { for (int i = 0; i < typeInfo.getFieldCount(); ++i) { if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) { return fieldValues.get(i); } } return null; } public List<Object> getFieldValues() { return fieldValues; } @Override public String toString() { return "PubSimpleStruct{" + "typeInfo=" + typeInfo + ", fieldValues=" + fieldValues + '}'; } }
三、打jar包,添加資源this
add jar test.jar;
四、建立函數
CREATE FUNCTION UDF_DEMO as 'com.aliyun.udf.test.UDF_DEMOO' using 'test.jar';
五、測試使用UDF函數
set odps.sql.hive.compatible=true; select UDF_DEMO(a1,b1) from tmp_ab_struct_type_1;
查詢結果以下所示:
注意:
(1)在使用兼容的Hive UDF的時候,須要在SQL前加set odps.sql.hive.compatible=true;語句,set語句和SQL語句一塊兒提交執行。
(2)目前支持兼容的Hive版本爲2.1.0,對應Hadoop版本爲2.7.2。若是UDF是在其餘版本的Hive/Hadoop開發的,則可能須要使用此Hive/Hadoop版本從新編譯。
有疑問能夠諮詢阿里雲MaxCompute技術支持:劉建偉
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.0</version> </dependency>
本文爲阿里雲內容,未經容許不得轉載。