消息隊列概述及ActiveMQ使用

消息中間件定義

通常認爲,消息中間件屬於分佈式系統中一個子系統,關注於數據的發送和接收,利用高效可靠的異步消息傳遞機制對分佈式系統中的其他各個子系統進行集成。html

爲何要用消息中間件

隨着系統的發展,各個模塊愈來愈龐大、業務邏輯愈來愈複雜,必然要作服務化和業務拆分的,這個時候各個系統之間的交互,RPC是首選。可是隨着系統的繼續發展,一些功能涉及幾十個服務的調用,這時候須要消息中間件來解決問題。java

消息中間件主要解決分佈式系統之間消息的傳遞,同時爲分佈式系統中其餘子系統提供了伸縮性和擴展性。爲系統帶來了:
1.低耦合,不論是程序仍是模塊之間,使用消息中間件進行間接通訊。
2.異步通訊能力,使得子系統之間得以充分執行本身的邏輯而無需等待。
3.高併發能力,將高峯期大量的請求存儲下來慢慢交給後臺進行處理,好比適用於秒殺業務。spring

和RPC區別

RPC和消息中間件的場景的差別很大程度上在於就是「依賴性」和「同步性」:
RPC是強依賴,典型的同步方式,像本地調用。消息中間件方式屬於異步方式。消息隊列是系統級、模塊級的通訊。RPC是對象級、函數級通訊。apache

業務上的必須環節通常用RPC,對於一些不影響流程的不是強依賴的能夠考慮消息隊列,如發送短信,統計數據,解耦應用。編程

消息隊列應用場景

1.異步處理;2.應用解耦;3.限流;4.日誌處理;5消息通信windows


經常使用的消息中間件比較

clipboard.png


JMS規範(ActiveMQ基於/實現JMS規範)

JMS規範包含如下6個要素
1.鏈接工廠;2.JMS鏈接;3.JMS會話;4.JMS目的(Broker);5.JMS生產者;6.JMS消費者數組


JMS規範的消息
JMS 消息由如下三部分組成:session

  • 消息頭。每一個消息頭字段都有相應的getter 和setter 方法。
  • 消息屬性。若是須要除消息頭字段之外的值,那麼可使用消息屬性。
  • 消息體。JMS 定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage(BIO) 和 ObjectMessage。ActiveMQ也有對應的實現。

注意1:生產者及消費者的消息類型必須一致才能接受到消息。
注意2:通常來講不用對象消息類型,傳輸對象消息,對象得序列化(實現Serializable接口),JDK自己的序列化效率低,產生的字節碼數組大。可把對象序列化成JSON串。
注意3:社區查閱:通過性能測試消息建議不超過1K(1024字節),大消息如大於1M的選擇kafka性能好併發

JMS消息模型異步

1.Point-to-Point點對點:
生產者發佈消息到隊列queue上,若沒有對應的消費者則消息保留;若queue上有多個消費者的時候,消息只會被一個消費者消費。

clipboard.png

2.Topic/主題(發佈與訂閱)(廣播):
生產者發佈消息到主題,主題會向全部消費者(訂閱者)發送消息;若沒有消費者在線,則消息丟失也就相似廣播,沒了就沒了。

clipboard.png


ActiveMQ安裝

官網http://activemq.apache.org/ac...:8161/admin爲後臺管理平臺可查詢隊列狀況及消息條數等等。

查看activemq.xml可查看ActiveMQ應用的缺省端口爲61616,8161爲管理平臺端口。

ActiveMQ的使用

一:原生API編程(最靈活,重要)

消費者:看代碼很明顯是基於JMS規範的要素來編程的,要通訊必需要創建鏈接。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    /*默認鏈接用戶名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默認鏈接密碼*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默認鏈接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        /* 鏈接工廠*/
        ConnectionFactory connectionFactory;
        /* 鏈接*/
        Connection connection = null;
        /* 會話*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的消費者*/
        MessageConsumer messageConsumer;

        /* 實例化鏈接工廠*/
        connectionFactory
                = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            /* 經過鏈接工廠獲取鏈接*/
            connection = connectionFactory.createConnection();
            /* 啓動鏈接*/
            connection.start();
            /* 建立session*/
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /* 建立一個名爲HelloWorldQueue消息隊列*/
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 建立消息消費者*/
            messageConsumer = session.createConsumer(destination);
            Message message;
            while((message = messageConsumer.receive())!=null){
                System.out.println("收到消息"+((TextMessage)message).getText());
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

生產者:一樣的生產者也是基於JMS規範要素。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProducer {

    /*默認鏈接用戶名*/
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默認鏈接密碼*/
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默認鏈接地址*/
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final int SENDNUM = 5;

    public static void main(String[] args) {
        /* 鏈接工廠*/
        ConnectionFactory connectionFactory;
        /* 鏈接*/
        Connection connection = null;
        /* 會話*/
        Session session;
        /* 消息的目的地*/
        Destination destination;
        /* 消息的生產者*/
        MessageProducer messageProducer;

        /* 實例化鏈接工廠*/
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,
                BROKEURL);
        try {
            /* 經過鏈接工廠獲取鏈接*/
            connection = connectionFactory.createConnection();
            /* 啓動鏈接*/
            connection.start();
            /* 建立session
            * 第一個參數表示是否使用事務,第二次參數表示是否自動確認*/
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            /* 建立一個名爲HelloWorldQueue消息隊列*/
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 建立消息生產者*/
            messageProducer = session.createProducer(destination);
            /* 循環發送消息*/
            for(int i=0;i<SENDNUM;i++){
                String msg = "發送消息"+i+" "+System.currentTimeMillis();
                TextMessage textMessage = session.createTextMessage(msg);
                System.out.println("標準用法:"+msg);
                messageProducer.send(textMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

先啓動ActiveMQ再執行代碼demo就能夠了解對應的特性,如上代碼爲點對點模型,無論消費者在生產者先後啓動的都能接受到消息,畢竟生產者發佈的消息無對應的消費者消費時,隊列會保存消息。
一樣的也能夠測試下主題模式,如上放開註釋便可。


二:Spring整合

生產者配置:

<!-- ActiveMQ 鏈接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName="" password="" />

<!-- Spring Caching鏈接工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- Spring JmsTemplate 的消息生產者 start-->
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 隊列模式-->
    <property name="pubSubDomain" value="false"></property>
</bean>

<!-- 定義JmsTemplate的Topic類型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 發佈訂閱模式-->
    <property name="pubSubDomain" value="true"></property>
</bean>

<!--Spring JmsTemplate 的消息生產者 end-->

Queue生產者:直接注入隊列模式的bean便可使用

@Component
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                //TODO  應答
                return msg;
            }
        }); 
        
    }
      
}

Topic生產者:

@Component
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
    }
}

消費者配置:

<!-- ActiveMQ 鏈接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName="" password="" />

<!-- Spring Caching鏈接工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- 消息消費者 start-->

<!-- 定義Topic監聽器 -->
<jms:listener-container destination-type="topic" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
    <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>

<!-- 定義Queue監聽器 -->
<jms:listener-container destination-type="queue" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
    <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消費者 end -->

隊列消費者:

@Component
public class QueueReceiver1 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

主題消費者:

@Component
public class TopicReceiver1 implements MessageListener {
    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

SpringBoot整合可在官網查詢對應配置

相關文章
相關標籤/搜索