若是你對 使用Logstash保持Elasticsearch與數據庫同步 方案還不是很熟悉,建議先花點時間精讀它。
上面的文章以單表同步場景爲例,清楚講述瞭如何經過JDBC同步數據至ES,而對於實際開發中常常出現的多表關聯同步並未說起,如下是我針對多表關聯同步的趟坑過程但願對你有所幫助。
同步單表時咱們對於表字段的約定:java
多表關聯的狀況下咱們須要JOIN其餘表查詢獲得結果,這個結果就是ES須要的打平後的寬表。ES新的版本中也增長了join操做,但這事不是ES擅長的,咱們選擇交給更擅長的數據庫處理,讓ES只存儲打平後的單層索引。mysql
若是你理解單表同步而困惑多表關聯同步的話,試着將關聯查詢的複雜SQL想象(定義)爲視圖,是否是後續操做就跟單表沒區別了!git
咱們來逐個看下多表關聯的同步問題 (假設表a多對多關聯表b):github
單表的id字段綁定到ES document的_id,能夠實現ES索引冪等性,不會出現job緣由致使索引文檔重複。那對於多表關聯的狀況呢,可使用各表id的組合做爲document的_id。如SELECT:sql
concat(a.id, '_', b.id) AS docid
(若是你不關注冪等,也能夠用_id默認生成策略。)數據庫
單表基於modification_time就能夠識別出自上次輪詢後新的變化數據,多表關聯的狀況呢也相似:segmentfault
(CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time
同理軟刪除字段is_deleted的處理邏輯:ruby
(CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted
這樣不管表a仍是表b發生變動,均可以被logstash識別出來採集到。框架
如此咱們就能夠寫出多表關聯同步的SQL了,爲了方便更新維護SQL及保持logstash-jdbc端conf配置文件的簡潔,你能夠把SQL定義成一張視圖,conf文件中的SQL statement能夠像寫單表處理同樣了。elasticsearch
示例conf:
input { jdbc { jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC" jdbc_user => "usr" jdbc_password => "pwd" jdbc_paging_enabled => true tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" } } filter { mutate { copy => { "docid" => "[@metadata][_id]"} remove_field => ["docid", "@version", "unix_ts_in_secs"] } } output { elasticsearch { index => "test_idx" document_id => "%{[@metadata][_id]}" } }