聊聊rocketmq-mysql的BinlogPositionManager

本文主要研究一下rocketmq-mysql的BinlogPositionManagerjava

BinlogPositionManager

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javamysql

public class BinlogPositionManager {
    private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);

    private DataSource dataSource;
    private Config config;

    private String binlogFilename;
    private Long nextPosition;

    public BinlogPositionManager(Config config, DataSource dataSource) {
        this.config = config;
        this.dataSource = dataSource;
    }

    public void initBeginPosition() throws Exception {

        if (config.startType == null || config.startType.equals("DEFAULT")) {
            initPositionDefault();

        } else if (config.startType.equals("NEW_EVENT")) {
            initPositionFromBinlogTail();

        } else if (config.startType.equals("LAST_PROCESSED")) {
            initPositionFromMqTail();

        } else if (config.startType.equals("SPECIFIED")) {
            binlogFilename = config.binlogFilename;
            nextPosition = config.nextPosition;

        }

        if (binlogFilename == null || nextPosition == null) {
            throw new Exception("binlogFilename | nextPosition is null.");
        }
    }

    //......

    public String getBinlogFilename() {
        return binlogFilename;
    }

    public Long getPosition() {
        return nextPosition;
    }
}
  • BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根據config.startType的類型來執行不一樣邏輯,DEFAULT的是執行initPositionDefault,NEW_EVENT是執行initPositionFromBinlogTail,LAST_PROCESSED是執行initPositionFromMqTail,SPECIFIED是設置指定的binlogFilename及nextPosition

initPositionDefault

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javagit

public class BinlogPositionManager {

    //......

    private void initPositionDefault() throws Exception {

        try {
            initPositionFromMqTail();
        } catch (Exception e) {
            logger.error("Init position from mq error.", e);
        }

        if (binlogFilename == null || nextPosition == null) {
            initPositionFromBinlogTail();
        }

    }

    //......

}
  • initPositionDefault執行的是initPositionFromMqTail方法,在binlogFilename或者nextPosition爲null時還會執行initPositionFromBinlogTail

initPositionFromBinlogTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javagithub

public class BinlogPositionManager {

    //......

    private void initPositionFromBinlogTail() throws SQLException {
        String sql = "SHOW MASTER STATUS";

        Connection conn = null;
        ResultSet rs = null;

        try {
            Connection connection = dataSource.getConnection();
            rs = connection.createStatement().executeQuery(sql);

            while (rs.next()) {
                binlogFilename = rs.getString("File");
                nextPosition = rs.getLong("Position");
            }

        } finally {

            if (conn != null) {
                conn.close();
            }
            if (rs != null) {
                rs.close();
            }
        }

    }

    //......

}
  • initPositionFromBinlogTail方法經過SHOW MASTER STATUS來獲取最新的binlogFilename及nextPosition

initPositionFromMqTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.javasql

public class BinlogPositionManager {

    //......

    private void initPositionFromMqTail() throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
        consumer.setNamesrvAddr(config.mqNamesrvAddr);
        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
        consumer.start();

        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
        MessageQueue queue = queues.iterator().next();

        if (queue != null) {
            Long offset = consumer.maxOffset(queue);
            if (offset > 0)
                offset--;

            PullResult pullResult = consumer.pull(queue, "*", offset, 100);

            if (pullResult.getPullStatus() == PullStatus.FOUND) {
                MessageExt msg = pullResult.getMsgFoundList().get(0);
                String json = new String(msg.getBody(), "UTF-8");

                JSONObject js = JSON.parseObject(json);
                binlogFilename = (String) js.get("binlogFilename");
                nextPosition = js.getLong("nextPosition");
            }
        }

    }

    //......

}
  • initPositionFromMqTail方法建立DefaultMQPullConsumer,而後從指定的topic拉取MessageQueue,而後經過consumer.maxOffset(queue)獲取offset,再執行pull,找到第一個msg,而後設置binlogFilename及nextPosition

小結

BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根據config.startType的類型來執行不一樣邏輯,DEFAULT的是執行initPositionDefault,NEW_EVENT是執行initPositionFromBinlogTail,LAST_PROCESSED是執行initPositionFromMqTail,SPECIFIED是設置指定的binlogFilename及nextPositionapache

doc

相關文章
相關標籤/搜索