使用logstash-jdbc-input插件實現mongodb數據實時同步到elasticsearch

1、實驗介紹

logstash-jdbc-input 是Logstash提供的官方插件之一,該插件經過JDBC接口將任何數據庫中的數據導入 Logstash。關於使用logstash-jdbc-input 插件從數據庫中導出數據到es上,大部分是關於mysql數據庫的導入。本篇文章是關於如何使用logstash-jdbc-input 插件對mongodb的數據進行實時導入。html

2、版本說明

本實驗使用的ELK版本是7.6.2。
(這裏想要補充一下,關於mongodb數據庫的數據導入,另一種常使用的插件是mongo-connector,但該插件僅支持到elasticsearch5.x,所以對於更高版本的elasticsearch更推薦使用本篇文章使用的方法。)
java

3、具體實現

1. 下載相關的jdbc-driver文件並解壓
  • 下載地址: https://dbschema.com/jdbc-drivers/MongoDbJdbcDriver.zip
  • 解壓安裝包: unzip MongoDbJdbcDriver.zip
    (安裝包裏面包括三個jar包文件:gson-2.8.6.jarmongo-java-driver-3.12.4.jarmongojdbc2.1.jar)
  • 將全部文件(即三個jar包)複製到(~/logstash-7.6.2/logstash-core/lib/jars/)目錄(即你的logstash所在的安裝目錄)
2. 編寫配置文件內容
  • 在你的logstash安裝目錄下新建一個.conf文件
  • 關於.conf配置文件主要由input, filter, output 三大板塊組成,咱們依次介紹如何填寫各部分的內容:
2.1 input
input {
  jdbc {
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    # jar包的目錄
    jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar"
    # mongo數據庫對應的uri
    jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest"
    # 這個不填  
    jdbc_user => ""
    # 這個不填
    jdbc_password => ""
    # 表示每分鐘中執行一次,以實現實時同步的效果
    schedule => "* * * * *"
    # mongodb的查詢語句
    statement => "db.dbtest.find({}, {_id: 0})"
  }
}
  • 在編寫mongodb查詢語句時咱們須要注意,因爲logstash沒法識別mongodb中的 ObjectId類型,所以咱們須要拋棄該字段,所以在find語句中咱們設置_id:0,即表示不須要該字段。
2.2 filter
filter {
  # 數據預處理
}
  • filter部分主要是針對mongodb中的數據進行預處理,若是不須要進行預處理,這部份內容沒必要填寫;關於filter實現預處理的部份內容比較繁多,以後會專門出一篇文章進行總結,這裏再也不贅述。
2.3 output
output {
 elasticsearch {
    # es所在的地址
    hosts => "localhost:9200"
    # 導入到es上對應的索引
    index => "test"
  }
  
  stdout {
    codec => json_lines
  }
}
3. 實現數據的實時同步(全量法)
  • 全量法,即指每次將表的全部數據所有導入,這種方法可能會致使數據重複的問題,由於每次同步時都會將以前已經導入的數據再導入一遍,爲避免數據重複的問題,咱們須要對每條數據進行標識,這樣在每次同步時es中若已出現相同標識的數據則會選擇覆蓋,以此實現數據實時同步的效果。
  • 實現數據標識效果,即在output部分指定document_id便可
output {
 elasticsearch {
    # es所在的地址
    hosts => "localhost:9200"
    # 導入到es上對應的索引
    index => "test"
    # 指定標識每條數據的字段
    document_id => "%{id}"
  }
  
  stdout {
    codec => json_lines
  }
}
  • 這裏須要注意的是,咱們沒法使用mongodb自動生成的id做爲標識符,由於id是ObjectId類型,在input階段咱們已經把該字段刪去了,所以這裏應該選擇表中其餘能標識數據且不是ObjectId類型的字段(string, int等皆可)
4. 實現數據的實時同步(增量法)
  • 若在你的數據中除了mongodb自動生成的id再也不有其它具備標識性質的字段,能夠考慮使用增量法實現數據的實時同步。增量法,即每次同步時是從上一次執行命令的時間開始,將插入時間在上一次命令以後的數據導入es中。增量法的優勢是沒必要每次將所有數據導入,而是隻導入新加入到數據庫的數據,能夠減少每次同步時的壓力。
  • 使用增量法實現數據同步,須要修改input部分的代碼
input {
  jdbc {
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    # jar包的目錄
    jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar"
    # mongo數據庫對應的uri
    jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest"
    # 這個不填  
    jdbc_user => ""
    # 這個不填
    jdbc_password => ""
    # 表示每分鐘中執行一次,以實現實時同步的效果
    schedule => "* * * * *"
    # 實現增量同步的mongodb的查詢語句
    statement => "db.dbtest.find({ $gte: ISODate(:sql_last_value) }, {_id: 0})"
    # 保存上一次執行時間的文件
    last_run_metadata_path => "/logstash-7.6.2/.logstash_jdbc_last_run"
  }
}
  • 實現增量同步主要是兩個字段:
    • statement: 執行mongodb查詢的字段
      • 關於 :sql_last_value :logstash中提供的一個協助查詢的時間參數,默認值是1970-01-01 08:00:00,數據類型是string,每次執行命令以後,該值會替換成執行命令時刻的時間。
      • 在修改find語句時容易由於:sql_last_value的類型出錯:若是表中關於時間的數據類型是string,那在find語句中改成db.dbtest.find({ $gte: :sql_last_value}, {_id: 0})便可;若若是表中關於時間的數據類型是date,那在find語句須要進行類型轉換,即改成·db.dbtest.find({ $gte: ISODate(:sql_last_value)}, {_id: 0})
    • last_run_metadata_path: 保存上一次執行時間的文件,能夠放在任意目錄下,我這裏放在了/logstash-7.6.2的目錄下面
5. 運行文件
/logstash-7.6.2/bin/logstash -f /logstash-7.6.2/dbtest.conf --path.data=/logstash-7.6.2/data/dbtest

4、可能出現的報錯

1. 沒法識別ObjectId錯誤
  • 報錯信息:Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.bson.types.ObjectId, simple name=ObjectId>}mysql

  • 錯誤緣由:在input部分編寫mongodb查詢語句時須要注意,因爲logstash沒法識別mongodb中的 ObjectId類型,所以咱們須要拋棄該字段,所以在find語句中咱們設置_id:0,即表示不須要該字段。git

db.dbtest.find({}, {_id: 0})

【tips】經過mongodb的find語句咱們還能夠選取只導出文檔中的某一字段,具體操做可參考官方文檔:https://docs.mongodb.com/manual/reference/method/db.collection.find/github

2. 增量同步無效可是沒有報錯信息
  • 這一問題的緣由在上面的增量法部分中也有提到過,多是find語句中:sql_last_value或者其餘字段的數據類型不正確,建議檢查一下數據庫中字段類型和find語句中的查詢條件是否匹配

參考文章

  1. https://stackoverflow.com/questions/58342818/sync-mongodb-to-elasticsearch
  2. https://docs.mongodb.com/manual/reference/method/db.collection.find/
  3. https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
相關文章
相關標籤/搜索