jms與ActiveMQ實踐與應用

前言 html

這是我本身從不知道JMS爲什麼物到學習如何使用第三方工具實現跨服務器的知識總結,在整個過程當中可能考慮不全。另外,若是想盡快使用JMS,建議直接看實例那一節就能夠了。有問題多交流。 java

詞語解釋 web

(有些詞可能用的不是很正確,在這裏我把本身能意識到的詞拿出來解釋一下): spring

一、  跨服務器:專業術語好像叫「跨實例」。意思是,能夠在多個服務器(能夠是不一樣的服務器,如resintomcat)之間相互通訊。與之對應的是單服務器版。 數據庫

二、  消息生產者:就是專門製造消息的類。 apache

三、  消息消費者:也叫消息接收者,它主要是實現了消息監聽的一個接口,固然,也能夠難過Spring提供的一個轉換器接口指定任意一個類中的任意方法。 tomcat

服務器

咱們都知道,任何一個系統從總體上來看,其實質就是由無數個小的服務或事件(咱們能夠稱之爲事務單元)有機地組合起來的。對於系統中任何一個比較複雜的功能,都是經過調用各個獨立的事務單元以實現統一的協調運做而實現的。 session

如今咱們的問題是,若是有兩個徹底獨立的服務(好比說兩個不一樣系統間的服務)須要相互交換數據,咱們該如何實現? 併發

好吧,我認可,我很傻很天真,我想到的第一個方法就是在須要的系統中將代碼再寫一遍,但我也知道,這絕對不現實!好吧,那我就應該好好學習學習達人們是如何去解決這樣的問題。

第一種方法,估計也是用的最多的,就是rpc模式。這種方法就是在本身的代碼中遠程調用其它程序中的代碼以達到交換數據的目的。可是這種方法很顯然地存在了一個問題:就是必定要等到獲取了數據以後才能繼續下面的操做。固然,若是一些邏輯是須要這些數據才能操做,那這就是咱們須要的。

第二種方法就是Hessian,我我的以爲Hessian的實如今本質上與rpc模式的同樣,只是它採用了配置,簡化了代碼。

上面這兩個方法,基本上能解決全部的遠程調用的問題了。可是美中不足的是,若是我在A系統中有一個操做是須要讓B系統作一個響應的,但我又不須要等它響應完才作下面的操做,這該怎麼辦?因而新的解決方案就須要被提出來,而SUN公司的設計師們也考慮到了,在JAVA中這就被體現爲JMSjava message service)。

1、認識JMS

JMS模塊的功能只提供了接口,並無給予實現,實現JMS接口的消息中間件叫JMS Provider,這樣的消息中間件能夠從Java裏經過JMS接口進行調用。

JMS消息由兩部分構成:headerbodyheader包含消息的識別信息和路由信息,body包含消息的實際數據。

JMS的通用接口集合以異步方式發送或接收消息。另外, JMS採用一種寬鬆結合方式整合企業系統的方法,其主要的目的就是建立可以使用跨平臺數據信息的、可移植的企業級應用程序,而把開發人力解放出來。

Java消息服務支持兩種消息模型:Point-to-Point消息(P2P)和發佈訂閱消息(Publish Subscribe messaging,簡稱Pub/Sub,也就是廣播模式)。

根據數據格式,JMS消息可分爲如下五種:

BytesMessage   消息是字節流。

MapMessage   消息是一系列的命名和值的對應組合。

ObjectMessage   消息是一個流化(即繼承Serializable)Java對象。

StreamMessage   消息是Java中的輸入輸出流。

TextMessage   消息是一個字符串,這種類型將會普遍用於XML格式的數據。

2、使用JMS

在使用JMS時,其步驟很像使用JDBC同樣,須要的步驟爲:

1、創建消息鏈接(也就是創建鏈接工廠);

2、設定消息目的地(其實與步驟1中用的類是同樣的,只是它是用來指定目的地,而步驟1中是用來指定消息服務器地址的)

3、建立jmsTemplate實例(爲下一步構建消息sessin做準備);

4、建立消息生產者(其中就用到了23兩步的產物),它就是一個普通的類,通常是經過send方法發送消息,也能夠經過MessageListenerAdapter指定發送信息的方法;

