使用canal增量訂閱MySQL binlog

【轉載請註明出處】:https://segmentfault.com/a/1190000022767293html

基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了mysql。
早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。java

ps. 目前內部版本已經支持mysql和oracle部分版本的日誌解析,當前的canal開源版本支持5.7及如下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)mysql

基於日誌增量訂閱&消費支持的業務:git

  • 數據庫鏡像
  • 數據庫實時備份
  • 多級索引 (賣家和買家各自分庫索引)
  • search build
  • 業務cache刷新
  • 價格變化等重要業務消息

一、Canal工做原理

mysql主備複製實現

image.png

從上層來看,複製分紅三步:github

  1. master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
  2. slave將master的binary log events拷貝到它的中繼日誌(relay log);
  3. slave重作中繼日誌中的事件,將改變反映它本身的數據。
canal的工做原理:

image.png

原理相對比較簡單:web

  1. canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始爲byte流)
架構

image.png
說明:正則表達式

  • server表明一個canal運行實例,對應於一個jvm
  • instance對應於一個數據隊列 (1個server對應1..n個instance)

instance模塊:spring

  • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
  • eventSink (Parser和Store連接器,進行數據過濾,加工,分發的工做)
  • eventStore (數據存儲)
  • metaManager (增量訂閱&消費信息管理器)
EventParser設計

大體過程:
image.png
整個parser過程大體可分爲幾步:sql

  1. Connection獲取上一次解析成功的位置 (若是第一次啓動,則獲取初始指定的位置或者是當前數據庫的binlog位點)
  2. Connection創建連接,發送BINLOG_DUMP指令

// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name數據庫

  1. Mysql開始推送Binaly Log
  2. 接收到的Binaly Log的經過Binlog parser進行協議解析,補充一些特定信息

// 補充字段名字,字段類型,主鍵信息,unsigned類型處理

  1. 傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功
  2. 存儲成功後,定時記錄Binaly Log位置

mysql的Binlay Log網絡協議:
image.png
說明:

https://dev.mysql.com/doc/internals/en/event-structure.html
https://dev.mysql.com/doc/internals/en/binlog-event.html

EventSink設計

image.png

說明:

  • 數據過濾:支持通配符的過濾模式,表名,字段內容等
  • 數據路由/分發:解決1:n (1個parser對應多個store的模式)
  • 數據歸併:解決n:1 (多個parser對應1個store)
  • 數據加工:在進入store以前進行額外的處理,好比join

數據1:n業務
爲了合理的利用數據庫資源, 通常常見的業務都是按照schema進行隔離,而後在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是經過cobar/tddl來解決數據源路由問題。
因此,通常一個數據庫實例上,會部署多個schema,每一個schema會有由1個或者多個業務方關注

數據n:1業務
一樣,當一個業務的數據規模達到必定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據須要處理時,就須要連接多個store進行處理,消費的位點就會變成多份,並且數據消費的進度沒法獲得儘量有序的保證。
因此,在必定業務場景下,須要將拆分後的增量數據進行歸併處理,好比按照時間戳/全局id進行排序歸併.

EventStore設計
  1. 目前僅實現了Memory內存模式,後續計劃增長本地file存儲,mixed混合模式
  2. 借鑑了Disruptor的RingBuffer的實現思路

RingBuffer設計:
image

定義了3個cursor

  • Put : Sink模塊進行數據存儲的最後一次寫入位置
  • Get : 數據訂閱獲取的最後一次提取位置
  • Ack : 數據消費成功的最後一次消費位置

借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:
image

實現說明:

  • Put/Get/Ack cursor用於遞增,採用long型存儲
  • buffer的get操做,經過取餘或者與操做。(與操做: cusor & (size - 1) , size須要爲2的指數,效率比較高)
Instance設計

image.png

instance表明了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。

抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

  • manager方式: 和你本身的內部web console/manager系統進行對接。(目前主要是公司內部使用)
  • spring方式:基於spring xml + properties進行定義,構建spring配置.
Server設計

image.png

server表明了一個canal的運行實例,爲了方便組件化使用,特地抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現

  • Embeded : 對latency和可用性都有比較高的要求,本身又能hold住分佈式的相關技術(好比failover)
  • Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,採用的pull模型,固然latency會稍微打點折扣,不過這個也視狀況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數據量大的時候會有一些問題)
增量訂閱/消費設計

image.png
具體的協議格式,可參見:CanalProtocol.proto

get/ack/rollback協議介紹:

  • Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:
    a. batch id 惟一標識
    b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
  • void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
  • void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做

canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.

