博文原址:debezium關於cdc的使用(上)java
debezium是一個爲了捕獲數據變動(cdc)的開源的分佈式平臺。啓動並指向數據庫,當其餘應用對此數據庫執行inserts
、updates
、delete
操做時,此應用快速獲得響應。debezium是持久化和快速響應的,所以你的應用能夠快速響應且不會丟失任意一條事件。debezium記錄是數據庫表的行級別的變動事件。同時debezium是構建在kafka之上的,同時與kafka深度耦合,因此提供kafka connector來使用,debezium sink。支持的數據庫有mysql、MongoDB、PostgreSQL、Oracle、SQL server。本篇以mysql做爲數據源來實現功能,監聽msyql的binlog,還須要修改。當前版本是0.9.5.Final,0.10版本正在開發中。mysql
本篇文章主要使用Embedding
形式監聽事件,並同步更新到數據庫。git
下篇主要使用kafka connector來同步更新到數據庫。github
mysql須要以下開啓binlog。可是若是使用的是debezium/mysql鏡像,自動已經配置好了。sql
log-bin=mysql-bin #添加這一行就ok binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction須要定義,不能和canal的slaveId重複
先來一個效果,主要是配置kafka connector來獲取debezium事件記錄。須要3個服務,zookeeper、kakfa和debezium connector。這裏使用docker來啓動的,因此須要先按照docker。docker
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
經過invertory
數據庫了的任一表的數據shell
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers
這裏主要使用內嵌式的方式獲取cdc事件而不須要使用kafka,直接消費debezium事件流。場景是在某一個mysql數據庫裏的table發生變動,把變動同步到另外一mysql數據庫。本次使用的是監聽inventory
數據庫並將數據同步到inventory_back
。數據庫
connector.class=io.debezium.connector.mysql.MySqlConnector offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore offset.storage.file.filename=offset.dat offset.flush.interval.ms=60000 name=debezium-kafka-source database.hostname=localhost database.port=3306 database.user=debezium database.password=dbz #database.dbname=inventory database.whitelist=inventory #database.whitelist=inventory,inventory_back server.id=184054 database.server.name=dbserver1 #transforms=unwrap #transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope #transforms.unwrap.drop.tombstones=false database.history=io.debezium.relational.history.FileDatabaseHistory database.history.file.filename=dbhistory.dat
@Slf4j @Configuration public class DebeziumEmbeddedAutoConfiguration { @Bean public Properties embeddedProperties() { Properties propConfig = new Properties(); try(InputStream propsInputStream = getClass().getClassLoader().getResourceAsStream("config.properties")) { propConfig.load(propsInputStream); } catch (IOException e) { log.error("Couldn't load properties", e); } PropertyLoader.loadEnvironmentValues(propConfig); return propConfig; } @Bean public io.debezium.config.Configuration embeddedConfig(Properties embeddedProperties) { return io.debezium.config.Configuration.from(embeddedProperties); } @Bean public JsonConverter keyConverter(io.debezium.config.Configuration embeddedConfig) { JsonConverter converter = new JsonConverter(); converter.configure(embeddedConfig.asMap(), true); return converter; } @Bean public JsonConverter valueConverter(io.debezium.config.Configuration embeddedConfig) { JsonConverter converter = new JsonConverter(); converter.configure(embeddedConfig.asMap(), false); return converter; } }
這裏主要是利用CommandLineRunner特性,啓動debezium的EmbeddedEngine引擎,獲取到cdc事件後由handleRecord
處理DDL和DML,須要去解析cdc的事件SourceRecord
的key和value。apache
@Slf4j @Order(2) @Component public class DebeziumEmbeddedRunner implements CommandLineRunner { @Autowired private io.debezium.config.Configuration embeddedConfig; @Autowired private JdbcTemplate jdbcTemplate; @Autowired private NamedParameterJdbcTemplate namedTemplate; @Autowired private JsonConverter keyConverter; @Autowired private JsonConverter valueConverter; @Override public void run(String... args) throws Exception { EmbeddedEngine engine = EmbeddedEngine.create() .using(embeddedConfig) .using(this.getClass().getClassLoader()) .using(Clock.SYSTEM) .notifying(this::handleRecord) .build(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); shutdownHook(engine); awaitTermination(executor); } /** * For every record this method will be invoked. */ private void handleRecord(SourceRecord record) { logRecord(record); Struct payload = (Struct) record.value(); if (Objects.isNull(payload)) { return; } String table = Optional.ofNullable(DebeziumRecordUtils.getRecordStructValue(payload, "source")) .map(s->s.getString("table")).orElse(null); // // 處理數據DML Envelope.Operation operation = DebeziumRecordUtils.getOperation(payload); if (Objects.nonNull(operation)) { Struct key = (Struct) record.key(); handleDML(key, payload, table, operation); return; } // // // 處理結構DDL String ddl = getDDL(payload); if (StringUtils.isNotBlank(ddl)) { handleDDL(ddl); } } private String getDDL(Struct payload) { String ddl = DebeziumRecordUtils.getDDL(payload); if (StringUtils.isBlank(ddl)) { return null; } String db = DebeziumRecordUtils.getDatabaseName(payload); if (StringUtils.isBlank(db)) { db = embeddedConfig.getString(MySqlConnectorConfig.DATABASE_WHITELIST); } ddl = ddl.replace(db + ".", ""); ddl = ddl.replace("`" + db + "`.", ""); return ddl; } /** * 執行數據庫ddl語句 * * @param ddl */ private void handleDDL(String ddl) { log.info("ddl語句 : {}", ddl); try { jdbcTemplate.execute(ddl); } catch (Exception e) { log.error("數據庫操做DDL語句失敗,", e); } } /** * 處理insert,update,delete等DML語句 * * @param key 表主鍵修改事件結構 * @param payload 表正文響應 * @param table 表名 * @param operation DML操做類型 */ private void handleDML(Struct key, Struct payload, String table, Envelope.Operation operation) { AbstractDebeziumSqlProvider provider = DebeziumSqlProviderFactory.getProvider(operation); if (Objects.isNull(provider)) { log.error("沒有找到sql處理器提供者."); return; } String sql = provider.getSql(key, payload, table); if (StringUtils.isBlank(sql)) { log.error("找不到sql."); return; } try { log.info("dml語句 : {}", sql); namedTemplate.update(sql, provider.getSqlParameterMap()); } catch (Exception e) { log.error("數據庫DML操做失敗,", e); } } /** * 打印消息 * * @param record */ private void logRecord(SourceRecord record) { final byte[] payload = valueConverter.fromConnectData("dummy", record.valueSchema(), record.value()); final byte[] key = keyConverter.fromConnectData("dummy", record.keySchema(), record.key()); log.info("Publishing Topic --> {}", record.topic()); log.info("Key --> {}", new String(key)); log.info("Payload --> {}", new String(payload)); } private void shutdownHook(EmbeddedEngine engine) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("Requesting embedded engine to shut down"); engine.stop(); })); } private void awaitTermination(ExecutorService executor) { try { while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) { log.info("Waiting another 10 seconds for the embedded engine to shut down"); } } catch (InterruptedException e) { Thread.interrupted(); } } }
provider和table字段解析器太多,這裏就不在一一列出來了,以下圖所示,支持mysql大部分字段類型。若是有須要的能夠關注微信公衆號或者郵件以及評論回覆。json
CREATE TABLE `demo` ( `id` int(10) NOT NULL AUTO_INCREMENT, `bigint_id` bigint(20) NOT NULL, `var_name` varchar(255) NOT NULL, `ex_tinyint` tinyint(4) DEFAULT NULL, `ex_char` char(255) DEFAULT NULL, `ex_json` json DEFAULT NULL COMMENT '水電費', `ex_text` text, `ex_year` year(4) DEFAULT NULL, `ex_time` time DEFAULT NULL, `ex_date` date DEFAULT NULL, `ex_datetime` datetime DEFAULT NULL, `ex_timestamp` timestamp NULL DEFAULT NULL, `ex_blob` blob, `ex_tinyblob` tinyblob, `ex_binary` binary(255) DEFAULT NULL, `ex_double` double(10,4) DEFAULT NULL, `ex_float` float(10,2) DEFAULT NULL, `ex_decimal` decimal(10,2) DEFAULT NULL, `ex_numeric` decimal(10,4) DEFAULT NULL, `ex_real` double(10,4) DEFAULT NULL, `ex_bit` bit(1) DEFAULT NULL, `ex_enum` enum('123','@@','22','水電費') DEFAULT '123', `ex_set` set('a','b','c','d') DEFAULT NULL, `ex_geometry` geometry DEFAULT NULL, `ex_point` point DEFAULT NULL, `ex_linestring` linestring DEFAULT NULL, `ex_polygon` polygon DEFAULT NULL, `ex_geometrycollection` geometrycollection DEFAULT NULL, `ex_multipoint` multipoint DEFAULT NULL, PRIMARY KEY (`id`,`bigint_id`,`var_name`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
能夠看出將數據庫表的bigint_id
字段長度改成21,監聽到事件後:執行了ddl語句,inventory_back
庫中的demo
表的bigint_id
字段長度改成21了。
Publishing Topic --> dbserver1 2019-06-24 16:22:21.230 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Key --> {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"databaseName"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeKey"},"payload":{"databaseName":"inventory"}} 2019-06-24 16:22:21.230 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"databaseName"},{"type":"string","optional":false,"field":"ddl"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364540,"gtid":null,"file":"mysql-bin.000006","pos":22530,"row":0,"snapshot":false,"thread":null,"db":null,"table":null,"query":null},"databaseName":"inventory","ddl":"ALTER TABLE `inventory`.`demo` \nMODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`"}} 2019-06-24 16:22:21.230 ERROR 14995 --- [pool-1-thread-1] c.example.embedded.DebeziumRecordUtils : not find op field. 2019-06-24 16:22:21.231 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : ddl語句 : ALTER TABLE `demo` MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`
在inventory
庫中的demo
新增一條記錄後有以下日誌記錄,能查看到topic,key,payload以及dml的insert語句。結果會把數據同步到inventory_back
庫中的demo
。
2019-06-24 16:27:14.735 INFO 14995 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader : 1 records sent during previous 00:04:53.506, last recorded offset: {ts_sec=1561364834, file=mysql-bin.000006, pos=23002, row=1, server_id=223344, event=2} 2019-06-24 16:27:14.737 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Publishing Topic --> dbserver1.inventory.demo 2019-06-24 16:27:14.737 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Key --> {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"}],"optional":false,"name":"dbserver1.inventory.demo.Key"},"payload":{"id":2,"bigint_id":1,"var_name":"老王"}} 2019-06-24 16:27:14.738 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.demo.Envelope"},"payload":{"before":null,"after":{"id":2,"bigint_id":1,"var_name":"老王","ex_tinyint":1,"ex_char":"a","ex_json":"{\"abc\":123}","ex_text":"ert","ex_year":2019,"ex_time":59224000000,"ex_date":null,"ex_datetime":null,"ex_timestamp":null,"ex_blob":null,"ex_tinyblob":null,"ex_binary":null,"ex_double":null,"ex_float":null,"ex_decimal":null,"ex_numeric":null,"ex_real":null,"ex_bit":null,"ex_enum":"123","ex_set":null,"ex_geometry":null,"ex_point":null,"ex_linestring":null,"ex_polygon":null,"ex_geometrycollection":null,"ex_multipoint":null},"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364834,"gtid":null,"file":"mysql-bin.000006","pos":23194,"row":0,"snapshot":false,"thread":9,"db":"inventory","table":"demo","query":null},"op":"c","ts_ms":1561364834477}} 2019-06-24 16:27:14.738 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : dml語句 : insert into demo values (:id,:bigint_id,:var_name,:ex_tinyint,:ex_char,:ex_json,:ex_text,:ex_year,:ex_time,:ex_date,:ex_datetime,:ex_timestamp,:ex_blob,:ex_tinyblob,:ex_binary,:ex_double,:ex_float,:ex_decimal,:ex_numeric,:ex_real,:ex_bit,:ex_enum,:ex_set,:ex_geometry,:ex_point,:ex_linestring,:ex_polygon,:ex_geometrycollection,:ex_multipoint)
在inventory
庫中的demo
修改剛剛新增的記錄後有以下日誌記錄,能查看到topic,key,payload以及先delete再insert語句。結果會把數據同步到inventory_back
庫中的demo
。
在inventory
庫中的demo
修改剛剛修改的記錄給刪除掉後有以下日誌記錄,能查看到topic,key,payload以及先delete語句。結果會把數據同步到inventory_back
庫中的demo
將其刪掉。這裏有2個事件,第二條事件是一種標緻,這裏不處理。
日誌:
關注瞭解最新動態更新