ActiveMQ + Spring 持久化並註冊永久訂閱者

聲明:如下內容來自網絡整理和本身的測試調整。java

感謝:http://greemranqq.iteye.com/blog/2167158 和 http://www.mytju.com/classcode/news_readNews.asp?newsID=486mysql

ActiveMQ 提供了2種方式的消息通訊機制:點對點發佈/訂閱 模式。
(1).使用點對點(Queue,即隊列)時,每一個消息只有一個消費者,因此,持久化很簡單,只要保存到數據庫便可。而後,隨便一個消費者取走處理便可。某個消費者關掉一陣子,也無所謂。
(2).使用發佈/訂閱(Topic,即訂閱)時,每一個消息能夠有多個消費者,就麻煩一些。spring

首先,假設Topic消費者都是普通的消費者:
-------------------------------------------------------------------sql

  • activemq啓動後,發佈消息1,惋惜,如今沒有消費者啓動着,也就是沒有消費者進行了訂閱。那麼,這個消息就被拋棄了。
  • 消費者1啓動了,鏈接了activemq,進行了訂閱,在等待消息~~,activemq發佈消息2,OK,消費者1收到,並進行處理。消息拋棄。
  • 消費者2也啓動了,鏈接了activemq,進行了訂閱,在等待消息~~,activemq發佈消息3,OK,消費者1,消費者2都收到,並進行處理。消息拋棄。
  • 消費者1關掉了。activemq發佈消息4,OK,消費者2收到,並進行處理。消息拋棄。
  • 消費者1又啓動了。activemq發佈消息5,OK,消費者1,消費者2都收到,並進行處理。消息拋棄。

---------------------------------------------------------------------
總結一下:
activemq只是向當前啓動的消費者發送消息。關掉的消費者,會錯過不少消息,並沒有法再次接收這些消息。
若是發送的消息是重要的用戶同步數據,錯過了,用戶數據就不一樣步了。那麼,如何讓消費者從新啓動時,接收到錯過的消息呢?數據庫

答案是:持久訂閱apache

  • 普通的訂閱,不區分消費者,場地裏有幾我的頭,就扔幾個饅頭。
  • 持久訂閱,就要記錄消費者的名字了。

張三說,我是張三,有饅頭給我留着,我回來拿。
李四說,我是李四,有饅頭給我留着,我回來拿。
activemq 就記下張三,李四兩個名字。緩存

那麼,分饅頭時,仍是一我的頭給一個饅頭。
分完了,一看張三沒說話,說明他不在,給他留一個。
李四說話了,那就不用留了。服務器

張三回來了,找activemq,一看,這不張三吧,快把他的饅頭拿來。
多是一個饅頭,也多是100個饅頭,就看張三離開這陣子,分了多少次饅頭了。網絡

activemq區分消費者,是經過 clientId durableSubscriptionName(訂戶名稱) 來區分的。session

要實現 發佈/訂閱模式 的 永久消費者訂閱,首先必須先進行發佈消息的持久化。下面採用MySQL數據庫做爲持久化存儲方式。

1. 修改ActiveMQ服務的持久化配置( conf/activemq.xml ):

<!-- 注意:須要將 mysql-connector-java-version.jar 和 druid-version.jar 包 放到 avtivemq的 lib 目錄下 -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">  
	<property name="driverClassName" value="com.mysql.jdbc.Driver" />  
	<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq_db" />  
	<property name="username" value="root" />  
	<property name="password" value="123456" />    
	<property name="initialSize" value="2" />    
	<property name="minIdle" value="2" />     
	<property name="maxActive" value="50" />    
</bean>  

<persistenceAdapter>
	<!-- 注意:createTablesOnStartup 在第一次啓動時要設置爲 true,後面修改成 false -->
	<jdbcPersistenceAdapter dataDirectory="${activemq.data}/mysql" dataSource="#mysql-ds" createTablesOnStartup="false" useDatabaseLock="true"/>
</persistenceAdapter>

2. 修改 spring-jms.xml 的配置(deliveryPersistent 或 deliveryMode):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
		
  <!-- jms 鏈接工廠 -->
  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
  </bean>
	
  <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
    <property name="targetConnectionFactory" ref="connectionFactory"/>
    <!-- Session緩存數量 -->
    <property name="sessionCacheSize" value="100" />
  </bean>
	
  <!-- 基本的bean模板 -->
  <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
    <!-- 連接工長 -->
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <!-- 進行持久化 -->
		<property name="deliveryPersistent" value="true" />
		<!-- 等同於上面 deliveryPersistent = true -->
    <!-- <property name="deliveryMode" value="2" /> -->
     <!--訂閱 發佈模式 -->
    <property name="pubSubDomain" value="true" />
  </bean>
	
  <!-- 消息訂閱模式 -->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 訂閱消息的名字 -->
    <constructor-arg index="0" value="orderTopic"/>
  </bean>
	
</beans>

3. 想一想咱們訂閱者須要作些什麼呢?

