最近太多事情 工做的事情,以及終身大事等等 耽誤更新,因爲最近作項目須要同步監聽 將來電視 mysql的變動了解到公司會用canal作增量監聽,就嘗試使用了一下 這裏作個demo 簡單的記錄一下。java
canal:主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費的中間件
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.xmysql
1.MySQL master 將數據變動寫入二進制日誌( binary log, 其中記錄叫作二進制日誌事件binary log events,能夠經過 show binlog events 進行查看)
2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
3.MySQL slave 重放 relay log 中事件,將數據變動反映它本身的數據git
1.canal 模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,向 MySQL master 發送dump 協議
2.MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
3.canal 解析 binary log 對象(原始爲 byte 流)github
對於自建MySQL ,須要先開啓 Binlog寫入功能,而且配置binlog-format 爲Row模式 在my.cnf中配置spring
受權 canal 連接 MySQL 帳號具備做爲 MySQL slave 的權限, 若是已有帳戶可直接 grantsql
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
canal 下載地址 (下載速度可能很慢)數據庫
下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gzvim
解壓後 能夠看到以下結構
app
配置修改:maven
vim conf/example/instance.properties
以下:
################################################# ## mysql serverId canal.instance.mysql.slaveId = 2020 # position info 修改本身的數據庫(canal要監聽的數據庫 地址 ) canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password 修改爲本身 數據庫信息的帳號 (單獨開一個 準備階段建立的帳號) canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # table regex 表的監聽規則 # canal.instance.filter.regex = blogs\.blog_info canal.instance.filter.regex = .\*\\\\..\* # table black regex canal.instance.filter.black.regex =
啓動canal
sh bin/startup.sh
查看server日誌
看到 the canal server is running now 表示啓動成功
vi logs/canal/canal.log 2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111] 2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看instance的日誌
vi logs/example/example.log 2020-01-08 15:25:33.864 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 2020-01-08 15:25:33.998 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful.... 2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
vi conf/canal.properties
在canal.destinations 處能夠配置當前server上部署的instance 列表 默認爲 example ,我這裏改爲了 blogs最好對應數據庫名稱。一個instance 對應一個 數據庫
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
CanalMessageListener.java
該類實現InitializingBean 主要是在初始化的時候 執行 init 方法,在init()方法中 建立 CanalConnector對象,鏈接須要監聽的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password
parse 方法 主要用於將監聽的對象 經過反射等轉換成對應的實體類
/** * @author johnny **/ @Component @Slf4j @ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal") public class CanalMessageListener implements InitializingBean, ParseCanal { private CanalConnector connector; @Autowired private CanalConfig canalConfig; @Autowired private IParseDispatcher configParseDispatcher; private void init() { //建立canal 監聽 傳入host port destination等參數 connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()), canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword()); connector.connect(); // .*\..* connector.subscribe(".*\\..*"); connector.rollback(); new Thread(() -> { while (true) { Message message = connector.getWithoutAck(canalConfig.getBatchSize()); long batchId = message.getId(); long size = message.getEntries().size(); //batchId == -1 表示沒有數據變動 if (batchId == -1 || size == 0) { System.out.println("empty data "); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { //解析數據變動 resoleveEntry(message.getEntries()); } } }).start(); } //解析數據變動 private void resoleveEntry(List<CanalEntry.Entry> entries) { CanalEntry.RowChange rowChange = null; for (CanalEntry.Entry row : entries) { //判斷是不是 事物開始 和 事物結束 if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } try { rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); String tableName = row.getHeader().getTableName(); CanalEntry.EventType eventType = row.getHeader().getEventType(); for (CanalEntry.RowData rowData : rowDataList) { if (eventType == CanalEntry.EventType.UPDATE) { List<CanalEntry.Column> columns = rowData.getBeforeColumnsList(); Object object = parse(columns, tableName); log.info("收到的 object:{}", JsonUtils.marshalToString(object)); //根據收到的對象 處理後續業務邏輯 } } } } @Override public void afterPropertiesSet() throws Exception { init(); } //解析 List<CanalEntry.Column>對象到對應的 實體類 @Override public Object parse(List<CanalEntry.Column> canalDatas, String tableName) { //根據配置好的map 從中根據key 表名 獲取對應的映射後的 實體類class String className = configParseDispatcher.dispatch(tableName); Object entity = null; Class c = null; try { c = Class.forName(className); entity = c.newInstance(); } catch (ClassNotFoundException e) { log.error("【未找到對應 {} 的 實體類 】", className); } catch (Exception e) { } for (CanalEntry.Column canalDataColumn : canalDatas) { String columnName = canalDataColumn.getName(); Field[] fields = c.getDeclaredFields(); for (Field field : fields) { Object fieldValue = null; field.setAccessible(true); String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName()); log.info("【filedName: {}】", fiedName); if (fiedName.equals(columnName)) { try { if (Long.class.equals(field.getType())) { fieldValue = NumberUtils.toLong(canalDataColumn.getValue()); }else if(Integer.class.equals(field.getType())){ fieldValue = NumberUtils.toInt(canalDataColumn.getValue()); }else if(Double.class.equals(field.getType())){ fieldValue = NumberUtils.toDouble(canalDataColumn.getValue()); }else if(Date.class.equals(field.getType())){ try { fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"}); } catch (ParseException e) { e.printStackTrace(); } }else{ fieldValue = canalDataColumn.getValue(); } field.set(entity, fieldValue); break; } catch (IllegalAccessException e) { e.printStackTrace(); } } } } return entity; } }
application.yml
配置canal 地址,以及表名和實體的映射規則
server: port: 8881 application: canal: accessor: canal host: 127.0.0.1 port: 11111 username: password: destination: blogs batchSize: 30 parse: 規則,根據表名獲取對應要映射的 實體class rule: mapping: blog_info: com.johnny.canal.canal_test.entity.BlogInfo
IParseDispatcher.java
接口:用來根據表名key獲取對應的 要映射的實體,這裏寫成接口是由於能夠提供多種獲取方式,好比我這裏經過yml 配置去獲取
/** * @author johnny * @create 2020-01-17 上午11:09 **/ public interface IParseDispatcher { String dispatch(String key); }
ConfigParseDispatcher.java
實現上面的接口,提供一種從 application.yml 獲取初始源配置 根據 application.canal.parse.rule進行配置
/** * @author johnny * @create 2020-01-17 上午11:07 **/ @Data @Configuration @ConfigurationProperties(prefix = "application.canal.parse.rule") public class ConfigParseDispatcher implements IParseDispatcher { private Map<String,String> mapping=new HashMap<>(); @Override public String dispatch(String key) { return mapping.get(key); } }
啓動項目 此時控制檯打印 empty data ,無數據變動
經過執行 在 canal監聽的mysql 上執行 更新語句
update blog_info set blog_title = 'SpringBoot配置相關for canal test ' where id = 40
debug 程序,當執行上面的update語句後 能夠看到當即收到
經過parse方法解析爲對應的 實體對象,後續作本身的業務邏輯 便可
本篇主要介紹了canal是什麼,如何下載安裝和配置 ,以及提供了本身寫的一個簡單demo 。後續有機會深刻了解一下canal的其餘功能,好比 如何同步到Kafka/RocketMQ等等。。
我的博客地址: https://www.askajohnny.com 歡迎訪問!
本文由博客一文多發平臺 OpenWrite 發佈!