5、建立MDP(也就是消息接收者,它是一個必須實現MessageListener接口的類);

6、爲每一個MDP創建一個監聽容器,當有相應的消息傳來,則它會自動調用對應的MDP消費消息。

整個過程就像編寫JDBC同樣,代碼維護量很大。爲此,讓Spring對其進行管理是個不錯的選擇。

3、Spring整合JMS

Spring框架提供了一個模板機制來隱藏Java APIs的細節。開發人員可使用JDBCTemplateJNDITemplate類來分別訪問後臺數據庫和JEE資源(數據源,鏈接池)。JMS也不例外,Spring提供JMSTemplate類,所以開發人員不用爲一個JMS實現去編寫樣本代碼。接下來是在開發JMS應用程序時Spring所具備一些的優點。

1. 提供JMS抽象API,簡化了訪問目標(隊列或主題)和向指定目標發佈消息時JMS的使用。

2. 開發人員不須要關心JMS不一樣版本(例如JMS 1.0.2JMS 1.1)之間的差別。

3. 開發人員沒必要專門處理JMS異常,由於Spring爲全部JMS異常提供了一個未經檢查的異常,並在JMS代碼中從新拋出

具體的詳細步驟與方法參考 spring-reference2.5.pdf 中的第十九章。

下面,我就將我在整個學習過程當中實踐過的例子一一列舉出來,並將在其中遇到的問題和心得給出必定的說明,但願對你們能有所幫助。

4、實例

(一)、配置單服務器版消息機制

1、首先,咱們須要配置resin下的resin.conf文件,在其中(<server></server>之間)加上:

<!-- The ConnectionFactory resource defines the JMS factory for creating JMS connections -->

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/queue"

    type="com.caucho.jms.memory. MemoryQueue">

<init>

    <queue-name>OssQueue</queue-name>

    </init>

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/topic"

    type="com.caucho.jms.memory. MemoryTopic">

<init>

    <queue-name>ossTopic</queue-name>

    </init>

</resource>

注:i、我如今只知道JNDI方式配置消息的鏈接工廠,我並不知道有沒有其它的方式,但我看了許多資料上也沒提到其它配置方式。

ii、網上不多有關於在resin中配置JMS消息工廠的資料,只有在resin的官網上才能見到。

iii、上面JNDI配置的地方須要注意的是,你們若是在網上看資料的話,可能會發現網上會比我給出的老是會多一些,也就是老是多一些<data-source>的初始化配置,如:

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

  <init>

    <data-source>jdbc/database</data-source>

  </init>

</resource>

就這樣的配置,單獨啓動resin是沒有問題的,可是若是將其按照下面的Spring配置加到系統中,就會出異常(具體的異常名稱我忘了,中文的大概意思是:數據庫對象不能轉換成JMS鏈接對象,還有一種狀況是啓動系統時會內存溢出)。我認爲這種配置多是數據庫消息模式的配置(由於JMS有內存和數據庫兩種管理方式,我目前只學習了內存管理的方式,至於數據庫管理方式你們要是有興趣能夠參考:

http://www.oracle.com/technology/books/pdfs/2352_Ch06_FINAL.pdf

2、在web.xml文件中配置一個spring用的上下文:

<context-param>

    <param-name>contextConfigLocation</param-name>

    <param-value>/WEB-INF/jmsconfig.xml</param-value>

</context-param>

<!-- 配置Spring容器 -->

<listener>

    <listener-class>

    org.springframework.web.context.ContextLoaderListener

</listener-class>

</listener>

注:我是將jmsconfig.xml加載到service.xml中隨系統啓動的。

3、建立jmsconfig.xml用來裝配jms,內容以下:

<?xml version="1.0" encoding="UTF-8"?>  

<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"

    "http://www.springframework.org/dtd/spring-beans.dtd">  

 

<beans>

    <bean id="jmsConnectionFactory"

 class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value>java:comp/env/jms/factory </value>

 </property>

    </bean>

   

    <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value> java:comp/env/jms/queue</value>

 </property>

    </bean>

   

    <!--  Spring JmsTemplate config -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

 <property name="connectionFactory">

     <bean

  class="org.springframework.jms.connection.SingleConnectionFactory">

  <property name="targetConnectionFactory"

      ref="jmsConnectionFactory"/>

     </bean>

 </property>

    </bean>

   

    <!-- POJO which send Message uses Spring JmsTemplate --> <!--配置消息生產者-->

    <bean id="messageProducer" class="com.focustech.jms.MessageProducer">

 <property name="template" ref="jmsTemplate"/>

 <property name="destination" ref="destination"/>

    </bean>

   

    <!--  Message Driven POJO (MDP) -->

    <bean id="messageListener" class=" com.focustech.jms.MessageConsumer"/>

   

    <!--  listener containerMDP無需實現接口 -->

    <bean id="listenerContainer"

 class="org.springframework.jms.listener.DefaultMessageListenerContainer">

 <property name="connectionFactory" ref="jmsConnectionFactory"/>

 <property name="destination" ref="destination"/>

 <property name="messageListener" ref="messageListener"/>

    </bean>

</beans>

其中:

1)   jmsConnectionFactorydestination都是使用自定義的,並且你會發現,這兩個對象的加載類實際上是同樣的,都是JndiObjectFactoryBean,這是從JNDI讀取鏈接的意思。

