A Hive Zipper-Table Tool in Practice

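1. Background

Hello everyone.

Because of a business need at my company, I recently wrote a zipper-table (拉鍊表) tool for Hive. Below is a brief introduction to it.

The tool is called zipperu (meaning "zipper tool") and consists of five directories: bin, conf, historys, logs, and tmp.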

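2. How it works

The tool works from a business table (the table you update every day) and the fields you care about (for example, if phonenumber changes, you treat the row as changed and update its historical state). It computes an MD5 hash of those fields and compares it with the hash of the same fields in the current history record: if the hash has changed, the row is updated; otherwise it is left alone.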
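The comparison can be tried outside Hive with a small sketch. The function names row_hash and has_changed and the sample values are hypothetical, and md5sum from GNU coreutils is assumed to be installed. Each value is followed by a comma before hashing, which mirrors the multi-field expression the script builds (coalesce(col,''),',' for every field).

```shell
#!/bin/bash
# Hypothetical helper: hash the watched field values of one row.
# Every value is followed by a comma, mirroring
# md5(concat(coalesce(a,''),',',coalesce(b,''),',')) in the generated SQL.
row_hash() {
    local joined="" value
    for value in "$@"; do
        joined+="${value},"
    done
    printf '%s' "$joined" | md5sum | awk '{print $1}'
}

# Succeeds (exit status 0) when the two hashes differ.
has_changed() {
    local old_hash="$1" new_hash="$2"
    [ "$old_hash" != "$new_hash" ]
}

# Made-up sample values: phonenumber changed, birthday did not.
old=$(row_hash "13800000000" "1990-01-01")
new=$(row_hash "13900000000" "1990-01-01")

if has_changed "$old" "$new"; then
    echo "row changed: close the old version and open a new one"
else
    echo "row unchanged: keep the current version"
fi
```

The separator after each value matters: without it, the pairs ("ab", "c") and ("a", "bc") would concatenate to the same string and produce the same hash.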

 

\bin:

Contains a single script, zipperu.sh, which runs all the tasks and holds the business logic.

\conf:

Contains one file, zipperu.conf, with the following settings:

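    tableN=xxx  (the business table to process, written as database.table)

    rowkeys={customerid}  the braces hold the primary key of the business table; separate multiple keys with commas, e.g. {id,id2,id3}, with no comma after the last one

    tableMD5=xxxx  the name of the MD5 table to generate

    column={birthday}  birthday is the field of the business table to watch; if this field changes, the row is considered updated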

 

Each line of zipperu.conf describes one table to process; the fields on a line are separated by tabs or spaces.
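As an illustration of the format, here is a sketch that parses one such line. The sample line (table and column names) and the helper conf_value are hypothetical. Note that this sketch looks fields up by key name, whereas zipperu.sh itself reads them by position with awk.

```shell
#!/bin/bash
# Hypothetical sample line; the table and column names are made up.
sample_line='tableN=dw.customer rowkeys={customerid,regionid} tableMD5=dw.customer_md5 column={birthday,phonenumber}'

# conf_value LINE KEY
# Prints the value of KEY on LINE with the surrounding braces removed.
# Returns 1 if the key is not present.
conf_value() {
    local line="$1" key="$2" field
    local -a fields
    read -ra fields <<< "$line"       # split on spaces and tabs
    for field in "${fields[@]}"; do
        if [[ "$field" == "$key="* ]]; then
            field="${field#*=}"       # drop "key="
            field="${field#\{}"       # drop a leading "{"
            field="${field%\}}"       # drop a trailing "}"
            printf '%s\n' "$field"
            return 0
        fi
    done
    return 1
}

tableN=$(conf_value "$sample_line" tableN)
rowkeys=$(conf_value "$sample_line" rowkeys)
column=$(conf_value "$sample_line" column)
echo "table=$tableN keys=$rowkeys watched=$column"
```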

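\historys:

historys holds the SQL scripts generated automatically by each day's run.

\logs:

logs holds the records of each day's runs.

\tmp:

tmp holds temporary files generated while the script runs. Do not put any files in tmp, because the script empties the directory on startup.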
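If emptying a shared directory is a concern, one possible alternative is a private scratch directory per run. This is not what zipperu.sh does; it is a sketch that assumes mktemp is available, and the helper name make_scratch_dir and the file name columns.txt are hypothetical.

```shell
#!/bin/bash
# Create a private scratch directory for this run
# (an alternative to a shared ../tmp that is wiped on startup).
make_scratch_dir() {
    mktemp -d "${TMPDIR:-/tmp}/zipperu.XXXXXX"
}

scratch=$(make_scratch_dir) || exit 1

# Remove the scratch directory when the script exits, whatever the reason.
trap 'rm -rf -- "$scratch"' EXIT

# Intermediate files go under $scratch instead of ../tmp.
echo "birthday,phonenumber" > "$scratch/columns.txt"
echo "scratch files live in: $scratch"
```

With this approach two runs cannot delete each other's intermediate files, and nothing outside the scratch directory is ever removed.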

 

Deletes are not supported yet; only inserts and updates are. My skills are limited, so please bear with me!
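To make the insert/update-only behaviour concrete, the following sketch simulates how the full outer join in the generated SQL classifies rows. It is a simulation in awk, not part of the tool; the function name classify_rows, the status labels, and the sample data are hypothetical. Each input line is a key and an MD5 string separated by a tab.

```shell
#!/bin/bash
# classify_rows HISTORY_FILE CURRENT_FILE
# Input lines: "key<TAB>md5". Output lines: "key<TAB>status", sorted by key.
classify_rows() {
    awk -F '\t' '
        FILENAME == ARGV[1] { hist[$1] = $2; next }   # open history rows
        {
            seen[$1] = 1
            if (!($1 in hist))        print $1 "\tnew"
            else if (hist[$1] != $2)  print $1 "\tchanged"
            else                      print $1 "\tunchanged"
        }
        END {
            # Keys that vanished from the business table are not closed,
            # they simply stay in the open partition.
            for (k in hist) if (!(k in seen)) print k "\tcarried_over"
        }
    ' "$1" "$2" | sort
}

hist_file=$(mktemp); cur_file=$(mktemp)
printf '1\taaa\n2\tbbb\n3\tccc\n' > "$hist_file"   # yesterday: keys 1, 2, 3
printf '1\taaa\n2\tzzz\n4\tddd\n' > "$cur_file"    # today: 2 changed, 3 gone, 4 added
classify_rows "$hist_file" "$cur_file"
rm -f "$hist_file" "$cur_file"
```

In this simulation key 3, which was deleted from the business table, is reported as carried_over rather than closed, which is the sense in which deletes are not supported.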

3. The code

Code is hard to read in WeChat, so I suggest copying it into an editor to view it.

 

#!/bin/bash

. /etc/profile

cd "$(dirname "$0")" || exit 1

logs_data=`date +%F`

confFile=../conf/zipperu.conf

mkdir -p ../logs/$logs_data

mkdir -p ../historys/$logs_data

cat $confFile | while read linet; 

do

rm -rf ../tmp/*

echo "---------- Reading configuration file $confFile ----------"

if [[ "$confFile" = "" ]] ; then

         echo "---------- The configuration file is empty; please supply a valid configuration file! ----------"

         exit 1

   else

         echo "---------- Configuration file for this run: $confFile ----------"

   fi

tableN=`echo $linet | awk '{print $1}'|awk -F '=' '{print $2}'`

if [[ "$tableN" = "" ]] ; then       

                        echo "The Hive table setting in configuration line $linet is empty; please fix the configuration"

                        exit 1

else    

                        echo "---------- Hive table configured for this run: $tableN ----------"

                fi

rowkeys=`echo $linet | awk '{print $2}'|awk -F '=' '{print $2}' |sed 's/}//g'|sed 's/{//g'`

                if [[ "$rowkeys" = "" ]] ; then

                        echo "The primary key setting in configuration line $linet is empty; please fix the configuration"

                        exit 1

                else

echo $rowkeys >> ../tmp/$tableN.rowkeys_tmp1

cat ../tmp/$tableN.rowkeys_tmp1 | tr -s ',' '\012' | sed 's/[[:space:]]//g' >> ../tmp/$tableN.rowkeys_tmp2

rowkey=`sed -n '1p' ../tmp/$tableN.rowkeys_tmp2`

rowkeysn=`cat ../tmp/$tableN.rowkeys_tmp2 |wc -l`

                        echo "---------- Hive table primary key configured for this run: $rowkey ----------"

                fi

tableMD5=`echo $linet | awk '{print $3}'|awk -F '=' '{print $2}'`

                if [[ "$tableMD5" = "" ]] ; then

                        echo "The Hive MD5 table setting in configuration line $linet is empty; please fix the configuration"

                        exit 1

                else

                        echo "---------- Hive MD5 table configured for this run: $tableMD5 ----------"

                fi

column=`echo $linet | awk '{print $4}'|awk -F '=' '{print $2}'|sed 's/}//g'|sed 's/{//g'` # read the watched columns from the configuration line

if [[ "$column" = "" ]] ; then

                        echo "The column setting in configuration line $linet is empty"

                        exit 1

                else

echo $rowkey

start_time=`date "+%Y%m%d%H%M%S"`

start_date=`date +%F`

end_date=`date +%F`

etl_time=`date '+%Y-%m-%d %H:%M:%S'`

tableMD5_Y="${tableMD5}"_Y""

tableN_his="${tableN}"_his""

tableN_tmp_h="${tableN}"_tmp_h""

tableN_tmp_c="${tableN}"_tmp_c""

#rm -rf ../tmp/*

                        echo "---------- Columns configured for this run: $column ----------"

echo $column >> ../tmp/$tableN.tmp

cat ../tmp/$tableN.tmp | tr -s ',' '\012' | sed 's/[[:space:]]//g' > ../tmp/$tableN.tmp2

rm -rf ../tmp/$tableN.tmp

ln=`cat ../tmp/$tableN.tmp2 | wc -l`

if [[ "$ln" -gt  "1" ]] ; then

var=0

for line in `cat ../tmp/$tableN.tmp2`;

do

 

linenum=`awk '{print NR}' ../tmp/$tableN.tmp2 |tail -n1`

linenum1=$((linenum-1))

if [ $linenum1 -eq $var ] ; then

                                 echo "coalesce($line,''),','" >> ../tmp/$tableN.tmp3 # last field: no trailing comma

                         else

echo "coalesce($line,''),','," >> ../tmp/$tableN.tmp3 # not the last field: keep the trailing comma

                         fi

                                  ((var+=1))

done

rm -rf ../tmp/$tableN.tmp2

column2=`cat ../tmp/$tableN.tmp3`

echo $column2 >> ../tmp/$tableN.tmp4

cat ../tmp/$tableN.tmp4 | sed s/[[:space:]]//g > ../tmp/$tableN.tmp5

column2=`cat ../tmp/$tableN.tmp5`

###############################################################################################

# Get all columns of the business table tableN (used to build the partitioned history table)

hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1

expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2

rm -rf ../tmp/$tableN.colsinfo_tmp1

tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`

echo $tableNcolsn

sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2

tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`

sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 

echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"

echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"

echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3

echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3

echo "versions" >>../tmp/$tableN.colsinfo_tmp3

echo "start_date" >>../tmp/$tableN.colsinfo_tmp3

cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;

do

echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4

done

echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4

awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5

hall=`cat ../tmp/$tableN.colsinfo_tmp5`

echo "$hall"

expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1

cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;

do

echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2

done

awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3

allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`

if [[ "$rowkeysn" -eq  "1" ]] ; then

sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"

echo $sql3 >>../historys/$logs_data/$start_time$tableN_his.create.sql

else 

sed -i '1d' ../tmp/$tableN.rowkeys_tmp2

cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;

do

echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3

done 

rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`

sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"

echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

fi

nohup hive -e "$sql1 $sql2 $sql3" >> ../logs/$logs_data/$start_time$tableN_his.log

else

################################### Case where only one field is hashed with MD5

column2="$column"

# Get all columns of the business table tableN (used to build the partitioned history table)

hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1

expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2

rm -rf ../tmp/$tableN.colsinfo_tmp1

tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`

sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2

tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`

sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 

echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"

sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"

echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3

echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3

echo "versions" >>../tmp/$tableN.colsinfo_tmp3

echo "start_date" >>../tmp/$tableN.colsinfo_tmp3

cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;

do

echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4

done

echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4

awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5

hall=`cat ../tmp/$tableN.colsinfo_tmp5`

echo "$hall"

expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1

cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;

do

echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2

done

awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3

allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`

if [[ "$rowkeysn" -eq  "1" ]] ; then

sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"

echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

else 

sed -i '1d' ../tmp/$tableN.rowkeys_tmp2

cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;

do

echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3

done 

rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`

sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"

echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql

fi

# hive -e "$sql0"

nohup hive -e "$sql1 $sql2 $sql3"  >> ../logs/$logs_data/$start_time$tableN_his.log

fi

 

 

 

fi

rm -rf ../tmp/*

done
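The multi-field branch of the script builds the md5(concat(...)) expression through several temporary files. As a sketch of the same idea without temporary files, the hypothetical helper build_md5_expr below produces the expression directly from the comma-separated column list. Unlike the script, it also wraps a single column in coalesce, which is an assumption on my part rather than the original behaviour.

```shell
#!/bin/bash
# build_md5_expr "col1,col2,..."
# Prints md5(concat(coalesce(col1,''),',',coalesce(col2,''),',')).
build_md5_expr() {
    local IFS=','                    # split the argument on commas only
    local -a cols
    read -ra cols <<< "$1"
    local expr="" col
    for col in "${cols[@]}"; do
        col="${col//[[:space:]]/}"   # strip stray whitespace around names
        [ -z "$col" ] && continue    # skip empty entries such as "a,,b"
        expr+="${expr:+,}coalesce(${col},''),','"
    done
    printf 'md5(concat(%s))\n' "$expr"
}

# Hypothetical watched columns.
build_md5_expr "birthday, phonenumber"
```

The resulting string could be substituted where the script currently uses md5(concat($column2)) when it assembles sql1 and sql2.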

 
