logstash-input-jdbc同步mysql數據到elasticsearch

前言:

有項目之前使用coreseek來作索引,隨着時間推移,技術上面也要更新換代,該項目後端從php5升級到php7,
而這時候問題來了,coreseek的做者已經再也不更新,官網也關閉了,所以尋求其餘的索引軟件替代,而elasticsearch恰好能夠很好的知足業務php

1、安裝

1.首先須要安裝java,elasticsearch,logstash,能夠參考我另一篇文章:https://segmentfault.com/a/11...java

2.而後logstash-input-jdbc安裝node

cd /opt/logstash
.bin/plugin install logstash-input-jdbc

2、配置

安裝仍是比較容易的,主要是配置這裏有一些坑,這裏須要爲logstash準備配置文件jdbc.conf和jdbc.sqlmysql

jdbc.conf文件內容sql

input {
    stdin {
    }
    jdbc {
      // mysql相關jdbc配置
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/your_mysql_database"
      jdbc_user => "mysql_user"
      jdbc_password => "mysql_password"
 
      // jdbc鏈接mysql驅動的文件目錄,可去官網下載:https://dev.mysql.com/downloads/connector/j/
      jdbc_driver_library => "/opt/logstash/conf/mysql-connector-java/mysql-connector-java-5.1.44-bin.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      // mysql文件, 也能夠直接寫SQL語句在此處,以下:
      // statement => "SELECT * from Table_test;"
      statement_filepath => "/opt/logstash/conf/jdbc.sql"

      // 這裏相似crontab,能夠定製定時操做,好比每10分鐘執行一次同步(分 時 天 月 年)
      schedule => "*/10 * * * *"
      type => "jdbc"

      // 是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
      record_last_run => "true"

      // 是否須要記錄某個column 的值,若是record_last_run爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值.
      use_column_value => "true"

      // 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵
      tracking_column => "autoid"

      last_run_metadata_path => "/opt/logstash/conf/last_id"

      // 是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
      clean_run => "false"

      //是否將 字段(column) 名稱轉小寫
      lowercase_column_names => "false"
    }
}

// 此處我不作過濾處理,若是須要,也可參考elk安裝那篇
filter {}

output {
    // 輸出到elasticsearch的配置
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "jdbc"

        // 將"_id"的值設爲mysql的autoid字段
        document_id => "%{autoid}"
        template_overwrite => true
    }

    // 這裏輸出調試,正式運行時能夠註釋掉
    stdout {
        codec => json_lines
    }
}

下面是mysql文件jdbc.sql,注意:sql_last_value,後面會說道數據庫

SELECT
*

FROM
Table_test

WHERE
autoid > :sql_last_value

3、遇到的問題

相信不少人在安裝和實際使用的過程當中有這樣那樣的問題,這裏我記錄了一些本身遇到的json

1.elasticsearch數據重複以及增量同步segmentfault

在默認配置下,tracking_column這個值是@timestamp,存在elasticsearch就是_id值,是logstash存入elasticsearch的時間,這個值的主要做用相似mysql的主鍵,是惟一的,可是咱們的時間戳實際上是一直在變的,因此咱們每次使用select語句查詢的數據都會存入elasticsearch中,致使數據重複。
解決方法:在要查詢的表中,找主鍵或者自增值的字段,將它設置爲_id的值,由於_id值是惟一的,因此,當有重複的_id的時候,數據就不會重複後端

// 是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
  record_last_run => "true"

  // 是否須要記錄某個column 的值,若是record_last_run爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值.
  use_column_value => "true"

  // 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵
  tracking_column => "autoid"

2.數據同步頻繁,影響mysql數據庫性能安全

咱們寫入jdbc.sql文件的mysql語句是寫死的,因此每次查詢的數據庫有不少是已經不須要去查詢的,尤爲是每次select * from table;的時候,對mysql數據庫形成了很是大的壓力

解決:(1)根據業務需求,能夠適當修改定時同步時間,我這裏對實時性相對要求較高,所以設置了10分鐘

// 這裏相似crontab,能夠定製定時操做,好比每10分鐘執行一次同步(分 時 天 月 年)
  schedule => "*/10 * * * *"

(2)設置mysql查詢範圍,防止大量的查詢拖死數據庫

// 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵
  tracking_column => "autoid"

  // 上次執行數據庫的值,該值是上次查詢時tracking_column設置的字段最大值
  last_run_metadata_path => "/opt/logstash/conf/last_id"

  // 是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
  clean_run => "false"

在sql語句這裏設置select * from WHERE autoid > :sql_last_value;
注意:若是你的語句比較複雜,autoid > :sql_last_value必定要寫在WHERE後面,而後接AND便可

3.elasticsearch存儲容量不斷上升

稍微觀察下就會發現,即便沒有新的數據寫入到elasticsearch裏面,但只要logstash定時每次運行,elasticsearch容量就不斷上升

clipboard.png

過一段時間看,佔用空間增大,其實elasticsearch數據是同樣的

clipboard.png

緣由:在elasticsearch/nodes/0/indices/jdbc/{0,1,2,3,4}/下有個translog,這個是elasticsearch的事務日誌,相似mysql的binlog。elasticsearch爲了數據安全,接收到數據後,先將數據寫入內存和translog,而後再創建索引寫入到磁盤,這樣即便忽然斷電,重啓後,還能夠經過translog恢復,不過這裏因爲咱們每次查詢都有不少重複的數據,而這些重複的數據又沒有寫入到elasticsearch的索引中,因此就囤積了下來

clipboard.png

解決:查詢官網說會按期refresh,會自動清理掉老的日誌,所以可不作處理

4.增量同步和mysql範圍查詢致使mysql數據庫有修改時沒法同步到之前的數據

增量同步解決了,mysql每次都小範圍查詢,解決了數據庫壓力的問題,不過卻致使沒法同步老數據的修改問題

解決:可根據業務狀態來作,若是你數據庫是修改頻繁類型,那隻能作全量更新了,可是高頻率大範圍掃描數據庫來作的索引還不如不作索引了(由於創建索引也是有成本的),咱們作索引主要是針對一些數據量大,不常修改,很消耗數據庫性能的狀況。我這裏是數據修改較少,並且修改也通常是近期數據,由於同步時,我在mysql範圍上面稍微調整一下

如:autoid > (:sql_last_value-100000),每次掃描上次掃描範圍往以前再多10W行,這樣掃描的數據量相對較小,也照顧到了可能會修改的數據類型

可是範圍掃描還存在一個問題,就是過往的數據寫入了elasticsearch以後,若是有修改,而又不在範圍掃描之內,那麼elasticsearch就不會同步到。所以,咱們還能夠按期作一次全量或者更大範圍的同步,只須要修改範圍值便可。具體的值固然能夠根據業務來定

相關文章
相關標籤/搜索