A Look at CanalEventSink

This article takes a look at canal's CanalEventSink.

CanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java

public interface CanalEventSink<T> extends CanalLifeCycle {

    /**
     * Submit data
     * 
     * @param event
     * @param remoteAddress
     * @param destination
     * @throws CanalSinkException
     * @throws InterruptedException
     */
    boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
                                                                              InterruptedException;

    /**
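     * Interrupt consumption. For example, when the parsing module switches over and the current merge request must be interrupted temporarily, clear the corresponding context state; see {@linkplain GroupEventSink}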
     */
    void interrupt();

}
  • CanalEventSink extends CanalLifeCycle and declares two methods, sink and interrupt

AbstractCanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java

public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {

    protected CanalEventFilter                  filter;
    protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>();

    public void setFilter(CanalEventFilter filter) {
        this.filter = filter;
    }

    public void addHandler(CanalEventDownStreamHandler handler) {
        this.handlers.add(handler);
    }

    public CanalEventDownStreamHandler getHandler(int index) {
        return this.handlers.get(index);
    }

    public void addHandler(CanalEventDownStreamHandler handler, int index) {
        this.handlers.add(index, handler);
    }

    public void removeHandler(int index) {
        this.handlers.remove(index);
    }

    public void removeHandler(CanalEventDownStreamHandler handler) {
        this.handlers.remove(handler);
    }

    public CanalEventFilter getFilter() {
        return filter;
    }

    public List<CanalEventDownStreamHandler> getHandlers() {
        return handlers;
    }

    public void interrupt() {
        // do nothing
    }

}
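  • AbstractCanalEventSink extends AbstractCanalLifeCycle and implements the CanalEventSink interface; it defines two fields, filter and handlers, together with methods for managing them, and gives interrupt an empty implementation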
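
Because handlers is an ordered list, the position at which a handler is added decides when it runs relative to the others. The following is a minimal sketch of that list management on a plain ArrayList; NamedHandler, HandlerChain and HandlerChainDemo are hypothetical stand-ins invented for illustration and are not part of canal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CanalEventDownStreamHandler; only a name is kept.
class NamedHandler {
    final String name;

    NamedHandler(String name) {
        this.name = name;
    }
}

// Mirrors the list-management methods of AbstractCanalEventSink (assumed simplification).
class HandlerChain {
    private final List<NamedHandler> handlers = new ArrayList<>();

    void addHandler(NamedHandler handler) {
        handlers.add(handler); // append to the end of the chain
    }

    void addHandler(NamedHandler handler, int index) {
        handlers.add(index, handler); // insert at a position, shifting later handlers
    }

    void removeHandler(int index) {
        handlers.remove(index); // remove by position
    }

    void removeHandler(NamedHandler handler) {
        handlers.remove(handler); // remove by identity/equality
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (NamedHandler h : handlers) {
            out.add(h.name);
        }
        return out;
    }
}

class HandlerChainDemo {
    public static void main(String[] args) {
        HandlerChain chain = new HandlerChain();
        chain.addHandler(new NamedHandler("heartbeat"));
        chain.addHandler(new NamedHandler("metrics"));
        chain.addHandler(new NamedHandler("audit"), 0); // placed ahead of the others
        System.out.println(chain.names()); // [audit, heartbeat, metrics]
    }
}
```

Note that the two removeHandler overloads differ only by argument type: an int removes by position, a handler reference removes that element.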

EntryEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    private static final Logger    logger                        = LoggerFactory.getLogger(EntryEventSink.class);
    private static final int       maxFullTimes                  = 10;
    private CanalEventStore<Event> eventStore;
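    protected boolean              filterTransactionEntry        = false;                                        // whether to filter transaction begin/end entries as far as possible
    protected boolean              filterEmtryTransactionEntry   = true;                                         // whether to filter empty transaction begin/end entries
    protected long                 emptyTransactionInterval      = 5 * 1000;                                     // how often empty transactions are emitted
    protected long                 emptyTransctionThresold       = 8192;                                         // emit one after more than 8192 transaction headers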

    protected volatile long        lastTransactionTimestamp      = 0L;
    protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
    protected volatile long        lastEmptyTransactionTimestamp = 0L;
    protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);
    protected boolean              raw;

    public EntryEventSink(){
        addHandler(new HeartBeatEntryEventHandler());
    }

    public void start() {
        super.start();
        Assert.notNull(eventStore);

        if (eventStore instanceof MemoryEventStoreWithBuffer) {
            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
        }

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    public void stop() {
        super.stop();

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                           throws CanalSinkException,
                                                                                                           InterruptedException {
        return sinkData(entrys, remoteAddress);
    }

    //......
}
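  • EntryEventSink extends AbstractCanalEventSink and implements CanalEventSink. Its constructor registers a HeartBeatEntryEventHandler; its start method asserts that eventStore is set, then iterates over handlers and calls handler.start on each one that is not yet started; its stop method iterates over handlers and calls handler.stop on each one that is started; its sink method delegates to sinkData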
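
The isStart checks in start and stop mean a handler that was already started elsewhere is not started a second time, and a handler that never started is not stopped. A minimal sketch of that guard follows; CountingHandler, GuardedSink and GuardedSinkDemo are hypothetical names used only for illustration, and the call counters exist purely to make the effect observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a handler's lifecycle; counts calls so the guards are visible.
class CountingHandler {
    private boolean running;
    int startCalls;
    int stopCalls;

    boolean isStart() {
        return running;
    }

    void start() {
        running = true;
        startCalls++;
    }

    void stop() {
        running = false;
        stopCalls++;
    }
}

// Simplified sink that only reproduces the guarded start/stop loops.
class GuardedSink {
    private final List<CountingHandler> handlers = new ArrayList<>();

    void addHandler(CountingHandler handler) {
        handlers.add(handler);
    }

    void start() {
        for (CountingHandler handler : handlers) {
            if (!handler.isStart()) { // skip handlers that are already running
                handler.start();
            }
        }
    }

    void stop() {
        for (CountingHandler handler : handlers) {
            if (handler.isStart()) { // skip handlers that are not running
                handler.stop();
            }
        }
    }
}

class GuardedSinkDemo {
    public static void main(String[] args) {
        CountingHandler preStarted = new CountingHandler();
        preStarted.start(); // started before the sink
        CountingHandler fresh = new CountingHandler();

        GuardedSink sink = new GuardedSink();
        sink.addHandler(preStarted);
        sink.addHandler(fresh);
        sink.start();

        System.out.println(preStarted.startCalls); // 1
        System.out.println(fresh.startCalls);      // 1
    }
}
```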

sinkData

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    //......

    private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
                                                                                            throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entrys) {
            if (!doFilter(entry)) {
                continue;
            }

            if (filterTransactionEntry
                && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
                long currentTimestamp = entry.getHeader().getExecuteTime();
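                // Under a throttling policy, let some empty transaction begin/end entries through so the database position is updated in time, showing that things are working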
                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
                    && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
                    continue;
                } else {
                    lastTransactionCount.set(0L);
                    lastTransactionTimestamp = currentTimestamp;
                }
            }

            hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
            hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
            events.add(event);
        }

        if (hasRowData || hasHeartBeat) {
            // Row records or heartbeat records are present: hand off directly to downstream processing
            return doSink(events);
        } else {
            // Data that needs to be filtered
            if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
                long currentTimestamp = events.get(0).getExecuteTime();
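                // Under a throttling policy, let some empty transaction begin/end entries through so the database position is updated in time, showing that things are working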
                if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                    || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
                    lastEmptyTransactionCount.set(0L);
                    lastEmptyTransactionTimestamp = currentTimestamp;
                    return doSink(events);
                }
            }

            // Return true directly, ignoring the empty transaction begin/end entries
            return true;
        }
    }

    protected boolean doSink(List<Event> events) {
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.before(events);
        }
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            if (eventStore.tryPut(events)) {
                if (fullTimes > 0) {
                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                    events = handler.after(events);
                }
                return true;
            } else {
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                applyWait(++fullTimes);
                if (fullTimes % 100 == 0) {
                    long nextStart = System.nanoTime();
                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }

            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.retry(events);
            }

        } while (running && !Thread.interrupted());
        return false;
    }

    //......

}
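  • sinkData iterates over entrys, skips every entry rejected by doFilter, throttles transaction begin/end entries when filterTransactionEntry is set, and wraps each remaining entry in an Event that is added to events. If the batch contains row data or a heartbeat, it calls doSink directly; otherwise, with filterEmtryTransactionEntry enabled, the batch is passed to doSink only when more than emptyTransactionInterval has elapsed since the last emitted empty transaction or more than emptyTransctionThresold such batches have been counted, and in all other cases sinkData returns true without storing anything. doSink calls handler.before(events) on every handler and then eventStore.tryPut(events); on success it calls handler.after(events) on every handler and returns true; on failure it waits via applyWait, calls handler.retry(events) on every handler, and tries again until the sink is no longer running or the thread is interrupted, at which point it returns false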
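
The before / tryPut / after / retry sequence in doSink can be sketched without any canal dependency. The example below is a simplified illustration under stated assumptions: an ArrayBlockingQueue with the non-blocking offer method stands in for CanalEventStore.tryPut, Thread.yield replaces applyWait, the blocking-time accounting is omitted, and StageHandler, TracingHandler, RetryingSink and RetryingSinkDemo are hypothetical names, not canal classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for CanalEventDownStreamHandler<List<String>>.
interface StageHandler {
    List<String> before(List<String> events);
    List<String> retry(List<String> events);
    List<String> after(List<String> events);
}

// Records which callbacks fire, in order, and passes events through unchanged.
class TracingHandler implements StageHandler {
    final List<String> trace = new ArrayList<>();

    public List<String> before(List<String> events) { trace.add("before"); return events; }
    public List<String> retry(List<String> events)  { trace.add("retry");  return events; }
    public List<String> after(List<String> events)  { trace.add("after");  return events; }
}

class RetryingSink {
    private final BlockingQueue<List<String>> store; // stand-in for the event store
    private final List<StageHandler> handlers = new ArrayList<>();
    volatile boolean running = true;

    RetryingSink(int capacity) {
        this.store = new ArrayBlockingQueue<>(capacity);
    }

    void addHandler(StageHandler handler) {
        handlers.add(handler);
    }

    BlockingQueue<List<String>> store() {
        return store;
    }

    boolean doSink(List<String> events) {
        for (StageHandler handler : handlers) {
            events = handler.before(events);
        }
        do {
            if (store.offer(events)) { // non-blocking put, plays the role of tryPut
                for (StageHandler handler : handlers) {
                    events = handler.after(events);
                }
                return true;
            }
            Thread.yield(); // simplified back-off; the original calls applyWait
            for (StageHandler handler : handlers) {
                events = handler.retry(events);
            }
        } while (running && !Thread.interrupted());
        return false; // stopped or interrupted before the store accepted the batch
    }
}

class RetryingSinkDemo {
    public static void main(String[] args) {
        RetryingSink sink = new RetryingSink(1);
        TracingHandler tracer = new TracingHandler();
        sink.addHandler(tracer);
        System.out.println(sink.doSink(Arrays.asList("e1"))); // true
        System.out.println(tracer.trace);                     // [before, after]
    }
}
```

In this sketch, as in the quoted doSink, before runs once per batch while retry runs once per failed attempt, so a handler that keeps per-attempt state belongs in retry rather than before.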

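Summary

CanalEventSink extends CanalLifeCycle and declares the sink and interrupt methods. AbstractCanalEventSink extends AbstractCanalLifeCycle and implements the CanalEventSink interface; it defines two fields, filter and handlers. EntryEventSink extends AbstractCanalEventSink and implements CanalEventSink: its start method iterates over handlers and calls handler.start on each, its stop method iterates over handlers and calls handler.stop on each, and its sink method delegates to sinkData.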
