消息隊列—ActiveMQ

1.   學習計劃

一、什麼是MQjava

二、MQ的應用場景spring

三、ActiveMQ的使用方法。數據庫

四、使用消息隊列實現商品同步。apache

 

2.   同步索引庫分析

方案一:在manager(後臺)中,添加商品的業務邏輯中,添加一個同步索引庫的業務邏輯。服務器

  缺點:這樣違背了服務單一職能的原則,業務邏輯耦合度高,業務拆分不明確。session

方案二:業務邏輯在search中實現,調用服務在manager實現。業務邏輯分開。app

  缺點:服務之間的耦合度變高,search服務依賴manager服務,服務的啓動有前後順序。dom

方案三:使用消息隊列。MQ是一個消息中間件。tcp

 

MQ是一個消息中間件,ActiveMQ、RabbitMQ、kafka(大數據)。ide

系統服務之間,非直接通訊,而是經過MQ進行轉發,這樣既解決了系統之間的通訊問題,同時也避免了系統之間的依賴和耦合。

消息隊列的主要應用:

  解決系統之間的通訊問題

  下降系統之間的耦合度

互聯網項目中,爲了用戶的體驗,必須遵照快速響應用戶的原則,好比在電商項目中,當用戶下單以後,其實還有不少的後續業務須要完成,用戶根本不可能等你的流程所有處理完才獲得下單反饋。這是後咱們能夠利用消息隊列,在既可以快速響應用戶的同時,有能後將業務消息壓入MQ中,經過MQ的流轉,通知後續業務的開展,達到數據最終一致

 

3.   ActiveMQ

3.1. 什麼是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

主要特色:

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)

  3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性

  4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上

  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  6. 支持經過JDBC和journal提供高速的消息持久化

  7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點

  8. 支持Ajax

  9. 支持與Axis的整合

  10. 能夠很容易得調用內嵌JMS provider,進行測試

 

3.2. ActiveMQ的消息形式

對於消息的傳遞有兩種類型

一種是點對點的,即一個生產者和一個消費者一一對應;(只能有一個消費者)

另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。(廣播)

JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。

  · StreamMessage -- Java原始值的數據流

  · MapMessage--一套名稱-值對

  · TextMessage--一個字符串對象

  · ObjectMessage--一個序列化的 Java對象

  · BytesMessage--一個字節的數據流

4.   ActiveMQ的安裝

進入http://activemq.apache.org/下載ActiveMQ

 

 

使用的版本是5.12.0

 

4.1. 安裝環境:

一、須要jdk

二、安裝Linux系統。生產環境都是Linux系統。

 

4.2. 安裝步驟

第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。

第二步:解壓縮。

第三步:啓動。

4.3.啓動,查看,關閉

使用bin目錄下的activemq命令啓動:

[root@localhost bin]# ./activemq start

關閉:

[root@localhost bin]# ./activemq stop

查看狀態:

[root@localhost bin]# ./activemq status

 4.4 細節

注意:若是ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2

 

進入管理後臺:

http://192.168.25.168:8161/admin

用戶名:admin

密碼:admin

  

   

503錯誤解決:

一、查看機器名

[root@arjenlee168 bin]# cat /etc/sysconfig/network

NETWORKING=yes

HOSTNAME=arjenlee168

 

二、修改host文件

[root@arjenlee168 bin]# cat /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 arjenlee168

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

[root@arjenlee168 bin]#

 

三、重啓Activemq服務

5.   ActiveMQ的使用方法

 

 

 下面是java代碼依據JMS規範操做MQ。

5.1. Queue

  • Queue消息形式,服務端默認進行持久化。
  • Queue消息形式,只要被任意一個consumer消費後,服務端消除該消息,即一個消息只能被一個consumer消費。

若是消息沒有被消費,則會一直被保存在服務端,直到被消費爲止。

5.1.1.    Producer

生產者:生產消息,發送端。

把jar包添加到工程中。使用5.11.2版本的jar包。

 

 

5.1.1.1    建立步驟

第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。

第二步:使用ConnectionFactory對象建立一個Connection對象。

第三步:開啓鏈接,調用Connection對象的start方法。

第四步:使用Connection對象建立一個Session對象。

第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。

第六步:使用Session對象建立一個Producer對象。

第七步:建立一個Message對象,建立一個TextMessage對象。

第八步:使用Producer對象發送消息。

第九步:關閉資源。

5.1.1.2  代碼示例

@Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
        //brokerURL服務器的ip及端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接,調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        //第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。
        //第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
        //參數:隊列的名稱。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:建立一個Message對象,建立一個TextMessage對象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer對象發送消息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

 

5.1.2.    Consumer

消費者:接收消息。

5.1.2.1    建立步驟

第一步:建立一個ConnectionFactory對象。

