聊聊CanalEventDownStreamHandler

本文主要研究一下CanalEventDownStreamHandlerjava

CanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventDownStreamHandler.javagit

public interface CanalEventDownStreamHandler<T> extends CanalLifeCycle {

    /**
     * 提交到store以前作一下處理,容許替換Event
     */
    public T before(T events);

    /**
     * store處於full後,retry時處理作一下處理
     */
    public T retry(T events);

    /**
     * 提交store成功後作一下處理
     */
    public T after(T events);
}
  • CanalEventDownStreamHandler繼承了CanalLifeCycle接口,它定義了before、retry、after方法

AbstractCanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventDownStreamHandler.javagithub

public class AbstractCanalEventDownStreamHandler<T> extends AbstractCanalLifeCycle implements CanalEventDownStreamHandler<T> {

    public T before(T events) {
        return events;
    }

    public T retry(T events) {
        return events;
    }

    public T after(T events) {
        return events;
    }

}
  • AbstractCanalEventDownStreamHandler繼承了AbstractCanalLifeCycle,實現了CanalEventDownStreamHandler接口,before、retry、after方法默認返回入參的events

HeartBeatEntryEventHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.javaide

public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

    public List<Event> before(List<Event> events) {
        boolean existHeartBeat = false;
        for (Event event : events) {
            if (event.getEntryType() == EntryType.HEARTBEAT) {
                existHeartBeat = true;
            }
        }

        if (!existHeartBeat) {
            return events;
        } else {
            // 目前heartbeat和其餘事件是分離的,保險一點仍是作一下檢查處理
            List<Event> result = new ArrayList<Event>();
            for (Event event : events) {
                if (event.getEntryType() != EntryType.HEARTBEAT) {
                    result.add(event);
                }
            }

            return result;
        }
    }

}
  • HeartBeatEntryEventHandler繼承了AbstractCanalEventDownStreamHandler,其before方法遍歷events判斷是否有EntryType.HEARTBEAT類型的event,若是沒有則當即返回events,若是有則返回非heartbeat的事件

PrometheusCanalEventDownStreamHandler

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.javacode

public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

    private final AtomicLong latestExecuteTime  = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong transactionCounter = new AtomicLong(0L);

    @Override
    public List<Event> before(List<Event> events) {
        long localExecTime = 0L;
        if (events != null && !events.isEmpty()) {
            for (Event e : events) {
                EntryType type = e.getEntryType();
                if (type == null) continue;
                switch (type) {
                    case TRANSACTIONBEGIN: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case ROWDATA: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case TRANSACTIONEND: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        transactionCounter.incrementAndGet();
                        break;
                    }
                    case HEARTBEAT:
                        CanalEntry.EventType eventType = e.getEventType();
                        if (eventType == CanalEntry.EventType.MHEARTBEAT) {
                            localExecTime = System.currentTimeMillis();
                        }
                        break;
                    default:
                        break;
                }
            }
            if (localExecTime > 0) {
                latestExecuteTime.lazySet(localExecTime);
            }
        }
        return events;
    }

    @Override
    public void start() {

        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }

    public AtomicLong getLatestExecuteTime() {
        return latestExecuteTime;
    }

    public AtomicLong getTransactionCounter() {
        return transactionCounter;
    }

}
  • PrometheusCanalEventDownStreamHandler繼承了AbstractCanalEventDownStreamHandler,其before方法遍歷events,而後根據不一樣的EntryType來更新localExecTime、transactionCounter、latestExecuteTime

小結

CanalEventDownStreamHandler繼承了CanalLifeCycle接口,它定義了before、retry、after方法;AbstractCanalEventDownStreamHandler繼承了AbstractCanalLifeCycle,實現了CanalEventDownStreamHandler接口,before、retry、after方法默認返回入參的events;目前有HeartBeatEntryEventHandler及PrometheusCanalEventDownStreamHandler繼承了AbstractCanalEventDownStreamHandler繼承

doc

相關文章
相關標籤/搜索