發佈訂閱模式,發佈者發佈消息到broker,全部訂閱者都會接收到相同消息的copy。javascript
p2p是生產者生成消息,通過broker-queue,只能有一個消費者處理。在p2p的場景裏,相互通訊的雙方是經過一個相似於隊列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個發送者和多個接收者,而在p2p裏一個queue只有一個發送者和一個接收者。html
下圖爲queue處理過程,以及各類ack_type回覆的處理流程,這些ack_type配合ack_mode進行,由jms內部控制。java
1. 得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。mysql
2. 利用factory構造JMS connectionspring
3. 啓動connectionsql
4. 經過connection建立JMS session.docker
5. 指定JMS destination.數據庫
6. 建立JMS producer或者建立JMS message並提供destination.apache
7. 建立JMS consumer或註冊JMS message listener.緩存
8. 發送和接收JMS message.
9. 關閉全部JMS資源,包括connection, session, producer, consumer等。
和前面兩種方式比較起來,request-response的通訊方式很常見,可是不是默認提供的一種模式。在前面的兩種模式中都是一方負責發送消息而另一方負責處理。而咱們實際中的不少應用至關於一種一應一答的過程,須要雙方都能給對方發送消息。因而請求-應答的這種通訊方式也很重要。它也應用的很廣泛。
請求-應答方式並非JMS規範系統默認提供的一種通訊方式,而是經過在現有通訊方式的基礎上稍微運用一點技巧實現的。下圖是典型的請求-應答方式的交互過程:
在JMS裏面,若是要實現請求/應答的方式,能夠利用JMSReplyTo和JMSCorrelationID消息頭來將通訊的雙方關聯起來。另外,QueueRequestor和TopicRequestor可以支持簡單的請求/應答過程。
經過建立producer,而後調用send接口實現。
在spring框架中使用JMS傳遞消息有兩種方式:JMS template和message listener Container,前者用於同步收發消息,後者用於異步收發消息。
同步方式是建立consumer,經過session進行receive()方式接收消息。
異步方式是經過實現messageListener的onMessage()接口實現,在該接口中完成消息處理。
Session.AUTO_ACKNOWLEDGE(自動確認模式)
當消息成功的從receive方法返回時,或者從MessageListener接口的onMessage方法成功返回時,會話自動確認客戶端的消息接收。
自動確認,這就意味着消息的確認時機將有consumer擇機確認."擇機確認"彷佛充滿了不肯定性,這也意味着,開發者必須明確知道"擇機確認"的具體時機,不然將有可能致使消息的丟失,或者消息的重複接受.那麼在ActiveMQ中,AUTO_ACKNOWLEDGE是如何運做的呢?
1) 對於consumer而言,optimizeAcknowledge屬性只會在AUTO_ACK模式下有效。
2) 其中DUPS_ACKNOWLEGE也是一種潛在的AUTO_ACK,只是確認消息的條數和時間上有所不一樣。
3) 在「同步」(receive)方法返回message以前,會檢測optimizeACK選項是否開啓,若是沒有開啓,此單條消息將當即確認,因此在這種狀況下,message返回以後,若是開發者在處理message過程當中出現異常,會致使此消息也不會redelivery,即"潛在的消息丟失";若是開啓了optimizeACK,則會在unAck數量達到prefetch * 0.65時確認,固然咱們能夠指定prefetchSize = 1來實現逐條消息確認。
4) 在"異步"(messageListener)方式中,將會首先調用listener.onMessage(message),此後再ACK,若是onMessage方法異常,將致使client端補充發送一個ACK_TYPE爲REDELIVERED_ACK_TYPE確認指令;若是onMessage方法正常,消息將會正常確認(STANDARD_ACK_TYPE)。此外須要注意,消息的重發次數是有限制的,每條消息中都會包含「redeliveryCounter」計數器,用來表示此消息已經被重發的次數,若是重發次數達到閥值,將會致使發送一個ACK_TYPE爲POSION_ACK_TYPE確認指令,這就致使broker端認爲此消息沒法消費,此消息將會被刪除或者遷移到"dead letter"通道中。
所以當咱們使用messageListener方式消費消息時,一般建議在onMessage方法中使用try-catch,這樣能夠在處理消息出錯時記錄一些信息,而不是讓consumer不斷去重發消息;若是你沒有使用try-catch,就有可能會由於異常而致使消息重複接收的問題,須要注意你的onMessage方法中邏輯是否可以兼容對重複消息的判斷。
Session.CLIENT_ACKNOWLEDGE(客戶端確認模式)
客戶端經過調用消息的acknowledge方法簽收消息。在這種模式中,簽收是在會話層上進行:簽收一個已消費的消息會自動地簽收這個Session全部已消費消息的收條。
客戶端手動確認,這就意味着AcitveMQ將不會「自做主張」的爲你ACK任何消息,開發者須要本身擇機確認。在此模式下,開發者須要須要關注幾個方法:
1) message.acknowledge(),
2) ActiveMQMessageConsumer.acknowledege(),
3) ActiveMQSession.acknowledge();
其1)和3)是等效的,將當前session中全部consumer中還沒有ACK的消息都一塊兒確認,2)只會對當前consumer中那些還沒有確認的消息進行確認。開發者能夠在合適的時機必須調用一次上述方法。
咱們一般會在基於Group(消息分組)狀況下會使用CLIENT_ACKNOWLEDGE,咱們將在一個group的消息序列接受完畢以後確認消息(組);不過當你認爲消息很重要,只有當消息被正確處理以後才能確認時,也很可使用此ACK_MODE。
若是開發者忘記調用acknowledge方法,將會致使當consumer重啓後,會接受到重複消息,由於對於broker而言,那些還沒有真正ACK的消息被視爲「未消費」。
開發者能夠在當前消息處理成功以後,當即調用message.acknowledge()方法來"逐個"確認消息,這樣能夠儘量的減小因網絡故障而致使消息重發的個數;固然也能夠處理多條消息以後,間歇性的調用acknowledge方法來一次確認多條消息,減小ack的次數來提高consumer的效率,不過這仍然是一個利弊權衡的問題。
除了message.acknowledge()方法以外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也能夠確認消息,只不過前者只會確認當前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
不管是「同步」/「異步」,ActiveMQ都不會發送STANDARD_ACK_TYPE,直到message.acknowledge()調用。若是在client端未確認的消息個數達到prefetchSize * 0.5時,會補充發送一個ACK_TYPE爲DELIVERED_ACK_TYPE的確認指令,這會觸發broker端能夠繼續push消息到client端。(參看PrefetchSubscription.acknwoledge方法)
在broker端,針對每一個Consumer,都會保存一個由於"DELIVERED_ACK_TYPE"而「拖延」的消息個數,這個參數爲prefetchExtension,事實上這個值不會大於prefetchSize * 0.5,由於Consumer端會嚴格控制DELIVERED_ACK_TYPE指令發送的時機(參見ActiveMQMessageConsumer.ackLater方法),broker端經過「prefetchExtension」與prefetchSize互相配合,來決定即將push給client端的消息個數,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已經發送給client端可是尚未「STANDARD_ACK_TYPE」的消息總量;因而可知,在CLIENT_ACK模式下,足夠快速的調用acknowledge()方法是決定consumer端消費消息的速率;若是client端由於某種緣由致使acknowledge方法未被執行,將致使大量消息不能被確認,broker端將不會push消息,事實上client端將處於「假死」狀態,而沒法繼續消費消息。咱們要求client端在消費1.5*prefetchSize個消息以前,必須acknowledge()一次;一般咱們老是每消費一個消息調用一次,這是一種良好的設計。
此外須要額外的補充一下:全部ACK指令都是依次發送給broker端,在CLIET_ACK模式下,消息在交付給listener以前,都會首先建立一個DELIVERED_ACK_TYPE的ACK指令,直到client端未確認的消息達到"prefetchSize * 0.5"時纔會發送此ACK指令,若是在此以前,開發者調用了acknowledge()方法,會致使消息直接被確認(STANDARD_ACK_TYPE)。broker端一般會認爲「DELIVERED_ACK_TYPE」確認指令是一種「slow consumer」信號,若是consumer不能及時的對消息進行acknowledge而致使broker端阻塞,那麼此consumer將會被標記爲「slow」,此後queue中的消息將會轉發給其餘Consumer。
Session.DUPS_OK_ACKNOWLEDGE(延時/批量確認模式)
這種確認方式容許JMS沒必要急於確認收到的消息,容許在收到多個消息以後一次完成確認,與Auto_AcKnowledge相比,這種確認方式在某些狀況下可能更有效,由於沒有確認,當系統崩潰或者網絡出現故障的時候,消息能夠被從新傳遞.
這種方式會引發消息的重複,可是下降了Session的開銷,因此只有客戶端能容忍重複的消息,纔可以使用。(若是ActiveMQ再次傳送同一消息,那麼消息頭中的JMSRedelivered將被設置爲true)
"消息可重複"確認,意思是此模式下,可能會出現重複消息,並非一條消息須要發送屢次ACK才行。它是一種潛在的"AUTO_ACK"確認機制,爲批量確認而生,並且具備「延遲」確認的特色。對於開發者而言,這種模式下的代碼結構和AUTO_ACKNOWLEDGE同樣,不須要像CLIENT_ACKNOWLEDGE那樣調用acknowledge()方法來確認消息。
1) 在ActiveMQ中,若是在Destination是Queue通道,咱們真的能夠認爲DUPS_OK_ACK就是「AUTO_ACK + optimizeACK + (prefetch > 0)」這種狀況,在確認時機上幾乎徹底一致;此外在此模式下,若是prefetchSize =1 或者沒有開啓optimizeACK,也會致使消息逐條確認,從而失去批量確認的特性。
2) 若是Destination爲Topic,DUPS_OK_ACKNOWLEDGE纔會產生JMS規範中詮釋的意義,即不管optimizeACK是否開啓,都會在消費的消息個數>=prefetch * 0.5時,批量確認(STANDARD_ACK_TYPE),在此過程當中,不會發送DELIVERED_ACK_TYPE的確認指令,這是1)和AUTO_ACK的最大的區別。
這也意味着,當consumer故障重啓後,那些還沒有ACK的消息會從新發送過來。
當session使用事務時,就是使用此模式。在事務開啓以後,和session.commit()以前,全部消費的消息,要麼所有正常確認,要麼所有redelivery。這種嚴謹性,一般在基於GROUP(消息分組)或者其餘場景下特別適合。在SESSION_TRANSACTED模式下,optimizeACK並不能發揮任何效果,由於在此模式下,optimizeACK會被強制設定爲false,不過prefetch仍然能夠決定DELIVERED_ACK_TYPE的發送時機。
由於Session非線程安全,那麼當前session下全部的consumer都會共享同一個transactionContext;同時建議,一個事務類型的Session中只有一個Consumer,已避免rollback()或者commit()方法被多個consumer調用而形成的消息混亂。
當consumer接受到消息以後,首先檢測TransactionContext是否已經開啓,若是沒有,就會開啓並生成新的transactionId,並把信息發送給broker;此後將檢測事務中已經消費的消息個數是否 >= prefetch * 0.5,若是大於則補充發送一個「DELIVERED_ACK_TYPE」的確認指令;這時就開始調用onMessage()方法,若是是同步(receive),那麼即返回message。上述過程,和其餘確認模式沒有任何特殊的地方。
當開發者決定事務能夠提交時,必須調用session.commit()方法,commit方法將會致使當前session的事務中全部消息當即被確認;事務的確認過程當中,首先把本地的deliveredMessage隊列中還沒有確認的消息所有確認(STANDARD_ACK_TYPE);此後向broker發送transaction提交指令並等待broker反饋,若是broker端事務操做成功,那麼將會把本地deliveredMessage隊列清空,新的事務開始;若是broker端事務操做失敗(此時broker已經rollback),那麼對於session而言,將執行inner-rollback,這個rollback所作的事情,就是將當前事務中的消息清空並要求broker重發(REDELIVERED_ACK_TYPE),同時commit方法將拋出異常。
當session.commit方法異常時,對於開發者而言一般是調用session.rollback()回滾事務(事實上開發者不調用也沒有問題),固然你能夠在事務開始以後的任什麼時候機調用rollback(),rollback意味着當前事務的結束,事務中全部的消息都將被重發。須要注意,不管是inner-rollback仍是調用session.rollback()而致使消息重發,都會致使message.redeliveryCounter計數器增長,最終都會受限於brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",若是rollback的次數過多,而達到重發次數的上限時,消息將會被DLQ(dead letter)。
單條消息確認,這種確認模式,咱們不多使用,它的確認時機和CLIENT_ACKNOWLEDGE幾乎同樣,當消息消費成功以後,須要調用message.acknowledege來確認此消息(單條),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法將致使整個session中全部消息被確認(批量確認)。
消息重傳主要是針對事務性消息以及client_ack方式的進行,重傳觸發時機以下,
1)事務session,而且調用了rollback()方法;
2)事務session,關閉以前調用了commit;
3)非事務session中使用CLIENT_ACKNOWLEDGE簽收模式,而且調用了Session.recover()方法。
若是不是以上狀況,(未成功確認的消息)不會當即觸發消息重傳,會在下次客戶端啓動時,重傳。
一旦消息重發嘗試超太重發策略中配置的maximumRedeliveries(缺省爲6次)時,會給broker發送一個"Poison ack",通知它,這個消息被認爲是一個毒丸(a poison pill),接着broker會將這個消息發送到DLQ(Dead Letter Queue),以便後續分析處理。
經過recover重傳方式,當前線程會和消息綁定,當前線程掛起,消息不轉發給其餘線程,只有當該線程關閉,纔會釋放對該消息的持有。
經過拋異常的方式,也會重發從新投遞,但此時,只是將該消息看成一個新消息,從新負載分配到全部監聽。
具體參數以下,
屬性 | 默認值 | 說明 |
collisionAvoidanceFactor | 0.15 | 設置防止衝突範圍的正負百分比,只有啓用useCollisionAvoidance參數時才生效。 |
maximumRedeliveries | 6 | 最大重傳次數,達到最大重連次數後拋出異常。爲-1時不限制次數,爲0時表示不進行重傳。 |
maximumRedeliveryDelay | -1 | 最大傳送延遲,只在useExponentialBackOff爲true時有效(V5.5),假設首次重連間隔爲10ms,倍數爲2,那麼第二次重連時間間隔爲 20ms,第三次重連時間間隔爲40ms,當重連時間間隔大的最大重連時間間隔時,之後每次重連時間間隔都爲最大重連時間間隔。 |
initialRedeliveryDelay | 1000L | 初始重發延遲時間 |
redeliveryDelay | 1000L | 重發延遲時間,當initialRedeliveryDelay=0時生效(v5.4) |
useCollisionAvoidance | false | 啓用防止衝突功能,由於消息接收時是可使用多線程併發處理的,應該是爲了重發的安全性,避開全部併發線程都在同一個時間點進行消息接收處理。全部線程在同一個時間點處理時會發生什麼問題呢?應該沒有問題,只是爲了平衡broker處理性能,不會有時很忙,有時很空閒。 |
useExponentialBackOff | false | 啓用指數倍數遞增的方式增長延遲時間。 |
backOffMultiplier | 5 | 重連時間間隔遞增倍數,只有值大於1和啓用useExponentialBackOff參數時才生效。 |
爲了不意外宕機之後丟失信息,須要作到重啓後能夠恢復消息隊列,消息系統通常都會採用持久化機制。
ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,不管使用哪一種持久化方式,消息的存儲邏輯都是一致的。
就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,而後試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。
消息中心啓動之後首先要檢查指定的存儲位置,若是有未發送成功的消息,則須要把消息發送出去。
使用JDBC持久化方式,數據庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中。
(1)配置方式
配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml文件,
首先定義一個mysql-ds的MySQL數據源,而後在persistenceAdapter節點中配置jdbcPersistenceAdapter而且引用剛纔定義的數據源。
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> </persistenceAdapter>
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。
使用MySQL配置JDBC持久化:
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
(2)數據庫表信息
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:
ID:自增的數據庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者客戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID
EXPIRATION:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
MSG:消息本體的Java序列化對象的二進制數據
PRIORITY:優先級,從0-9,數值越大優先級越高
activemq_acks用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存:
主要的數據庫字段以下:
CONTAINER:消息的Destination
SUB_DEST:若是是使用Static集羣,這個字段會有集羣其餘系統的信息
CLIENT_ID:每一個訂閱者都必須有一個惟一的客戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做
LAST_ACKED_ID:記錄消費過的消息的ID。
表activemq_lock在集羣環境中才有用,只有一個Broker能夠得到消息,稱爲Master Broker,
其餘的只能做爲備份等待Master Broker不可用,纔可能成爲下一個Master Broker。
這個表用於記錄哪一個Broker是當前的Master Broker。
性能高於JDBC,寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。
當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。
主要的缺點是AMQ Message會爲每個Destination建立一個索引,若是使用了大量的Queue,索引文件的大小會佔用不少磁盤空間。
並且因爲索引巨大,一旦Broker崩潰,重建索引的速度會很是慢。
配置片斷以下:
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/> </persistenceAdapter>
然AMQ性能略高於下面的Kaha DB方式,可是因爲其重建索引時間過長,並且索引文件佔用磁盤空間過大,因此已經不推薦使用。
KahaDB是從ActiveMQ 5.4開始默認的持久化插件,也是咱們項目如今使用的持久化方式。
KahaDb恢復時間遠遠小於其前身AMQ而且使用更少的數據文件,因此能夠徹底代替AMQ。
kahaDB的持久化機制一樣是基於日誌文件,索引和緩存。
配置方式:
<persistenceAdapter> <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/> </persistenceAdapter>
directory : 指定持久化消息的存儲目錄
journalMaxFileLength : 指定保存消息的日誌文件大小,具體根據你的實際應用配置
(1)KahaDB主要特性
一、日誌形式存儲消息;
二、消息索引以B-Tree結構存儲,能夠快速更新;
三、徹底支持JMS事務;
四、支持多種恢復機制;
(2)KahaDB的結構
消息存儲在基於文件的數據日誌中。若是消息發送成功,變標記爲可刪除的。系統會週期性的清除或者歸檔日誌文件。
消息文件的位置索引存儲在內存中,這樣能快速定位到。按期將內存中的消息索引保存到metadata store中,避免大量消息未發送時,消息索引佔用過多內存空間。
Data logs:
Data logs用於存儲消息日誌,消息的所有內容都在Data logs中。
同AMQ同樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。一樣是文件尾部追加,寫入性能很快。
每一個消息在Data logs中有計數引用,因此當一個文件裏全部的消息都不須要了,系統會自動刪除文件或放入歸檔文件夾。
Metadata cache :
緩存用於存放在線消費者的消息。若是消費者已經快速的消費完成,那麼這些消息就不須要再寫入磁盤了。
Btree索引會根據MessageID建立索引,用於快速的查找消息。這個索引一樣維護持久化訂閱者與Destination的關係,以及每一個消費者消費消息的指針。
Metadata store
在db.data文件中保存消息日誌中消息的元數據,也是以B-Tree結構存儲的,定時從Metadata cache更新數據。Metadata store中也會備份一些在消息日誌中存在的信息,這樣可讓Broker實例快速啓動。
即使metadata store文件被破壞或者誤刪除了。broker能夠讀取Data logs恢復過來,只是速度會相對較慢些。
從ActiveMQ 5.6版本以後,又推出了LevelDB的持久化引擎。
目前默認的持久化方式仍然是KahaDB,不過LevelDB持久化性能高於KahaDB,多是之後的趨勢。
在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。
結語:到目前爲止,咱們已經已經簡單的瞭解了ActiveMQ中消息傳送機制,還有JMS中ACK策略,重點分析了optimizeACK的策略,但願開發者可以在使用activeMQ中避免一些沒必要要的錯誤。本文若有疏漏和錯誤之處,請各位不吝賜教,特此感謝。在後續的文章中,會詳細介紹具體配置。
有時候咱們不但願消息立刻被broker投遞出去,而是想要消息60秒之後發給消費者,或者咱們想讓消息沒隔必定時間投遞一次,一共投遞指定的次數。。。
相似這種需求,ActiveMQ提供了一種broker端消息定時調度機制。
咱們只須要把幾個描述消息定時調度方式的參數做爲屬性添加到消息,broker端的調度器就會按照咱們想要的行爲去處理消息。
首先開啓schedulerSupport爲true,在activemq.xml文件添加
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
一共有四個屬性:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延遲投遞的時間 |
AMQ_SCHEDULED_PERIOD | long | 重複投遞的時間間隔 |
AMQ_SCHEDULED_REPEAT | int | 重複投遞次數 |
AMQ_SCHEDULED_CRON | String | Cron表達式 |
固然ActiveMQ也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage.
使用示例,延遲60秒:
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long time = 60 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); producer.send(message);
延遲30秒,投遞10次,間隔10秒:
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);
使用 CRON 表達式的例子:
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
CRON表達式的優先級高於另外三個參數,若是在設置了CRON的同時,也有repeat和period參數,則會在每次CRON執行的時候,重複投遞repeat次,每次間隔爲period。就是說設置是疊加的效果。例如每小時都會發生消息被投遞10次,延遲1秒開始,每次間隔1秒:
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9); producer.send(message);
stomp方式,只須要在head中添加屬性便可,
activeMQ.publish(exports.signQueue,msg,{'persistent':true,'AMQ_SCHEDULED_DELAY':60000});
參考文章:
http://blog.csdn.net/czp11210/article/details/47022639
http://activemq.apache.org/redelivery-policy.html
http://www.myexception.cn/internet/1252460.html