mysql 做爲成熟穩定的數據持久化解決方案,普遍地應用在各類領域,可是在數據分析方面稍有不足,而 elasticsearch 做爲數據分析領域的佼佼者,恰好能夠彌補這項不足,而咱們要作的只須要將 mysql 中的數據同步到 elasticsearch 中便可,而 logstash 恰好就能夠支持,全部你須要作的只是寫一個配置文件而已html
獲取 logstashjava
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip unzip logstash-6.2.3.zip && cd logstash-6.2.3
安裝 jdbc 和 elasticsearch 插件mysql
bin/logstash-plugin install logstash-input-jdbc bin/logstash-plugin install logstash-output-elasticsearch
獲取 jdbc mysql 驅動sql
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip unzip mysql-connector-java-5.1.46.zip
使用 logstash-input-jdbc 插件讀取 mysql 的數據,這個插件的工做原理比較簡單,就是定時執行一個 sql,而後將 sql 執行的結果寫入到流中,增量獲取的方式沒有經過 binlog 方式同步,而是用一個遞增字段做爲條件去查詢,每次都記錄當前查詢的位置,因爲遞增的特性,只須要查詢比當前大的記錄便可獲取這段時間內的所有增量,通常的遞增字段有兩種,AUTO_INCREMENT
的主鍵 id
和 ON UPDATE CURRENT_TIMESTAMP
的 update_time
字段,id
字段只適用於那種只有插入沒有更新的表,update_time
更加通用一些,建議在 mysql 表設計的時候都增長一個 update_time
字段ruby
input { jdbc { jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta" jdbc_user => "<username>" jdbc_password => "<password>" schedule => "* * * * *" statement => "SELECT * FROM table WHERE update_time >= :sql_last_value" use_column_value => true tracking_column_type => "timestamp" tracking_column => "update_time" last_run_metadata_path => "syncpoint_table" } }
jdbc_driver_library
: jdbc mysql 驅動的路徑,在上一步中已經下載jdbc_driver_class
: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver
就行了jdbc_connection_string
: mysql 地址jdbc_user
: mysql 用戶jdbc_password
: mysql 密碼schedule
: 執行 sql 時機,相似 crontab 的調度statement
: 要執行的 sql,以 ":" 開頭是定義的變量,能夠經過 parameters 來設置變量,這裏的 sql_last_value
是內置的變量,表示上一次 sql 執行中 update_time 的值,這裏 update_time
條件是 >=
由於時間有可能相等,沒有等號可能會漏掉一些增量use_column_value
: 使用遞增列的值tracking_column_type
: 遞增字段的類型,numeric
表示數值類型, timestamp
表示時間戳類型tracking_column
: 遞增字段的名稱,這裏使用 update_time 這一列,這列的類型是 timestamp
last_run_metadata_path
: 同步點文件,這個文件記錄了上次的同步點,重啓時會讀取這個文件,這個文件能夠手動修改output { elasticsearch { hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"] user => "<user>" password => "<password>" index => "table" document_id => "%{id}" } }
hosts
: es 集羣地址user
: es 用戶名password
: es 密碼index
: 導入到 es 中的 index 名,這裏我直接設置成了 mysql 表的名字document_id
: 導入到 es 中的文檔 id,這個須要設置成主鍵,不然同一條記錄更新後在 es 中會出現兩條記錄,%{id}
表示引用 mysql 表中 id
字段的值把上面的代碼保存到一個配置文件裏面 sync_table.cfg
,執行下面命令便可elasticsearch
cd logstash-6.2.3 && bin/logstash -f config/sync_table.cfg
若是成功了會在標準輸出輸出執行的 sql 語句ide
[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc ] (0.001011s) SELECT version() [2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc ] (0.000723s) SELECT * FROM table WHERE update_time > '2018-04-14 17:55:00'
一個 logstash 實例能夠藉助 pipelines 機制同步多個表,只須要寫多個配置文件就能夠了,假設咱們有兩個表 table1 和 table2,對應兩個配置文件 sync_table1.cfg
和 sync_table2.cfg
ui
在 config/pipelines.yml
中配置插件
- pipeline.id: table1 path.config: "config/sync_table1.cfg" - pipeline.id: table2 path.config: "config/sync_table2.cfg"
直接 bin/logstash
啓動便可設計
@timestamp
字段默認狀況下 @timestamp
字段是 logstash-input-jdbc 添加的字段,默認是當前時間,這個字段在數據分析的時候很是有用,可是有時候咱們但願使用數據中的某些字段來指定這個字段,這個時候可使用 filter.date, 這個插件是專門用來設置 @timestamp
這個字段的
好比我有我但願用字段 timeslice
來表示 @timestamp
,timeslice
是一個字符串,格式爲 %Y%m%d%H%M
filter { date { match => [ "timeslice", "yyyyMMddHHmm" ] timezone => "Asia/Shanghai" } }
把這一段配置加到 sync_table.cfg
中,如今 @timestamp
和 timeslice
一致了
轉載請註明出處
本文連接: http://www.hatlonely.com/2018...