MySQL同步NoSQL

此示例使用框架:mysql-binlog-connector-java  (https://github.com/shyiko/mysql-binlog-connector-java)java

配置文件(Nacos)mysql

spring:
  # MySQL 鏈接信息
  datasource:
    host: ${MYSQL-HOST:mysql-host}
    port: ${MYSQL-PORT:3306}
    username: ${MYSQL-USER:root}
    password: ${MYSQL-PWD:root}

加入依賴:git

<!-- MySQL binlog 日誌監聽 -->
<dependency>
   <groupId>com.github.shyiko</groupId>
   <artifactId>mysql-binlog-connector-java</artifactId>
   <version>${binlog.version}</version>
</dependency>

 

 

 


入門簡單案例講解:github

讀取 BinLog 的主體程序:spring

/**
 * 監聽MySQL binlog
 * CommandLineRunner SpringBoot啓動後執行的代碼(後置初始化)
 *
 * @author Alay
 * @date 2020-12-26 15:43
 */
@Component
public class BinLogRunner implements CommandLineRunner {
 
   @Value("${spring.datasource.host}")
   private String host;
 
   @Value("${spring.datasource.port}")
   private int port;
 
   @Value("${spring.datasource.username}")
   private String userName;
 
   @Value("${spring.datasource.password}")
   private String password;
 
 
   @Override
   public void run(String... args) throws Exception {
    // 客戶端鏈接創建
      BinaryLogClient logClient = new BinaryLogClient(host, port, userName, password);
      /*
      // 此配置爲系列化處理,
      EventDeserializer eventDeserializer = new EventDeserializer();
      eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
      );
      logClient.setEventDeserializer(eventDeserializer);
 
      */
      logClient.setServerId(1);
      logClient.registerEventListener(new BinLogEvent());
      // 此處可能出現異常處理,AuthenticationException 由於 MySQL 訪問密碼加密協議的不一樣問題
      logClient.connect(); 
   }
 
   /**
    * 查詢表結構
    * SELECT
    *      TABLE_SCHEMA,
    *      TABLE_NAME,
    *      COLUMN_NAME,
    *      ORDINAL_POSITION,
    *      COLUMN_DEFAULT,
    *      IS_NULLABLE,
    *      DATA_TYPE,
    *      CHARACTER_MAXIMUM_LENGTH,
    *      CHARACTER_OCTET_LENGTH,
    *      NUMERIC_PRECISION,
    *      NUMERIC_SCALE,
    *      CHARACTER_SET_NAME,
    *      COLLATION_NAME
    * FROM
    *      INFORMATION_SCHEMA.COLUMNS
    * WHERE
    *      TABLE_NAME = 'person_domain_user_stat'   #  表名
    *      AND TABLE_schema = 'braineex'            #  庫名
    */
   class BinLogEvent implements BinaryLogClient.EventListener {
 
      @Override
      public void onEvent(Event event) {
         EventType eventType = event.getHeader().getEventType();
         if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            System.out.println("tableId:" + tableData.getTableId());
            System.out.println("庫名:" + tableData.getDatabase());
            System.out.println("表名:" + tableData.getTable());
            /**
             * 字段名集合,位置:在event對象中 Event -> EventData -> TableMapEventData ->TableMapEventMetadata
             * SHOW VARIABLES LIKE '%BINLOG%' binlog_row_metadata=FULL 的時候纔會 Binlog日誌纔會存在此值
             * SET GLOBAL binlog_row_metadata='FULL'
             */
            tableData.getEventMetadata().getColumnNames().forEach(System.out::println);
         }
         EventData eventData = event.getData();
         if (null != eventData) {
 
            if (eventData instanceof DeleteRowsEventData) {
               System.out.println("刪除操做");
               DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
            }
 
            if (eventData instanceof UpdateRowsEventData) {
               System.out.println("修改操做");
               UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
               List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows();
               for (Map.Entry<Serializable[], Serializable[]> row : rows) {
                  /**
                   *  執行更行前 row 記錄的全部值的 Array
                   *  順序和 event.getHeader() 中的 columnNames 字段名順序一一對應
                   */
                  Serializable[] oldValues = row.getKey();
 
                  /**執行更行後 row 記錄的全部的值 Array
                   * 順序和 event.getHeader() 中的 columnNames 字段名順序一一對應
                   */
                  Serializable[] newValues = row.getValue();
               }
            }
            if (eventData instanceof WriteRowsEventData) {
               System.out.println("寫入操做");
               WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
               System.out.println(writeRowsEventData.getTableId());
            }
         }
      }
   }
}

 

