activemq詳解

1. JMS架構html

 

Java 消息服務(Java Message Service,簡稱JMS)是用於訪問企業消息系統的開發商中立的API。企業消息系統能夠協助應用軟件經過網絡進行消息交互。JMS 在其中扮演的角色與JDBC 很類似,正如JDBC 提供了一套用於訪問各類不一樣關係數據庫的公共API,JMS 也提供了獨立於特定廠商的企業消息系統訪問方式。java

使用JMS 的應用程序被稱爲JMS 客戶端,處理消息路由與傳遞的消息系統被稱爲JMS Provider,而JMS 應用則是由多個JMS 客戶端和一個JMS Provider 構成的業務系統。發送消息的JMS 客戶端被稱爲生產者(producer),而接收消息的JMS 客戶端則被稱爲消費者(consumer)。同一JMS 客戶端既能夠是生產者也能夠是消費者。web

JMS 的編程過程很簡單,歸納爲:應用程序A 發送一條消息到消息服務器(也就是JMS Provider)的某個目得地(Destination),而後消息服務器把消息轉發給應用程序B。由於應用程序A 和應用程序B 沒有直接的代碼關連,因此二者實現瞭解偶。以下圖:spring


 

消息傳遞系統的中心就是消息。一條Message 由三個部分組成:數據庫

 

消息的組成apache

 

1. 頭(head)編程

每條JMS 消息都必須具備消息頭。頭字段包含用於路由和識別消息的值。能夠經過多種方式來設置消息頭的值:安全

a. 由JMS 提供者在生成或傳送消息的過程當中自動設置服務器

b. 由生產者客戶機經過在建立消息生產者時指定的設置進行設置網絡

c. 由生產者客戶機逐一對各條消息進行設置

 

2. 屬性(property)

消息能夠包含稱做屬性的可選頭字段。他們是以屬性名和屬性值對的形式制定的。能夠將屬性是爲消息頭得擴展,其中能夠包括以下信息:建立數據的進程、數據的建立時間以及每條數據的結構。JMS提供者也能夠添加影響消息處理的屬性,如是否應壓縮消息或如何在消息生命週期結束時廢棄消息。

 

3. 主體(body)

包含要發送給接收應用程序的內容。每一個消息接口特定於它所支持的內容類型。JMS爲不一樣類型的內容提供了他們各自的消息類型,可是全部消息都派生自Message接口。

StreamMessage   一種主體中包含Java基元值流的消息。其填充和讀取均按順序進行。

MapMessage     一種主體中包含一組鍵--值對的消息。沒有定義條目順序。

TextMessage       一種主體中包含Java字符串的消息(例如,XML消息)。

ObjectMessage    一種主體中包含序列化Java對象的消息。

BytesMessage     一種主體中包含連續字節流的消息。

例如:MapMessage 消息格式

 

Json代碼  收藏代碼

  1. MapMessage={  
  2.     Header={  
  3.         ... standard headers ...  
  4.         CorrelationID={123-00001}  
  5.     }  
  6.     Properties={  
  7.         AccountID={Integer:1234}  
  8.     }  
  9.     Fields={  
  10.         Name={String:Mark}  
  11.         Age={Integer:47}  
  12.     }   
  13. }  

 

消息的傳遞模型

 

JMS支持兩種消息傳遞模型:點對點(point-to-point,簡稱PTP)和發佈/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型很是類似,但有如下區別:

a. PTP消息傳遞模型規定了一條消息之恩可以傳遞費一個接收方。

b. Pub/sub消息傳遞模型容許一條消息傳遞給多個接收方

每一個模型都經過擴展公用基類來實現。例如:javax.jms.Queue和Javax.jms.Topic都擴展自javax.jms.Destination類。

 

1. 點對點消息傳遞

經過點對點的消息傳遞模型,一個應用程序能夠向另一個應用程序發送消息。在此傳遞模型中,目標類型時隊列。消息首先被傳送至隊列目標,而後從改對壘將消息傳送至對此隊列進行監聽的某個消費者,以下圖:

 

