/**
 * Contract for a single task executor with a start/stop life cycle
 * (inherited from {@link LifeCycle}) plus accessors for its configuration
 * and collaborators.
 *
 * <p>Only signatures are declared here; the precise semantics of each
 * operation are defined by the implementing classes.
 */
public interface TaskExecutor extends LifeCycle {

    /** @return the executor's stop flag */
    boolean isStop();

    /** @return whether the executor is currently in "merging" mode (NOTE(review): semantics defined by implementations) */
    boolean isMerging();

    /**
     * NOTE(review): presumably asks the executor to keep running until events
     * reach {@code timestamp} and then stop -- confirm against the implementation.
     *
     * @param timestamp the boundary timestamp (unit not visible here -- TODO confirm)
     */
    void stopUntil(long timestamp);

    /** NOTE(review): presumably cancels a pending {@link #stopUntil(long)} request -- confirm. */
    void cancelStopUntil();

    /** @param context the context to use */
    void setContext(PumaContext context);

    void initContext();

    /** @return the context previously set */
    PumaContext getContext();

    String getTaskId();

    void setTaskId(String taskId);

    String getTaskName();

    void setTaskName(String taskName);

    String getDefaultBinlogFileName();

    void setDefaultBinlogFileName(String binlogFileName);

    Long getDefaultBinlogPosition();

    /** @param defaultBinlogPosition the default binlog position to set */
    void setDefaultBinlogPosition(Long defaultBinlogPosition);

    /** @param instanceStorageManager the storage manager to set */
    void setInstanceStorageManager(InstanceStorageManager instanceStorageManager);

    List<Sender> getFileSender();

    DataHandler getDataHandler();

    void resume() throws Exception;

    void pause() throws Exception;

    PumaTaskStateEntity getTaskState();

    void setTaskState(PumaTaskStateEntity taskState);

    void setInstanceTask(InstanceTask instanceTask);

    InstanceTask getInstanceTask();

