「從零單排canal 06」 instance模塊源碼解析

基於1.1.5-alpha版本,具體源碼筆記能夠參考個人github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canalmysql

關於instance的定義,能夠參考前面的幾篇源碼解析,介紹的很是清楚。
git

instance模塊比較簡單,咱們重點了解如下幾個問題github

  • instance配置模式有哪幾種,如何根據配置建立instance?
  • 遠端配置如何覆蓋本地配置的?
  • instance實例內部有哪些組件?

1.基本結構


instance模塊下面也分爲三個子模塊,core、manager、spring。spring

其中,core是instance的核心邏輯 。sql

而manager和spring只是兩種不一樣的instance配置讀取方式,manager經過http請求讀取admin的配置,spring經過配置文件的方式讀取。數據庫

主要控制邏輯咱們在deployer模塊源碼分析中提到過,就是在CanalController類 的instanceGenerator,配置參數是canal.instance.global.mode微信

  • 根據destination建立config
  • 若是canal.instance.global.mode = manager,就使用PlainCanalInstanceGenerator
  • 若是canal.instance.global.mode = spring,就使用SpringCanalInstanceGenerator

源碼以下多線程


2.core子模塊


代碼很少,就兩個接口,兩個類。jvm

2.1 CanalInstanceGenerator接口

這個接口只有一個方法ide


具體實現就是開頭提到的兩種,PlainCanalInstanceGenerator和SpringCanalInstanceGenerator,分別在manager子模塊和spring子模塊中實現。

具體選擇就是開頭的那個canalController裏面根據canal.instance.global.mode來選擇。


2.2 CanalInstance接口

先看一張官方文檔的圖,這個前面的文章已經分析過了。


server表明一個canal-server運行實例,對應於一個jvm。server內部能夠有多個instance。

Instance內部有4個主要組件:

  • eventParser :數據源接入,模擬slave協議和master進行交互,協議解析
  • eventSink :Parser和Store鏈接器,進行數據過濾,加工,分發的工做
  • eventStore :數據存儲
  • metaManager:增量訂閱&消費信息管理器

在這個接口中,就定義了獲取4個組件的方法,以及新版本增長的mqProducer的配置信息(mqProducer在server模塊解析中介紹過了,能夠回頭去看看)


咱們簡單看下4個組件接口的各個實現類。

CanalEventParser接口實現類(paser模塊):

  • MysqlEventParser:假裝成單個mysql實例的slave解析binglog日誌
  • GroupEventParser:假裝成多個mysql實例的slave解析binglog日誌。內部維護了多個CanalEventParser,組合多個EventParser進行合併處理,group只是做爲一個delegate處理。主要應用場景是分庫分表:好比一個大表拆分了4個庫,位於不一樣的mysql實例上,正常狀況下,咱們須要配置四個CanalInstance。對應的,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。爲了方便業務使用,此時咱們可讓CanalInstance引用一個GroupEventParser,由GroupEventParser內部維護4個MysqlEventParser去4個不一樣的mysql實例去拉取binlog,最終合併到一塊兒。此時業務只須要啓動1個客戶端,連接這個CanalInstance便可
  • LocalBinlogEventParser:解析本地的mysql binlog。例如將mysql的binlog文件拷貝到canal的機器上進行解析。
  • RdsLocalBinlogEventParser:基於阿里雲rds的binlog備份文件複製,下載到本地後進行本地的binlog解析。

CanalEventSink接口實現類(sink模塊):

  • EntryEventSink:普通的單個parser的sink操做,進行數據過濾,加工,分發

  • GroupEventSink:用於分庫分表的場景,對應GroupEventParser的數據解析,而後實現基於歸併排序的sink處理

CanalEventStore接口實現類(store模塊):

  • MemoryEventStoreWithBuffer:基於內存實現存儲store

CanalMetaManager(meta模塊):

  • ZooKeeperMetaManager:將元數據存存儲到zk中

  • MemoryMetaManager:將元數據存儲到內存中

  • MixedMetaManager:組合memory + zookeeper的使用模式

  • PeriodMixedMetaManager:基於定時刷新的策略的mixed實現

  • FileMixedMetaManager:先寫內存,而後定時刷新數據到File

關於這些實現的具體細節,咱們在相應模塊的源碼分析時,進行講解。目前只須要知道,一些組件有多種實現,所以組合工做方式有多種。

2.3 AbstractCanalInstance類

AbstractCanalInstance是canalInstance的抽象類,維護了相關組件的引用。

同時,也實現了canalInstance接口的subscribeChange方法。

這個抽象類有兩個實現,CanalInstanceWithManager 和 CanalInstanceWithSpring。

AbstractCanalInstance的初始化過程都是在實現類中完成的。

若是選擇admin控制模式,那就是在CanalInstanceWithManager中完成;若是是spring模式,就在CanalInstanceWithSpring中完成。 

這裏有個小發現:

仔細看下實際代碼調用咱們發現,CanalInstanceWithManager是給ManagerCanalInstanceGenerator使用的,而這個generator實際上沒有被使用到。若是使用admin模式,本文開頭咱們就看到了,使用了PlainCanalInstanceGenerator。PlainCanalInstanceGenerator裏面的generate方法實現,其實跟SpringCanalInstanceGenerator差很少。就是從遠端admin拉到配置,而後替換系統變量,而後再從spring的beanfactory中構建具體的實例。


2.3.1 subscribeChange() 方法

AbstractCanalInstance類實現了CanalInstance接口的subscribeChange方法。

咱們看到,若是訂閱關係發生變化,就作一些操做,這裏看的話,主要就是更新了一下filter。

