本文主要研究一下CanalEventSink
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java
/**
 * A life-cycle-managed sink that accepts events of type {@code T}.
 *
 * @param <T> the type of event payload handed to {@link #sink}
 */
public interface CanalEventSink<T> extends CanalLifeCycle {

    /**
     * Submits data to the sink.
     *
     * @param event the data to submit
     * @param remoteAddress the socket address passed along with the data
     * @param destination the destination name passed along with the data
     * @return a boolean result; its exact meaning is defined by the implementation
     * @throws CanalSinkException
     * @throws InterruptedException
     */
    boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
                                                                                InterruptedException;

    /**
     * Interrupts consumption. For example, when the parser module has switched over and the current
     * merge request should be interrupted temporarily, with the corresponding context state cleaned
     * up; see {@linkplain GroupEventSink}.
     */
    void interrupt();
}
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java
public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> { protected CanalEventFilter filter; protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>(); public void setFilter(CanalEventFilter filter) { this.filter = filter; } public void addHandler(CanalEventDownStreamHandler handler) { this.handlers.add(handler); } public CanalEventDownStreamHandler getHandler(int index) { return this.handlers.get(index); } public void addHandler(CanalEventDownStreamHandler handler, int index) { this.handlers.add(index, handler); } public void removeHandler(int index) { this.handlers.remove(index); } public void removeHandler(CanalEventDownStreamHandler handler) { this.handlers.remove(handler); } public CanalEventFilter getFilter() { return filter; } public List<CanalEventDownStreamHandler> getHandlers() { return handlers; } public void interrupt() { // do nothing } }
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> { private static final Logger logger = LoggerFactory.getLogger(EntryEventSink.class); private static final int maxFullTimes = 10; private CanalEventStore<Event> eventStore; protected boolean filterTransactionEntry = false; // 是否須要儘量過濾事務頭/尾 protected boolean filterEmtryTransactionEntry = true; // 是否須要過濾空的事務頭/尾 protected long emptyTransactionInterval = 5 * 1000; // 空的事務輸出的頻率 protected long emptyTransctionThresold = 8192; // 超過8192個事務頭,輸出一個 protected volatile long lastTransactionTimestamp = 0L; protected AtomicLong lastTransactionCount = new AtomicLong(0L); protected volatile long lastEmptyTransactionTimestamp = 0L; protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0L); protected AtomicLong eventsSinkBlockingTime = new AtomicLong(0L); protected boolean raw; public EntryEventSink(){ addHandler(new HeartBeatEntryEventHandler()); } public void start() { super.start(); Assert.notNull(eventStore); if (eventStore instanceof MemoryEventStoreWithBuffer) { this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw(); } for (CanalEventDownStreamHandler handler : getHandlers()) { if (!handler.isStart()) { handler.start(); } } } public void stop() { super.stop(); for (CanalEventDownStreamHandler handler : getHandlers()) { if (handler.isStart()) { handler.stop(); } } } public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination) { return sinkData(entrys, remoteAddress); } //...... }
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    //......

    /**
     * Filters the incoming entries, wraps the survivors as {@code Event}s and decides whether the
     * batch is handed to {@link #doSink(List)}.
     *
     * <p>A batch containing at least one ROWDATA or HEARTBEAT entry is always sunk. A batch with
     * neither is sunk only occasionally, governed by {@code emptyTransactionInterval} and
     * {@code emptyTransctionThresold}; otherwise it is dropped and {@code true} is returned.
     *
     * @param entrys the entries to process
     * @param remoteAddress address used to build the {@code LogIdentity} of each event
     * @return the result of {@code doSink} when the batch is sunk, otherwise {@code true}
     * @throws InterruptedException declared by this method; not thrown directly by the code shown here
     */
    private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress) throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entrys) {
            // Skip entries rejected by doFilter (defined elsewhere in this class).
            if (!doFilter(entry)) {
                continue;
            }

            if (filterTransactionEntry
                && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
                long currentTimestamp = entry.getHeader().getExecuteTime();
                // Policy-based throttling: let a transaction begin/end through from time to time so
                // the database position is updated promptly and to show that things are working.
                // The counter is incremented first; && short-circuits, so the time check only runs
                // while the count is still within the threshold.
                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
                    && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
                    continue;
                } else {
                    // Threshold or interval exceeded: reset the bookkeeping and keep this entry.
                    lastTransactionCount.set(0L);
                    lastTransactionTimestamp = currentTimestamp;
                }
            }

            hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
            hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
            events.add(event);
        }

        if (hasRowData || hasHeartBeat) {
            // A row record or a heartbeat record is present: hand over to downstream processing directly.
            return doSink(events);
        } else {
            // Data that is a candidate for filtering.
            if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
                long currentTimestamp = events.get(0).getExecuteTime();
                // Policy-based throttling: let an empty transaction begin/end through from time to
                // time so the database position is updated promptly and to show that things are working.
                // || short-circuits, so the counter is only incremented when the interval has not elapsed.
                if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                    || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
                    lastEmptyTransactionCount.set(0L);
                    lastEmptyTransactionTimestamp = currentTimestamp;
                    return doSink(events);
                }
            }

            // Return true directly, ignoring the empty transaction begin/end entries.
            return true;
        }
    }

    /**
     * Runs the handlers' {@code before} hooks, then repeatedly tries to put the events into the
     * event store until it succeeds, the sink stops running, or the thread is interrupted.
     *
     * @param events the events to store; each handler hook may replace the list
     * @return {@code true} once {@code tryPut} succeeds, {@code false} if the loop exits without success
     */
    protected boolean doSink(List<Event> events) {
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.before(events);
        }
        long blockingStart = 0L;
        int fullTimes = 0; // number of failed tryPut attempts so far
        do {
            if (eventStore.tryPut(events)) {
                // Succeeded after at least one failure: account for the remaining blocked time.
                if (fullTimes > 0) {
                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                    events = handler.after(events);
                }
                return true;
            } else {
                // First failure marks the start of the blocked period.
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                // applyWait is defined elsewhere in this class; presumably it backs off -- confirm.
                applyWait(++fullTimes);
                // Every 100 failed attempts, flush the elapsed blocked time into the counter and
                // restart the measurement from now.
                if (fullTimes % 100 == 0) {
                    long nextStart = System.nanoTime();
                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }

            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.retry(events);
            }

            // NOTE(review): Thread.interrupted() clears the thread's interrupt status, so callers
            // only learn about the interruption through the false return value.
        } while (running && !Thread.interrupted());
        return false;
    }

    //......
}
CanalEventSink繼承了CanalLifeCycle,它定義了sink、interrupt接口;AbstractCanalEventSink繼承了AbstractCanalLifeCycle,聲明實現了CanalEventSink接口;它定義了filter及handlers兩個屬性;EntryEventSink繼承了AbstractCanalEventSink,聲明實現了CanalEventSink,其start方法會遍歷handlers,挨個執行handler.start方法;其stop方法會遍歷handlers,挨個執行handler.stop方法;其sink方法執行的是sinkData方法。