    TableSet getTableSet();
}
@ThreadUnSafe public abstract class AbstractTaskExecutor implements TaskExecutor { private PumaContext context; private String taskId; private long serverId; protected String taskName; protected Date beginTime; protected TableSet tableSet; private String defaultBinlogFileName; private Long defaultBinlogPosition; protected Parser parser; protected DataHandler dataHandler; protected Dispatcher dispatcher; private volatile boolean stop = true; protected InstanceStorageManager instanceStorageManager; protected PumaTaskStateEntity state; protected InstanceManager instanceManager; @Override public String getTaskId() { return taskId; } @Override public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String getTaskName() { return taskName; } @Override public void setTaskName(String taskName) { this.taskName = taskName; } /** * @param instanceStorageManager * the binlogPositionHolder to set */ public void setInstanceStorageManager(InstanceStorageManager instanceStorageManager) { this.instanceStorageManager = instanceStorageManager; } public void setContext(PumaContext context) { this.context = context; } public PumaContext getContext() { return context; } public String getDefaultBinlogFileName() { return defaultBinlogFileName; } public void setDefaultBinlogFileName(String binlogFileName) { this.defaultBinlogFileName = binlogFileName; } /** * @return the defaultBinlogPosition */ public Long getDefaultBinlogPosition() { return defaultBinlogPosition; } /** * @param defaultBinlogPosition * the defaultBinlogPosition to set */ public void setDefaultBinlogPosition(Long defaultBinlogPosition) { this.defaultBinlogPosition = defaultBinlogPosition; } /** * @param parser * the parser to set */ public void setParser(Parser parser) { this.parser = parser; } /** * @param dataHandler * the dataHandler to set */ public void setDataHandler(DataHandler dataHandler) { this.dataHandler = dataHandler; } /** * @param dispatcher * the dispatcher to set */ public void 
setDispatcher(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public long getServerId() { return serverId; } public void setServerId(long serverId) { this.serverId = serverId; } public boolean isStop() { return stop; } protected abstract void doStop() throws Exception; protected abstract void doStart() throws Exception; @Override public void start() { try { stop = false; parser.start(); dataHandler.start(); dispatcher.start(); doStart(); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void stop() { try { stop = true; parser.stop(); dataHandler.stop(); dispatcher.stop(); doStop(); } catch (Exception e) { throw new RuntimeException(e); } } public void resume() throws Exception { stop = false; } public void pause() throws Exception { stop = true; } @Override public List<Sender> getFileSender() { return dispatcher.getSenders(); } @Override public DataHandler getDataHandler() { return this.dataHandler; } public PumaTaskStateEntity getTaskState() { return state; } public void setTaskState(PumaTaskStateEntity state) { this.state = state; } public Date getBeginTime() { return beginTime; } public void setBeginTime(Date beginTime) { this.beginTime = beginTime; } public TableSet getTableSet() { return tableSet; } public void setTableSet(TableSet tableSet) { this.tableSet = tableSet; } public InstanceManager getInstanceManager() { return instanceManager; } public void setInstanceManager(InstanceManager instanceManager) { this.instanceManager = instanceManager; } }
@ThreadUnSafe public class DefaultTaskExecutor extends AbstractTaskExecutor { private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskExecutor.class); private SrcDbEntity currentSrcDbEntity; private DefaultTableMetaInfoFetcher tableMetaInfoFetcher; private String encoding = "utf-8"; private Socket mysqlSocket; private InputStream is; private OutputStream os; private InstanceTask instanceTask; private boolean merging = false; private long runUntilTimestamp; @Override public void doStart() throws Exception { Thread.currentThread().setName("DefaultTaskExecutor-" + taskName); long failCount = 0; merging = false; SystemStatusManager.addServer(getTaskName(), "", 0, tableSet); do { try { loadServerId(instanceManager.getUrlByCluster(instanceTask.getInstance())); // 讀position/file文件 BinlogInfo binlogInfo = instanceStorageManager.getBinlogInfo(getContext().getPumaServerName()); if (binlogInfo == null) { this.currentSrcDbEntity = initSrcDbByServerId(-1); if (beginTime != null) { binlogInfo = getBinlogByTimestamp(beginTime.getTime() / 1000); } } else { this.currentSrcDbEntity = initSrcDbByServerId(binlogInfo.getServerId()); if (binlogInfo.getServerId() != currentSrcDbEntity.getServerId()) { BinlogInfo oldBinlogInfo = binlogInfo; binlogInfo = getBinlogByTimestamp(oldBinlogInfo.getTimestamp() - 60); if (binlogInfo == null) { throw new IOException("Switch Binlog Failed!"); } else { Cat.logEvent("BinlogSwitch", taskName, Message.SUCCESS, oldBinlogInfo.toString() + " -> " + binlogInfo.toString()); } } } updateTableMetaInfoFetcher(); getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()); if (!connect()) { throw new IOException("Connection failed."); } initConnect(); initBinlogPosition(binlogInfo); if (dumpBinlog()) { processBinlog(); } else { throw new IOException("Binlog dump failed."); } } catch (Exception e) { if (++failCount % 3 == 0) { this.currentSrcDbEntity = chooseNextSrcDb(); updateTableMetaInfoFetcher(); failCount = 0; } String 
msg = "Exception occurs. taskName: " + getTaskName() + " dbServerId: " + (currentSrcDbEntity == null ? 0 : currentSrcDbEntity.getServerId()) + ". Reconnect..."; LOG.error(msg, e); Cat.logError(msg, e); Thread.sleep(((failCount % 10) + 1) * 2000); } } while (!isStop() && !Thread.currentThread().isInterrupted()); } protected void doStop() throws Exception { LOG.info("TaskName: " + getTaskName() + ", Stopped."); closeTransport(); SystemStatusManager.deleteServer(getTaskName()); } //...... }
/**
 * Searches the current source database's binary logs, newest file first, for
 * the position of the last transaction commit (XID event) whose timestamp is
 * strictly before {@code time}.
 *
 * <p>For every candidate file a fresh connection is opened and the binlog is
 * dumped from position 4. While reading:
 * <ul>
 * <li>an XID event older than {@code time} becomes the current candidate;</li>
 * <li>the first event with timestamp {@code >= time} ends the scan of this
 * dump; a candidate found so far becomes the result;</li>
 * <li>a rotate event ends the scan when no candidate exists yet, otherwise
 * reading continues past the rotation.</li>
 * </ul>
 * The whole search is wrapped in a Cat transaction, which is completed on
 * every exit path, including unexpected runtime failures.
 *
 * @param time the timestamp to search for, in the unit of the binlog event
 *             header timestamps
 * @return the matching binlog info, or {@code null} if none was found
 * @throws IOException if connecting, dumping or reading the binlog fails
 */
protected BinlogInfo getBinlogByTimestamp(long time) throws IOException {
    BinlogInfo binlogResult = null;
    Transaction t = Cat.newTransaction("BinlogFindByTime", taskName);

    try {
        Cat.logEvent("BinlogFindByTime.Time", String.valueOf(time));

        if (!connect()) {
            throw new IOException("Connection failed.");
        }
        initConnect();

        List<BinlogInfo> binaryLogs = getBinaryLogs();
        Cat.logEvent("BinlogFindByTime.BinaryLogs", currentSrcDbEntity.toString(), Message.SUCCESS,
                Joiner.on(",").join(binaryLogs));

        // Survives across files: a candidate found in an older file stays valid
        // while reading on through later files.
        BinlogInfo closestBinlogInfo = null;

        for (int k = binaryLogs.size() - 1; k >= 0; k--) {
            if (binlogResult != null) {
                break;
            }

            BinlogInfo newBinlogInfo = binaryLogs.get(k);
            Cat.logEvent("BinlogFindByTime.Start", newBinlogInfo.toString());

            getContext().setDBServerId(currentSrcDbEntity.getServerId());
            getContext().setBinlogFileName(newBinlogInfo.getBinlogFile());
            getContext().setBinlogStartPos(4);
            getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

            if (!connect()) {
                throw new IOException("Connection failed.");
            }
            initConnect();

            if (dumpBinlog()) {
                while (!isStop()) {
                    BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is,
                            PacketType.BINLOG_PACKET, getContext());

                    if (!binlogPacket.isOk()) {
                        LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                        throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
                    } else {
                        BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

                        try {
                            getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());

                            if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                if (closestBinlogInfo == null) {
                                    break;
                                } else {
                                    continue;
                                }
                            }

                            if (binlogEvent.getHeader().getTimestamp() >= time) {
                                if (closestBinlogInfo != null) {
                                    binlogResult = closestBinlogInfo;
                                }
                                break;
                            }

                            if (binlogEvent.getHeader().getEventType() == BinlogConstants.XID_EVENT
                                    && binlogEvent.getHeader().getTimestamp() < time) {
                                closestBinlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(),
                                        getContext().getBinlogFileName(),
                                        binlogEvent.getHeader().getNextPosition(), 0,
                                        binlogEvent.getHeader().getTimestamp());
                            }
                        } finally {
                            // Runs on break/continue as well, so the context always tracks
                            // the file and position of the event just read.
                            if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                RotateEvent rotateEvent = (RotateEvent) binlogEvent;
                                getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
                                getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
                            } else {
                                getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
                            }
                        }
                    }
                }
            } else {
                throw new IOException("Binlog dump failed.");
            }
        }

        Cat.logEvent("BinlogFindByTime.Success", taskName, Message.SUCCESS,
                time + " -> " + (binlogResult == null ? "null" : binlogResult.toString()));
        t.setStatus(Message.SUCCESS);
        return binlogResult;
    } catch (IOException e) {
        t.setStatus(e);
        throw e;
    } catch (RuntimeException e) {
        // Previously an unchecked failure left the transaction open forever.
        t.setStatus(e);
        throw e;
    } finally {
        t.complete();
    }
}
/**
 * Opens a fresh socket connection to the current source database and reads
 * the server's initial connect packet.
 *
 * <p>Any previously open transport is closed first. If anything fails after
 * the new socket was created, the partially initialised transport is released
 * again so that no socket or stream is left dangling in the fields.
 *
 * @return {@code true} if the connection was established and the connect
 *         packet was parsed; {@code false} on any failure (the cause is logged)
 */
