logstash增量採集mysql的驗證

    最近有一項需求須要把mysql中一個表中的數據同步到es中,分析後使用logstash的jdbc插件獲取mysql中的數據,output到es中,採集的狀況分兩種:開始是全量的採集,以後是增量採集。java

已下式驗證的過程:mysql

1.安裝logstash和mysqlsql

mysql和logstash的安裝就很少說了,能夠在網上自行查找。bash

須要注意的事項是:ide

    1)可能須要安裝:bin/plugin install logstash-input-jdbc測試

    2)mysql的driver下載https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gzspa

2.建立mysql表和數據插件

create table test.zy  (
id int,
str varchar(20)
) ;

insert into test.zy   values('1','a1');
insert into test.zy   values('2','a2');
insert into test.zy   values('3','a3');
insert into test.zy   values('4','a4');
insert into test.zy   values('5','a5');
insert into test.zy   values('6','a6');
insert into test.zy   values('7','a7');
insert into test.zy   values('8','a8');
insert into test.zy   values('9','a9');
insert into test.zy   values('10','a10');
insert into test.zy   values('11','a11');
insert into test.zy   values('12','a12');
insert into test.zy   values('13','a13');
insert into test.zy   values('14','a14');
#增量採集驗證插入的數據
insert into test.zy   values('15','a15');
insert into test.zy   values('16','a16');

3.logstash採集mysql的配置文件3d

#根據字段增量採集
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://ip:3306/test"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "select * from zy where id > :sql_last_value"
	use_column_value => true
	tracking_column => "id"
	record_last_run => true
	last_run_metadata_path => "/root/test.log"
	schedule => "*/2 * * * *"
     }
}

output {
  file {
   path => "./mysql/test-%{+YYYY-MM-dd}.txt"
  }
}

#根據時間戳增量採集
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://ip:port/test"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_default_timezone =>"Asia/Shanghai"  #設置sql_last_value記錄的時間時區,否則會影響增量採集的效果
        statement => "select * from zy1 where time > :sql_last_value"
        use_column_value => false
        record_last_run => true
        last_run_metadata_path => "/root/test.log"
        schedule => "*/2 * * * *"
     }
}

output {
  file {
   path => "./db2/test-%{+YYYY-MM-dd}.txt"
  }
}

input的配置解析
statement  執行myqsl的語句,也能夠是statementpath 後跟sql文件的路徑
use_column_value 
    是否使用列值做爲依據,進行上次運行位置的記錄。
    若是設置爲true,則使用tracking_column定義的列,做爲:sql_last_value.
    若是設置爲false,則:sql_last_value反映的是上次SQL的運行時間。
tracking_column   增量採集依據的字段名   若是 use_colomn_value爲false時,能夠不寫
record_last_run   是否記錄本次採集數據的位置
last_run_metadata_path   設置記錄採集數據位置的文件
schedule         sql腳本執行的頻率

4.採集驗證
日誌

    啓動logstash

    cd /usr/local/logstash

    bin/logstash -f conf/logtash.conf 

    ##logstash 自動加載配置文件  bin/logstash -f conf/logtash.conf   --config.reload.automatic 

        測試logtash是否正常使用

        bin/logstash  -e 'input { stdin{}}  output { output{}}'

        測試並退出

        bin/logstash  -f conf/logtash.conf   --config.test_and_exit

    logstash採集輸出日誌:

    image.png

   

輸出文件內容查看

image.png

sql_last_value的記錄

image.png

相關文章
相關標籤/搜索