MySQL 加密協議問題異常處理GitHub上做者做者討論 ( https://github.com/shyiko/mysql-binlog-connector-java/issues/240 )sql

MySQL 中執行查詢語句:將MySQL 加密協議修改成 8.0 以前的:mysql_native_password,這僅僅是權宜之計,萬全之策等做者更新新的協議,感興趣的朋友能夠持續追蹤,做者後續會 對 8.0 以後的版本的加密協議進行兼容處理的,數據庫

暫時的權益之計:json

ALTER USER '訪問的用戶名如:root' @'%' IDENTIFIED WITH mysql_native_password BY '密碼如:root';

查看 MySQL 版本號:SELECT VERSION();  查看當前數據庫庫名:SELECT DATABASE()緩存

 


BinLog 的頭信息中沒有返回表的字段名,tableData.getEventMetadata().getColumnNames()app

若須要字段名方案有二:

一:自行前往數據庫中查詢(推薦)

SELECT `table_schema`,
       `table_name`,
       `column_name`,
       `ordinal_position`,
       `column_default`,
       `is_nullable`,
       `data_type`,
       `character_maximum_length`,
       `character_octet_length`,
       `numeric_precision`,
       `numeric_scale`,
       `character_set_name`,
       `collation_name`
FROM `information_schema`.`columns`
WHERE `table_schema` = 'behelpful'          # 庫名
  AND `table_name` = 'person_information'   # 表名
ORDER BY `ordinal_position`                 # 按表設計結構的順序排序,從數字 1 開始

查詢全部的

SELECT `table_schema`,    #庫名
       `table_name`,      # 表名
       `engine`,          # 引擎
       `table_comment`,   # 表註釋
       `table_collation`, # 表字符集及排序規則
       `create_time`      # 建表時間 FROM `information_schema`.`tables`
WHERE `table_schema` = (
    SELECT DATABASE()   # 當前所在的庫 )
ORDER BY `create_time` DESC

JDBC 查詢,或者,經過以上SQL 語句字段自建一個對象,經過 Mybatis 方式查詢(僞代碼)

Connection connection = ...
DatabaseMetaData metaData = connection.getMetaData();
ResultSet tableResultSet = metaData.getTables(null, "public", null, new String[]{"TABLE"});
try {
   while (tableResultSet.next()) {
      String tableName = tableResultSet.getString("TABLE_NAME");
      ResultSet columnResultSet = metaData.getColumns(null, "public", tableName, null);
      try {
         while (columnResultSet.next()) {
            String columnName = columnResultSet.getString("COLUMN_NAME");
              ...
         }
      } finally {
         columnResultSet.close();
      }
   }
} finally {
   tableResultSet.close();
}

方案二(實際項目中不推薦使用,測試時使用此方法,方便):

設置 SET GLOBAL binlog_row_metadata='FULL'  (不推薦,由於一般咱們並不須要全局,只是須要指定的個別數據庫表),因此推薦使用查詢,而後存儲緩存

默認值:MINIMAL

SET GLOBAL binlog_row_metadata='FULL';
 
SHOW VARIABLES LIKE '%BINLOG%';

 

 經過以上代碼,根據 BinLog 的到的信息

頭信息事件中獲得: -> 表,及字段名,每次事件前必先觸發一次頭事件

其餘事件,根據具體的業務需求進行封裝成爲具體的 Java 對象,而後使用不少方案能夠實現 NoSQL 的數據同步,如使用 Kafka ,ActiveMQ,等均可以實現


整合到項目中:碼雲中有具體示例: https://gitee.com/chxlay/be-helpful    behelpful-search   模塊

@Data
@ConfigurationProperties(prefix = "spring.datasource")
public class MySqlConnection {

    private String host;
    private int port;
    private String userName;
    private String password;
}

BinLog 監聽器:

logClient.connect()執行時間太長,可是不影響正常的 Log 事件監聽,因此此處使用異步處理 @Async 、@Order,避免耽擱 SpringBoot 啓動時間

/**
 * 監聽MySQL binlog
 * CommandLineRunner SpringBoot啓動後執行的代碼(後置初始化)*/
@Order
@Component
@AllArgsConstructor
@EnableConfigurationProperties(value = MySqlConnection.class)
public class BinLogRunner implements CommandLineRunner {
    /**
     * 監聽器(這裏只註冊了一個監聽器,而後在其中進行邏輯分發事件處理)
     */
    private final ListenerAllocate listenerAllocate;
    private final MySqlConnection mySQLConnection;

    @Async
    @Order
    @Override
    public void run(String... args) throws Exception {
        EventDeserializer eventDeserializer = new EventDeserializer();

        // 因爲下面的自定義系列化須要反系列化對象的 tableMapEventByTableId 字段值,而此字段是私有的,因此經過反射拿
        Field field = eventDeserializer.getClass().getDeclaredField("tableMapEventByTableId");
        field.setAccessible(true);
        Map<Long, TableMapEventData> tableMapEventByTableId = (Map<Long, TableMapEventData>) field.get(eventDeserializer);

        // 自定義反系列化類 (讀寫更新)
        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new WriteRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new UpdateRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new DeleteRowsDeserializer(tableMapEventByTableId));

        BinaryLogClient logClient = new BinaryLogClient(mySQLConnection.getHost(), mySQLConnection.getPort(),
                mySQLConnection.getUserName(), mySQLConnection.getPassword());
        logClient.setServerId(1);
        logClient.setEventDeserializer(eventDeserializer);
        // 監聽器
        logClient.registerEventListener(listenerAllocate);
        /**
         * 此處可能出現異常處理,AuthenticationException 由於 MySQL 訪問密碼加密協議的不一樣問題
         * 關於此問題做者的交流討論: https://github.com/shyiko/mysql-binlog-connector-java/issues/240
         */
        logClient.connect();
    }
}

具體的監聽器,

BinLog 的事件觸發順序

啓動服務時:

事件順序:ROTATE、FORMAT_DESCRIPTION 

啓動後:(事件中有個共通的值,threadId,須要自行研究,反覆啓動,切換不一樣的庫不停的增刪改,修改結構等操做,此值均保持不變)

事件觸發順序:

每次操做只觸發一次(批量操做也只會觸發一次):ANONYMOUS_GTID,QUERY,(如果修改表結構操做,只觸發此處的兩個事件)

每條記錄row觸發一次獨立的事件:TABLE_MAP(攜帶表、庫、字段信息)、多選一:EXT_UPDATE_ROWS(執行更新),EXT_DELETE_ROWS,EXT_WRITE_ROWS

每次操做只觸發一次(批量操做也只會觸發一次)XID

/**
 * 負責分配 BinLog 事件到具體的處理類中的(管理者角色)*/
@Slf4j
@Component
public class ListenerAllocate extends AbsBinLogEvent {

    @Override
    protected void init() {
    }

    /**
     * 一次MySQL的修改、插入、刪除,會觸發屢次事件,會調用方法屢次,注意處理好邏輯優化性能
     * 啓動時:
     * 事件順序:ROTATE、FORMAT_DESCRIPTION
     * </br>
     * 啓動後:
     * 事件觸發順序:
     * 每次操做只觸發一次(批量操做也只會觸發一次):ANONYMOUS_GTID,QUERY,
     * 每條記錄row 觸發一次獨立的事件:TABLE_MAP(攜帶表庫字段信息)、多選一:EXT_UPDATE_ROWS(執行更新),EXT_DELETE_ROWS,EXT_WRITE_ROWS
     * 每次操做只觸發一次(批量操做也只會觸發一次) XID
     *
     * @param event
     */
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        // 只處理我想要處理的事件
        if (eventType != EventType.QUERY && eventType != EventType.TABLE_MAP && eventType != EventType.EXT_UPDATE_ROWS && eventType != EventType.EXT_DELETE_ROWS && eventType != EventType.EXT_WRITE_ROWS) {
            return;
        }
        EventData eventData = event.getData();

        /**
         * 此事件操做的是修改表結構,修改後須要將緩存中存儲的表結構刪除
         *     解析得出修改表結構的 sql 語句進行解析
         * sql='ALTER TABLE `database_name(數據庫的名稱)`.`person_information`\r\n後面是具體的執行語句
         */
        if (EventType.QUERY == eventType) {
            QueryEventData queryEventData = (QueryEventData) eventData;
            String sql = queryEventData.getSql();
            // 增、刪、改的事件,不作處理
            if (sql.startsWith(SearchConstants.EVENT_SQL_BEGIN)) {
                return;
            }
            // 修改表結構的事件
            String tableInfo = sql.substring(12);
            // 注意表名後面有一個空格須要去除
            tableInfo = StrUtil.subBefore(tableInfo, " \r\n", false);
            tableInfo = tableInfo.replace("`", "");
            String[] tableArr = tableInfo.split("\\.");
            // 移除緩存
            sqlSchemaColumnService.removeTableColumn(tableArr[0], tableArr[1]);
            return;
        }

        /**
         * 表結構映射事件
         */
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData mapEventData = (TableMapEventData) eventData;
            long tableId = mapEventData.getTableId();
            // 獲取頭文件中存儲的 tableId  <---> tableName 的映射
            String tableFullName = tableFullNameMap.get(tableId);

            if (null != tableFullName) {
                // 此表已經不是第一次觸發該事件了,不須要重複的處理作準備的工做
                return;
            }
            if (null == tableFullName) {
                String tableName = mapEventData.getTable();
                // 此事件不是我要處理的數據庫表的日誌事件,不在個人事件實例中,不作處理
                if (null == eventInstanceMap.get(tableName)) {
                    return;
                }
                String database = mapEventData.getDatabase();
                tableFullName = database + "." + tableName;
                tableFullNameMap.put(tableId, tableFullName);
                return;
            }
        }

        /**
         * 觸發的事件是具體操做的事件
         */
        if (eventType == EventType.EXT_UPDATE_ROWS || eventType == EventType.EXT_WRITE_ROWS || eventType == EventType.EXT_DELETE_ROWS) {
            try {
                // 經過反射獲取到事件對象數據的 tableId的值
                Field tableIdField = eventData.getClass().getDeclaredField("tableId");
                tableIdField.setAccessible(true);
                long tableId = (long) tableIdField.get(eventData);

                String tableFullName = tableFullNameMap.get(tableId);
                // 不是我想要監聽的數據庫表的的事件,不作處理
                if (null == tableFullName) {
                    return;
                }
                // 經過表名 得到具體是時間處理類對象 tableName <----> eventInstance
                String[] tableNameArr = tableFullName.split("\\.");
                AbsBinLogEvent eventInstance = eventInstanceMap.get(tableNameArr[1]);
                // 調用具體的事件邏輯處理
                eventInstance.onEvent(event);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                log.error("執行 BinLog 同步數據失敗,時間類型{},緣由:{},Msg:{}", eventType.toString(), e.getCause(), e.getMessage());
            }
        }
    }
}

