ActiveMQ相關API

1、Producer

1,發送消息

MessageProducer數據庫

  send(Message message)發送消息到默認目的地,就是建立Producer時指定的目的地。session

  send(Destination destination, Message message)發送消息到指定目的地。Producer不建議綁定目的地,就是建立Producer的時候,不綁定目的地,session.createProducer(null);fetch

  send(Message message, int deliveryMode, int priority, long timeToLive);發送消息到默認的目的地,且設置相關參數。deliveryMode:持久化方式(DeliveryMode.PERSISTENT,DeliveryMode.NON_PERSISTENT),priority:優先級,timeToLive:消息有效期(單位毫秒)。spa

  send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive),發送消息到指定目的地,且設置相關參數。日誌

2,消息有效期

  消息過時後,默認會將失效消息保存到「死信隊列(ActiveMQ.DLQ)」。xml

  不持久化的消息,在超時後直接丟棄,不會保存到死信隊列中。排序

  死信隊列名稱可配置(activemq.xml),死信隊列中的消息不能恢復。索引

3,消息優先級

  咱們在發送消息時,指定消息的權重,broker能夠建議權重較高的消息優先發送給Consumer,在某些場景下,咱們一般但願權重較高的消息優先傳遞,不過由於各類緣由,priority並不能決定消息傳遞的嚴格順序。隊列

  JMS標準中約定priority能夠爲0-9的整數數值,值越大表明權重越高,默認爲4,不過ActiveMQ中各個存儲器對priority的支持並不是徹底同樣,好比JDBC存儲器能夠支持0-9,由於JDBC存儲器能夠基於priority對消息進行排序和索引化,但對於kahadb/leveldb等這種基於日誌文件的存儲器而言,pritoity支持相對較弱,只能識別三種優先級(LOW:<4,NORMAL:=4,HIGH:>4)。內存

開啓priority

  在broker 端,默認是不存儲priority 信息的,咱們須要手動開啓,修改activemq.xml 配置文件,在broker 標籤的子標籤policyEntries 中增長下述配置:

  <policyEntry queue=">" prioritizedMessages="true"/>

  不過對於「非持久化」類型的消息(若是沒有被swap 到臨時文件),它們被保存在內存中,它們不存在從文件Paged in 到內存的過程,由於能夠保證優先級較高的消息,老是在prefetch的時候被優先獲取,這也是「非持久化」消息能夠擔保消息發送順序的優勢。

  Broker 在收到Producer 的消息以後,將會把消息cache 到內存,若是消息須要持久化,那麼同時也會把消息寫入文件;若是通道中Consumer 的消費速度足夠快(即積壓的消息不多,還沒有超過內存限制,咱們經過上文可以知道,每一個通道均可以有必定的內存用來cache消息),那麼消息幾乎不須要從存儲文件中Paged In,直接就能從內存的cache 中獲取便可,這種狀況下,priority 能夠擔保「全局順序」;不過,若是消費者滯後太多,cache 已滿,就會觸發新接收的消息直接保存在磁盤中,那麼此時,priority 就沒有那麼有效了。

  在Queue 中,prefetch 的消息列表默認將會採用「輪詢」的方式(roundRobin,注意並非roundRobinDispatch)[備註:由於Queue 不支持任何DispatchPolicy],依次添加到每一個consumer的pending buffer 中, 好比有m1-m2-m3-m4 四條消息, 有C1-C2 兩個消費者, 那麼:m1->C1,m2->C2,m3->C1,m4->C2。這種輪序方式,會對基於權重的消息發送有些額外的影響,假如四條消息的權重都不一樣,可是(m1,m3)->C1,事實上m2 的權重>m3,對於C1 而言,它彷佛丟失了「順序性」。

強順序

  <policyEntry queue=">" strictOrderDispatch="true"/>

  strictOrderDispatch「嚴格順序轉發」,這是區別於「輪詢」的一種消息轉發手段;不過不要誤解它爲「全局嚴格順序」,它只不過是將prefetch 的消息依次填滿每一個consumer 的pendingbuffer 。好比上述例子中, 若是C1-C2 兩個消費者的buffer 尺寸爲3 , 那麼(m1,m2,m3)->C1,(m4)->C2;當C1 填充完畢以後,纔會填充C2。由此這種策略能夠保證buffer中全部的消息都是「權重臨近的」、有序的。(須要注意:strictOrderDispatch 並不是是解決priority消息順序的問題而生,只是在使用priority 時須要關注它)。

嚴格順序

  <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>

  useCache=false 來關閉內存,強制將全部的消息都當即寫入文件(索引化,可是會下降消息的轉發效率);queuePrefetch=1 來約束每一個consumer 任什麼時候刻只有一個消息正在處理,那些消息消費以後,將會從文件中從新獲取,這大大增長了消息文件操做的次數,不過每次讀取確定都是priority 最高的消息。

2、Consumer

1,消息的確認

  Consumer拉取消息後,若是沒有作確認acknowledge,此消息不會從MQ中刪除。

  消息若是拉取到consumer後未確認,那麼消息會被鎖定,若是consumer關閉的時候仍舊沒有確認消息,則釋放消息的鎖定信息。消息將發送給其餘consumer處理。

  消息一旦處理,應該必須確認,相似數據庫中的事物機制。

2,消息的過濾

  對消息消費者處理的消息數據進行過濾,這種處理能夠明確消費者的角色,細分消費者的功能。

設置過濾:

  Session.createConsumer(Destination destination, String messageSelector);

  過濾信息爲字符串,語法相似SQL92中的where子句條件信息。可使用諸如AND、IN、OR、NOT IN等關鍵字。

  注意,消息的生產者在發送消息的時候,必須設置可過濾的屬性信息,全部過濾屬性設置方法格式爲:setXxxxProperty(String name, T value),其中方法名中Xxx是類型,如setObjectProperty/setStringProperty。

相關文章
相關標籤/搜索