hadoop fs -rm -r -skipTrash /flumu   // delete recursively, bypassing the trash
Upload the data to the ODS layer:
[kris@hadoop102 ~]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
[kris@hadoop101 datas]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
[kris@hadoop101 spark]$ bin/spark-submit --total-executor-cores 2 --class com.atguigu.data_monitor.GeneratorUserBehaviorMonitorData Online-Edu-1.0-SNAPSHOT.jar 2019-05-20
[kris@hadoop101 spark]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20
[kris@hadoop101 spark]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20
[kris@hadoop101 spark]$ rm -r *.txt
The Hive table fields are as follows:
uid STRING comment "unique user id",
username STRING comment "user nickname",
gender STRING comment "gender",
level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
is_vip TINYINT comment "0 = not a member, 1 = member",
os STRING comment "operating system: os, android, etc.",
channel STRING comment "download channel: auto, toutiao, huawei",
net_config STRING comment "current network type",
ip STRING comment "IP address",
phone STRING comment "phone number",
video_id INT comment "video id",
video_length INT comment "video length in seconds",
start_video_time BIGINT comment "timestamp when the video started playing, in seconds",
end_video_time BIGINT comment "timestamp when the video was exited, in seconds",
version STRING comment "version",
event_key STRING comment "event type",
event_time BIGINT comment "timestamp when the event occurred, in seconds"
1) Clean the data with Spark Core. The cleaning requirements are:
a) Mask phone numbers, e.g. 187xxxx2659
b) Remove duplicate rows (a row counts as a duplicate when uid, event_key and event_time are all identical)
c) Save the final data into the partitioned table dwd.user_behavior, partitioned by dt (day), stored as ORC; the total row count is xxxx
Overall workflow (driven by a scheduling system):
a) Clean the data with Spark Core and write it to the /user/hive/warehouse/tmp.db/user_behavior_${day} directory
b) Create the temporary table tmp.user_behavior_${day} and load the cleaned data produced above
c) Using the Hive engine and the row_number window function, insert the data from tmp.user_behavior_${day} into dwd.user_behavior
d) Drop the temporary table tmp.user_behavior_${day}
Configure the run parameters in IDEA, and copy core-site.xml, hdfs-site.xml and hive-site.xml into the resources directory.
Data cleaning code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * User behavior data cleaning
 * 1. Validate the record format: after splitting, the length must be 17
 * 2. Mask the phone number, e.g. 123xxxx4567
 * 3. Remove any \n inside username, otherwise it causes line breaks when writing to HDFS
 */
object UserBehaviorCleaner {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: please input inputPath and outputPath")
      System.exit(1)
    }
    val inputPath = args(0)
    val outputPath = args(1)

    val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName) //.setMaster("local[*]")
    val sparkContext = new SparkContext(conf)

    // Build the RDD from the input path
    val eventRDD: RDD[String] = sparkContext.textFile(inputPath)

    // Clean the data; keep heavy business logic out of the operators and encapsulate it in methods
    eventRDD.filter(event => checkEventValid(event))
      .map(event => maskPhone(event))
      .map(event => repairUsername(event))
      .coalesce(3)
      .saveAsTextFile(outputPath)

    sparkContext.stop()
  }

  def repairUsername(event: String): String = {
    val fields: Array[String] = event.split("\t")
    // Take the user nickname
    val username: String = fields(1)
    if (username != null && !"".equals(username)) {
      fields(1) = username.replace("\n", "")
    }
    fields.mkString("\t")
  }

  def maskPhone(event: String): String = {
    val fields: Array[String] = event.split("\t")
    // Take the phone number
    val phone: String = fields(9)
    // Mask the phone number when it is not empty
    if (phone != null && !"".equals(phone)) {
      val maskPhone = new StringBuilder
      maskPhone.append(phone.substring(0, 3)).append("xxxx").append(phone.substring(7, 11))
      fields(9) = maskPhone.toString()
    }
    fields.mkString("\t")
  }

  def checkEventValid(event: String): Boolean = {
    event.split("\t").length == 17
  }
}
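A quick sanity check of the helper methods above. This is only a minimal sketch: the 17-field sample record below is made up for illustration and is not taken from the real dataset.

object UserBehaviorCleanerCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical 17-field record, tab separated; field index 1 is username, index 9 is the phone number
    val sample = Seq(
      "u001", "nick\nname", "male", "2", "1", "android", "huawei", "wifi",
      "192.168.1.10", "18712342659", "100", "600", "1557801600", "1557801900",
      "1.0", "start_video", "1557801600"
    ).mkString("\t")

    println(UserBehaviorCleaner.checkEventValid(sample))                      // true
    println(UserBehaviorCleaner.maskPhone(sample).split("\t")(9))             // 187xxxx2659
    println(UserBehaviorCleaner.repairUsername(sample).split("\t")(1))        // nickname
  }
}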
VM options in the IDEA run configuration: -DHADOOP_USER_NAME=kris -Dspark.master=local[2]
Input path: /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
Output path: /user/hive/warehouse/tmp.db/user_behavior20190514
Full Spark on YARN submit command. Input: the ODS directory on HDFS; output: the tmp directory on HDFS.
With java -cp the fully qualified main class must be specified; with java -jar it is not needed, only the jar itself.
spark-submit --master yarn --deploy-mode cluster \
--num-executors 8 \
--executor-cores 4 \
--executor-memory 12G \
--class com.kris.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior_${day}
Create the temporary (managed) table:
Its location is the output path of the cleaning step above, so the data can be queried from the Hive table right away;
Create the managed table (the data is already in place):
create database tmp;
drop table if exists tmp.user_behavior20190514;
create table if not exists tmp.user_behavior20190514(
  uid STRING comment "unique user id",
  username STRING comment "user nickname",
  gender STRING comment "gender",
  level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
  is_vip TINYINT comment "0 = not a member, 1 = member",
  os STRING comment "operating system: os, android, etc.",
  channel STRING comment "download channel: auto, toutiao, huawei",
  net_config STRING comment "current network type",
  ip STRING comment "IP address",
  phone STRING comment "phone number",
  video_id INT comment "video id",
  video_length INT comment "video length in seconds",
  start_video_time BIGINT comment "timestamp when the video started playing, in seconds",
  end_video_time BIGINT comment "timestamp when the video was exited, in seconds",
  version STRING comment "version",
  event_key STRING comment "event type",
  event_time BIGINT comment "timestamp when the event occurred, in seconds")
row format delimited fields terminated by "\t"
location "/user/hive/warehouse/tmp.db/user_behavior20190514";  -- directory where the data lives
Create the ORC-format external table
.txt files cannot be loaded directly into an ORC-format table:
Caused by: java.io.IOException: Malformed ORC file
Cause:
ORC is a columnar storage format, so data cannot be loaded into an ORC table directly from a local file. Direct loading only works when the source table is also stored as ORC; otherwise the error above is thrown.
Solution:
Either store the source data in an ORC table as well, or create a temporary textfile table, load the raw files into it first, and then insert the data from the textfile table into the ORC target table.
Use insert overwrite to convert the data from text format to ORC;
create database dwd;
drop table if exists dwd.user_behavior;
create external table if not exists dwd.user_behavior(
  uid STRING comment "unique user id",
  username STRING comment "user nickname",
  gender STRING comment "gender",
  level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
  is_vip TINYINT comment "0 = not a member, 1 = member",
  os STRING comment "operating system: os, android, etc.",
  channel STRING comment "download channel: auto, toutiao, huawei",
  net_config STRING comment "current network type",
  ip STRING comment "IP address",
  phone STRING comment "phone number",
  video_id INT comment "video id",
  video_length INT comment "video length in seconds",
  start_video_time BIGINT comment "timestamp when the video started playing, in seconds",
  end_video_time BIGINT comment "timestamp when the video was exited, in seconds",
  version STRING comment "version",
  event_key STRING comment "event type",
  event_time BIGINT comment "timestamp when the event occurred, in seconds")
partitioned by(dt INT)
row format delimited fields terminated by "\t"
stored as ORC;
Query the temporary table and insert the data into the external table:
create table if not exists tmp.user_behavior20190307(
  uid STRING comment "unique user id",
  username STRING comment "user nickname",
  gender STRING comment "gender",
  level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
  is_vip TINYINT comment "0 = not a member, 1 = member",
  os STRING comment "operating system: os, android, etc.",
  channel STRING comment "download channel: auto, toutiao, huawei",
  net_config STRING comment "current network type",
  ip STRING comment "IP address",
  phone STRING comment "phone number",
  video_id INT comment "video id",
  video_length INT comment "video length in seconds",
  start_video_time BIGINT comment "timestamp when the video started playing, in seconds",
  end_video_time BIGINT comment "timestamp when the video was exited, in seconds",
  version STRING comment "version",
  event_key STRING comment "event type",
  event_time BIGINT comment "timestamp when the event occurred, in seconds")
row format delimited fields terminated by "\t"
location "/user/hive/warehouse/tmp.db/user_behavior20190307";
insert overwrite table dwd.user_behavior partition(dt=20190307)
select uid, username, gender, level, is_vip, os, channel, net_config, ip, phone, video_id,
       video_length, start_video_time, end_video_time, version, event_key, event_time
from (
  select uid, username, gender, level, is_vip, os, channel, net_config, ip, phone, video_id,
         video_length, start_video_time, end_video_time, version, event_key, event_time,
         row_number() OVER (PARTITION BY uid, event_key, event_time ORDER BY event_time) u_rank
  from tmp.user_behavior20190307
) temp
where u_rank = 1;
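For comparison, the same row_number deduplication can also be expressed with the Spark DataFrame API instead of Hive SQL. This is only an illustrative sketch using the same table names; it assumes Hive support is enabled in the SparkSession and that dynamic partitioning is allowed on the cluster.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

object UserBehaviorDedup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UserBehaviorDedup")
      .enableHiveSupport()
      .getOrCreate()

    // Allow writing into the dt partition dynamically
    spark.sql("SET hive.exec.dynamic.partition=true")
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // A row is a duplicate when uid, event_key and event_time are all identical;
    // keep only the first row of each such group
    val window = Window.partitionBy("uid", "event_key", "event_time").orderBy("event_time")

    spark.table("tmp.user_behavior20190307")
      .withColumn("u_rank", row_number().over(window))
      .where("u_rank = 1")
      .drop("u_rank")
      .withColumn("dt", lit(20190307))   // partition column goes last for insertInto
      .write
      .mode("overwrite")
      .insertInto("dwd.user_behavior")

    spark.stop()
  }
}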
Debugging in IDEA:
Scenario: at work you will often run into cases where a job works fine locally but produces wrong data once it runs on the server.
IDEA setup: Run --> Edit Configurations, add a Remote configuration
[kris@hadoop101 spark]$ bin/spark-submit --master local[2] --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=18888" \
--class com.atguigu.user_behavior.UserBehaviorCleaner Online-Edu-1.0-SNAPSHOT.jar \
hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14 \
hdfs://hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior20190514
Listening for transport dt_socket at address: 18888
Now you can DEBUG from IDEA:
Debugging with spark-shell
[kris@hadoop101 spark]$ bin/spark-shell --master local[2] --jars Online-Edu-1.0-SNAPSHOT.jar

scala> sc.textFile("/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14")
res0: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14 MapPartitionsRDD[1] at textFile at <console>:25

scala> def checkEventValid(event:String) ={
     |   event.split("\t").length == 17
     | }
checkEventValid: (event: String)Boolean
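From here the cleaning logic can be exercised interactively. A possible continuation of the session (outputs omitted, since they depend on the actual data):

scala> val events = sc.textFile("/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14")

scala> events.filter(e => checkEventValid(e)).count()          // number of well-formed 17-field records

scala> events.filter(e => !checkEventValid(e)).take(5).foreach(println)   // inspect a few malformed lines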