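How the client works is something we have to implement ourselves, so for now the example module is our only reference. It currently ships two examples, one standalone and one for cluster mode. Any later development on our side really means writing our own client and its logic. Here we focus on how the cluster client is implemented, how it consumes data, and how it exchanges data with the server.
Let's look at the code: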
    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                waiting = false;
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        //     Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // commit the acknowledgement
                    // connector.rollback(batchId); // on failure, roll the batch back
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
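The flow is: connect, subscribe, fetch a batch, ack it (or roll it back on failure), and finally disconnect. Let's go through it step by step.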
CanalConnector has two main implementations, SimpleCanalConnector and ClusterCanalConnector. We focus on ClusterCanalConnector, since that is the mode we are going to use.
To use it, we obtain the Connector from a factory class, CanalConnectors, which includes a method for creating a ClusterCanalConnector.
    public static CanalConnector newClusterConnector(String zkServers, String destination,
                                                     String username, String password) {
        ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
            password,
            destination,
            new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
        canalConnector.setSoTimeout(30 * 1000);
        return canalConnector;
    }
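The parameters are the ZooKeeper address, the canal destination name, and a username and password (these are sent to the server in the authentication packet during connect). ClusterNodeAccessStrategy is the strategy the client uses to pick which server node to connect to, and its constructor has a few things worth noting.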
    public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
        this.zkClient = zkClient;
        childListener = new IZkChildListener() {

            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                initClusters(currentChilds);
            }
        };

        dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                runningAddress = null;
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
                initRunning(data);
            }
        };

        String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
        this.zkClient.subscribeChildChanges(clusterPath, childListener);
        initClusters(this.zkClient.getChildren(clusterPath));

        String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
        this.zkClient.subscribeDataChanges(runningPath, dataListener);
        initRunning(this.zkClient.readData(runningPath, true));
    }
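Two listeners are set up here, both watching the servers for this destination. One tracks the full list of servers, the other tracks the currently running server; both read from the corresponding ZooKeeper nodes.
Once we have the CanalConnector, the real connection happens. In ClusterCanalConnector we can see that it uses SimpleCanalConnector underneath and only adds a selection strategy on top.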
    public void connect() throws CanalClientException {
        if (connected) {
            return;
        }

        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        } else {
            waitClientRunning();
            if (!running) {
                return;
            }
            doConnect();
            if (filter != null) { // a filter exists, so this is an automatic switch: re-subscribe with the previous filter
                subscribe(filter);
            }
            if (rollbackOnConnect) {
                rollback();
            }
        }

        connected = true;
    }
For a cluster-mode client, runningMonitor is not null here because it has already been initialized. Let's look at what runningMonitor.start() does.
    public void start() {
        super.start();

        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }
The watched path is /otter/canal/destinations/{destination}/{clientId}/running. Any data change on this node, or its deletion, triggers the corresponding callback in dataListener.
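To make the layout of that path concrete, here is a minimal sketch that fills in the two placeholders with plain string formatting. The class and method names are hypothetical and are not Canal's ZookeeperPathUtils; only the path layout comes from the text above.

```java
class ClientRunningPathSketch {
    // Layout quoted in the text: /otter/canal/destinations/{destination}/{clientId}/running
    private static final String TEMPLATE = "/otter/canal/destinations/%s/%d/running";

    // Hypothetical helper: substitutes the destination name and the client id.
    static String of(String destination, short clientId) {
        return String.format(TEMPLATE, destination, clientId);
    }

    public static void main(String[] args) {
        // "example" and 1001 are illustrative values only.
        System.out.println(of("example", (short) 1001));
    }
}
```

Since both placeholders are part of the path, each client id of a destination gets its own running node.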
    dataListener = new IZkDataListener() {

        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
            if (!isMine(runningData.getAddress())) {
                mutex.set(false);
            }

            if (!runningData.isActive() && isMine(runningData.getAddress())) { // a voluntary release happened and this machine was active before
                release = true;
                releaseRunning(); // fully release the mainstem
            }

            activeData = (ClientRunningData) runningData;
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            // trigger an exit; the cause may be a manual release or a session expired timeout after a network blip
            processActiveExit();
            if (!release && activeData != null && isMine(activeData.getAddress())) {
                // the last active holder was this machine, so try to grab active again immediately
                initRunning();
            } else {
                // otherwise wait delayTime, to avoid frequent switching caused by network blips or zk problems
                delayExector.schedule(new Runnable() {

                    public void run() {
                        initRunning();
                    }
                }, delayTime, TimeUnit.SECONDS);
            }
        }
    };
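The comments here are fairly clear. On a data change: if the running data does not belong to this machine, the mutex is cleared; if this machine was active and the node has been marked inactive (a voluntary release), the node is released; the new running data is then recorded as the active data. On deletion: an exit is triggered right away; if the previous active holder was this machine, it tries to grab active again immediately, otherwise it waits delayTime (5 s by default) before retrying. Next, let's look at initRunning.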
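The choice between grabbing active immediately and waiting can be isolated into one small function. The following is a simplified sketch of that decision only; the class and method names are hypothetical, addresses are plain strings, and the real listener additionally schedules initRunning on an executor.

```java
class ReacquireDelaySketch {
    // Returns how many seconds to wait before trying to take the running node again.
    // released:   true if this client gave the node up voluntarily
    // lastActive: address that held the node before it was deleted (may be null)
    // mine:       address of this client
    // delayTime:  back-off used when the previous holder was someone else
    static long delayBeforeRetry(boolean released, String lastActive, String mine, long delayTime) {
        boolean wasMine = lastActive != null && lastActive.equals(mine);
        return (!released && wasMine) ? 0L : delayTime;
    }

    public static void main(String[] args) {
        String mine = "10.0.0.1:5000"; // illustrative address
        System.out.println(delayBeforeRetry(false, mine, mine, 5));            // previous holder was us
        System.out.println(delayBeforeRetry(false, "10.0.0.2:5000", mine, 5)); // previous holder was another client
        System.out.println(delayBeforeRetry(true, mine, mine, 5));             // we released on purpose
    }
}
```

The delay for the non-owner case gives the previous holder a chance to come back after a short network problem before another client takes over.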
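This part mainly creates the ephemeral node for the running client. The node path is /otter/canal/destinations/{destination}/{clientId}/running, the same path that is being watched, and its content is the JSON serialization of ClientRunningData. The connection code: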
    public InetSocketAddress processActiveEnter() {
        InetSocketAddress address = doConnect();
        mutex.set(true);
        if (filter != null) { // a filter exists, so this is an automatic switch: re-subscribe with the previous filter
            subscribe(filter);
        }

        if (rollbackOnConnect) {
            rollback();
        }

        return address;
    }
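There are several pieces of logic here; let's take them one at a time.
First, the client connects directly to the server over a socket, on the socket port that the server exposes.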
    private InetSocketAddress doConnect() throws CanalClientException {
        try {
            channel = SocketChannel.open();
            channel.socket().setSoTimeout(soTimeout);
            SocketAddress address = getAddress();
            if (address == null) {
                address = getNextAddress();
            }
            channel.connect(address);
            readableChannel = Channels.newChannel(channel.socket().getInputStream());
            writableChannel = Channels.newChannel(channel.socket().getOutputStream());

            // the first packet must be the server's handshake
            Packet p = Packet.parseFrom(readNextPacket());
            if (p.getVersion() != 1) {
                throw new CanalClientException("unsupported version at this client.");
            }

            if (p.getType() != PacketType.HANDSHAKE) {
                throw new CanalClientException("expect handshake but found other type.");
            }
            Handshake handshake = Handshake.parseFrom(p.getBody());
            supportedCompressions.addAll(handshake.getSupportedCompressionsList());

            // reply with the client authentication packet
            ClientAuth ca = ClientAuth.newBuilder()
                .setUsername(username != null ? username : "")
                .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
                .setNetReadTimeout(soTimeout)
                .setNetWriteTimeout(soTimeout)
                .build();
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.CLIENTAUTHENTICATION)
                .setBody(ca.toByteString())
                .build()
                .toByteArray());

            // wait for the server's ack
            Packet ack = Packet.parseFrom(readNextPacket());
            if (ack.getType() != PacketType.ACK) {
                throw new CanalClientException("unexpected packet type when ack is expected");
            }

            Ack ackBody = Ack.parseFrom(ack.getBody());
            if (ackBody.getErrorCode() > 0) {
                throw new CanalClientException("something goes wrong when doing authentication: "
                                               + ackBody.getErrorMessage());
            }

            connected = true;
            return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
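This uses a java.nio SocketChannel. After the socket connection to the server is established, the client reads the server's handshake packet and replies with an authentication packet; once the ack packet comes back, the connection is considered established. On the server side, the authentication packet is handled in the ClientAuthenticationHandler class and the handshake in the HandshakeInitializationHandler class.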
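The bodies of readNextPacket and writeWithHeader are not shown in this walkthrough. As a rough illustration of what a "header plus body" exchange typically looks like, here is a sketch of length-prefixed framing. The 4-byte big-endian length header is an assumption made for the example, not something confirmed by the code above, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FramingSketch {
    // Prefix the body with its length as a 4-byte big-endian int (assumed header format).
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Split a byte stream back into packet bodies: read a header, then that many body bytes, and repeat.
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> bodies = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            int length = buf.getInt();
            byte[] body = new byte[length];
            buf.get(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        // Two placeholder payloads standing in for serialized packets.
        byte[] first = frame("HANDSHAKE".getBytes(StandardCharsets.UTF_8));
        byte[] second = frame("ACK".getBytes(StandardCharsets.UTF_8));
        byte[] stream = ByteBuffer.allocate(first.length + second.length).put(first).put(second).array();
        for (byte[] body : unframe(stream)) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

With framing like this, the reader always knows where one packet ends and the next begins, which is what lets the client read exactly one handshake packet and later exactly one ack.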
When the server receives the authentication message, it handles it as follows:
    public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        switch (packet.getVersion()) {
            case SUPPORTED_VERSION:
            default:
                final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
                // if subscription info is present
                if (StringUtils.isNotEmpty(clientAuth.getDestination())
                    && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                    ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                        Short.valueOf(clientAuth.getClientId()),
                        clientAuth.getFilter());
                    try {
                        MDC.put("destination", clientIdentity.getDestination());
                        embeddedServer.subscribe(clientIdentity);
                        ctx.setAttachment(clientIdentity); // store the state data
                        // try to start; ignore if already started
                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                            if (!runningMonitor.isStart()) {
                                runningMonitor.start();
                            }
                        }
                    } finally {
                        MDC.remove("destination");
                    }
                }

                NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {

                    public void operationComplete(ChannelFuture future) throws Exception {
                        // ignored
                    }
                });
                break;
        }
    }
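The main logic is in subscribe. If metaManager has not been started, it is started first; on startup it pulls some data from the ZooKeeper nodes, including each client's consumed position. Then comes the subscription itself, which creates a ZooKeeper node at /otter/canal/destinations/{destination}/{clientId}. Any filter also has to be written to ZooKeeper. Finally, the position for this client is looked up: if ZooKeeper already had it, it is read straight from memory; otherwise the first entry in eventStore is used.
Next, the subscription message is sent to the server over the socket. Note the check here: the message is only sent when filter is not null. The server handles it like this: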
    case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
            clientIdentity = new ClientIdentity(sub.getDestination(),
                Short.valueOf(sub.getClientId()),
                sub.getFilter());
            MDC.put("destination", clientIdentity.getDestination());

            // try to start; ignore if already started
            if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                if (!runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            embeddedServer.subscribe(clientIdentity);
            ctx.setAttachment(clientIdentity); // store the state data
            NettyUtils.ack(ctx.getChannel(), null);
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
                ctx.getChannel(),
                null);
        }
        break;
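This is similar to the connect path, except that it carries the filter parameter. It also starts the server-side instance for the destination and its running monitor if they are not already started.
Rollback here means rolling back the position that the server has recorded for this client.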
    public void rollback() throws CanalClientException {
        waitClientRunning();
        rollback(0); // 0 means "not set"
    }
This sends the rollback command. The server handles it as follows:
    case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        MDC.put("destination", rollback.getDestination());
        if (StringUtils.isNotEmpty(rollback.getDestination())
            && StringUtils.isNotEmpty(rollback.getClientId())) {
            clientIdentity = new ClientIdentity(rollback.getDestination(),
                Short.valueOf(rollback.getClientId()));
            if (rollback.getBatchId() == 0L) {
                embeddedServer.rollback(clientIdentity); // roll back all batches
            } else {
                embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // roll back a single batch
            }
        } else {
            NettyUtils.error(401,
                MessageFormatter.format("destination or clientId is null", rollback.toString())
                    .getMessage(),
                ctx.getChannel(),
                null);
        }
        break;
The batchId passed in here is 0, which means all batches are rolled back. Let's look at the rollback itself:
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // rollback is issued automatically on the first connection, so an unsubscribed client must be ignored
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }

        synchronized (canalInstance) {
            // clear the batch info
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            // roll back the state kept in eventStore
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }
What actually gets rolled back is the pointer in eventStore: the get pointer is reset to the last acked position.
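The two-pointer idea can be shown with a toy store. This is a minimal sketch under simplifying assumptions (events are strings in a list and positions are list indexes); the class and its methods are hypothetical and are not Canal's eventStore.

```java
import java.util.ArrayList;
import java.util.List;

class PointerStoreSketch {
    private final List<String> events = new ArrayList<>();
    private int getPos = 0; // index of the next event to hand out
    private int ackPos = 0; // everything before this index has been confirmed

    void put(String event) {
        events.add(event);
    }

    // Hand out up to batchSize events and advance the get pointer.
    List<String> get(int batchSize) {
        int end = Math.min(getPos + batchSize, events.size());
        List<String> batch = new ArrayList<>(events.subList(getPos, end));
        getPos = end;
        return batch;
    }

    // Confirm the next `count` handed-out events; never moves past the get pointer.
    void ack(int count) {
        ackPos = Math.min(ackPos + count, getPos);
    }

    // Forget everything handed out but not confirmed: the get pointer returns to the ack pointer.
    void rollback() {
        getPos = ackPos;
    }

    public static void main(String[] args) {
        PointerStoreSketch store = new PointerStoreSketch();
        for (String e : new String[] {"a", "b", "c", "d"}) {
            store.put(e);
        }
        System.out.println(store.get(2)); // first batch
        store.ack(2);
        System.out.println(store.get(2)); // second batch, not acked
        store.rollback();
        System.out.println(store.get(2)); // same events as the second batch
    }
}
```

After the rollback, the unacknowledged events are delivered again, which is why a client that rolls back must be prepared to see the same data twice.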
Once the client has connected to the server, it needs to subscribe to binlog data.
    public void subscribe() throws CanalClientException {
        subscribe(""); // passing an empty string is enough
    }

    public void subscribe(String filter) throws CanalClientException {
        int times = 0;
        while (times < retryTimes) {
            try {
                currentConnector.subscribe(filter);
                this.filter = filter;
                return;
            } catch (Throwable t) {
                if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                    logger.info("block waiting interrupted by other thread.");
                    return;
                } else {
                    logger.warn(String.format(
                        "something goes wrong when subscribing from server: %s",
                        currentConnector != null ? currentConnector.getAddress() : "null"),
                        t);
                    times++;
                    restart();
                    logger.info("restart the connector for next round retry.");
                }
            }
        }

        throw new CanalClientException("failed to subscribe after " + times + " times retry.");
    }
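We won't repeat the subscription details, since they were covered in the connect flow above. There is also a retry mechanism here: as long as the failure is not an interruption, the client connector is restarted and the subscription is retried, until the retryTimes threshold is reached.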
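The retry-and-restart pattern can be sketched on its own, independent of Canal. The version below is a simplified illustration: the Action interface, the method name, and the use of IllegalStateException are all hypothetical, and it leaves out the special case for interrupted waits.

```java
class RetrySketch {
    // A unit of work that may fail with a checked exception.
    interface Action {
        void run() throws Exception;
    }

    // Try the action up to retryTimes times, calling restart after every failed attempt.
    // Returns the number of attempts used; throws once all attempts have failed.
    static int runWithRetry(Action action, Runnable restart, int retryTimes) {
        int times = 0;
        while (times < retryTimes) {
            try {
                action.run();
                return times + 1;
            } catch (Exception e) {
                times++;
                restart.run(); // e.g. drop the connection and connect again
            }
        }
        throw new IllegalStateException("failed after " + times + " times retry.");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        int attempts = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new Exception("temporary failure");
            }
        }, () -> System.out.println("restarting"), 3);
        System.out.println("attempts used: " + attempts);
    }
}
```

Restarting between attempts matters in cluster mode because the next attempt may land on a different server node.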
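After connecting and subscribing, we can start fetching binlog data. The main method is getWithoutAck, which leaves the ack to the client, so a batch is only acknowledged after it has really been consumed and the business logic has run. If an exception occurs, there is also a bounded number of retries and restarts.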
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        try {
            ... // omitted
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.GET)
                .setBody(Get.newBuilder()
                    .setAutoAck(false)
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFetchSize(size)
                    .setTimeout(time)
                    .setUnit(unit.ordinal())
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            return receiveMessages();
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
As we can see, this sends a GET command to the server, carrying batchSize and a timeout, with auto-ack turned off. The server handles it like this:
embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
This ends up calling the following method:
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                       throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            // position range of the last batch fetched in the stream
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

            Events<Event> events = null;
            if (positionRanges != null) { // there is in-flight stream data
                events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
            } else { // first fetch after an ack
                Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) { // very first fetch, no ack recorded yet: start from the first entry in the store
                    start = canalInstance.getEventStore().getFirstPosition();
                }

                events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            }

            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(),
                    batchSize);
                return new Message(-1, new ArrayList<Entry>()); // return an empty message, so no batchId is generated for nothing
            } else {
                // record the batch in the stream info
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                if (logger.isInfoEnabled()) {
                    logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
                }
                return new Message(batchId, entrys);
            }
        }
    }
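The core of the logic is in the method above.
The result is wrapped in Messages and finally converted into a Message, which contains the batch id and the list of binlog entries.
After receiving the message, the client checks batchId: if batchId is -1 or the number of binlog entries is 0, no data was fetched. Otherwise the business logic is applied to the message.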
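The check on the client side can be tried out against a tiny in-memory stand-in. In the sketch below, FakeConnector and Batch are hypothetical classes written for this illustration and are not Canal's CanalConnector or Message; unlike the example client, the sketch only acks when a batch actually contains data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ConsumeLoopSketch {
    // Stand-in for a message: a batch id plus its entries (plain strings here).
    static class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    // Hypothetical in-memory source; returns id -1 with no entries when nothing is pending.
    static class FakeConnector {
        private final Deque<String> pending = new ArrayDeque<>();
        private long nextId = 1;
        final List<Long> acked = new ArrayList<>();

        void publish(String entry) {
            pending.add(entry);
        }

        Batch getWithoutAck(int batchSize) {
            List<String> out = new ArrayList<>();
            while (out.size() < batchSize && !pending.isEmpty()) {
                out.add(pending.poll());
            }
            return out.isEmpty() ? new Batch(-1, out) : new Batch(nextId++, out);
        }

        void ack(long batchId) {
            acked.add(batchId);
        }
    }

    // One iteration of the consume loop; returns how many entries were handled.
    static int pollOnce(FakeConnector connector, int batchSize, List<String> sink) {
        Batch batch = connector.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return 0; // nothing fetched; a real client might sleep briefly here
        }
        sink.addAll(batch.entries); // the "business logic" of this sketch
        connector.ack(batch.id);    // ack only after the entries were handled
        return batch.entries.size();
    }

    public static void main(String[] args) {
        FakeConnector connector = new FakeConnector();
        List<String> sink = new ArrayList<>();
        System.out.println(pollOnce(connector, 2, sink)); // nothing published yet
        connector.publish("row-1");
        connector.publish("row-2");
        connector.publish("row-3");
        System.out.println(pollOnce(connector, 2, sink));
        System.out.println(pollOnce(connector, 2, sink));
        System.out.println(sink + " acked=" + connector.acked);
    }
}
```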
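We will discuss the contents of Message later.
connector.ack(batchId); // commit the acknowledgement
This commits the batch id. Underneath, a CLIENTACK command is sent to the server, and the server calls the ack method of CanalServerWithEmbedded to commit it.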
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // update the position
        if (positionRanges == null) { // a duplicate ack/rollback
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }

        // update the cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
            }
        }

        // data up to this point can now be cleaned up periodically
        canalInstance.getEventStore().ack(positionRanges.getEnd());
    }
It first removes the batch from metaManager, then updates the ack cursor, and finally clears the data in the store up to the acked position.
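The bookkeeping behind an ack can be reduced to a map of outstanding batches plus a cursor. The following is a minimal sketch under simplifying assumptions: positions are plain long values, there is a single client, and the class name and exception type are hypothetical rather than Canal's metaManager API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BatchAckSketch {
    private final Map<Long, Long> batches = new LinkedHashMap<>(); // batchId -> end position of the batch
    private long nextBatchId = 1;
    private long cursor = 0; // last acknowledged position

    // Register a handed-out batch and return its id.
    long addBatch(long endPosition) {
        long id = nextBatchId++;
        batches.put(id, endPosition);
        return id;
    }

    // Acknowledge a batch: remove it, then move the cursor to its end position.
    void ack(long batchId) {
        Long end = batches.remove(batchId);
        if (end == null) { // unknown id: a duplicate ack or an already rolled-back batch
            throw new IllegalStateException("ack error, batchId:" + batchId + " is not exist");
        }
        cursor = end;
    }

    long cursor() {
        return cursor;
    }

    public static void main(String[] args) {
        BatchAckSketch meta = new BatchAckSketch();
        long id = meta.addBatch(120L);
        meta.ack(id);
        System.out.println("cursor=" + meta.cursor());
        try {
            meta.ack(id); // second ack of the same batch is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting unknown batch ids is what lets the server tell the client that it acked the same batch twice.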
If processing fails, a rollback is needed: a CLIENTROLLBACK command is sent to the server to roll the data back. Rolling back a single batch is handled like this:
    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

        // rollback is issued automatically on the first connection, so an unsubscribed client must be ignored
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            // clear the batch info
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
                batchId);
            if (positionRanges == null) { // a duplicate ack/rollback
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                    clientIdentity.getClientId(),
                    batchId));
            }

            // lastRollbackPostions.put(clientIdentity,
            // positionRanges.getEnd()); // record the last rollback position
            // TODO later: roll back to the position of the given batchId
            canalInstance.getEventStore().rollback(); // roll back the state kept in eventStore
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
    }
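Rolling back to a specific batchId is not really implemented here. This rollback is also a full rollback to the acked pointer position.
When an exception occurs, the client drops its connection to the server through the disconnect method.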
    public void disconnect() throws CanalClientException {
        if (rollbackOnDisConnect && channel.isConnected()) {
            rollback();
        }

        connected = false;
        if (runningMonitor != null) {
            if (runningMonitor.isStart()) {
                runningMonitor.stop();
            }
        } else {
            doDisconnnect();
        }
    }
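If the rollback-on-disconnect flag is set (it is false by default) and the socket channel is still connected, a rollback is performed first.
After that, if a runningMonitor exists, its stop method is called to shut it down; otherwise the socket is closed directly. The main process is as follows: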