1、實驗介紹
logstash-jdbc-input
是Logstash提供的官方插件之一,該插件經過JDBC接口將任何數據庫中的數據導入 Logstash。關於使用logstash-jdbc-input
插件從數據庫中導出數據到es上,大部分是關於mysql數據庫的導入。本篇文章是關於如何使用logstash-jdbc-input
插件對mongodb的數據進行實時導入。html
2、版本說明
本實驗使用的ELK版本是7.6.2。
(這裏想要補充一下,關於mongodb數據庫的數據導入,另一種常使用的插件是mongo-connector,但該插件僅支持到elasticsearch5.x,所以對於更高版本的elasticsearch更推薦使用本篇文章使用的方法。)
java
3、具體實現
1. 下載相關的jdbc-driver文件並解壓
- 下載地址: https://dbschema.com/jdbc-drivers/MongoDbJdbcDriver.zip
- 解壓安裝包:
unzip MongoDbJdbcDriver.zip
(安裝包裏面包括三個jar
包文件:gson-2.8.6.jar
、mongo-java-driver-3.12.4.jar
、mongojdbc2.1.jar
) - 將全部文件(即三個jar包)複製到
(~/logstash-7.6.2/logstash-core/lib/jars/)
目錄(即你的logstash所在的安裝目錄)
2. 編寫配置文件內容
- 在你的logstash安裝目錄下新建一個
.conf
文件 - 關於
.conf
配置文件主要由input
,filter
,output
三大板塊組成,咱們依次介紹如何填寫各部分的內容:
2.1 input
input { jdbc { jdbc_driver_class => "com.dbschema.MongoJdbcDriver" # jar包的目錄 jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar" # mongo數據庫對應的uri jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest" # 這個不填 jdbc_user => "" # 這個不填 jdbc_password => "" # 表示每分鐘中執行一次,以實現實時同步的效果 schedule => "* * * * *" # mongodb的查詢語句 statement => "db.dbtest.find({}, {_id: 0})" } }
- 在編寫mongodb查詢語句時咱們須要注意,因爲logstash沒法識別mongodb中的
ObjectId
類型,所以咱們須要拋棄該字段,所以在find
語句中咱們設置_id:0
,即表示不須要該字段。
2.2 filter
filter { # 數據預處理 }
- filter部分主要是針對mongodb中的數據進行預處理,若是不須要進行預處理,這部份內容沒必要填寫;關於filter實現預處理的部份內容比較繁多,以後會專門出一篇文章進行總結,這裏再也不贅述。
2.3 output
output { elasticsearch { # es所在的地址 hosts => "localhost:9200" # 導入到es上對應的索引 index => "test" } stdout { codec => json_lines } }
3. 實現數據的實時同步(全量法)
- 全量法,即指每次將表的全部數據所有導入,這種方法可能會致使數據重複的問題,由於每次同步時都會將以前已經導入的數據再導入一遍,爲避免數據重複的問題,咱們須要對每條數據進行標識,這樣在每次同步時es中若已出現相同標識的數據則會選擇覆蓋,以此實現數據實時同步的效果。
- 實現數據標識效果,即在
output
部分指定document_id
便可
output { elasticsearch { # es所在的地址 hosts => "localhost:9200" # 導入到es上對應的索引 index => "test" # 指定標識每條數據的字段 document_id => "%{id}" } stdout { codec => json_lines } }
- 這裏須要注意的是,咱們沒法使用mongodb自動生成的id做爲標識符,由於id是
ObjectId
類型,在input
階段咱們已經把該字段刪去了,所以這裏應該選擇表中其餘能標識數據且不是ObjectId
類型的字段(string, int等皆可)
4. 實現數據的實時同步(增量法)
- 若在你的數據中除了mongodb自動生成的
id
再也不有其它具備標識性質的字段,能夠考慮使用增量法實現數據的實時同步。增量法,即每次同步時是從上一次執行命令的時間開始,將插入時間在上一次命令以後的數據導入es中。增量法的優勢是沒必要每次將所有數據導入,而是隻導入新加入到數據庫的數據,能夠減少每次同步時的壓力。 - 使用增量法實現數據同步,須要修改
input
部分的代碼
input { jdbc { jdbc_driver_class => "com.dbschema.MongoJdbcDriver" # jar包的目錄 jdbc_driver_library => "logstash/logstash-core/lib/jars/mongojdbc2.1.jar" # mongo數據庫對應的uri jdbc_connection_string => jdbc_connection_string => "jdbc:mongodb://127.0.0.1:27017/dbtest" # 這個不填 jdbc_user => "" # 這個不填 jdbc_password => "" # 表示每分鐘中執行一次,以實現實時同步的效果 schedule => "* * * * *" # 實現增量同步的mongodb的查詢語句 statement => "db.dbtest.find({ $gte: ISODate(:sql_last_value) }, {_id: 0})" # 保存上一次執行時間的文件 last_run_metadata_path => "/logstash-7.6.2/.logstash_jdbc_last_run" } }
- 實現增量同步主要是兩個字段:
statement:
執行mongodb查詢的字段- 關於
:sql_last_value
:logstash中提供的一個協助查詢的時間參數,默認值是1970-01-01 08:00:00
,數據類型是string
,每次執行命令以後,該值會替換成執行命令時刻的時間。 - 在修改
find
語句時容易由於:sql_last_value
的類型出錯:若是表中關於時間的數據類型是string
,那在find
語句中改成db.dbtest.find({ $gte: :sql_last_value}, {_id: 0})
便可;若若是表中關於時間的數據類型是date
,那在find
語句須要進行類型轉換,即改成·db.dbtest.find({ $gte: ISODate(:sql_last_value)}, {_id: 0})
- 關於
last_run_metadata_path:
保存上一次執行時間的文件,能夠放在任意目錄下,我這裏放在了/logstash-7.6.2
的目錄下面
5. 運行文件
/logstash-7.6.2/bin/logstash -f /logstash-7.6.2/dbtest.conf --path.data=/logstash-7.6.2/data/dbtest
4、可能出現的報錯
1. 沒法識別ObjectId錯誤
-
報錯信息:
Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.bson.types.ObjectId, simple name=ObjectId>}
mysql -
錯誤緣由:在
input
部分編寫mongodb查詢語句時須要注意,因爲logstash沒法識別mongodb中的ObjectId
類型,所以咱們須要拋棄該字段,所以在find
語句中咱們設置_id:0
,即表示不須要該字段。git
db.dbtest.find({}, {_id: 0})
【tips】經過mongodb的find
語句咱們還能夠選取只導出文檔中的某一字段,具體操做可參考官方文檔:https://docs.mongodb.com/manual/reference/method/db.collection.find/github
2. 增量同步無效可是沒有報錯信息
- 這一問題的緣由在上面的增量法部分中也有提到過,多是
find
語句中:sql_last_value
或者其餘字段的數據類型不正確,建議檢查一下數據庫中字段類型和find
語句中的查詢條件是否匹配
參考文章
- https://stackoverflow.com/questions/58342818/sync-mongodb-to-elasticsearch
- https://docs.mongodb.com/manual/reference/method/db.collection.find/
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html