private boolean connect() {
    try {
        closeTransport();

        this.mysqlSocket = new Socket();
        this.mysqlSocket.setTcpNoDelay(false);
        this.mysqlSocket.setKeepAlive(true);
        this.mysqlSocket.connect(new InetSocketAddress(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()));

        this.is = new BufferedInputStream(mysqlSocket.getInputStream());
        this.os = new BufferedOutputStream(mysqlSocket.getOutputStream());

        PacketFactory.parsePacket(is, PacketType.CONNECT_PACKET, getContext());

        LOG.info("TaskName: " + getTaskName() + ", Connection db success.");
        return true;
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is not lost.
        LOG.error("TaskName: " + getTaskName() + ", Connect failed. Reason: " + e.getMessage(), e);
        // Do not keep a half-open socket/streams around after a failed attempt.
        closeTransport();
        return false;
    }
}
protected void initConnect() throws IOException { if (!auth()) { throw new IOException("Login failed."); } if (getContext().isCheckSum()) { if (!updateSetting()) { throw new IOException("Update setting command failed."); } } if (!queryBinlogFormat()) { throw new IOException("Query config binlogformat failed."); } if (!queryBinlogImage()) { throw new IOException("Query config binlog row image failed."); } if (queryServerId() != currentSrcDbEntity.getServerId()) { throw new IOException("Server Id Changed."); } } private boolean auth() { try { LOG.info("server logining taskName: " + getTaskName() + " host: " + currentSrcDbEntity.getHost() + " port: " + currentSrcDbEntity.getPort() + " username: " + currentSrcDbEntity.getUsername() + " dbServerId: " + currentSrcDbEntity.getServerId()); AuthenticatePacket authPacket = (AuthenticatePacket) PacketFactory.createCommandPacket( PacketType.AUTHENTICATE_PACKET, getContext()); authPacket.setPassword(currentSrcDbEntity.getPassword()); authPacket.setUser(currentSrcDbEntity.getUsername()); authPacket.buildPacket(getContext()); authPacket.write(os, getContext()); OKErrorPacket okErrorPacket = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET, getContext()); boolean isAuth; if (okErrorPacket.isOk()) { LOG.info("TaskName: " + getTaskName() + ", Server login success."); isAuth = true; } else { isAuth = false; LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + okErrorPacket.getMessage()); } return isAuth; } catch (Exception e) { LOG.error("TaskName: " + getTaskName() + ", Login failed. 
Reason: " + e.getMessage()); return false; } } private boolean queryBinlogFormat() throws IOException { try { QueryExecutor executor = new QueryExecutor(is, os); String cmd = "show global variables like 'binlog_format'"; ResultSet rs = executor.query(cmd, getContext()); List<String> columnValues = rs.getFiledValues(); boolean isQuery = true; if (columnValues == null || columnValues.size() != 2 || columnValues.get(1) == null) { LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason:unexcepted binlog format query result."); isQuery = false; } BinlogFormat binlogFormat = BinlogFormat.valuesOf(columnValues.get(1)); String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()); if (binlogFormat == null || !binlogFormat.isRow()) { isQuery = false; LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog format: " + binlogFormat.value); } Cat.logEvent("Slave.dbBinlogFormat", eventName, isQuery ? Message.SUCCESS : "1", ""); if (isQuery) { LOG.info("TaskName: " + getTaskName() + ", Query config binlogformat is legal."); } return isQuery; } catch (Exception e) { LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage()); return false; } } private boolean queryBinlogImage() throws IOException { try { QueryExecutor executor = new QueryExecutor(is, os); String cmd = "show variables like 'binlog_row_image'"; ResultSet rs = executor.query(cmd, getContext()); List<String> columnValues = rs.getFiledValues(); boolean isQuery = true; if (columnValues == null || columnValues.size() == 0) {// 5.1 isQuery = true; } else if (columnValues != null && columnValues.size() == 2 && columnValues.get(1) != null) {// 5.6 BinlogRowImage binlogRowImage = BinlogRowImage.valuesOf(columnValues.get(1)); isQuery = true; if (binlogRowImage == null || !binlogRowImage.isFull()) { isQuery = false; LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog row image: " + 
binlogRowImage.value); } } else { LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason:unexcepted binlog row image query result."); isQuery = false; } String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()); Cat.logEvent("Slave.dbBinlogRowImage", eventName, isQuery ? Message.SUCCESS : "1", ""); if (isQuery) { LOG.info("TaskName: " + getTaskName() + ", Query config binlog row image is legal."); } return isQuery; } catch (Exception e) { LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage()); return false; } }
- queryBinlogFormat主要執行show global variables like 'binlog_format'命令;queryBinlogImage主要執行show variables like 'binlog_row_image'命令
/**
 * Establishes the position the binlog dump will start from and publishes it
 * to the context, the executor's binlog info and the system status.
 *
 * <p>When {@code binlogInfo} is {@code null}, a start position is derived from
 * the server's binary log list: position 4 of the last listed file when no
 * begin time is configured, otherwise position 4 of the first listed file.
 *
 * @param binlogInfo the position to start from, or {@code null} to derive one
 * @throws IOException if the binary log list cannot be obtained, or is empty
 *                     when a start position has to be derived from it
 */
protected void initBinlogPosition(BinlogInfo binlogInfo) throws IOException {
    if (binlogInfo == null) {
        List<BinlogInfo> binaryLogs = getBinaryLogs();
        if (binaryLogs == null || binaryLogs.isEmpty()) {
            // Fail with a clear message instead of an IndexOutOfBoundsException below.
            throw new IOException("TaskName: " + getTaskName() + ", No binary logs available on the server.");
        }

        BinlogInfo begin = beginTime == null ? binaryLogs.get(binaryLogs.size() - 1) : binaryLogs.get(0);
        binlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(), begin.getBinlogFile(), 4L, 0,
                begin.getTimestamp());
    }

    getContext().setDBServerId(currentSrcDbEntity.getServerId());
    getContext().setBinlogFileName(binlogInfo.getBinlogFile());
    getContext().setBinlogStartPos(binlogInfo.getBinlogPosition());
    setBinlogInfo(binlogInfo);

    SystemStatusManager.addServer(getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort(),
            tableSet);
    SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);
}
/**
 * Sends the binlog dump command for the file name and start position held in
 * the context, using this executor's server id, and reads the server's
 * OK/error reply.
 *
 * @return {@code true} if the server accepted the command; {@code false} if it
 *         answered with an error or any exception occurred (both are logged)
 */