3)  MessageProducer是消息發送方。

4)  MessageConsumer實現了一個MessageListener,監聽是否收到消息。

4、發送和接收消息的class以下(主要代碼)

MessageProducer.java

public class MessageProducer {

    private JmsTemplate template;

    private Destination destination;

 

    public void send(final String message) {

 template.send(destination, new MessageCreator() {

     public Message createMessage(Session session) throws JMSException {

  Message m = session.createTextMessage(message);

  return m;

     }

 });

    }

 

    public void setDestination(Destination destination) {

 this.destination = destination;

    }

 

    public void setTemplate(JmsTemplate template) {

 this.template = template;

    }

 

}

 

MessageConsumer.java

public class MessageConsumer implements MessageListener {

 

    public void onMessage(Message message) {

        try

       {

           System.out.println(((TextMessage) message).getText());

       }

       catch (JMSException e)

       {

       }

    }

}

注:在上面的實例類中,因爲在發送方發送的是文本消息(TextMessage),因此在上面的接收者代碼中我直接將其轉換成TextMessage就好了。若是是在真正的環境下,應該首先判斷一下對方發送的是什麼類型,而後才轉換成對應的消息。

5、測試消息

爲了測試的方便,能夠在webroot下新建一個test.jsp,而後將下面的代碼放到JSP的代碼中,而後在網頁地址欄中輸入連接(如:http://oss.vemic.com/test.jsp 注:oss.vemic.com是本地服務器連接)就能夠看到發送的消息了。

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

     MessageProducer mp = (MessageProducer) wac.getBean("messageProducer");

           mp.send("JMS TEST!!");

      } catch (JmsException e) {

  }

%>

(二)、配置跨服務器(即兩個或多個resin之間)版消息機制

上面介紹的是單服務器的消息模式配置,使用消息模式,是由於咱們須要在兩個或多個服務器之間進行消息的傳遞,而不是單個服務器內容的消息傳遞。看過不少資料才發現,幾乎全部的服務器都不支持跨服務器的消息模式,就算有(如JBoss),那也是由於它們自己集成了第三方的工具而實現的。而在第三方軟件裏面,我最終選擇了apache activeMQ

apache activeMQ的簡介能夠去其官網查看:http://activemq.apache.org/

或參考:http://www.blogjava.net/cctvx1/archive/2007/02/07/98457.html

IActiveMQ的安裝與配置

在其官網上下載最新的對應系統的版本。通常來講,下載完解壓以後就可能經過運行:apache-activemq-5.2.0\bin\ activemq.bat就能夠成功啓動。具體詳細的信息參考:

http://andyao.javaeye.com/blog/153171,或者也可參考其官網。

II、整合SpringJMS消息發送

在真正操做以前,爲了避免至於糊塗,咱們應該忘掉前面所說的全部配置(固然,JMS的基礎知識咱們仍是應該記住的,由於全部的JMS操做都是基於此的。還有那兩個生產消息與消費消息的類與測試頁面咱們也要保留,由於咱們下面還須要它們),好吧,如今將全部的配置迴歸到開始的狀態吧(如:resin.conf, JMSSpring中的配置等等都回到原始狀態吧)。

