本文主要研究一下BinaryLogClient的EventListener。
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
/**
 * Callback contract for objects that want to receive binary log events.
 */
public interface EventListener {

    /**
     * Called with an event that is being delivered to this listener.
     *
     * @param event the event being delivered
     */
    void onEvent(Event event);
}
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java
public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBean, BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener { private AtomicReference<EventHeader> lastEventHeader = new AtomicReference<EventHeader>(); private AtomicLong timestampOfLastEvent = new AtomicLong(); private AtomicLong totalNumberOfEventsSeen = new AtomicLong(); private AtomicLong totalBytesReceived = new AtomicLong(); private AtomicLong numberOfSkippedEvents = new AtomicLong(); private AtomicLong numberOfDisconnects = new AtomicLong(); public BinaryLogClientStatistics() { } public BinaryLogClientStatistics(BinaryLogClient binaryLogClient) { binaryLogClient.registerEventListener(this); binaryLogClient.registerLifecycleListener(this); } @Override public String getLastEvent() { EventHeader eventHeader = lastEventHeader.get(); return eventHeader == null ? null : eventHeader.getEventType() + "/" + eventHeader.getTimestamp() + " from server " + eventHeader.getServerId(); } @Override public long getSecondsSinceLastEvent() { long timestamp = timestampOfLastEvent.get(); return timestamp == 0 ? 
0 : (getCurrentTimeMillis() - timestamp) / 1000; } @Override public long getSecondsBehindMaster() { // because lastEventHeader and timestampOfLastEvent are not guarded by the common lock // we may get some "distorted" results, though shouldn't be a problem given the nature of the final value long timestamp = timestampOfLastEvent.get(); EventHeader eventHeader = lastEventHeader.get(); if (timestamp == 0 || eventHeader == null) { return -1; } return (timestamp - eventHeader.getTimestamp()) / 1000; } @Override public long getTotalNumberOfEventsSeen() { return totalNumberOfEventsSeen.get(); } @Override public long getTotalBytesReceived() { return totalBytesReceived.get(); } @Override public long getNumberOfSkippedEvents() { return numberOfSkippedEvents.get(); } @Override public long getNumberOfDisconnects() { return numberOfDisconnects.get(); } @Override public void reset() { lastEventHeader.set(null); timestampOfLastEvent.set(0); totalNumberOfEventsSeen.set(0); totalBytesReceived.set(0); numberOfSkippedEvents.set(0); numberOfDisconnects.set(0); } @Override public void onEvent(Event event) { EventHeader header = event.getHeader(); lastEventHeader.set(header); timestampOfLastEvent.set(getCurrentTimeMillis()); totalNumberOfEventsSeen.getAndIncrement(); totalBytesReceived.getAndAdd(header.getHeaderLength() + header.getDataLength()); } @Override public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { numberOfSkippedEvents.getAndIncrement(); lastEventHeader.set(null); timestampOfLastEvent.set(getCurrentTimeMillis()); totalNumberOfEventsSeen.getAndIncrement(); } @Override public void onDisconnect(BinaryLogClient client) { numberOfDisconnects.getAndIncrement(); } @Override public void onConnect(BinaryLogClient client) { } @Override public void onCommunicationFailure(BinaryLogClient client, Exception ex) { } protected long getCurrentTimeMillis() { return System.currentTimeMillis(); } }
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... private void listenForEventPackets() throws IOException { ByteArrayInputStream inputStream = channel.getInputStream(); boolean completeShutdown = false; try { while (inputStream.peek() != -1) { int packetLength = inputStream.readInteger(3); inputStream.skip(1); // 1 byte for sequence int marker = inputStream.read(); if (marker == 0xFF) { ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } if (marker == 0xFE && !blocking) { completeShutdown = true; break; } Event event; try { event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ? new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream); if (event == null) { throw new EOFException(); } } catch (Exception e) { Throwable cause = e instanceof EventDataDeserializationException ? 
e.getCause() : e; if (cause instanceof EOFException || cause instanceof SocketException) { throw e; } if (isConnected()) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onEventDeserializationFailure(this, e); } } continue; } if (isConnected()) { eventLastSeen = System.currentTimeMillis(); updateGtidSet(event); notifyEventListeners(event); updateClientBinlogFilenameAndPosition(event); } } } catch (Exception e) { if (isConnected()) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onCommunicationFailure(this, e); } } } finally { if (isConnected()) { if (completeShutdown) { disconnect(); // initiate complete shutdown sequence (which includes keep alive thread) } else { disconnectChannel(); } } } } private void notifyEventListeners(Event event) { if (event.getData() instanceof EventDataWrapper) { event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal()); } for (EventListener eventListener : eventListeners) { try { eventListener.onEvent(event); } catch (Exception e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, eventListener + " choked on " + event, e); } } } } //...... }
EventListener接口定義了onEvent方法；BinaryLogClientStatistics實現了BinaryLogClient.EventListener接口，其onEvent方法會更新lastEventHeader、timestampOfLastEvent、totalNumberOfEventsSeen、totalBytesReceived。