本文主要研究一下rocketmq的LitePullConsumerjava
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.javagit
public interface LitePullConsumer { void start() throws MQClientException; void shutdown(); void subscribe(final String topic, final String subExpression) throws MQClientException; void subscribe(final String topic, final MessageSelector selector) throws MQClientException; void unsubscribe(final String topic); void assign(Collection<MessageQueue> messageQueues); List<MessageExt> poll(); List<MessageExt> poll(long timeout); void seek(MessageQueue messageQueue, long offset) throws MQClientException; void pause(Collection<MessageQueue> messageQueues); void resume(Collection<MessageQueue> messageQueues); boolean isAutoCommit(); void setAutoCommit(boolean autoCommit); Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException; Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException; void commitSync(); Long committed(MessageQueue messageQueue) throws MQClientException; void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; }
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.javagithub
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; private String consumerGroup; private long brokerSuspendMaxTimeMillis = 1000 * 20; private long consumerTimeoutMillisWhenSuspend = 1000 * 30; private long consumerPullTimeoutMillis = 1000 * 10; private MessageModel messageModel = MessageModel.CLUSTERING; private MessageQueueListener messageQueueListener; private OffsetStore offsetStore; private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); private boolean unitMode = false; private boolean autoCommit = true; private int pullThreadNums = 20; private long autoCommitIntervalMillis = 5 * 1000; private int pullBatchSize = 10; private long pullThresholdForAll = 10000; private int consumeMaxSpan = 2000; private int pullThresholdForQueue = 1000; private int pullThresholdSizeForQueue = 100; private long pollTimeoutMillis = 1000 * 5; private long topicMetadataCheckIntervalMillis = 30 * 1000; public DefaultLitePullConsumer() { this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); } public DefaultLitePullConsumer(final String consumerGroup) { this(null, consumerGroup, null); } public DefaultLitePullConsumer(RPCHook rpcHook) { this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); } public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { this(null, consumerGroup, rpcHook); } public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } @Override public void start() throws MQClientException { this.defaultLitePullConsumerImpl.start(); } @Override public void shutdown() { this.defaultLitePullConsumerImpl.shutdown(); } @Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); } @Override public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); } @Override public void unsubscribe(String topic) { this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); } @Override public void assign(Collection<MessageQueue> messageQueues) { defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); } @Override public List<MessageExt> poll() { return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); } @Override public List<MessageExt> poll(long timeout) { return defaultLitePullConsumerImpl.poll(timeout); } @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException { this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); } @Override public void pause(Collection<MessageQueue> messageQueues) { this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); } @Override public void resume(Collection<MessageQueue> messageQueues) { this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); } @Override public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException { return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); } @Override public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp); } @Override public void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException { this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener); } @Override public void commitSync() { this.defaultLitePullConsumerImpl.commitSync(); } @Override public Long committed(MessageQueue messageQueue) throws MQClientException { return this.defaultLitePullConsumerImpl.committed(messageQueue); } @Override public boolean isAutoCommit() { return autoCommit; } @Override public void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; } //...... }
rocketmq6.0引入了LitePullConsumer,解決Add lite pull consumer support for RocketMQ #1388,提供了以下功能:apache
(1) Support consume messages in subscribe way with auto rebalance. (2) Support consume messages in assign way with no auto rebalance support. (3) Add seek/commit offset for a specified message queue.