【實戰】使用 Kettle 工具將 mysql 數據增量導入到 MongoDB 中

最近有一個將 mysql 數據導入到 MongoDB 中的需求,打算使用 Kettle 工具實現。本文章記錄了數據導入從0到1的過程,最終實現了每秒鐘快速導入約 1200 條數據。一塊兒來看吧~mysql

1、Kettle 鏈接圖

簡單說下該轉換流程,增量導入數據:linux

<!--more-->sql

1)根據 source 和 db 字段來獲取 MongoDB 集合內 business_time 最大值。shell

2)設置 mysql 語句數據庫

3)對查詢的字段進行更名json

4)過濾數據:只往 MongoDB 裏面導入 person_id,address,business_time 字段均不爲空的數據。微信

  • 符合過濾條件的數據,增長常量,並將其導入到 mongoDB 中。
  • 不符合過濾條件的數據,增長常量,將其導入到 Excel 表中記錄。

2、流程組件解析

一、MongoDB input

1)Configure connection

  • Host name(s) or IP address(es):網絡名稱或者地址。能夠輸入多個主機名或IP地址,用逗號分隔。還能夠經過將主機名和端口號與冒號分隔開,爲每一個主機名指定不一樣的端口號,並將主機名和端口號的組合與逗號分隔開。例如,要爲兩個不一樣的MongoDB實例包含主機名和端口號,您將輸入localhost 1:27017,localhost 2:27018,並使 Port 字段爲空。
  • Port:端口號
  • Username:用戶名
  • Password:密碼
  • Authenticate using Kerberos:指示是否使用Kerberos服務來管理身份驗證過程。
  • Connection timeout:鏈接超時時間(毫秒)
  • Socket timeout:等待寫操做(以毫秒爲單位)的時間
2)Input options

  • Database:檢索數據的數據庫的名稱。點擊 「Get DBs」 按鈕以獲取數據庫列表。
  • Collection:集合名稱。點擊 「Get collections」 按鈕獲取集合列表。
  • Read preference:表示要先讀取哪一個節點。
  • Tag set specification/#/Tag Set:標籤容許您自定義寫關注和讀取副本的首選項。
3)query

根據 source 和 db 字段來獲取 bussiness_time 的最大值,Kettle 的 MongoDB 查詢語句以下圖所示:網絡

對應的 MongDB 的寫法爲:工具

記得勾選 Query is aggregation pipeline 選項:大數據

4)Fields

取消選中 Output single JSON field ,表示下一組件接收到的結果是一個 Number 類型的單值,不然就是一個 json 對象。

二、表輸入

設置 mysql 數據庫 jdbc 鏈接後,填好 SQL 語句以後,在下方的「從步驟插入數據」下拉列表中,選中「MongoDB input」。「MongoDB input」 中的變量,在 SQL 語句中用 ? 表示,以下圖所示:

若是導數的時候發生中文亂碼,能夠點擊 編輯 ,選擇 數據庫鏈接 的 選項,添加配置項:characterEncoding utf8,便可解決。以下圖所示:

三、字段選擇

若是查詢出來的列名須要更改,則可使用「字段選擇」組件,該組件還能夠移除某字段,本次應用中,主要使用該組件將字段名進行修改。以下圖所示:

四、過濾選擇

只保留 person_id,address,business_time 字段都不爲空的數據:

五、增長常量

很簡單,在「增長常量」組件內設置好要增長常量的類型和值便可。

六、Excel 輸出

添加「Excel 輸出」,設置好文件名,若是有必要的話還能夠設置 Excel 字段格式,以下圖所示:

七、MongoDB output

1)Configure connection

以下圖所示,因爲一開始就介紹了 MongoDB 的鏈接方式,因此在這裏不在贅述。

2)Output options

  • Batch insert size:每次批量插入的條數。
  • Truncate collection:執行操做前先清空集合
  • Update:更新數據
  • Upsert:選擇 Upsert 選項將寫入模式從 insert 更改成 upsert(即:若是找到匹配項則更新,不然插入新記錄)。使用前提是 勾選 Update 選項。
  • Muli-update:屢次更新,能夠更新全部匹配的文檔,而不只僅是第一個。
3)Mongo document fields

根據 id、source、db 字段插入更新數據,以下圖所示:

更多 MongoDB output 可參考:https://wiki.pentaho.com/display/EAI/MongoDB+Output

3、索引優化

一、mysql

爲 mysql 查詢字段添加索引。(略)

二、MongoDB

對 MongoDB 查詢作優化,建立複合索引:

對於 MongoDB input 組件來講,會關聯查詢出 business_time 最大值,因此要建立複合索引,建立複合索引時要注意字段順序,按照查詢順序建立:

db.trajectory_data.createIndex({source: 1, db: 1, business_time: 1})

對於 MongoDB output 組件來講,由於已經設置了 插入或更新 數據的規則,也會涉及到查詢,因此再設置一個複合索引:

db.trajectory_data.createIndex({id: 1, source: 1, db: 1})

4、運行

運行前,須要在集合內插入一條含 business_time 字段的 demo 數據,不然 MongoDB input 會由於查不到數據而報錯:

db.trajectory_data.insert({
    id: 0,
    source: 'xx數據',
    db: "17-db2",
    business_time: 0
})

成功插入數據後,執行該轉換:

  • 可視化操做
  • 命令行操做:${KETTLE_HOME}/pan.sh -file=xxx.ktr

可經過點擊 「執行結果」 --> 「步驟度量」 來查看各組件運行狀態,以下圖所示:

24 分鐘共導了 172 萬的數據,每秒鐘約導入 1200 條數據。

這樣子,這個轉換基本就算完成了。能夠在 linux 上寫一個定時任務去執行這個轉換,每次轉換 mysql 都會將大於 mongoDB 集合中 business_time 字段最大值的數據增量導入到 MongoDB 中。

5、不足

像上述的 Kettle 流程也是有不足的。假如一次性拉取的數據量過大,頗有可能致使 Mysql 或 Kettle 內存溢出而報錯。因此上述流程只適合小數據量導入。大數據量導入的話仍是建議分批次導入或者分頁導入,你們能夠研究一下。


本文來自: 微信公衆號【大數據實戰演練】。閱讀更多精彩好文,歡迎關注微信公衆號【大數據實戰演練】。

相關文章
相關標籤/搜索