最近有一項需求須要把mysql中一個表中的數據同步到es中,分析後使用logstash的jdbc插件獲取mysql中的數據,output到es中,採集的狀況分兩種:開始是全量的採集,以後是增量採集。java
已下式驗證的過程:mysql
1.安裝logstash和mysqlsql
mysql和logstash的安裝就很少說了,能夠在網上自行查找。bash
須要注意的事項是:ide
1)可能須要安裝:bin/plugin install logstash-input-jdbc測試
2)mysql的driver下載https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gzspa
2.建立mysql表和數據插件
create table test.zy ( id int, str varchar(20) ) ; insert into test.zy values('1','a1'); insert into test.zy values('2','a2'); insert into test.zy values('3','a3'); insert into test.zy values('4','a4'); insert into test.zy values('5','a5'); insert into test.zy values('6','a6'); insert into test.zy values('7','a7'); insert into test.zy values('8','a8'); insert into test.zy values('9','a9'); insert into test.zy values('10','a10'); insert into test.zy values('11','a11'); insert into test.zy values('12','a12'); insert into test.zy values('13','a13'); insert into test.zy values('14','a14'); #增量採集驗證插入的數據 insert into test.zy values('15','a15'); insert into test.zy values('16','a16');
3.logstash採集mysql的配置文件3d
#根據字段增量採集 input { jdbc { jdbc_connection_string => "jdbc:mysql://ip:3306/test" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" statement => "select * from zy where id > :sql_last_value" use_column_value => true tracking_column => "id" record_last_run => true last_run_metadata_path => "/root/test.log" schedule => "*/2 * * * *" } } output { file { path => "./mysql/test-%{+YYYY-MM-dd}.txt" } } #根據時間戳增量採集 input { jdbc { jdbc_connection_string => "jdbc:mysql://ip:port/test" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_default_timezone =>"Asia/Shanghai" #設置sql_last_value記錄的時間時區,否則會影響增量採集的效果 statement => "select * from zy1 where time > :sql_last_value" use_column_value => false record_last_run => true last_run_metadata_path => "/root/test.log" schedule => "*/2 * * * *" } } output { file { path => "./db2/test-%{+YYYY-MM-dd}.txt" } } input的配置解析 statement 執行myqsl的語句,也能夠是statementpath 後跟sql文件的路徑 use_column_value 是否使用列值做爲依據,進行上次運行位置的記錄。 若是設置爲true,則使用tracking_column定義的列,做爲:sql_last_value. 若是設置爲false,則:sql_last_value反映的是上次SQL的運行時間。 tracking_column 增量採集依據的字段名 若是 use_colomn_value爲false時,能夠不寫 record_last_run 是否記錄本次採集數據的位置 last_run_metadata_path 設置記錄採集數據位置的文件 schedule sql腳本執行的頻率
4.採集驗證
日誌
啓動logstash
cd /usr/local/logstash
bin/logstash -f conf/logtash.conf
##logstash 自動加載配置文件 bin/logstash -f conf/logtash.conf --config.reload.automatic
測試logtash是否正常使用
bin/logstash -e 'input { stdin{}} output { output{}}'
測試並退出
bin/logstash -f conf/logtash.conf --config.test_and_exit
logstash採集輸出日誌:
輸出文件內容查看
sql_last_value的記錄