第十八章:springboot 整合阿里雲 rocketMQ

1、rocketMQ簡介

消息隊列 RocketMQ 是阿里巴巴集團自主研發的專業消息中間件。 產品基於高可用分佈式集羣技術,提供消息訂閱和發佈、消息軌跡查詢、定時(延時)消息、資源統計、監控報警等一系列消息雲服務,是企業級互聯網架構的核心產品。 消息隊列 RocketMQ 歷史超過9年,爲分佈式應用系統提供異步解耦、削峯填谷的能力,同時具有海量消息堆積、高吞吐、可靠重試等互聯網應用所需的特性,是阿里巴巴雙11使用的核心產品。java

 

2、rocketMQ的使用

打開阿里雲產品,找到 rocketMQspring

 

這裏須要咱們根據須要開通包年仍是包月服務,開通成功後進入控制檯json

根據提示建立實例、建立Topics、建立Groupspringboot

建立好了以後,打開 Topic 管理,手動發送一條消息架構

能夠看到發送成功後會返回信息的 messageIDapp

 

3、整合 springboot

首先引入 pom 異步

<!--消息隊列 RocketMQ-->
<dependency>
   <groupId>com.aliyun.openservices</groupId>
   <artifactId>ons-client</artifactId>
   <version>1.7.9.Final</version>
</dependency>

定義 rocketMQ 配置分佈式

@Configuration
public class RocketMQConfig {


    public Properties getProperties(){

        Properties properties=new Properties();
        /**
         * 鍵的首字母必須大寫
         */
        properties.setProperty("AccessKey","**");
        //
        properties.setProperty("SecretKey","**");
        //
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 順序消息消費失敗進行重試前的等待時間,單位(毫秒)
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // 消息消費失敗時的最大重試次數
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
        //
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1944503281593155_BaOTPbFU.mq-internet-access.mq-internet.aliyuncs.com:80");
        return  properties;
    }
}

AccessKey、SecretKey 可在阿里雲我的信息中找到ide

NAMESRV_ADDR  是實例的接入點測試

 

定義消息發送者

@Component
public class RocketMQProducer {
    @Autowired
    private RocketMQConfig rocketMQConfig;

    /**
     * 一、發送普通消息
     *
     * @param message
     * @return
     */
    public boolean sendNormalMessage(Message message,String groupId) {

        Properties properties=rocketMQConfig.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID,groupId);
        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用 start 方法來啓動 Producer,只需調用一次便可
        producer.start();
        try {
            SendResult sendResult = producer.send(message);
            // 同步發送消息,只要不拋異常就是成功
            if (sendResult != null) {
                System.out.println("消息發送成功:messageID:"+sendResult.getMessageId());
                return true;
            }
        } catch (Exception e) {
            // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理
            e.printStackTrace();
        }
        return false;
    }
}

定義消息消費者

@Component
public class RocketMQConsumer {

    @Autowired
    private RocketMQConfig rocketMQConfig;


    /**
     * 一、普通訂閱
     *
     * @param
     */
    public void normalSubscribe( ) {

        Properties properties = rocketMQConfig.getProperties();

        properties.put(PropertyKeyConst.GROUP_ID, "GID-test");

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("test", "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + new String(message.getBody()));

                    //把消息轉化爲java對象
                    //JSONObject jsonObject=JSONObject.parseObject(jsonString);
                    //Book book= jsonObject.toJavaObject(Book.class);

                return Action.CommitMessage;
            }
        });

        consumer.start();
    }
}

測試類

@Autowired
	private RocketMQProducer rocketMQProducer;

	@Autowired
	private RocketMQConsumer rocketMQConsumer;

    //發送信息
	@RequestMapping("/send")
	public String send(String msg){
        // test 是建立的topic是名稱, tag 是消息的二級分類,能夠填空
		Message message=new Message("test","tag",msg.getBytes());
        // GID-test 是 發送信息組ID
		rocketMQProducer.sendNormalMessage(message,"GID-test");
		return "ok";
	}


    //接收信息
	@RequestMapping("/receive")
	public String receive(){
		rocketMQConsumer.normalSubscribe();
		return "ok";
	}

啓動項目,訪問 send 和 receive,控制檯打印以下

消息發送成功:messageID:C0A83292361818B4AAC23C548787000F
Receive: 測試

到這裏說明整合成功。最後咱們只須要在啓動項目的時候啓動消費者。spring 監聽器能夠實現,或者能夠經過實現接口 CommandLineRunner 

@Component
public class RocketConsumerListener implements CommandLineRunner {

    @Autowired
    private RocketMQConsumer rocketMQConsumer;

    @Override
    public void run(String... args) throws Exception {
        System.out.println("========rocketMQ消費者啓動==========");
        rocketMQConsumer.normalSubscribe();
    }
}

這樣在啓動項目的時候消費者也被啓動。到此springboot和rocketMQ的整合就完成啦。

相關文章
相關標籤/搜索