Sqoop
是一個用來將Hadoop
和關係型數據庫中的數據相互轉移的工具,能夠將一個關係型數據庫(例如:MySQL、Oracle、Postgres
等)中的數據導進到Hadoop
的HDFS
中,也能夠將HDFS
的數據導進到關係型數據庫中。對於某些NoSQL
數據庫它也提供了鏈接器。Sqoop
,相似於其餘ETL
工具,使用元數據模型來判斷數據類型並在數據從數據源轉移到Hadoop
時確保類型安全的數據處理。Sqoop
專爲大數據批量傳輸設計,可以分割數據集並建立Hadoop
任務來處理每一個區塊。java
本文版本說明mysql
hadoop
版本 :hadoop-2.7.2
hive版本
:hive-2.1.0
sqoop版本:sqoop-1.4.6
web
1). 將mysql
的people_access_log
表導入到hive
表web.people_access_log
,而且hive
中的表不存在。
mysql
中表people_access_log
數據爲:sql
1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com' 2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com' 3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com' 4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com' 5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com' 6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'
將mysql
數據導入hive
的命令爲:數據庫
sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log \ -m 1 \ --hive-import \ --create-hive-table \ --fields-terminated-by '\t' \ --hive-table web.people_access_log
該命令會啓用一個mapreduce
任務,將mysql
數據導入到hive
表,而且指定了hive
表的分隔符爲\t
,若是不指定則爲默認分隔符^A(ctrl+A)
。安全
參數說明bash
參數 | 說明 |
---|---|
--connect |
mysql 的鏈接信息 |
--username |
mysql 的用戶名 |
--password |
mysql 的密碼 |
--table |
被導入的mysql 源表名 |
-m |
並行導入啓用的map 任務數量,與--num-mapper 含義同樣 |
--hive-import |
插入數據到hive 當中,使用hive 默認的分隔符,可使用--fields-terminated-by 參數來指定分隔符。 |
-- hive-table |
hive當中的表名 |
2). 也能夠經過--query
條件查詢Mysql
數據,將查詢結果導入到Hive
app
sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \ --target-dir /user/hive/warehouse/web/people_access_log \ --delete-target-dir \ --fields-terminated-by '\t' \ -m 1
參數 | 說明 |
---|---|
--query |
後接查詢語句,條件查詢須要\$CONDITIONS and 鏈接查詢條件,這裏的\$ 表示轉義$ ,必須有. |
--delete-target-dir |
若是目標hive 表目錄存在,則刪除,至關於overwrite . |
仍是使用上面的hive
表web.people_access_log
,將其導入到mysql
中的people_access_log_out
表中.工具
sqoop export \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log_out \ --input-fields-terminated-by '\t' \ --export-dir /user/hive/warehouse/web.db/people_access_log \ --num-mappers 1
注意:mysql
表people_access_log_out
須要提早建好,不然報錯:ErrorException: Table 'test.people_access_log_out' doesn't exist
。若是有id
自增列,hive
表也須要有,hive
表與mysql
表字段必須徹底相同。oop
create table people_access_log_out like people_access_log;
執行完一個mr
任務後,成功導入到mysql
表people_access_log_out
中.
實際中mysql
數據會不斷增長,這時候須要用sqoop
將數據增量導入hive
,而後進行海量數據分析統計。增量數據導入分兩種,一是基於遞增列的增量數據導入(Append
方式)。二是基於時間列的增量數據導入(LastModified
方式)。有幾個核心參數:
–check-column
:用來指定一些列,這些列在增量導入時用來檢查這些數據是否做爲增量數據進行導入,和關係型數據庫中的自增字段及時間戳相似.注意:這些被指定的列的類型不能使任意字符類型,如char、varchar等類型都是不能夠的,同時–check-column
能夠去指定多個列–incremental
:用來指定增量導入的模式,兩種模式分別爲Append
和Lastmodified
–last-value
:指定上一次導入中檢查列指定字段最大值接着前面的日誌表,裏面每行有一個惟一標識自增列ID
,在關係型數據庫中以主鍵形式存在。以前已經將id在0~6
之間的編號的訂單導入到Hadoop
中了(這裏爲HDFS
),如今一段時間後咱們須要將近期產生的新的訂 單數據導入Hadoop
中(這裏爲HDFS
),以供後續數倉進行分析。此時咱們只須要指定–incremental
參數爲append
,–last-value
參數爲6
便可。表示只從id
大於6
後即7
開始導入。
hive
表首先咱們須要建立一張與mysql
結構相同的hive
表,假設指定字段分隔符爲\t
,後面導入數據時候分隔符也須要保持一致。
job
增量導入確定是屢次進行的,可能每隔一個小時、一天等,因此須要建立計劃任務,而後定時執行便可。咱們都知道hive
的數據是存在hdfs
上面的,咱們建立sqoop job
的時候須要指定hive
的數據表對應的hdfs
目錄,而後定時執行這個job
便可。
當前mysql
中數據,hive
中數據與mysql
同樣也有6條:
id |
user_id |
access_time |
ip |
url |
---|---|---|---|---|
1 | 15110101010 | 1577003281739 | 112.168.1.2 | https://www.baidu.com |
2 | 15110101011 | 1577003281749 | 112.16.1.23 | https://www.baidu.com |
3 | 15110101012 | 1577003281759 | 193.168.1.2 | https://www.taobao.com |
4 | 15110101013 | 1577003281769 | 112.18.1.2 | https://www.baidu.com |
5 | 15110101014 | 1577003281779 | 112.168.10.2 | https://www.baidu.com |
6 | 15110101015 | 1577003281789 | 11.168.1.2 | https://www.taobao.com |
增量導入有幾個參數,保證下次同步的時候能夠接着上次繼續同步.
sqoop job --create mysql2hive_job -- import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log \ --target-dir /user/hive/warehouse/web.db/people_access_log \ --check-column id \ --incremental append \ --fields-terminated-by '\t' \ --last-value 6 \ -m 1
這裏經過sqoop job --create job_name
命令建立了一個名爲mysql2hive_job
的sqoop job
。
建立好了job
,後面只須要定時週期執行這個提早定義好的job
便可。咱們先往mysql
裏面插入2條數據。
INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES (7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'), (8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');
這樣mysql
裏面就會多了2條數據。此時hive
裏面只有id
爲1 ~ 6
的數據,執行同步job
使用如下命令。
sqoop job -exec mysql2hive_job
執行完成後,發現剛纔mysql
新加入的id
爲7 ~ 8
的兩條數據已經同步到hive
。
hive> select * from web.people_access_log; OK 1 15110101010 1577003281739 112.168.1.2 https://www.baidu.com 2 15110101011 1577003281749 112.16.1.23 https://www.baidu.com 3 15110101012 1577003281759 193.168.1.2 https://www.taobao.com 4 15110101013 1577003281769 112.18.1.2 https://www.baidu.com 5 15110101014 1577003281779 112.168.10.2 https://www.baidu.com 6 15110101015 1577003281789 11.168.1.2 https://www.taobao.com 7 15110101016 1577003281790 112.168.1.3 https://www.qq.com 8 15110101017 1577003281791 112.1.1.3 https://www.microsoft.com
因爲實際場景中,mysql
表中的數據,好比訂單表等,一般是一致有數據進入的,這時候只須要將sqoop job -exec mysql2hive_job
這個命令定時(好比說10分鐘頻率)執行一次,就能將數據10分鐘同步一次到hive
數據倉庫。
Lastmodified
導入實戰append
適合業務系統庫,通常業務系統表會經過自增ID做爲主鍵標識惟一性。Lastmodified
適合ETL
的數據根據時間戳字段導入,表示只導入比這個時間戳大,即比這個時間晚的數據。
在mysql
中新建一張表people_access_log2
,而且初始化幾條數據:
CREATE TABLE `people_access_log2` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', `user_id` bigint(20) unsigned NOT NULL COMMENT '用戶id', `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `ip` varchar(15) NOT NULL COMMENT '訪客ip', `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
插入數據:
insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com'); insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com'); insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');
mysql
裏面的數據就是這樣:
id | user_id | access_time | ip | url |
---|---|---|---|---|
1 |
15110101010 |
2019-12-28 16:23:10 |
112.168.1.200 |
https://www.baidu.com |
2 |
15110101011 |
2019-12-28 16:23:33 |
112.16.1.2 |
https://www.baidu.com |
3 |
15110101012 |
2019-12-28 16:23:41 |
112.168.1.2 |
https://www.taobao.com |
4 |
15110101013 |
2019-12-28 16:23:46 |
112.168.10.2 |
https://www.baidu.com |
5 |
15110101014 |
2019-12-28 16:23:52 |
112.168.1.2 |
https://www.jd.com |
6 |
15110101015 |
2019-12-28 16:23:56 |
112.168.12.4 |
https://www.qq. |
hive
表:初始化hive
數據,將mysql
裏面的6
條數據導入hive
中,而且能夠自動幫助咱們建立對應hive
表,何樂而不爲,不然咱們須要本身手動建立,完成初始化工做。
sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --create-hive-table \ --fields-terminated-by ',' \ --hive-table web.people_access_log2
能夠看到執行該命令後,啓動了二一個mapreduce
任務,這樣6條數據就進入hive
表web.people_access_log2
了:
hive> select * from web.people_access_log2; OK 1 15110101010 2019-12-28 16:23:10.0 112.168.1.200 https://www.baidu.com 2 15110101011 2019-12-28 16:23:33.0 112.16.1.2 https://www.baidu.com 3 15110101012 2019-12-28 16:23:41.0 112.168.1.2 https://www.taobao.com 4 15110101013 2019-12-28 16:23:46.0 112.168.10.2 https://www.baidu.com 5 15110101014 2019-12-28 16:23:52.0 112.168.1.2 https://www.jd.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com Time taken: 0.326 seconds, Fetched: 6 row(s)
咱們再次插入一條數據進入mysql
的people_access_log2
表:
insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');
此時,mysql
表裏面已經有7
條數據了,咱們使用incremental
的方式進行增量的導入到hive
:
sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --hive-table people_access_log2 \ -m 1 \ --check-column access_time \ --incremental lastmodified \ --last-value "2019-12-28 16:23:56" \
2019-12-28 16:23:56
就是第6條數據的時間,這裏須要指定。報錯了:
19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.
注意:能夠看到--merge-key or --append is required when using --incremental lastmodified
意思是,這種基於時間導入模式,須要指定--merge-key
或者--append
參數,表示根據時間戳導入,數據是直接在末尾追加(append)仍是合併(merge),這裏使用merge
方式,根據id
合併:
sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --hive-table web.people_access_log2 \ --check-column access_time \ --incremental lastmodified \ --last-value "2019-12-28 16:23:56" \ --fields-terminated-by ',' \ --merge-key id
執行該命令後,與直接導入不一樣,該命令啓動了2個mapreduce
任務,這樣就把數據增量merge
導入hive
表了.
hive> select * from web.people_access_log2 order by id; OK 1 15110101010 2019-12-28 16:23:10.0 112.168.1.200 https://www.baidu.com 2 15110101011 2019-12-28 16:23:33.0 112.16.1.2 https://www.baidu.com 3 15110101012 2019-12-28 16:23:41.0 112.168.1.2 https://www.taobao.com 4 15110101013 2019-12-28 16:23:46.0 112.168.10.2 https://www.baidu.com 5 15110101014 2019-12-28 16:23:52.0 112.168.1.2 https://www.jd.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com 7 15110101016 2019-12-28 16:28:24.0 112.168.12.45 https://www.qq.com Time taken: 0.241 seconds, Fetched: 8 row(s)
能夠看到id=6
的數據,有2條,它的時間恰好是--last-value
指定的時間,則會導入大於等於--last-value
指定時間的數據,這點須要注意。