發佈者發佈消息,訂閱者去消費,這是1對多的形式,咱們能夠這樣理解:公司設定不少活動代金卷,去參加活動的人都能領取,固然這分兩種狀況,第一種 就是咱們前面測試的,只要我公司門口等(監聽),活動開始(發佈)就能領取了,若是你當時沒在,就領取不到。第二種:不少狀況下,公司搞活動咱們不會等在那裏,只要活動開始了,那麼我過段時間也能夠去,禮品公司會保留的,這種狀況會致使屢次領取,所以總要登記一下嘛,不能你領取了,過一會又來吧?activemq 裏面會有 clientId 標示來區分,相似於身份證ID嘛。固然有些狀況下, 咱們一個ID 能夠領取多個不一樣的獎品,所以還得須要個字段標示:durableSubscriptionName,標示咱們領取哪一個禮品,下面先看配置:

<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
		
  <!-- jms 鏈接工廠 -->
  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <!-- Jvm 內部傳輸,暫時不用TCP,使用暫時異步傳輸  -->
    <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
  </bean>
	
  <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
    <property name="targetConnectionFactory" ref="connectionFactory"/>
    <!-- 接收者ID -->
    <property name="clientId" value="Client-A" />
  </bean>
	
  <!-- 消息訂閱模式 -->
  <bean id="topicCustomerA" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 訂閱消息的名字 -->
    <constructor-arg index="0" value="orderTopic"/>
  </bean>
	
  <!-- 消息監聽,這裏能夠認爲是A服務器的監聽 -->
  <bean id="messageListener" class="com.xxx.ConsumerMessageListener"/>
	
  <bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicCustomerA" />
    <property name="messageListener" ref="messageListener" />
    <!-- 持久化消息 -->
    <property name="subscriptionDurable" value="true"/>
    <!-- 接收者ID -->
    <property name="clientId" value="Client-A" />
    <!-- 這裏名字能夠任意改變,A 領取了,你能夠改爲B 還能夠領取,能夠舉例不是很恰當 -->
    <property name="durableSubscriptionName" value="clientA"/>
  </bean>
	
</beans>
// 監聽代碼 直接輸出
public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("topic 收到消息:"+ ((TextMessage)message).getText());
    }
}

4. 測試

採用持久化方式,模擬測試以下:

1. 啓動兩個ConsumerA,ConsumerB 監聽,發佈一個topic消息 ,同時收到消息,OK
2. 啓動一個ConsumerA,發佈一個topic消息,再啓動ConsumerB,也收到消息,OK
3. 啓動一個ConsumerA,發佈一個topic消息,A收到,關閉mq服務器,重啓mq服務,重啓 ConsumerB,一樣收到消息,OK。
4. 啓動兩個ConsumerA,ConsumerB,發佈topic消息,A、B收到消息,重啓A、B  不收重複消息,OK    

那麼新問題是:
1. 若是A、B 收到消息後,topic  的消息怎麼處理呢? 一直保存着嗎? 若是能夠清除,怎麼清除,何時進行清除呢?

最後附上我本身測試經過的完整的 spring-jms.xml配置(在註冊消費者等地方略有變化,採用<jms:listener-container>等替代):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

	<!-- ActiveMQ 鏈接工廠 -->
 	<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
	<!-- 若是鏈接網絡:tcp://ip:61616;未鏈接網絡:tcp://localhost:61616 以及用戶名,密碼-->
	<amq:connectionFactory id="amqConnectionFactory" brokerURL="failover(tcp://127.0.0.1:61616)" userName="admin" password="admin"  />

	<!-- Spring Caching鏈接工廠 -->
 	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->  
  		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  		<!-- 同上,同理 -->
		<!-- <constructor-arg ref="amqConnectionFactory" /> -->
		<!-- Session緩存數量 -->
		<property name="sessionCacheSize" value="100" />
		<!-- 接收者ID,用於Topic訂閱者的永久訂閱 -->
		<property name="clientId" value="Client-A" />
	</bean>
	
	<!-- Spring JmsTemplate 的消息生產者-->

	<!-- 定義JmsTemplate的Topic類型 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		 <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->  
		<constructor-arg ref="connectionFactory" />
		<!-- pub/sub模型(發佈/訂閱) -->
		<property name="pubSubDomain" value="true" />
		<!-- 訂閱消息持久化 -->
		<property name="deliveryPersistent" value="true" />
		<!-- 配置持久化,同上 deliveryPersistent
		<property name="deliveryMode" value="2" />
		-->
	</bean>

	
	<!-- 消息消費者 -->
	
	<!-- 定義Topic監聽器 -->
	<!-- 注意:定義 client-id = Client-A -->
	<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="Client-A">
		<!-- 注意:定義 subscription(即:durableSubscriptionName) -->
		<jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/>
		<jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/>
	</jms:listener-container>
	
</beans>
// Topic訂閱者1
@Component("topicReceiver1")
public class TopicReceiver1 implements MessageListener 
{
	@Override
	public void onMessage(Message message) 
	{
		try {
			System.out.println("TopicReceiver1 收到消息:"+ ((TextMessage) message).getText());
		} catch(JMSException e) {
			// todo
			System.out.println("TopicReceiver1訂閱失敗!" + e.getMessage());
			e.printStackTrace();
		}
	}
}
// Topic訂閱者2
@Component("topicReceiver2")
public class TopicReceiver2 implements MessageListener 
{
	@Override
	public void onMessage(Message message) 
	{
		try {
			System.out.println("TopicReceiver2 收到消息:"+ ((TextMessage) message).getText());
		} catch(JMSException e) {
			// todo
			System.out.println("TopicReceiver2訂閱失敗!" + e.getMessage());
			e.printStackTrace();
		}
	}
}
相關文章
相關標籤/搜索