kettle是一個ETL工具,ETL(Extract-Transform-Load的縮寫,即數據抽取、轉換、裝載的過程)。
kettle中文名稱叫水壺,該項目的主程序員MATT 但願把各類數據放到一個壺裏,而後以一種指定的格式流出。
因此他的重心是用於數據
oozie是一個工做流,Oozie工做流是放置在控制依賴DAG(有向無環圖 Direct Acyclic Graph)中的一組動做(例如,Hadoop的Map/Reduce做業、Pig做業等),其中指定了動做執行的順序。
oozie工做流中是有數據流動的,可是重心是在於工做流的定義。
兩者雖然都有相關功能及數據的流動,可是其實用途是不同的。php
查看幫助java
bin/sqoop help
列舉出全部linux上的數據庫node
bin/sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password root
列舉出全部Window上的數據庫mysql
bin/sqoop list-databases --connect jdbc:mysql://192.168.22.36:3306 --username root --password root
查看數據庫下的全部表linux
bin/sqoop list-tables --connect jdbc:mysql://localhost:3306/mysql --username root --password root
(1)肯定mysql服務啓動正常程序員
查詢控制端口和查詢進程來肯定,一下兩種辦法能夠確認mysql是否在啓動狀態web
辦法1:查詢端口ajax
$ netstat -tulpn
MySQL監控的TCP的3306端口,若是顯示3306,證實MySQL服務在運行中sql
辦法二:查詢進程shell
能夠看見mysql的進程
ps -ef | grep mysqld
沒有指定數據導入到哪一個目錄,默認是/user/root/表名
bin/sqoop import \
--connect jdbc:mysql://192.168.77.137/zhjy \
--password 123456 \
--username root \
--table zf_jygz_thjc \
--m 1 \
--fields-terminated-by '\t'
或是
bin/sqoop import \
--connect jdbc:mysql://192.168.77.137/zhjy \
--password 123456 \
--username root \
--table zf_jygz_thjc \
--m 5 \
--split-by ZF_BH(通常在設置-m>1時使用)
--fields-terminated-by '\t'
緣由:
若是表中有主鍵,m的值能夠設置大於1的值;若是沒有主鍵只能將m值設置成爲1;或者要將m值大於1,須要使用--split-by指定一個字段
設置了-m 1 說明只有一個maptask執行數據導入,默認是4個maptask執行導入操做,可是必須指定一個列來做爲劃分依據
導入數據到指定目錄
在導入表數據到HDFS使用Sqoop導入工具,咱們能夠指定目標目錄。使用參數 --target-dir來指定導出目的地,使用參數—delete-target-dir來判斷導出目錄是否存在,若是存在就刪掉
bin/sqoop import \
--connect jdbc:mysql://192.168.77.137/zhjy \
--username root \
--password 123456 \
--delete-target-dir \ --若是目錄存在,將目錄刪除
--table zf_jygz_thjc \
--target-dir /user/zhjy \ --指定保存目錄
--m 1 \
--fields-terminated-by '\t'
查詢導入
bin/sqoop import \ --connect jdbc:mysql://192.168.72.133:3306/company \ --username root \ --password root \ --target-dir /user/company \ --delete-target-dir \ --num-mappers 1 \ --fields-terminated-by "\t" \ --query 'select name,sex from staff where id <=1 and $CONDITIONS;'
提示:must contain '$CONDITIONS' in WHERE clause。
where id <=1 匹配條件
$CONDITIONS:傳遞做用。
若是 query 後使用的是雙引號,則 $CONDITIONS前必須加轉義符,防止 shell 識別爲本身的變量。
--query時不能使用--table一塊兒使用
須要指定--target-dir路徑
導入到hdfs指定目錄並指定要求
bin/sqoop import \ --connect jdbc:mysql://192.168.72.133:3306/company \ --username root \ --password root\ #提升數據庫到hadoop的傳輸速度 --direct --table staff \
--delete-target-dir \ #導入指定列,涉及到多列,用逗號分隔 --column id,sex \ --target-dir /user/company \ --num-mappers 1 \ #指定分隔符 --fields-terminated-by '\t' #指定導出存儲格式 --as-textfile #指定數據壓縮(壓縮,解壓縮方式) --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec
數據導出儲存方式(數據存儲文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)
導入表數據子集到HDFS
bin/sqoop import \ --connect jdbc:mysql://172.16.43.67:3306/userdb \ --username root \ --password root \ --table emp_add \ --target-dir /sqoop/emp_add \ -m 1 \ --delete-target-dir \ --where "city = 'sec-bad'"
sqoop導入blob數據到hive
對於CLOB,如xml文本,sqoop能夠遷移到Hive表,對應字段存儲爲字符類型。
對於BLOB,如jpg圖片,sqoop沒法直接遷移到Hive表,只能先遷移到HDFS路徑,而後再使用Hive命令加載到Hive表。遷移到HDFS後BLOB字段存儲爲16進制形式。
bin/sqoop-import \ --connect jdbc:mysql://192.168.77.137:3306/zhjy \ --username root \ --password 123456 \ --table ceshi \ --columns "id,name,photo" \ --split-by id \ -m 4 \ --inline-lob-limit=16777126 \設置內聯的LOB對象的大小 --target-dir /user/hive/warehouse/ods.db/ceshi
2.1.3導入關係表到Hive
第一步:導入須要的jar包
將咱們mysql表當中的數據直接導入到hive表中的話,咱們須要將hive的一個叫作hive-exec-1.1.0-cdh5.14.0.jar的jar包拷貝到sqoop的lib目錄下
cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
第二步:開始導入
day=`date -d "yesterday" +%Y%m%d`
sqoop import \ --導入數據 --connect jdbc:mysql://10.2.111.87:3306/ehl_apmp \ --鏈接url --username root \ --用戶名 --password root \ --密碼 --table zf_jygz_thjc \ --要導入的表 -m 1 \ --maptask
--hive-drop-import-delims \ --導入時刪除數據庫中特殊字符
--hive-overwrite \ --覆蓋導入
--hive-import \ --導入到hive表中
--hive-database ods \ --導入到hive中哪一個數據庫
--hive-table ods_zf_jygz_thjc \ --導入到hive中哪一個表
--fields-terminated-by '\t' \ --字段分隔符
--lines-terminated-by '\n' --指定行分隔符
--null-string '\\N' \ --字符串類型爲null是代替字符
--null-non-string '\\N' \ --字非符串類型爲null是的代替字符
--hive-partition-key day \ --hive表的分區字段
--hive-partition-value "$day" --指定導入表的分區值
導入關係表到hive並自動建立hive表
們也能夠經過命令來將咱們的mysql的表直接導入到hive表當中去
sqoop import --connect jdbc:mysql://10.2.111.87:3306/ehl_apmp \
--username root --password root \
--table $1 \
--hive-import \
--hive-database ods \
--create-hive-table \
--fields-terminated-by '\t' \
--null-string '\\N' \
--null-non-string '\\N' \
--split-by code \
-m 4
經過這個命令,咱們能夠直接將咱們mysql表當中的數據以及表結構一塊兒倒入到hive當中去
https://blog.csdn.net/holdbelief/article/details/79578723
--incremental<mode> 增量模式。
append id 是獲取一個某一列的某個值。
lastmodified 「2016-12-15 15:47:35」 獲取某個時間後修改的全部數據
-append 附加模式
-merge-key id 合併模式
--check-column<column name> 用來指定一些列,能夠去指定多個列;一般的是指定主鍵id
--last -value<last check column value> 從哪一個值開始增量
==注意:增量導入的時候,必定不能加參數--delete-target-dir 不然會報錯==
第一種增量導入方式(不經常使用)
1.Append方式
使用場景:有個訂單表,裏面每一個訂單有一個惟一標識的自增列id,在關係型數據庫中以主鍵的形式存在。以前已經將id在0-1000之間的編號的訂單導入到HDFS 中;若是在產生新的訂單,此時咱們只需指定incremental參數爲append,--last-value參數爲1000便可,表示只從id大於1000後開始導入。
(1)建立一個MySQL表
CREATE TABLE orders( o_id INT PRIMARY KEY AUTO_INCREMENT, o_name VARCHAR(255), o_price INT ); INSERT INTO orders(o_id,o_name,o_price) VALUES(1,'聯想',5000); INSERT INTO orders(o_id,o_name,o_price) VALUES(2,'海爾',3000); INSERT INTO orders(o_id,o_name,o_price) VALUES(3,'雷神',5000); INSERT INTO orders(o_id,o_name,o_price) VALUES(4,'JACK JONES',800); INSERT INTO orders(o_id,o_name,o_price) VALUES(5,'真維斯',200);
(2)建立一個hive表(表結構與mysql一致)
bin/sqoop import \ --connect jdbc:mysql://192.168.22.30:3306/userdb \ --username root \ --password root \ --table emp \ --target-dir /sqoop/increment \ --num-mappers 1 \ --incremental append \ --check-column id \ --last-value 1202
注意:
append 模式不支持寫入到hive表中
2.lastModify方式
此方式要求原有表有time字段,它能指定一個時間戳,讓sqoop把該時間戳以後的數據導入到HDFS;由於後續訂單可能狀體會變化,變化後time字段時間戳也會變化,此時sqoop依然會將相同狀態更改後的訂單導入HDFS,固然咱們能夠只當merge-key參數爲order-id,表示將後續新的記錄和原有記錄合併。
# 將時間列大於等於閾值的數據增量導入HDFS
sqoop import \ --connect jdbc:mysql://192.168.xxx.xxx:3316/testdb \ --username root \ --password transwarp \ --query 「select order_id, name from order_table where \$CONDITIONS」 \ --target-dir /user/root/order_all \ --split-by id \ -m 4 \ --incremental lastmodified \ --merge-key order_id \ --check-column time \ # remember this date !!! --last-value 「2014-11-09 21:00:00」
使用 lastmodified 方式導入數據,要指定增量數據是要 --append(追加)仍是要 --merge-key(合併)last-value 指定的值是會包含於增量導入的數據中。
第二種增量導入方式(推薦)
==經過where條件選取數據更加精準==
yesterday=`date -d "yesterday" +%Y_%m_%d`
where="update_time >= \"${yesterday}""
day=`date -d "yesterday" +%Y-%m-%d`
sqoop import \ --導入數據
--connect jdbc:mysql://10.2.111.87:3306/ehl_apmp \ --鏈接url
--username root \ --用戶名
--password root \ --密碼
--table zf_jygz_thjc \ --要導入的表
-m 1 \ --maptask
--hive-drop-import-delims \ --導入時刪除數據庫中特殊字符
--hive-overwrite \ --覆蓋導入
--hive-import \ --導入到hive表中
--hive-database ods \ --導入到hive中哪一個數據庫
--hive-table ods_zf_jygz_thjc \ --導入到hive中哪一個表
--fields-terminated-by '\t' \ --字段分隔符
--lines-terminated-by '\n' --指定行分隔符
--columns 'zf_bh,zf_xm' \ --導入的字段(可選)
--where "${where}" \ --條件導入
--null-string '\\N' \ --字符串類型爲null是代替字符
--null-non-string '\\N' \ --字非符串類型爲null是的代替字符
--hive-partition-key day \ --hive表的分區字段
--hive-partition-value "$day" --指定導入表的分區值
2.1.5從RDBMS到HBase
bin/sqoop import --connect jdbc:mysql://192.168.22.30:3306/userdb \ --username root \ --password root \ --table emp \ --columns "id,name,sex" \ --column-family "info" --hbase-create-table \ --hbase-row-key "id" \ --hbase-table "hbase_test" \ --split-by id \ --num-mappers 1
會報錯
緣由:sqoop1.4.6 只支持 HBase1.0.1 以前的版本的自動建立 HBase 表的功能。
解決方案:手動建立 HBase 表
hbase> create 'hbase_staff','info'
導出前,目標表必須存在與目標數據庫中
默認操做是將文件中的數據使用insert語句插入到表中
數據是在HDFS當中的以下目錄/sqoop/emp,數據內容以下
1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1 1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0 1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
第一步:建立MySQL表
CREATE TABLE `emp_out` ( `id` INT(11) DEFAULT NULL, `name` VARCHAR(100) DEFAULT NULL, `deg` VARCHAR(100) DEFAULT NULL, `salary` INT(11) DEFAULT NULL, `dept` VARCHAR(10) DEFAULT NULL, `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `is_delete` BIGINT(20) DEFAULT '1' ) ENGINE=INNODB DEFAULT CHARSET=utf8;
第二步:執行導出命令
經過export來實現數據的導出,將hdfs的數據導出到mysql當中去
全量導出
bin/sqoop export \ --connect jdbc:mysql://172.16.43.67:3306/userdb \ --username root \
--password admin \ --table emp_out \ --export-dir /sqoop/emp \
--columns id,name \(當文件數據與表結構一致時,能夠不指定) --input-fields-terminated-by ","
增量導出
bin/sqoop export \ --connect jdbc:mysql://192.168.77.137:3306/zhjy \ --username root \ --password 123456 \ --table emp_out \ --update-key id \ --update-mode allowinsert \(新增的數據被導出) --export-dir '/user/hive/warehouse/ods_ceshi/part-m-00000' \ --input-null-string '\\N' \ --input-null-non-string '\\N' \ --input-fields-terminated-by ',' \ -m 1
更新導出
bin/sqoop export \ --connect jdbc:mysql://192.168.77.137:3306/zhjy \ --username root \ --password 123456 \ --table emp_out \ --update-key id \ --update-mode updateonly \(只能導出修改後的數據,不能導出新增的數據) --export-dir '/user/hive/warehouse/ods_ceshi/part-m-00000' \ --input-null-string '\\N' \ --input-null-non-string '\\N' \ --input-fields-terminated-by ',' \ -m 1
總結:
參數介紹
--update-key 後面也能夠接多個關鍵字列名,可使用逗號隔開,Sqoop將會匹配多個關鍵字後再執行更新操做。
--export-dir 參數配合--table或者--call參數使用,指定了HDFS上須要將數據導入到MySQL中的文件集目錄。
--update-mode updateonly和allowinsert。 默認模式爲updateonly,若是指定--update-mode模式爲allowinsert,能夠將目標數據庫中原來不存在的數據也導入到數據庫表中。即將存在的數據更新,不存在數據插入。
組合測試及說明
一、當指定update-key,且關係型數據庫表存在主鍵時:
A、allowinsert模式時,爲更新目標數據庫表存的內容,而且原來不存在的數據也導入到數據庫表;
B、updateonly模式時,爲更新目標數據庫表存的內容,而且原來不存在的數據也不導入到數據庫表;
二、當指定update-key,且關係型數據庫表不存在主鍵時:
A、allowinsert模式時,爲所有數據追加導入到數據庫表;
B、updateonly模式時,爲更新目標數據庫表存的內容,而且原來不存在的數據也不導入到數據庫表;
三、當不指定update-key,且關係型數據庫表存在主鍵時:
A、allowinsert模式時,報主鍵衝突,數據無變化;
B、updateonly模式時,報主鍵衝突,數據無變化;
四、當不指定update-key,且關係型數據庫表不存在主鍵時:
A、allowinsert模式時,爲所有數據追加導入到數據庫表;
B、updateonly模式時,爲所有數據追加導入到數據庫表;
實際案例:
(1)mysql批量導入hive
#!/bin/bash source /etc/profile num=0 list="table1 table2 table3" for i in $list; do echo "$sum" echo "$i" echo "sqoop開始批量導入......" sqoop import --connect jdbc:mysql://localhost:3306/test --username root --password 123456 --table person --hive-table db.$i --delete-target-dir --hive-overwrite --hive-import & num=$(expr $num + 1) if [$sum -gt 4 ]; then { echo "等待批量任務完成" wait echo "開始下一批導入" num = 0 } fi done echo "等待最後一批任務完成" wait echo "所有導入完成"
使用shell腳本:
#!/bin/sh export SQOOP_HOME=/usr/share/sqoop-1.4.4 hostname="192.168.1.199" user="root" password="root" database="test" table="tags" curr_max=0 function db_to_hive(){ ${SQOOP_HOME}/bin/sqoop import --connect jdbc:mysql://${hostname}/${database} \ --username ${user} \ --password ${password} \ --table ${table} \ --split-by docid \ --hive-import \ --hive-table lan.ding --fields-terminated-by '\t' --incremental append --check-column docid --last-value ${curr_max} result=`mysql -h${hostname} -u${user} -p${password} ${database}<<EOF select max(docid) from ${table}; EOF` curr_max=`echo $result |awk '{print $2}'` } if [ $# -eq 0 ];then while true do db_to_hive sleep 120 done exit fi
筆者目前用sqoop把mysql數據導入到Hive中,最後實現命令行導入,sqoop版本1.4.7,實現以下
sqoop job --import --connect jdbc:mysql://10.4.20.93:3303 \--username user \--password 123456 \--query "select user_name ,user_id,identype from users where $CONDITIONS" \--hive-import \--hive-database haibian_odbc \--hive-table users \--split-by id \--fields-terminated-by '\01' \--lines-terminated-by '\n' \--target-dir /user/hive/tmp/users \--hive-delims-replacement ' ' --incremental append \--check-column id \--last-value 0
最後須要把這個導入搞成job,天天定時去跑,實現數據的自動化增量導入,sqoop支持job的管理,能夠把導入建立成job重複去跑,而且它會在metastore中記錄增值,每次執行增量導入以前去查詢
建立job命令以下
sqoop job --create users -- import --connect jdbc:mysql://10.4.20.93:3303 \--username user \--password 123456 \--query "select user_name ,user_id,identype from users where $CONDITIONS" \--hive-import \--hive-database haibian_odbc \--hive-table users \--split-by id \--fields-terminated-by '\01' \--lines-terminated-by '\n' \--target-dir /user/hive/tmp/users \--hive-delims-replacement ' ' --incremental append \--check-column id \--last-value 0
建立完job就能夠去執行它了
sqoop job --exec users
能夠把該指令設爲Linux定時任務,或者用Azkaban定時去執行它
#! /bin/bash
first="$1"
second="$2"
while [ "$first" != "$second" ]
do
date=`date -d "$first" +"%Y-%m-%d"`
sqoop export \
--connect jdbc:mysql:// \
--username \
--password \
--table dwd_fact_front_orderinfo \
--export-dir /user/hive/warehouse/dwd.db/dwd_fact_front_orderinfo/day="$date" \
--input-null-non-string '\\N' \
--input-null-string '\\N' \
--input-fields-terminated-by "\t" \
--update-key id \
--update-mode allowinsert \
--m 1;
let first=`date -d "-1 days ago ${first}" +%Y%m%d`
done
(1):對市面上最流行的兩種調度器,給出如下詳細對比,以供技術選型參考。整體來講,ooize相比azkaban是一個重量級的任務調度系統,功能全面,但配置使用也更復雜。若是能夠不在乎某些功能的缺失,輕量級調度器azkaban是很不錯的候選對象。
(2):功能:
二者都可以調度mapreduce,pig,java,腳本工做流任務;
二者都可以定時執行工做流任務;
(3):工做流定義:
Azkaban使用Properties文件定義工做流;
Oozie使用XML文件定義工做流;
(4):工做流傳參:
Azkaban支持直接傳參,例如${input};
Oozie支持參數和EL表達式,例如${fs:dirSize(myInputDir)};
(5):定時執行:
Azkaban的定時執行任務是基於時間的;
Oozie的定時執行任務基於時間和輸入數據;
(6):資源管理:
Azkaban有較嚴格的權限控制,如用戶對工做流進行讀/寫/執行等操做;
Oozie暫無嚴格的權限控制;
(7):工做流執行:
Azkaban有兩種運行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server能夠部署在不一樣節點);
Oozie做爲工做流服務器運行,支持多用戶和多工做流;
(8):工做流管理:
Azkaban支持瀏覽器以及ajax方式操做工做流;
Oozie支持命令行、HTTP REST、Java API、瀏覽器操做工做流;
cd /export/servers/azkaban-solo-server-0.1.0-SNAPSHOT bin/start-solo.sh
瀏覽器頁面訪問
使用Oozie時一般整合hue,用戶數據倉庫調度
大體流程:
MySQL -> HDFS -> ODS -> DWD -> DWS -> ADS -> MySQL
具體流程:
1. MySQL業務經過Sqoop數據導入HDFS
2. 將HDFS數據導入Hive數倉ODS層
3. 將ODS數據簡單清洗寫入DWD層
4. 將DWD數據輕度彙總寫入DWS層寬表
5. 將DWS層數據統計結果寫入ADS層
6. 將ADS層數據經過Sqoop導出到MySQL彙總表
就是剛纔選擇的腳本
腳本里須要的參數,儘可能設置爲動態自動獲取,如 ${date}
第一步的參數是全部文件和當天日期,後面的只須要日期,最後一步是導出全部結果,相應填入
添加文件和設置相應參數
運行後會有狀態提示頁面,能夠看到任務進度
點擊調度任務的頁面狀況
修改定時任務名和描述
添加須要定時調度的任務
sm-workflow的參數都是寫死的,沒有設置動態,這裏的下拉列表就不會有可選項。
設置參數
將sm-workflow的日期修改成 ${do_date},保存
進入定時計劃sm-dw中,會看到有參數 do_date
填入相應參數,前一天日期
${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, ‘DAY’), ‘yyyyMMdd’)}
Oozie經常使用系統常量
固然,也能夠經過這樣將參數傳入workflow任務中,代碼或者shell中須要的參數。
如,修改sm-workflow 中的 sqoop_import.sh,添加一個參數 ${num}。
編輯文件(須要登錄Hue的用戶有對HDFS操做的權限),修改shell中的一個值爲參數,保存。
在workflow中,編輯添加參數 ${num} ,或者num=${num} 保存。
進入schedule中,能夠看到添加的參數,編輯輸入相應參數便可。
Bundle統一管理全部定時調度,階段劃分:Bundle > Schedule > workflow