SpringBoot接入最新版RocketMq-Spring2.2.0,消費者指定返回消息處理狀態

SpringBoot接入最新版RocketMq-Spring2.2.0,消費者指定返回消息處理狀態java

由於用的是RocketMq4.8.0,所以接入最新的rocketmq-springgit

首先引入依賴github

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

RocketMq-Spring提供了默認的rocket生產者,RocketMqTemplatespring

直接注入就能夠直接使用,默認的是讀取application.yml/properties文件裏的rocketMq默認配置路徑apache

@Autowired
private RocketMqTemplate rocketMqTemplate

默認配置屬性以下app

rocketmq:
  name-server: localhost:9876
  producer:
    group: audit-group

也能夠自定義RocketMqTemplate,這樣就能夠同時擁有多個不一樣配置的生產者,自定義生產者很是簡單,只須要直接繼承RocketMqTemlate就能夠了,而後從註解中配置屬性:異步

@ExtRocketMQTemplateConfiguration(group = "audit-test",nameServer = "localhost:9876")
public class MyRocketMqTemplate extends RocketMQTemplate {
}

上面那個註解中能夠配置不少屬性,能夠直接賦值讀取,也能夠用表達式好比${myrocket.nameserver}從yml/properties配置文件中讀取,要用的時候直接注入這個類的對象就可使用了async

@Service
public class Producer {

    String topic="Topic-test";
    //若是須要標籤過濾的話,topic能夠是Topic:Tag的格式
    String topicWithTag="Topic-test:TagA";
    String msg="hi there!";

    @Autowired
    private MyRocketMqTemplate rocketMQTemplate;

    //同步發送消息
    public void sendSyncMsg(){
        rocketMQTemplate.syncSend(topic,msg);
    }
    
    //異步發送消息
    public void sendAsyncMsg(){
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if(sendResult.getSendStatus()== SendStatus.SEND_OK){
                    System.out.println("send successful");
                }
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("send failed");
            }
        });
    }
    
    //發送順序消息,最好對應的topic的寫queue和讀queue都設置爲1
    public void sendMsgOrderly(){
        String hashKey="orderId:xxxx";
        //hashKey的做用是指定queue,萬一topic存在多個queue,能夠指定順序消息生產在這個特色的queue上,好比用orderId指定
        rocketMQTemplate.syncSendOrderly(topic,msg,hashKey);
    }
}

再看看消費者的示例:ide

主要是@RocketMessageListener註解,在其中配置消費者的屬性,consumeMode能夠配置是順序消費仍是普通消費spring-boot

@Service
@RocketMQMessageListener(topic = "audit-test",consumerGroup = "broker-a",consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(messageExt.toString());
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }
}

細心的讀者應該能看到,這裏的onMessage方法是void類型的,沒有返回狀態,與咱們平時用的不同,那若是消費失敗,怎麼返回RECONSUME_LATER的狀態呢,github上官方是回覆throw exception的時候會自動處理消息返回RECONSUME_LATER,可是目前在onMessage方法中是無法主動拋出異常的,後續我看官方怎麼回覆再更新

相關文章
相關標籤/搜索