如何經過Logstash同步多表關聯數據至Elasticsearch

若是你對 使用Logstash保持Elasticsearch與數據庫同步 方案還不是很熟悉,建議先花點時間精讀它。
上面的文章以單表同步場景爲例,清楚講述瞭如何經過JDBC同步數據至ES,而對於實際開發中常常出現的多表關聯同步並未說起,如下是我針對多表關聯同步的趟坑過程但願對你有所幫助。

數據庫表的約定原則

同步單表時咱們對於表字段的約定:java

  • 表中要有主鍵字段(如id),最近變動時間字段(如modification_time),軟刪除標記字段(如is_deleted),以便jdbc-input數據採集的輪詢Job能夠識別出增量變更的數據。
  • 提示:jdbc input輪詢須要基於modification_time條件查詢,因此給該字段加上索引。

多表關聯同步方案

多表關聯的狀況下咱們須要JOIN其餘表查詢獲得結果,這個結果就是ES須要的打平後的寬表。ES新的版本中也增長了join操做,但這事不是ES擅長的,咱們選擇交給更擅長的數據庫處理,讓ES只存儲打平後的單層索引。mysql

若是你理解單表同步而困惑多表關聯同步的話,試着將關聯查詢的複雜SQL想象(定義)爲視圖,是否是後續操做就跟單表沒區別了!git

咱們來逐個看下多表關聯的同步問題 (假設表a多對多關聯表b):github

  • 單表的id字段綁定到ES document的_id,能夠實現ES索引冪等性,不會出現job緣由致使索引文檔重複。那對於多表關聯的狀況呢,可使用各表id的組合做爲document的_id。如SELECT:sql

    concat(a.id, '_', b.id) AS docid

    (若是你不關注冪等,也能夠用_id默認生成策略。)數據庫

  • 單表基於modification_time就能夠識別出自上次輪詢後新的變化數據,多表關聯的狀況呢也相似:segmentfault

    (CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE     b.modification_time END) AS modification_time
  • 同理軟刪除字段is_deleted的處理邏輯:ruby

    (CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted

    這樣不管表a仍是表b發生變動,均可以被logstash識別出來採集到。框架

如此咱們就能夠寫出多表關聯同步的SQL了,爲了方便更新維護SQL及保持logstash-jdbc端conf配置文件的簡潔,你能夠把SQL定義成一張視圖,conf文件中的SQL statement能夠像寫單表處理同樣了。elasticsearch

示例conf:

input {
  jdbc {
    jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC"
    jdbc_user => "usr"
    jdbc_password => "pwd"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "docid" => "[@metadata][_id]"}
    remove_field => ["docid", "@version", "unix_ts_in_secs"]
  }
}
output {
  elasticsearch {
      index => "test_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

diboot 簡單高效的輕代碼開發框架 (求star)

相關文章
相關標籤/搜索