一步一個腳印實現logstash同步MySQL數據到ES

centos7 mysql: MySQL5.7 安裝配置 logstash:版本logstash-7.1.1下載 ES: 安裝配置 logstash-input-jdbc 方式,該方式只支持新增和更新數據,不支持刪除操做java

準備

安裝 logstash-input-jdbcmysql

[root@localhost logstash-7.1.1]# bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
複製代碼

下載mysql-connector-java-5.1.46.zip 並解壓 wget cdn.mysql.com//Downloads/… unzip mysql-connector-java-5.1.46.zipsql

[root@localhost logstash-7.1.1]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
--2019-05-30 19:48:30--  https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
Resolving cdn.mysql.com (cdn.mysql.com)... 104.127.195.16
Connecting to cdn.mysql.com (cdn.mysql.com)|104.127.195.16|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4792694 (4.6M) [application/zip]
Saving to: ‘mysql-connector-java-5.1.46.zip’
100%[================================================================================================================================================>] 4,792,694    963KB/s   in 5.2s   

2019-05-30 19:48:37 (902 KB/s) - ‘mysql-connector-java-5.1.46.zip’ saved [4792694/4792694]
[root@localhost logstash-7.1.1]# unzip mysql-connector-java-5.1.46.zip 
[root@localhost logstash-7.1.1]# ls
bin     CONTRIBUTORS  Gemfile       lib          logstash-core             modules                      mysql-connector-java-5.1.46.zip  tools   x-pack
config  data          Gemfile.lock  LICENSE.txt  logstash-core-plugin-api  mysql-connector-java-5.1.46  NOTICE.TXT                       vendor
複製代碼

在數據庫中準備數據表(測試表table1)數據庫

INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (1, '測試123');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (2, '測試234');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (3, '測試345');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (4, '測試4');

複製代碼

在logstash/config 目錄中建立jdbc.sql 後面會用json

內容vim

select * from table1
複製代碼

配置config

  • 進入logstash/config 複製一份logstash-sample.conf
cp logstash-sample.conf mylogstash.conf
複製代碼
  • 編輯
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc{
    # mysql 數據庫連接
    # jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
    jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    # 用戶名和密碼
    jdbc_user => "root"
    jdbc_password => "root123132*"
    #驅動
    jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
    # 驅動類名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone =>"Asia/Shanghai"
    # mysql文件, 也能夠直接寫SQL語句在此處,以下:
    # statement => "select * from t_order where update_time >= :sql_last_value;"
    statement_filepath => "./config/jdbc.sql"
    # 這裏相似crontab,能夠定製定時操做,好比每分鐘執行一次同步(分 時 天 月 年)
    schedule => "* * * * *"
    type => "jdbc"
    # 是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
    #record_last_run => true
    # 是否須要記錄某個column 的值,若是record_last_run爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值.
    # use_column_value => true
    # 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵
    # tracking_column => "update_time"
    # tracking_column_type => "timestamp"
    # last_run_metadata_path => "./logstash_capital_bill_last_id"
    # 是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
    # clean_run => false
    # 是否將 字段(column) 名稱轉小寫
    lowercase_column_names => false
  }
}

output {
 # elasticsearch {
 # hosts => ["http://localhost:9200"]
 # index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
 # #user => "elastic"
 # #password => "changeme"
 # } 
 #
 stdout {
     codec => json_lines
     }   
}
複製代碼
  • 執行 (此處只打印)
[root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf 
複製代碼

成功後返回結果:centos

{"@timestamp":"2019-05-31T04:02:00.280Z","id":1,"name":"測試123","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":2,"name":"測試234","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":3,"name":"測試345","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:02:00.281Z","id":4,"name":"測試4","@version":"1","type":"jdbc"}
[2019-05-30T21:03:00,188][INFO ][logstash.inputs.jdbc     ] (0.006211s) SELECT version()
[2019-05-30T21:03:00,201][INFO ][logstash.inputs.jdbc     ] (0.010732s) SELECT version()
[2019-05-30T21:03:00,214][INFO ][logstash.inputs.jdbc     ] (0.008067s) SELECT count(*) AS `count` FROM (select * from table1
) AS `t1` LIMIT 1
[2019-05-30T21:03:00,232][INFO ][logstash.inputs.jdbc     ] (0.006079s) SELECT * FROM (select * from table1
) AS `t1` LIMIT 50000 OFFSET 0
{"@timestamp":"2019-05-31T04:03:00.233Z","id":1,"name":"測試123","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":2,"name":"測試234","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":3,"name":"測試345","@version":"1","type":"jdbc"}
{"@timestamp":"2019-05-31T04:03:00.234Z","id":4,"name":"測試4","@version":"1","type":"jdbc"}
^C[2019-05-30T21:03:45,379][ERROR][org.logstash.Logstash    ] org.jruby.exceptions.ThreadKill
複製代碼

添加更新依賴字段,好比新增的時候但願根據數據庫數據變化來更新ES中的對應的數據,通常設置update_time做爲更新字段,此時也能夠直接配置ES的輸入

  • 修改配置
input {
  jdbc{
    # mysql 數據庫連接
    # jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
    jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    # 用戶名和密碼
    jdbc_user => "root"
    jdbc_password => "root123132*"
    #驅動
    jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
    # 驅動類名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone =>"Asia/Shanghai"
    # mysql文件, 也能夠直接寫SQL語句在此處,以下:
    # statement => "select * from t_order where update_time >= :sql_last_value;"
    statement_filepath => "./config/jdbc.sql"
    # 這裏相似crontab,能夠定製定時操做,好比每分鐘執行一次同步(分 時 天 月 年)
    schedule => "* * * * *"
    type => "jdbc"
    # 是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    # 是否須要記錄某個column 的值,若是record_last_run爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值.
     use_column_value => true
    # 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./logstash_capital_bill_last_id"
    # 是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
    # clean_run => false
    # 是否將 字段(column) 名稱轉小寫
    lowercase_column_names => false
  }
}

output {
  elasticsearch {
    hosts => "http://192.168.177.128:9200"
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    index => "tabel_test"
    # 將表Id做爲ES的主鍵,防止數據重複,這樣會影響效率,雙刃劍
    document_id => "%{id}"
    user => "elastic"
    password => "123456"
  }
 # 打印信息,生產環境不須要
 stdout {
     codec => json_lines
     }   
}
複製代碼
  • 修改vim config/jdbc.sql
select * from table1 WHERE update_time > :sql_last_value
複製代碼

修改數據庫表table1,新增update_time字段api

  • 運行
[root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf
複製代碼

修改時間update_time,或者新增數據,能夠看到後臺會打印下信息ruby

  • 在ES查詢,此處使用kibana 查詢:GET /tabel_test/_search

返回bash

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "tabel_test",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "update_time" : "2019-05-31T14:00:00.000Z",
          "@version" : "1",
          "@timestamp" : "2019-05-31T07:32:00.891Z",
          "name" : "修改測試1",
          "id" : 1,
          "type" : "jdbc"
        }
      },
.....
複製代碼

到此結束

source

www.yrclubs.com/details?id=…

reference

Elasticsearch - Logstash實現mysql同步數據到elasticsearch

相關文章
相關標籤/搜索