sqoop用法之mysql與hive數據導入導出

一. Sqoop介紹

Sqoop是一個用來將Hadoop和關係型數據庫中的數據相互轉移的工具,能夠將一個關係型數據庫(例如:MySQL、Oracle、Postgres等)中的數據導進到HadoopHDFS中,也能夠將HDFS的數據導進到關係型數據庫中。對於某些NoSQL數據庫它也提供了鏈接器。Sqoop,相似於其餘ETL工具,使用元數據模型來判斷數據類型並在數據從數據源轉移到Hadoop時確保類型安全的數據處理。Sqoop專爲大數據批量傳輸設計,可以分割數據集並建立Hadoop任務來處理每一個區塊。java

本文版本說明mysql

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6web

二. Mysql 數據導入到 Hive

1). 將mysqlpeople_access_log表導入到hiveweb.people_access_log,而且hive中的表不存在。
mysql中表people_access_log數據爲:sql

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'

mysql數據導入hive的命令爲:數據庫

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log

該命令會啓用一個mapreduce任務,將mysql數據導入到hive表,而且指定了hive表的分隔符爲\t,若是不指定則爲默認分隔符^A(ctrl+A)安全

參數說明bash

參數 說明
--connect mysql的鏈接信息
--username mysql的用戶名
--password mysql的密碼
--table 被導入的mysql源表名
-m 並行導入啓用的map任務數量,與--num-mapper含義同樣
--hive-import 插入數據到hive當中,使用hive默認的分隔符,可使用--fields-terminated-by參數來指定分隔符。
-- hive-table hive當中的表名

2). 也能夠經過--query條件查詢Mysql數據,將查詢結果導入到Hiveapp

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
參數 說明
--query 後接查詢語句,條件查詢須要\$CONDITIONS and鏈接查詢條件,這裏的\$表示轉義$,必須有.
--delete-target-dir 若是目標hive表目錄存在,則刪除,至關於overwrite.

三. Hive數據導入到Mysql

仍是使用上面的hiveweb.people_access_log,將其導入到mysql中的people_access_log_out表中.工具

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1

注意:mysqlpeople_access_log_out須要提早建好,不然報錯:ErrorException: Table 'test.people_access_log_out' doesn't exist。若是有id自增列,hive表也須要有,hive表與mysql表字段必須徹底相同。oop

create table people_access_log_out like people_access_log;

執行完一個mr任務後,成功導入到mysqlpeople_access_log_out中.

四. mysql數據增量導入hive

實際中mysql數據會不斷增長,這時候須要用sqoop將數據增量導入hive,而後進行海量數據分析統計。增量數據導入分兩種,一是基於遞增列的增量數據導入(Append方式)。二是基於時間列的增量數據導入(LastModified方式)。有幾個核心參數:

  • –check-column:用來指定一些列,這些列在增量導入時用來檢查這些數據是否做爲增量數據進行導入,和關係型數據庫中的自增字段及時間戳相似.注意:這些被指定的列的類型不能使任意字符類型,如char、varchar等類型都是不能夠的,同時–check-column能夠去指定多個列
  • –incremental:用來指定增量導入的模式,兩種模式分別爲AppendLastmodified
  • –last-value:指定上一次導入中檢查列指定字段最大值

1. 基於遞增列Append導入

接着前面的日誌表,裏面每行有一個惟一標識自增列ID,在關係型數據庫中以主鍵形式存在。以前已經將id在0~6之間的編號的訂單導入到Hadoop中了(這裏爲HDFS),如今一段時間後咱們須要將近期產生的新的訂 單數據導入Hadoop中(這裏爲HDFS),以供後續數倉進行分析。此時咱們只須要指定–incremental 參數爲append–last-value參數爲6便可。表示只從id大於6後即7開始導入。

1). 建立hive

首先咱們須要建立一張與mysql結構相同的hive表,假設指定字段分隔符爲\t,後面導入數據時候分隔符也須要保持一致。

2). 建立job

增量導入確定是屢次進行的,可能每隔一個小時、一天等,因此須要建立計劃任務,而後定時執行便可。咱們都知道hive的數據是存在hdfs上面的,咱們建立sqoop job的時候須要指定hive的數據表對應的hdfs目錄,而後定時執行這個job便可。

當前mysql中數據,hive中數據與mysql同樣也有6條:

id user_id access_time ip url
1 15110101010 1577003281739 112.168.1.2 https://www.baidu.com
2 15110101011 1577003281749 112.16.1.23 https://www.baidu.com
3 15110101012 1577003281759 193.168.1.2 https://www.taobao.com
4 15110101013 1577003281769 112.18.1.2 https://www.baidu.com
5 15110101014 1577003281779 112.168.10.2 https://www.baidu.com
6 15110101015 1577003281789 11.168.1.2 https://www.taobao.com

增量導入有幾個參數,保證下次同步的時候能夠接着上次繼續同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1

