數據清洗

 

hadoop fs -rm -r -skipTrash /flumu //刪除跳過垃圾回收站java

 導入數據到HDFS

[kris@hadoop102 ~]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
把數據上傳到ods層
[kris@hadoop101 datas]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
[kris@hadoop101 spark]$  bin/spark-submit --total-executor-cores 2 --class com.atguigu.data_monitor.GeneratorUserBehaviorMonitorData Online-Edu-1.0-SNAPSHOT.jar 2019-05-20
[kris@hadoop101 spark]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20
[kris@hadoop101 spark]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20
[kris@hadoop101 spark]$ rm -r *.txt

 

 

數據清洗和加載

Hive字段以下:android

uid STRING comment "用戶惟一標識",
username STRING comment "用戶暱稱",
gender STRING comment "性別",
level TINYINT comment "1表明小學,2表明初中,3表明高中",
is_vip TINYINT comment "0表明不是會員,1表明是會員",
os STRING comment "操做系統:os,android等",
channel STRING comment "下載渠道:auto,toutiao,huawei",
net_config STRING comment "當前網絡類型",
ip STRING comment "IP地址",
phone STRING comment "手機號碼",
video_id INT comment "視頻id",
video_length INT comment "視頻時長,單位秒",
start_video_time BIGINT comment "開始看視頻的時間綴,秒級",
end_video_time BIGINT comment "退出視頻時的時間綴,秒級",
version STRING comment "版本",
event_key STRING comment "事件類型",
event_time BIGINT comment "事件發生時的時間綴,秒級"
View Code

1) 用SparkCore將數據清洗,清洗需求以下:shell

  a)  手機號脫敏:187xxxx2659apache

  b) 過濾重複行(重複條件,uid,event_key,event_time三者都相同即爲重複)服務器

  c)  最終數據保存到dwd.user_behavior分區表,以dt(天)爲分區條件,表的文件存儲格式爲ORC,數據總量爲xxxx條網絡

總體流程(使用調度系統調度)app

  a) SparkCore清洗數據,寫入到/user/hive/warehouse/tmp.db/user_behavior_${day}目錄socket

  b) 創建tmp.user_behavior_${day}臨時表,並加載上面清洗後的數據ide

  c)  使用hive引擎,並用開窗函數row_number,將tmp.user_behavior_${day}表數據插入到dwd.user_behavior表中函數

  d) 刪除tmp.user_behavior_${day}臨時表

在IDEA中配置參數;在resources在拷貝core-site.xml、hdfs-site.xml、hive-site.xml文件

數據清洗代碼實現:

/**
  * 用戶行爲數據清洗
  * 一、驗證數據格式是否正確,切分後長度必須爲17
  * 二、手機號脫敏,格式爲123xxxx4567
  * 三、去掉username中帶有的\n,不然致使寫入HDFS時會換行
  */

object UserBehaviorCleaner {
  def main(args: Array[String]): Unit = {

    if (args.length != 2){
      println("Usage:please input inputPath and outputPath")
      System.exit(1)
    }
    val inputPaht = args(0)
    val outputPath = args(1)

    val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName) //.setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    // 經過輸入路徑獲取RDD
    val eventRDD: RDD[String] = sparkContext.textFile(inputPaht)
    // 清洗數據,在算子中不要寫大量業務邏輯,應該將邏輯封裝到方法中
    eventRDD.filter(event => checkEventValid(event)).map(event => maskPhone(event))
      .map(event => repairUsername(event))
      .coalesce(3)
      .saveAsTextFile(outputPath)

    sparkContext.stop()

  }
  def repairUsername(event:String)= {
    val field: Array[String] = event.split("\t")
    //取出用戶暱稱
    val username: String = field(1)
    if (username != null && "".equals(username)){
      field(1) = username.replace("\n", "")
    }
    field.mkString("\t")

  }

  def maskPhone(event:String): String ={
    var maskPhone = new StringBuilder
    val fields: Array[String] = event.split("\t")
    //取出手機號
    var phone: String = fields(9)
    // 手機號不爲空時作掩碼處理
    if (phone != null && !"".equals(phone)){
      maskPhone = maskPhone.append(phone.substring(0,3)).append("xxxx").append(phone.substring(7,11))
      fields(9) = maskPhone.toString()

    }
    fields.mkString("\t")
  }

  def checkEventValid(event:String) ={
    event.split("\t").length == 17
  }

}
View Code

               

    

    -DHADOOP_USER_NAME=kris -Dspark.master=local[2]

    輸入路徑:/user/hive/warehouse/ods.db/origin_user_behavior2019-05-14

    輸出路徑:/user/hive/warehouse/tmp.db/user_behavior20190514

Spark on yarn完整提交命令  輸入:HDFS中ODS目錄; 輸出: HDFS中tmp目錄

java -cp  mainClass須要指定全類名; java -jar 就不須要指定了,指定jar包便可

spark-submit --master yarn --deploy-mode cluster \
--num-executors 8 \
--executor-cores 4 \
--executor-memory 12G \
--class com.kris.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior_${day} 
View Code

 

建立臨時--內部表:

路徑爲剛剛輸入清洗後的輸出路徑,就可直接在Hive表中查詢出數據;

 

建立內部表: //數據已經導入了;
create database tmp;

