此示例使用框架:mysql-binlog-connector-java (https://github.com/shyiko/mysql-binlog-connector-java)java
配置文件(Nacos)mysql
spring: # MySQL 鏈接信息 datasource: host: ${MYSQL-HOST:mysql-host} port: ${MYSQL-PORT:3306} username: ${MYSQL-USER:root} password: ${MYSQL-PWD:root}
加入依賴:git
<!-- MySQL binlog 日誌監聽 --> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>${binlog.version}</version> </dependency>
入門簡單案例講解:github
讀取 BinLog 的主體程序:spring
/** * 監聽MySQL binlog * CommandLineRunner SpringBoot啓動後執行的代碼(後置初始化) * * @author Alay * @date 2020-12-26 15:43 */ @Component public class BinLogRunner implements CommandLineRunner { @Value("${spring.datasource.host}") private String host; @Value("${spring.datasource.port}") private int port; @Value("${spring.datasource.username}") private String userName; @Value("${spring.datasource.password}") private String password; @Override public void run(String... args) throws Exception { // 客戶端鏈接創建 BinaryLogClient logClient = new BinaryLogClient(host, port, userName, password); /* // 此配置爲系列化處理, EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); logClient.setEventDeserializer(eventDeserializer); */ logClient.setServerId(1); logClient.registerEventListener(new BinLogEvent()); // 此處可能出現異常處理,AuthenticationException 由於 MySQL 訪問密碼加密協議的不一樣問題 logClient.connect(); } /** * 查詢表結構 * SELECT * TABLE_SCHEMA, * TABLE_NAME, * COLUMN_NAME, * ORDINAL_POSITION, * COLUMN_DEFAULT, * IS_NULLABLE, * DATA_TYPE, * CHARACTER_MAXIMUM_LENGTH, * CHARACTER_OCTET_LENGTH, * NUMERIC_PRECISION, * NUMERIC_SCALE, * CHARACTER_SET_NAME, * COLLATION_NAME * FROM * INFORMATION_SCHEMA.COLUMNS * WHERE * TABLE_NAME = 'person_domain_user_stat' # 表名 * AND TABLE_schema = 'braineex' # 庫名 */ class BinLogEvent implements BinaryLogClient.EventListener { @Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); System.out.println("tableId:" + tableData.getTableId()); System.out.println("庫名:" + tableData.getDatabase()); System.out.println("表名:" + tableData.getTable()); /** * 字段名集合,位置:在event對象中 Event -> EventData -> TableMapEventData ->TableMapEventMetadata * SHOW VARIABLES LIKE '%BINLOG%' binlog_row_metadata=FULL 的時候纔會 Binlog日誌纔會存在此值 * SET GLOBAL binlog_row_metadata='FULL' */ tableData.getEventMetadata().getColumnNames().forEach(System.out::println); } EventData eventData = event.getData(); if (null != eventData) { if (eventData instanceof DeleteRowsEventData) { System.out.println("刪除操做"); DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData; } if (eventData instanceof UpdateRowsEventData) { System.out.println("修改操做"); UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData; List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows(); for (Map.Entry<Serializable[], Serializable[]> row : rows) { /** * 執行更行前 row 記錄的全部值的 Array * 順序和 event.getHeader() 中的 columnNames 字段名順序一一對應 */ Serializable[] oldValues = row.getKey(); /**執行更行後 row 記錄的全部的值 Array * 順序和 event.getHeader() 中的 columnNames 字段名順序一一對應 */ Serializable[] newValues = row.getValue(); } } if (eventData instanceof WriteRowsEventData) { System.out.println("寫入操做"); WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData; System.out.println(writeRowsEventData.getTableId()); } } } } }
MySQL 加密協議問題異常處理:GitHub上做者做者討論 ( https://github.com/shyiko/mysql-binlog-connector-java/issues/240 )sql
在MySQL 中執行查詢語句:將MySQL 加密協議修改成 8.0 以前的:mysql_native_password,這僅僅是權宜之計,萬全之策等做者更新新的協議,感興趣的朋友能夠持續追蹤,做者後續會 對 8.0 以後的版本的加密協議進行兼容處理的,數據庫
暫時的權益之計:json
ALTER USER '訪問的用戶名如:root' @'%' IDENTIFIED WITH mysql_native_password BY '密碼如:root';
查看 MySQL 版本號:SELECT VERSION(); 查看當前數據庫庫名:SELECT DATABASE()緩存
BinLog 的頭信息中沒有返回表的字段名,tableData.getEventMetadata().getColumnNames();app
若須要字段名方案有二:
一:自行前往數據庫中查詢(推薦)
SELECT `table_schema`, `table_name`, `column_name`, `ordinal_position`, `column_default`, `is_nullable`, `data_type`, `character_maximum_length`, `character_octet_length`, `numeric_precision`, `numeric_scale`, `character_set_name`, `collation_name` FROM `information_schema`.`columns` WHERE `table_schema` = 'behelpful' # 庫名 AND `table_name` = 'person_information' # 表名 ORDER BY `ordinal_position` # 按表設計結構的順序排序,從數字 1 開始
查詢全部的
SELECT `table_schema`, #庫名 `table_name`, # 表名 `engine`, # 引擎 `table_comment`, # 表註釋 `table_collation`, # 表字符集及排序規則 `create_time` # 建表時間 FROM `information_schema`.`tables` WHERE `table_schema` = ( SELECT DATABASE() # 當前所在的庫 ) ORDER BY `create_time` DESC
如:JDBC 查詢,或者,經過以上SQL 語句字段自建一個對象,經過 Mybatis 方式查詢(僞代碼)
Connection connection = ... DatabaseMetaData metaData = connection.getMetaData(); ResultSet tableResultSet = metaData.getTables(null, "public", null, new String[]{"TABLE"}); try { while (tableResultSet.next()) { String tableName = tableResultSet.getString("TABLE_NAME"); ResultSet columnResultSet = metaData.getColumns(null, "public", tableName, null); try { while (columnResultSet.next()) { String columnName = columnResultSet.getString("COLUMN_NAME"); ... } } finally { columnResultSet.close(); } } } finally { tableResultSet.close(); }
方案二(實際項目中不推薦使用,測試時使用此方法,方便):
設置 SET GLOBAL binlog_row_metadata='FULL' (不推薦,由於一般咱們並不須要全局,只是須要指定的個別數據庫表),因此推薦使用查詢,而後存儲緩存
默認值:MINIMAL
SET GLOBAL binlog_row_metadata='FULL'; SHOW VARIABLES LIKE '%BINLOG%';
經過以上代碼,根據 BinLog 的到的信息
頭信息事件中獲得:庫 -> 表,及字段名,每次事件前必先觸發一次頭事件
其餘事件,根據具體的業務需求進行封裝成爲具體的 Java 對象,而後使用不少方案能夠實現 NoSQL 的數據同步,如使用 Kafka ,ActiveMQ,等均可以實現
整合到項目中:碼雲中有具體示例: https://gitee.com/chxlay/be-helpful behelpful-search 模塊
@Data @ConfigurationProperties(prefix = "spring.datasource") public class MySqlConnection { private String host; private int port; private String userName; private String password; }
BinLog 監聽器:
logClient.connect()執行時間太長,可是不影響正常的 Log 事件監聽,因此此處使用異步處理 @Async 、@Order,避免耽擱 SpringBoot 啓動時間
/** * 監聽MySQL binlog * CommandLineRunner SpringBoot啓動後執行的代碼(後置初始化)*/ @Order @Component @AllArgsConstructor @EnableConfigurationProperties(value = MySqlConnection.class) public class BinLogRunner implements CommandLineRunner { /** * 監聽器(這裏只註冊了一個監聽器,而後在其中進行邏輯分發事件處理) */ private final ListenerAllocate listenerAllocate; private final MySqlConnection mySQLConnection; @Async @Order @Override public void run(String... args) throws Exception { EventDeserializer eventDeserializer = new EventDeserializer(); // 因爲下面的自定義系列化須要反系列化對象的 tableMapEventByTableId 字段值,而此字段是私有的,因此經過反射拿 Field field = eventDeserializer.getClass().getDeclaredField("tableMapEventByTableId"); field.setAccessible(true); Map<Long, TableMapEventData> tableMapEventByTableId = (Map<Long, TableMapEventData>) field.get(eventDeserializer); // 自定義反系列化類 (讀寫更新) eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new WriteRowsDeserializer(tableMapEventByTableId)); eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new UpdateRowsDeserializer(tableMapEventByTableId)); eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new DeleteRowsDeserializer(tableMapEventByTableId)); BinaryLogClient logClient = new BinaryLogClient(mySQLConnection.getHost(), mySQLConnection.getPort(), mySQLConnection.getUserName(), mySQLConnection.getPassword()); logClient.setServerId(1); logClient.setEventDeserializer(eventDeserializer); // 監聽器 logClient.registerEventListener(listenerAllocate); /** * 此處可能出現異常處理,AuthenticationException 由於 MySQL 訪問密碼加密協議的不一樣問題 * 關於此問題做者的交流討論: https://github.com/shyiko/mysql-binlog-connector-java/issues/240 */ logClient.connect(); } }
具體的監聽器,
BinLog 的事件觸發順序
啓動服務時:
事件順序:ROTATE、FORMAT_DESCRIPTION
啓動後:(事件中有個共通的值,threadId,須要自行研究,反覆啓動,切換不一樣的庫不停的增刪改,修改結構等操做,此值均保持不變)
事件觸發順序:
每次操做只觸發一次(批量操做也只會觸發一次):ANONYMOUS_GTID,QUERY,(如果修改表結構操做,只觸發此處的兩個事件)
每條記錄row觸發一次獨立的事件:TABLE_MAP(攜帶表、庫、字段信息)、多選一:EXT_UPDATE_ROWS(執行更新),EXT_DELETE_ROWS,EXT_WRITE_ROWS
每次操做只觸發一次(批量操做也只會觸發一次)XID
/** * 負責分配 BinLog 事件到具體的處理類中的(管理者角色)*/ @Slf4j @Component public class ListenerAllocate extends AbsBinLogEvent { @Override protected void init() { } /** * 一次MySQL的修改、插入、刪除,會觸發屢次事件,會調用方法屢次,注意處理好邏輯優化性能 * 啓動時: * 事件順序:ROTATE、FORMAT_DESCRIPTION * </br> * 啓動後: * 事件觸發順序: * 每次操做只觸發一次(批量操做也只會觸發一次):ANONYMOUS_GTID,QUERY, * 每條記錄row 觸發一次獨立的事件:TABLE_MAP(攜帶表庫字段信息)、多選一:EXT_UPDATE_ROWS(執行更新),EXT_DELETE_ROWS,EXT_WRITE_ROWS * 每次操做只觸發一次(批量操做也只會觸發一次) XID * * @param event */ @Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); // 只處理我想要處理的事件 if (eventType != EventType.QUERY && eventType != EventType.TABLE_MAP && eventType != EventType.EXT_UPDATE_ROWS && eventType != EventType.EXT_DELETE_ROWS && eventType != EventType.EXT_WRITE_ROWS) { return; } EventData eventData = event.getData(); /** * 此事件操做的是修改表結構,修改後須要將緩存中存儲的表結構刪除 * 解析得出修改表結構的 sql 語句進行解析 * sql='ALTER TABLE `database_name(數據庫的名稱)`.`person_information`\r\n後面是具體的執行語句 */ if (EventType.QUERY == eventType) { QueryEventData queryEventData = (QueryEventData) eventData; String sql = queryEventData.getSql(); // 增、刪、改的事件,不作處理 if (sql.startsWith(SearchConstants.EVENT_SQL_BEGIN)) { return; } // 修改表結構的事件 String tableInfo = sql.substring(12); // 注意表名後面有一個空格須要去除 tableInfo = StrUtil.subBefore(tableInfo, " \r\n", false); tableInfo = tableInfo.replace("`", ""); String[] tableArr = tableInfo.split("\\."); // 移除緩存 sqlSchemaColumnService.removeTableColumn(tableArr[0], tableArr[1]); return; } /** * 表結構映射事件 */ if (eventType == EventType.TABLE_MAP) { TableMapEventData mapEventData = (TableMapEventData) eventData; long tableId = mapEventData.getTableId(); // 獲取頭文件中存儲的 tableId <---> tableName 的映射 String tableFullName = tableFullNameMap.get(tableId); if (null != tableFullName) { // 此表已經不是第一次觸發該事件了,不須要重複的處理作準備的工做 return; } if (null == tableFullName) { String tableName = mapEventData.getTable(); // 此事件不是我要處理的數據庫表的日誌事件,不在個人事件實例中,不作處理 if (null == eventInstanceMap.get(tableName)) { return; } String database = mapEventData.getDatabase(); tableFullName = database + "." + tableName; tableFullNameMap.put(tableId, tableFullName); return; } } /** * 觸發的事件是具體操做的事件 */ if (eventType == EventType.EXT_UPDATE_ROWS || eventType == EventType.EXT_WRITE_ROWS || eventType == EventType.EXT_DELETE_ROWS) { try { // 經過反射獲取到事件對象數據的 tableId的值 Field tableIdField = eventData.getClass().getDeclaredField("tableId"); tableIdField.setAccessible(true); long tableId = (long) tableIdField.get(eventData); String tableFullName = tableFullNameMap.get(tableId); // 不是我想要監聽的數據庫表的的事件,不作處理 if (null == tableFullName) { return; } // 經過表名 得到具體是時間處理類對象 tableName <----> eventInstance String[] tableNameArr = tableFullName.split("\\."); AbsBinLogEvent eventInstance = eventInstanceMap.get(tableNameArr[1]); // 調用具體的事件邏輯處理 eventInstance.onEvent(event); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("執行 BinLog 同步數據失敗,時間類型{},緣由:{},Msg:{}", eventType.toString(), e.getCause(), e.getMessage()); } } } }
補充:QUERY 事件的補充
修改表結構中觸發的 BinLog 事件以下:(sql='ALTER TABLE `behelpful`.`person_information,修改表事件, sql 信息會攜帶 修改的語句)
Event{ header=EventHeaderV4{ timestamp=1609379484000, eventType=QUERY, serverId=1, headerLength=19, dataLength=256, nextPosition=20948893, flags=0 }, data=QueryEventData{ threadId=305142, executionTime=1, errorCode=0, database='braineex', sql='ALTER TABLE `braineex`.`person_information` ------------>> 修改表結構事件 MODIFY COLUMN `nick_name` varchar(31) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '暱稱' AFTER `age`' } }
增、刪、改 觸發的 QUERY 中 sql = ' BEGIN ' ,數據增刪改 觸發的BinLog 事件中, sql 不會攜帶執行的 sql 語句,而是 爲 BEGIN
Event{ header=EventHeaderV4{ timestamp=1609380048000, eventType=QUERY, serverId=1, headerLength=19, dataLength=60, nextPosition=20949876, flags=8 }, data=QueryEventData{ threadId=305142, executionTime=0, errorCode=0, database='braineex', sql='BEGIN' } }
解析 BinLog 事件的具體類 (抽象類,以上匿名內部類的方式將其添加到具體處理),日誌解析獲得具體的 row -> Java 對象之後使用任何方式操做 同步都可
/** * 具體的 BinLog 事件增刪改事件處理的邏輯代碼和公共的變量管理存儲*/ public abstract class AbsBinLogEvent<T extends ISearchModel> implements BinaryLogClient.EventListener { /** * Map< tableId, MySQL表名> */ protected volatile static Map<Long, String> tableFullNameMap = new HashMap<>(1 << 3); /** * Map< tableId, MySQL表對應 Java 實體類的類對象 Class> */ protected volatile static Map<String, AbsBinLogEvent> eventInstanceMap = new HashMap<>(1 << 4); protected ISearchService<T> baseEsService; @Autowired protected ActionListener updateListener; @Autowired protected ActionListener indexListener; @Autowired protected ActionListener deleteListener; @Autowired protected SqlSchemaColumnService sqlSchemaColumnService; /** * 初始化須要處理的表的事件處理類 * 初始化變量,及注入 * * @throws BaseRuntimeException */ @PostConstruct protected abstract void init() throws BaseRuntimeException; @Override public void onEvent(Event event) { EventData eventData = event.getData(); /** * 修改操做 */ if (eventData instanceof UpdateRowsEventData) { T entity = this.updateEvent(eventData); // 同步 ES 的操做 T esEntity = baseEsService.selectById(entity.getEsId()); if (null == esEntity) { baseEsService.saveEntityAsy(entity, indexListener); } else { baseEsService.updateByIdAsy(entity, updateListener); } return; } /** * 寫入操做 */ if (eventData instanceof WriteRowsEventData) { T entity = this.saveEvent(eventData); // 寫入數據 baseEsService.saveEntityAsy(entity, indexListener); return; } /** * 刪除操做 */ if (eventData instanceof DeleteRowsEventData) { T entity = this.deleteEvent(eventData); // 刪除 ES 中數據 baseEsService.deleteByIdAsy(entity.getEsId(), deleteListener); return; } } protected T saveEvent(EventData eventData) { WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData; List<Serializable[]> rows = writeRowsEventData.getRows(); T entity = this.rowsToEntity(rows, writeRowsEventData.getTableId()); return entity; } protected T updateEvent(EventData eventData) { UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData; // rows 每個 Entry 是條記錄,其中 Key 爲修改前的記錄,Value 爲修改後的新的記錄 List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows(); // 獲取修改後的新的值 List<Serializable[]> newValues = rows.stream().map(entry -> entry.getValue()).collect(Collectors.toList()); // 將修改後的新的值轉成 Java 對象 T entity = this.rowsToEntity(newValues, updateRowsEventData.getTableId()); return entity; } protected T deleteEvent(EventData eventData) { DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData; List<Serializable[]> rows = deleteRowsEventData.getRows(); T entity = this.rowsToEntity(rows, deleteRowsEventData.getTableId()); return entity; } protected T rowsToEntity(List<Serializable[]> rows, Long tableId) { String tableFullName = tableFullNameMap.get(tableId); String[] tableNameArr = tableFullName.split("\\."); // 得到當前 row 的數據庫中對應的字段名稱 String[] columnNames = sqlSchemaColumnService.columnsByTable(tableNameArr[0], tableNameArr[1]); JSONObject beanJSON = new JSONObject(); for (Serializable[] row : rows) { for (int i = 0; i < row.length; i++) { beanJSON.put(columnNames[i], row[i]); } } T entity = this.jsonToBean(beanJSON); return entity; } protected T jsonToBean(JSONObject beanJSON) { T entity = JSON.toJavaObject(beanJSON, this.entityClass()); return entity; } /** * 獲取調用方法實現類中泛型的具體類對象 * * @return */ protected Class<T> entityClass() { // 當前調用方法的 Impl實現類的父類的類型 ParameterizedType superclass = (ParameterizedType) this.getClass().getGenericSuperclass(); // 當前調用方法的 Impl實現類的泛型的類型,實現類必須帶泛型,不然報錯 Type[] type = superclass.getActualTypeArguments(); Class clazz = (Class) type[0]; return clazz; } /** * 獲取實體類映射的數據庫表名稱 * * @param entityClazz * @return */ protected String entityTableName(Class<?> entityClazz) { boolean isAnno = entityClazz.isAnnotationPresent(TableName.class); if (isAnno) { TableName annotation = entityClazz.getAnnotation(TableName.class); // 表名 @TableName(value = "person_information") return annotation.value(); } throw new BaseRuntimeException("操做不容許", "ERROR"); } }
處理表結構的查詢、緩存的 Service 實現類,方案中使用了 MP(以上說所的方案 一):
/** * 查詢得到數據庫中表結構*/ @Service public class SqlSchemaColumnServiceImpl extends ServiceImpl<SqlSchemaColumnMapper, SqlSchemaColumn> implements SqlSchemaColumnService { @Autowired private IRedisUtil iRedisUtil; /** * 此方法不可類內部調用,不然使用AOP處理Jedis將不會關閉 * 保證字段的順序,故用RedisZSet 進行存儲 * * @param database * @param tableName * @return */ @ThreadJedis @Override public String[] columnsByTable(String database, String tableName) { String key = CacheEnum.SQL_SCHEMA.key + database + "." + tableName; Jedis jedis = iRedisUtil.threadJedis(); jedis.select(CacheEnum.SQL_SCHEMA.index); Boolean exists = jedis.exists(key); String[] columnNames; if (exists) { // 集合中全部成員數 Long total = jedis.zcard(key); columnNames = new String[total.intValue()]; for (int i = 0; i < total; i++) { Set<String> columns = jedis.zrange(key, i, i); String column = columns.iterator().next(); columnNames[i] = column; } } else { // 查詢數據表結構存入緩存 List<SqlSchemaColumn> schemaColumns = this.list(Wrappers.<SqlSchemaColumn>lambdaQuery() .eq(SqlSchemaColumn::getTableSchema, database) .eq(SqlSchemaColumn::getTableName, tableName) .orderByAsc(SqlSchemaColumn::getOrdinalPosition)); List<String> columns = schemaColumns.stream().map(SqlSchemaColumn::getColumnName).collect(Collectors.toList()); columnNames = new String[columns.size()]; for (int i = 0; i < columns.size(); i++) { columnNames[i] = columns.get(i); // 存入緩存 jedis.zadd(key, i + 1, columns.get(i)); } } return columnNames; } @ThreadJedis @Override public boolean removeTableColumn(String database, String tableName) { String key = CacheEnum.SQL_SCHEMA.key + database + "." + tableName; Jedis jedis = iRedisUtil.threadJedis(); jedis.select(CacheEnum.SQL_SCHEMA.index); Long del = jedis.del(key); return del > 0; } }
數據庫表結構對應的 實體類:
/** * MySQL 表機構 */ @Data @EqualsAndHashCode(callSuper = true) @TableName(value = "`information_schema`.`columns`") public class SqlSchemaColumn extends Model<SqlSchemaColumn> { private static final long serialVersionUID = 1L; /** * 數據庫名稱 */ private String tableSchema; /** * 數據表名 */ private String tableName; /** * 字段名 */ private String columnName; private String ordinalPosition; private String columnDefault; private String isNullable; private String dataType; private String characterMaximumLength; private String characterOctetLength; private String numericPrecision; private String numericScale; private String characterSetName; private String collationName; }
補充:
因爲 數據庫中我使用的是 Bit 類型記錄 Java 對象中 Boolean 類型,致使時間中 EvenData 中返回的 數據 Serializable[] rows 對應的字段爲 BitSet 類型,
須要自定義 凡系列化規則,官方文檔中有相應的解釋,怎樣自定義反系列化規則(僞代碼)
EventDeserializer eventDeserializer = new EventDeserializer(); // do not deserialize EXT_DELETE_ROWS event data, return it as a byte array eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new ByteArrayEventDataDeserializer()); // skip EXT_WRITE_ROWS event data altogether eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new NullEventDataDeserializer()); // use custom event data deserializer for EXT_DELETE_ROWS eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new EventDataDeserializer() { ... }); BinaryLogClient client = ... client.setEventDeserializer(eventDeserializer);
經過源碼追蹤得出反系列化多數代碼在 AbstractRowsEventDataDeserializer 抽象類中
其有三個子類:
DeleteRowsEventDataDeserializer ( 做用於刪除事件 EventType.EXT_DELETE_ROWS)
UpdateRowsEventDataDeserializer ( 做用於更新時間 EventType.EXT_UPDATE_ROWS)
WriteRowsEventDataDeserializer ( 做用於刪除事件 EventType.EXT_WRITE_ROWS)
因爲我並不須要大面積的重寫 以上三個類的反系列化規則,我僅僅須要方反系列化規則 MySQL 中的 Bit 反系列化爲 Boolean 類型,
因此,我採用直接粗暴的繼承 以上三個類,
重寫 AbstractRowsEventDataDeserializer 中的 deserializeBit(int meta,ByteArrayInputStream inputStream) 便可
以 EXT_UPDATE_ROWS 事件爲例,其他兩個同理:
原方法:
protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException { int bitSetLength = (meta >> 8) * 8 + (meta & 0xFF); return inputStream.readBitSet(bitSetLength, false); }
重寫後,只須要細微的改動,返回值從 BitSet 替換爲 Boolean 便可
public class UpdateRowsDeserializer extends UpdateRowsEventDataDeserializer { public UpdateRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) { super(tableMapEventByTableId); this.setMayContainExtraInformation(true); } /** * 自定義系列換中 數據庫 Bit 字段轉爲 Boolean 類型 * * @param meta * @param inputStream * @return * @throws IOException */ @Override protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException { int bitSetLength = (meta >> 8) * 8 + (meta & 0xFF); BitSet bitSet = inputStream.readBitSet(bitSetLength, false); int cardinality = bitSet.cardinality(); Boolean booleanValue = Boolean.valueOf(cardinality == 1); return booleanValue; } }
具體示例,可前往 本人的 碼雲 中查看: https://gitee.com/chxlay/be-helpful