先說一下我運行時所需的環境吧:JDK1.5.0_12JDK1.6.0_05均可以;resin-3.0.25(其它版本沒有試過);配置ActiveMQ所需的JAR包有:

activemq-core-5.2.0.jaractivemq-web-5.2.0.jargeronimo-j2ee-management_1.0_spec-1.0.jargeronimo-jms_1.1_spec-1.1.1.jargeronimo-jta_1.0.1B_spec-1.0.1.jarxbean-spring-3.4.jar

好了,一切準備就緒了。那就讓ActiveMQ先在系統中運行吧(也就是先單服務器運行,先易後難嘛)。爲了讓它可以運行起來,咱們須要作如下的準備工做:

一、 使用ActiveMQJMSSpring中的配置

其實這裏的許多配置和上面說的單服務器的配置是差很少的,只是這裏再也不須要配置resinweb.xml的配置與上面的如出一轍(固然,我仍是按照個人方式配置在了service.xml中),好了,如今不一樣的配置是jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置ActiveMQ服務 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

<!-- ActiveMQ目前支持的transport有:VM TransportTCP TransportSSL TransportPeer TransportUDP TransportMulticast TransportHTTP and HTTPS TransportFailover TransportFanout TransportDiscovery TransportZeroConf Transport-->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS鏈接工廠(注:brokerURL是關鍵,它應該是上面的amq:transportConnectors裏面的值之一對應,由於這裏指定鏈接的對象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息發送的目的地(注:」amq:queue」是用於指定是發送topic仍是queue -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 建立JMSSession生成類,也就是jmsTemplate -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生產者(經過指定目的地, 就能夠同時指定其發送的消息模式是topic仍是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收類(這個類須要繼承javax.jms.MessageListener,固然也能夠經過MessageListenerAdapter指定消息轉換器來實現用戶自定義的消息收發) -->

       <bean id="messageListener"

              class=" com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息監聽容器,其各屬性的意義爲:

              connectionFactory:指定所監聽的對象,在這裏就是監聽鏈接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:監聽的消息模式;

              messageListener:接收者

               -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注:test.vemic.com是我本機的URL,和localhost同樣。

二、 消息測試

採用上面單機測試的消息就能夠了。最終運行的結果爲:

JMS TEST!!

注:

注意到了沒有?上面的配置從jmsTemplate開始往下就和前面介紹過的單服務器的配置同樣了。看到這裏,我相信你們對JMS的工做過程應該很清楚了。我我的認爲咱們能夠簡單的這樣理解其工做過程:

生產者

 

消費者

 

JMS connectionFactory

產生消息併發往指定的目的地

接收消息並給出已消費確認信息,JMS connectionFactory收到確認信息後就將對應的信息從本身的管理庫中刪除

從上面的圖很清楚地看出,要想實現跨服務器的JMS消息機制,JMS connectionFactory是關鍵的地方,簡單地說:connectionFactory就決定了JMS的做用範圍。若是connectionFactory是受制於系統(也就是說,當系統停掉以後connectionFactory也就跟着銷燬),那麼它就不能實現跨服務器功能。要想實現跨服務功能,connectionFactory就必須獨立於系統或服務器。因而可知,結合前面的知識,咱們就能夠知道,可以實現跨服務器的JMS消息機制其實有兩種方式:JDBC方式和採用第三方工具。前面我也說過,我選擇了後者(並且我也一直這麼作了)。

三、  實現多服務器的JMS共享,即實現JMS跨服務器功能

ActiveMQ的單服務器版咱們已經成功搭建並能成功運行了。如今讓咱們實現JMS跨服務器功能吧。等等,咱們先準備另外一個服務器環境。爲了明顯的區別兩個服務,我將上面全部的環境從新弄了一份(一個新的MyEclipse;一個新的web工程,固然web工程裏面的環境與上面的同樣;一個新的resin;一個新的resin端口8081,上面的resin端口是80),我稱之爲client

接下來,咱們來配置client中的JMS。在全部的系統配置中只有一個配置文件與上面的有區別,那就是jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置JMS鏈接工廠(注:brokerURL是關鍵,它應該是上面的amq:transportConnectors裏面的值之一對應,由於這裏指定鏈接的對象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息發送的目的地(注:」 amq:queue」是用於指定是發送topic仍是queue,對應上面配置中的amq:destinations -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 建立JMSSession生成類 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生產者(經過指定目的地, 就能夠同時指定其發送的消息模式是topic仍是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收類(這個類須要繼承javax.jms.MessageListener,固然也能夠經過MessageListenerAdapter指定消息轉換器來實現用戶自定義的消息收發) -->

       <bean id="messageListener"

              class="com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息監聽容器,其各屬性的意義爲:

              connectionFactory:指定所監聽的對象,在這裏就是監聽鏈接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:監聽的消息模式;

              messageListener:接收者

               -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注意了!這個配置與上面的ActiveMQ的配置只有一個地方不同,也就是:這個配置中沒有配置ActiveMQ的相關信息。爲何?結合上面所述的JMS簡單工做方式,咱們應該不可貴到答案:由於ActiveMQ要實現跨服務器就必須獨立運行,因此咱們只須要啓動一個就夠了。

注:

其實在這個簡單的跨服務器的例子中,其中一方只須要配置消息生產者,而另外一方只須要配置消息的消費者就能夠測試經過了,並且測試效果會很明顯:在消息生產者的那個服務器上運行測試程序,在消息接收的服務器上就會有相應的響應!在這裏我之因此這樣作,是讓你們在測試時發現一個奇怪的現象:當在一端運行測試程序,第一個消息會被當前運行測試程序的服務消費掉,而接下來的消息又被另外一個服務器消費掉,如此循環(我沒有測試過三個及以上的服務器運行狀況,我想多是消息一個個的均攤下去的吧)。之因此要讓你們看到這個現象,是爲了讓你們有個疑問,容易接受後面高級應用中關於JMS的兩種消息機制:Point-to-Point消息(P2P)和發佈訂閱消息(Publish Subscribe messaging,簡稱Pub/Sub,也就是廣播模式)的使用方法。

III、高級應用

從這裏開始,咱們將進入JMS消息的一些特殊用法,或者叫高級應用。在介紹這些應用的時候,咱們會用到上面已經佈署好的跨服務器的應用來做例子。爲了區別兩個服務器,咱們把配置有ActiveMQ的叫server,另外一個仍是叫client

1、使用指定的消息,即消息的P2PPub/Sub的應用

上節內容的最後給你們留下了一個有趣的現象。在這裏,咱們將針對這個現象進行詳細的解析。

咱們前面也知道了,消息模式有兩種,但怎麼使用卻一直沒提過。可是若是仔細的看過官網的資料也許已經知道一些了。在這裏,我將用實例的方式給你們展示一下它的具體使用。(參考:http://andyao.javaeye.com/blog/234101

首先,咱們來看Queue消息的使用實例。上面的跨服務器的實例其實就是queue實例的應用,但問題是:如何指定惟一的接收者呢?也就是不能出現上面提到的那個奇怪的循環現象呢?

其實這個現象也並不難回答,首先讓咱們來仔細看一下queue消息的目的地配置:

<amq:queue name="destination" physicalName="ossQueue" />

對於上面的配置,咱們能夠一一解讀其中各參數的含義就知道奧妙所在了:

amq:queue:表示這是配置是queue消息;

name:指定的消息發送與接收的目的地的名稱;

physicalName:指定消息隊列的物理名稱,在ActiveMQ中它就是一個消息集羣的表示形式。

根據上面配置的含義,咱們不難發現,其實奧妙就在physicalName這個屬性中體現的。具體來講,對於queue消息而言,只要發送方與接收方都使用同一個physicalName,這就是點對點指定了。例如:將上面的例子中client中的:

<amq:queue name="destination" physicalName="ossQueue" />

改爲:

<amq:queue name="destination" physicalName="ossQueue1" />

這樣的話,咱們上面說的「奇怪的現象」就不會存在了。由於server中消息是發送到ossQueue這個消息隊列裏的,而client中消息目的地是指向ossQueue1的,固然就收不到server裏面ossQueue中的消息了。

咱們再來看一下如何使用Topic消息。其實很簡單,只要將第II節配置中的

<amq:queue name="destination" physicalName="ossQueue" />

改爲:

<amq:topic name="destination" physicalName="ossQueue" />

就能夠了。運行測試程序時,會發如今兩個服務都會收到響應信息。

注:關於topic,有一個概念,叫「訂閱」。關於這個詞我也不是很瞭解,但我理解是:在系統中配置了amq:topic而且connectionFactory指定到對應的uri上的前提上,只要amq:topic中對應的physicalNamepublish端相同,這就是訂閱,這樣的配置以後它就能收到發送的信息了。

總之一句話,點對點(或發佈訂閱模式)的消息發送關鍵在於收發雙方是否共同指定同一個physicalName

2、自定義消息的收發類

正如前面例子中配置文件中的一行註釋說的同樣,消息的收發類用戶能夠自定義,它是經過MessageListenerAdapter指定消息轉換器來實現用戶自定義的消息收發。具體的操做咱們仍是來看實例吧(爲了簡單起見,咱們以單服務器的配置與運行做實例,跨服務配置是同樣的):

jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">

       <!-- 配置ActiveMQ服務 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

                     <!-- 提供的鏈接方式有:VM TransportTCP TransportSSL Transport

                            Peer TransportUDP TransportMulticast TransportHTTP and HTTPS Transport

                            Failover TransportFanout TransportDiscovery TransportZeroConf Transport -->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS鏈接工廠(注:brokerURL是關鍵,

它應該是上面的amq:transportConnectors裏面的值之一) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息發送的目的地(注:amq:queue是用於指定是發送topic不是queue,對應上面配置中的amq:destinations -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 建立JMSSession生成類 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <!-- 指定發送信息時使用的消息轉換類.

                     這個選項不填的話,默認的是:SimpleMessageConverter,它只支持4種類型的對象:String, byte[],Map,Serializable

              -->

              <!—若是加上下面這段配置就會出錯, 錯誤緣由是Book不是一個原始類, 但我已經將它繼承Serializable,可仍是不行,我想可能有其餘什麼緣由吧, 但我如今不清楚 -->

              <!-- <property name="messageConverter"

                     ref="resourceMessageConverter" /> -->

       </bean>

       <!-- 發送消息的轉換類

(這個類要繼承org.springframework.jms.support.converter.MessageConverter -->

       <bean id="resourceMessageConverter"

              class=" com.focustech.jms.ResourceMessageConverter" />

       <!-- 消息生產者(經過指定目的地, 就能夠同時指定其發送的消息模式是topic仍是queue) -->

       <bean id="resourceMessageProducer"

              class=" com.focustech.jms.ResourceMessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收類(這個類須要繼承,固然也能夠經過MessageListenerAdapter指定消息轉換器來實現用戶自定義的消息收發) -->

       <bean id="resourceMessageListener"

              class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

              <constructor-arg>

                     <bean

                            class=" com.focustech.jms.ResourceMessageConsumer">

                     </bean>

              </constructor-arg>

              <property name="defaultListenerMethod" value="recieve" />

              <!—自定義接收類與接收的方法 -->

              <property name="messageConverter"

                     ref="resourceMessageConverter" />

       </bean>

       <!-- 消息監聽容器,其各屬性的意義爲:

              connectionFactory:指定所監聽的對象,在這裏就是監聽鏈接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:監聽的消息模式;

              messageListener:接收者

       -->

       <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

       </bean>

</beans>

在這裏,咱們須要發送本身定義的消息格式,這樣,咱們就須要不一樣的消息的生產者與消費者,固然,也須要一個自定義的將二者消息進行轉換的一個自定義的類,如上面配置文件中指定的同樣,這三個自定義的類的主要代碼以下:

ResourceMessageProducer

public class ResourceMessageProducer

{

       private JmsTemplate    template;

       private Destination      destination;

       public JmsTemplate getTemplate()

       {

              return template;

       }

       public void setTemplate(JmsTemplate template)

       {

              this.template = template;

       }

       public Destination getDestination()

       {

              return destination;

       }

       public void setDestination(Destination destination)

       {

              this.destination = destination;

       }

       public void send(Book book)

       {

              System.out.println("=======================================");

              System.out.println("do send ......");

              long l1 = System.currentTimeMillis();

              template.convertAndSend(this.destination, book);

              System.out.println("send time:" + (System.currentTimeMillis() - l1) / 1000 + "s");

              System.out.println("=======================================");

       }

}

ResourceMessageConverter

public class ResourceMessageConverter implements MessageConverter

{

       @SuppressWarnings("unchecked")

       public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException

       {

              // check Type

              if (obj instanceof Book)

              {

                     // 採用ActiveMQ的方式傳遞消息

                     ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();

                     Map map = new HashMap();

                     map.put("Book", obj);

                     // objMsg.setObjectProperty裏面放置的類型只能是:String, Map, Object, List

                     objMsg.setObjectProperty("book", map);

                     return objMsg;

              }

              else

              {

                     throw new JMSException("Object:[" + obj + "] is not Book");

              }

       }

       public Object fromMessage(Message msg) throws JMSException, MessageConversionException

       {

              if (msg instanceof ObjectMessage)

              {

                     Object obj = ((ObjectMessage) msg).getObject();

                     return obj;

              }

              else

              {

                     throw new JMSException("Msg:[" + msg + "] is not Map");

              }

       }

}

ResourceMessageConsumer

public class ResourceMessageConsumer

{

       public void recieve(Object obj)

       {

              Book book = (Book) obj;

              System.out.println("=======================================");

              System.out.println("receiveing message ...");

              System.out.println(book.toString());

              System.out.println("here to invoke our business method...");

              System.out.println("=======================================");

       }

}

Book

public class Book implements Serializable

{

       /**

        *

        */

       private static final long       serialVersionUID   = -6988445616774288928L;

       long                                    id;

       String                                 name;

       String                                 author;

       public String getAuthor()

       {

              return author;

       }

       public void setAuthor(String author)

       {

              this.author = author;

       }

       public long getId()

       {

              return id;

       }

       public void setId(long id)

       {

              this.id = id;

       }

       public String getName()

       {

              return name;

       }

       public void setName(String name)

       {

              this.name = name;

       }

}

消息測試:將測試JSP中的JAVA代碼改爲:

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

ResourceMessageProducer resourceMessageProducer = (ResourceMessageProducer) context.getBean("messageProducer");

Book book = new Book();

book.setId(123);

book.setName("jms test!");

book.setAuthor("taofucheng");

resourceMessageProducer.send(book);

      } catch (JmsException e) {

  }

%>

運行系統,打開測試頁面,會發現消息已經成功接收!

注:(1)、經過這種方法,咱們就能夠發送咱們想發送的任何對象了(有些限制:這些對象的類型必須是:String, Map, byte[],Serializable。上面的例子已經註釋得很清楚)

2)、若是你們有興趣的話,看一下MessageListenerAdapter的源碼,你就會發現其實它就是MessageListener的實現類,在它實現的onMessage方法中使用了用戶自定義的轉換類而已。

3、集成事務

Spring提供的JMSAPI中已經有了集成事務的功能,咱們只要將上面監聽容器的配置改爲下面的就好了:

首先,將jmsTemplate設置成支持事務(它默認是不支持事務的):

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <property name="sessionTransacted" value="true"/>

       </bean>

而後再在消息監聽容器中設置指定的事務管理:

    <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

              <!—jtaTransactionManager是系統中的事務管理類,在咱們的系統中,是由Spring託管的 -->

              <property name="transactionManager" ref="jtaTransactionManager" />

       </bean>

這樣配置以後,當事務發生回滾時,消息也會有回滾,即不發送出去。

4、其它高級應用

ActiveMQ還有許多其它高級的應用,如:自動重連機制,也就是保證當通訊雙方或多方的連接斷裂後它會根據用戶的設置自動鏈接,以保證創建可靠的傳輸;另外,ActiveMQ還有其它方式嵌入到Spring中,如它能夠經過xbean, file等方式創建應用;它還能夠經過JMX對消息的發送與接收進行實時查看;消息的確認方式等等,還有不少高級的應用,請參考:《ActiveMQ in Action(網址:http://whitesock.javaeye.com/blog/164925)

相關文章
相關標籤/搜索