生產上完成TopN統計流程

背景

現有城市信息和產品信息兩張表在MySQL中,另外有用戶點擊產品日誌以文本形式存在hdfs上,現要求統計每一個個城市區域下點擊量前三的產品名,具體信息見下方。mysql

mysql> show tables;
+---------------------------------+
| Tables_in_d7                    |
+---------------------------------+
| city_info                       |
| product_info                    |
| result_product_area_clicks_top3 |
+---------------------------------+
3 rows in set (0.00 sec)

mysql> desc city_info;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| city_id   | int(11)      | YES  |     | NULL    |       |
| city_name | varchar(255) | YES  |     | NULL    |       |
| area      | varchar(255) | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from city_info;
+---------+-----------+------+
| city_id | city_name | area |
+---------+-----------+------+
|       1 | BEIJING   | NC   |
|       2 | SHANGHAI  | EC   |
|       3 | NANJING   | EC   |
|       4 | GUANGZHOU | SC   |
|       5 | SANYA     | SC   |
|       6 | WUHAN     | CC   |
|       7 | CHANGSHA  | CC   |
|       8 | XIAN      | NW   |
|       9 | CHENGDU   | SW   |
|      10 | HAERBIN   | NE   |
+---------+-----------+------+
10 rows in set (0.00 sec)

mysql> desc product_info; 
+--------------+--------------+------+-----+---------+-------+
| Field        | Type         | Null | Key | Default | Extra |
+--------------+--------------+------+-----+---------+-------+
| product_id   | int(11)      | YES  |     | NULL    |       |
| product_name | varchar(255) | YES  |     | NULL    |       |
| extend_info  | varchar(255) | YES  |     | NULL    |       |
+--------------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from product_info limit 10;  <-- product_info總數100
+------------+--------------+----------------------+
| product_id | product_name | extend_info          |
+------------+--------------+----------------------+
|          1 | product1     | {"product_status":1} |
|          2 | product2     | {"product_status":1} |
|          3 | product3     | {"product_status":1} |
|          4 | product4     | {"product_status":1} |
|          5 | product5     | {"product_status":1} |
|          6 | product6     | {"product_status":1} |
|          7 | product7     | {"product_status":1} |
|          8 | product8     | {"product_status":1} |
|          9 | product9     | {"product_status":0} |
|         10 | product10    | {"product_status":1} |
+------------+--------------+----------------------+
10 rows in set (0.00 sec)

[hadoop@hadoop001 data]$ more user_click.txt 
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1(city_id),72(product_id)
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:52:26,1,68
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:17:03,1,40
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:32:07,1,21
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:26:06,1,63
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:03:11,1,60
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:43:43,1,30
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:09:58,1,96
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:18:45,1,71
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:42:39,1,8
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:24:30,1,6
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:29:49,1,26
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:24:12,1,83
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:07:50,1,62
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:19:31,1,61
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:40:51,1,46
....
[hadoop@hadoop001 data]$ wc -l user_click.txt 
11448 user_click.txt
複製代碼

解決思路

1)city_info表和product_info表經過sqoop放到Hive裏面
2)經過user_click關聯Hive裏面的city_info和product_info
3)再使用窗口函數求分組內的TOPN將結果sqoop導入MySQL
4)shell腳本封裝這個業務線的全部代碼的思路,須要說起的一點,由於city_info/product_info數據變更少,因此經過其餘的腳本導入,這個shell腳本不涉及,但我下面步驟依然會寫出來。
5)使用crontab觸發,天天凌晨2點開始執行
注意點:
a) 每次建立的臨時表,在執行以前必定要先刪除,要使用if not exits
b) 關鍵的執行要有日誌輸出
c) shell腳本如何解決冪等性問題
sql

MySQL導入Hive

sqoop部署篇講到過怎麼部署和使用sqoop,這裏不在說明,直接上代碼。shell

# 這裏給出hive裏的city_info的表結構
hive (d7)> create table city_info(
 city_id int,
 city_name string,
 area string
)
row format delimited fields terminated by '\t';

# 導入city_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table city_info \
--split-by 'city_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/city_info' \
--delete-target-dir \
-m 2

# 這裏給出hive裏的product_info的表結構
hive (d7)> create table product_info(
 product_id int,
 product_name string,
 extend_info string
)
row format delimited fields terminated by '\t';

# 導入product_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table product_info \
--split-by 'product_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/product_info' \
--delete-target-dir \
-m 2
複製代碼

ps:若是你第一次用sqoop的話,這裏確定會有兩個坑。這裏暫且不說,下篇文章解答。vim

user_click加載數據