流式api設計的好處:

  • get/ack異步化,減小因ack帶來的網絡延遲和操做成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別狀況,不必爲個別的case犧牲整個性能)
  • get獲取數據後,業務消費存在瓶頸或者須要多進程/多線程消費時,能夠不停的輪詢get數據,不停的日後發送任務,提升並行化. (做者在實際業務中的一個case:業務數據消費須要跨中美網絡,因此一次操做基本在200ms以上,爲了減小延遲,因此須要實施並行化)

流式api設計:
image.png

  • 每次get操做都會在meta中產生一個mark,mark標記會遞增,保證運行過程當中mark的惟一性
  • 每次的get操做,都會在上一次的mark操做記錄的cursor繼續日後取,若是mark不存在,則在last ack cursor繼續日後取
  • 進行ack時,須要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新爲last ack cusor
  • 一旦出現異常狀況,客戶端可發起rollback狀況,從新置位:刪除全部的mark, 清理get請求位置,下次請求會從last ack cursor繼續日後取
HA機制設計

canal的ha分爲兩部分,canal server和canal client分別有對應的ha實現

  • canal server: 爲了減小對mysql dump的請求,不一樣server上的instance要求同一時間只能有一個處於running,其餘的處於standby狀態.
  • canal client: 爲了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操做,不然客戶端接收沒法保證有序。

整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命週期綁定),能夠看下我以前zookeeper的相關文章。

Canal Server:
image.png

大體步驟:

  1. canal server要啓動某個canal instance時都先向zookeeper進行一次嘗試啓動判斷 (實現:建立EPHEMERAL節點,誰建立成功就容許誰啓動)
  2. 建立zookeeper節點成功後,對應的canal server就啓動對應的canal instance,沒有建立成功的canal instance就會處於standby狀態
  3. 一旦zookeeper發現canal server A建立的節點消失後,當即通知其餘的canal server再次進行步驟1的操做,從新選出一個canal server啓動instance.
  4. canal client每次進行connect時,會首先向zookeeper詢問當前是誰啓動了canal instance,而後和其創建連接,一旦連接不可用,會從新嘗試connect.

Canal Client的方式和canal server方式相似,也是利用zookeeper的搶佔EPHEMERAL節點的方式進行控制.

二、環境要求

  • jdk建議使用1.6.25以上的版本
  • 當前的canal開源版本支持5.7及如下的版本
    ps. mysql4.x版本沒有通過嚴格測試,理論上是能夠兼容
  • 開啓mysql的binlog寫入功能,而且配置binlog模式爲row

    [mysqld]  
    log-bin=mysql-bin  
    binlog-format=ROW #選擇row模式  
    server_id=1 #配置mysql replaction須要定義,不能和canal的slaveId重複

    檢查配置是否有效

    #查看binlog的開啓狀態及文件名
    mysql> show variables like '%log_bin%';
    #查看binlog當前的格式
    mysql> show variables like '%format%';
    #查看binlog文件列表
    mysql> show binary logs; 
    #查看binlog的狀態
    mysql> show master status;
  • canal的原理是模擬本身爲mysql slave,因此這裏必定須要作爲mysql slave的相關權限

    mysql> CREATE USER canal IDENTIFIED BY 'canal';    
    mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
    # -- mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
    mysql> FLUSH PRIVILEGES;

    針對已有的帳戶可經過grants查詢權限:

    mysql> show grants for 'canal' ;

三、部署

獲取發佈包

方法1: (直接下載)

訪問:https://github.com/alibaba/canal/releases,會列出全部歷史的發佈版本包
當前的最新版本是1.1.3

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

方法2:  (本身編譯)

git clone git@github.com:alibaba/canal.git
git co canal-1.1.3 #切換到對應的版本上
mvn clean install -Denv=release

執行完成後,會在canal工程根目錄下生成一個target目錄,裏面會包含一個 canal.deployer-1.1.3.tar.gz

配置介紹

介紹配置以前,先了解下canal的配置加載方式:
image.png

canal配置方式有兩種:

  1. ManagerCanalInstanceGenerator: 基於manager管理的配置方式,目前alibaba內部配置使用這種方式。你們能夠實現CanalConfigClient,鏈接各自的管理系統,便可完成接入。
  2. SpringCanalInstanceGenerator:基於本地spring xml的配置方式,目前開源版本已經自帶該功能全部代碼,建議使用
Spring配置方式介紹

spring配置的原理是將整個配置抽象爲兩部分:

  • xxxx-instance.xml   (canal組件的配置定義,能夠在多個instance配置中共享)
  • xxxx.properties   (每一個instance通道都有各自一份定義,由於每一個mysql的ip,賬號,密碼等信息不會相同)

