使用logstash同步MySQL數據到ES

使用logstash同步MySQL數據到ES

概述:

在生成業務常有將MySQL數據同步到ES的需求,若是須要很高的定製化,每每須要開發同步程序用於處理數據。但沒有特殊業務需求,官方提供的logstash就頗有優點了。html

在使用logstash咱們贏先了解其特性,再決定是否使用:java

  • 無需開發,僅需安裝配置logstash便可;
  • 凡是SQL能夠實現的logstash都可以實現(本就是經過sql查詢數據)
  • 支持每次全量同步或按照特定字段(如遞增ID、修改時間)增量同步;
  • 同步頻率可控,最快同步頻率每分鐘一次(若是對實效性要求較高,慎用);
  • 不支持被物理刪除的數據同步物理刪除ES中的數據(可在表設計中增長邏輯刪除字段IsDelete標識數據刪除)。

 

一、安裝

前往官網下載logstash,下載地址https://www.elastic.co/downloads/logstash,zip壓縮包大約160M;mysql

程序目錄:【windows】G:\ELK\logstash-6.5.4;【linux】/tomcat/logstash/logstash-6.5.4。linux

下文統一以【程序目錄】表示不一樣環境的安裝目錄。git

 

二、配置

2.一、新建目錄存放配置文件及mysql依賴包

在【程序目錄】目錄(\bin同級)新建mysql目錄,將下載好的mysql-connector-java-5.1.34.jar放入此目錄;github

在【程序目錄】\mysql目錄新建jdbc.conf文件,此文件將配置數據庫鏈接信息、查詢數據sql、分頁信息、同步頻率等核心信息。sql

 

注意事項請查看註釋信息。數據庫

 

2.二、單表同步配置

  1.  
    input {
  2.  
    stdin {}
  3.  
    jdbc {
  4.  
    type => "jdbc"
  5.  
    # 數據庫鏈接地址
  6.  
    jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true""
  7.  
    # 數據庫鏈接帳號密碼;
  8.  
    jdbc_user => "username"
  9.  
    jdbc_password => "pwd"
  10.  
    # MySQL依賴包路徑;
  11.  
    jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
  12.  
    # the name of the driver class for mysql
  13.  
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  14.  
    # 數據庫重連嘗試次數
  15.  
    connection_retry_attempts => "3"
  16.  
    # 判斷數據庫鏈接是否可用,默認false不開啓
  17.  
    jdbc_validate_connection => "true"
  18.  
    # 數據庫鏈接可用校驗超時時間,默認3600S
  19.  
    jdbc_validation_timeout => "3600"
  20.  
    # 開啓分頁查詢(默認false不開啓);
  21.  
    jdbc_paging_enabled => "true"
  22.  
    # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值);
  23.  
    jdbc_page_size => "500"
  24.  
    # statement爲查詢數據sql,若是sql較複雜,建議配經過statement_filepath配置sql文件的存放路徑;
  25.  
    # sql_last_value爲內置的變量,存放上次查詢結果中最後一條數據tracking_column的值,此處即爲ModifyTime;
  26.  
    # statement_filepath => "mysql/jdbc.sql"
  27.  
    statement => " SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
  28.  
    # 是否將字段名轉換爲小寫,默認true(若是有數據序列化、反序列化需求,建議改成false);
  29.  
    lowercase_column_names => false
  30.  
    # Value can be any of: fatal,error,warn,info,debug,默認info;
  31.  
    sql_log_level => warn
  32.  
    #
  33.  
    # 是否記錄上次執行結果,true表示會將上次執行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
  34.  
    record_last_run => true
  35.  
    # 須要記錄查詢結果某字段的值時,此字段爲true,不然默認tracking_column爲timestamp的值;
  36.  
    use_column_value => true
  37.  
    # 須要記錄的字段,用於增量同步,需是數據庫字段
  38.  
    tracking_column => "ModifyTime"
  39.  
    # Value can be any of: numeric,timestamp,Default value is "numeric"
  40.  
    tracking_column_type => timestamp
  41.  
    # record_last_run上次數據存放位置;
  42.  
    last_run_metadata_path => "mysql/last_id.txt"
  43.  
    # 是否清除last_run_metadata_path的記錄,須要增量同步時此字段必須爲false;
  44.  
    clean_run => false
  45.  
    #
  46.  
    # 同步頻率(分 時 天 月 年),默認每分鐘同步一次;若是是每10分鐘執行一下 */10 便可 https://www.cnblogs.com/superman66/p/4565723.html
  47.  
    schedule => "* * * * *"
  48.  
    }
  49.  
    }
  50.  
     
  51.  
    filter {
  52.  
    json {
  53.  
    source => "message"
  54.  
    remove_field => ["message"]
  55.  
    }
  56.  
    # convert 字段類型轉換,將字段TotalMoney數據類型改成float;
  57.  
    mutate {
  58.  
    convert => {
  59.  
    "TotalMoney" => "float"
  60.  
    }
  61.  
    }
  62.  
    }
  63.  
    output {
  64.  
    elasticsearch {
  65.  
    # host => "192.168.1.1"
  66.  
    # port => "9200"
  67.  
    # 配置ES集羣地址
  68.  
    hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
  69.  
    # 索引名字,必須小寫
  70.  
    index => "consumption"
  71.  
    # 數據惟一索引(建議使用數據庫KeyID)
  72.  
    document_id => "%{KeyId}"
  73.  
    }
  74.  
    stdout {
  75.  
    codec => json_lines
  76.  
    }
  77.  
    }

