debezium在debezium關於cdc的使用(上)中有作介紹。具體能夠跳到上文查看。本篇主要講述使用kafka connector
方式來同步數據。而kafka connector實際上也有提供其餘的sink(Kafka Connect JDBC)來同步數據,可是沒有delete事件。因此在這裏選擇了Debezium MySQL CDC Connector方式來同步。本文須要使用Avro方式序列化kafka數據。html
使用kafka消息中間介的話須要對應的服務支持,尤爲須要chema-registry
來管理schema,因電腦內存有限就沒使用docker方式啓動,若是條件ok內存夠大的話闊以使用docker方式。因此使用的就是local本地方式。具體下載,安裝,部署,配置環境變量我就不在重複描述了,闊以參考官方文檔。java
進入目錄後啓動bin/confluent start
mysql
能夠經過kafka命令建立topic也能夠經過Confluent Control Center
地址:http://localhost:9021
來建立topic。咱們仍是按照上文的表來同步數據,因此建立topic:dbserver1.inventory.demo
。spring
能夠經過kafka rest命令建立也能夠使用Confluent Control Center
建立。sql
connect的api命令參考docker
方便點能夠使用crul建立,如下爲配置文件數據庫
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"decimal.handling.mode": "double",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}
複製代碼
建立好後能夠使用命令查詢到或者在管理中心查看。apache
命令:http://localhost:8083/connectors/inventory-connectorjson
spring:
application:
name: data-center
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/inventory_back?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
username: debe
password: 123456
jpa:
show-sql: true
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
# time-zone: UTC
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: debezium-kafka-connector
key-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
properties:
schema.registry.url: http://localhost:8081
複製代碼
跟上文的處理流程是同樣的。只不過DDL和DML分紅2個監聽器。bootstrap
package com.example.kakfa.avro;
import com.example.kakfa.avro.sql.SqlProvider;
import com.example.kakfa.avro.sql.SqlProviderFactory;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
@Slf4j
@Component
public class KafkaAvroConsumerRunner {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private NamedParameterJdbcTemplate namedTemplate;
@KafkaListener(id = "dbserver1-ddl-consumer", topics = "dbserver1")
public void listenerUser(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
GenericData.Record key = record.key();
GenericData.Record value = record.value();
log.info("Received record: {}", record);
log.info("Received record: key {}", key);
log.info("Received record: value {}", value);
String databaseName = Optional.ofNullable(value.get("databaseName")).map(Object::toString).orElse(null);
String ddl = Optional.ofNullable(value.get("ddl")).map(Object::toString).orElse(null);
if (StringUtils.isBlank(ddl)) {
return;
}
handleDDL(ddl, databaseName);
}
/** * 執行數據庫ddl語句 * * @param ddl */
private void handleDDL(String ddl, String db) {
log.info("ddl語句 : {}", ddl);
try {
if (StringUtils.isNotBlank(db)) {
ddl = ddl.replace(db + ".", "");
ddl = ddl.replace("`" + db + "`.", "");
}
jdbcTemplate.execute(ddl);
} catch (Exception e) {
log.error("數據庫操做DDL語句失敗,", e);
}
}
@KafkaListener(id = "dbserver1-dml-consumer", topicPattern = "dbserver1.inventory.*")
public void listenerAvro(ConsumerRecord<GenericData.Record, GenericData.Record> record) throws Exception {
GenericData.Record key = record.key();
GenericData.Record value = record.value();
log.info("Received record: {}", record);
log.info("Received record: key {}", key);
log.info("Received record: value {}", value);
if (Objects.isNull(value)) {
return;
}
GenericData.Record source = (GenericData.Record) value.get("source");
String table = source.get("table").toString();
Envelope.Operation operation = Envelope.Operation.forCode(value.get("op").toString());
String db = source.get("db").toString();
handleDML(key, value, table, operation);
}
private void handleDML(GenericData.Record key, GenericData.Record value, String table, Envelope.Operation operation) {
SqlProvider provider = SqlProviderFactory.getProvider(operation);
if (Objects.isNull(provider)) {
log.error("沒有找到sql處理器提供者.");
return;
}
String sql = provider.getSql(key, value, table);
if (StringUtils.isBlank(sql)) {
log.error("找不到sql.");
return;
}
try {
log.info("dml語句 : {}", sql);
namedTemplate.update(sql, provider.getSqlParameterMap());
} catch (Exception e) {
log.error("數據庫DML操做失敗,", e);
}
}
}
複製代碼
剩下的就是在inventory庫中demo表中增刪改數據,在對應的inventory_back庫中demo表數據對應的改變。
關注瞭解最新動態更新