SpringBoot整合RocketMQ

上篇博客講解了服務器集羣部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集羣html

這篇在上篇搭建好的基礎上,將SpringBoot整合RocketMQ實現生產消費。git

GitHub地址https://github.com/yudiandemingzi/spring-boot-studygithub

1、搭建步驟

先說下技術大體架構spring

SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)

一、添加rocketmq包

<!--注意: 這裏的版本,要和部署在服務器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

二、JmsConfig(配置類)

鏈接RocketMQ服務器配置類,這裏爲了方便直接寫成常量。apache

/** * @Description: 安裝實際開發這裏的信息 都是應該寫在配置裏,來讀取,這裏爲了方便因此寫成常量 */
public class JmsConfig { /** * Name Server 地址,由於是集羣部署 因此有多個用 分號 隔開 */
    public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877"; /** * 主題名稱 主題通常是服務器設置好 而不能在代碼裏去新建topic( 若是沒有建立好,生產者往該主題發送消息 會報找不到topic錯誤) */
    public static final String TOPIC = "topic_family"; }

三、Producer (生產者)

@Slf4j @Component public class Producer { private String producerGroup = "test_producer"; private DefaultMQProducer producer; public Producer(){ //示例生產者
        producer = new DefaultMQProducer(producerGroup); //不開啓vip通道 開通口端口會減2
        producer.setVipChannelEnabled(false); //綁定name server
 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } /** * 對象在使用以前必需要調用一次,只能初始化一次 */
    public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer(){ return this.producer; } /** * 通常在應用上下文,使用上下文監聽器,進行關閉 */
    public void shutdown(){ this.producer.shutdown(); } }

四、Consumer (消費者)

@Slf4j @Component public class Consumer { /** * 消費者實體對象 */
    private DefaultMQPushConsumer consumer; /** * 消費者組 */
    public static final String CONSUMER_GROUP = "test_consumer"; /** * 經過構造函數 實例化對象 */
    public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消費模式:一個新的訂閱組第一次啓動從隊列的最後位置開始消費 後續再啓動接着上次消費的進度開始消費
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題和 標籤( * 表明全部標籤)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*"); // //註冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一個topic,同一個tag,而且key相同的message // 會把不一樣的消息分別放置到不一樣的隊列中
            try { for (Message msg : msgs) { //消費者獲取消息 這裏只輸出 不作後面邏輯處理
                    String body = new String(msg.getBody(), "utf-8"); log.info("Consumer-獲取消息-主題topic爲={}, 消費消息爲={}", msg.getTopic(), body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消費者 啓動成功======="); } }

大體就是這邊簡單,下面就是測試。服務器

 

2、測試

先寫個測試接口進行測試。架構

一、Controller

@Slf4j @RestController public class Controller { @Autowired private Producer producer; private List<String> mesList; /** * 初始化消息 */
    public Controller() { mesList = new ArrayList<>(); mesList.add("小小"); mesList.add("爸爸"); mesList.add("媽媽"); mesList.add("爺爺"); mesList.add("奶奶"); } @RequestMapping("/text/rocketmq") public Object callback() throws Exception { //總共發送五次消息
        for (String s : mesList) { //建立生產信息
            Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的稱謂:" + s).getBytes()); //發送
            SendResult sendResult = producer.getProducer().send(message); log.info("輸出生產者信息={}",sendResult); } return "成功"; } }

二、測試結果

 

 

很明顯生產發送消息已經成功,二消費者也成功接收了消息!app

另外咱們再來看下RocketMQ控制檯是否也有消費記錄分佈式

 

 

很明顯在控制檯這邊也會有消費記錄!函數

總結這邊只是簡單的整合,後面會經過RocketMQ實現分佈式事務,能夠用於線上實際環境中,到時候會深刻講解下源碼。

 

轉載於:http://www.javashuo.com/article/p-nfawozci-eh.html

相關文章
相關標籤/搜索