Installing and Using Canal

1. Introduction to Canal

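  Canal works by posing as a MySQL slave: it sends a dump request to the MySQL master over the replication protocol, and on receiving that request the master pushes its binlog to the slave, which in this case is Canal.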

 

2. Installing Canal

  1. Download Canal

   wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

  2. Extract the archive into the /opt/softwares/canal directory. (Figure: directory contents after extraction.)

  3. Configure the instance
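
  As a minimal sketch, these are the entries in conf/example/instance.properties that typically need attention. The address and credentials shown are assumptions (the defaults shipped with Canal), not values taken from the original setup; "example" is the instance name the client in section 4 connects to.

```properties
# conf/example/instance.properties (assumed values; adjust to your environment)

# Must differ from the server_id of the MySQL master
canal.instance.mysql.slaveId = 1234

# Address of the MySQL master whose binlog Canal reads
canal.instance.master.address = 127.0.0.1:3306

# MySQL account Canal logs in with; it needs replication privileges
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8

# Regular expression over schema.table names to capture (this one matches every table)
canal.instance.filter.regex = .*\\..*
```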

 

 

  4. Modify canal.properties
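
  A hedged sketch of the server-level entries in conf/canal.properties that matter for this walkthrough. The port and destination are inferred from the client code in section 4 (port 11111, destination "example"); canal.id = 1 is an assumed default.

```properties
# conf/canal.properties (assumed values)

# Identifier of this Canal server
canal.id = 1

# Port that Canal clients connect to
canal.port = 11111

# Comma-separated instance names; each one has its own directory under conf/
canal.destinations = example
```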

 

 

3. Installing MySQL

  1. Install MySQL

    yum install mysql

    yum install mysql-server

  2. Start MySQL

    /etc/init.d/mysqld start    (or: service mysqld start)

  3. Set the root user's password

    mysqladmin -u root password '123456' 

  4. Log in to MySQL

    mysql -uroot -p123456

  5. Check that binlog replication is enabled and that the binlog format is ROW; enable both if they are not

    Reference: Binlog in Detail
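
    As a sketch of what this step usually involves: the current state can be inspected from the mysql prompt with SHOW VARIABLES LIKE 'log_bin'; and SHOW VARIABLES LIKE 'binlog_format';. If binlog is off or the format is not ROW, the [mysqld] section of the MySQL configuration file needs entries like the ones below. The file location (commonly /etc/my.cnf) and the server_id value are assumptions; the server_id only has to differ from the slaveId configured for Canal.

```ini
[mysqld]
# Enable the binary log; "mysql-bin" is the file name prefix
log-bin=mysql-bin
# Canal needs row-based events
binlog-format=ROW
# Unique id of this MySQL server within the replication topology
server_id=1
```

    MySQL has to be restarted for these settings to take effect.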

 

4. Extracting the Binlog with Canal

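  Canal only poses as a slave to pull the binlog. Once it has the binlog, the data still has to be handed to the business side for processing. How is that done? Typically the binlog that Canal fetches is written to Kafka, and the business side subscribes to the Kafka topic, consumes the binlog messages, and runs its business logic.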

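  However, the Canal version used here (1.0.24) cannot write to Kafka directly, so a client has to connect to Canal: Canal hands the binlog it has fetched to the client, and the client then writes the binlog messages to Kafka. The client code below connects to Canal and prints each change to standard output, which is the point where a Kafka producer would be called: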

  

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClientExample {

    public static void main(String[] args) {
        // Arguments: Canal server address, destination (instance name), username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal");
        try {
            int batchSize = 1000;
            connector.connect();
            // Only receive changes for tables in the zhengxinv6 schema
            connector.subscribe("zhengxinv6\\..*");
            // Return to the last acknowledged position before starting to fetch
            connector.rollback();
            while (true) {
                // Fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new yet: wait a second and poll again
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                System.out.println("batchId = [" + batchId + "]");
                printEntry(message.getEntries());
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // call this instead if processing failed
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            // Skip transaction begin/end markers; only row data is of interest
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format(
                    "================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset() + "",
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue()
                    + "    update=" + column.getUpdated());
        }
    }
}
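
  The client above prints each change rather than forwarding it. The forwarding step described in this section could be sketched roughly as follows. This is an illustration under stated assumptions, not the original author's code: BinlogSink, InMemorySink, the topic name "binlog-topic", and the JSON layout are all hypothetical, and the sink is kept in memory so the sketch runs with only the JDK. A Kafka-backed sink would implement the same interface by calling the Kafka producer's send method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: turn one changed row into a text message and hand it to a sink. */
class BinlogForwardSketch {

    /** Hypothetical abstraction over the message queue; not part of Canal or Kafka. */
    interface BinlogSink {
        void send(String topic, String key, String value);
    }

    /** In-memory stand-in for a Kafka producer, so the sketch needs no broker. */
    static class InMemorySink implements BinlogSink {
        final List<String> messages = new ArrayList<String>();

        @Override
        public void send(String topic, String key, String value) {
            // A Kafka-backed sink would call
            // producer.send(new ProducerRecord<>(topic, key, value)) here.
            messages.add(topic + "|" + key + "|" + value);
        }
    }

    /** Wraps a value in double quotes, escaping backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds a small JSON object describing one changed row (assumed layout). */
    static String toJson(String schema, String table, String eventType,
                         Map<String, String> columns) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"schema\":").append(quote(schema))
          .append(",\"table\":").append(quote(table))
          .append(",\"type\":").append(quote(eventType))
          .append(",\"data\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : columns.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    /** Forwards one row, using schema.table as the message key. */
    static void forward(BinlogSink sink, String topic, String schema, String table,
                        String eventType, Map<String, String> columns) {
        sink.send(topic, schema + "." + table, toJson(schema, table, eventType, columns));
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<String, String>();
        row.put("id", "1");
        row.put("name", "alice");
        InMemorySink sink = new InMemorySink();
        forward(sink, "binlog-topic", "zhengxinv6", "user", "INSERT", row);
        System.out.println(sink.messages.get(0));
    }
}
```

  In the client, forward would be called from the loop over rowChange.getRowDatasList(), with the column lists copied into a map, and connector.ack(batchId) would be called only after every row of the batch has been sent. Using schema.table as the key is a design choice: with Kafka's default partitioner, messages with the same key go to the same partition, which keeps the changes to one table in order.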

 

 

5. Problems Encountered While Using Canal and Their Solutions

  Reference: Solutions to Canal Errors

  

 

 

Reference: https://www.jianshu.com/p/6299048fad66
