input { jdbc { # 驅動jar包的位置 jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar" # 驅動類名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 數據庫鏈接 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull" # mysql用戶名 jdbc_user => "root" # mysql密碼 jdbc_password => "123456" # 數據庫重連嘗試次數 connection_retry_attempts => "3" # 開啓分頁查詢(默認false不開啓) jdbc_paging_enabled => true # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值) jdbc_page_size => "2" # 若是sql較複雜,建議配經過statement_filepath配置sql文件的存放路徑; statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql" # 須要記錄查詢結果某字段的值時,此字段爲true use_column_value => true # 須要記錄的字段,用於增量同步 tracking_column => "last_modify_time" # 字段類型 tracking_column_type => "timestamp" # 記錄上一次運行記錄 record_last_run => true # 上一次運行記錄值的存放文件路徑 last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt" # 是否清除last_run_metadata_path的記錄,須要增量同步時此字段必須爲false; clean_run => false # 設置時區 若是設置會將 last_modify_time 增長8小時 #jdbc_default_timezone => "Asia/Shanghai" # 同步頻率(分 時 天 月 年),默認每分鐘同步一次; schedule => "* * * * *" } } filter { json { source => "message" remove_field => ["message","@version"] } } output { elasticsearch { hosts => "127.0.0.1:9200" index => "%{table_name}" document_id => "%{id}" } }
input { jdbc { # 驅動jar包的位置 jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar" # 驅動類名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 數據庫鏈接 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull" # mysql用戶名 jdbc_user => "root" # mysql密碼 jdbc_password => "123456" # 數據庫重連嘗試次數 connection_retry_attempts => "3" # 開啓分頁查詢(默認false不開啓) jdbc_paging_enabled => true # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值) jdbc_page_size => "2" # 若是sql較複雜,建議配經過statement_filepath配置sql文件的存放路徑; statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql" # 須要記錄查詢結果某字段的值時,此字段爲true use_column_value => true # 須要記錄的字段,用於增量同步 tracking_column => "last_modify_time" # 字段類型 tracking_column_type => "timestamp" # 記錄上一次運行記錄 record_last_run => true # 上一次運行記錄值的存放文件路徑 last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt" # 是否清除last_run_metadata_path的記錄,須要增量同步時此字段必須爲false clean_run => false # 設置時區 若是設置會將 last_modify_time 增長8小時 #jdbc_default_timezone => "Asia/Shanghai" # 同步頻率(分 時 天 月 年),默認每分鐘同步一次; schedule => "* * * * *" } } filter { mutate { # 刪除字段 remove_field => ["@timestamp","@version"] } } output { kafka { bootstrap_servers => "10.10.6.202:9092" topic_id => "base-db" codec => "json" client_id => "kafkaOutPut" } }
input {
kafka {
bootstrap_servers => "10.10.6.202:9092"
client_id => "kafkaInPut"
# 從最新一條開始讀取
auto_offset_reset => "latest"
# 消費線程,通常是這個隊列的partitions數
consumer_threads => 3
decorate_events => true
# 隊列名稱
topics => ["base-db"]
codec => "json"
}
}
filter {
mutate {
# 刪除字段
remove_field => ["@timestamp","@version"]
}
}
output {
elasticsearch {
hosts => "10.10.6.202:9200"
index => "%{table_name}"
document_id => "%{id}"
}
}
select id,person_name,date_format(create_time, '%Y-%m-%d %H:%i:%s') as create_time,date_format(last_modify_time, '%Y-%m-%d %H:%i:%s') as last_modify_time,'t_sys_loginperson' as table_name from t_sys_loginperson where last_modify_time>:sql_last_value
#帶時分秒 date_format(create_time, '%Y-%m-%d %H:%i:%s') #不帶時分秒 date_format(create_time, '%Y-%m-%d')
#單配置文件啓動(後臺啓動) nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2elasticsearch.conf > /dev/null 2>&1 &
配置pipelines.ymljava
vi /usr/local/logstash/config/pipelines.yml
- pipeline.id: mysql pipeline.workers: 1 pipeline.batch.size: 100 path.config: "/usr/local/logstash/customize-config/mysql2kafka.conf" - pipeline.id: es queue.type: persisted path.config: "/usr/local/logstash/customize-config/kafka2elasticsearch.conf"
啓動mysql
#無需指定配置文件,默認走pipelines.yml的配置,若是使用-e或-f時,Logstash會忽略pipelines.yml文件 nohup /usr/local/logstash/bin/logstash > /dev/null 2>&1 &