本文主要研究一下rocketmq-mysql的Replicatorjava
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.javamysql
public class Replicator { private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); private Config config; private EventProcessor eventProcessor; private RocketMQProducer rocketMQProducer; private Object lock = new Object(); private BinlogPosition nextBinlogPosition; private long nextQueueOffset; private long xid; public static void main(String[] args) { Replicator replicator = new Replicator(); replicator.start(); } public void start() { try { config = new Config(); config.load(); rocketMQProducer = new RocketMQProducer(config); rocketMQProducer.start(); BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this); binlogPositionLogThread.start(); eventProcessor = new EventProcessor(this); eventProcessor.start(); } catch (Exception e) { LOGGER.error("Start error.", e); System.exit(1); } } public void commit(Transaction transaction, boolean isComplete) { String json = transaction.toJson(); for (int i = 0; i < 3; i++) { try { if (isComplete) { long offset = rocketMQProducer.push(json); synchronized (lock) { xid = transaction.getXid(); nextBinlogPosition = transaction.getNextBinlogPosition(); nextQueueOffset = offset; } } else { rocketMQProducer.push(json); } break; } catch (Exception e) { LOGGER.error("Push error,retry:" + (i + 1) + ",", e); } } } public void logPosition() { String binlogFilename = null; long xid = 0L; long nextPosition = 0L; long nextOffset = 0L; synchronized (lock) { if (nextBinlogPosition != null) { xid = this.xid; binlogFilename = nextBinlogPosition.getBinlogFilename(); nextPosition = nextBinlogPosition.getPosition(); nextOffset = nextQueueOffset; } } if (binlogFilename != null) { POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}", xid, binlogFilename, nextPosition, nextOffset); } } public Config getConfig() { return config; } public BinlogPosition getNextBinlogPosition() { return nextBinlogPosition; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.javagit
public class RocketMQProducer { private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class); private DefaultMQProducer producer; private Config config; public RocketMQProducer(Config config) { this.config = config; } public void start() throws MQClientException { producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP"); producer.setNamesrvAddr(config.mqNamesrvAddr); producer.start(); } public long push(String json) throws Exception { LOGGER.debug(json); Message message = new Message(config.mqTopic, json.getBytes("UTF-8")); SendResult sendResult = producer.send(message); return sendResult.getQueueOffset(); } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.javagithub
public class BinlogPositionLogThread extends Thread { private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class); private Replicator replicator; public BinlogPositionLogThread(Replicator replicator) { this.replicator = replicator; setDaemon(true); } @Override public void run() { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { logger.error("Offset thread interrupted.", e); } replicator.logPosition(); } } }
Replicator提供了start、commit、logPosition方法;start方法會建立RocketMQProducer、BinlogPositionLogThread及EventProcessor,而後執行其start方法;commit方法會經過rocketMQProducer將transaction.toJson()發送出去,對於isComplete爲true的會更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法會打印binlogFilename、nextPosition、nextOffsetsql