Debezium結合Kafka Connect實時捕獲MySQL變動事件寫入Elasticsearch實現搜索流程

前段時間寫了MySql實時數據變動事件捕獲kafka confluent之debezium,使用的是confluent整套的,接下來這篇將會介紹完整實戰。html

首先明確需求,公司訂單數據愈來愈大,商戶端和E端各類業務需求也愈來愈多查詢愈加複雜,咱們想引進elasticsearch來實現查詢和搜索。那麼問題來了,實時更新的訂單數據如何同步到es中,業務代碼中insert或者update es中的index這確定是不可取的,咱們選擇使用kafka和debezium結合使用,讀取MySQLbinlog及時寫入es.java

本文將會實現一套完整的Debezium結合Kafka Connect實時捕獲MySQL變動事件寫入Elasticsearch並實現查詢的流程.mysql

安裝

MySQL

MySQL的安裝比較簡單,同時須要MySQL開啓binlog,爲了簡單我這裏使用docker啓動一個MySQL而且裏面已建立有數據。git

  • docker安裝
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8

複製代碼

Zookeeper

kafka的啓動依賴於zookeeper,因此這裏先安裝而且啓動zookeeper.sql

  • downloaddocker

  • install and start數據庫

    首先須要配置conf/zoo.cfg,能夠直接複製一份conf/zoo_sample.cfg使用.切換到安裝目錄下bin/zkServer.sh start啓動zookeeper.apache

Kafka

  • kafka download

  • MySQL Connector plugin archive download

    下載好了的kafka文件目錄裏面其實默認已經包含了幾個connect,這裏咱們須要使用的是debezium這個插件,因此須要把下載後的debezium安裝到connect中,安裝方法也比較簡單,把解壓後的MySQL Connector plugin archive裏面的內容所有copy到kafka安裝目錄libs目錄下便可.

  • 啓動kafka Quickstart

    在安裝目錄下執行bin/kafka-server-start.sh config/server.properties

  • 啓動kafka connect Running Kafka Connect

    在安裝目錄下執行./bin/connect-distributed.sh config/connect-distributed.properties

Elasticsearch

  • download
  • 啓動,安裝目錄下 bin/elasticsearch

配置connect

截止目前已經有了本地的MySQL,kafka,kafka connect,elasticearch,接下來配置kafka connect,經過配置好connect可以讓debezium讀取到binlog把MySQL的數據change事件寫入到kafka的topic中.

kafka connect爲咱們提供了restful的訪問方式,詳細文檔查看Kafka Connect REST Interface.

新增一個connect

put http://localhost:8083/connectors/order-center-connector/config

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "localhost",
  "database.port": "3306",
  "database.user": "root",
  "database.password": "debezium",
  "database.server.id": "1",
  "database.server.name": "trade_order_0",
  "database.whitelist": "inventory",
  "include.schema.changes": "false",
  "snapshot.mode": "schema_only",
  "snapshot.locking.mode": "none",
  "database.history.kafka.bootstrap.servers": "localhost:9092",
  "database.history.kafka.topic": "dbhistory.tradeOrder1",
  "decimal.handling.mode": "string",
  "database.history.store.only.monitored.tables.ddl":"true",
  "database.history.skip.unparseable.ddl":"true"
}
複製代碼

備註: http://localhost:8083/connectors/order-center-connector/config這個接口不但能更新connector還能建立,若是connector不存在的時候使它就會建立一個connector若是存在就去更新.

debezium提供了諸多配置參數,上圖例子中只是提供了經常使用的配置,詳細配置參考Debezium Connector for MySQL .

connector建立成功以後,能夠經過http://localhost:8083/connectors/查看已經建立個的connector.

同時你還能夠經過http://localhost:8083/connectors/order-center-connector/查看某一個connector的詳細配置信息.

也能夠經過http://localhost:8083/connectors/order-center-connector/status查看當前connector的詳細狀態,若是當前connector出現故障,也會在這裏提示出來.

connector建立成功後,接下來應該測試debezium是否開始工做了,MySQL發生insert或者update 的時候有沒有寫入kafka.

[注意事項]

筆者在配置connector的過程當中也遇到過了好多問題,一些比較重要的東西也記錄下來了,若是你在使用過程當中出現問題能夠查看文末常見問題裏面是否有一樣的問題.

debezium kafka topic消費

在上面的debezium配置中能夠看到參數database.server.name,database.whitelist,debezium connector會處理MySQL的binlog後對應數據庫不一樣的表將消息發送到不通的topic上,其中這些topic的構成方式爲:[database.server.name].[數據庫名稱].[表名稱],記下來按步驟操做.

* 1. 在kafka的安裝目錄下使用bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic trade_order_0.inventory.orders消費trade_order_0.inventory.orders這個topic.

    1. 任意修改orders表的一行數據,而後回到第一步就能夠觀察到.

看到這樣的結果說明debezium已經開始工做了.

spring boot消費kafka消息而且寫入elasticsearch中

{
  "template": "trade-order-sales",
  "order": 0,
  "mappings": {
    "_default_": {
      "_source": {
        "enabled": true
      }
    },
    "type": {
      "properties":{"orderNumber":{"type":"text"},"quantity":{"type":"text"},"productId":{"type":"text"},"purchaser":{"type":"date"},"orderDate":{"type":"text"},"purchaserName":{"type":"text"},"createDate":{"type":"date"}}
      
     }
  }
}
複製代碼

此時說明MySQL到connect到kafka再到server再到es整個流程通了,同時能夠經過server去查詢esTestController-http://localhost:8080/test/list

常見問題

  • Unexpected exception while parsing statement alter table pay_cs_market_balance alter column balance_amt set default 0 at line 1

blog.csdn.net/lzufeng/art…

  • 若是配置無效

1 檢查表白名單 2 檢查database.server.id是否重複 3 檢查其餘配置重複是否

  • 如何分詞(version 2.X)

zhuanlan.zhihu.com/p/29183128 esuc.dev.rs.com:9200/_analyze?pr… &text=SO5046240000014238

  • 消費者亂碼

保持寫入消費使用的同一個序列化方式.

  • 數據庫date,datetime,timestamp之類的字段,消費者收到少了8個小時或者多了8個小時

這個問題主要是因爲時區的問題,建議閱讀官網文檔Temporal values without time zone

解決辦法

建議數據都改爲timestamp(攜帶了時區)類型而後再kafka消費的時候使用Date對象接收,轉成Date對象時區就是本地的了,再寫入es就是你想要的了.

相關文章
相關標籤/搜索