一個隊列能夠關聯多個隊列發送方和接收方,但一條消息僅傳遞給一個接收方。若是多個接收方正在監聽隊列上的消息,JMS Provider將根據「先來者優先」的原則肯定由哪一個價售房接受下一條消息。若是沒有接收方在監聽隊列,消息將保留在隊列中,直至接收方鏈接到隊列爲止。這種消息傳遞模型是傳統意義上的拉模型或輪詢模型。在此列模型中,消息不時自動推進給客戶端的,而是要由客戶端從隊列中請求得到。

 

2. 發佈/訂閱消息傳遞

經過發佈/訂閱消息傳遞模型,應用程序可以將一條消息發送到多個接收方。在此傳送模型中,目標類型是主題。消息首先被傳送至主題目標,而後傳送至全部已訂閱此主題的或送消費者。以下圖:

 

主題目標也支持長期訂閱。長期訂閱表示消費者已註冊了主題目標,但在消息到達目標時改消費者能夠處於非活動狀態。當消費者再次處於活動狀態時,將會接收該消息。若是消費者均沒有註冊某個主題目標,該主題只保留註冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不一樣,pub/sub消息傳遞模型容許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至全部主題訂閱者都接收到消息爲止。pub/sub消息傳遞模型基本上時一個推模型。在該模型中,消息會自動廣播,消費者無須經過主動請求或輪詢主題的方法來得到新的消息。

 

上面兩種消息傳遞模型裏,咱們都須要定義消息生產者和消費者,生產者吧消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者能夠同步或異步接收消息,通常而言,異步消息消費者的執行和伸縮性都優於同步消息接收者,體如今:

1. 異步消息接收者建立的網絡流量比較小。單向對東消息,並使之經過管道進入消息監聽器。管道操做支持將多條消息聚合爲一個網絡調用。

2. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閒,尤爲是若是該調用中指定了阻塞超時。

3.對於服務器上運行的應用程序代碼,使用異步消息接收者幾乎老是最佳選擇,尤爲是經過消息驅動Bean。使用異步消息接收者能夠防止應用程序代碼在服務器上執行阻塞操做。而阻塞操做會是服務器端線程空閒,甚至會致使死鎖。阻塞操做使用全部線程時則發生死鎖。若是沒有空餘的線程能夠處理阻塞操做自身解鎖所需的操做,這該操做永遠沒法中止阻塞。

 

 

2. JMS Provider(ActiveMQ)

 

特性及優點

 

1. 實現JMS1.1規範,支持J2EE1.4以上。

2. 可運行與任何JVM和大部分web容器(ActiveMQ works great in any JVM)

3. 支持多種語言客戶端(java, C, C++, Ajax, ActionScript等等)

4. 支持多種協議(stomp, openwire, REST)

5. 良好的Spring支持(ActiveMQ has great Spring Support)

6. 速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ)

7. 與OpenJMS、JBossMQ等開源jms provider相比,ActiveMQ有apache的支持,持續發展的優點明顯

 

Queue與Topic的比較

 

1. JMS Queue執行load balancer語義

    一條消息僅能被一個consumer收到。若是在message發送的時候沒有可用的consumer,那麼它講被保存一直到能處理該message的consumer可用。若是一個consumer收到一條message後卻不響應它,那麼這條消息將被轉到另一個consumer那兒。一個Queue能夠有不少consumer,而且在多個可用的consumer中負載均衡。

 

2. Topic實現publish和subscribe語義

    一條消息被publish時,他將發送給全部感興趣的訂閱者,因此零到多個subscriber將接收到消息的一個拷貝。可是在消息代理接收到消息時,只有激活訂閱的subscriber可以得到消息的一個拷貝。

 

3. 分別對應兩種消息模式

    Point-to-Point(點對點),Publisher/Subscriber Model(發佈/訂閱者)

    其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化訂閱)和durable subscription(持久化訂閱)兩種消息處理方式。

 

Point-to-Point(點對點)消息模式開發流程

 

1. 生產者(producer)開發流程

 

    1.1 建立 Connection

