logstash同步mysql到elasticsearch

logstash 獲取

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.2.zip
unzip logstash-6.5.2.zip && cd logstash-6.5.2

安裝 jdbc 和 elasticsearch 插件

bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch

獲取 jdbc mysql 驅動

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar
unzip mysql-connector-java-5.1.47.zip

編寫配置文件

logstash-input-jdbc

使用 logstash-input-jdbc 插件讀取 mysql 的數據,這個插件的工做原理比較簡單,就是定時執行一個 sql,而後將 sql 執行的結果寫入到流中,增量獲取的方式沒有經過 binlog 方式同步,而是用一個遞增字段做爲條件去查詢,每次都記錄當前查詢的位置,因爲遞增的特性,只須要查詢比當前大的記錄便可獲取這段時間內的所有增量,通常的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只適用於那種只有插入沒有更新的表,update_time 更加通用一些,建議在 mysql 表設計的時候都增長一個 update_time 字段java

  • jdbc_driver_library: jdbc mysql 驅動的路徑,在上一步中已經下載
  • jdbc_driver_class: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就行了
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用戶
  • jdbc_password: mysql 密碼
  • schedule: 執行 sql 時機,相似 crontab 的調度
  • statement: 要執行的 sql,以 ":" 開頭是定義的變量,能夠經過 parameters 來設置變量,這裏的 sql_last_value是內置的變量,表示上一次 sql 執行中 update_time 的值,這裏 update_time 條件是 >= 由於時間有可能相等,沒有等號可能會漏掉一些增量
  • use_column_value: 使用遞增列的值
  • tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
  • tracking_column: 遞增字段的名稱,這裏使用 update_time 這一列,這列的類型是 timestamp
  • last_run_metadata_path: 同步點文件,這個文件記錄了上次的同步點,重啓時會讀取這個文件,這個文件能夠手動修改
input {
  jdbc {
    jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    schedule => "* * * * *"
    statement => "SELECT * FROM table WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}

logstash-output-elasticsearch

  • hosts: es 集羣地址
  • user: es 用戶名
  • password: es 密碼
  • index: 導入到 es 中的 index 名,這裏我直接設置成了 mysql 表的名字
  • document_id: 導入到 es 中的文檔 id,這個須要設置成主鍵,不然同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 字段的值
output {
  elasticsearch {
    hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]
    user => "<user>"
    password => "<password>"
    index => "table"
    document_id => "%{id}"
  }
}

運行

把上面的代碼保存到一個配置文件裏面 sync_table.cfg,執行下面命令便可mysql

bin/logstash -f config/sync_table.cfg
#logstash的配置文件須要本身建立,一下是我本身的配置文件
cat  mysql_conf/mysql.conf

input {
     stdin {
    }
    jdbc {
      #數據庫地址
      jdbc_connection_string => "jdbc:mysql://localhost:3601/user"
      jdbc_user => "admin"
      jdbc_password => "passwd" 
      #數據庫驅動路徑
      jdbc_driver_library => "/soft/logstash-5.6.9/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #sql路徑,也能夠是直接指定sql語句,字段須要換成 statement =>
      statement_filepath => "/soft/logstash-5.6.9/pay_test.sql"
      #是否開啓記錄追蹤
      record_last_run => "true"
      #是否須要追蹤字段,若是爲true,則須要指定tracking_column,默認是timestamp
      use_column_value => "true"
      #指定追蹤的字段,這裏須要注意的是,建議選擇主鍵字段,若是選擇日期須要爲實時表
      tracking_column => "id"
      #追蹤字段的類型,目前只有數字和時間類型,默認是數字類型
      #tracking_column_type => "number"
      #設置時區
      jdbc_default_timezone =>"Asia/Shanghai"
      #是否每次清除last_run_metadata_path的內容
      clean_run => "false"
      #這裏能夠手動設置:sql_last_value的值,默認時間是1970-01-01,默認數字是0
      last_run_metadata_path => "/soft/logstash-5.6.9/logstash_jdbc_last_run"  
      #多久同步一次
      schedule => "*/5 * * * *"   
      #是否分頁
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
    }
}

filter {
    #jdbc默認json,暫時沒找到修改方法
    json {
        source => "message"
        remove_field => ["message"]
    }
    mutate {  #須要移除的字段
        remove_field => "@timestamp"
        remove_field => "type"
        remove_field => "@version"

    }
}

output {
        elasticsearch {
            hosts => "localhost:9200" #elasticsearch地址
            index => "user" #elasticsearch索引
            document_id => "%{id}" #elasticsearch的id,該值須要惟一,若是不惟一就不要加這個字段,默認生成
            document_type => "log" #elasticsearch的type
        }
}
相關文章
相關標籤/搜索