2018開工第一天,記錄一下

導論

記錄一下阿里雲消息服務與Spring的整合,以及ProducerId與ConsumerId的管理,其餘的消息服務也是相似(RocketMQ、Kafka),阿里雲消息服務性能仍是很可觀的,雖然收費,單也推薦使用。html

整合

消息服務的概念就不想多說了,須要的能夠去看官方文檔,參考文檔spring

建立topic

首先建立topic,以下圖填好信息就OK了。express

圖一

建立成功以後是這樣api

圖二

ProducerId的建立

圖三

ConsumerId的建立

圖四

整合spring

上面那些步驟信息填完整以後topic、ProducerId、ConsumerId都建立好了就可使用消息隊列了服務器

Producer的整合tcp

<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start"
          destroy-method="shutdown">
        <property name="properties"> <!--生產者配置信息-->
            <props>
                <!-- 生成者ID,須要提早在阿里雲建立  -->
                <prop key="ProducerId">PID-SIT-TransitHub-NotifyUnbind</prop> <!--請替換爲本身的帳戶信息-->
                <!-- AccessKey、SecretKey由阿里雲分配 -->
                <prop key="AccessKey">LTAIqfzogBNFeohh11</prop>
                <prop key="SecretKey">zoahuhZKscEk5Q8Qtr</prop>
                <!-- 根據本身服務器選擇不一樣的tcp接入url,此處選擇公網 -->
                <prop key="ONSAddr">http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet</prop>
            </props>
        </property>
    </bean>

Consumer的整合性能

<!-- 建立Listener將消費者處於阻塞狀態,只要有本身topic訂閱的消息發佈消息立刻就會訂閱到-->
<bean id="tsmDeleteAidMsgListener" class="com.snowball.hub.msg.DataMessageListener" /> Listener配置
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
    init-method="start" destroy-method="shutdown">
    <property name="properties">
        <props>
            <prop key="ConsumerId">CID-SIT-OPS-NotifyUnbind</prop> 
            <prop key="AccessKey">${access_key}</prop>
            <prop key="SecretKey">${secret_key}</prop>
            <!--將消費者線程數固定爲50個,該線程不會和主業務線程耦合-->
            <prop key="ConsumeThreadNums">50</prop>
        </props>
    </property>
    <property name="subscriptionTable">
        <map>
            <entry value-ref="tsmDeleteAidMsgListener">
                <key>
                    <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                        <!-- 此處填將以前建立的topic -->
                        <property name="topic" value="snb-test-topic4" />
                        <property name="expression" value="*" />
                        <!--
                        expression即Tag,能夠設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅表明訂閱全部Tag,不支持通配
                        -->
                    </bean>
                </key>
            </entry>
            更多的訂閱添加entry節點便可
        </map>
    </property>
</bean>

Consumer的整合和Producer基本一致,不一樣的是須要建立一個Listener,做用已經在註釋中說明。學習

使用阿里雲sdk發佈和訂閱消息

上面只是整合了普通消息,阿里雲MQ消息分很四種,每一種的整合API都不同,具體整合細節能夠參考文章開始出的參考文檔。阿里雲

發佈消息
public class ProducerTest {
    
    //若是和spring整合了,那就直接注入就行了,本次使用傳統的發佈方式
    //@Autowired
    //private Producer producer;
    
    //topic的管理最好作成可配置,能夠對應不一樣的環境管理不一樣的topic,本次仍是使用傳統的//方式發佈
    //@Value("#{configProperties['send_unbind_topic']}")
    //private String send_unbind_topic;

 public static void main(String[] args) {
     Properties properties = new Properties();
     // 您在MQ控制檯建立的Producer ID
     properties.put(PropertyKeyConst.ProducerId, "XXX");
     // 鑑權用AccessKey,在阿里雲服務器管理控制檯建立
     properties.put(PropertyKeyConst.AccessKey,"XXX");
     // 鑑權用SecretKey,在阿里雲服務器管理控制檯建立
     properties.put(PropertyKeyConst.SecretKey, "XXX");
     // 設置 TCP 接入域名(此處以公共雲的公網接入爲例)
     properties.put(PropertyKeyConst.ONSAddr,
       "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");

     Producer producer = ONSFactory.createProducer(properties);
     // 在發送消息前,必須調用start方法來啓動Producer,只需調用一次便可
     producer.start();

     //循環發送消息
     while(true){
         Message msg = new Message( //
             // 在控制檯建立的Topic,即該消息所屬的Topic名稱
             "TopicTestMQ",
             // Message Tag,
             // 可理解爲Gmail中的標籤,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
             "TagA",
             // Message Body
             // 任何二進制形式的數據, MQ不作任何干預,
             // 須要Producer與Consumer協商好一致的序列化和反序列化方式
             "Hello MQ".getBytes());
         // 設置表明消息的業務關鍵屬性,請儘量全局惟一,以方便您在沒法正常收到消息狀況下,可經過MQ控制檯查詢消息並補發
         // 注意:不設置也不會影響消息正常收發
         msg.setKey("ORDERID_100");
         // 發送消息,只要不拋異常就是成功
         // 打印Message ID,以便用於消息發送狀態查詢
         SendResult sendResult = producer.send(msg);
         System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
     }

     // 在應用退出前,能夠銷燬Producer對象
     // 注意:若是不銷燬也沒有問題
     producer.shutdown();
 }
}

消息發佈成功能夠看到sendResult是這樣的信息url

{"messageId":"0200010546D011E87BD078ACF4180003","topic":"TPC-SIT-COM-TransitHub-NotifyUnbind"}

根據messageId能夠定位這條消息的軌跡,能夠很清晰的定位消息的消費軌跡。
圖五

訂閱消息
public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在MQ控制檯建立的Consumer ID
        properties.put(PropertyKeyConst.ConsumerId, "XXX");
        // 鑑權用AccessKey,在阿里雲服務器管理控制檯建立
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // 鑑權用SecretKey,在阿里雲服務器管理控制檯建立
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 設置 TCP 接入域名(此處以公共雲公網環境接入爲例)
        properties.put(PropertyKeyConst.ONSAddr,
          "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");

        Consumer consumer = ONSFactory.createConsumer(properties);
        //這個Listener若是以前已經在spring容器中註冊過直接使用就行了,這裏就不演示了
        consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}

消息的發佈與訂閱就這麼多,要使用消息服務總結起來就四步。

  1. 開通服務
  2. 申請資源
  3. 發佈消息
  4. 訂閱消息
總結

消息的產品不少,阿里雲的消息服務是目前互聯網公司使用佔比很大的,本次只是很簡單介紹消息服務的使用,具體實現細節筆者也在學習中。

相關文章
相關標籤/搜索