Java代碼  收藏代碼

  1. // 根據url,user和password建立一個jms Connection。  
  2. ActiveMQConnectionFactory connectionFactory   =   new ActiveMQConnectionFactory (user, password, url);  
  3. connection = connectionFactory.createConnection();  
  4. connection.start();  

 

    1.2 建立Session

Java代碼  收藏代碼

  1. /**在connection的基礎上建立一個session,同時設置是否支持事務ACKNOWLEDGE標識。 
  2. • AUTO_ACKNOWLEDGE:自動確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收。  
  3. • CLIENT_ACKNOWLEDGE:客戶端確認模式。會話對象依賴於應用程序對被接收的消息調用一個acknowledge()方法。一旦這個方法被調用,會話會確認最後一次確認以後全部接收到的消息。這種模式容許應用程序以一個調用來接收,處理並確認一批消息。注意:在管理控制檯中,若是鏈接工廠的Acknowledge Policy(確認方針)屬性被設置爲"Previous"(提早),可是你但願爲一個給定的會話確認全部接收到的消息,那麼就用最後一條消息來調用acknowledge()方法。  
  4. • DUPS_OK_ACKNOWLEDGE:容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。注意:若是你的應用程序沒法處理重複的消息的話,你應該避免使用這種模式。若是發送消息的初始化嘗試失敗,那麼重複的消息能夠被從新發送。 
  5. •  SESSION_TRANSACTED**/  
  6. Session session = connection.createSession(  
  7. transacted, Session.AUTO_ACKNOWLEDGE);  

 

    1.3 建立Destination對象

Java代碼  收藏代碼

  1. //需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的消息  
  2. if (topic) {  
  3.     destination = session.createTopic(subject);  
  4. } else {  
  5.     destination = session.createQueue(subject);  
  6. }  

 

    1.4 建立MessageProducer

Java代碼  收藏代碼

  1. //根據Destination建立MessageProducer對象,同時設置其持久模式。   
  2. MessageProducer producer = session.createProducer(destination);  
  3. if (persistent) {  
  4.       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  5. } else {  
  6.       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
  7. }  
  8. if (timeToLive != 0) {  
  9.        producer.setTimeToLive(timeToLive);  
  10. }   

 

    1.5 發送消息到隊列(Queue)

Java代碼  收藏代碼

  1. //封裝TextMessage消息,使用MessageProducer的send方法將消息發送出去。  
  2. TextMessage message = session.createTextMessage("createMessageText");  
  3. producer.send(message);  

 

2. 消費者(consumer)開發流程

 

    2.1 實現MessageListener接口

Java代碼  收藏代碼

  1. //消費者類必須實現MessageListener接口,而後在onMessage方法中監聽消息到達處理。  

 

    2.2 建立Connection

Java代碼  收藏代碼

  1. //根據url,user和password建立一個jms Connection,若是是durable模式,還須要給connection設置一個clientId。  
  2. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);  
  3. Connection connection = connectionFactory.createConnection();  
  4. if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {  
  5.       connection.setClientID(clientId);  
  6. }  
  7. connection.setExceptionListener(this);  
  8. connection.start();  

 

    2.3 建立Session和Destination

Java代碼  收藏代碼

  1. //與產品相似  

 

    2.4 建立replayProducer【可選】

Java代碼  收藏代碼

  1. //能夠用來將消息處理結果發送給producer。   
  2. replyProducer = session.createProducer(null);  
  3. replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);<span style="white-space: normal;"> </span>  

 

    2.5 建立MessageConsumer

Java代碼  收藏代碼

  1. //根據Destination建立MessageConsumer對象。  
  2. MessageConsumer consumer = null;  
  3. if (durable && topic) {  
  4.      consumer = session.createDurableSubscriber((Topic)destination, consumerName);  
  5. } else {  
  6.      consumer = session.createConsumer(destination);  
  7. }<span style="white-space: normal;"> </span>  

 

    2.6 消費message

Java代碼  收藏代碼

  1. //在onMessage()方法中接收producer發送過來的消息進行處理,並能夠經過replyProducer反饋信息給producer   
  2. if (message.getJMSReplyTo() != null) {   
  3.     replyProducer.send(message.getJMSReplyTo(),   
  4.     session.createTextMessage("Reply: " + message.getJMSMessageID()));   
  5. }    

 

 