經過spring的PropertyPlaceholderConfigurer經過機制將其融合,生成一份instance實例對象,每一個instance對應的組件都是相互獨立的,互不影響

properties配置文件
properties配置分爲兩部分:

  • canal.properties  (系統根配置文件)
  • instance.properties  (instance級別的配置文件,每一個instance一份)

canal.properties介紹:
canal配置主要分爲兩部分定義:

  1. instance列表定義 (列出當前server上有多少個instance,每一個instance的加載方式是spring/manager等)
  2. common參數定義,好比能夠將instance.properties的公用參數,抽取放置到這裏,這樣每一個instance啓動的時候就能夠共享. 【instance.properties配置定義優先級高於canal.properties】

instance.properties介紹:

  1. 在canal.properties定義了canal.destinations後,須要在canal.conf.dir對應的目錄下創建同名的文件

好比:

canal.destinations = example1,example2

這時須要建立example1和example2兩個目錄,每一個目錄裏各自有一份instance.properties.
ps. canal自帶了一份instance.properties demo,可直接複製conf/example目錄進行配置修改

  1. 若是canal.properties未定義instance列表,但開啓了canal.auto.scan時
  • server第一次啓動時,會自動掃描conf目錄下,將文件名作爲instance name,啓動對應的instance
  • server運行過程當中,會根據canal.auto.scan.interval定義的頻率,進行掃描

    1. 發現目錄有新增,啓動新的instance
    2. 發現目錄有刪除,關閉老的instance
    3. 發現對應目錄的instance.properties有變化,重啓instance

instance.xml配置文件
目前默認支持的instance.xml有如下幾種:

  • spring/memory-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

在介紹instance配置以前,先了解一下canal如何維護一份增量訂閱&消費的關係信息:

  • 解析位點 (parse模塊會記錄,上一次解析binlog到了什麼位置,對應組件爲:CanalLogPositionManager)
  • 消費位點 (canal server在接收了客戶端的ack後,就會記錄客戶端提交的最後位點,對應的組件爲:CanalMetaManager)

對應的兩個位點組件,目前都有幾種實現:

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先寫內存,定時刷新數據到zookeeper上)

memory-instance.xml:
   全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析 

**特色:** 速度最快,依賴最少(不須要zookeeper)

   場景:通常應用在quickstart,或者是出現問題後,進行數據分析的場景,不該該將其應用於生產環境

default-instance.xml:
   store選擇了內存模式,其他的parser/sink依賴的位點管理選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享. 
   特色: 支持HA
   場景: 生產環境,集羣化部署. 

group-instance.xml:
    主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。

**場景:** 分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可. 

instance.xml設計初衷:
  容許進行自定義擴展,好比實現了基於數據庫的位點管理後,能夠自定義一份本身的instance.xml,整個canal設計中最大的靈活性在於此

HA模式配置

修改canal.properties

canal.zkServers =127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
canal.destinations = example #當前server上部署的instance列表
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

修改instance.properties

canal.instance.mysql.slaveId=1234 #mysql集羣配置中的serverId概念,須要保證和當前mysql集羣中id惟一 (v1.1.x版本以後canal會自動生成,不須要手工指定)
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..* #mysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符須要雙斜槓(\\)

注意: 其餘機器上的instance目錄的名字須要保證徹底一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置,canal.instance.mysql.slaveId應該惟一。
執行啓動腳本startup.sh,啓動後,你能夠查看logs/example/example.log,只會看到一臺機器上出現了啓動成功的日誌,其餘的處於standby狀態。

客戶端消費數據

建立mvn工程,修改pom.xml,添加依賴:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.3</version>
</dependency>

CanalClientTest代碼

package com.stepper.canalclient;

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
 
public class CanalClientTest {

    public static void main(String args[]) { 
        String zkServers="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        String destination="example";
        CanalConnector connector = CanalConnectors.newClusterConnector(zkServers,destination,"","");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
//            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
//                System.out.println(message.toString());
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾數據
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }


}

啓動Canal Client後,操做數據庫變動數據便可從控制檯從看到消息。
更多參數及介紹能夠參考官方wiki文檔.

注意:

  • 生產環境下儘可能採用HA的方式。
  • 關於Canal消費binlog的順序,爲保證binlog嚴格有序,儘可能不要用多線程。
  • 若是Canal消費binlog後的數據要發往kafka,又要保證有序,kafka topic 的partition能夠設置成1個分區。

【轉載請註明出處】: https://segmentfault.com/a/1190000022767293

公衆號_掃碼_後端老鳥.png

相關文章
相關標籤/搜索