canal源碼分析系列——ErosaConnection分析

類結構

ErosaConnection java

| mysql

|-------------------------------------- sql

|                                                    | 數據庫

MysqlConnection                            LocalBinLogConnection 服務器

ErosaConnection是一個鏈接的接口,定義了一些通用的方法。目前它有兩個實現類,MysqlConnection是與MySQL服務器鏈接的實現類,LocalBinLogConnection是與本地的binlog文件進行鏈接的實現類。從類中能夠看出,目前canal還不支持oracle的實現。 session

接口定義

package com.alibaba.otter.canal.parse.inbound;

import java.io.IOException;

/**
 * 通用的Erosa的連接接口, 用於通常化處理mysql/oracle的解析過程
 * 
 * @author: yuanzu Date: 12-9-20 Time: 下午2:47
 */
public interface ErosaConnection {

	/**
	 * 創建鏈接
	 * @throws IOException
	 */
    public void connect() throws IOException;

    /**
     * 從新創建鏈接,會斷開已有鏈接
     * @throws IOException
     */
    public void reconnect() throws IOException;

    /**
     * 斷開鏈接。
     * @throws IOException
     */
    public void disconnect() throws IOException;

    /**
     * 是否創建鏈接。
     * @return
     */
    public boolean isConnected();

    /**
     * 用於快速數據查找,和dump的區別在於,seek會只給出部分的數據
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析處理器。
     */
    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 獲取binlog事件,若是沒有數據會阻塞,等待數據的到達。
     * @param binlogfilename biglog文件名
     * @param binlogPosition binlog起始位置。
     * @param func 事件解析處理器。
     */
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    /**
     * 獲取binlog事件,若是沒有數據會阻塞,等待數據的到達。
     * @param timestamp 起始時間,只同步該時間以後的產生的新事件。
     * @param func 事件解析處理器。
     */
    public void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * 產生一個新的鏈接。
     */
    ErosaConnection fork();
}

其中實現類MysqlConnection是咱們常常會使用到的一個類,先看看這個類是如何實現的。 oracle

MysqlConnection

屬性和構造函數

private MysqlConnector      connector;
    private long                slaveId;
    private Charset             charset = Charset.forName("UTF-8");
    private BinlogFormat        binlogFormat;
    private BinlogImage         binlogImage;

    public MysqlConnection(){
    }

    public MysqlConnection(InetSocketAddress address, String username, String password){

        connector = new MysqlConnector(address, username, password);
    }

    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
                           String defaultSchema){
        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
    }

從代碼能夠看出,它大部分依賴一個MysqlConnector組件來實現與MySQL的鏈接。咱們稍後看看該代碼的實現。 函數

構造函數須要的是MySql服務器的地址,用戶名和密碼,該用戶必須具有了replication slave權限才能夠。slaveId是當前解析器的slaveId,它不能與其它的slaveId衝突。 fetch

鏈接方法實現

public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }


基本上都是調用了MysqlConnection的方法實現的,還須要進入該類查看實現。 編碼

dump方法


public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        while (fetcher.fetch()) {
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    }  
/**
     * the settings that will need to be checked or set:<br>
     * <ol>
     * <li>wait_timeout</li>
     * <li>net_write_timeout</li>
     * <li>net_read_timeout</li>
     * </ol>
     * 
     * @param channel
     * @throws IOException
     */
    private void updateSettings() throws IOException {
        try {
            update("set wait_timeout=9999999");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
        try {
            update("set net_write_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            update("set net_read_timeout=1800");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // 設置服務端返回結果時不作編碼轉化,直接按照數據庫的二進制編碼進行發送,由客戶端本身根據需求進行編碼轉化
            update("set names 'binary'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mysql5.6針對checksum支持須要設置session變量
            // 若是不設置會出現錯誤: Slave can not handle replication events with the
            // checksum that master is configured to log
            // 但也不能亂設置,須要和mysql server的checksum配置一致,否則RotateLogEvent會出現亂碼
            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }


        try {
            // mariadb針對特殊的類型,須要設置session變量
            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
        } catch (Exception e) {
            logger.warn(ExceptionUtils.getFullStackTrace(e));
        }
    }


dump方法的實現流程是這樣的。

1. 更新MySQL配置信息。調用方法updateSettings();主要包括設置超時時間、設置數據庫直接發送二進制數據,設置master_binlog_checksum和mariadb_slave_capability等變量值。

2.發送binlogdump命令。發送COM_BINLOG_DUMP命令,攜帶binlogFileName、binlogPosition和slaveServerId等關鍵信息。

3.構建一個binlog獲取器組件DirectLogFetcher。使用它得到binlog數據。

4.循環從DirectLogFetcher獲取內容,將獲取到的數據轉化爲event。

5.調用SinkFunction處理獲取到的event,若處理失敗則會中斷循環,不然繼續。

接下來要看懂他們就須要瞭解MySQL的binlog協議及數據格式定義了。

相關文章
相關標籤/搜索