聊聊rocketmq的MessageQueueSelector

本文主要研究一下rocketmq的MessageQueueSelectorjava

MessageQueueSelector

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/MessageQueueSelector.javagit

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
複製代碼
  • MessageQueueSelector接口定義了select方法,返回MessageQueue;它有幾個實現類,分別是SelectMessageQueueByHash、SelectMessageQueueByRandom、SelectMessageQueueByMachineRoom

SelectMessageQueueByHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.javagithub

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}
複製代碼
  • SelectMessageQueueByHash實現了MessageQueueSelector接口,其select方法取arg參數的hashcode的絕對值,而後對mqs.size()取餘,獲得目標隊列在mqs的下標

SelectMessageQueueByRandom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandom.javaspring

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}
複製代碼
  • SelectMessageQueueByRandom實現了MessageQueueSelector接口,其select方法直接根據mqs.size()隨機一個值做爲目標隊列在mqs的下標

SelectMessageQueueByMachineRoom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.javaapache

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
複製代碼
  • SelectMessageQueueByMachineRoom實現了MessageQueueSelector接口,其select方法目前返回null

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.javabash

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer producer;

    private ObjectMapper objectMapper;

    private String charset = "UTF-8";

    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

    public DefaultMQProducer getProducer() {
        return producer;
    }

    public void setProducer(DefaultMQProducer producer) {
        this.producer = producer;
    }

    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public MessageQueueSelector getMessageQueueSelector() {
        return messageQueueSelector;
    }

    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
        this.messageQueueSelector = messageQueueSelector;
    }

    //......
}
複製代碼
  • RocketMQTemplate默認建立的MessageQueueSelector是SelectMessageQueueByHash

小結

MessageQueueSelector接口定義了select方法,返回MessageQueue;它有幾個實現類,分別是SelectMessageQueueByHash、SelectMessageQueueByRandom、SelectMessageQueueByMachineRoom;RocketMQTemplate默認建立的MessageQueueSelector是SelectMessageQueueByHashapp

doc

相關文章
相關標籤/搜索