SpringBoot(17)---SpringBoot整合RocketMQ

#<center> SpringBoot整合RocketMQ </center> 上篇博客講解了服務器集羣部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集羣html

這篇在上篇搭建好的基礎上,將SpringBoot整合RocketMQ實現生產消費。java

GitHub地址: https://github.com/yudiandemingzi/spring-boot-studygit

<font color=#FFD700> 1、搭建步驟 </font>

先說下技術大體架構github

SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)

一、添加rocketmq包

<!--注意: 這裏的版本,要和部署在服務器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

二、JmsConfig(配置類)

鏈接RocketMQ服務器配置類,這裏爲了方便直接寫成常量。spring

/**
 * @Description: 安裝實際開發這裏的信息 都是應該寫在配置裏,來讀取,這裏爲了方便因此寫成常量
 */
public class JmsConfig {
    /**
     * Name Server 地址,由於是集羣部署 因此有多個用 分號 隔開
     */
    public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877";
    /**
     * 主題名稱 主題通常是服務器設置好 而不能在代碼裏去新建topic( 若是沒有建立好,生產者往該主題發送消息 會報找不到topic錯誤)
     */
    public static final String TOPIC = "topic_family";

}

三、Producer (生產者)

@Slf4j
@Component
public class Producer {
    private String producerGroup = "test_producer";
    private DefaultMQProducer producer;
    
    public Producer(){
        //示例生產者
        producer = new DefaultMQProducer(producerGroup);
        //不開啓vip通道 開通口端口會減2
        producer.setVipChannelEnabled(false);
        //綁定name server
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();
    }
    /**
     * 對象在使用以前必需要調用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
  
    public DefaultMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 通常在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

四、Consumer (消費者)

@Slf4j
@Component
public class Consumer {

    /**
     * 消費者實體對象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消費者組
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * 經過構造函數 實例化對象
     */
    public Consumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //消費模式:一個新的訂閱組第一次啓動從隊列的最後位置開始消費 後續再啓動接着上次消費的進度開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //訂閱主題和 標籤( * 表明全部標籤)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // //註冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs中只收集同一個topic,同一個tag,而且key相同的message
            // 會把不一樣的消息分別放置到不一樣的隊列中
            try {
                for (Message msg : msgs) {

                    //消費者獲取消息 這裏只輸出 不作後面邏輯處理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("Consumer-獲取消息-主題topic爲={}, 消費消息爲={}", msg.getTopic(), body);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("消費者 啓動成功=======");
    }
}

大體就是這邊簡單,下面就是測試。apache

<br>服務器

<font color=#FFD700>2、測試</font>

先寫個測試接口進行測試。架構

一、Controller

@Slf4j
@RestController
public class Controller {

    @Autowired
    private Producer producer;

    private List<String> mesList;

    /**
     * 初始化消息
     */
    public Controller() {
        mesList = new ArrayList<>();
        mesList.add("小小");
        mesList.add("爸爸");
        mesList.add("媽媽");
        mesList.add("爺爺");
        mesList.add("奶奶");

    }

    @RequestMapping("/text/rocketmq")
    public Object callback() throws Exception {
        //總共發送五次消息
        for (String s : mesList) {
            //建立生產信息
            Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的稱謂:" + s).getBytes());
            //發送
            SendResult sendResult = producer.getProducer().send(message);
            log.info("輸出生產者信息={}",sendResult);
        }
        return "成功";
    } 
}

二、測試結果

很明顯生產發送消息已經成功,二消費者也成功接收了消息!app

另外咱們再來看下RocketMQ控制檯是否也有消費記錄分佈式

很明顯在控制檯這邊也會有消費記錄!

總結這邊只是簡單的整合,後面會經過RocketMQ實現分佈式事務,能夠用於線上實際環境中,到時候會深刻講解下源碼。

<br> <br>

只要本身變優秀了,其餘的事情纔會跟着好起來(中將10)

原文出處:https://www.cnblogs.com/qdhxhz/p/11109696.html

相關文章
相關標籤/搜索