易用的 canal java 客戶端 canal-client

易用的canaljava 客戶端

canal 自身提供了簡單的客戶端,數據格式較爲複雜,處理消費數據也不太方便,爲了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。
先說一下實現的思路,首先canal 客戶端的消息對象有兩種,message 和 flatMessage,分別是普通的消息(protobuf格式)和消息隊列的扁平消息(json格式),如今將這兩種消息轉化爲咱們直接使用的 model 對象,根據消息中的數據庫表名稱找到對應的的實體對象,那麼如何根據數據庫表名找到實體對象呢?
第一種方式,若是咱們的實體對象都使用JPA 的 @Table註解來標識表和實體的對應關係,可使用該註解來找到實體對象和表名的關係
第二種方式,可使用自定義註解的來標註實體和表名的關係,爲解耦各個表的處理,咱們使用策略模式來封裝各個表的增刪改操做java

canal 主要客戶端類

ClientIdentity

canal client和server交互之間的身份標識,目前clientId寫死爲1001. (目前canal server上的一個instance只能有一個client消費,clientId的設計是爲1個instance多client消費模式而預留的)git

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實現,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制github

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實現,simple針對給定的初始ip列表進行failover選擇,cluster基於zookeeper上的cluster節點動態選擇正在運行的canal server.數據庫

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相關控制,主要爲解決client自身的failover機制。canal client容許同時啓動多個canal client,經過running機制,可保證只有一個client在工做,其餘client作爲冷備. 當運行中的client掛了,running會控制讓冷備中的client轉爲工做模式,這樣就能夠確保canal client也不會是單點. 保證整個系統的高可用性.json

Canal 客戶端類型

canal 客戶端能夠主要分如下幾種類型異步

單一ip 直連模式

這種方式下,能夠啓動多個客戶端,鏈接同一個canal 服務端,多個客戶端只有一個client 工做,其餘的能夠做爲冷備,當一個client的掛了,其餘的客戶端會有一個進入工做模式
缺點:鏈接同一個服務端,若是服務端掛了將致使不可用ide

多ip 模式

這種方式下,客戶端鏈接多個canal服務端,一個客戶端隨機選擇一個canal server 消費,當這個server 掛了,會選擇另一個進行消費
缺點:不支持訂閱消費ui

zookeeper 模式

使用zookeeper來server,client 的狀態,當兩個canal server 鏈接zookeeper 後,
優先鏈接的節點做爲 活躍節點,client從活躍節點消費,當server掛了之後,從另一個節點消費
缺點:不支持訂閱消費this

消息 隊列模式

canal 支持消息直接發送到消息隊列,從消息隊列消費,目前支持的有kafka 和rocketMq,這種方式支持訂閱消費線程

canal 客戶端實現

EntryHandler 實體消息處理器

首先定義一個策略接口,定義增長,更新,刪除功能,使用java 8聲明方法爲default,讓客戶端選擇實現其中的方法,提升靈活性,客戶端實現EntryHandler接口後,會返回基於handler中的泛型的實例對象,在對應的方法中實現自定義邏輯

public interface EntryHandler<T> {

    default void insert(T t) {

    }


    default void update(T before, T after) {

    }


    default void delete(T t) {

    }
}

定義一個canalClient 的抽象類,封裝canal 的連接開啓關閉操做,啓動一個線程不斷去消費canal 數據,依賴一個 messageHandler 封裝消息處理的邏輯

public abstract class AbstractCanalClient implements CanalClient {



    @Override
    public void start() {
        log.info("start canal client");
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        flag = true;
        workThread.start();
    }

    @Override
    public void stop() {
        log.info("stop canal client");
        flag = false;
        if (null != workThread) {
            workThread.interrupt();
        }

    }

    @Override
    public void process() {
        if (flag) {
            try {
                connector.connect();
                connector.subscribe(filter);
                while (flag) {
                    Message message = connector.getWithoutAck(batchSize, timeout, unit);
                    log.info("獲取消息 {}", message);
                    long batchId = message.getId();
                    if (message.getId() != -1 && message.getEntries().size() != 0) {
                        messageHandler.handleMessage(message);
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                log.error("canal client 異常", e);
            } finally {
                connector.disconnect();
            }
        }
    }

}

基於該抽象類,分別提供各類客戶端的實現

  1. SimpleCanalClient
  2. ClusterCanalClient
  3. ZookeeperCanalClient
  4. KafkaCanalClient

消息處理器 messageHandler

消息處理器 messageHandler 封裝了消息處理邏輯,其中定義了一個消息處理方法

public interface MessageHandler<T> {

     void handleMessage(T t);

}

消息處理器可能要適配4種狀況,分別是消費message,flatMessage和兩種消息的同步與異步消費
消息處理的工做主要有兩個

  1. 獲取增刪改的行數據,交給行處理器繼續處理
  2. 在上下文對象中保存其餘的數據,例如庫名,表名,binlog 時間戳等等數據

首先咱們封裝一個抽象的 message 消息處理器,實現MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler<Message> {


    @Override
    public void handleMessage(Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                try {
                    EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());
                    if(entryHandler!=null){
                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {
                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {
                   CanalContext.removeModel();
                }

            }
        }
    }
}

分別定義兩個實現類,同步與異步實現類,繼承AbstractMessageHandler抽象類

public class SyncMessageHandlerImpl extends AbstractMessageHandler {


    public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {
        super(entryHandlers, rowDataHandler);
    }

    @Override
    public void handleMessage(Message message) {
        super.handleMessage(message);
    }
}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {


    private ExecutorService executor;


    public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) {
        super(entryHandlers, rowDataHandler);
        this.executor = executor;
    }

    @Override
    public void handleMessage(Message message) {
        executor.execute(() -> super.handleMessage(message));
    }
}

RowDataHandler 行消息處理器

消息處理器依賴的行消息處理器主要是將原始的column list 轉爲 實體對象,並將相應的增刪改消息交給相應的hangler對象方法,行消息處理器分別須要處理兩種對象,一個是 message的行數據 和 flatMessage 的行數據

public interface RowDataHandler<T> {


    void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;
}

兩個行處理器的實現爲

public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {



    private IModelFactory<List<CanalEntry.Column>> modelFactory;




    public RowDataHandlerImpl(IModelFactory modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> {



    private IModelFactory<Map<String,String>> modelFactory;


    public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Object before = modelFactory.newInstance(entryHandler, list.get(1));
                    Object after = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}

IModelFactory bean實例建立工廠

行消息處理的依賴的工廠 主要是是經過反射建立與表名稱對應的bean實例

public interface IModelFactory<T> {


    Object newInstance(EntryHandler entryHandler, T t) throws Exception;


    default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception {
        return null;
    }
}

CanalContext canal 消息上下文

目前主要用於保存bean實例之外的其餘數據,使用threadLocal實現

代碼已在github開源canal-client

相關文章
相關標籤/搜索