數據異構重器之 Canal 初探

作積極的人,越努力越幸運!
數據異構重器之 Canal 初探java

源碼分析 Canal 系列開始了,一個全新的系列,即能探討 canal 自己的實現原理,也是筆者源碼閱讀技巧的展現。mysql

一、應用場景


提到 Canal,你們應該都能想到這是一個用於解析 MySQL binlog 日誌的工具,並將 MySQL 數據庫中數據同步到其餘存儲介質中,例如 Elasticsearch。sql

即 Canal 一個很是經常使用的使用場景:數據異構,一種更高級別的數據讀寫分離架構設計方法。數據庫

隨着業務不斷的發展,企業發展到必定階段,發現單體的關係型數據庫已沒法支撐業務高速發展帶來數據不斷累積的壓力,從而會誕生出一種設計架構:分庫分表。分庫分表對緩解單庫數據庫壓力確實是一種很是好的解決方案,但又衍生出另一種困境,關聯查詢不友好,甚至跨庫JOIN就更加如此。架構

舉例說明以下:例如一個訂單系統,一般有兩類用戶須要去查詢訂單,一類是顧客,一類是商家,在對數據庫進行分庫分表時,若是以顧客(buy_id)進行分庫的話,同一個商家的訂單數據會分佈在不一樣的庫中,若是以商家(shop_id)進行分庫的話,同一個用戶購買的全部訂單數據將會分佈在不一樣的庫中,這樣進行關聯查詢,就必然須要跨庫進行join,其成本都會偏高。並且上面的場景只能知足一方的需求,那如何是好呢?jvm

Canal 這個時候就閃亮登場了,在電商設計中,其實商家、顧客會被拆分紅兩個不一樣的服務,咱們能夠爲兩個不一樣的服務搭建不一樣的數據庫集羣,咱們能夠用戶訂單庫、商家訂單庫進行分庫,以用戶訂單庫爲主庫,當用戶在訂單系統下單後,數據進入到用戶訂單庫中,而後能夠經過 canal 監聽數據庫的binlog日誌,而後將數據再同步到商家訂單庫,而用戶訂單庫以用戶ID爲維度進行分庫,商家訂單庫以商家ID作分庫,完美解決問題。ide

二、架構設計原理


在瞭解到 Canal 的基本使用場景後,咱們經過 canal 官方文檔,去探究一下其核心架構設計理念,以此打開進入 Canal 的神祕世界中。工具

首先咱們簡單看一下 MySQL 的主從同步原理:
數據異構重器之 Canal 初探
在這裏插入圖片描述源碼分析

從上面的圖中能夠當作主從複製主要分紅三個步驟:單元測試

  • master將改變記錄到二進制日誌(binary log ) 中( 這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看)

  • slave將master的binary log events拷貝到它的中繼日誌(relay log)

  • slave重作中繼日誌中的事件,將改變反映它本身的數據。

基於 MySQL 這種數據同步機制,那 Canal 的設計目標主要就是實現數據的同步,即數據的複製,從上面的圖天然而然的想到了以下的設計:
數據異構重器之 Canal 初探

原理相對比較簡單:

  • canal 模擬 mysql slave 的交互協議,假裝本身爲 mysql slave,向 mysql master 發送 dump 協議

  • mysql master 收到 dump 請求,開始推送 binary log 給 slave (canal)

  • canal解析 binary log 對象(原始爲byte流)

接下來咱們來看一下 Canale 的總體組成部分:
數據異構重器之 Canal 初探

說明:

  • server表明一個canal運行實例,對應於一個jvm

  • instance對應於一個數據隊列 (1個server對應1..n個instance)

instance模塊:

  • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)

  • eventSink (Parser和Store連接器,進行數據過濾,加工,分發的工做)

  • eventStore (數據存儲)

  • metaManager (增量訂閱&消費信息管理器)

這些組件我暫時不打算深刻去研究,由於在目前這個階段我本身也不清楚,但這個是我後續須要學習研究的重點。

三、在 IntelliJ IDEA 中運行 Demo


在 Linux 環境中安裝 canal 比較簡單,你們能夠按照官方手冊一步一步操做便可,在這裏我就不重複介紹,本節主要的目的是但願在開發工具中運行 Canal 的 Demo,以便後續在研究源碼的過程當中遇到難題時能夠進行 Debug。

舒適提示:你們在學習過程當中,能夠根據官方文檔先安裝一遍 canal,對理解 Canal 的核心組件有着很是重要的幫助。

首先先從 canal 源碼中尋找官方提供的 Demo,其示例代碼在 example 包中,以下圖所示:
數據異構重器之 Canal 初探

可是另外稍微遺憾的是 canal 提供提供的示例代碼中只包含了 client 端相關的代碼,並無包含服務端(server),故咱們將目光放到其單元測試中,以下圖所示:
數據異構重器之 Canal 初探
接下來我根據官方的一些提示,結合本身的理解,編寫出以下測試代碼,在 IDEA 開發工具中實現運行 Canal 相關的 Demo。下面的代碼已經過測試,可直接使用。
一、Canal Server Demo

package com.alibaba.otter.canal.server;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
    protected static final String ZK_CLUSTER_ADDRESS      = "127.0.0.1:2181";
    protected static final String DESTINATION   = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "canal";
    protected static final String PASSWORD      = "canal";
    protected static final String FILTER        = ".\\*\\\\\\\\..\\*";
    /** 默認 500s 後關閉 */
    protected static final long RUN_TIME = 120 * 1000;
    private final ByteBuffer header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty nettyServer;
    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                Thread.sleep(RUN_TIME);
            } catch (Throwable ee) {
            }
            test.tearDown();
            System.out.println("end");
        }
    }
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
            public CanalInstance generate(String destination) {
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });
        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(11111);
        nettyServer.start();
        // 啓動 instance
        embeddedServer.start("example");
    }
    public void tearDown() {
        nettyServer.stop();
    }
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");
        CanalParameter parameter = new CanalParameter();
        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
                new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
        parameter.setSlaveId(1234L);
        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);
        canal.setCanalParameter(parameter);
        return canal;
    }
}

二、Canal Client Demo

package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // 建立連接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾數據
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

運行 client 的效果以下圖所示:
數據異構重器之 Canal 初探

在數據庫中變動一條數據,以便產生新的binlog日誌,其輸出結果以下:
數據異構重器之 Canal 初探
能在 IDEA 中搭建並運行 Demo,是咱們踏入 canal 的第一步,後續將根據官方文檔中的內容爲提綱,嘗試逐步解開 canal 的實現原理,以便更好的指導實踐。

本文就先介紹到這裏了,Canal 系列正式開始連載,敬請關注。

原創不易,若是對你有所幫助請你爲本文點個【在看】吧,這將是我寫做更多優質文章的最強動力。

歡迎加入個人知識星球,一塊兒交流源碼,探討架構,揭祕億級訂單的架構設計與實踐經驗,打造高質量的技術交流圈,爲廣大星友提供高質量問答服務,長按以下二維碼加入。
數據異構重器之 Canal 初探

丁威素質三連是對我最大的鼓勵

相關文章
相關標籤/搜索