業務場景之類的請看另外一篇生產者的實現;java
package com.ttt.eee; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.nio.charset.Charset; import java.util.Properties; import java.util.Scanner; public class MQTestConsumer { public static void main(String[] args) { Properties properties = new Properties(); // 您在控制檯建立的 Group ID,其實就是網上說的groupName properties.put(PropertyKeyConst.GROUP_ID, "eee"); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.AccessKey, "sss"); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.SecretKey, "bbb"); // 設置 TCP 接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://ttt.mq-internet-access.mq-internet.aliyuncs.com:80"); // 集羣訂閱方式 (默認) // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 廣播訂閱方式 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); // TODO tag若是是*表示訂閱全部的tag消息,注意在producer裏是叫tags,這裏卻叫subExpresion // *表示訂閱全部Tag,TagA||TagB表示訂閱 TagA和TagB consumer.subscribe("xxx-change", "*", (message, context) -> { // context的用處暫時不知道 System.out.println("Receive: " + message); System.out.println("具體消息爲:" + new String(message.getBody(), Charset.forName("UTF-8"))); // 正常消費返回這個,若是消費消息後業務處理出現問題通常返回:Action.ReconsumeLater表示這條消息晚點處理; return Action.CommitMessage; }); //訂閱另一個 Topic // TODO 一個Consumer能夠訂閱多個topic ??,不過既然是官網的例子應該是能夠的 /*consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //訂閱所有 Tag public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } });*/ consumer.start(); System.out.println("Consumer Started"); var scanner = new Scanner(System.in); scanner.next(); consumer.shutdown(); System.out.println("closed producer conn."); } }
集合到Spring裏是:api
@Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean xxxNotify(XxxNotifyListener xxxNotifyListener) { return this.getConsumer(gid, topic, xxxNotifyListener); } private ConsumerBean getConsumer(String gid, String topic, MessageListener messageListener) { Properties properties = new Properties(); properties.setProperty("addr", addr); properties.setProperty("AccessKey", accessKey); properties.setProperty("SecretKey", secretKey); properties.setProperty("GROUP_ID", gid); Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); Subscription subscription = new Subscription(); subscription.setTopic(topic); // 這裏還能夠設置subExpression來描述tag subscriptionTable.put(subscription, messageListener); ConsumerBean consumer = new ConsumerBean(); consumer.setProperties(properties); consumer.setSubscriptionTable(subscriptionTable); return consumer; }
MessageListener裏是用來實現消費這個消息後的具體業務邏輯的;服務器