前段時間寫了MySql實時數據變動事件捕獲kafka confluent之debezium,使用的是confluent整套的,接下來這篇將會介紹完整實戰。html
首先明確需求,公司訂單數據愈來愈大,商戶端和E端各類業務需求也愈來愈多查詢愈加複雜,咱們想引進elasticsearch來實現查詢和搜索。那麼問題來了,實時更新的訂單數據如何同步到es中,業務代碼中insert或者update es中的index這確定是不可取的,咱們選擇使用kafka和debezium結合使用,讀取MySQLbinlog及時寫入es.java
本文將會實現一套完整的Debezium結合Kafka Connect實時捕獲MySQL變動事件寫入Elasticsearch並實現查詢的流程.mysql
MySQL的安裝比較簡單,同時須要MySQL開啓binlog,爲了簡單我這裏使用docker啓動一個MySQL而且裏面已建立有數據。git
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
複製代碼
local安裝 若是本地沒有docker環境或者MySQL環境能夠參考mysql 5.7 主從同步配置(windows)和MySQL 5.7.18 數據庫主從(Master/Slave)同步安裝與配置詳解配置。github
這裏給出同上docker的數據庫和表結構,點擊msyql table inventory ddl下載。spring
kafka的啓動依賴於zookeeper,因此這裏先安裝而且啓動zookeeper.sql
downloaddocker
首先須要配置conf/zoo.cfg
,能夠直接複製一份conf/zoo_sample.cfg
使用.切換到安裝目錄下bin/zkServer.sh start
啓動zookeeper.apache
MySQL Connector plugin archive download
下載好了的kafka文件目錄裏面其實默認已經包含了幾個connect,這裏咱們須要使用的是debezium
這個插件,因此須要把下載後的debezium安裝到connect中,安裝方法也比較簡單,把解壓後的MySQL Connector plugin archive
裏面的內容所有copy到kafka安裝目錄libs
目錄下便可.
啓動kafka Quickstart
在安裝目錄下執行bin/kafka-server-start.sh config/server.properties
啓動kafka connect Running Kafka Connect
在安裝目錄下執行./bin/connect-distributed.sh config/connect-distributed.properties
bin/elasticsearch
截止目前已經有了本地的MySQL
,kafka
,kafka connect
,elasticearch
,接下來配置kafka connect,經過配置好connect可以讓debezium讀取到binlog把MySQL的數據change事件寫入到kafka的topic中.
kafka connect爲咱們提供了restful的訪問方式,詳細文檔查看Kafka Connect REST Interface.
put http://localhost:8083/connectors/order-center-connector/config
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"database.server.id": "1",
"database.server.name": "trade_order_0",
"database.whitelist": "inventory",
"include.schema.changes": "false",
"snapshot.mode": "schema_only",
"snapshot.locking.mode": "none",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.tradeOrder1",
"decimal.handling.mode": "string",
"database.history.store.only.monitored.tables.ddl":"true",
"database.history.skip.unparseable.ddl":"true"
}
複製代碼
備註: http://localhost:8083/connectors/order-center-connector/config
這個接口不但能更新connector還能建立,若是connector不存在的時候使它就會建立一個connector若是存在就去更新.
debezium提供了諸多配置參數,上圖例子中只是提供了經常使用的配置,詳細配置參考Debezium Connector for MySQL .
connector建立成功以後,能夠經過http://localhost:8083/connectors/查看已經建立個的connector.
同時你還能夠經過http://localhost:8083/connectors/order-center-connector/查看某一個connector的詳細配置信息.
也能夠經過http://localhost:8083/connectors/order-center-connector/status查看當前connector的詳細狀態,若是當前connector出現故障,也會在這裏提示出來.
connector建立成功後,接下來應該測試debezium是否開始工做了,MySQL發生insert或者update 的時候有沒有寫入kafka.
[注意事項]
筆者在配置connector的過程當中也遇到過了好多問題,一些比較重要的東西也記錄下來了,若是你在使用過程當中出現問題能夠查看文末常見問題
裏面是否有一樣的問題.
在上面的debezium配置中能夠看到參數database.server.name
,database.whitelist
,debezium connector會處理MySQL的binlog後對應數據庫不一樣的表將消息發送到不通的topic上,其中這些topic的構成方式爲:[database.server.name].[數據庫名稱].[表名稱],記下來按步驟操做.
* 1. 在kafka的安裝目錄下使用bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic trade_order_0.inventory.orders
消費trade_order_0.inventory.orders
這個topic.
看到這樣的結果說明debezium已經開始工做了.
Demo代碼已經在github.com/m65536/prac…所有實現.下載後配合上面安裝好了的環境能夠直接啓動運行(當前版本使用的6.5,若是須要使用2.X,es客戶端配置略有不一樣).
使用建立index以前能夠建立index template,使用簡單而且方便靈活.
建立template
{
"template": "trade-order-sales",
"order": 0,
"mappings": {
"_default_": {
"_source": {
"enabled": true
}
},
"type": {
"properties":{"orderNumber":{"type":"text"},"quantity":{"type":"text"},"productId":{"type":"text"},"purchaser":{"type":"date"},"orderDate":{"type":"text"},"purchaserName":{"type":"text"},"createDate":{"type":"date"}}
}
}
}
複製代碼
啓動項目測試
啓動SpringBootElasticsearchApplication後,更改orders表任意數據,此時咱們看到日誌,再去觀察es,如圖.
此時說明MySQL到connect到kafka再到server再到es整個流程通了,同時能夠經過server去查詢esTestController-http://localhost:8080/test/list
1 檢查表白名單 2 檢查database.server.id是否重複 3 檢查其餘配置重複是否
zhuanlan.zhihu.com/p/29183128 esuc.dev.rs.com:9200/_analyze?pr… &text=SO5046240000014238
保持寫入消費使用的同一個序列化方式.
date
,datetime
,timestamp
之類的字段,消費者收到少了8個小時或者多了8個小時這個問題主要是因爲時區的問題,建議閱讀官網文檔Temporal values without time zone
解決辦法
建議數據都改爲timestamp
(攜帶了時區)類型而後再kafka消費的時候使用Date對象接收,轉成Date對象時區就是本地的了,再寫入es就是你想要的了.