第二步:從ConnectionFactory對象中得到一個Connection對象。

第三步:開啓鏈接。調用Connection對象的start方法。

第四步:使用Connection對象建立一個Session對象。

第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。

第六步:使用Session對象建立一個Consumer對象。

第七步:接收消息。

第八步:打印消息。

第九步:關閉資源

5.1.2.2    代碼示例

@Test
    public void testQueueConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory對象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:從ConnectionFactory對象中得到一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接。調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session對象建立一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的內容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

 

5.2. Topic

  • Topic消息形式,服務端默認不進行持久化存儲。
  • Topic消息能夠被多個consumer接收到,不會由於某一個consumer的消費行爲,而是的消息被服務端刪除。

5.2.1.    Producer

5.2.1.1.    建立步驟

第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。

第二步:使用ConnectionFactory對象建立一個Connection對象。

第三步:開啓鏈接,調用Connection對象的start方法。

第四步:使用Connection對象建立一個Session對象。

第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。

第六步:使用Session對象建立一個Producer對象。

第七步:建立一個Message對象,建立一個TextMessage對象。

第八步:使用Producer對象發送消息。

第九步:關閉資源。

5.2.1.2.  示例代碼  

@Test
    public void testTopicProducer() throws Exception {
        // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
        // brokerURL服務器的ip及端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接,調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。
        // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個topic對象。
        // 參數:話題的名稱。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:建立一個Message對象,建立一個TextMessage對象。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer對象發送消息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

 

 

5.2.2.    Consumer

消費者:接收消息。

5.2.2.1.    建立步驟

第一步:建立一個ConnectionFactory對象。

第二步:從ConnectionFactory對象中得到一個Connection對象。

第三步:開啓鏈接。調用Connection對象的start方法。

第四步:使用Connection對象建立一個Session對象。

第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。

第六步:使用Session對象建立一個Consumer對象。

第七步:接收消息。須要利用MessageListener。

第八步:打印消息。

第九步:關閉資源

5.2.2.2.    示例代碼

@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory對象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:從ConnectionFactory對象中得到一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啓鏈接。調用Connection對象的start方法。
        connection.start();
        // 第四步:使用Connection對象建立一個Session對象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session對象建立一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的內容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消費端03。。。。。");
        // 等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

 

6.   Activemq整合spring

使用spring提整合ActiveMQ,能夠避免如上繁瑣的使用步驟。

6.1. 配置producer

首先是spring和ActiveMQ整合中,如何配置producer

6.1.1.   導入整合包

第一步:引用spring和ActiveMQ整合的相關jar包。

<dependency>

              <groupId>org.springframework</groupId>

              <artifactId>spring-jms</artifactId>

         </dependency>

         <dependency>

              <groupId>org.springframework</groupId>

              <artifactId>spring-context-support</artifactId>

</dependency>

 

6.1.2.   配置ConnectionFactory

第二步:配置Activemq整合spring。配置ConnectionFactory

spring對ConnectionFactory進行了更上一層的包裝(接口),真正的connectionFactory由JMS服務廠商提供,在這裏是ActiveMQConnectionFactory,須要注入到spring中的connectionFactory。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
</beans>

6.1.2.   配置生產者

第三步:配置生產者:spring中提供了一個模板,JMSTemplate用來簡化如上展現的發送消息的步驟。

使用JMSTemplate對象。發送消息,template中須要注入connectionFactory。

<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

 

6.1.3.   配置Destination

第四步:在spring容器中配置Destination。能夠是Queue,也能夠是Topic

    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>

 

6.1.4.   完整的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生產者 -->
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
</beans>

 

下面看一下spring和ActiveMQ整合中,如何配置cousumer

6.2. 配置consumer

  • 導入jar包
  • 配置connectionFactory
  • 配置destination

以上同producer

 

6.2.1.   建立MessageListener

消息的接受須要用到,MessageListener,listener一旦監聽到有消息傳來,便會執行。

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息內容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

 

6.2.2   配置消費者

 

與是手動建立consumer接受消息的代碼相比,在spring中,咱們只須要配置消息監聽器消息監聽器容器,而後啓動spring容器就能夠了。

<!-- 接收消息 -->
    <!-- 配置監聽器 -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
    <!-- 消息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>

 

6.2.3.   完整的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置監聽器 -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
    <!-- 消息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

 

 

6.3. 代碼測試

6.3.1.    發送消息

第一步:初始化一個spring容器

第二步:從容器中得到JMSTemplate對象。

第三步:從容器中得到一個Destination對象

第四步:使用JMSTemplate對象發送消息,須要知道Destination

@Test

     public void testQueueProducer() throws Exception {

         // 第一步:初始化一個spring容器

         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");

         // 第二步:從容器中得到JMSTemplate對象。

         JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);

         // 第三步:從容器中得到一個Destination對象

         Queue queue = (Queue) applicationContext.getBean("queueDestination");

         // 第四步:使用JMSTemplate對象發送消息,須要知道Destination

         jmsTemplate.send(queue, new MessageCreator() {

             

              @Override

              public Message createMessage(Session session) throws JMSException {

                   TextMessage textMessage = session.createTextMessage("spring activemq test");

                   return textMessage;

              }

         });

     }