private boolean dumpBinlog() {
    try {
        ComBinlogDumpPacket request = (ComBinlogDumpPacket) PacketFactory.createCommandPacket(
                PacketType.COM_BINLOG_DUMP_PACKET, getContext());

        // Describe what to dump: file, flag, offset and the requesting server id.
        request.setBinlogFileName(getContext().getBinlogFileName());
        request.setBinlogFlag(0);
        request.setBinlogPosition(getContext().getBinlogStartPos());
        request.setServerId(getServerId());

        request.buildPacket(getContext());
        request.write(os, getContext());

        OKErrorPacket reply = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET,
                getContext());

        if (!reply.isOk()) {
            LOG.error("TaskName: " + getTaskName() + ", Dump binlog failed. Reason: " + reply.getMessage());
            return false;
        }

        LOG.info("TaskName: " + getTaskName() + ", Dump binlog command success.");
        return true;
    } catch (Exception e) {
        LOG.error("TaskName: " + getTaskName() + " Dump binlog failed. Reason: " + e.getMessage());
        return false;
    }
}
/**
 * Reads binlog packets from the input stream and hands each one to
 * {@link #processBinlogPacket(BinlogPacket)} until the executor is stopped.
 *
 * @throws IOException if a packet cannot be read or is not OK
 */
