Debezium and Kafka Connect: parsing the MySQL binlog into Kafka

Goal: set up a pipeline that automatically monitors a MySQL database for changes and captures the changed data for processing. This article only covers how to automatically capture data changes from MySQL.

Technologies used

Debezium: https://debezium.io/documentation/reference/1.0/connectors/mysql.html

Kafka: http://kafka.apache.org/

ZooKeeper: http://zookeeper.apache.org/

MySQL 5.7: https://www.mysql.com/

1. Approach

You need a CentOS 7.x virtual machine. ZooKeeper, Debezium, Kafka, and Confluent run on the VM, while MySQL runs on Windows; the VM listens for data changes in the MySQL instance on Windows.

2. MySQL setup

First, locate the MySQL configuration file my.ini; on my machine the path is C:\ProgramData\MySQL\MySQL Server 5.7. Because change capture is based on the MySQL binlog, the binlog must be enabled. Add the following configuration:

log_bin=D:\mysql-binlog\mysql-bin
binlog_format=ROW
server-id=223344
binlog_row_image=full
expire_logs_days=10
binlog_rows_query_log_events=on

Restart the MySQL service:

net stop mysql57
net start mysql57

The MySQL binlog is now enabled. You can verify it quickly: open a cmd window, log in with mysql -u root -p, and run:

show binary logs;

You should see a list of binlog files, i.e. the binary logs that record MySQL changes. At this point, MySQL is ready.
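If you want to double-check that the binlog settings actually took effect, a quick one-liner from the shell (assuming the mysql client is on your PATH) works as well:

mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"

log_bin should be ON and binlog_format should be ROW; if not, re-check the my.ini edits and restart the service.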

3. ZooKeeper and Kafka setup

Download zookeeper-3.4.14.tar.gz and kafka_2.12-2.2.0.tgz.

mkdir -p /usr/local/software/zookeeper
mkdir -p /usr/local/software/kafka
mkdir -p /usr/local/software/confluent

With the paths in place, move the archives into the corresponding directories and extract them (extraction commands follow the moves below):

mv zookeeper-3.4.14.tar.gz /usr/local/software/zookeeper
mv kafka_2.12-2.2.0.tgz /usr/local/software/kafka
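Then unpack each archive in place (file names assumed to match the downloads above):

cd /usr/local/software/zookeeper && tar -xzf zookeeper-3.4.14.tar.gz
cd /usr/local/software/kafka && tar -xzf kafka_2.12-2.2.0.tgz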

Go to the ZooKeeper config directory /usr/local/software/zookeeper/zookeeper-3.4.14/conf and edit zoo.cfg (renamed from zoo_sample.cfg):

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs

Go to the dataDir directory, create a file named myid, and put the content 1 in it, for example as shown below.
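A minimal sketch of that step, assuming the dataDir and dataLogDir paths configured above:

mkdir -p /opt/data/zookeeper/data /opt/data/zookeeper/logs
echo 1 > /opt/data/zookeeper/data/myid

The number in myid must match this server's ID if you later grow the setup into a multi-node ensemble.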

That completes the ZooKeeper configuration. Next, configure Kafka: in /usr/local/software/kafka/kafka_2.12-2.2.0/config, edit server.properties:

broker.id=1
listeners=PLAINTEXT://192.168.91.25:9092
advertised.listeners=PLAINTEXT://192.168.91.25:9092
log.dirs=/opt/data/kafka-logs
host.name=192.168.91.25
zookeeper.connect=localhost:2181

4. Debezium configuration

This step requires the Debezium MySQL connector jar package; download it from: https://debezium.io/releases/1.0/

 

Upload the downloaded plugin archive to the VM; after extraction the directory is named debezium-connector-mysql.

Move it into the /usr/local/share/kafka/plugins directory, creating that directory first if it does not exist, for example as follows.
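A sketch of those commands; the exact archive name depends on which 1.0.x release you downloaded (the version below is an assumption):

mkdir -p /usr/local/share/kafka/plugins
tar -xzf debezium-connector-mysql-1.0.0.Final-plugin.tar.gz
mv debezium-connector-mysql /usr/local/share/kafka/plugins/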

Once the dependency jars are in place, configure the Kafka Connect worker in the Kafka config directory.

File: /usr/local/software/kafka/kafka_2.12-2.2.0/config/connect-standalone.properties

bootstrap.servers=<local-machine-IP>:9092
plugin.path=/usr/local/share/kafka/plugins

In addition, create a file named mysql.properties in the Kafka root directory with the following content:

name=mysql
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.3.125
database.port=3306
database.user=root
database.password=123456
database.server.id=112233
database.server.name=test
database.whitelist=orders,users
database.history.kafka.bootstrap.servers=192.168.91.25:9092
database.history.kafka.topic=history.test
include.schema.changes=true
include.query=true
# options: adaptive_time_microseconds (default), adaptive (deprecated), connect
time.precision.mode=connect
# options: precise (default), double, string
decimal.handling.mode=string
# options: long (default), precise
bigint.unsigned.handling.mode=long

5. Starting the services

01. Start ZooKeeper

cd /usr/local/software/zookeeper/zookeeper-3.4.14
./bin/zkServer.sh start
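To confirm ZooKeeper is up, zkServer.sh also supports a status subcommand:

./bin/zkServer.sh status

In a single-node setup it should report Mode: standalone.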

02. Start Kafka

cd  /usr/local/software/kafka/kafka_2.12-2.2.0
./bin/kafka-server-start.sh  -daemon  config/server.properties

03. Start the Kafka Connect worker

cd  /usr/local/software/kafka/kafka_2.12-2.2.0
./bin/connect-standalone.sh  config/connect-standalone.properties  mysql.properties

04. List the topics (in a new terminal)

./bin/kafka-topics.sh --list --zookeeper localhost:2181

6. Specifying the databases/tables to monitor

Simulate a POST request in Postman, passing the parameters as JSON, to register a connector that monitors the shiro database:

{ "name": "shiro", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "192.168.3.125", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "184054", "database.server.name": "my", "database.whitelist": "shiro", "database.history.kafka.bootstrap.servers": "192.168.91.25:9092", "database.history.kafka.topic": "history.shiro", "include.schema.changes": "true" }}

List the topics again.
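Running the same listing command as before:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

new topics for the monitored database should now appear, including my.shiro.user (Debezium names table topics serverName.databaseName.tableName) and the history topic history.shiro.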

Start a Kafka console consumer in a new terminal to consume my.shiro.user:

./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic my.shiro.user --from-beginning

Java client consumer code

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/25 10:39.
 */
public class MyConsumer {
    public static void main(String[] args) {
        // 1. Create the consumer configuration.
        Properties properties = new Properties();
        // 2. Point the consumer at the Kafka cluster.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 3. Key/value deserializers.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test");

        // 4. Subscribe to the Debezium topic for the user table and poll forever.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("my.shiro.user"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + "-----" + record.value());
            }
        }
    }
}

Java client producer code

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/24 16:58.
 */
public class MyProducer {
    public static void main(String[] args) {
        // 1. Create the producer configuration.
        Properties properties = new Properties();
        // 2. Point the producer at the Kafka cluster.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");
        // 3. Wait for acknowledgement from all in-sync replicas.
        properties.put("acks", "all");
        // 4. Number of retries.
        properties.put("retries", 0);
        // 5. Batch size in bytes.
        properties.put("batch.size", 16384);
        // 6. Linger time in milliseconds.
        properties.put("linger.ms", 1);
        // 7. RecordAccumulator buffer size in bytes.
        properties.put("buffer.memory", 33554432);
        // 8. Key/value serializers.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 9. Create the producer and send ten messages.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);
            String value = "Message " + key;
            producer.send(new ProducerRecord<String, String>("mytopic", key, value));
            System.out.println("msg:" + i);
        }
        producer.close();
    }
}

Start the consumer, then modify the user table in the shiro database; both the Java client consumer and the Linux console consumer will display the changed data as it happens.
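For reference, what the consumers print for each change is a Debezium change-event envelope. A heavily trimmed sketch of an update event on the user table (field names beyond the standard envelope and all values are made up for illustration):

{
  "payload": {
    "before": { "id": 1, "name": "old-name" },
    "after":  { "id": 1, "name": "new-name" },
    "source": { "db": "shiro", "table": "user" },
    "op": "u",
    "ts_ms": 1571989200000
  }
}

op is c for inserts, u for updates, and d for deletes; real events also carry a schema section and richer source metadata.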
