使用canal分析binlog(二) canal源碼分析

在可以跑通example後有幾個疑問html

1. canal的server端對於已經讀取的binlog,client已經ack的position,是否持久化,保存在哪裏java

2. 即便不啓動zookeeper,canal也能夠正常運行,canal使用zookeeper或者不使用有什麼影響mysql

 

 從github上下載源碼,https://github.com/alibaba/canalgit

我使用的版本是1.0.22,照着兩位的博客看着源碼學習一下,版本上有些出入,但瞭解思想和總體架構夠了github

博客-楊武兵-開源社區sql

canal DevGuide - agapple - ITeye技術網站json

 

deployer項目爲發佈的server端程序,使用的日誌系統爲logback,默認日誌不輸出到控制檯,爲方便查看日誌信息,將日誌輸出到控制檯,修改logback.xml緩存

 <logger name="com.alibaba.otter.canal.instance" additivity="false">  
        <level value="INFO" />  
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="CANAL-ROOT" />
    </logger>
    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
        <level value="INFO" />  
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="CANAL-ROOT" />
    </logger>
    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
        <level value="INFO" />  
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="CANAL-META" />
    </logger>
    
	<root level="WARN">
		<appender-ref ref="STDOUT"/>
		<appender-ref ref="CANAL-ROOT" />
	</root>

 

爲canal建立mysql用戶canal權限爲 replication slave,replication client,canal使用這個mysql用戶啓動server架構

這時遇到問題:server啓動報錯:app

com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`test`.`t_table`
Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`test`.`t_table`
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889) ~[guava-18.0.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:78) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:677) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:362) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:111) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:1) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:327) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:177) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:129) [classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:210) [classes/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`test`.`t_table`
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SELECT command denied to user 'canal'@'localhost' for table 't_terminal', sqlState=42000, sqlStateMarker=#]
with command: desc `test`.`t_table`
at com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor.query(MysqlQueryExecutor.java:60) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.query(MysqlConnection.java:73) [classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta0(TableMetaCache.java:105) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.access$0(TableMetaCache.java:104) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache$1.apply(TableMetaCache.java:51) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache$1.apply(TableMetaCache.java:1) ~[classes/:na]
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356) ~[guava-18.0.jar:na]
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182) ~[guava-18.0.jar:na]
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151) ~[guava-18.0.jar:na]
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67) ~[guava-18.0.jar:na]
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885) ~[guava-18.0.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:78) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:677) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:362) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:111) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:1) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:327) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:177) ~[classes/:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:129) [classes/:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:210) [classes/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

 

查看官方說明,示例中爲canal配置的mysql用戶還須要select權限。

汗,我是按照mysql slave配的權限,原覺得不須要select。。。仔細一想,至少經過select方式判斷心跳確定須要select權限啊,不過我貌似沒有開啓心跳啊。 爲mysql用戶canal增長了select權限後,server能夠正常運行。

既然都報錯了,就看看canal到底select了些什麼。

查看報錯信息,異常發生的最近的位置:MapMaker.java:889,是第三方代碼,MapMaker是一個功能強大的Map的實現,示例查看這裏 。估計是在使用這個工具的時出現的異常,內部有調用回調函數之類的。

下一行報錯信息 TableMetaCache.getTableMeta(TableMetaCache.java:78),查看代碼

    public TableMeta getTableMeta(String schema, String table, boolean useCache) {
        if (!useCache) {
            tableMetaCache.remove(getFullName(schema, table));
        }

        return tableMetaCache.get(getFullName(schema, table));  // 78行
    }

查看
tableMetaCache的聲明,及初始化
/**
 * 處理table meta解析和緩存
 * 
 * @author jianghang 2013-1-17 下午10:15:16
 * @version 1.0.0
 */
public class TableMetaCache {

    // 第一層tableId,第二層schema.table,解決tableId重複,對應多張表
    private Map<String, TableMeta> tableMetaCache;

    public TableMetaCache(MysqlConnection con){
        this.connection = con;
        tableMetaCache = MigrateMap.makeComputingMap(new Function<String, TableMeta>() {

            public TableMeta apply(String name) {
                try {
                    return getTableMeta0(name);
                } catch (IOException e) {
                    // 嘗試作一次retry操做
                    try {
                        connection.reconnect();
                        return getTableMeta0(name);
                    } catch (IOException e1) {
                        throw new CanalParseException("fetch failed by table meta:" + name, e1);
                    }
                }
            }

        });
        .... // 省略
    }

}

當第78行調用tableMetaCache.get方法時,參數爲空,就會去執行回調函數 apply();

查看在apply方法中調用的getTableMeta0();

 private TableMeta getTableMeta0(String fullname) throws IOException {
        ResultSetPacket packet = connection.query("desc " + fullname);
        return new TableMeta(fullname, parserTableMeta(packet));
    }

執行select的就是這裏,經過斷點看到出錯時執行的查詢是 desc test.t_table。

那麼是何時須要獲取表結構信息呢,經過報錯信息繼續向上找LogEventConvert類的parse方法,源碼太長就不貼了,LogEventConvert類的功能是是將LogEvent轉化爲Entry對象,對着註釋仍是能看的。能夠肯定的是在MysqlConnection在dump到binlog數據後做處理時去查詢的表結構。

 

select的問題告一段落,回到正題。canal的工做原理可參考這裏。通過測試,同步數據是會保存到 config/{destination}/meta.dat文件中( 運行deployer項目時文件保存位置爲canal/config/{destination}/meta.dat )。文件內容爲json格式,以下所示

{
"clientDatas": [
{
"clientIdentity": {
"clientId": 1001,
"destination": "example",
"filter": ".*\\..*"
},
"cursor": {
"identity": {
"slaveId": -1,
"sourceAddress": {
"address": "127.0.0.1",
"port": 3306
}
},
"postion": {
"included": false,
"journalName": "mysql-log.000002",
"position": 10329,
"serverId": 1,
"timestamp": 1480670799000
}
}
}
],
"destination": "example"
}

在不使用zookeeper,非HA模式下,我作了以下幾個測試:

1. 刪除meta.dat文件->啓動server->啓動client->操做mysql

測試結果:在操做mysql後建立了meta.dat文件

2. 刪除meta.dat文件->操做mysql->啓動server->啓動client->操做mysql    

測試結果:啓動client時未建立meta.dat文件,第二次操做mysql建立了meta.dat文件

3. 不刪除meta.dat文件->操做mysql—>啓動server->啓動client    

測試結果:啓動client後client獲取到mysql的log數據

根據測試結果,大體猜想一下處理過程

1. canal server端啓動時去查看meta.dat文件,若是存在則加載該destination的位置信息到內存中,根據內存中destination的journalName和position從mysql master獲取binlog數據;若是不存在meta.dat文件,則獲取最新發生的binlog數據,並保存在內存中(中繼日誌)。

2. 啓動canal client,client端從server端獲取binlog數據,若是以前client端有過成功的ACK,server端會記錄到內存中並保存到meta.dat文件,client獲取binlog數據會從上次ACK成功位置開始;若是client以前沒有過ACK,則client獲取binlog數據會從server保存在內存中(中繼日誌)開始。

接下來經過查閱源碼去證明猜想的是否正確。

......

經過打斷點和讀源碼 ,發現保存文件到本地是經過 FileMixedMetaManager類實現。

相關文章
相關標籤/搜索