DataX3.0實現Mysql數據傳遞至HDFS與Hive

一、須要在hive中先行創建數據庫和要寫入的表。python

CREATE TABLE datax

   (id  BIGINT,

   test1  VARCHAR(25),

   test2  INT,

   test3  INT) ROW FORMAT DELIMITED

   FIELDS TERMINATED BY '\t'

   STORED AS TEXTFILE;

二、配置mysql寫入hdfs的做業配置文件,json格式。mysql

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.xxx.x.x(數據庫地址):3306(數據庫端口)/data(數據庫名稱)"
                                ],
                                "table": [
                                    "tb_sensorstate"
                                ]
                            }
                        ],
                        "password": "hivedb",
                        "username": "xxxxx"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "BIGINT"
                            },
                            {
                                "name": "test1",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "test2",
                                "type": "INT"
                            },
                            {
                                "name": "test3",
                                "type": "INT"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.xxx.x.xxx(HDFS地址):9000(HDFS端口)",
                        "fieldDelimiter": "\t",
                        "fileName": "dataxtest",
                        "fileType": "text",
                        "path": "/usr/local/hadoop/hive/warehouse/datax(hive倉庫地址)",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}

三、運行job做業:git

   命令:python datax.py ../job/mysqlreader-hdfswriter.jsonsql

四、運行成功結果:數據庫

Job配置項信息能夠參看從git上下載的源碼文件中的指定插件文件夾下的doc文件夾中的插件說明。json

本例中:併發

   一、mysqlreader中的column配置項中以[「*」]代替全部要從mysql指定表中讀取的行。運行時:app

 您的配置文件中的列配置存在必定的風險. 由於您未配置讀取數據庫表的列,當您的表字段個數、類型有變更時,可能影響任務正確性甚至會運行出錯。請檢查您的配置並做出修改。oop

實際應用中應該依據實際狀況配置相應的字段名和類型。測試

  二、本例配置項說明

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "插件名",
                    "parameter": {
                        "column": [
                            要讀取的數據庫表中的列的字段名和類型。
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "須要讀取的mysql數據庫地址"
                                ],
                                "table": [
                                    "須要讀取的表名"
                                ]
                            }
                        ],
                        "password": "數據庫鏈接密碼",
                        "username": "數據庫鏈接用戶名"
                    }
                },
                "writer": {
                    "name": "插件名",
                    "parameter": {
                        "column": [
                           要寫入的數據庫表中的列的字段名和類型。
                        ],
                        "compress": "壓縮格式",
                        "defaultFS": "HDFS的IP和端口",
                        "fieldDelimiter": "hive先行建表的時候使用的分隔符",
                        "fileName": "寫入HDFS時的文件名前綴",
                        "fileType": "文件格式",
                        "path": "hadoop上hive倉庫的地址",
                        "writeMode": "append"//append,寫入前不作任何處理,DataX hdfswriter直接使用filename寫入,並保證文件名不衝突。
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"//應該是設置併發傳輸通道數,可是datax的用戶指導中未見詳細說明。
            }
        }
    }
}

追加寫入

Mysqlreader 中讀取更新數據能夠配置where或querysql配置項。

如下引用官方文檔:

* **where***   
描述:篩選條件,MysqlReader根據指定的column、table、where條件拼接SQL,並根據這個SQL進行數據抽取。
例如在作測試時,能夠將where條件指定爲limit 10;在實際業務場景中,每每會選擇當天的數據進行同步,
能夠將where條件指定爲gmt_create > $bizdate 。<br />。
where條件能夠有效地進行業務增量同步。若是不填寫where語句,包括不提供where的key或者value,
DataX均視做同步全量數據。
* 必選:否 <br />
* 默認值:無 <br />

例:

"where":[
     "id>22"
   ]

再job中加入where項,便可實現id大於22的數據更新。

如下引用官方文檔:

* **querySql*** 
描述:在有些業務場景下,where這一配置項不足以描述所篩選的條件,用戶能夠經過該配置型來自定義篩選SQL。
當用戶配置了這一項以後,DataX系統就會忽略table,column這些配置型,直接使用這個配置項的內容對數據進行篩選,
例如須要進行多表join後同步數據,
使用select a,b from table_a join table_b on table_a.id = table_b.id <br />
當用戶配置querySql時,MysqlReader直接忽略table、column、where條件的配置,
querySql優先級大於table、column、where選項。
* 必選:否 <br />
* 默認值:無 <br />
相關文章
相關標籤/搜索