實際業務場景中,會遇到基礎數據存在Mysql中,實時寫入數據量比較大的情景。遷移至kafka是一種比較好的業務選型方案。java
而mysql寫入kafka的選型方案有:
方案一:logstash_output_kafka 插件。
方案二:kafka_connector。
方案三:debezium 插件。
方案四:flume。
方案五:其餘相似方案。
其中:debezium和flume是基於mysql binlog實現的。mysql
若是須要同步歷史全量數據+實時更新數據,建議使用logstash。redis
經常使用的logstash的插件是:logstash_input_jdbc實現關係型數據庫到Elasticsearch等的同步。sql
實際上,核心logstash的同步原理的掌握,有助於你們理解相似的各類庫之間的同步。數據庫
logstash核心原理:輸入生成事件,過濾器修改它們,輸出將它們發送到其餘地方。json
logstash核心三部分組成:input、filter、output。bootstrap
input { } filter { } output { }
包含但遠不限於:ruby
過濾器是Logstash管道中的中間處理設備。您能夠將過濾器與條件組合,以便在事件知足特定條件時對其執行操做。服務器
能夠把它比做數據處理的ETL環節。oracle
一些有用的過濾包括:
輸出是Logstash管道的最後階段。一些經常使用的輸出包括:
elasticsearch:將事件數據發送到Elasticsearch。
file:將事件數據寫入磁盤上的文件。
kafka:將事件寫入Kafka。
詳細的filter demo參考:http://t.cn/EaAt4zP
input { jdbc { jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base" jdbc_user => "root" jdbc_password => "xxxxxxx" jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" #schedule => "* * * * *" statement => "SELECT * from news_info WHERE id > :sql_last_value order by id" use_column_value => true tracking_column => "id" tracking_column_type => "numeric" record_last_run => true last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run" } } filter { ruby{ code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)" } ruby{ code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)" } mutate { remove_field => [ "@version" ] remove_field => [ "@timestamp" ] remove_field => [ "gather_time" ] remove_field => [ "publish_time" ] } } output { kafka { bootstrap_servers => "192.168.1.13:9092" codec => json_lines topic_id => "mytopic" } file { codec => json_lines path => "/tmp/output_a.log" } }
以上內容不復雜,不作細講。
注意:
Mysql藉助logstash同步後,日期類型格式:「2019-04-20 13:55:53」已經被識別爲日期格式。
code =>
"event.set('gather_time_unix',event.get('gather_time').to_i*1000)",
是將Mysql中的時間格式轉化爲時間戳格式。
from星友:使用logstash同步mysql數據的,由於在jdbc.conf裏面沒有添加 lowercase_column_names
=> "false" 這個屬性,因此logstash默認把查詢結果的列明改成了小寫,同步進了es,因此就致使es裏面看到的字段名稱全是小寫。
最後總結:es是支持大寫字段名稱的,問題出在logstash沒用好,須要在同步配置中加上 lowercase_column_names => "false" 。記錄下來但願能夠幫到更多人。
想將關係數據庫的數據同步至ES中,若是在集羣的多臺服務器上同時啓動logstash。
解讀:實際項目中就是沒用隨機id 使用指定id做爲es的_id ,指定id能夠是url的md5.這樣相同數據就會走更新覆蓋之前數據
解讀:高版本基於時間增量有優化。
tracking_column_type => "timestamp"應該是須要指定標識爲時間類型,默認爲數字類型numeric
解讀:能夠logstash同步mysql的時候sql查詢階段處理,如:select a_value as avalue***。
或者filter階段處理,mutate rename處理。
mutate { rename => ["shortHostname", "hostname" ] }
或者kafka階段藉助kafka stream處理。
推薦閱讀:
一、實戰 | canal 實現Mysql到Elasticsearch實時增量同步
二、乾貨 | Debezium實現Mysql到Elasticsearch高效實時同步
三、一張圖理清楚關係型數據庫與Elasticsearch同步 http://t.cn/EaAceD3
四、新的實現:http://t.cn/EaAt60O
五、mysql2mysql: http://t.cn/EaAtK7r
六、推薦開源實現:http://t.cn/EaAtjqN加入星球,更短期更快習得更多幹貨!