本篇主要介紹了異步消息機制及Spring對JMS封裝,本篇文章講解較爲詳細,若是想直接看如何配置,能夠參考: Spring整合JMS(消息中間件)實例,但仍是建議你們先看完本篇文章。java
相似於RMI、Hessian、Burlap等遠程方法調用,它們都是同步的,所謂同步調用就是客戶端必須等待操做完成,若是遠程服務沒有返回任何響應,客戶端會一直等待直到服務完成。
spring
異步調用則不一樣,客戶端發送完消息無需等待服務處理完成即可當即返回,就像發送完消息就馬上被處理成功同樣。
apache
在異步處理的世界,咱們能夠把消息的發送比做一個郵局系統。好比咱們要給某我的發送信件,咱們只需準備好信件,把它投入郵局的郵箱便可,咱們沒必要關心郵件如何送出、可否到達,郵局系統會保證信件最終送達到咱們但願的接收者手中。和郵局系統相似,當一個應用向另外一個應用發送消息,兩個應用之間沒有直接的關聯,而是發送消息的應用把消息交給一個消息系統,由消息系統確保把消息傳遞給接收消息的應用。服務器
在異步消息系統中有兩個重要的角色:消息broker和destination。當一個應用發送一條消息,它會直接把它發送給消息broker,消息broker扮演的就是郵局,它會確保消息被傳遞到特定的destination。當咱們郵寄信件時,信件的地址尤其重要,消息系統中的地址就是destination。不過與信件中的地址不一樣,destination中定義的不是接收者是誰,而是消息被放在消息broker的什麼地方(具體指queue或者topic),destination其實更像郵局系統中的郵筒。網絡
儘管存在各類各樣的消息系統,每一個消息系統都有各自的消息路由方式,但整體上有兩種類型的destination:queue和topic,它們也各自關聯着一種特定的消息處理模型:點對點(point-to-point/queue)和發佈/訂閱(publish/subscribe/topic)session
在點對點模型中,每一個消息只有一個發送者和一個接收者。以下圖所示:
在點對點模型中, 消息broker會把消息放入一個queue。當一個接收者請求下一個消息時,消息會被從queue中取出並傳遞給接收者。由於消息從queue中取出便會被移除,因此這保證了一個消息只能有一個接收者。app
儘管消息隊列中的每一個消息只有一個接收者,但這並不意味着只能有一個接收者從隊列獲取消息,能夠同時有多個接收者從隊列獲取消息,只不過它們只能處理各自接收到的消息。其實這就像在銀行排隊同樣,排隊的人能夠看作一個個消息,而銀行工做窗口即是消息的接收者,每一個窗口服務完一個客戶以後都會讓隊列中的「下一個」到窗口辦理業務。框架
還有,若是多個接收者監聽一個隊列,咱們是很難肯定到底哪一個接收者處理哪一個消息的。不過這也不必定很差,由於這樣就使得咱們很方便的經過增長接收者來拓展應用處理能力了。異步
在發佈/訂閱模式中,消息是被髮送到topic中的。就像queue同樣,不少接收者能夠監聽同一個topic,可是與queue每一個消息只傳遞給一個接收者不一樣,訂閱了同一個topic的全部接收者都會收到消息的拷貝,以下圖所示:
從發佈/訂閱的名字中咱們也可看出,發佈者發佈一條消息,全部訂閱者都能收到,這就是發佈訂閱模式最大的特性。對於發佈者來講,它只知道將消息發佈到了一個特定的topic,它不關心誰監聽這個topic,這也就意味着它並不知道這些消息是被如何處理的。tcp
在具體介紹異步消息系統帶來的好處以前,咱們先看看同步系統的侷限性:
下面咱們再看一下異步消息系統是如何解決這些問題的。
無需等待
當一個消息被異步發送,客戶端不須要等待它處理完成。客戶端直接把消息扔給broker而後作其它事情,broker負責把消息送到合適的目的地。
由於客戶端不須要等待,因此客戶端的性能會有很大的提高。
面向消息和解耦合
不一樣於傳統基於方法調用的RPC會話,消息異步發送是以數據爲中心的。這就意味着客戶端不須要和某個方法簽名綁定,任何queue或topic的訂閱者均可以處理客戶端發送的消息。客戶端沒必要再關心服務方任何相關的問題。
位置獨立
同步RPC服務的調用是經過網絡地址定位的,這就意味着客戶端沒法擺脫網絡拓撲的變化。若是服務的IP或端口發生改變,客戶端也須要作相應的改變。
相反,異步消息系統中的客戶端並不關心服務所在的位置及其如何處理消息,它只負責將消息發送到特定的queue或topic。因此,服務位於什麼地方都無所謂,只要它們可以從queue或topic中獲取消息便可。
在點對點模式中,能夠很方便的利用位置獨立這個特性建立一個服務集羣。客戶端不須要關心服務的位置,集羣中各個服務僅需知道broker的位置,並從同一個queue獲取消息,若是服務壓力過大沒法及時處理消息,咱們只須要在集羣中增長一個服務實例去監聽同一個queue便可。
在發佈/訂閱模式中,位置獨立一樣有很重要的做用。多個服務能夠訂閱同一個topic,他們都能獲取到topic中的每一個消息,可是對各個服務的處理能夠不一樣。好比咱們有一個服務集合訂閱了一個接收新員工消息的topic,因此這些服務均可以獲得每一個新員工消息,一個服務能夠將新員工添加到薪資系統,另外一個服務能夠將新員工增長到hr系統,還有服務負責賦予新員工各類系統權限等等,每一個訂閱topic的服務都能對各自的消息作出本身的處理。
可靠性保證
當一個客戶端和服務經過同步方式進行交互時,若是服務出現任何問題掛掉,都會影響客戶端正常工做。可是當消息是異步發送時,客戶端與服務之間被broker隔離,客戶端只負責發送消息,即便當發送消息時服務掛掉,消息也會被broker存儲起來,等到服務可用時再接着進行處理。
Java Message Service是一個Java標準,它定義了一套與消息broker交互的通用API。在JMS出現以前,每一種消息broker都有本身獨特的一套API,使得應用代碼沒法在不一樣的broker之間適用。可是經過JMS,全部與broker交互的代碼就能夠適用一套通用的API,就像JDBC同樣。
固然Spring對JMS也提供了支持,即JmsTemplate。經過JmsTemplate,咱們能夠更加方便地向queue和topic發送和接收消息。後面咱們會詳細介紹Spring對JMS的實現,可是在發送和接收消息以前,咱們須要現有一個broker。
ActiveMQ是很是優秀的JMS框架,關於ActiveMQ相關內容這裏很少作介紹,具體能夠參考:http://activemq.apache.org/,本篇主要介紹如何在Spring中對其進行配置和使用。
咱們要想發送消息到ActiveMQ,就須要先建立到它的鏈接,ActiveMQConnectionFactory
就是JMS中負責建立到ActiveMQ鏈接的工廠類。在Spring中配置方式以下:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"/>
除此以外,Spring爲ActiveMQ提供了專門的命名空間,咱們可使用Spring的ActiveMQ命名空間來建立鏈接工廠。首先要在配置文件中聲明amq命名空間:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> ... </beans>
而後咱們就能夠利用<amq:connectionFactory>
元素來聲明一個鏈接工廠:
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/>
須要注意,<amq:connectionFactory>
元素是專門針對ActiveMQ的。若是咱們用到的是其它broker,就須要用另外的標籤元素或注入另外的工廠bean。上面元素中的brokerURL
指定了ActiveMQ在服務器中的IP和端口,上面端口值就是ActiveMQ默認端口。
除了要有一個鏈接工廠以外,咱們還須要知道消息發送到的destination。上面講過了,消息的destination只有兩類queue或者topic,在Spring中,咱們須要配置queue或topic對應的bean。
配置一個ActiveMQ queue bean:
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue" c:_="biz1.queue" />
配置一個ActiveMQ topic bean:
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" c:_="biz1.topic" />
上面例子中c:_
屬性表明的是構造器參數,它指定了queue或topic的名稱。
像鏈接工廠同樣,Spring提供了另一種配置destination的方式,就是經過Spring ActiveMQ命名空間進行配置。
使用<amq:queue>
元素配置一個queue:
<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
使用<amq:topic>
元素配置一個topic:
<amq:topic id="spittleTopic" physicalName="biz1.topic" />
上面元素中physicalName
屬性表明消息通道的名稱,也就是queue和topic的名稱。
經過上面兩個組件的配置,咱們就能夠向ActiveMQ發送和接收消息了。發送和接收消息咱們使用的是Spring提供的JmsTempate,它是Spring對JMS的抽象,下面就詳細介紹JMSTemplate的使用。
雖然JMS提供了一套與各類broker交互的通用API,但實際使用起來並非很方便,咱們先看一下使用普通JMS API與broker交互的代碼。
ConnectionFactory cf =new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = null; Session session = null; try { conn = cf.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = new ActiveMQQueue("spitter.queue"); MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); message.setText("Hello world!"); producer.send(message); } catch (JMSException e) { // handle exception? } finally { try { if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { } }
上面代碼中咱們能夠看到,爲了發送一條 「Hello world」的消息卻用了20多行代碼,就像JDBC同樣,咱們大部分代碼都是再作一些重複性的準備工做,好比獲取鏈接、建立session、異常處理等等。其實接收消息的代碼也是如此,在JDBC中,Spring提供了一個JdbcTemplate來簡化JDBC代碼開發,一樣,Spring也提供了JmsTemplate
來簡化JMS消息處理的開發。
JmsTemplate實際上是Spring對JMS更高一層的抽象,它封裝了大部分建立鏈接、獲取session及發送接收消息相關的代碼,使得咱們能夠把精力集中在消息的發送和接收上。另外,JmsTemplate
對異常也作了很好的封裝,其對應的基本的異常爲JMSException
。
要使用JmsTemplate,就要在Spring配置文件中配置它做爲一個bean:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" />
由於JmsTemplate須要先和broker進行鏈接,因此它須要依賴一個connectionFactory。
發送消息
假如咱們有一個業務須要用到異步消息發送,咱們先定義這樣一個業務接口:
public interface MyMessageService { void sendMessage(String message); }
上面接口中只有一個方法,就是發送消息。
咱們寫這個接口的實現,在這個接口實現中,咱們就是用JmsTemplate
實現異步消息發送:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsOperations; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by whd@zizizizizi.com on 2016/6/17. */ @Component public class MyMessageServiceImpl implements MyMessageService{ @Autowired private JmsOperations jmsOperations; public void sendMessage(final String message) { jmsOperations.send("biz1.queue", new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
咱們能夠看到,咱們業務的實現中注入了一個JmsOperations
對象,這個對象就是JmsTempate
的實現。JmsOperations
的send()
方法有兩個參數,第一個是消息的destination
,第二個即是具體的Message
,在上面例子中message是經過一個匿名內部類MessageCreator
的createMessage()
方法構造的。
經過上面例子能夠發現,經過JmsTempate
,咱們只須要關心發送消息便可,全部的鏈接和session的維護都由JmsTempate
負責。
設置默認destination
大部分狀況下,一個業務消息的destination是相同的,因此咱們沒必要每次發送都填寫destination,咱們能夠在配置文件中對其進行配置:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestinationName="biz1.queue" />
在上面配置中咱們默認destination值爲biz1.queue
,由於它只是聲明瞭一個名稱,並無說明是哪一種類型的destination,因此,若是存在相同名稱的queue或topic,就會自動與之匹配,若是不存在,則會默認建立一個相同名稱的queue。若是咱們想指定destination的類型,咱們能夠經過配置讓其依賴以前配置的destination bean便可:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestination-ref="biz1.Topic" />
當咱們配置了默認destination,咱們就能夠在發送消息時省略第一個參數了:
jmsOperations.send( new MessageCreator() { ... } );
其實上面的send()
方法能夠變得更簡單,咱們能夠利用消息轉換器。
使用消息轉換器發送消息
除了send()
方法以外,JmsTemplate
還提供了convertAndSend()
方法。與send()
方法須要依賴一個MessageCreator
不一樣,convertAndSend()
方法只須要傳入你想發送的消息便可。下面咱們用convertAndSend()
實現接口中的sendMessage()
方法:
public void sendMessage(final String message) { jmsOperations.convertAndSend(message); }
convertAndSend()
方法會自動把你發的消息轉換成Message
,具體如何轉換的由org.springframework.messaging.converter.MessageConverter
的實現來決定。咱們先看一下MessageConverter
接口:
public interface MessageConverter { Object fromMessage(Message<?> var1, Class<?> var2); Message<?> toMessage(Object var1, MessageHeaders var2); }
咱們能夠看到這個接口中只有兩個方法並且很容易實現。其實大部分狀況下咱們不須要本身去實現這個接口,Spring已經爲咱們提供給了不少經常使用的實現:
默認狀況下,當JmsTemplate
的convertAndSend()
方法使用的是SimpleMessageConverter
。可是咱們也能夠經過配置把咱們自定義的MessageConverter
做爲屬性注入到JmsTemplate
中,好比咱們有個一MessageConverter
的實現bean:
<bean id="messageConverter" class="org.springframework.jms.support.converter.MappingJacksonMessageConverter" />
咱們能夠把上面這個bean注入到JmsTemplate中:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestinationName="spittle.alert.queue" p:messageConverter-ref="messageConverter" />
消費消息
對於消費來講,JmsTemplate
使用起來比發送更簡單,只須要調用JmsOperations
的receive()
方法便可:
public class ReceiveMessage { @Autowired private JmsOperations jmsOperations; public String receive() { try { ObjectMessage message = (ObjectMessage) jmsOperations.receive(); return (String) message.getObject(); } catch (JMSException e) { e.printStackTrace(); throw JmsUtils.convertJmsAccessException(e); } } }
當調用 jmsOperations.receive()
方法時,它會嘗試從broker獲取消息,若此時沒有消息,receive()
方法會一直等待直到有消息產生。前面例子中,當咱們發送消息的時候消息被封裝成的是ObjectMessage
,所咱們在獲取的時候能夠再將其轉換回ObjectMessage
。
這裏有一點須要注意,當調用message.getObject()
方法時會拋出JMSException
,這個異常是屬於JMS API的。JMSException
是一個檢查異常,在JMS操做中會拋出各類各樣的JMSException
,可是前面咱們使用JmsTemplate
時並無捕獲任何JMSException
,是由於JmsTemplate
內部已經將須要檢查的JMSException
轉換成了非檢查的Spring本身的JmsException
。在上面代碼中由於調用的是message.getObject()
方法而不是JmsTemplate
的方法,因此咱們須要捕獲JMSException
。可是按照Spring的設計理念,咱們應該儘可能減小檢查異常,因此在catch塊裏面咱們又經過JmsUtils工具把JMSException
轉換成了非檢查的JmsException
。
一樣,就行消息的發送同樣,咱們也可使用JmsTemplate的receiveAndConvert()
方法替換receive()
方法:
public String receive() { return (String)jmsOperations.receiveAndConvert(); }
咱們看到,由於使用的是JmsTemplate
的方法,因此咱們不須要再捕獲JMSException
檢查異常。
無論使用msTemplate
的receive()
仍是receiveAndConvert()
方法消費消息,它們都是同步的。也就是說接收者在消息到達時須要等待。這樣看起來是否是有點奇怪?發送消息時是異步的,接收消息時倒是同步的。
這也就是爲何會有下面的消息驅動POJO出現的緣由,下面咱們就看一下如何實現異步的接收消息。
咱們上面已經知道,JmsTemplate
的receive()
方法是一個同步方法,在消息到達以前這個方法會掛起一直等待直到消息出現,若是這樣的話,咱們的應用可能會出現一直等待消息而不能作其它事情的狀況。爲什麼不讓應用先去處理其它業務,當消息出現時再告知應用處理呢?
在EJB中,message driven bean(MDB)
就能夠實現異步的處理消息。Spring在這方面參考了EJB3對MDB的實現,不過在Spring中咱們把它稱做消息驅動POJO,也就是message-driven POJO(MDP)
。
要想在消息出現時獲得通知,那麼就須要一個監聽器監聽queue或者topic,之因此稱做消息驅動POJO,意識由於監聽器是消息驅動的,而是由於這個監聽器自己就是一個普通的POJO對象,不須要依賴任何接口:
public class MyMessageHandler { public void handleMessage(String message){ //具體的實現 } }
有了這個POJO對象,下面只須要作簡單的配置便可。
賦予上面POJO接收消息能力的關鍵在於將其配置成一個Spring消息監聽器,Spring的jms命名空間提供了全部相關配置。
首先,咱們現須要把上面的POJO對象聲明成一個bean:
<bean id="myMessageHandler" class="com.heaven.springexamples.jms.MyMessageHandler" />
其次,把MessageHandler變成一個消息驅動POJO,即把這個bean聲明成一個listener:
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="biz1.queue" ref="myMessageHandler" method="handleMessage" /> </jms:listener-container>
經過上面配置,消息監聽容器裏面就多了一個消息監聽器。消息監聽容器是一個特殊的bean,它可以監聽JMS的destination,監聽消息的到達。一旦消息到達,消息監聽容器會接受這個消息並將其發送給全部相關的listener。下面這幅圖展現了整個內部處理過程:
爲了配置監聽容器和監聽者,咱們用到了jms命名空間中的兩個元素。<jms:listener-container>
是父元素,<jms:listener >
是子元素。<jms:listener-container>
依賴一個connectionFactory
,這樣它的各個<jms:listener >
就能夠監聽消息了。<jms:listener >
用來定義具體接收消息的bean及方法。按照上面的配置,當消息到達queue時,MyMessageHandler
的handleMessage
方法便會被調用。
須要注意到是,咱們的MessageHandler
還能夠實現一個MessageListener
接口,這樣的話就不須要再單獨指定消息處理的方法了,MyMessageHandler
的onMessage()
方法會自動被調用。MessageListener接口定義以下:
public interface MessageListener { void onMessage(Message var1); }
咱們寫一個簡單的實現類:
public class MyMessageListener implements MessageListener{ public void onMessage(Message message) { //具體的實現 } }
而後直接配置listener便可(不用再配置method方法屬性):
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="biz1.queue" ref="myMessageHandler" /> </jms:listener-container>
原文地址:http://blog.csdn.net/suifeng3051/article/details/51718675