需要從聚合體日誌中拆解出各子日誌數據,然後分別插入到各日誌子表中。這一過程通過表生成函數(UDTF)完成。Java 代碼如下:
package com.oldboy.umeng.hive.util;

import com.oldboy.umeng.common.domain.AppStartupLog;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.ArrayList;

/**
 * Hive helper utilities that map JavaBean properties to Hive output columns.
 */
public class HiveUtil {

    /**
     * Populates the column-name list and the matching object-inspector list
     * from the JavaBean properties of {@code clz}.
     *
     * <p>Only properties that have both a getter and a setter, and whose type is
     * {@code String}, {@code int}/{@code Integer} or {@code long}/{@code Long},
     * are added. Entries are appended in the order returned by
     * {@link BeanInfo#getPropertyDescriptors()}, and the two lists stay aligned
     * index by index.
     *
     * @param clz        bean class to introspect (any log bean, not only {@code AppStartupLog})
     * @param fieldNames output list receiving the property names
     * @param fieldOIs   output list receiving one inspector per added name
     * @throws IntrospectionException if the bean cannot be introspected
     */
    public static void popOIs(Class<?> clz, ArrayList<String> fieldNames,
                              ArrayList<ObjectInspector> fieldOIs) throws IntrospectionException {
        BeanInfo beanInfo = Introspector.getBeanInfo(clz);
        for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
            // Read-only or write-only properties (e.g. "class") are not columns.
            if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null) {
                continue;
            }
            ObjectInspector inspector = inspectorFor(descriptor.getPropertyType());
            if (inspector != null) {
                fieldNames.add(descriptor.getName());
                fieldOIs.add(inspector);
            }
        }
    }

    /**
     * Returns the Java object inspector for a supported property type, or
     * {@code null} when the type is not one of String / int / long.
     */
    private static ObjectInspector inspectorFor(Class<?> type) {
        if (type == String.class) {
            return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        }
        if (type == int.class || type == Integer.class) {
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }
        if (type == long.class || type == Long.class) {
            return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
        }
        return null;
    }

    /**
     * Builds a value array whose order follows {@code fieldNames}.
     *
     * <p>Extraction is best-effort: if reading a property fails, the
     * corresponding slot is left {@code null} and the remaining properties are
     * still read.
     *
     * @param l          bean to read from
     * @param fieldNames property names, in output order
     * @return array of the same length as {@code fieldNames}
     */
    public static Object[] convert2Arr(Object l, ArrayList<String> fieldNames) {
        int size = fieldNames.size();
        Object[] values = new Object[size];
        for (int i = 0; i < size; i++) {
            try {
                values[i] = getPropValue(l, fieldNames.get(i));
            } catch (Exception ignored) {
                // Deliberately best-effort: one unreadable property must not drop
                // the whole row, so the column is emitted as null.
                values[i] = null;
            }
        }
        return values;
    }

    /**
     * Reads the value of the named JavaBean property from {@code o}.
     *
     * @param o        bean instance to read from
     * @param propName property name as reported by the bean introspector
     * @return the getter's return value, or {@code null} when the property does
     *         not exist or has no getter
     * @throws Exception if introspection or the getter invocation fails
     */
    public static Object getPropValue(Object o, String propName) throws Exception {
        BeanInfo beanInfo = Introspector.getBeanInfo(o.getClass());
        for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
            if (!descriptor.getName().equals(propName)) {
                continue;
            }
            Method getter = descriptor.getReadMethod();
            if (getter != null) {
                return getter.invoke(o);
            }
        }
        return null;
    }
}
package com.oldboy.umeng.common.util;

import com.alibaba.fastjson.JSON;
import com.oldboy.umeng.common.domain.*;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.DecimalFormat;
import java.util.*;

/**
 * Log utilities: random log generation, DDL generation, JSON deserialization
 * and merging of aggregate-level properties into sub-logs.
 */
public class LogUtil {

    private static Random r = new Random();

