SpringBoot接入最新版RocketMq-Spring2.2.0,消費者指定返回消息處理狀態java
由於用的是RocketMq4.8.0,所以接入最新的rocketmq-springgit
首先引入依賴github
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
RocketMq-Spring提供了默認的rocket生產者,RocketMqTemplatespring
直接注入就能夠直接使用,默認的是讀取application.yml/properties文件裏的rocketMq默認配置路徑apache
@Autowired private RocketMqTemplate rocketMqTemplate
默認配置屬性以下app
rocketmq: name-server: localhost:9876 producer: group: audit-group
也能夠自定義RocketMqTemplate,這樣就能夠同時擁有多個不一樣配置的生產者,自定義生產者很是簡單,只須要直接繼承RocketMqTemlate就能夠了,而後從註解中配置屬性:異步
@ExtRocketMQTemplateConfiguration(group = "audit-test",nameServer = "localhost:9876") public class MyRocketMqTemplate extends RocketMQTemplate { }
上面那個註解中能夠配置不少屬性,能夠直接賦值讀取,也能夠用表達式好比${myrocket.nameserver}從yml/properties配置文件中讀取,要用的時候直接注入這個類的對象就可使用了async
@Service public class Producer { String topic="Topic-test"; //若是須要標籤過濾的話,topic能夠是Topic:Tag的格式 String topicWithTag="Topic-test:TagA"; String msg="hi there!"; @Autowired private MyRocketMqTemplate rocketMQTemplate; //同步發送消息 public void sendSyncMsg(){ rocketMQTemplate.syncSend(topic,msg); } //異步發送消息 public void sendAsyncMsg(){ rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if(sendResult.getSendStatus()== SendStatus.SEND_OK){ System.out.println("send successful"); } } @Override public void onException(Throwable throwable) { System.out.println("send failed"); } }); } //發送順序消息,最好對應的topic的寫queue和讀queue都設置爲1 public void sendMsgOrderly(){ String hashKey="orderId:xxxx"; //hashKey的做用是指定queue,萬一topic存在多個queue,能夠指定順序消息生產在這個特色的queue上,好比用orderId指定 rocketMQTemplate.syncSendOrderly(topic,msg,hashKey); } }
再看看消費者的示例:ide
主要是@RocketMessageListener註解,在其中配置消費者的屬性,consumeMode能夠配置是順序消費仍是普通消費spring-boot
@Service @RocketMQMessageListener(topic = "audit-test",consumerGroup = "broker-a",consumeMode = ConsumeMode.ORDERLY) public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { System.out.println(messageExt.toString()); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
細心的讀者應該能看到,這裏的onMessage方法是void類型的,沒有返回狀態,與咱們平時用的不同,那若是消費失敗,怎麼返回RECONSUME_LATER的狀態呢,github上官方是回覆throw exception的時候會自動處理消息返回RECONSUME_LATER,可是目前在onMessage方法中是無法主動拋出異常的,後續我看官方怎麼回覆再更新