2.三、多表同步

多表配置和單表配置的區別在於input模塊的jdbc模塊有幾個type,output模塊就需對應有幾個type;json

  1.  
    input {
  2.  
    stdin {}
  3.  
    jdbc {
  4.  
    # 多表同步時,表類型區分,建議命名爲「庫名_表名」,每一個jdbc模塊需對應一個type;
  5.  
    type => "TestDB_DetailTab"
  6.  
     
  7.  
    # 其餘配置此處省略,參考單表配置
  8.  
    # ...
  9.  
    # ...
  10.  
    # record_last_run上次數據存放位置;
  11.  
    last_run_metadata_path => "mysql\last_id.txt"
  12.  
    # 是否清除last_run_metadata_path的記錄,須要增量同步時此字段必須爲false;
  13.  
    clean_run => false
  14.  
    #
  15.  
    # 同步頻率(分 時 天 月 年),默認每分鐘同步一次;
  16.  
    schedule => "* * * * *"
  17.  
    }
  18.  
    jdbc {
  19.  
    # 多表同步時,表類型區分,建議命名爲「庫名_表名」,每一個jdbc模塊需對應一個type;
  20.  
    type => "TestDB_Tab2"
  21.  
    # 多表同步時,last_run_metadata_path配置的路徑應不一致,避免有影響;
  22.  
    # 其餘配置此處省略
  23.  
    # ...
  24.  
    # ...
  25.  
    }
  26.  
    }
  27.  
     
  28.  
    filter {
  29.  
    json {
  30.  
    source => "message"
  31.  
    remove_field => [ "message"]
  32.  
    }
  33.  
    }
  34.  
     
  35.  
    output {
  36.  
    # output模塊的type需和jdbc模塊的type一致
  37.  
    if [type] == "TestDB_DetailTab" {
  38.  
    elasticsearch {
  39.  
    # host => "192.168.1.1"
  40.  
    # port => "9200"
  41.  
    # 配置ES集羣地址
  42.  
    hosts => [ "192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
  43.  
    # 索引名字,必須小寫
  44.  
    index => "detailtab1"
  45.  
    # 數據惟一索引(建議使用數據庫KeyID)
  46.  
    document_id => "%{KeyId}"
  47.  
    }
  48.  
    }
  49.  
    if [type] == "TestDB_Tab2" {
  50.  
    elasticsearch {
  51.  
    # host => "192.168.1.1"
  52.  
    # port => "9200"
  53.  
    # 配置ES集羣地址
  54.  
    hosts => [ "192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
  55.  
    # 索引名字,必須小寫
  56.  
    index => "detailtab2"
  57.  
    # 數據惟一索引(建議使用數據庫KeyID)
  58.  
    document_id => "%{KeyId}"
  59.  
    }
  60.  
    }
  61.  
    stdout {
  62.  
    codec => json_lines
  63.  
    }
  64.  
    }

 

三、啓動運行

在【程序目錄】目錄執行如下命令啓動:windows

  1.  
    【windows】bin\logstash.bat -f mysql\jdbc.conf
  2.  
    【linux】nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &

可新建腳本配置好啓動命令,後期直接運行便可。

在【程序目錄】\logs目錄會有運行日誌。

Note:

6.X版本須要jdk8支持,若是默認jdk版本不是jdk8,那麼須要在logstash或logstash.lib.sh的行首位置添加兩個環境變量:

  1.  
    export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
  2.  
    export JAVA_HOME="/usr/tools/jdk1.8.0_162/"

開機自啓動:

windows開機自啓:

linux開機自啓:

 

 

四、問題及解決方案

4.一、數據同步後,ES沒有數據

output.elasticsearch模塊的index必須是全小寫;

4.二、增量同步後last_run_metadata_path文件內容不改變

若是lowercase_column_names配置的不是false,那麼tracking_column字段配置的必須是全小寫。

4.三、提示找不到jdbc_driver_library

2032 com.mysql.jdbc.Driver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?

檢測配置的地址是否正確,若是是linux環境,注意路徑分隔符是「/」,而不是「\」。

 

4.四、數據丟失

statement配置的sql中,若是比較字段使用的是大於「>」,可能存在數據丟失。

假設當同步完成後last_run_metadata_path存放的時間爲2019-01-30 20:45:30,而這時候新入庫一條數據的更新時間也爲2019-01-30 20:45:30,那麼這條數據將沒法同步。

解決方案:將比較字段使用 大於等於「>=」。

4.五、數據重複更新

上一個問題「數據丟失」提供的解決方案是比較字段使用「大於等於」,但這時又會產生新的問題。

假設當同步完成後last_run_metadata_path存放的時間爲2019-01-30 20:45:30,而數據庫中更新時間最大值也爲2019-01-30 20:45:30,那麼這些數據將重複更新,直到有更新時間更大的數據出現。

當上述特殊數據不少,且長期沒有新的數據更新時,會致使大量的數據重複同步到ES。

什麼時候會出現以上狀況呢:①比較字段非「自增」;②比較字段是程序生成插入。

解決方案:

①比較字段自增保證不重複或重複機率極小(好比使用自增ID或者數據庫的timestamp),這樣就能避免大部分異常狀況了;

②若是確實存在大量程序插入的數據,其更新時間相同,且可能長期無數據更新,可考慮按期更新數據庫中的一條測試數據,避免最大值有大量數據。

4.六、容災

logstash自己沒法集羣,咱們常使用的組合ELK是經過kafka集羣變相實現集羣的。

可供選擇的處理方式:①使用任務程序推送數據到kafaka,由kafka同步數據到ES,但任務程序自己也須要容災,並須要考慮重複推送的問題;②將logstash加入守護程序,並輔以第三方監控其運行狀態。具體如何選擇,須要結合自身的應用場景了。

4.七、海量數據同步

爲何會慢?logstash分頁查詢使用臨時表分頁,每條分頁SQL都是將全集查詢出來看成臨時表,再在臨時表上分頁查詢。這樣致使每次分頁查詢都要對主表進行一次全表掃描。

SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc) AS `t1` LIMIT 5000 OFFSET 10000000; 

數據量太大,首次同步如何安全過渡同步?

可考慮在statement對應的sql中加上分頁條件,好比ID在什麼範圍,修改時間在什麼區間,將單詞同步的數據總量減小。先少許數據同步測試驗證,再根據測試狀況修改區間條件啓動logstash完成同步。好比將SQL修改成:

SELECT * FROM `ImageCN1` WHERE ModifyTime<'2018-10-10 10:10:10' AND ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc

這樣須要每次同步後就修改sql,線上運營比較繁瑣,是否能夠不修改sql,同時保證同步效率呢?SQL咱們能夠再修改下:

SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc limit 100000

 這樣就能保證每次子查詢的數據量不超過10W條,實際測試發現,數據量很大時效果很明顯。

  1.  
    [SQL] USE XXXDataDB;
  2.  
    受影響的行: 0
  3.  
    時間: 0.001s
  4.  
     
  5.  
    [SQL]
  6.  
    SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc ) AS `t1` LIMIT 5000 OFFSET 900000;
  7.  
    受影響的行: 0
  8.  
    時間: 7.229s
  9.  
     
  10.  
    [SQL]
  11.  
     
  12.  
    SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '2018-07-18 19:35:10' order by ModifyTime asc limit 100000) AS `t1` LIMIT 5000 OFFSET 90000
  13.  
     
  14.  
    受影響的行: 0
  15.  
    時間: 1.778s

 測試能夠看出,SQL不加limit 10W時,越日後分頁查詢越慢,耗時達到8S,而加了limit條件的SQL耗時穩定在2S之內。

 

 

 

  1.  
    歡迎我的轉載,但須在文章頁面明顯位置給出原文鏈接;
  2.  
    未經做者贊成必須保留此段聲明、不得隨意修改原文、不得用於商業用途,不然保留追究法律責任的權利。
  3.  
     
  4.  
    【 CSDN 】:csdn.zxiaofan.com
  5.  
    【GitHub】:github.zxiaofan.com
  6.  
     
  7.  
    若有任何問題,歡迎留言。祝君好運!
  8.  
    Life is all about choices!
  9.  
    未來的你必定會感激如今拼命的本身!
相關文章
相關標籤/搜索