14. Data Warehouse Layering: The DWD Layer

DWD layer: cleans the data coming from the ODS layer (removes nulls, dirty records, and values outside valid ranges; converts row-oriented storage to columnar storage; changes the compression format).

Parsing the Start-Log Data in the DWD Layer


  1. Create the start-log table dwd_start_log.

    hive (gmall)> drop table if exists dwd_start_log;
    CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string, 
    `version_code` string, 
    `version_name` string, 
    `lang` string, 
    `source` string, 
    `os` string, 
    `area` string, 
    `model` string,
    `brand` string, 
    `sdk_version` string, 
    `gmail` string, 
    `height_width` string,  
    `app_time` string,
    `network` string, 
    `lng` string, 
    `lat` string, 
    `entry` string, 
    `open_ad_type` string, 
    `action` string, 
    `loading_time` string, 
    `detail` string, 
    `extend1` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_start_log/';
  2. Insert data into the start-log table.

    insert overwrite table dwd_start_log
    PARTITION (dt='2020-02-03')
    select 
       get_json_object(line,'$.mid') mid_id,
       get_json_object(line,'$.uid') user_id,
       get_json_object(line,'$.vc') version_code,
       get_json_object(line,'$.vn') version_name,
       get_json_object(line,'$.l') lang,
       get_json_object(line,'$.sr') source,
       get_json_object(line,'$.os') os,
       get_json_object(line,'$.ar') area,
       get_json_object(line,'$.md') model,
       get_json_object(line,'$.ba') brand,
       get_json_object(line,'$.sv') sdk_version,
       get_json_object(line,'$.g') gmail,
       get_json_object(line,'$.hw') height_width,
       get_json_object(line,'$.t') app_time,
       get_json_object(line,'$.nw') network,
       get_json_object(line,'$.ln') lng,
       get_json_object(line,'$.la') lat,
       get_json_object(line,'$.entry') entry,
       get_json_object(line,'$.open_ad_type') open_ad_type,
       get_json_object(line,'$.action') action,
       get_json_object(line,'$.loading_time') loading_time,
       get_json_object(line,'$.detail') detail,
       get_json_object(line,'$.extend1') extend1
    from ods_start_log 
    where dt='2020-02-03';
  3. Test.

    hive (gmall)> select * from dwd_start_log limit 2;
  4. Use a script to load all of the remaining data into the warehouse. (The script itself is in the notes; a sketch is given below.)
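
    A minimal sketch of such a loading script, assuming the insert statement from step 2 has been saved to a .sql file with the hard-coded date replaced by the Hive variable ${hivevar:do_date}; the script name, the .sql file path, and the default-to-yesterday behaviour are illustrative assumptions, not the exact script from the notes:

    #!/bin/bash
    # dwd_start_log.sh [yyyy-mm-dd] -- hypothetical wrapper; loads one day of ODS start logs into dwd_start_log.
    # Assumes /opt/module/hive/sql/dwd_start_log.sql holds the insert from step 2,
    # written with dt='${hivevar:do_date}' instead of the literal '2020-02-03'.

    hive=/opt/module/hive/bin/hive

    # Default to yesterday when no date argument is given
    if [ -n "$1" ]; then
        do_date=$1
    else
        do_date=$(date -d '-1 day' +%F)
    fi

    $hive --database gmall --hivevar do_date="$do_date" -f /opt/module/hive/sql/dwd_start_log.sql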

Parsing the Event-Log Data in the DWD Layer



  1. Create the base event-log detail table dwd_base_event_log.

    hive (gmall)> drop table if exists dwd_base_event_log;
    CREATE EXTERNAL TABLE dwd_base_event_log(
    `mid_id` string,
    `user_id` string, 
    `version_code` string, 
    `version_name` string, 
    `lang` string, 
    `source` string, 
    `os` string, 
    `area` string, 
    `model` string,
    `brand` string, 
    `sdk_version` string, 
    `gmail` string, 
    `height_width` string, 
    `app_time` string, 
    `network` string, 
    `lng` string, 
    `lat` string, 
    `event_name` string, 
    `event_json` string, 
    `server_time` string)
    PARTITIONED BY (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_base_event_log/';
  2. Create a UDF to parse the common fields. Each raw log line consists of the server timestamp and a JSON payload separated by "|"; within the JSON, the cm object holds the common fields and et holds the array of events.

    1. Create a Maven project: hivefunction
    2. Add the following to pom.xml:

      <properties>
        <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
        <hive.version>1.2.1</hive.version>
      </properties>
      
      <dependencies>
        <!-- Hive dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
      </dependencies>
      
      <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
      </build>
    3. The UDF code is as follows:

      package com.bbxy.udf;

      import org.apache.commons.lang.StringUtils;
      import org.apache.hadoop.hive.ql.exec.UDF;
      import org.json.JSONException;
      import org.json.JSONObject;

      public class BaseFieldUDF extends UDF {

          public String evaluate(String line, String jsonkeysString) {

              // 0. Prepare a StringBuilder for the result
              StringBuilder sb = new StringBuilder();

              // 1. Split the requested keys: mid uid vc vn l sr os ar md ...
              String[] jsonkeys = jsonkeysString.split(",");

              // 2. Split the line: server_time | json
              String[] logContents = line.split("\\|");

              // 3. Validity check
              if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) {
                  return "";
              }

              // 4. Parse the JSON payload
              try {
                  JSONObject jsonObject = new JSONObject(logContents[1]);

                  // Get the cm (common fields) object
                  JSONObject base = jsonObject.getJSONObject("cm");

                  // Append each requested field in order
                  for (int i = 0; i < jsonkeys.length; i++) {
                      String fieldName = jsonkeys[i].trim();

                      if (base.has(fieldName)) {
                          sb.append(base.getString(fieldName)).append("\t");
                      } else {
                          sb.append("\t");
                      }
                  }

                  // Append the et event array and the server time
                  sb.append(jsonObject.getString("et")).append("\t");
                  sb.append(logContents[0]).append("\t");
              } catch (JSONException e) {
                  e.printStackTrace();
              }

              return sb.toString();
          }
      }
  3. Define a UDTF to parse the individual event fields: it takes the et event array produced by the UDF and emits one (event_name, event_json) row per event.
    The UDTF code is as follows:

    package com.bbxy.udtf;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONException;

    import java.util.ArrayList;

    public class EventJsonUDTF extends GenericUDTF {
        // Declare the names and types of the output columns.
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

            fieldNames.add("event_name");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("event_json");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }

        // Each input record may produce zero or more output rows.
        @Override
        public void process(Object[] objects) throws HiveException {
            // Get the incoming et (event array) string
            String input = objects[0].toString();

            // If the input is blank, drop the record
            if (StringUtils.isBlank(input)) {
                return;
            } else {
                try {
                    // Parse the event array (events such as ad, favorites)
                    JSONArray ja = new JSONArray(input);

                    if (ja == null)
                        return;

                    // Iterate over the events
                    for (int i = 0; i < ja.length(); i++) {
                        String[] result = new String[2];

                        try {
                            // Extract the event name
                            result[0] = ja.getJSONObject(i).getString("en");

                            // Extract the whole event as JSON
                            result[1] = ja.getString(i);
                        } catch (JSONException e) {
                            continue;
                        }

                        // Emit the (event_name, event_json) row
                        forward(result);
                    }
                } catch (JSONException e) {
                    e.printStackTrace();
                }
            }
        }

        // Called when there are no more records to process; can be used for cleanup or extra output.
        @Override
        public void close() throws HiveException {

        }
    }
  4. Package the project and put the built jar into the /opt/module/hive directory on the hadoop151 VM, as sketched below.
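
    The packaging and copy step might look like this (run from the project root on the development machine; the scp target user and the target/ source path are assumptions based on the paths used elsewhere in this section):

    # Build the project; the assembly plugin also produces a jar-with-dependencies
    mvn clean package

    # Copy the jar to the Hive directory on hadoop151 (the jar name matches the one added in step 5)
    scp target/hivefunction-1.0-SNAPSHOT.jar hadoop@hadoop151:/opt/module/hive/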
  5. Add the jar to Hive's classpath.

    hive (gmall)> add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;
  6. Create temporary functions linked to the compiled Java classes.

    hive (gmall)> 
    create temporary function base_analizer as 'com.bbxy.udf.BaseFieldUDF';
    create temporary function flat_analizer as 'com.bbxy.udtf.EventJsonUDTF';
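
    Before running the full insert in the next step, it can help to sanity-check the UDF output on a few rows. A possible check from the shell (temporary functions are session-scoped, so the jar and function have to be registered again inside the one-off session):

    hive --database gmall -e "
    add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;
    create temporary function base_analizer as 'com.bbxy.udf.BaseFieldUDF';
    select base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')
    from ods_event_log
    where dt='2020-02-03'
    limit 3;
    "

    Each returned row should contain the tab-separated common fields, followed by the et event array and the server time.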
  7. Insert data into the base event-log detail table dwd_base_event_log.

    hive (gmall)> set hive.exec.dynamic.partition.mode=nonstrict;
    
    insert overwrite table dwd_base_event_log 
    PARTITION (dt='2020-02-03')
    select
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        event_name,
        event_json,
        server_time
    from
    (
        select
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[0]   as mid_id,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[1]   as user_id,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[2]   as version_code,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[3]   as version_name,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[4]   as lang,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[5]   as source,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[6]   as os,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[7]   as area,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[8]   as model,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[9]   as brand,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[10]   as sdk_version,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[11]  as gmail,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[12]  as height_width,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[13]  as app_time,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[14]  as network,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[15]  as lng,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[16]  as lat,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[17]  as ops,
           split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[18]  as server_time
        from ods_event_log where dt='2020-02-03'  and  base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'' 
    ) sdk_log lateral view flat_analizer(ops) tmp_k as event_name, event_json;
  8. Test.

    hive (gmall)> select * from dwd_base_event_log limit 2;
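
    In addition to the sample rows above, a per-event row count gives a quick sense of whether the UDTF exploded the et array as expected (a possible check, again from the shell):

    hive --database gmall -e "
    select event_name, count(*) cnt
    from dwd_base_event_log
    where dt='2020-02-03'
    group by event_name;
    "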
  9. Load the remaining data into the warehouse. (A sketch of the script follows the command below.)

    [hadoop@hadoop151 bin]$ dwd_base_log.sh 2020-01-01 2020-01-31
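
    A minimal sketch of what dwd_base_log.sh might look like, assuming the statements from steps 5-7 (add jar, create temporary function, insert overwrite) have been saved to a .sql file parameterized with ${hivevar:do_date}; the .sql file path and the date loop are illustrative assumptions:

    #!/bin/bash
    # dwd_base_log.sh <start_date> <end_date> -- hypothetical sketch; loads one partition per day in the range.
    # Assumes /opt/module/hive/sql/dwd_base_event_log.sql holds the statements from steps 5-7,
    # with dt='${hivevar:do_date}' instead of the literal '2020-02-03'.

    hive=/opt/module/hive/bin/hive

    start_date=$1
    end_date=$2

    do_date=$start_date
    while [ "$(date -d "$do_date" +%s)" -le "$(date -d "$end_date" +%s)" ]; do
        echo "===== loading $do_date ====="
        $hive --database gmall --hivevar do_date="$do_date" -f /opt/module/hive/sql/dwd_base_event_log.sql
        do_date=$(date -d "$do_date +1 day" +%F)
    done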