Publish/Subscriber(發佈/訂閱者)消息開發模式

 

1. 訂閱者(Subscriber)開發流程

 

    1.1 實現MessageListener接口

  在onMessage()方法中監聽發佈者發出的消息隊列,並作相應處理。

 

    1.2 建立Connection

          根據url, user和password建立一個jms connection

 

    1.3 建立Session

  在connection的基礎上建立一個session,同時設置是否支持和ACKNOWLEDGE標誌。

 

    1.4 建立 Topic

          建立兩個Topic,topictest.message用於接收發布者發出的消息,topictest.control用於向發佈者發送消息,實現雙方的交互。

 

    1.5 建立consumer和producer對象

  根據topictest.message建立consumer,根據topictest.control建立producer

 

    1.6 接收處理消息

  在onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消息,或者根據消息內容不一樣處理對應的業務邏輯(好比:數據庫更新、文件操做等等),而且可使用   producer對象處理結果返回給發佈者。

 

 

2. 發佈者(Publisher)開發流程

 

     2.1 實現MessageListener接口 

   在onMessage()方法中接收訂閱者的反饋消息。

 

     2.2 建立Connection

   根據url, user和password 建立一個 jms Connection。

 

     2.3 建立session

   在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標誌。

 

     2.4 建立Topic

   建立兩個Topic,topictest.messages用於向訂閱者發佈消息,topictest.control用於接收訂閱者反饋的消息。這兩個Topic與訂閱者開發流程中的topic是一一對應的。

 

     2.5 建立consumer和producer對象

   根據topictest.message建立publisher;

   根據topictest.control穿件consumer,同時監聽訂閱者反饋的消息。

 

3. JMS For Spring

 

      Spring提供了用於簡化JMS API使用的抽象框架,而且對用戶屏蔽了JMS API中1.0.2和1.1版本的差別。

      JMS的功能大體上分爲兩塊,叫作消息製造和消息消耗。JmsTemplate用於製造消息和同步消息接收。和J2EE的事件驅動Bean風格相似,對於異步接收消息,Spring提供了一些消息監聽容器來建立消息驅動的POJO(MDP)。

 

1. Spring 配置 connectionFactory

Xml代碼  收藏代碼

  1. <!--客戶端使用普通傳輸方式:tcp://localhost:61616   
  2.     此處需加以注意的是Listener端的borkerURL使用了failover傳輸方式:   
  3.     failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;  
  4. maxReconnectAttempts=5   
  5.     failover transport是一種從新鏈接機制,用於創建可靠的傳輸。此處配置的是一旦ActiveMQ broker中斷,Listener端將每隔100ms自動嘗試鏈接,直至成功鏈接或重試5次鏈接失敗爲止。  
  6.     failover還支持多個borker同時提供服務,實現負載均衡的同時可增長系統容錯性,格式: failover:(uri1,...,uriN)?transportOptions  
  7. -->  
  8. <bean id="jmsFactory" destroy-method="stop" class="org.apache.activemq.pool.PooledConnectionFactory">  
  9.         <property name="connectionFactory">  
  10.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
  11.                 <property name="brokerURL" value="tcp://localhost:61616" />  
  12.                  <property name="userName" value="${activemq.username}" />  
  13.                 <property name="password" value="${activemq.password}" />  
  14.             </bean>  
  15.         </property>  
  16. </bean>  

  

2. Spring JmsTemplate

