ActiveMQ

1、什麼是消息中間件

  消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見的角色大體也就有Producer(生產者)、Consumer(消費者)java

  常見的消息中間件產品:spring

1ActiveMQ數據庫

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。apache

  主要特色:服務器

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPsession

  2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)app

  3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性tcp

  4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上分佈式

  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAide

  6. 支持經過JDBC和journal提供高速的消息持久化

  7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點

  8. 支持Ajax

  9. 支持與Axis的整合

  10. 能夠很容易得調用內嵌JMS provider,進行測試

(2)RabbitMQ

  AMQP協議的領導實現,支持多種場景。淘寶的MySQL集羣內部有使用它進行通信,OpenStack開源雲平臺的通訊組件,最早在金融行業獲得運用。

(3)ZeroMQ

  史上最快的消息隊列系統

(4)Kafka

  Apache下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據。

2、JMS簡介

2.1 什麼是JMS

  JMS(JavaMessaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。

       JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(javaDatabase Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您可以經過消息收發服務(有時稱爲消息中介程序或路由器)從一個 JMS 客戶機向另外一個 JML 客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。

  JMS 定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。

  • TextMessage--一個字符串對象
  • MapMessage--一套名稱-值對
  • ObjectMessage--一個序列化的 Java 對象
  • BytesMessage--一個字節的數據流
  • StreamMessage -- Java 原始值的數據流

2.2 JMS的消息形式

  對於消息的傳遞有兩種類型:

  一種是點對點的,即一個生產者和一個消費者一一對應;

  

  另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。

  

 

3、ActiveMQ的安裝

  官方網址:http://activemq.apache.org/

3.1 安裝環境

  • 須要JDK
  • 安裝Linux系統。生產環境都是Linux系統。

3.2 安裝步驟

  第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。

  第二步:解壓縮。

  第三步:啓動。

  

  第四步:進入後臺管理: http://192.168.25.130:8161/admin

      用戶名:admin

      密碼:admin

     

4、ActiveMQ的入門案例

4.1 Queue(點對點模式)

  點對點的模式主要創建在一個隊列上面,當鏈接一個列隊的時候,發送端不須要知道接收端是否正在接收,能夠直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,若是有接收端在監聽,則會發向接收端,若是沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式能夠有多個發送端,多個接收端,可是一條消息,只會被一個接收端給接收到,哪一個接收端先連上ActiveMQ,則會先接收到,然後來的接收端則接收不到那條消息。

【Producer】——生產者:生產消息,發送端

  第一步:把jar包添加到工程中。使用5.11.2版本的jar包。

    <!-- ActiveMQ客戶端依賴的jar包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>

  第二步:建立ConnectionFactory對象,須要指定服務端ip及端口號。

  第三步:使用ConnectionFactory對象建立一個Connection對象。

  第四步:開啓鏈接,調用Connection對象的start方法。

  第五步:使用Connection對象建立一個Session對象。

  第六步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。

  第七步:使用Session對象建立一個Producer對象。

  第八步:建立一個Message對象,建立一個TextMessage對象。

  第九步:使用Producer對象發送消息。

  第十步:關閉資源。

  @Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
        // brokerURL:服務器的ip及端口號
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:使用ConnectionFactory對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接,調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。
        // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:建立一個Message對象,建立一個TextMessage對象。
        /*ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer對象發送消息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

   訪問ActiveMQ首頁,進入Queues標籤後,便可看到發送的消息。(若是點擊Queues或Topics時出現HTTP ERROR: 503的錯誤,處理方式爲:ActiveMQ頁面出現HTTP ERROR: 503錯誤處理方式

  

  列表各列信息含義以下:

  Number Of Pending Messages  等待消費的消息。這個是當前未出隊列的數量。

  Number Of Consumers  消費者。這個是消費者端的消費者數量

  Messages Enqueued  進入隊列的消息 。進入隊列的總數量,包括出隊列的。

  Messages Dequeued  出了隊列的消息 。能夠理解爲是消費這消費掉的數量。

  其中:Messages Enqueued -Messages Dequeued =Number Of Pending Messages

【Consumer】——消費者:接收消息

  第一步:建立一個ConnectionFactory對象。

  第二步:從ConnectionFactory對象中得到一個Connection對象。

  第三步:開啓鏈接。調用Connection對象的start方法。

  第四步:使用Connection對象建立一個Session對象。

  第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。

  第六步:使用Session對象建立一個Consumer對象。

  第七步:接收消息。

  第八步:打印消息。

  第九步:關閉資源

    //1.建立鏈接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
    //2.獲取鏈接
    Connection connection = connectionFactory.createConnection();
    //3.啓動鏈接
    connection.start();
    //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立隊列對象
    Queue queue = session.createQueue("test-queue");
    //6.建立消息消費
    MessageConsumer consumer = session.createConsumer(queue);
    
    //7.監聽消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });    
    //8.等待鍵盤輸入
    System.in.read();    
    //9.關閉資源
    consumer.close();
    session.close();
    connection.close();    

  接收消息後,ActiveMQ首頁的Queues以下

  

4.2 Topic(發佈/訂閱模式)

【Producer】

  第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。

  第二步:使用ConnectionFactory對象建立一個Connection對象。

  第三步:開啓鏈接,調用Connection對象的start方法。

  第四步:使用Connection對象建立一個Session對象。

  第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。

  第六步:使用Session對象建立一個Producer對象。

  第七步:建立一個Message對象,建立一個TextMessage對象。

  第八步:使用Producer對象發送消息。

  第九步:關閉資源。

@Test
    public void testTopicProducer() throws Exception {
        // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
        // brokerURL:服務器的ip及端口號
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:使用ConnectionFactory對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接,調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。
        // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個topic對象。
        // 參數:話題的名稱。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:建立一個Message對象,建立一個TextMessage對象。
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer對象發送消息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

   發送完畢後,查看ActiveMQ後臺的Topics

  

【Consumer】——消費者:接收消息

  第一步:建立一個ConnectionFactory對象。

  第二步:從ConnectionFactory對象中得到一個Connection對象。

  第三步:開啓鏈接。調用Connection對象的start方法。

  第四步:使用Connection對象建立一個Session對象。

  第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。

  第六步:使用Session對象建立一個Consumer對象。

  第七步:接收消息。

  第八步:打印消息。

  第九步:關閉資源

@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory對象。
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:從ConnectionFactory對象中得到一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接。調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session對象建立一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的內容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消費端01。。。");
        // 等待接收消息
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

4.3 Queue與Topic的區別

  Topic和queue的最大區別在於topic是以廣播的形式,通知全部在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。

 5、ActiveMQ整合Spring

5.1 Producer

  在e3-manager-service中配置

  第一步:引用相關的jar包。

     <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

   第二步:配置Activemq整合spring。配置ConnectionFactory

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.130:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
</beans>

  第三步:配置生產者。使用JMSTemplate對象。發送消息。

    <!-- 配置生產者 -->
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

  第四步:在spring容器中配置Destination。

<!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>

  第五步:代碼測試(發送消息)

@Test
    public void testSpringActiveMq() throws Exception {
        // 第一步:初始化一個spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        // 第二步:從容器中得到JMSTemplate對象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 第三步:從容器中得到一個Destination對象
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        // 第四步:使用JMSTemplate對象發送消息,須要知道Destination
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                //建立一個消息對象並返回
                TextMessage textMessage = session.createTextMessage("spring activemq queue message");
                return textMessage;
            }
        });
    } 

5.2 Consumer

  在e3-search-service中配置

  第一步:把Activemq相關的jar包添加到工程中

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>

  第二步:建立一個MessageListener的實現類。

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息內容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

  第三步:配置spring和Activemq整合。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置監聽器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

  第四步:測試代碼。

  @Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }
相關文章
相關標籤/搜索