生產上hive的user_click表確定是個一直數據增加的表,因此該表確定是個分區表。可是通常來講清洗好的前一天數據會直接放在user_click表存放hdfs上路徑上,好比分區表存放路徑爲hdfs://hadoop001:9000/user/hive/warehouse/d7.db/user_click,那麼生產上會將2016-05-05日誌清洗好並在該路徑上建立分區路徑。這時候你查詢分區表不會出現該分區數據,該怎麼高效的將數據刷新到分區表呢?請看下方代碼bash

# 先給出user_click表結構
hive (d7)> create table user_click(
 user_id int,
 session_id string,
 action_time string,
 city_id int,
 product_id int
)
partitioned by(day string)
row format delimited fields terminated by ',';

# 刷新分區表,另外一種刷新方式不推薦,過於暴力
hive (d7)> alter table user_click add if not exists partition(day='2016-05-05');
複製代碼

三表關聯生成臨時表

臨時表有區域名,產品名,點擊量三個字段。session

hive (d7)> drop table if exists tmp_product_area_clicks;
hive (d7)> create table tmp_product_area_clicks as
	 > select b.area,c.product_name,count(1) as click_count from user_click a
	 > left join city_info b on a.city_id=b.city_id
  	 > left join product_info c on a.product_id=c.product_id 
 	 > where a.day='2016-05-05'
	 > group by b.area,c.product_name
複製代碼

窗口函數獲得TopN結果

使用row_number()函數函數

hive (d7)> drop table if exists result_product_area_clicks_top3;
hive (d7)> create table result_product_area_clicks_top3
	 > row format delimited fields terminated by '\t' as
	 > select * from ( 
	 > select 
	 > "2016-05-05" day,product_id,product_name,area,click_count, <-- 日期會在腳本中更改
 	 > row_number() over(partition by area order by click_count desc) rank
	 > from tmp_product_area_clicks
	 > ) t where t.rank<=3;
複製代碼

Hive導出MySQL

# 咱們事先在MySQL建立好結果表,下面爲表結構
create table result_product_area_clicks_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
rank int(10)
)

# 爲了冪等性,會將MySQL結果表該日期的數據先刪掉
# 日期會在腳本中更改
mysql> delete from result_product_area_clicks_top3 where day='2016-05-05'; 


[hadoop@hadoop001 ~]$ sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3\
--export-dir /user/hive/warehouse/d7_hive.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
複製代碼

shell腳本編寫

hive離線是一天一次,是今天某個時間去運行昨天的數據,因此要在shell腳本中獲取前一天,該命令爲'date --date '1 day ago' +%Y-%m-%d'。下面就是shell腳本代碼。oop

[hadoop@hadoop001 ~]$ vim top3.sh
#!/bin/bash 
CURRENT=`date +%Y-%m-%d_%H:%M:%S`
USE_DAY=`date --date '1 day ago' +%Y-%m-%d`
echo '當前使用的日期爲:'$USE_DAY''


echo ''$CURRENT',開始刷新分區'
HIVE_PARTITION_SQL="alter table d7.user_click add if not exists partition(day='${USE_DAY}');"
hive -e "${HIVE_PARTITION_SQL}"

echo ''$CURRENT',開始建立臨時表,其中數據爲每一個區域下每一個產品的點擊數'
HIVE_TMP_SQL="drop table if exists tmp_product_area_clicks; create table tmp_product_area_clicks as select b.area,c.product_name,count(1) as click_count from user_click a left join city_info b on a.city_id=b.city_id left join product_info c on a.product_id=c.product_id where a.day='${USE_DAY}' group by b.area,c.product_name;"
hive -e "${HIVE_TMP_SQL}"

echo ''$CURRENT',開始建立結果表,其中數據爲每一個區域下每一個產品的前三點擊數'
HIVE_RESULT_SQL="drop table if exists result_product_area_clicks_top3; create table result_product_area_clicks_top3 row format delimited fields terminated by '\t' as select * from ( select '${USE_DAY}' day,product_id,product_name,area,click_count, row_number() over(partition by area order by click_count desc) rank from tmp_product_area_clicks ) t where t.rank<=3;"
hive -e "${HIVE_RESULT_SQL}"

echo ''$CURRENT',保持冪等性,開始刪除MySQL結果表中當前'$USE_DAY'數據'
MySQL_DETELE_SQL="delete from result_product_area_clicks_top3 where day='${USE_DAY}';"
sudo mysql -uroot -proot -e "${MySQL_DETELE_SQL}"

echo ''$CURRENT',開始將Hive結果表導入MySQL'
sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3\
--export-dir /user/hive/warehouse/d7_hive.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
echo ''$CURRENT',整個流程結束,請查看MySQL中數據是否導入'

複製代碼

定時後臺執行

使用crontab來作定時,具體見下方代碼post

[hadoop@hadoop001 ~]$ crontab -e
* 2 * * * nohup /home/hadoop/top3.sh >> /tmp/top3_logs.log 2>&1 & 
複製代碼
相關文章
相關標籤/搜索