這裏經過sqoop job --create job_name命令建立了一個名爲mysql2hive_jobsqoop job

3). 執行job

建立好了job,後面只須要定時週期執行這個提早定義好的job便可。咱們先往mysql裏面插入2條數據。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');

這樣mysql裏面就會多了2條數據。此時hive裏面只有id1 ~ 6的數據,執行同步job使用如下命令。

sqoop job -exec mysql2hive_job

執行完成後,發現剛纔mysql新加入的id7 ~ 8的兩條數據已經同步到hive

hive> select * from web.people_access_log;
OK
1	15110101010	1577003281739	112.168.1.2	https://www.baidu.com
2	15110101011	1577003281749	112.16.1.23	https://www.baidu.com
3	15110101012	1577003281759	193.168.1.2	https://www.taobao.com
4	15110101013	1577003281769	112.18.1.2	https://www.baidu.com
5	15110101014	1577003281779	112.168.10.2	https://www.baidu.com
6	15110101015	1577003281789	11.168.1.2	https://www.taobao.com
7	15110101016	1577003281790	112.168.1.3	https://www.qq.com
8	15110101017	1577003281791	112.1.1.3	https://www.microsoft.com

因爲實際場景中,mysql表中的數據,好比訂單表等,一般是一致有數據進入的,這時候只須要將sqoop job -exec mysql2hive_job這個命令定時(好比說10分鐘頻率)執行一次,就能將數據10分鐘同步一次到hive數據倉庫。

2. Lastmodified 導入實戰

append適合業務系統庫,通常業務系統表會經過自增ID做爲主鍵標識惟一性。Lastmodified適合ETL的數據根據時間戳字段導入,表示只導入比這個時間戳大,即比這個時間晚的數據。

1). 新建一張表

mysql中新建一張表people_access_log2,而且初始化幾條數據:

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用戶id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '訪客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入數據:

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');

mysql裏面的數據就是這樣:

id user_id access_time ip url
1 15110101010 2019-12-28 16:23:10 112.168.1.200 https://www.baidu.com
2 15110101011 2019-12-28 16:23:33 112.16.1.2 https://www.baidu.com
3 15110101012 2019-12-28 16:23:41 112.168.1.2 https://www.taobao.com
4 15110101013 2019-12-28 16:23:46 112.168.10.2 https://www.baidu.com
5 15110101014 2019-12-28 16:23:52 112.168.1.2 https://www.jd.com
6 15110101015 2019-12-28 16:23:56 112.168.12.4 https://www.qq.

2). 初始化hive表:

初始化hive數據,將mysql裏面的6條數據導入hive中,而且能夠自動幫助咱們建立對應hive表,何樂而不爲,不然咱們須要本身手動建立,完成初始化工做。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2

能夠看到執行該命令後,啓動了二一個mapreduce任務,這樣6條數據就進入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1	15110101010	2019-12-28 16:23:10.0	112.168.1.200	https://www.baidu.com
2	15110101011	2019-12-28 16:23:33.0	112.16.1.2	https://www.baidu.com
3	15110101012	2019-12-28 16:23:41.0	112.168.1.2	https://www.taobao.com
4	15110101013	2019-12-28 16:23:46.0	112.168.10.2	https://www.baidu.com
5	15110101014	2019-12-28 16:23:52.0	112.168.1.2	https://www.jd.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)

3). 增量導入數據:

咱們再次插入一條數據進入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此時,mysql表裏面已經有7條數據了,咱們使用incremental的方式進行增量的導入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \

2019-12-28 16:23:56就是第6條數據的時間,這裏須要指定。報錯了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:能夠看到--merge-key or --append is required when using --incremental lastmodified意思是,這種基於時間導入模式,須要指定--merge-key或者--append參數,表示根據時間戳導入,數據是直接在末尾追加(append)仍是合併(merge),這裏使用merge方式,根據id合併:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id

執行該命令後,與直接導入不一樣,該命令啓動了2個mapreduce任務,這樣就把數據增量merge導入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1	15110101010	2019-12-28 16:23:10.0	112.168.1.200	https://www.baidu.com
2	15110101011	2019-12-28 16:23:33.0	112.16.1.2	https://www.baidu.com
3	15110101012	2019-12-28 16:23:41.0	112.168.1.2	https://www.taobao.com
4	15110101013	2019-12-28 16:23:46.0	112.168.10.2	https://www.baidu.com
5	15110101014	2019-12-28 16:23:52.0	112.168.1.2	https://www.jd.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
6	15110101015	2019-12-28 16:23:56.0	112.168.12.4	https://www.qq.com
7	15110101016	2019-12-28 16:28:24.0	112.168.12.45	https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)

能夠看到id=6的數據,有2條,它的時間恰好是--last-value指定的時間,則會導入大於等於--last-value指定時間的數據,這點須要注意。

相關文章
相關標籤/搜索