SpringBoot+Rocketmq

@PostConstruct:用於標註在依賴注入完成之後需要執行的方法上,以執行任何初始化。此方法必須在該類投入使用之前被調用。
@PreDestroy:在開發中,如果我們要在關閉 Spring 容器時釋放一些資源,就在這個類中寫一個標註了 @PreDestroy 的方法。今天就因為這個浪費了好長時間:MQ 的生產者啓動之後沒有被銷燬,導致我用 IDEA 結束程序之後端口號依然被佔用,每次再啓動都要殺進程。。。

先把最簡單的代碼貼出來,只有最基本的發送接收功能

pom.xml
<!-- Dependencies for the minimal Spring Boot + RocketMQ send/receive demo. -->
<!-- NOTE(review): the Spring Boot starters carry no <version>; presumably a parent POM or BOM not shown here manages it. Confirm. -->
<dependencies>
        <!-- Spring MVC web stack; the demo's TestController is a @RestController. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test-scoped Spring Boot testing support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- RocketMQ common module; the demo imports classes from org.apache.rocketmq.common.* (Message, MessageExt, ConsumeFromWhere). -->
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.3.0</version>
        </dependency>


        <!-- RocketMQ client; the demo uses DefaultMQProducer and DefaultMQPushConsumer from org.apache.rocketmq.client.*. Kept at the same version as rocketmq-common. -->
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

        <!-- JSON library. NOTE(review): none of the demo classes shown in this article import fastjson; confirm whether it is needed. -->
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

application.properties

apache.rocketmq.consumer.PushConsumer=PushConsumer
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=localhost:9876

TestController.java

package com.rmqspringtest.demo;

import com.rmqspringtest.demo.producer.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point of the demo: hands an incoming message text to the
 * RocketMQ producer service.
 */
@RestController
public class TestController {

    /** Producer service injected by Spring; performs the actual send. */
    @Autowired
    private ProducerService producer;

    /**
     * Publishes {@code msg} to topic {@code test1} with tag {@code push}.
     *
     * @param msg message text taken from the request (e.g. {@code /push?msg=hello})
     * @return the string produced by {@code ProducerService.send}
     */
    @RequestMapping("/push")
    public String pushMsg(String msg) {
        final String topic = "test1";
        final String tag = "push";
        return producer.send(topic, tag, msg);
    }
}

ConsumerService.java

package com.rmqspringtest.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class ConsumerService { @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr); try { consumer.subscribe("test1", "push"); // 若是是第一次啓動,從隊列頭部開始消費 // 若是不是第一次啓動,從上次消費的位置繼續消費
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("[Consumer] msgID(" + messageExt.getMsgId() + ") msgBody : " + messageBody); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("[Consumer 已啓動]"); } catch (Exception e) { e.printStackTrace(); } } }

ProducerService.java

package com.rmqspringtest.demo.producer; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service public class ProducerService { @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; private DefaultMQProducer producer; @PostConstruct public void initProducer() { producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); producer.setRetryTimesWhenSendFailed(3); try { producer.start(); System.out.println("[Producer 已啓動]"); } catch (Exception e) { e.printStackTrace(); } } public String send(String topic, String tags, String msg) { SendResult result = null; try { Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); result = producer.send(message); System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "{\"MsgId\":\"" + result.getMsgId() + "\"}"; } @PreDestroy public void shutDownProducer() { if (producer != null) { producer.shutdown(); } } }

cmd中執行命令開啓服務:

start mqnamesrv

start mqbroker -n 127.0.0.1:9876

發送請求:127.0.0.1:8080/push?msg=hello

ok

 

 

 

 

RocketMQTemplate 這玩意 看一下
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息