    /**
     * Creates a log instance and fills its writable bean properties (including
     * inherited ones) with random dictionary values.
     *
     * <p>{@code String} properties are set from {@code DictUtil.getRandString}
     * and {@code int}/{@code Integer} properties from {@code DictUtil.getRandInt},
     * both keyed by the lower-cased property name. Failures while filling an int
     * property are ignored; failures on a String property propagate.
     *
     * @param t   log class; must have an accessible no-arg constructor
     * @param <T> log type
     * @return the populated instance
     * @throws Exception if instantiation, introspection or a String setter fails
     */
    public static <T> T genLog(Class<T> t) throws Exception {
        T obj = t.newInstance();
        BeanInfo beanInfo = Introspector.getBeanInfo(t);
        for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
            Method setter = descriptor.getWriteMethod();
            if (setter == null) {
                continue;
            }
            // Locale.ROOT keeps the dictionary key stable regardless of the JVM's default locale.
            String key = descriptor.getName().toLowerCase(Locale.ROOT);
            Class<?> propType = descriptor.getPropertyType();
            if (propType == String.class) {
                String pvalue = DictUtil.getRandString(key);
                setter.invoke(obj, pvalue);
            } else if (propType == int.class || propType == Integer.class) {
                try {
                    int pvalue = DictUtil.getRandInt(key);
                    setter.invoke(obj, pvalue);
                } catch (Exception ignored) {
                    // Best-effort: presumably not every int property has a dictionary
                    // entry (TODO confirm against DictUtil); leave the default value.
                }
            }
        }
        processLogTime(obj);
        return obj;
    }

    /**
     * Creates a log instance and fills only the fields declared directly on
     * {@code t} (no inherited fields) with random dictionary values.
     *
     * <p>Fields are written reflectively, bypassing setters. Failures while
     * filling an int field are ignored; failures on a String field propagate.
     *
     * @param t   log class; must have an accessible no-arg constructor
     * @param <T> log type
     * @return the populated instance
     * @throws Exception if instantiation or a String field assignment fails
     */
    public static <T> T genLogNoParents(Class<T> t) throws Exception {
        T obj = t.newInstance();
        for (Field field : t.getDeclaredFields()) {
            String key = field.getName().toLowerCase(Locale.ROOT);
            Class<?> fieldType = field.getType();
            if (fieldType == String.class) {
                String fvalue = DictUtil.getRandString(key);
                field.setAccessible(true);
                field.set(obj, fvalue);
            } else if (fieldType == int.class || fieldType == Integer.class) {
                try {
                    int fvalue = DictUtil.getRandInt(key);
                    field.setAccessible(true);
                    field.set(obj, fvalue);
                } catch (Exception ignored) {
                    // Best-effort, same as genLog: leave the field at its default.
                }
            }
        }
        // Assign the creation time.
        processLogTime(obj);
        return obj;
    }

    /**
     * Sets a random creation time within the last ten days on {@code obj} when
     * it is an {@code AppBaseLog}; other objects are left untouched.
     */
    private static void processLogTime(Object obj) {
        long now = System.currentTimeMillis();
        // Ten days in milliseconds (864,000,000) still fits in an int.
        int dur = 10 * 24 * 60 * 60 * 1000;
        long thatTime = now - r.nextInt(dur);
        if (obj instanceof AppBaseLog) {
            ((AppBaseLog) obj).setCreatedAtMs(thatTime);
        }
    }

    /**
     * Generates {@code n} random logs of type {@code t} via
     * {@link #genLogNoParents(Class)}.
     *
     * @param t   log class
     * @param n   number of logs to generate
     * @param <T> log type
     * @return a new mutable list with {@code n} elements
     * @throws Exception if any log cannot be generated
     */
    public static <T> List<T> genLogList(Class<T> t, int n) throws Exception {
        List<T> list = new ArrayList<T>();
        for (int i = 0; i < n; i++) {
            list.add(genLogNoParents(t));
        }
        return list;
    }

    /**
     * Generates a random log aggregate with a device id and between 1 and 5
     * logs of each sub-log kind.
     *
     * @return the populated aggregate
     * @throws Exception if any log cannot be generated
     */
    public static AppLogAggEntity getLogAgg() throws Exception {
        int n = 5;
        AppLogAggEntity agg = genLog(AppLogAggEntity.class);
        processsDeviceId(agg);
        agg.setStartupLogs(genLogList(AppStartupLog.class, r.nextInt(n) + 1));
        agg.setEventLogs(genLogList(AppEventLog.class, r.nextInt(n) + 1));
        agg.setErrorLogs(genLogList(AppErrorLog.class, r.nextInt(n) + 1));
        agg.setUsageLogs(genLogList(AppUsageLog.class, r.nextInt(n) + 1));
        agg.setPageLogs(genLogList(AppPageLog.class, r.nextInt(n) + 1));
        return agg;
    }

    /**
     * Assigns a random device id of the form {@code dv-NNNNN} where the numeric
     * part is in 1..10000, zero-padded to five digits (e.g. 1 becomes 00001).
     */
    private static void processsDeviceId(AppLogAggEntity agg) {
        DecimalFormat df = new DecimalFormat("00000");
        int devid = r.nextInt(10000) + 1;
        String str = "dv-" + df.format(devid);
        agg.setDeviceId(str);
    }

    /**
     * Generates the {@code create table} DDL statement for a log class.
     *
     * <p>The table name is the simple class name with its first three characters
     * removed, lower-cased, plus {@code "s"}. One column is emitted per bean
     * property that has both a getter and a setter and whose type is String,
     * int/Integer or long/Long. Columns are collected first and separated
     * afterwards, so the last emitted column never carries a trailing comma even
     * when the final property descriptor is skipped.
     *
     * @param clazz log class; its simple name must be at least three characters long
     * @return the DDL text, using {@code \r\n} line endings
     * @throws IntrospectionException if the class cannot be introspected
     */
    public static String genDDL(Class<?> clazz) throws IntrospectionException {
        String RN = "\r\n";
        String simpleName = clazz.getSimpleName();
        String tablename = simpleName.substring(3).toLowerCase(Locale.ROOT) + "s";

        StringBuilder builder = new StringBuilder();
        builder.append(RN)
                .append("--")
                .append(tablename).append(RN)
                .append("create table if not exists " + tablename).append(RN)
                .append("(").append(RN);

        List<String> columns = new ArrayList<String>();
        BeanInfo beanInfo = Introspector.getBeanInfo(clazz);
        for (PropertyDescriptor descriptor : beanInfo.getPropertyDescriptors()) {
            if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null) {
                continue;
            }
            String hiveType = hiveTypeFor(descriptor.getPropertyType());
            if (hiveType != null) {
                columns.add(" " + descriptor.getName() + "\t\t\t " + hiveType);
            }
        }

        int last = columns.size() - 1;
        for (int i = 0; i <= last; i++) {
            builder.append(columns.get(i))
                    .append(i < last ? " , " : " ")
                    .append(RN);
        }

        // Closing part: partition columns and storage format.
        builder.append(")")
                .append(RN)
                .append("partitioned by (ym int ,day int , hm int) ")
                .append(RN)
                .append("stored as parquet ;")
                .append(RN);
        return builder.toString();
    }

    /**
     * Maps a supported Java property type to its Hive column type, or returns
     * {@code null} for unsupported types.
     */
    private static String hiveTypeFor(Class<?> type) {
        if (type == String.class) {
            return "string";
        }
        if (type == int.class || type == Integer.class) {
            return "int";
        }
        if (type == long.class || type == Long.class) {
            return "bigint";
        }
        return null;
    }

    /**
     * Generates the DDL for all five sub-log tables, preceded by a
     * {@code use umeng_big11 ;} statement.
     *
     * @return the concatenated DDL script
     * @throws IntrospectionException if any log class cannot be introspected
     */
    public static String genAllDDL() throws IntrospectionException {
        Class<?>[] logClasses = {
                AppStartupLog.class,
                AppEventLog.class,
                AppErrorLog.class,
                AppUsageLog.class,
                AppPageLog.class,
        };
        StringBuilder builder = new StringBuilder();
        builder.append("use umeng_big11 ;");
        builder.append("\r\n");
        for (Class<?> logClass : logClasses) {
            builder.append(genDDL(logClass));
        }
        return builder.toString();
    }

    /**
     * Deserializes a log aggregate from a JSON string.
     *
     * <p>Every {@code \"} sequence is first replaced by a plain {@code "}.
     * NOTE(review): this replacement is global, so it would also unescape quotes
     * that are legitimately escaped inside string values - confirm the raw log
     * format guarantees that cannot happen.
     *
     * @param json raw JSON text, possibly with backslash-escaped quotes
     * @return the parsed aggregate, or {@code null} when {@code json} is {@code null}
     */
    public static AppLogAggEntity deserLog(String json) {
        if (json == null) {
            return null;
        }
        String newJson = json.replace("\\\"", "\"");
        return JSON.parseObject(newJson, AppLogAggEntity.class);
    }

    /**
     * Copies the aggregate's common properties onto every sub-log it contains.
     * A {@code null} aggregate and {@code null} sub-log lists are skipped.
     *
     * @param agg aggregate whose sub-logs are updated in place
     * @throws Exception if introspection or a reflective call fails
     */
    public static void mergeProp(AppLogAggEntity agg) throws Exception {
        if (agg == null) {
            return;
        }
        List<AppBaseLog> sublogs = new ArrayList<AppBaseLog>();
        addAllIfPresent(sublogs, agg.getStartupLogs());
        addAllIfPresent(sublogs, agg.getErrorLogs());
        addAllIfPresent(sublogs, agg.getEventLogs());
        addAllIfPresent(sublogs, agg.getUsageLogs());
        addAllIfPresent(sublogs, agg.getPageLogs());
        for (AppBaseLog log : sublogs) {
            doMergeProper(agg, log);
        }
    }

    /**
     * Appends {@code source} to {@code target} unless {@code source} is {@code null}.
     */
    private static void addAllIfPresent(List<AppBaseLog> target,
                                        Collection<? extends AppBaseLog> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    /**
     * Copies property values from {@code a} onto {@code b}.
     *
     * <p>A property is copied when it is readable and writable on {@code a}, has
     * type String, int/Integer or long/Long, and {@code b} has a setter of the
     * same property name whose single parameter type is exactly the same type.
     */
    private static void doMergeProper(Object a, Object b) throws Exception {
        // Index b's setters by property name.
        Map<String, Method> targetSetters = new HashMap<String, Method>();
        BeanInfo targetInfo = Introspector.getBeanInfo(b.getClass());
        for (PropertyDescriptor descriptor : targetInfo.getPropertyDescriptors()) {
            Method setter = descriptor.getWriteMethod();
            if (setter != null) {
                targetSetters.put(descriptor.getName(), setter);
            }
        }

        BeanInfo sourceInfo = Introspector.getBeanInfo(a.getClass());
        for (PropertyDescriptor descriptor : sourceInfo.getPropertyDescriptors()) {
            Class<?> type = descriptor.getPropertyType();
            Method getter = descriptor.getReadMethod();
            Method setter = descriptor.getWriteMethod();
            boolean supported = type == String.class
                    || type == int.class || type == Integer.class
                    || type == long.class || type == Long.class;
            if (getter == null || setter == null || !supported) {
                continue;
            }
            Object value = getter.invoke(a);
            Method targetSetter = targetSetters.get(descriptor.getName());
            if (targetSetter == null) {
                continue;
            }
            Class<?>[] paramTypes = targetSetter.getParameterTypes();
            // Require an exact type match; no boxing/unboxing conversion is attempted.
            if (paramTypes != null && paramTypes.length == 1 && paramTypes[0] == type) {
                targetSetter.setAccessible(true);
                targetSetter.invoke(b, value);
            }
        }
    }
}
package com.oldboy.umeng.hive.udf;