6.3.2.    接收消息

Taotao-search-Service中接收消息。

第一步:把Activemq相關的jar包添加到工程中

第二步:建立一個MessageListener的實現類


第三步:配置spring和Activemq整合。


第四步:測試代碼。

@Test

     public void testQueueConsumer() throws Exception {

         //初始化spring容器

         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");

         //等待

         System.in.read();

     }

 


 

7.   添加商品同步索引庫

7.1. Producer

manager-service工程中發送消息。

當商品添加完成後發送一個TextMessage,包含一個商品id。search-service從消息隊列MQ中接受到推送過來的id值,根據id值去查詢數據庫,拿到對應的數據;利用查詢獲得的數據創建索引,添加到索引庫中。

 

 

@Override
    public TaotaoResult addItem(TbItem item, String desc) {
        // 一、生成商品id
        final long itemId = IDUtils.genItemId();
        // 二、補全TbItem對象的屬性
        item.setId(itemId);
        //商品狀態,1-正常,2-下架,3-刪除
        item.setStatus((byte) 1);
        Date date = new Date();
        item.setCreated(date);
        item.setUpdated(date);
        // 三、向商品表插入數據
        itemMapper.insert(item);
        // 四、建立一個TbItemDesc對象
        TbItemDesc itemDesc = new TbItemDesc();
        // 五、補全TbItemDesc的屬性
        itemDesc.setItemId(itemId);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(date);
        itemDesc.setUpdated(date);
        // 六、向商品描述表插入數據
        itemDescMapper.insert(itemDesc);
        //發送一個商品添加消息
        jmsTemplate.send(topicDestination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(itemId + "");
                return textMessage;
            }
        });
        // 七、TaotaoResult.ok()
        return TaotaoResult.ok();
    }

 

7.2. Consumer

7.2.1.    功能分析

一、接收消息。須要建立MessageListener接口的實現類。

二、取消息,取商品id。

三、根據商品id查詢數據庫。

四、建立一SolrInputDocument對象。

五、使用SolrServer對象寫入索引庫。

六、返回成功,返回TaotaoResult。

 

7.2.2.    Dao層

根據商品id查詢商品信息。

 

 

映射文件:

<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
        SELECT
            a.id,
            a.title,
            a.sell_point,
            a.price,
            a.image,
            b. NAME category_name,
            c.item_desc
        FROM
            tb_item a
        JOIN tb_item_cat b ON a.cid = b.id
        JOIN tb_item_desc c ON a.id = c.item_id
        WHERE a.status = 1
          AND a.id=#{itemId}
    </select>

7.2.3.    Service層

參數:商品ID

業務邏輯:

一、根據商品id查詢商品信息。

二、建立一SolrInputDocument對象。

三、使用SolrServer對象寫入索引庫。

四、返回成功,返回TaotaoResult。

返回值:TaotaoResult

public TaotaoResult addDocument(long itemId) throws Exception {
        // 一、根據商品id查詢商品信息。
        SearchItem searchItem = searchItemMapper.getItemById(itemId);
        // 二、建立一SolrInputDocument對象。
        SolrInputDocument document = new SolrInputDocument();
        // 三、使用SolrServer對象寫入索引庫。
        document.addField("id", searchItem.getId());
        document.addField("item_title", searchItem.getTitle());
        document.addField("item_sell_point", searchItem.getSell_point());
        document.addField("item_price", searchItem.getPrice());
        document.addField("item_image", searchItem.getImage());
        document.addField("item_category_name", searchItem.getCategory_name());
        document.addField("item_desc", searchItem.getItem_desc());
        // 五、向索引庫中添加文檔。
        solrServer.add(document);
        solrServer.commit();
        // 四、返回成功,返回TaotaoResult。
        return TaotaoResult.ok();
    }

7.2.4.    Listener

public class ItemChangeListener implements MessageListener {
    
    @Autowired
    private SearchItemServiceImpl searchItemServiceImpl;

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = null;
            Long itemId = null; 
            //取商品id
            if (message instanceof TextMessage) {
                textMessage = (TextMessage) message;
                itemId = Long.parseLong(textMessage.getText());
            }
            //向索引庫添加文檔
            searchItemServiceImpl.addDocument(itemId);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

7.2.5.    Spring配置監聽

 

相關文章
相關標籤/搜索