canal 自身提供了簡單的客戶端,數據格式較爲複雜,處理消費數據也不太方便,爲了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。
先說一下實現的思路,首先canal 客戶端的消息對象有兩種,message 和 flatMessage,分別是普通的消息(protobuf格式)和消息隊列的扁平消息(json格式),如今將這兩種消息轉化爲咱們直接使用的 model 對象,根據消息中的數據庫表名稱找到對應的的實體對象,那麼如何根據數據庫表名找到實體對象呢?
第一種方式,若是咱們的實體對象都使用JPA 的 @Table註解來標識表和實體的對應關係,可使用該註解來找到實體對象和表名的關係
第二種方式,可使用自定義註解的來標註實體和表名的關係,爲解耦各個表的處理,咱們使用策略模式來封裝各個表的增刪改操做java
canal client和server交互之間的身份標識,目前clientId寫死爲1001. (目前canal server上的一個instance只能有一個client消費,clientId的設計是爲1個instance多client消費模式而預留的)git
SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實現,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制github
SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實現,simple針對給定的初始ip列表進行failover選擇,cluster基於zookeeper上的cluster節點動態選擇正在運行的canal server.數據庫
client running相關控制,主要爲解決client自身的failover機制。canal client容許同時啓動多個canal client,經過running機制,可保證只有一個client在工做,其餘client作爲冷備. 當運行中的client掛了,running會控制讓冷備中的client轉爲工做模式,這樣就能夠確保canal client也不會是單點. 保證整個系統的高可用性.json
canal 客戶端能夠主要分如下幾種類型異步
這種方式下,能夠啓動多個客戶端,鏈接同一個canal 服務端,多個客戶端只有一個client 工做,其餘的能夠做爲冷備,當一個client的掛了,其餘的客戶端會有一個進入工做模式
缺點:鏈接同一個服務端,若是服務端掛了將致使不可用ide
這種方式下,客戶端鏈接多個canal服務端,一個客戶端隨機選擇一個canal server 消費,當這個server 掛了,會選擇另一個進行消費
缺點:不支持訂閱消費ui
使用zookeeper來server,client 的狀態,當兩個canal server 鏈接zookeeper 後,
優先鏈接的節點做爲 活躍節點,client從活躍節點消費,當server掛了之後,從另一個節點消費
缺點:不支持訂閱消費this
canal 支持消息直接發送到消息隊列,從消息隊列消費,目前支持的有kafka 和rocketMq,這種方式支持訂閱消費線程
首先定義一個策略接口,定義增長,更新,刪除功能,使用java 8聲明方法爲default,讓客戶端選擇實現其中的方法,提升靈活性,客戶端實現EntryHandler接口後,會返回基於handler中的泛型的實例對象,在對應的方法中實現自定義邏輯
public interface EntryHandler<T> { default void insert(T t) { } default void update(T before, T after) { } default void delete(T t) { } }
定義一個canalClient 的抽象類,封裝canal 的連接開啓關閉操做,啓動一個線程不斷去消費canal 數據,依賴一個 messageHandler 封裝消息處理的邏輯
public abstract class AbstractCanalClient implements CanalClient { @Override public void start() { log.info("start canal client"); workThread = new Thread(this::process); workThread.setName("canal-client-thread"); flag = true; workThread.start(); } @Override public void stop() { log.info("stop canal client"); flag = false; if (null != workThread) { workThread.interrupt(); } } @Override public void process() { if (flag) { try { connector.connect(); connector.subscribe(filter); while (flag) { Message message = connector.getWithoutAck(batchSize, timeout, unit); log.info("獲取消息 {}", message); long batchId = message.getId(); if (message.getId() != -1 && message.getEntries().size() != 0) { messageHandler.handleMessage(message); } connector.ack(batchId); } } catch (Exception e) { log.error("canal client 異常", e); } finally { connector.disconnect(); } } } }
基於該抽象類,分別提供各類客戶端的實現
消息處理器 messageHandler 封裝了消息處理邏輯,其中定義了一個消息處理方法
public interface MessageHandler<T> { void handleMessage(T t); }
消息處理器可能要適配4種狀況,分別是消費message,flatMessage和兩種消息的同步與異步消費
消息處理的工做主要有兩個
首先咱們封裝一個抽象的 message 消息處理器,實現MessageHandler接口
public abstract class AbstractMessageHandler implements MessageHandler<Message> { @Override public void handleMessage(Message message) { List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { try { EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName()); if(entryHandler!=null){ CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()) .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build(); CanalContext.setModel(model); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); CanalEntry.EventType eventType = rowChange.getEventType(); for (CanalEntry.RowData rowData : rowDataList) { rowDataHandler.handlerRowData(rowData,entryHandler,eventType); } } } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); }finally { CanalContext.removeModel(); } } } } }
分別定義兩個實現類,同步與異步實現類,繼承AbstractMessageHandler抽象類
public class SyncMessageHandlerImpl extends AbstractMessageHandler { public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) { super(entryHandlers, rowDataHandler); } @Override public void handleMessage(Message message) { super.handleMessage(message); } }
public class AsyncMessageHandlerImpl extends AbstractMessageHandler { private ExecutorService executor; public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) { super(entryHandlers, rowDataHandler); this.executor = executor; } @Override public void handleMessage(Message message) { executor.execute(() -> super.handleMessage(message)); } }
消息處理器依賴的行消息處理器主要是將原始的column list 轉爲 實體對象,並將相應的增刪改消息交給相應的hangler對象方法,行消息處理器分別須要處理兩種對象,一個是 message的行數據 和 flatMessage 的行數據
public interface RowDataHandler<T> { void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception; }
兩個行處理器的實現爲
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> { private IModelFactory<List<CanalEntry.Column>> modelFactory; public RowDataHandlerImpl(IModelFactory modelFactory) { this.modelFactory = modelFactory; } @Override public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception { if (entryHandler != null) { switch (eventType) { case INSERT: Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.insert(object); break; case UPDATE: Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated) .map(CanalEntry.Column::getName).collect(Collectors.toSet()); Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet); Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.update(before, after); break; case DELETE: Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList()); entryHandler.delete(o); break; default: break; } } } }
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> { private IModelFactory<Map<String,String>> modelFactory; public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) { this.modelFactory = modelFactory; } @Override public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{ if (entryHandler != null) { switch (eventType) { case INSERT: Object object = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.insert(object); break; case UPDATE: Object before = modelFactory.newInstance(entryHandler, list.get(1)); Object after = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.update(before, after); break; case DELETE: Object o = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.delete(o); break; default: break; } } } }
行消息處理的依賴的工廠 主要是是經過反射建立與表名稱對應的bean實例
public interface IModelFactory<T> { Object newInstance(EntryHandler entryHandler, T t) throws Exception; default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception { return null; } }
目前主要用於保存bean實例之外的其餘數據,使用threadLocal實現
代碼已在github開源canal-client