canal 基於Mysql數據庫增量日誌解析

canal 基於Mysql數據庫增量日誌解析

 1.前言

 最近太多事情 工做的事情,以及終身大事等等 耽誤更新,因爲最近作項目須要同步監聽 將來電視 mysql的變動了解到公司會用canal作增量監聽,就嘗試使用了一下 這裏作個demo 簡單的記錄一下。java

 2.canal簡介

 canal:主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費的中間件
 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.xmysql

Xnip20200117_145806.png

 3.MySQL 注備複製原理

Xnip20200117_150056.png

  3.1 mysql主備複製工做原理

  1.MySQL master 將數據變動寫入二進制日誌( binary log, 其中記錄叫作二進制日誌事件binary log events,能夠經過 show binlog events 進行查看)
  2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
  3.MySQL slave 重放 relay log 中事件,將數據變動反映它本身的數據git

  3.2 canal 工做原理

  1.canal 模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,向 MySQL master 發送dump 協議
  2.MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  3.canal 解析 binary log 對象(原始爲 byte 流)github

 4.準備

 對於自建MySQL ,須要先開啓 Binlog寫入功能,而且配置binlog-format 爲Row模式 在my.cnf中配置spring

Xnip20200117_151803.png

 受權 canal 連接 MySQL 帳號具備做爲 MySQL slave 的權限, 若是已有帳戶可直接 grantsql

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

 5.canal 下載安裝配置

  5.1 canal下載

  canal 下載地址 (下載速度可能很慢)數據庫

  下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gzvim

  解壓後 能夠看到以下結構
Xnip20200117_150905.pngapp

  5.2 canal 初始配置

  配置修改:maven

vim conf/example/instance.properties

  以下:

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改本身的數據庫(canal要監聽的數據庫 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改爲本身 數據庫信息的帳號 (單獨開一個 準備階段建立的帳號)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的監聽規則 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex =

  啓動canal

sh bin/startup.sh

  查看server日誌
  看到 the canal server is running now 表示啓動成功

vi logs/canal/canal.log


2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

  查看instance的日誌

vi logs/example/example.log

2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

  5.3 擴展 destination 配置

vi conf/canal.properties

  在canal.destinations 處能夠配置當前server上部署的instance 列表 默認爲 example ,我這裏改爲了 blogs最好對應數據庫名稱。一個instance 對應一個 數據庫

Xnip20200117_153243.png

Xnip20200117_153644.png

 6.建立Java 客戶端 監聽canal 消費數據

  6.1 建立maven項目

  6.2 添加canal client POM 依賴

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

  6.3 建立 canal 的客戶端監聽

  CanalMessageListener.java

  該類實現InitializingBean 主要是在初始化的時候 執行 init 方法,在init()方法中 建立 CanalConnector對象,鏈接須要監聽的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password

  parse 方法 主要用於將監聽的對象 經過反射等轉換成對應的實體類

/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {


private CanalConnector connector;

@Autowired
private CanalConfig canalConfig;

@Autowired
private IParseDispatcher configParseDispatcher;

private void init() {
    //建立canal 監聽 傳入host port destination等參數
    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
            canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    connector.connect();
    //  .*\..*
    connector.subscribe(".*\\..*");
    connector.rollback();

    new Thread(() -> {
    
        while (true) {
            Message message = connector.getWithoutAck(canalConfig.getBatchSize());
            long batchId = message.getId();
            long size = message.getEntries().size();
        //batchId == -1 表示沒有數據變動
            if (batchId == -1 || size == 0) {
                System.out.println("empty data ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
            //解析數據變動
                resoleveEntry(message.getEntries());
            }
        }

    }).start();

}
//解析數據變動
private void resoleveEntry(List<CanalEntry.Entry> entries) {
    CanalEntry.RowChange rowChange = null;
    for (CanalEntry.Entry row : entries) {
     //判斷是不是 事物開始 和 事物結束 
        if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        try {
            rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        String tableName = row.getHeader().getTableName();
        CanalEntry.EventType eventType = row.getHeader().getEventType();

        for (CanalEntry.RowData rowData : rowDataList) {
            if (eventType == CanalEntry.EventType.UPDATE) {
                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                Object object = parse(columns, tableName);
                log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                //根據收到的對象 處理後續業務邏輯
            }
        }

    }
}

@Override
public void afterPropertiesSet() throws Exception {
    init();
}

//解析 List<CanalEntry.Column>對象到對應的 實體類
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根據配置好的map 從中根據key 表名 獲取對應的映射後的 實體類class
    String className = configParseDispatcher.dispatch(tableName);
    Object entity = null;
    Class c = null;
    try {
        c = Class.forName(className);
        entity = c.newInstance();
    } catch (ClassNotFoundException e) {
        log.error("【未找到對應 {} 的 實體類 】", className);
    } catch (Exception e) {
    }

    for (CanalEntry.Column canalDataColumn : canalDatas) {
        String columnName = canalDataColumn.getName();
        Field[] fields = c.getDeclaredFields();

        for (Field field : fields) {
            Object fieldValue = null;
            field.setAccessible(true);
            String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
            log.info("【filedName: {}】", fiedName);
            if (fiedName.equals(columnName)) {
                try {
                    if (Long.class.equals(field.getType())) {
                        fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                    }else if(Integer.class.equals(field.getType())){
                        fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                    }else if(Double.class.equals(field.getType())){
                        fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                    }else if(Date.class.equals(field.getType())){
                        try {
                            fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }else{
                        fieldValue = canalDataColumn.getValue();
                    }
                    field.set(entity, fieldValue);
                    break;
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    return entity;
}
}

  application.yml
  配置canal 地址,以及表名和實體的映射規則

server:
port: 8881



application:
  canal:
    accessor: canal
    host: 127.0.0.1
    port: 11111
    username:
    password:
    destination: blogs
    batchSize: 30

    parse:   規則,根據表名獲取對應要映射的 實體class
      rule:
        mapping:
          blog_info: com.johnny.canal.canal_test.entity.BlogInfo

  IParseDispatcher.java
  接口:用來根據表名key獲取對應的 要映射的實體,這裏寫成接口是由於能夠提供多種獲取方式,好比我這裏經過yml 配置去獲取

/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {

 String dispatch(String key);

}

  ConfigParseDispatcher.java
  實現上面的接口,提供一種從 application.yml 獲取初始源配置 根據 application.canal.parse.rule進行配置

/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {

private Map<String,String> mapping=new HashMap<>();

@Override
public String dispatch(String key) {
    return mapping.get(key);
}

}

  7.演示

  啓動項目 此時控制檯打印 empty data ,無數據變動

Xnip20200117_160125.png

  經過執行 在 canal監聽的mysql 上執行 更新語句

update blog_info set blog_title = 'SpringBoot配置相關for canal test '  where id = 40

  debug 程序,當執行上面的update語句後 能夠看到當即收到
Xnip20200117_160552.png

  經過parse方法解析爲對應的 實體對象,後續作本身的業務邏輯 便可

Xnip20200117_160718.png

 8.總結

本篇主要介紹了canal是什麼,如何下載安裝和配置 ,以及提供了本身寫的一個簡單demo 。後續有機會深刻了解一下canal的其餘功能,好比 如何同步到Kafka/RocketMQ等等。。

我的博客地址: https://www.askajohnny.com 歡迎訪問!
本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索