private void processBinlog() throws IOException {
    while (!isStop()) {
        BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is, PacketType.BINLOG_PACKET,
                getContext());

        if (!binlogPacket.isOk()) {
            LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
            throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
        } else {
            processBinlogPacket(binlogPacket);
        }
    }
}

/**
 * Parses one binlog packet into an event and routes it: rotate events go to
 * {@link #processRotateEvent(BinlogEvent)}, everything else to
 * {@link #processDataEvent(BinlogEvent)}.
 *
 * @param binlogPacket the packet to parse and process
 * @throws IOException if parsing fails
 */
protected void processBinlogPacket(BinlogPacket binlogPacket) throws IOException {
    BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

    // While merging, the executor stops itself once an event reaches runUntilTimestamp.
    // NOTE(review): processing of this event continues below after stop() -- confirm intended.
    if (merging) {
        if (binlogEvent.getHeader().getTimestamp() >= runUntilTimestamp) {
            stop();
        }
    }

    SystemStatusManager.incServerParsedCounter(getTaskName());

    // These event types are reported as a MIXED/STATEMENT binlog format, which is not supported.
    if (binlogEvent.getHeader().getEventType() == BinlogConstants.INTVAR_EVENT
            || binlogEvent.getHeader().getEventType() == BinlogConstants.RAND_EVENT
            || binlogEvent.getHeader().getEventType() == BinlogConstants.USER_VAR_EVENT) {
        LOG.error("TaskName: " + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support.");

        String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(),
                currentSrcDbEntity.getPort());
        Cat.logEvent("Slave.dbBinlogFormat", eventName, "1", "");
        Cat.logError("Puma.server.mixedorstatement.format", new IllegalArgumentException("TaskName: "
                + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support."));

        // NOTE(review): stopTask() is defined outside this view; the event is still
        // processed below afterwards -- confirm intended.
        stopTask();
    }

    // Every event except the format description event advances the next binlog position.
    if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
        getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());
    }

    if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
        processRotateEvent(binlogEvent);
    } else {
        processDataEvent(binlogEvent);
    }
}

