DWD layer: clean the data coming from the ODS layer (remove null values, dirty records, and values outside valid ranges; convert row-oriented storage to columnar storage; change the compression format).
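A rough sketch of what such cleaning can look like in HiveQL is shown below. It is illustrative only; the loads in this section do not run this query, and instead rely on get_json_object returning NULL for missing keys and on the UDF's validity check.

select *
from ods_start_log
where dt='2020-02-03'
  and get_json_object(line,'$.mid') is not null                          -- drop records missing a device id
  and get_json_object(line,'$.mid') <> ''                                -- drop empty device ids
  and cast(get_json_object(line,'$.la') as double) between -90 and 90;   -- drop out-of-range latitudes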
Create the start-log table dwd_start_log.
hive (gmall)>
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `open_ad_type` string,
    `action` string,
    `loading_time` string,
    `detail` string,
    `extend1` string
)
PARTITIONED BY (dt string)
location '/warehouse/gmall/dwd/dwd_start_log/';
Insert data into the start-log table.
insert overwrite table dwd_start_log
PARTITION (dt='2020-02-03')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2020-02-03';
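Each get_json_object call extracts one abbreviated key from the raw JSON line and exposes it under a readable column name, for example '$.mid' becomes mid_id. A path expression can be verified in isolation against a literal; the sample JSON below is made up for illustration:

select get_json_object('{"mid":"mid_001","uid":"u_001"}', '$.mid');

This should return mid_001.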
Test:
hive (gmall)> select * from dwd_start_log limit 2;
Create the event-log base detail table dwd_base_event_log.
hive (gmall)>
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `event_name` string,
    `event_json` string,
    `server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';
Create a UDF to parse the common fields.
Add the following to the pom.xml file:
<properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
    <hive.version>1.2.1</hive.version>
</properties>

<dependencies>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The UDF code is as follows:
package com.bbxy.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String jsonkeysString) {
        // 0. Buffer for the tab-separated output
        StringBuilder sb = new StringBuilder();

        // 1. Split the requested keys: mid uid vc vn l sr os ar md ...
        String[] jsonkeys = jsonkeysString.split(",");

        // 2. Split the raw line into server time | json
        String[] logContents = line.split("\\|");

        // 3. Validity check: discard malformed lines
        if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) {
            return "";
        }

        // 4. Parse the JSON part
        try {
            JSONObject jsonObject = new JSONObject(logContents[1]);

            // The common fields live under the "cm" object
            JSONObject base = jsonObject.getJSONObject("cm");

            // Append each requested key's value, or an empty field if the key is missing
            for (int i = 0; i < jsonkeys.length; i++) {
                String fieldName = jsonkeys[i].trim();
                if (base.has(fieldName)) {
                    sb.append(base.getString(fieldName)).append("\t");
                } else {
                    sb.append("\t");
                }
            }

            // Append the event array ("et") and the server time
            sb.append(jsonObject.getString("et")).append("\t");
            sb.append(logContents[0]).append("\t");

        } catch (JSONException e) {
            e.printStackTrace();
        }

        return sb.toString();
    }
}
Define a UDTF to parse the individual event fields.
The UDTF code is as follows:
package com.bbxy.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF {

    // Declare the names and types of the output columns
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Takes one input record and emits zero or more output rows
    @Override
    public void process(Object[] objects) throws HiveException {
        // The single argument is the "et" event array
        String input = objects[0].toString();

        // Filter out empty input
        if (StringUtils.isBlank(input)) {
            return;
        }

        try {
            // Parse the array of events (e.g. ad, favorites, ...)
            JSONArray ja = new JSONArray(input);

            // Emit one row per event
            for (int i = 0; i < ja.length(); i++) {
                String[] result = new String[2];
                try {
                    // Event name ("en") and the full event JSON
                    result[0] = ja.getJSONObject(i).getString("en");
                    result[1] = ja.getString(i);
                } catch (JSONException e) {
                    continue;
                }
                // Return the result row
                forward(result);
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }

    // Called when there are no more records to process; used for cleanup or final output
    @Override
    public void close() throws HiveException {
    }
}
Add the jar to Hive's classpath.
hive (gmall)> add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;
Create temporary functions and associate them with the compiled Java classes.
hive (gmall)>
create temporary function base_analizer as 'com.bbxy.udf.BaseFieldUDF';
create temporary function flat_analizer as 'com.bbxy.udtf.EventJsonUDTF';
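Before running the full load, both temporary functions can be sanity-checked against literal input. The sample log line and event array below are made up for illustration, and the second query assumes a Hive version that supports SELECT without a FROM clause:

hive (gmall)>
select base_analizer('1583776132686|{"cm":{"mid":"mid_001","uid":"u_001"},"et":"[]"}', 'mid,uid');

select tmp_k.event_name, tmp_k.event_json
from (select '[{"en":"ad","kv":{"activityId":"1"}}]' as ops) sample_log
lateral view flat_analizer(ops) tmp_k as event_name, event_json;

The first query should return the requested common fields, the event array, and the server time as a single tab-separated string; the second should emit one row per event with its name and full JSON.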
Insert data into the event-log base detail table dwd_base_event_log.
hive (gmall)>
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table dwd_base_event_log
PARTITION (dt='2020-02-03')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    event_name,
    event_json,
    server_time
from
(
    select
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[0]  as mid_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[1]  as user_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[2]  as version_code,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[3]  as version_name,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[4]  as lang,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[5]  as source,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[6]  as os,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[7]  as area,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[8]  as model,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[9]  as brand,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[10] as sdk_version,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[11] as gmail,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[12] as height_width,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[13] as app_time,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[14] as network,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[15] as lng,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[16] as lat,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[17] as ops,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[18] as server_time
    from ods_event_log
    where dt='2020-02-03'
      and base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>''
) sdk_log
lateral view flat_analizer(ops) tmp_k as event_name, event_json;
Test:
hive (gmall)> select * from dwd_base_event_log limit 2;
Load the remaining data into the warehouse:
[hadoop@hadoop151 bin]$ dwd_base_log.sh 2020-01-01 2020-01-31