import com.oldboy.umeng.common.domain.AppLogAggEntity;
import com.oldboy.umeng.common.domain.AppStartupLog;
import com.oldboy.umeng.common.util.LogUtil;
import com.oldboy.umeng.hive.util.HiveUtil;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.beans.IntrospectionException;
import java.util.ArrayList;
import java.util.List;

/**
 * Custom table-generating function that forks an aggregated log record into
 * one output row per startup log.
 *
 * <p>Expected arguments: (string, long, string, string). Only the fourth
 * argument, the JSON text of the log aggregate, is used to produce output;
 * the first three are validated but not otherwise read.
 */
public class ForkLogUDTF extends GenericUDTF {

    // Output column names, in output order.
    ArrayList<String> fieldNames;

    // Object inspectors aligned index-by-index with fieldNames.
    ArrayList<ObjectInspector> fieldOIs;

    // One converter per input argument.
    ObjectInspectorConverters.Converter[] converters = new ObjectInspectorConverters.Converter[4];

    /**
     * Validates the argument types, prepares the input converters and defines
     * the output row structure from the properties of {@code AppStartupLog}.
     *
     * @param args object inspectors of the four call arguments
     * @return struct inspector describing the output columns
     * @throws UDFArgumentException if the argument count or types are wrong, or
     *                              if the output schema cannot be derived
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 4) {
            throw new UDFArgumentException("參數個數不對,須要4個參數!!");
        }
        requirePrimitive(args[0], PrimitiveObjectInspector.PrimitiveCategory.STRING,
                "第一個參數須要string類型");
        requirePrimitive(args[1], PrimitiveObjectInspector.PrimitiveCategory.LONG,
                "第二參數須要long類型");
        requirePrimitive(args[2], PrimitiveObjectInspector.PrimitiveCategory.STRING,
                "第三個參數須要string類型");
        requirePrimitive(args[3], PrimitiveObjectInspector.PrimitiveCategory.STRING,
                "第四個參數須要string類型");

        // Converters from the caller's inspectors to plain Java String / Long.
        converters[0] = ObjectInspectorConverters.getConverter(args[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        converters[1] = ObjectInspectorConverters.getConverter(args[1],
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        converters[2] = ObjectInspectorConverters.getConverter(args[2],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        converters[3] = ObjectInspectorConverters.getConverter(args[3],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // Output row structure.
        fieldNames = new ArrayList<String>();
        fieldOIs = new ArrayList<ObjectInspector>();
        try {
            HiveUtil.popOIs(AppStartupLog.class, fieldNames, fieldOIs);
        } catch (IntrospectionException e) {
            // Without a schema the function would silently emit zero-column rows,
            // so fail the query at initialization instead.
            throw new UDFArgumentException(e);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Throws {@link UDFArgumentException} with {@code message} unless
     * {@code arg} is a primitive inspector of the given category.
     */
    private static void requirePrimitive(ObjectInspector arg,
                                         PrimitiveObjectInspector.PrimitiveCategory expected,
                                         String message) throws UDFArgumentException {
        if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) arg).getPrimitiveCategory() != expected) {
            throw new UDFArgumentException(message);
        }
    }

    /**
     * Parses the aggregate JSON in the fourth argument, merges the aggregate's
     * common properties into its sub-logs and forwards one row per startup log.
     *
     * <p>Rows with a null log column, an empty parse result or no startup logs
     * produce no output. A failure while merging properties skips the record;
     * a failure while forwarding a row is propagated.
     *
     * @param args the four call arguments for the current input row
     * @throws HiveException if the argument count is wrong or forwarding fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length != 4) {
            throw new UDFArgumentException("參數個數不對,須要4個參數!!");
        }

        String log = (String) converters[3].convert(args[3]);
        if (log == null) {
            return;
        }

        // Deserialize the aggregate.
        AppLogAggEntity agg = LogUtil.deserLog(log);
        if (agg == null) {
            return;
        }

        try {
            // Copy aggregate-level properties onto each sub-log.
            LogUtil.mergeProp(agg);
        } catch (Exception e) {
            // Best-effort per record: report and skip this record, keep the query running.
            e.printStackTrace();
            return;
        }

        List<AppStartupLog> logs = agg.getStartupLogs();
        if (logs == null) {
            return;
        }
        for (AppStartupLog startupLog : logs) {
            forward(HiveUtil.convert2Arr(startupLog, fieldNames));
        }
    }

    /**
     * No resources to release.
     */
    @Override
    public void close() throws HiveException {
    }
}
(此處步驟略。)以下命令在 Hive 命令行(shell)中執行:
$hive>add jar /soft/hive/umeng_hive.jar ;
$hive>create function forklogs as 'com.oldboy.umeng.hive.udf.ForkLogUDTF' ;
$hive>use umeng_big11 ;
$hive>select forklogs(servertimestr , clienttimems , clientip ,log) from raw_logs ;