阿里雲RocketMQ的消費者簡單實現

業務場景之類的請看另外一篇生產者的實現;java

package com.ttt.eee;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.Scanner;

public class MQTestConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制檯建立的 Group ID,其實就是網上說的groupName
        properties.put(PropertyKeyConst.GROUP_ID, "eee");
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
        properties.put(PropertyKeyConst.AccessKey, "sss");
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
        properties.put(PropertyKeyConst.SecretKey, "bbb");
        // 設置 TCP 接入域名,到控制檯的實例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
                       "http://ttt.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集羣訂閱方式 (默認)
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        Consumer consumer = ONSFactory.createConsumer(properties);
        // TODO tag若是是*表示訂閱全部的tag消息,注意在producer裏是叫tags,這裏卻叫subExpresion
        // *表示訂閱全部Tag,TagA||TagB表示訂閱 TagA和TagB
        consumer.subscribe("xxx-change", "*", (message, context) -> {
            // context的用處暫時不知道
            System.out.println("Receive: " + message);
            System.out.println("具體消息爲:" + new String(message.getBody(), Charset.forName("UTF-8")));
            // 正常消費返回這個,若是消費消息後業務處理出現問題通常返回:Action.ReconsumeLater表示這條消息晚點處理;
            return Action.CommitMessage;
        });
        //訂閱另一個 Topic
        // TODO 一個Consumer能夠訂閱多個topic ??,不過既然是官網的例子應該是能夠的
        /*consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //訂閱所有 Tag
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });*/
        consumer.start();
        System.out.println("Consumer Started");

        var scanner = new Scanner(System.in);
        scanner.next();

        consumer.shutdown();
        System.out.println("closed producer conn.");
    }

}

集合到Spring裏是:api

@Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean xxxNotify(XxxNotifyListener xxxNotifyListener) {
        return this.getConsumer(gid, topic, xxxNotifyListener);
    }

    private ConsumerBean getConsumer(String gid, String topic, MessageListener messageListener) {
        Properties properties = new Properties();
        properties.setProperty("addr", addr);
        properties.setProperty("AccessKey", accessKey);
        properties.setProperty("SecretKey", secretKey);
        properties.setProperty("GROUP_ID", gid);

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        // 這裏還能夠設置subExpression來描述tag
        subscriptionTable.put(subscription, messageListener);

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(properties);
        consumer.setSubscriptionTable(subscriptionTable);

        return consumer;
    }

MessageListener裏是用來實現消費這個消息後的具體業務邏輯的;服務器

相關文章
相關標籤/搜索