/**
 * Applies a rotate event: points the context at the next binlog file and its
 * first event position.
 *
 * @param binlogEvent the event; cast to {@code RotateEvent}
 */
protected void processRotateEvent(BinlogEvent binlogEvent) {
    RotateEvent rotateEvent = (RotateEvent) binlogEvent;
    getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
    getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
}

/**
 * Runs a non-rotate event through the data handler, dispatching each
 * resulting changed event, then updates the tracked binlog position and --
 * for DDL events and transaction commits -- persists it.
 *
 * @param binlogEvent the event to process
 */
protected void processDataEvent(BinlogEvent binlogEvent) {
    DataHandlerResult dataHandlerResult = null;
    // Keep processing the rows of a single binlogEvent and dispatch each row as soon as
    // it is processed, so that a binlogEvent containing many ChangedEvents does not
    // consume too much memory.
    int eventIndex = 0;
    do {
        dataHandlerResult = dataHandler.process(binlogEvent, getContext());
        if (dataHandlerResult != null && !dataHandlerResult.isEmpty()) {
            ChangedEvent changedEvent = dataHandlerResult.getData();
            // Number the changed events produced from this one binlog event.
            changedEvent.getBinlogInfo().setEventIndex(eventIndex++);
            updateOpsCounter(changedEvent);
            dispatch(changedEvent);
        }
    } while (dataHandlerResult != null && !dataHandlerResult.isFinished());

    // Advance the in-memory start position past this event (not for format description events).
    if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
        getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
        setBinlogInfo(new BinlogInfo(getBinlogInfo().getServerId(), getBinlogInfo().getBinlogFile(), binlogEvent
                .getHeader().getNextPosition(), 0, 0));
    }

    BinlogInfo binlogInfo = new BinlogInfo(getContext().getDBServerId(), getContext()
            .getBinlogFileName(), binlogEvent.getHeader().getNextPosition(), 0, binlogEvent.getHeader().getTimestamp());

    SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);

    // Persist the position only when it is usable (non-zero position, non-blank file name)
    // and the last handler result is a DDL event or a row change that commits a transaction.
    if (binlogEvent.getHeader().getNextPosition() != 0
            && StringUtils.isNotBlank(getContext().getBinlogFileName())
            && dataHandlerResult != null
            && !dataHandlerResult.isEmpty()
            && (dataHandlerResult.getData() instanceof DdlEvent || (dataHandlerResult.getData() instanceof RowChangedEvent && ((RowChangedEvent) dataHandlerResult
                    .getData()).isTransactionCommit()))) {
        instanceStorageManager.setBinlogInfo(getTaskName(), binlogInfo);
    }
}
- processBinlog方法循環接收binlogPacket,然後執行processBinlogPacket;該方法通過parser.parse獲取binlogEvent,對於非FORMAT_DESCRIPTION_EVENT的事件,則更新binlogEvent.getHeader().getNextPosition()到context中;對於ROTATE_EVENT則執行processRotateEvent,否則執行processDataEvent;processRotateEvent主要是更新binlogFileName及binlogStartPos;processDataEvent則主要是通過dataHandler.process(binlogEvent, getContext())處理,然後執行dispatch(changedEvent)
private void closeTransport() { // Close in. try { if (this.is != null) { this.is.close(); } } catch (IOException ioEx) { LOG.warn("Server " + this.getTaskName() + ", Failed to close the input stream."); } finally { this.is = null; } // Close os. try { if (this.os != null) { this.os.close(); } } catch (IOException ioEx) { LOG.warn("Server " + this.getTaskName() + ", Failed to close the output stream"); } finally { this.os = null; } // Close socket. try { if (this.mysqlSocket != null) { this.mysqlSocket.close(); } } catch (IOException ioEx) { LOG.warn("Server " + this.getTaskName() + ", Failed to close the socket", ioEx); } finally { this.mysqlSocket = null; } }