ActiveMQ學習筆記06 - 消費者負載均衡與高可用

ActiveMQ Broker提供基於LevelDB複製的方式提供高可用服務,可是對負載均衡作的很弱,只支持Static的服務器之間轉發。目前比較流行的消息分片居然不支持。可是消費者的負載均衡和高可用仍是比較完善的。另外說一下,生產者的高可用和負載均衡,通常是靠外圍程序控制。好比,基於Tomcat的web程序做爲生產者,那麼這個web程序的高可用,須要靠tomcat等外圍程序。因此通常所說的高可用,主要指Broker和Consumer。java

下面介紹一下幾個經常使用的消費者策略。web

Exclusive Consumer:apache

用於處理Queue的高可用。若是同時使用多個消費者從同一個Queue消費消息,那麼消息的順序性將得不到保證。這時候可使用Exclusive Consumer。使用Exclusive Consumer能夠保證只有一個消費者在消費這個Queue,其餘的消費者處於等待狀態。一旦處理消費狀態的消費者不可用,系統會自動使用失效轉移機制,選擇到一個新的消費者繼續消費。
tomcat

使用以下:服務器

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

只須要在Destination上增長consumer.exclusive=true參數便可。網絡

還能夠給消費者設置優先級,用於針對網絡和服務器資源不一樣的狀況。session

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

Message Groups:負載均衡

ActiveMQ 5.3版本新增長了這個功能。Message Groups能夠看作Exclusive Consumer的升級版,是一個能夠並行的Exclusive Consumer。原理是經過使用JMSXGroupID來定義消息組。擁有相同的JMSXGroupID的消息將發送到同一個Queue,這個技術相似於Sesssion Sticky技術。這樣既能夠保證消費者的高可用(由於能夠有多個消費者消費同一隊列),又能夠保證消息按順序消費,還不會像Exclusive Consumer同樣浪費資源(須要額外的處於等待狀態的消費者待命)。若是消費者被關閉或者消息組被關閉,這個擁有JMSXGroupID的消息會自動被髮送到其餘消費者。spa

設置JMSXGroupID的例子以下:code

Mesasge message = session.createTextMessage("foo");   
message.setStringProperty("JMSXGroupID", "your business key");   
...
producer.send(message);

關閉消息組,經過設置JMSXGroupSeq的值爲-1,例子以下:

Mesasge message = session.createTextMessage("foo");
message.setStringProperty("JMSXGroupID", "your business key");
message.setIntProperty("JMSXGroupSeq", -1);
// ...
producer.send(message);

可使用JMSXGroupFirstForConsumer來判斷這個消費者是不是第一次消費這個JMSXGroupID的消息:

if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
   // flush cache for groupId
}

若是Broker中已經有消息了,這時因爲啓動消費者的速度不一致,可能會致使某些消費者先啓動並率先消費消息,致使A消費者的負載不均勻。能夠在Broker中配置timeBeforeDispatchStarts,讓消費者延遲一段時間再開始負載消費。或者配置consumersBeforeDispatchStarts,讓消費者達到必定數量再開始負載消費。

具體配置以下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

設置有兩個消費者都啓動好或者2秒以後,在開始負載消費消息。

Virtual Topics:

Message Groups只能用於Queue,那麼Topic對應的版本,就是這個Virtual Topics。他實現了和Message Groups相似的功能,負載均衡和失效轉移。

對於消息的生產者來講,Virtual Topics只是一個普通的Topic,可是必須以VirtualTopic.(可配置)開頭,如VirtualTopic.Foo。

而消息的消費者鏈接是一個隊列。這個隊列須要聽從如下規則,即Consumer.ClientID.VirtualTopicName。例如:Consumer.CilentA.VirtualTopic.Foo,表示消費者的Client id是ClientA,消費的是VirtualTopic.Foo這個Topic。

消息生產者代碼示例:

MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.FOO"));
TextMessage message = session.createTextMessage("foo");
producer.send(message);

消息消費者代碼示例:

MessageConsumer consumerA = session.createConsumer(new ActiveMQQueue("Consumer.ClientA.VirtualTopic.FOO"));  
consumerA.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        // do something ...
    }
});

能夠看到,經過Virtual Topics註冊的queue訂閱關係以下:

這裏使用了2個client,A和B,分別訂閱了兩個主題pojo_topic和string_topic。

而消息的存儲格式以下:

經過上圖能夠看到,消息是以Queue的形式存儲,並不是Topic,可是Queue被直接根據所訂閱的ClientID生成多份消息。好比,VirtualTopic.string_topic有兩個訂閱者A和B,那消息就被分紅Consumer.A.VirtualTopic.string_topic和Consumer.B.VirtualTopic.string_topic。這就是將Topic經過Virtual Topics轉換成了Queue。

只要轉換成了Queue,就能夠結合使用剛纔介紹的Exclusive Consumer或者Message Groups對這個VirtualTopic的隊列進行高可用和負載均衡的配置,從而實現Topic的高可用和負載均衡。

能夠配置Broker,改變虛擬主題的默認前綴,以下面的配置,則表示虛擬主題的前綴是VirtualTopicConsumers。

<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="true"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>

從ActiveMQ 5.4版本開始,能夠配置selectorAware屬性控制只有符合訂閱者規則的消息才被分發給相應的虛擬隊列,用於防止分發不匹配的消息,提高效率。

說了優勢,固然也要說缺點。優勢是能夠實現高可用和負載均衡;缺點是,若是訂閱者不少,每一個訂閱者都須要複製一份消息,這樣會佔用過多的磁盤空間,形成消息爆炸。

Composite Destinations:

能夠將消息發送給多個Destination。能夠混合發送Queue和Topic。例如:

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

我的感受這個功能比較雞肋,屬於客戶端控制服務器端的消息高可用。若是是純粹的消息可用性複製,能夠直接使用基於LevelDb的消息複製機制或基於JDBC的主從同步機制。

相關文章
相關標籤/搜索