filter規定了須要訂閱哪些庫,哪些表。


2.3.2 start() 方法

啓動沒什麼特別的邏輯,就是按照順序依次啓動各個組件。

順序爲 metaManager -> alarmHandler -> eventStore -> eventSink -> eventParser。

啓動順序主要跟依賴關係有關,元信息相關的管理跟全部都有關,因此metaManager最早啓動,其餘的按照彼此之間的關係一一啓動。


這裏咱們發現,在啓動eventParser的時候作了特殊處理,分別是beforeStartEventParser 和 afterStartEventParser。咱們在2.3.4專門講一下。


2.3.3 stop()方法

stop也沒什麼特殊的,就是依次關閉各個組件。

關閉的順序就是start的逆過程。

這裏就不貼代碼了。


2.3.4 eventParserr的特殊處理

在start和stop方法中的eventParser先後都有特殊的處理,start的beforeStartEventParser 和 afterStartEventParser,Stop的beforeStopEventParser 和 afterStopEventParser。

這個其實跟eventParser的設計有關。

EventParser 設計

  • 每一個EventParser都會關聯兩個內部組件

  • CanalLogPositionManager : 記錄binlog 最後一次解析成功位置信息,主要是描述下一次canal啓動的位點 CanalHAController: 控制 EventParser 的連接主機管理,判斷當前該連接哪一個mysql數據庫

因此這兩個beforexxx、afterxxxx方法作的主要是CanalLogPositionManager和CanalHAController的啓停工做。


2.3.5 AbstractCanalInstance類 總結

能夠看到AbstractCanalInstance除了負責啓動和中止其內部組件,就沒有其餘工做了。

eventParser在AbstractCanalInstance中啓動後,就會自行開啓多線程任務dump數據,經過eventSink投遞給eventStore。

而對eventStore的操做邏輯,實際上都是在CanalServerWithEmbedded中完成的,咱們能夠回顧一下CanalServerWithEmbedded中 getWithoutAck( ) 的相關邏輯。

包括:

  • 根據clientIdentity的destination獲取對應的instance

  • 獲取到流式數據中的最後一批獲取的位置positionRanges(跟batchId有關聯,就是上面那個map裏面的)

  • 從cananlEventStore裏面獲取binlog,轉化爲event。通常是從最後的一個batchId位置開始,若是以前沒有batchId,那麼就從cursor記錄的消費位點開始;若是cursor爲空,那隻能從eventStore的第一條消息開始。(這裏幾個位置關係再想想,跟ack有關,畫個圖)

  • event轉化爲entry,並生成新的batchId,組合成message返回給客戶端

因此,其實這裏只是簡單的啓動和中止,組件的交互邏輯是在CanalServerWithEmbedded中get出instance的各個組件來進行實現的。


3.spring模塊

前面提到了,PlainCanalInstanceGenerator裏面的generate方法實現,其實跟SpringCanalInstanceGenerator差很少。就是從遠端admin拉到配置,而後替換系統變量,而後再從spring的beanfactory中構建具體的實例。

因此咱們重點關注spring子模塊的配置方式便可。

就下面四個類



3.1 CanalInstanceWithSpring類

基於spring容器啓動canal實例,方便獨立於manager啓動。

繼承了AbstractCanalInstance,其實就是一系列組件的setter方法,就不貼源碼了。

具體配置是基於spring的xml來作的.

當咱們配置加載方式爲spring時,建立的CanalInstance實例類型都是CanalInstanceWithSpring。canal將會尋找本地的spring配置文件來建立instance實例。canal默認提供瞭如下幾種spring配置文件:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

四個配置文件中,對CanalInstanceWithSpring都採用了一樣的配置方式:


固然,具體每一個組件的ref在不一樣配置文件中有所不一樣。

最主要的就是metaManager 和eventParser 這兩個配置有所不一樣,可能在內存、文件或zk進行存儲。

eventStore 、和eventSink 定義都是相同的,eventStore目前的開源版本中eventStore只有一種基於內存的實現,eventSink其做用是eventParser和eventStore的鏈接器,進行數據過濾,加工,分發的工做。不涉及存儲,也就沒有必要針對內存、file、或者zk進行區分。


3.2 SpringCanalInstanceGenerator類

這個是具體建立instance的邏輯。


順便看下PlainCanalInstanceGenerator裏面的實現,就是多了從遠端拉取配置,而後用PropertyPlaceholderConfigurer進行了變量替換,而後仍是用beanFactory來獲取實例。

com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer繼承了org.springframework.beans.factory.config.PropertyPlaceholderConfigurer,設置動態properties,替換掉本地properties。


4.總結

其實這個模塊的東西比較少,沒有什麼特別複雜的邏輯。

咱們來回顧下開頭的幾個問題

  • instance配置模式有哪幾種,如何根據配置建立instance?

主要有基於spring和基於遠端配置兩種方式,前者的實如今,後者的實如今PlainCanalInstanceGenerator

  • 遠端配置如何覆蓋本地配置的?

PlainCanalInstanceGenerator中使用了spring的PropertyPlaceholderConfigurer來覆蓋配置

  • instance實例內部有哪些組件?

包括了parser、sink、store、metamanager等組件,可是隻負責了啓動和中止邏輯,具體交互邏輯仍是在CanalServerWithEmbedded中實現的。



原創:阿丸筆記(微信公衆號:aone_note),歡迎 分享,轉載請保留出處。

掃描下方二維碼能夠關注我哦~

                                                                              以爲不錯,就點個 再看 吧👇



本文分享自微信公衆號 - 阿丸筆記(aone_note)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索