drop table if exists tmp.user_behavior20190514; 
create table
if not exists tmp.user_behavior20190514( uid STRING comment "用戶惟一標識", username STRING comment "用戶暱稱", gender STRING comment "性別", level TINYINT comment "1表明小學,2表明初中,3表明高中", is_vip TINYINT comment "0表明不是會員,1表明是會員", os STRING comment "操做系統:os,android等", channel STRING comment "下載渠道:auto,toutiao,huawei", net_config STRING comment "當前網絡類型", ip STRING comment "IP地址", phone STRING comment "手機號碼", video_id INT comment "視頻id", video_length INT comment "視頻時長,單位秒", start_video_time BIGINT comment "開始看視頻的時間綴,秒級", end_video_time BIGINT comment "退出視頻時的時間綴,秒級", version STRING comment "版本", event_key STRING comment "事件類型", event_time BIGINT comment "事件發生時的時間綴,秒級") row format delimited fields terminated by "\t" location "/user/hive/warehouse/tmp.db/user_behavior20190514"; //數據所在的目錄;

建立orc格式的外部表

.txt文件不能直接load成orc格式文件
Caused by: java.io.IOException: Malformed ORC file
緣由:
ORC格式是列式存儲的表,不能直接從本地文件導入數據,只有當數據源表也是ORC格式存儲時,才能夠直接加載,不然會出現上述報錯。
解決辦法:
要麼將數據源表改成以ORC格式存儲的表,要麼新建一個以textfile格式的臨時表先將源文件數據加載到該表,而後在從textfile表中insert數據到ORC目標表中。

經過insert overwrite將txt格式轉換成orc格式;

create database dwd; drop table if exists dwd.user_behavior;
create external table
if not exists dwd.user_behavior( uid STRING comment "用戶惟一標識", username STRING comment "用戶暱稱", gender STRING comment "性別", level TINYINT comment "1表明小學,2表明初中,3表明高中", is_vip TINYINT comment "0表明不是會員,1表明是會員", os STRING comment "操做系統:os,android等", channel STRING comment "下載渠道:auto,toutiao,huawei", net_config STRING comment "當前網絡類型", ip STRING comment "IP地址", phone STRING comment "手機號碼", video_id INT comment "視頻id", video_length INT comment "視頻時長,單位秒", start_video_time BIGINT comment "開始看視頻的時間綴,秒級", end_video_time BIGINT comment "退出視頻時的時間綴,秒級", version STRING comment "版本", event_key STRING comment "事件類型", event_time BIGINT comment "事件發生時的時間綴,秒級") partitioned by(dt INT) row format delimited fields terminated by "\t" stored as ORC

查詢並在外部表中插入數據:

create table if not exists tmp.user_behavior20190307(
uid STRING comment "用戶惟一標識",
username STRING comment "用戶暱稱",
gender STRING comment "性別",
level TINYINT comment "1表明小學,2表明初中,3表明高中",
is_vip TINYINT comment "0表明不是會員,1表明是會員",
os STRING comment "操做系統:os,android等",
channel STRING comment  "下載渠道:auto,toutiao,huawei",
net_config STRING comment "當前網絡類型",
ip STRING comment "IP地址",
phone STRING comment "手機號碼",
video_id INT comment "視頻id",
video_length INT comment "視頻時長,單位秒",
start_video_time BIGINT comment "開始看視頻的時間綴,秒級",
end_video_time BIGINT comment "退出視頻時的時間綴,秒級",
version STRING comment "版本",
event_key STRING comment  "事件類型",
event_time BIGINT comment  "事件發生時的時間綴,秒級")
row format delimited fields terminated by "\t" 
location "/user/hive/warehouse/tmp.db/user_behavior20190307";


insert overwrite table dwd.user_behavior partition(dt=20190307) select uid,username,gender,level,is_vip,os,channel,net_config,ip,phone,video_id, video_length,start_video_time,end_video_time,version,event_key,event_time from ( select uid,username,gender,level,is_vip,os,channel,net_config,ip,phone,video_id, video_length,start_video_time,end_video_time,version,event_key,event_time, row_number() OVER (PARTITION BY uid,event_key,event_time ORDER BY event_time) u_rank from tmp.user_behavior20190307 ) temp where u_rank = 1;

 

遠程調試

在IDEA中進行調試:

       場景:之後工做中常常會遇到在本地執行沒有問題,到了服務器跑的數據就是錯誤的

    IDEA設置:Run --> Edit Configurations添加Remote

 

[kris@hadoop101 spark]$ bin/spark-submit --master local[2] --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=18888" 
--class com.atguigu.user_behavior.UserBehaviorCleaner Online-Edu-1.0-SNAPSHOT.jar
hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
hdfs:
//hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior20190514 Listening for transport dt_socket at address: 18888

在IDEA中就能夠進行EDBUG:

使用spark-shell進行調試

[kris@hadoop101 spark]$ bin/spark-shell --master local[2] --jars Online-Edu-1.0-SNAPSHOT.jar scala> sc.textFile("/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14") res0: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14 MapPartitionsRDD[1] at textFile at <console>:25 scala>   def checkEventValid(event:String) ={ |     event.split("\t").length == 17
     | } checkEventValid: (event: String)Boolean
相關文章
相關標籤/搜索