阿里雲消息隊列RocketMQ使用示例

本文代碼示例參見:https://gitee.com/imlichao/RocketMQ-examplehtml

Apache RocketMQ文檔:http://rocketmq.apache.org/docs/quick-start/java

阿里雲RocketMQ文檔:https://help.aliyun.com/product/29530.htmlgit

簡介

消息隊列 RocketMQ 是阿里巴巴集團自主研發的專業消息中間件,基於高可用分佈式集羣技術,提供消息訂閱和發佈、消息軌跡查詢以及定時(延時)消息、資源統計、監控報警等一系列消息雲服務,是企業級互聯網架構的核心產品。 消息隊列 RocketMQ 歷史超過9年,爲分佈式應用系統提供異步解耦、削峯填谷的能力,同時具有海量消息堆積、高吞吐、可靠重試等互聯網應用所需的特性,是阿里巴巴雙11使用的核心產品。web

消息隊列 RocketMQ 是阿里雲正式商用的產品,目前在阿里雲多個地域(Region)提供了高可用消息雲服務,單個域內採用多機房部署,可用性極高,即便整個機房都不可用,仍然能夠爲應用提供消息發佈服務,產品穩定性及可用性徹底按照阿里巴巴內部標準來實施,無單點。spring

消息隊列 RocketMQ 目前提供 TCP 和 HTTP 協議層面的接入方式,支持 Java、C++、 .NET、Go、Python、Nodejs、PHP 這七種編程語言,方便不一樣編程語言開發的應用快速接入消息隊列 RocketMQ 消息雲服務。 用戶能夠將應用部署在阿里雲 ECS、企業自建雲,或者嵌入到移動端、物聯網設備中與消息隊列 RocketMQ 創建鏈接進行消息收發,同時本地開發者也能夠經過公網接入消息隊列 RocketMQ 服務進行消息收發。apache

 

消息收發模型

消息隊列 RocketMQ 支持「發佈/訂閱」模型,消息發佈者(生產者)能夠將一條消息發送服務端的某個主題(Topic),多個消息接收方(消費者)訂閱這個主題以接收該消息,以下圖所示:編程

 

示例

本例使用阿里雲RocketMQ產品,其中用戶名密碼地址等使用「XXXXXX」表示。api

增長maven依賴服務器

<!-- 增長RocketMQ依賴 -->
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.0.Final</version>
</dependency>

配置類架構

本例使用兩個同組下的消費者共同消費,而且消息採用exactly-once語義保證每一個消息只被消費一次

package pub.imlichao.config;

import com.aliyun.openservices.ons.api.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;


/**
 * RocketMQ配置
 */
@Configuration
public class RocketMQConfig {
	/**
	 * 生產者配置
	 * @return
	 */
	@Bean
	public Producer  producer () {
		Properties properties = new Properties();
		// 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.AccessKey,"XXXXXX");
		// 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.SecretKey, "XXXXXX");
		// 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看
		properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX");
		//經過 PropertyKeyConst.EXACTLYONCE_DELIVERY 開啓 exactly-once 投遞語義(保證擁有多個消費者時消息只被消費一次)
		properties.put(PropertyKeyConst.EXACTLYONCE_DELIVERY, "true");
		Producer producer = ONSFactory.createProducer(properties);
		// 在發送消息前,必須調用 start 方法來啓動 Producer,只需調用一次便可
		producer.start();

		return producer;
	}

	/**
	 * 消費者1配置
	 * @return
	 */
	@Bean
	public Consumer consumer1 () {
		Properties properties = new Properties();
		// 您在控制檯建立的 Group ID
		properties.put(PropertyKeyConst.GROUP_ID, "GID_pmall_consumer");
		// 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.AccessKey,"XXXXXX");
		// 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.SecretKey, "XXXXXX");
		// 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看
		properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX");
		Consumer consumer = ONSFactory.createConsumer(properties);
		//建立消息監聽和消息處理邏輯
		consumer.subscribe("pmall_message", "data_storage", new MessageListener() {
			@Override
			public Action consume(Message message, ConsumeContext context) {
				System.out.println("Receive1: " +  new String (message.getBody()) + " " + message.getMsgID());
				return Action.CommitMessage;
			}
		});
		//啓動監聽
		consumer.start();

		return consumer;
	}

	/**
	 * 消費者2配置
	 * @return
	 */
	@Bean
	public Consumer consumer2 () {
		Properties properties = new Properties();
		// 您在控制檯建立的 Group ID
		properties.put(PropertyKeyConst.GROUP_ID, "GID_pmall_consumer");
		// 鑑權用 AccessKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.AccessKey,"XXXXXX");
		// 鑑權用 SecretKey,在阿里雲服務器管理控制檯建立
		properties.put(PropertyKeyConst.SecretKey, "XXXXXX");
		// 設置 TCP 接入域名,進入控制檯的實例管理頁面,在頁面上方選擇實例後,在實例信息中的「獲取接入點信息」區域查看
		properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXXXXX");
		Consumer consumer = ONSFactory.createConsumer(properties);
		//建立消息監聽和消息處理邏輯
		consumer.subscribe("pmall_message", "data_storage", new MessageListener() {
			@Override
			public Action consume(Message message, ConsumeContext context) {
				System.out.println("Receive2: " + new String (message.getBody()) + " " + message.getMsgID());
				return Action.CommitMessage;
			}
		});
		//啓動監聽
		consumer.start();

		return consumer;
	}
}

發送消息

package pub.imlichao;

import com.aliyun.openservices.ons.api.*;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import pub.imlichao.config.RocketMQConfig;

import javax.annotation.Resource;
import java.util.Date;

@Controller
public class ProducerTest {
    @Resource
    private RocketMQConfig rocketMQConfig;

    /**
     * 向RocketMQ發送消息
     * @return
     */
    @GetMapping(value = "/send")
    public String send(){
        //循環發送消息100次
        for (int i =0;i<100;i++){
            //建立消息
            Message msg = new Message(
                    // 在控制檯建立的 Topic,即該消息所屬的 Topic 名稱
                    "pmall_message",
                    // Message Tag。可理解爲 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列 RocketMQ 服務器過濾
                    "data_storage",
                    // Message Body。任何二進制形式的數據,消息隊列 RocketMQ 不作任何干預。須要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                    ("pmall MQ "+ new Date()).getBytes());

            // 設置表明消息的業務關鍵屬性,請儘量全局惟一,以方便您在沒法正常收到消息狀況下,可經過控制檯查詢消息並補發。注意:不設置也不會影響消息正常收發
            msg.setKey("data_id");
            // 發送消息,只要不拋異常就是成功
            SendResult sendResult = rocketMQConfig.producer().send(msg);
            //打印 Message ID,以便用於消息發送狀態查詢
            System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
        }
        return "redirect:/";
    }
}
相關文章
相關標籤/搜索