About the author
Chunmin is a logistics architect and team leader. Diligent and conscientious at work, he once spent a vacation "programming facing the sea", which sounds impressive even if a little hard to fathom.
In Hive, users can define their own functions to extend the capabilities of HiveQL. Hive custom functions mainly come in the following three types:

- UDF (user-defined function): one row in, one row out
- UDAF (user-defined aggregate function): many rows in, one row out
- UDTF (user-defined table-generating function): one row in, many rows out
Hive's UDF mechanism requires the user to provide a Resolver and an Evaluator: the Resolver handles the input and dispatches the call to the Evaluator, and the Evaluator implements the actual functionality.
Hive provides a base class, org.apache.hadoop.hive.ql.exec.UDF, which holds an instance of DefaultUDFMethodResolver, an implementation of the UDFMethodResolver interface.
public class UDF {
    private UDFMethodResolver rslv;

    public UDF() {
        this.rslv = new DefaultUDFMethodResolver(this.getClass());
    }
    ......
}
DefaultUDFMethodResolver exposes a getEvalMethod method, which looks up the UDF's evaluate method so that it can be invoked at runtime:
public class DefaultUDFMethodResolver implements UDFMethodResolver {
    private final Class<? extends UDF> udfClass;

    public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
        this.udfClass = udfClass;
    }

    public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
        return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
    }
}
A custom UDF is implemented by extending org.apache.hadoop.hive.ql.exec.UDF and providing an evaluate method, which is the method the DefaultUDFMethodResolver resolves and executes:
public class DAIsContainPoint extends UDF {
    public Boolean evaluate(Double longitude, Double latitude, String geojson) {
        Boolean isContained = false;
        try {
            // Parse the GeoJSON polygon and build a point from the input coordinates
            Polygon polygon = JTSHelper.parse(geojson);
            Coordinate center = new Coordinate(longitude, latitude);
            GeometryFactory factory = new GeometryFactory();
            Point point = factory.createPoint(center);
            isContained = polygon.contains(point);
        } catch (Throwable e) {
            // On any parse or geometry error, treat the point as not contained
            isContained = false;
        }
        return isContained;
    }
}
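JTSHelper above is the author's own utility and its source is not included in the post. As a rough idea of what it does, here is a minimal, hypothetical sketch, assuming it simply wraps JTS's GeoJsonReader/GeoJsonWriter from the org.locationtech packages (the UDTF example later in the post uses GeoJsonReader directly):

// Hypothetical sketch only; the original JTSHelper implementation is not shown in the post.
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.geojson.GeoJsonReader;
import org.locationtech.jts.io.geojson.GeoJsonWriter;

public class JTSHelper {
    // Parse a GeoJSON string into a JTS Polygon
    public static Polygon parse(String geoJson) throws ParseException {
        return (Polygon) new GeoJsonReader().read(geoJson);
    }

    // Serialize a JTS geometry back into a GeoJSON string
    public static String convert2String(Geometry geometry) {
        return new GeoJsonWriter().write(geometry);
    }
}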
Once the code is written, it needs to be packaged and compiled into a jar. Note: the final jar must bundle all of its dependencies, so for Maven builds the maven-shade-plugin is recommended:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
The resulting jar file then has to be registered in Hive SQL before the function can be used:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';
select lat, lng, geojson, is_in_polygon(lng, lat, geojson) as is_in from example;
Hive aggregate computations use MapReduce to speed up aggregation, and UDAF is the extension point for writing custom aggregate functions. To explain UDAF properly it helps to keep the MapReduce flow in mind: mappers emit partial results, combiners merge them locally, and reducers merge all of the partial results into the final output.
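As a rough summary (mine, not the original post's), Hive drives the aggregation through these phases via the GenericUDAFEvaluator.Mode enum, which maps onto the evaluator methods described below roughly like this:

// PARTIAL1 (map):       iterate() over raw rows       -> terminatePartial()
// PARTIAL2 (combine):   merge() partial aggregations  -> terminatePartial()
// FINAL    (reduce):    merge() partial aggregations  -> terminate()
// COMPLETE (map-only):  iterate() over raw rows       -> terminate()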
Coming back to Hive: to implement a UDAF, you first extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver and implement the org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 interface. You then build a GenericUDAFEvaluator subclass that implements the MapReduce-style computation, in which three methods are key:

- iterate: takes the mapper input and feeds it into merge
- merge: the combiner merges the mapper results
- terminate: merges all of the combiner results and returns the final value

Finally, the class extending AbstractGenericUDAFResolver overrides getEvaluator to return an instance of that GenericUDAFEvaluator:
public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo) throws SemanticException {
        return new DAJoinStringEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
        if (typeInfos.length != 1) {
            throw new UDFArgumentTypeException(typeInfos.length - 1,
                "Exactly one argument is expected.");
        }
        if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                "Only primitive type arguments are accepted but "
                    + typeInfos[0].getTypeName() + " is passed.");
        }
        switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
            case STRING:
                return new DAJoinStringEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                    "Only numeric or string type arguments are accepted but "
                        + typeInfos[0].getTypeName() + " is passed.");
        }
    }

    public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {
        private PrimitiveObjectInspector mInput;
        private Text mResult;

        // Aggregation buffer holding the running union of the joined geometries
        static class PolygonAgg implements AggregationBuffer {
            Geometry geometry;
        }

        // init: declares the UDAF's return type; DAJoin returns a Text (string) value
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            mResult = new Text();
            mInput = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        // Creates the buffer that accumulates the running result across mapper, combiner and reducer
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            PolygonAgg polygonAgg = new PolygonAgg();
            reset(polygonAgg);
            return polygonAgg;
        }

        @Override
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
            PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
            GeometryFactory factory = new GeometryFactory();
            polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
        }

        // Map phase: handle each input row and hand it to merge
        @Override
        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            assert (objects.length == 1);
            merge(aggregationBuffer, objects[0]);
        }

        // Return the partial result of this mapper/combiner
        @Override
        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return terminate(aggregationBuffer);
        }

        // Combiner: merge a partial result (a GeoJSON string) into the running union
        @Override
        public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
            if (partial != null) {
                try {
                    PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                    String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
                    Polygon polygon = JTSHelper.parse(geoJson);
                    polygonAgg.geometry = polygonAgg.geometry.union(polygon);
                } catch (Exception e) {
                    // Ignore rows that fail to parse
                }
            }
        }

        // Reducer: merge all combiner results and emit the convex hull as a GeoJSON string
        @Override
        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            try {
                PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                Geometry buffer = polygonAgg.geometry.buffer(0);
                mResult.set(JTSHelper.convert2String(buffer.convexHull()));
                return mResult;
            } catch (Exception e) {
                return "";
            }
        }
    }
}
After packaging, reference it in Hive SQL and run:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2';
create table udaf_example as
select id, da_join(da_range) as da_union_polygon
from example
group by id;
For a UDTF, you extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement the process, initialize and close methods:

- initialize: returns a StructObjectInspector, which determines the names and types of the output columns
- process: handles each input record, produces a new array of values and passes it to the forward method
- close: the callback at the end of the whole invocation, used to clean up memory and other resources

public class S2SimpleRegionCoverV2 extends GenericUDTF {
    private final static int LEVEL = 16;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // One output column named "s2cellid" of type bigint
        List<String> structFieldNames = Lists.newArrayList("s2cellid");
        List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
            PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        return ObjectInspectorFactory
            .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // Each input GeoJSON row emits one output row per covering S2 cell
        String json = String.valueOf(objects[0]);
        List<Long> s2cellids = toS2CellIds(json);
        for (Long s2cellid : s2cellids) {
            forward(new Long[]{s2cellid});
        }
    }

    public static List<Long> toS2CellIds(String json) {
        GeometryFactory factory = new GeometryFactory();
        GeoJsonReader reader = new GeoJsonReader();
        Geometry geometry = null;
        try {
            geometry = reader.read(json);
        } catch (ParseException e) {
            geometry = factory.createPolygon(new Coordinate[]{});
        }
        // Convert the polygon's coordinates into S2 points
        List<S2Point> polygonS2Point = new ArrayList<S2Point>();
        for (Coordinate coordinate : geometry.getCoordinates()) {
            S2LatLng s2LatLng = S2LatLng.fromDegrees(coordinate.y, coordinate.x);
            polygonS2Point.add(s2LatLng.toPoint());
        }
        List<S2Point> points = polygonS2Point;
        if (points.size() == 0) {
            return Lists.newArrayList();
        }
        // Compute a simple covering of the polygon at the configured S2 level
        ArrayList<S2CellId> result = new ArrayList<S2CellId>();
        S2RegionCoverer
            .getSimpleCovering(new S2Polygon(new S2Loop(points)), points.get(0), LEVEL, result);
        List<Long> output = new ArrayList<Long>();
        for (S2CellId s2CellId : result) {
            output.add(s2CellId.id());
        }
        return output;
    }

    @Override
    public void close() throws HiveException {
    }
}
When using it, combine it with lateral view:
add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar;
create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2';
drop table if exists temp.cm_s2_id_cover_list;
create table temp.cm_s2_id_cover_list as
select tb_s2cellid.s2cellid, source.shop_id
from (
select
geometry,
shop_id
from
example) source
lateral view da_cover(geometry) tb_s2cellid as s2cellid;