聊聊CanalInstance

本文主要研究一下CanalInstancejava

CanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.javagit

public interface CanalLifeCycle {

    void start();

    void stop();

    boolean isStart();
}
  • CanalLifeCycle接口定義了start、stop、isStart方法

AbstractCanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.javagithub

public abstract class AbstractCanalLifeCycle implements CanalLifeCycle {

    protected volatile boolean running = false; // 是否處於運行中

    public boolean isStart() {
        return running;
    }

    public void start() {
        if (running) {
            throw new CanalException(this.getClass().getName() + " has startup , don't repeat start");
        }

        running = true;
    }

    public void stop() {
        if (!running) {
            throw new CanalException(this.getClass().getName() + " isn't start , please check");
        }

        running = false;
    }

}
  • AbstractCanalLifeCycle實現了CanalLifeCycle接口,其定義了running屬性,start方法設置running爲true,stop設置running爲false,isStart返回running值

CanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.javaspring

public interface CanalInstance extends CanalLifeCycle {

    String getDestination();

    CanalEventParser getEventParser();

    CanalEventSink getEventSink();

    CanalEventStore getEventStore();

    CanalMetaManager getMetaManager();

    CanalAlarmHandler getAlarmHandler();

    /**
     * 客戶端發生訂閱/取消訂閱行爲
     */
    boolean subscribeChange(ClientIdentity identity);

    CanalMQConfig getMqConfig();
}
  • CanalInstance繼承了CanalLifeCycle,它還定義了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

AbstractCanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.javaide

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

    private static final Logger                      logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

    protected Long                                   canalId;                                                      // 和manager交互惟一標示
    protected String                                 destination;                                                  // 隊列名字
    protected CanalEventStore<Event>                 eventStore;                                                   // 有序隊列

    protected CanalEventParser                       eventParser;                                                  // 解析對應的數據信息
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 連接parse和store的橋接器
    protected CanalMetaManager                       metaManager;                                                  // 消費信息管理器
    protected CanalAlarmHandler                      alarmHandler;                                                 // alarm報警機制
    protected CanalMQConfig                          mqConfig;                                                     // mq的配置

    //......

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

        if (eventParser.isStart()) {
            beforeStopEventParser(eventParser);
            eventParser.stop();
            afterStopEventParser(eventParser);
        }

        if (eventSink.isStart()) {
            eventSink.stop();
        }

        if (eventStore.isStart()) {
            eventStore.stop();
        }

        if (metaManager.isStart()) {
            metaManager.stop();
        }

        if (alarmHandler.isStart()) {
            alarmHandler.stop();
        }

        logger.info("stop successful....");
    }

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 處理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 須要遍歷啓動
                    if(singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if(eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }

        }

        // filter的處理規則
        // a. parser處理數據過濾處理
        // b. sink處理數據的路由&分發,一份parse數據通過sink後能夠分發爲多份,每份的數據能夠根據本身的過濾規則不一樣而有不一樣的數據
        // 後續內存版的一對多分發,能夠考慮
        return true;
    }


    @Override
    public String getDestination() {
        return destination;
    }

    @Override
    public CanalEventParser getEventParser() {
        return eventParser;
    }

    @Override
    public CanalEventSink getEventSink() {
        return eventSink;
    }

    @Override
    public CanalEventStore getEventStore() {
        return eventStore;
    }

    @Override
    public CanalMetaManager getMetaManager() {
        return metaManager;
    }

    @Override
    public CanalAlarmHandler getAlarmHandler() {
        return alarmHandler;
    }

    @Override
    public CanalMQConfig getMqConfig() {
        return mqConfig;
    }

    //......

}
  • AbstractCanalInstance繼承了AbstractCanalLifeCycle,它覆蓋了start、stop方法,其start方法分別啓動metaManager、alarmHandler、eventStore、eventSink、eventParser,其stop方法分別關閉eventParser、eventSink、eventStore、metaManager、alarmHandler;其subscribeChange方法根據ClientIdentity的pattern建立AviaterRegexFilter,而後設置給eventParser

CanalInstanceWithSpring

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.javathis

public class CanalInstanceWithSpring extends AbstractCanalInstance {

    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

    public void start() {
        logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
        super.start();
    }

    // ======== setter ========

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setEventParser(CanalEventParser eventParser) {
        this.eventParser = eventParser;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    public void setMetaManager(CanalMetaManager metaManager) {
        this.metaManager = metaManager;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public void setMqConfig(CanalMQConfig mqConfig){
        this.mqConfig = mqConfig;
    }

}
  • CanalInstanceWithSpring繼承了AbstractCanalInstance,它專門給註冊到spring容器使用

小結

CanalInstance繼承了CanalLifeCycle,它還定義了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法code

doc

相關文章
相關標籤/搜索