本文主要研究一下rocketmq-mysql的BinlogPositionManagerjava
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javamysql
public class BinlogPositionManager { private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class); private DataSource dataSource; private Config config; private String binlogFilename; private Long nextPosition; public BinlogPositionManager(Config config, DataSource dataSource) { this.config = config; this.dataSource = dataSource; } public void initBeginPosition() throws Exception { if (config.startType == null || config.startType.equals("DEFAULT")) { initPositionDefault(); } else if (config.startType.equals("NEW_EVENT")) { initPositionFromBinlogTail(); } else if (config.startType.equals("LAST_PROCESSED")) { initPositionFromMqTail(); } else if (config.startType.equals("SPECIFIED")) { binlogFilename = config.binlogFilename; nextPosition = config.nextPosition; } if (binlogFilename == null || nextPosition == null) { throw new Exception("binlogFilename | nextPosition is null."); } } //...... public String getBinlogFilename() { return binlogFilename; } public Long getPosition() { return nextPosition; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javagit
public class BinlogPositionManager { //...... private void initPositionDefault() throws Exception { try { initPositionFromMqTail(); } catch (Exception e) { logger.error("Init position from mq error.", e); } if (binlogFilename == null || nextPosition == null) { initPositionFromBinlogTail(); } } //...... }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javagithub
public class BinlogPositionManager { //...... private void initPositionFromBinlogTail() throws SQLException { String sql = "SHOW MASTER STATUS"; Connection conn = null; ResultSet rs = null; try { Connection connection = dataSource.getConnection(); rs = connection.createStatement().executeQuery(sql); while (rs.next()) { binlogFilename = rs.getString("File"); nextPosition = rs.getLong("Position"); } } finally { if (conn != null) { conn.close(); } if (rs != null) { rs.close(); } } } //...... }
SHOW MASTER STATUS
來獲取最新的binlogFilename及nextPositionrocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javasql
public class BinlogPositionManager { //...... private void initPositionFromMqTail() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP"); consumer.setNamesrvAddr(config.mqNamesrvAddr); consumer.setMessageModel(MessageModel.valueOf("BROADCASTING")); consumer.start(); Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic); MessageQueue queue = queues.iterator().next(); if (queue != null) { Long offset = consumer.maxOffset(queue); if (offset > 0) offset--; PullResult pullResult = consumer.pull(queue, "*", offset, 100); if (pullResult.getPullStatus() == PullStatus.FOUND) { MessageExt msg = pullResult.getMsgFoundList().get(0); String json = new String(msg.getBody(), "UTF-8"); JSONObject js = JSON.parseObject(json); binlogFilename = (String) js.get("binlogFilename"); nextPosition = js.getLong("nextPosition"); } } } //...... }
BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根據config.startType的類型來執行不一樣邏輯,DEFAULT的是執行initPositionDefault,NEW_EVENT是執行initPositionFromBinlogTail,LAST_PROCESSED是執行initPositionFromMqTail,SPECIFIED是設置指定的binlogFilename及nextPositionapache