This article takes a look at CanalEventDownStreamHandler.
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventDownStreamHandler.java
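    public interface CanalEventDownStreamHandler<T> extends CanalLifeCycle {

        /**
         * Called before the events are submitted to the store; the handler may replace the events
         */
        public T before(T events);

        /**
         * Called on retry after the store has been found full
         */
        public T retry(T events);

        /**
         * Called after the events have been submitted to the store successfully
         */
        public T after(T events);
    }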
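To make the order of the three callbacks concrete, here is a minimal sketch of how a sink might drive them around a bounded store. canal's actual sink code is not shown in this article, so every type here (`DownStreamHandler`, `BoundedStore`, `SinkSketch`) and the `maxAttempts` parameter are hypothetical stand-ins, assuming only what the Javadoc comments state: `before` runs prior to the put, `retry` runs when the store is full, and `after` runs once the put succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CanalEventDownStreamHandler (lifecycle methods omitted).
interface DownStreamHandler<T> {
    T before(T events);
    T retry(T events);
    T after(T events);
}

// Hypothetical bounded store: tryPut fails while the batch does not fit.
class BoundedStore {
    private final int capacity;
    private final List<String> items = new ArrayList<>();

    BoundedStore(int capacity) {
        this.capacity = capacity;
    }

    boolean tryPut(List<String> events) {
        if (items.size() + events.size() > capacity) {
            return false;
        }
        items.addAll(events);
        return true;
    }

    void drain() {
        items.clear();
    }

    int size() {
        return items.size();
    }
}

class SinkSketch {
    // Calls before once, retry after each failed put, and after once the put succeeds.
    static boolean sink(List<String> events, BoundedStore store,
                        DownStreamHandler<List<String>> handler, int maxAttempts) {
        events = handler.before(events);
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (store.tryPut(events)) {
                handler.after(events);
                return true;
            }
            events = handler.retry(events);
        }
        return false;
    }
}
```

Because each callback returns `T`, the sketch reassigns `events` from the return values of `before` and `retry`, which is what lets a handler swap in a different batch.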
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventDownStreamHandler.java
    public class AbstractCanalEventDownStreamHandler<T> extends AbstractCanalLifeCycle implements CanalEventDownStreamHandler<T> {

        public T before(T events) {
            return events;
        }

        public T retry(T events) {
            return events;
        }

        public T after(T events) {
            return events;
        }
    }
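Since every hook in the base class passes the events through unchanged, a subclass only needs to override the hook it cares about. The following is a small illustration of that idea, not canal code: `PassThroughHandler` is a hypothetical stand-in for the abstract base class (lifecycle omitted), and `StoredCountHandler` is an invented example handler.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for AbstractCanalEventDownStreamHandler (lifecycle omitted).
class PassThroughHandler<T> {
    public T before(T events) {
        return events;
    }

    public T retry(T events) {
        return events;
    }

    public T after(T events) {
        return events;
    }
}

// Hypothetical handler that only reacts to successful puts.
class StoredCountHandler extends PassThroughHandler<List<String>> {
    private final AtomicLong stored = new AtomicLong();

    @Override
    public List<String> after(List<String> events) {
        // Count the events of the batch that was just stored.
        stored.addAndGet(events.size());
        return events;
    }

    public long getStored() {
        return stored.get();
    }
}
```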
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java
    public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

        public List<Event> before(List<Event> events) {
            boolean existHeartBeat = false;
            for (Event event : events) {
                if (event.getEntryType() == EntryType.HEARTBEAT) {
                    existHeartBeat = true;
                }
            }

            if (!existHeartBeat) {
                return events;
            } else {
                // Heartbeats are currently kept separate from other events; check anyway to be safe
                List<Event> result = new ArrayList<Event>();
                for (Event event : events) {
                    if (event.getEntryType() != EntryType.HEARTBEAT) {
                        result.add(event);
                    }
                }

                return result;
            }
        }
    }
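The same filtering logic can be tried in isolation. The sketch below restates it with Java streams, using hypothetical stand-ins (`EntryKind`, `SimpleEvent`, `HeartbeatFilter`) in place of canal's `EntryType` and `Event`. Like the original, it returns the input list itself when no heartbeat is present and only allocates a new list when something has to be dropped.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for canal's EntryType.
enum EntryKind { ROWDATA, HEARTBEAT }

// Hypothetical stand-in for canal's Event.
class SimpleEvent {
    final EntryKind kind;

    SimpleEvent(EntryKind kind) {
        this.kind = kind;
    }
}

class HeartbeatFilter {
    // Returns the same list when there is nothing to drop, otherwise a filtered copy.
    static List<SimpleEvent> dropHeartbeats(List<SimpleEvent> events) {
        boolean hasHeartbeat = events.stream()
            .anyMatch(e -> e.kind == EntryKind.HEARTBEAT);
        if (!hasHeartbeat) {
            return events;
        }
        return events.stream()
            .filter(e -> e.kind != EntryKind.HEARTBEAT)
            .collect(Collectors.toList());
    }
}
```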
canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
    public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

        private final AtomicLong latestExecuteTime  = new AtomicLong(System.currentTimeMillis());
        private final AtomicLong transactionCounter = new AtomicLong(0L);

        @Override
        public List<Event> before(List<Event> events) {
            long localExecTime = 0L;
            if (events != null && !events.isEmpty()) {
                for (Event e : events) {
                    EntryType type = e.getEntryType();
                    if (type == null) continue;
                    switch (type) {
                        case TRANSACTIONBEGIN: {
                            long exec = e.getExecuteTime();
                            if (exec > 0) localExecTime = exec;
                            break;
                        }
                        case ROWDATA: {
                            long exec = e.getExecuteTime();
                            if (exec > 0) localExecTime = exec;
                            break;
                        }
                        case TRANSACTIONEND: {
                            long exec = e.getExecuteTime();
                            if (exec > 0) localExecTime = exec;
                            transactionCounter.incrementAndGet();
                            break;
                        }
                        case HEARTBEAT:
                            CanalEntry.EventType eventType = e.getEventType();
                            if (eventType == CanalEntry.EventType.MHEARTBEAT) {
                                localExecTime = System.currentTimeMillis();
                            }
                            break;
                        default:
                            break;
                    }
                }
                if (localExecTime > 0) {
                    latestExecuteTime.lazySet(localExecTime);
                }
            }
            return events;
        }

        @Override
        public void start() {
            super.start();
        }

        @Override
        public void stop() {
            super.stop();
        }

        public AtomicLong getLatestExecuteTime() {
            return latestExecuteTime;
        }

        public AtomicLong getTransactionCounter() {
            return transactionCounter;
        }
    }
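The handler only records values and exposes them through `getLatestExecuteTime()` and `getTransactionCounter()`. How canal's collectors consume them is not shown in this article. As an assumption, one plausible use is to derive a delay figure from the recorded execute time, sketched below with a hypothetical `PutDelayGauge` class that takes the current time as a parameter so it can be exercised deterministically.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reader of the AtomicLong that the handler updates via lazySet.
class PutDelayGauge {
    private final AtomicLong latestExecuteTime;

    PutDelayGauge(AtomicLong latestExecuteTime) {
        this.latestExecuteTime = latestExecuteTime;
    }

    // Milliseconds between the last recorded execute time and nowMillis, never negative.
    long delayMillis(long nowMillis) {
        return Math.max(0L, nowMillis - latestExecuteTime.get());
    }
}
```

A caller would typically pass `System.currentTimeMillis()` as `nowMillis`; the clamp to zero guards against clock skew between the database host and the machine reading the value.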
CanalEventDownStreamHandler extends the CanalLifeCycle interface and defines the before, retry, and after methods. AbstractCanalEventDownStreamHandler extends AbstractCanalLifeCycle and implements CanalEventDownStreamHandler; its before, retry, and after methods return the incoming events unchanged by default. Of the classes covered here, HeartBeatEntryEventHandler and PrometheusCanalEventDownStreamHandler both extend AbstractCanalEventDownStreamHandler.