ActiveMQ高級特性:ActiveMQ之虛擬主題

ActiveMQ支持的虛擬Destinations分爲有兩種,分別是
1.虛擬主題(Virtual Topics)
2.組合 Destinations(CompositeDestinations)java

這兩種虛擬Destinations能夠看作對簡單的topic和queue用法的補充,基於它們能夠實現一些簡單有用的EIP功能,虛擬主題相似於1對多的分支功能+消費端的cluster+failover,組合Destinations相似於簡單的destinations直接的路由功能。web

虛擬主題(Virtual Topics)
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每一個持久訂閱者,都至關於一個持久化的queue的客戶端,它會收取全部消息。這種狀況下存在兩個問題:
1.同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能。由於每一個都會獲取全部消息。queue模式能夠解決這個問題,broker端又不能將消息發送到多個應用端。因此,既要發佈訂閱,又要讓消費者分組,這個功能jms規範自己是沒有的。
2.同一應用內consumer端failover的問題:因爲只能使用單個的持久訂閱者,若是這個訂閱者出錯,則應用就沒法處理消息了,系統的健壯性不高。
爲了解決這兩個問題,ActiveMQ中實現了虛擬Topic的功能。使用起來很是簡單。
對於消息發佈者來講,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。
對於消息接收端來講,是個隊列,不一樣應用裏使用不一樣的前綴做爲隊列的名稱,便可代表本身的身份便可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱爲A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱爲B的客戶端。能夠在同一個應用裏使用多個consumer消費此queue,則能夠實現上面兩個功能。又由於不一樣應用使用的queue名稱不一樣(前綴不一樣),因此不一樣的應用中均可以接收到所有的消息。每一個客戶端至關於一個持久訂閱者,並且這個客戶端可使用多個消費者共同來承擔消費任務。apache

生產者:
 服務器

import javax.jms.Connection;  
import javax.jms.DeliveryMode;  
import javax.jms.JMSException;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import javax.jms.Topic;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws JMSException {  
        // 鏈接到ActiveMQ服務器  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 建立主題  
        Topic topic = session.createTopic("VirtualTopic.TEST");  
        MessageProducer producer = session.createProducer(topic);  
        // NON_PERSISTENT 非持久化 PERSISTENT 持久化,發送消息時用使用持久模式  
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        TextMessage message = session.createTextMessage();  
        message.setText("topic 消息。");  
        message.setStringProperty("property", "消息Property");  
        // 發佈主題消息  
        producer.send(message);  
        System.out.println("Sent message: " + message.getText());  
        session.close();  
        connection.close();  
    }  
}  

消費者:session

import javax.jms.Connection;  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;  
import javax.jms.Queue;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Consumer {  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        // 鏈接到ActiveMQ服務器  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 建立主題   
        Queue topicA = session.createQueue("Consumer.A.VirtualTopic.TEST");  
        Queue topicB = session.createQueue("Consumer.B.VirtualTopic.TEST");  
        // 消費者A組建立訂閱  
        MessageConsumer consumerA1 = session.createConsumer(topicA);  
        consumerA1.setMessageListener(new MessageListener() {  
            // 訂閱接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message A1: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
          
        MessageConsumer consumerA2 = session.createConsumer(topicA);  
        consumerA2.setMessageListener(new MessageListener() {  
            // 訂閱接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message A2: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
          
        //消費者B組建立訂閱  
        MessageConsumer consumerB1 = session.createConsumer(topicB);  
        consumerB1.setMessageListener(new MessageListener() {  
            // 訂閱接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message B1: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        MessageConsumer consumerB2 = session.createConsumer(topicB);  
        consumerB2.setMessageListener(new MessageListener() {  
            // 訂閱接收方法  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message B2: " + tm.getText()+":"+tm.getStringProperty("property"));  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        session.close();  
        connection.close();  
    }  
}  

組合列隊Composite Destinations負載均衡

組合隊列容許用一個虛擬的destination表明多個destinations。這樣就能夠經過composite destinations在一個操做中同時向多個queue發送消息。異步

 

客戶端實現的方式tcp

在composite destinations中,多個destination之間採用「,」分割。例如:fetch

Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
  或
  Destination destination = session.createQueue("my-queue,my-queue2");

若是你但願使用不一樣類型的destination,那麼須要加上前綴如queue:// 或topic://,例如:spa

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

在conf/activemq.xml中的broker下配置實現

<destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <compositeQueue name="MY.QUEUE">
          <forwardTo>
            <queue physicalName="my-queue" />
          <queue physicalName="my-queue2" />
          </forwardTo>
            </compositeQueue>
        </virtualDestinations>
    </virtualDestinationInterceptor>
</destinationInterceptors>

再java代碼發送的時候,隊列的的名字就用MY.QUEUQ

Configure Startup Destinations

若是須要在ActiveMQ啓動的時候,建立Destination的話,能夠以下配置conf/activemq.xml的broker下:

<destinations>
    <queue physicalName="FOO.BAR" />
  <topic physicalName="SOME.TOPIC" />
</destinations>

Delete Inactive Destinations

通常狀況下,ActiveMQ的queue在不使用以後,能夠經過web控制檯或是JMX方式來刪除掉。固然,也能夠經過配置,使得broker能夠自動探測到無用

的隊列(必定時間內爲空的隊列)並刪除掉,回收響應資源。能夠以下配置conf/activemq.xml:

<broker schedulePeriodForDestinationPurge="10000">
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
      </policyEntries>
    </policyMap>
    </destinationPolicy>
</broker>

說明:

  schedulePeriodForDestinationPurge:設置多長時間檢查一次,這裏是10秒,默認爲0

  inactiveTimoutBeforeGC:設置當Destination爲空後,多長時間被刪除,這裏是30秒,默認爲60

  gcInactiveDestinations: 設置刪除掉不活動隊列,默認爲false

Destination Options

隊列選項是給consumer在JMS規範以外添加的功能特性,經過在隊列名稱後面使用相似URL的語法添加多個選項。包括:

 1:consumer.prefetchSize,consumer持有的未確認最大消息數量,默認值 variable

 2:consumer.maximumPendingMessageLimit:用來控制非持久化的topic在存在慢消費者的狀況下,丟棄的數量,默認0

 3:consumer.noLocal :默認false

 4:consumer.dispatchAsync :是否異步分發 ,默認true

 5:consumer.retroactive:是否爲回溯消費者 ,默認false

 6:consumer.selector:Jms的Selector,默認null

 7:consumer.exclusive:是否爲獨佔消費者 ,默認false

 8:consumer.priority:設置消費者的優先級,默認0

 

使用示例:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
相關文章
相關標籤/搜索