補充:QUERY 事件的補充

修改表結構中觸發的 BinLog 事件以下:(sql='ALTER TABLE `behelpful`.`person_information,修改表事件, sql 信息會攜帶 修改的語句)

Event{
  header=EventHeaderV4{
     timestamp=1609379484000, 
     eventType=QUERY, 
     serverId=1, 
     headerLength=19, 
     dataLength=256, 
     nextPosition=20948893, 
     flags=0
  }, 
  data=QueryEventData{
     threadId=305142, 
     executionTime=1, 
     errorCode=0, 
     database='braineex', 
     sql='ALTER TABLE `braineex`.`person_information`    ------------>> 修改表結構事件
     MODIFY COLUMN 
     `nick_name` varchar(31) 
     CHARACTER SET utf8mb4 
     COLLATE utf8mb4_general_ci NULL 
     DEFAULT NULL 
     COMMENT '暱稱' 
     AFTER `age`'
  }
}

增、刪、改 觸發的 QUERY 中 sql = ' BEGIN '  ,數據增刪改 觸發的BinLog 事件中, sql 不會攜帶執行的 sql 語句,而是 爲  BEGIN

Event{
  header=EventHeaderV4{
    timestamp=1609380048000, 
    eventType=QUERY, 
    serverId=1, 
    headerLength=19, 
    dataLength=60, 
    nextPosition=20949876, 
    flags=8
 }, 
 data=QueryEventData{
    threadId=305142, 
    executionTime=0, 
    errorCode=0, 
    database='braineex', 
    sql='BEGIN'
  }
}

解析 BinLog 事件的具體類 (抽象類,以上匿名內部類的方式將其添加到具體處理),日誌解析獲得具體的 row  ->  Java 對象之後使用任何方式操做 同步都可

/**
 * 具體的 BinLog 事件增刪改事件處理的邏輯代碼和公共的變量管理存儲*/
public abstract class AbsBinLogEvent<T extends ISearchModel> implements BinaryLogClient.EventListener {

    /**
     * Map< tableId, MySQL表名>
     */
    protected volatile static Map<Long, String> tableFullNameMap = new HashMap<>(1 << 3);
    /**
     * Map< tableId, MySQL表對應 Java 實體類的類對象 Class>
     */
    protected volatile static Map<String, AbsBinLogEvent> eventInstanceMap = new HashMap<>(1 << 4);

    protected ISearchService<T> baseEsService;

    @Autowired
    protected ActionListener updateListener;
    @Autowired
    protected ActionListener indexListener;
    @Autowired
    protected ActionListener deleteListener;

    @Autowired
    protected SqlSchemaColumnService sqlSchemaColumnService;

    /**
     * 初始化須要處理的表的事件處理類
     * 初始化變量,及注入
     *
     * @throws BaseRuntimeException
     */
    @PostConstruct
    protected abstract void init() throws BaseRuntimeException;

    @Override
    public void onEvent(Event event) {
        EventData eventData = event.getData();
        /**
         * 修改操做
         */
        if (eventData instanceof UpdateRowsEventData) {
            T entity = this.updateEvent(eventData);
            // 同步 ES 的操做
            T esEntity = baseEsService.selectById(entity.getEsId());
            if (null == esEntity) {
                baseEsService.saveEntityAsy(entity, indexListener);
            } else {
                baseEsService.updateByIdAsy(entity, updateListener);
            }
            return;
        }

        /**
         * 寫入操做
         */
        if (eventData instanceof WriteRowsEventData) {
            T entity = this.saveEvent(eventData);
            // 寫入數據
            baseEsService.saveEntityAsy(entity, indexListener);
            return;
        }
        /**
         * 刪除操做
         */
        if (eventData instanceof DeleteRowsEventData) {
            T entity = this.deleteEvent(eventData);
            // 刪除 ES 中數據
            baseEsService.deleteByIdAsy(entity.getEsId(), deleteListener);
            return;
        }
    }

    protected T saveEvent(EventData eventData) {
        WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
        List<Serializable[]> rows = writeRowsEventData.getRows();
        T entity = this.rowsToEntity(rows, writeRowsEventData.getTableId());
        return entity;
    }

    protected T updateEvent(EventData eventData) {
        UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
        // rows 每個 Entry 是條記錄,其中 Key 爲修改前的記錄,Value 爲修改後的新的記錄
        List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows();
        // 獲取修改後的新的值
        List<Serializable[]> newValues = rows.stream().map(entry -> entry.getValue()).collect(Collectors.toList());
        // 將修改後的新的值轉成 Java 對象
        T entity = this.rowsToEntity(newValues, updateRowsEventData.getTableId());
        return entity;
    }

    protected T deleteEvent(EventData eventData) {
        DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
        List<Serializable[]> rows = deleteRowsEventData.getRows();
        T entity = this.rowsToEntity(rows, deleteRowsEventData.getTableId());
        return entity;
    }


    protected T rowsToEntity(List<Serializable[]> rows, Long tableId) {
        String tableFullName = tableFullNameMap.get(tableId);
        String[] tableNameArr = tableFullName.split("\\.");

        // 得到當前 row 的數據庫中對應的字段名稱
        String[] columnNames = sqlSchemaColumnService.columnsByTable(tableNameArr[0], tableNameArr[1]);
        JSONObject beanJSON = new JSONObject();
        for (Serializable[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                beanJSON.put(columnNames[i], row[i]);
            }
        }
        T entity = this.jsonToBean(beanJSON);
        return entity;
    }

    protected T jsonToBean(JSONObject beanJSON) {
        T entity = JSON.toJavaObject(beanJSON, this.entityClass());
        return entity;
    }


    /**
     * 獲取調用方法實現類中泛型的具體類對象
     *
     * @return
     */
    protected Class<T> entityClass() {
        // 當前調用方法的 Impl實現類的父類的類型
        ParameterizedType superclass = (ParameterizedType) this.getClass().getGenericSuperclass();
        // 當前調用方法的 Impl實現類的泛型的類型,實現類必須帶泛型,不然報錯
        Type[] type = superclass.getActualTypeArguments();
        Class clazz = (Class) type[0];
        return clazz;
    }

    /**
     * 獲取實體類映射的數據庫表名稱
     *
     * @param entityClazz
     * @return
     */
    protected String entityTableName(Class<?> entityClazz) {
        boolean isAnno = entityClazz.isAnnotationPresent(TableName.class);
        if (isAnno) {
            TableName annotation = entityClazz.getAnnotation(TableName.class);
            // 表名 @TableName(value = "person_information")
            return annotation.value();
        }
        throw new BaseRuntimeException("操做不容許", "ERROR");
    }
}

 

處理表結構的查詢、緩存的 Service 實現類,方案中使用了 MP(以上說所的方案 一):

/**
 * 查詢得到數據庫中表結構*/
@Service
public class SqlSchemaColumnServiceImpl extends ServiceImpl<SqlSchemaColumnMapper, SqlSchemaColumn> implements SqlSchemaColumnService {

   @Autowired
   private IRedisUtil iRedisUtil;

   /**
    * 此方法不可類內部調用,不然使用AOP處理Jedis將不會關閉
    * 保證字段的順序,故用RedisZSet 進行存儲
    *
    * @param database
    * @param tableName
    * @return
    */
   @ThreadJedis
   @Override
   public String[] columnsByTable(String database, String tableName) {
      String key = CacheEnum.SQL_SCHEMA.key + database + "." + tableName;
      Jedis jedis = iRedisUtil.threadJedis();
      jedis.select(CacheEnum.SQL_SCHEMA.index);
      Boolean exists = jedis.exists(key);
      String[] columnNames;
      if (exists) {
         // 集合中全部成員數
         Long total = jedis.zcard(key);
         columnNames = new String[total.intValue()];
         for (int i = 0; i < total; i++) {
            Set<String> columns = jedis.zrange(key, i, i);
            String column = columns.iterator().next();
            columnNames[i] = column;
         }
      } else {
         // 查詢數據表結構存入緩存
         List<SqlSchemaColumn> schemaColumns = this.list(Wrappers.<SqlSchemaColumn>lambdaQuery()
               .eq(SqlSchemaColumn::getTableSchema, database)
               .eq(SqlSchemaColumn::getTableName, tableName)
               .orderByAsc(SqlSchemaColumn::getOrdinalPosition));
         List<String> columns = schemaColumns.stream().map(SqlSchemaColumn::getColumnName).collect(Collectors.toList());
         columnNames = new String[columns.size()];
         for (int i = 0; i < columns.size(); i++) {
            columnNames[i] = columns.get(i);
            // 存入緩存
            jedis.zadd(key, i + 1, columns.get(i));
         }
      }
      return columnNames;
   }

   @ThreadJedis
   @Override
   public boolean removeTableColumn(String database, String tableName) {
      String key = CacheEnum.SQL_SCHEMA.key + database + "." + tableName;
      Jedis jedis = iRedisUtil.threadJedis();
      jedis.select(CacheEnum.SQL_SCHEMA.index);
      Long del = jedis.del(key);
      return del > 0;
   }
}

數據庫表結構對應的 實體類:

/**
 * MySQL 表機構
 */
@Data
@EqualsAndHashCode(callSuper = true)
@TableName(value = "`information_schema`.`columns`")
public class SqlSchemaColumn extends Model<SqlSchemaColumn> {
   private static final long serialVersionUID = 1L;
   /**
    * 數據庫名稱
    */
   private String tableSchema;
   /**
    * 數據表名
    */
   private String tableName;
   /**
    * 字段名
    */
   private String columnName;
   private String ordinalPosition;
   private String columnDefault;
   private String isNullable;
   private String dataType;
   private String characterMaximumLength;
   private String characterOctetLength;
   private String numericPrecision;
   private String numericScale;
   private String characterSetName;
   private String collationName;
}

補充:

因爲 數據庫中我使用的是 Bit 類型記錄 Java 對象中 Boolean 類型,致使時間中 EvenData 中返回的 數據 Serializable[] rows 對應的字段爲 BitSet 類型,

須要自定義 凡系列化規則,官方文檔中有相應的解釋,怎樣自定義反系列化規則(僞代碼)

EventDeserializer eventDeserializer = new EventDeserializer();
 
// do not deserialize EXT_DELETE_ROWS event data, return it as a byte array
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
      new ByteArrayEventDataDeserializer());
 