Xml代碼  收藏代碼

  1. <!-- Spring JMS Template  
  2.         JmsTemplate 類的實例 一經配置即是線程安全 的。 要清楚一點,JmsTemplate   
  3.         是有狀態的,由於它維護了 ConnectionFactory 的引用,但這個狀態時不是會話狀態。  
  4. -->  
  5.     <bean id="myJmsTemplate"  
  6.         class="org.springframework.jms.core.JmsTemplate">  
  7.         <property name="connectionFactory" ref="jmsFactory" />  
  8.         <property name="defaultDestinationName" value="MySubject" />  
  9.         <!--JMS API有兩種發送方法,一種採用發送模式、  
  10. 優先級和存活時間做爲服務質量(QOS)參數,  
  11.             默認{deliveryMode:2(1),priority:4,timeToLive:0}  
  12.             另外一種使用無需QOS參數的缺省值方法。  
  13.             <property name="explicitQosEnabled" value="true"/>  
  14.             <property name="deliveryMode" value="2"/>  
  15.             <property name="priority" value="4"/>  
  16.                         <property name="timeToLive" value="1000"/>  
  17.          -->  
  18.         <!-- 
  19.             <property name="receiveTimeout" value="1000" /> 
  20.          -->  
  21.     </bean>  

  

3. 發送的接收消息

Xml代碼  收藏代碼

  1. <bean id="destinationTopic"  
  2.         class="org.apache.activemq.command.ActiveMQTopic">  
  3.         <constructor-arg index="0" value="HelloWorldTopic" />  
  4.     </bean>  
  5.     <!-- 讀取信息 -->  
  6.     <bean id="consumer" class="com.d1xn.jms.demo.Consumer">  
  7.         <property name="jmsTemplate" ref="myJmsTemplate" />  
  8.     </bean>  
  9.   
  10.     <!-- 發送信息 -->  
  11.     <bean id="producerTopic" class="com.d1xn.jms.demo.ProducerTopic">  
  12.         <property name="jmsTemplet" ref="myJmsTemplate" />  
  13.         <property name="destination" ref="destinationTopic" />  
  14.     </bean>  
  15.     <!-- 消息監聽 -->  
  16.   
  17.     <bean id="listenerContainerTopic"  
  18. class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  19.         <property name="connectionFactory" ref="jmsFactory" />  
  20.         <property name="destination" ref="destinationTopic" />  
  21.         <property name="messageListener" ref="consumer" />  
  22.          <!—持久化客戶端ID -->  
  23.         <property name="clientId" value="clientId_001" />  
  24.         <property name="subscriptionDurable" value="true" />  
  25.         <property name="durableSubscriptionName" value="My_001" />  
  26. </bean>  

 

 

說明(基於ActiveMQ5.4.2版本):

        一、Web Console 的安全配置可參考      

 

    將conf/jetty.xml下面一段xml配置:

 

Xml代碼  收藏代碼

  1. <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">  
  2.     <property name="name" value="BASIC" />                                         
  3.     <property name="roles" value="admin" />                                        
  4.     <property name="authenticate" value="true" />                                  
  5. </bean>                                                                            

 

   authenticate值設置爲true便可!那用戶名/密碼的配置是在conf/jetty-realm.properties!

   詳細可參考http://activemq.apache.org/web-console.html

 

二、鏈接安全配置可參考

     將conf/activemq-security.xml中以下的配置

Xml代碼  收藏代碼

  1. <plugins>                                                                                
  2.  <!-- Configure authentication; Username, passwords and groups -->                       
  3.  <simpleAuthenticationPlugin anonymousAccessAllowed="false">                             
  4.     <users>                                                                              
  5.         <authenticationUser username="system" password="${activemq.password}"            
  6.             groups="admins"/>                                                            
  7.         <!--<authenticationUser username="user" password="${guest.password}"             
  8.             groups="users"/>                                                             
  9.         <authenticationUser username="guest" password="${guest.password}" groups="guests"/>-->   
  10.     </users>                                                                             
  11.  </simpleAuthenticationPlugin>                                                           
  12. </plugins>                                                                               

            copy至conf/activemq.xml中

Xml代碼  收藏代碼

  1. <persistenceAdapter>                                    
  2.     <kahaDB directory="${activemq.base}/data/kahadb"/>  
  3. </persistenceAdapter>                                   

      的下面(這是簡單的用戶名、密碼的認證方式)!

   用戶名、密碼的可在conf/credentials.properties配置!

   詳細可參考:

     http://activemq.apache.org/security.html

     http://activemq.apache.org/xml-reference.html

相關文章
相關標籤/搜索