聲明:如下內容來自網絡整理和本身的測試調整。java
感謝:http://greemranqq.iteye.com/blog/2167158 和 http://www.mytju.com/classcode/news_readNews.asp?newsID=486mysql
ActiveMQ 提供了2種方式的消息通訊機制:點對點 和 發佈/訂閱 模式。
(1).使用點對點(Queue,即隊列)時,每一個消息只有一個消費者,因此,持久化很簡單,只要保存到數據庫便可。而後,隨便一個消費者取走處理便可。某個消費者關掉一陣子,也無所謂。
(2).使用發佈/訂閱(Topic,即訂閱)時,每一個消息能夠有多個消費者,就麻煩一些。spring
首先,假設Topic消費者都是普通的消費者:
-------------------------------------------------------------------sql
---------------------------------------------------------------------
總結一下:
activemq只是向當前啓動的消費者發送消息。關掉的消費者,會錯過不少消息,並沒有法再次接收這些消息。
若是發送的消息是重要的用戶同步數據,錯過了,用戶數據就不一樣步了。那麼,如何讓消費者從新啓動時,接收到錯過的消息呢?數據庫
答案是:持久訂閱。apache
張三說,我是張三,有饅頭給我留着,我回來拿。
李四說,我是李四,有饅頭給我留着,我回來拿。
activemq 就記下張三,李四兩個名字。緩存
那麼,分饅頭時,仍是一我的頭給一個饅頭。
分完了,一看張三沒說話,說明他不在,給他留一個。
李四說話了,那就不用留了。服務器
張三回來了,找activemq,一看,這不張三吧,快把他的饅頭拿來。
多是一個饅頭,也多是100個饅頭,就看張三離開這陣子,分了多少次饅頭了。網絡
activemq區分消費者,是經過 clientId 和 durableSubscriptionName(訂戶名稱) 來區分的。session
要實現 發佈/訂閱模式 的 永久消費者訂閱,首先必須先進行發佈消息的持久化。下面採用MySQL數據庫做爲持久化存儲方式。
1. 修改ActiveMQ服務的持久化配置( conf/activemq.xml ):
<!-- 注意:須要將 mysql-connector-java-version.jar 和 druid-version.jar 包 放到 avtivemq的 lib 目錄下 --> <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq_db" /> <property name="username" value="root" /> <property name="password" value="123456" /> <property name="initialSize" value="2" /> <property name="minIdle" value="2" /> <property name="maxActive" value="50" /> </bean> <persistenceAdapter> <!-- 注意:createTablesOnStartup 在第一次啓動時要設置爲 true,後面修改成 false --> <jdbcPersistenceAdapter dataDirectory="${activemq.data}/mysql" dataSource="#mysql-ds" createTablesOnStartup="false" useDatabaseLock="true"/> </persistenceAdapter>
2. 修改 spring-jms.xml 的配置(deliveryPersistent 或 deliveryMode):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 基本的bean模板 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 連接工長 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <!-- 進行持久化 --> <property name="deliveryPersistent" value="true" /> <!-- 等同於上面 deliveryPersistent = true --> <!-- <property name="deliveryMode" value="2" /> --> <!--訂閱 發佈模式 --> <property name="pubSubDomain" value="true" /> </bean> <!-- 消息訂閱模式 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 訂閱消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> </beans>
3. 想一想咱們訂閱者須要作些什麼呢?
發佈者發佈消息,訂閱者去消費,這是1對多的形式,咱們能夠這樣理解:公司設定不少活動代金卷,去參加活動的人都能領取,固然這分兩種狀況,第一種 就是咱們前面測試的,只要我公司門口等(監聽),活動開始(發佈)就能領取了,若是你當時沒在,就領取不到。第二種:不少狀況下,公司搞活動咱們不會等在那裏,只要活動開始了,那麼我過段時間也能夠去,禮品公司會保留的,這種狀況會致使屢次領取,所以總要登記一下嘛,不能你領取了,過一會又來吧?activemq 裏面會有 clientId 標示來區分,相似於身份證ID嘛。固然有些狀況下, 咱們一個ID 能夠領取多個不一樣的獎品,所以還得須要個字段標示:durableSubscriptionName,標示咱們領取哪一個禮品,下面先看配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- jms 鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <!-- Jvm 內部傳輸,暫時不用TCP,使用暫時異步傳輸 --> <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /> </bean> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- 接收者ID --> <property name="clientId" value="Client-A" /> </bean> <!-- 消息訂閱模式 --> <bean id="topicCustomerA" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 訂閱消息的名字 --> <constructor-arg index="0" value="orderTopic"/> </bean> <!-- 消息監聽,這裏能夠認爲是A服務器的監聽 --> <bean id="messageListener" class="com.xxx.ConsumerMessageListener"/> <bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicCustomerA" /> <property name="messageListener" ref="messageListener" /> <!-- 持久化消息 --> <property name="subscriptionDurable" value="true"/> <!-- 接收者ID --> <property name="clientId" value="Client-A" /> <!-- 這裏名字能夠任意改變,A 領取了,你能夠改爲B 還能夠領取,能夠舉例不是很恰當 --> <property name="durableSubscriptionName" value="clientA"/> </bean> </beans>
// 監聽代碼 直接輸出 public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("topic 收到消息:"+ ((TextMessage)message).getText()); } }
4. 測試
採用持久化方式,模擬測試以下:
1. 啓動兩個ConsumerA,ConsumerB 監聽,發佈一個topic消息 ,同時收到消息,OK
2. 啓動一個ConsumerA,發佈一個topic消息,再啓動ConsumerB,也收到消息,OK
3. 啓動一個ConsumerA,發佈一個topic消息,A收到,關閉mq服務器,重啓mq服務,重啓 ConsumerB,一樣收到消息,OK。
4. 啓動兩個ConsumerA,ConsumerB,發佈topic消息,A、B收到消息,重啓A、B 不收重複消息,OK
那麼新問題是:
1. 若是A、B 收到消息後,topic 的消息怎麼處理呢? 一直保存着嗎? 若是能夠清除,怎麼清除,何時進行清除呢?
最後附上我本身測試經過的完整的 spring-jms.xml配置(在註冊消費者等地方略有變化,採用<jms:listener-container>等替代):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- ActiveMQ 鏈接工廠 --> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <!-- 若是鏈接網絡:tcp://ip:61616;未鏈接網絡:tcp://localhost:61616 以及用戶名,密碼--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="failover(tcp://127.0.0.1:61616)" userName="admin" password="admin" /> <!-- Spring Caching鏈接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="100" /> <!-- 接收者ID,用於Topic訂閱者的永久訂閱 --> <property name="clientId" value="Client-A" /> </bean> <!-- Spring JmsTemplate 的消息生產者--> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(發佈/訂閱) --> <property name="pubSubDomain" value="true" /> <!-- 訂閱消息持久化 --> <property name="deliveryPersistent" value="true" /> <!-- 配置持久化,同上 deliveryPersistent <property name="deliveryMode" value="2" /> --> </bean> <!-- 消息消費者 --> <!-- 定義Topic監聽器 --> <!-- 注意:定義 client-id = Client-A --> <jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="Client-A"> <!-- 注意:定義 subscription(即:durableSubscriptionName) --> <jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/> <jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/> </jms:listener-container> </beans>
// Topic訂閱者1 @Component("topicReceiver1") public class TopicReceiver1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver1 收到消息:"+ ((TextMessage) message).getText()); } catch(JMSException e) { // todo System.out.println("TopicReceiver1訂閱失敗!" + e.getMessage()); e.printStackTrace(); } } }
// Topic訂閱者2 @Component("topicReceiver2") public class TopicReceiver2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver2 收到消息:"+ ((TextMessage) message).getText()); } catch(JMSException e) { // todo System.out.println("TopicReceiver2訂閱失敗!" + e.getMessage()); e.printStackTrace(); } } }