// skip EXT_WRITE_ROWS event data altogether
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
      new NullEventDataDeserializer());
 
// use custom event data deserializer for EXT_DELETE_ROWS
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
      new EventDataDeserializer() {
      ...
      });
 
BinaryLogClient client = ...
client.setEventDeserializer(eventDeserializer);

經過源碼追蹤得出反系列化多數代碼在 AbstractRowsEventDataDeserializer 抽象類中

 其有三個子類:

DeleteRowsEventDataDeserializer ( 做用於刪除事件 EventType.EXT_DELETE_ROWS)
UpdateRowsEventDataDeserializer ( 做用於更新時間 EventType.EXT_UPDATE_ROWS)
WriteRowsEventDataDeserializer  ( 做用於刪除事件 EventType.EXT_WRITE_ROWS

因爲我並不須要大面積的重寫 以上三個類的反系列化規則,我僅僅須要方反系列化規則 MySQL 中的 Bit 反系列化爲 Boolean 類型,

因此,我採用直接粗暴的繼承 以上三個類,

重寫  AbstractRowsEventDataDeserializer 中的 deserializeBit(int meta,ByteArrayInputStream inputStream) 便可

 

 

 

EXT_UPDATE_ROWS  事件爲例,其他兩個同理:

原方法:

protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
    int bitSetLength = (meta >> 8) * 8 + (meta & 0xFF);
    return inputStream.readBitSet(bitSetLength, false);
}

重寫後,只須要細微的改動,返回值從 BitSet 替換爲 Boolean 便可

public class UpdateRowsDeserializer extends UpdateRowsEventDataDeserializer {

   public UpdateRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
      super(tableMapEventByTableId);
      this.setMayContainExtraInformation(true);
   }


   /**
    * 自定義系列換中 數據庫 Bit 字段轉爲 Boolean 類型
    *
    * @param meta
    * @param inputStream
    * @return
    * @throws IOException
    */
   @Override
   protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
      int bitSetLength = (meta >> 8) * 8 + (meta & 0xFF);
      BitSet bitSet = inputStream.readBitSet(bitSetLength, false);
      int cardinality = bitSet.cardinality();
      Boolean booleanValue = Boolean.valueOf(cardinality == 1);
      return booleanValue;
   }
}

 

具體示例,可前往 本人的 碼雲 中查看:  https://gitee.com/chxlay/be-helpful

相關文章
相關標籤/搜索