/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc /usr/share/logstash/bin/logstash-plugin install logstash-output-elasticsearch
wget -P /usr/local/src/ https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip unzip /usr/local/src/mysql-connector-java-5.1.46.zip -d /usr/local/ # 下載解壓
/etc/logstash/conf.d/sql.indexer.confjava
input { jdbc { jdbc_driver_library => "/usr/local/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://172.16.140.xxx:3306/gatexxx" jdbc_user => "root" jdbc_password => "xxxxx" schedule => "* * * * *" statement => "SELECT * FROM browser_req_log WHERE gmt_create >= :sql_last_value limit 100" use_column_value => true tracking_column_type => "timestamp" tracking_column => "gmt_create" last_run_metadata_path => "syncpoint_table" } } filter { mutate { add_field => {"temp_ts" => "%{gmt_create}"} } #添加數據庫字段做爲 timestamp # timestamp date { match => ["temp_ts","ISO8601"] remove_field => ["temp_ts"] timezone => "Asia/Shanghai" } output { elasticsearch { hosts => ["172.16.140.120", "172.16.140.121", "172.16.140.122"] index => "browser_req-log" document_id => "%{id}" } }
參數說明python
jdbc_driver_library
: jdbc mysql 驅動的路徑,在上一步中已經下載jdbc_driver_class
: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就行了jdbc_connection_string
: mysql 地址jdbc_user
: mysql 用戶jdbc_password
: mysql 密碼schedule
: 執行 sql 時機,相似 crontab 的調度statement
: 要執行的 sql,以 ":" 開頭是定義的變量,能夠經過 parameters 來設置變量,這裏的 sql_last_value 是內置的變量,表示上一次 sql 執行中 update_time 的值,這裏 update_time 條件是 >= 由於時間有可能相等,沒有等號可能會漏掉一些增量use_column_value
: 使用遞增列的值tracking_column_type
: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型tracking_column
: 遞增字段的名稱,這裏使用 update_time 這一列,這列的類型是 timestamplast_run_metadata_path
: 同步點文件,這個文件記錄了上次的同步點,重啓時會讀取這個文件,這個文件能夠手動修改hosts
: es 集羣地址user
: es 用戶名password
: es 密碼index
: 導入到 es 中的 index 名,這裏我直接設置成了 mysql 表的名字document_id
: 導入到 es 中的文檔 id,這個須要設置成主鍵,不然同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 字段的值