hive 的udtf使用

HIVE的自定義函數,有UDFUDAF,UDTF,分別實現一進一出、多進一出、一進多出的操做 java

今天只分享一個UDTF的使用,另外兩個之後再分享 apache

場景:使用UDTF解析JSON串 json

廢話很少說了,直接上代碼 ide

要想寫UDTF必須繼承GenericUDTF類,並實現initialize,process,close三個方法,initialize定義每行的列名及類型,process方法是對數據的操做,就是把一行拆成多行,注意一行有多列的話,須要是個集合,close方法能夠不實現 函數



package dw.udf;

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

public class JsonParse extends GenericUDTF {//集成GenericUDTF

	@Override
	public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
		if (args.length != 1) {
			throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
		}
		if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
			throw new UDFArgumentException("ExplodeMap takes string as a parameter");
		}
	
		ArrayList<String> fieldNames = new ArrayList<String>();//這裏是列的
		ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
		fieldNames.add("containerid");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("first_step");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("second_step");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("third_step");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void process(Object[] args) throws HiveException {
		try {
			JSONObject obj = new JSONObject(args[0].toString());
			Iterator it = obj.keys();
			while (it.hasNext()) {
				String key = (String) it.next();
				JSONArray array = obj.getJSONArray(key);
				if (key.indexOf("&") != -1) {
					key = key.substring(0, key.indexOf("&"));
				}
				String[] outstr = new String[4];
				outstr[0] = key;
				for (int i = 0; i < array.length(); i++) {
					outstr[i + 1] = array.getString(i);
				}
				forward(outstr);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void close() throws HiveException {

	}

}
相關文章
相關標籤/搜索