Apache ActiveMQ Queue Topic 詳解

1、特性及優點 


一、實現 JMS1.1 規範,支持 J2EE1.4以上
二、可運行於任何 jvm和大部分 web 容器(ActiveMQ works great in any JVM)
三、支持多種語言客戶端(java, C, C++, AJAX, ACTIONSCRIPT 等等)
四、支持多種協議(stomp,openwire,REST)
五、良好的 spring 支持(ActiveMQ has great Spring Support)
六、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than
JBossMQ.)
七、與 OpenJMS、JbossMQ等開源jms provider 相比,ActiveMQ有 Apache 的支
持,持續發展的優點明顯。 html

 

2、下載部署 

一、下載
http://activemq.apache.org/activemq-510-release.html ,下載 5.1.0 Windows
Distribution版本
二、安裝
直接解壓至任意目錄(如:d:\ apache-activemq-5.1.0)
三、啓動 ActiveMQ服務器
方法 1:
直接運行 bin\activemq.bat
方法 2(在 JVM 中嵌套啓動):
cd example
ant embedBroker
四、ActiveMQ消息管理後臺系統:
http://localhost:8161/admin java

 

 

3、運行附帶的示例程序 

一、Queue 消息示例:(點對點)
*  啓動 Queue 消息消費者
cd example ant consumer
*  啓動 Queue 消息生產者
cd example
ant producer
簡要說明:生產者(producer)發消息,消費者(consumer)接消息,發送/接
收 2000 個消息後自動關閉
二、Topic 消息示例:(羣組訂閱)
*  啓動 Topic 消息消費者
cd example
ant topic-listener
*  啓動 Topic 消息生產者
cd example
ant topic-publisher
簡要說明:重複 10 輪,publisher每輪發送2000 個消息,並等待獲取 listener
的處理結果報告,而後進入下一輪發送,最後統計全局發送時間。 web


4、Queue與 Topic 的比較 spring


一、JMS Queue 執行 load balancer語義:
一條消息僅能被一個 consumer(消費者) 收到。若是在 message 發送的時候沒有可用的
consumer,那麼它將被保存一直到能處理該 message 的 consumer 可用。若是一
個 consumer 收到一條 message 後卻不響應它,那麼這條消息將被轉到另外一個
consumer 那兒。一個 Queue 能夠有不少 consumer,而且在多個可用的 consumer
中負載均衡。  數據庫

 

 

注: apache


點對點消息傳遞域的特色以下: 
•  每一個消息只能有一個消費者。  
•  消息的生產者和消費者之間沒有時間上的相關性。不管消費者在生產者發
送消息的時候是否處於運行狀態,它均可以提取消息。
服務器


二、Topic 實現 publish和 subscribe 語義:
一條消息被 publish時,它將發到全部感興趣的訂閱者,因此零到多個subscriber
將接收到消息的一個拷貝。可是在消息代理接收到消息時,只有激活訂閱的
subscriber可以得到消息的一個拷貝。 session

 

注: 負載均衡

 

發佈/訂閱消息傳遞域的特色以下: 
•  每一個消息能夠有多個消費者。  
•  生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費
自它訂閱以後發佈的消息。JMS 規範容許客戶建立持久訂閱,這在必定程
度上放鬆了時間上的相關性要求。持久訂閱容許消費者消費它在未處於激
活狀態時發送的消息。
jvm


三、分別對應兩種消息模式:
Point-to-Point (點對點),Publisher/Subscriber Model (發佈/訂閱者) 其中在 Publicher/Subscriber 模式下又有Nondurable subscription(非持久訂閱)
和 durable subscription (持久化訂閱)2種消息處理方式(支持離線消息)。

 

注:

在點對點消息傳遞域中,目的地被成爲隊列(queue);在發佈/訂閱消息傳遞
域中,目的地被成爲主題(topic)。


5、Point-to-Point (點對點)消息模式開發流程


一、生產者(producer)開發流程(ProducerTool.java):

1.1  建立 Connection:
根據 url,user 和 password 建立一個 jms Connection。

 

Java代碼   收藏代碼
  1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);  
  2.             connection = connectionFactory.createConnection();  
  3.             connection.start();  
 

 

1.2  建立 Session:
在 connection的基礎上建立一個 session,同時設置是否支持事務和
ACKNOWLEDGE 標識。

 
Java代碼   收藏代碼
  1. Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);  
 


1.3  建立 Destination對象:
需指定其對應的主題(subject)名稱,producer 和 consumer 將根據 subject
來發送/接收對應的消息。

 

Java代碼   收藏代碼
  1. if (topic) {  
  2.     destination = session.createTopic(subject);  
  3. else {  
  4.     destination = session.createQueue(subject);  
  5. }  
 


1.4  建立 MessageProducer:
根據 Destination建立MessageProducer 對象,同時設置其持久模式。

 

Java代碼   收藏代碼
  1. MessageProducer producer = session.createProducer(destination);            


1.5  發送消息到隊列(Queue):
封裝 TextMessage 消息, 使用 MessageProducer 的 send 方法將消息發送出去。

 

Java代碼   收藏代碼
  1. TextMessage message = session.createTextMessage(createMessageText(i));  
  2. producer.send(message);  

 


二、消費者(consumer)開發流程(ConsumerTool.java):
2.1  實現 MessageListener 接口:
消費者類必須實現MessageListener 接口,而後在onMessage()方法中監聽消息的
到達並處理。

 

Java代碼   收藏代碼
  1. public class ConsumerTool extends Thread implements MessageListener, ExceptionListener  

 實現 onMessage(Message message)方法,實現監聽消息的到達


2.2  建立 Connection:
根據 url,user 和 password 建立一個 jms Connection,若是是durable 模式,
還須要給 connection設置一個 clientId。

 

Java代碼   收藏代碼
  1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);  
  2. Connection connection = connectionFactory.createConnection();  
  3. //是不是 durable 模式.(離線消息持久化支持)   
  4. if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {  
  5.     connection.setClientID(clientId);  
  6. }  
  7. connection.setExceptionListener(this);  
  8. connection.start();  
 


2.3  建立 Session 和 Destination:
與 ProducerTool.java 中的流程相似,再也不贅述。

 

Java代碼   收藏代碼
  1. session = connection.createSession(transacted, ackMode);  

 

2.4 建立 replyProducer【可選】:
能夠用來將消息處理結果發送給 producer。

2.5  建立 MessageConsumer:  
根據 Destination建立MessageConsumer 對象。

 

Java代碼   收藏代碼
  1. MessageConsumer consumer = null;  
  2. if (durable && topic) {  
  3.     consumer = session.createDurableSubscriber((Topic) destination, consumerName);  
  4. else {  
  5.     consumer = session.createConsumer(destination);  
  6. }  
 

 

2.6  消費 message:
 在 onMessage()方法中接收producer 發送過來的消息進行處理,並能夠經過
replyProducer 反饋信息給 producer

 

 
Java代碼   收藏代碼
  1. if (message.getJMSReplyTo() != null) {  
  2.     replyProducer.send(message.getJMSReplyTo()  
  3.                                    , session.createTextMessage("Reply: "   
  4.                                                                              + message.getJMSMessageID()));  
  5. }  
 


6、Publisher/Subscriber(發佈/訂閱者)消息模式開發流程


一、訂閱者(Subscriber)開發流程(TopicListener.java):
 
1.1  實現 MessageListener 接口:
在 onMessage()方法中監聽發佈者發出的消息隊列,並作相應處理。

 

Java代碼   收藏代碼
  1. public void onMessage(Message message) {  
  2.     if (checkText(message, "SHUTDOWN")) {  
  3.   
  4.         try {  
  5.             connection.close();  
  6.         } catch (Exception e) {  
  7.             e.printStackTrace(System.out);  
  8.         }  
  9.   
  10.     } else if (checkText(message, "REPORT")) {  
  11.         // send a report:  
  12.         try {  
  13.             long time = System.currentTimeMillis() - start;  
  14.             String msg = "Received " + count + " in " + time + "ms";  
  15.             producer.send(session.createTextMessage(msg));  
  16.         } catch (Exception e) {  
  17.             e.printStackTrace(System.out);  
  18.         }  
  19.         count = 0;  
  20.   
  21.     } else {  
  22.   
  23.         if (count == 0) {  
  24.             start = System.currentTimeMillis();  
  25.         }  
  26.   
  27.         if (++count % 1000 == 0) {  
  28.             System.out.println("Received " + count + " messages.");  
  29.         }  
  30.     }  
  31. }  
 


1.2  建立 Connection:
根據 url,user 和 password 建立一個 jms Connection。

 

 
Java代碼   收藏代碼
  1. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);  
  2. connection = factory.createConnection();  
 


1.3  建立 Session:
在 connection的基礎上建立一個 session,同時設置是否支持事務和
ACKNOWLEDGE 標識。

 

Java代碼   收藏代碼
  1. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
 


1.4  建立 Topic:
 建立 2 個Topic,  topictest.messages用於接收發布者發出的消息,
topictest.control用於向發佈者發送消息,實現雙方的交互。

 

Java代碼   收藏代碼
  1. topic = session.createTopic("topictest.messages");  
  2. control = session.createTopic("topictest.control");  
 


1.5  建立 consumer 和 producer 對象:
 根據topictest.messages建立consumer,根據topictest.control建立producer。

 

Java代碼   收藏代碼
  1. MessageConsumer consumer = session.createConsumer(topic);//建立消費者  
  2. consumer.setMessageListener(this);  
  3.   
  4. connection.start();  
  5.   
  6. producer = session.createProducer(control);//建立生產者  
 


1.6  接收處理消息:
 在 onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消
息,或者根據消息內容不一樣處理對應的業務邏輯(好比:數據庫更新、文件操做
等等),而且可使用 producer對象將處理結果返回給發佈者。

 

Java代碼   收藏代碼
  1. //能夠先檢查消息類型  
  2. ate static boolean checkText(Message m, String s) {  
  3.     try {  
  4.         return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);  
  5.     } catch (JMSException e) {  
  6.         e.printStackTrace(System.out);  
  7.         return false;  
  8.     }  
  9. }  
 
Java代碼   收藏代碼
  1. //而後  
  2.   if (checkText(message, "SHUTDOWN")) {  
  3.   
  4.           //關機  
  5.   
  6.         } else if (checkText(message, "REPORT")) {  
  7.             // 打印  
  8.             
  9.   
  10.         } else {  
  11.             //別的操做  
  12.            
  13.         }  
 


二、發佈者(Publisher)開發流程(TopicPublisher.java):


2.1  實現 MessageListener 接口:
在 onMessage()方法中接收訂閱者的反饋消息。

 

Java代碼   收藏代碼
  1. public void onMessage(Message message) {  
  2.     synchronized (mutex) {  
  3.         System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");  
  4.         if (remaining == 0) {  
  5.             mutex.notify();  
  6.         }  
  7.     }  
  8. }  
 


2.2  建立 Connection:
根據 url  建立一個 jms Connection。

 

Java代碼   收藏代碼
  1. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);  
  2. connection = factory.createConnection();  
 

 

2.3  建立 Session:
在 connection的基礎上建立一個 session,同時設置是否支持事務和
ACKNOWLEDGE 標識。

 

Java代碼   收藏代碼
  1. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
 


2.4  建立 Topic:
 建立 2 個Topic,topictest.messages用於向訂閱者發佈消息,topictest.control用
於接收訂閱者反饋的消息。這2個topic與訂閱者開發流程中的topic是一一對應
的。

 

Java代碼   收藏代碼
  1. topic = session.createTopic("topictest.messages");  
  2. control = session.createTopic("topictest.control");  

 

2.5  建立 consumer 和 producer 對象:
 根據topictest.messages建立publisher;
根據topictest.control建立consumer,同時監聽訂閱者反饋的消息。

 

Java代碼   收藏代碼
  1. publisher = session.createProducer(topic);  
  2. publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式  
  3.   
  4. session.createConsumer(control).setMessageListener(this);//加入監聽  
  5. connection.start();  
 


2.6  給全部訂閱者發送消息,並接收反饋消息:
 示例代碼中,一共重複 10 輪操做。

 

Java代碼   收藏代碼
  1. for (int i = 0; i < batch; i++) {  
  2.     if (i > 0) {  
  3.         Thread.sleep(delay * 1000);  
  4.     }  
  5.     times[i] = batch(messages);  
  6.     System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");  
  7. }  
 


每輪先向全部訂閱者發送 2000 個消息;

 

Java代碼   收藏代碼
  1. private long batch(int msgCount) throws Exception {  
  2.     long start = System.currentTimeMillis();  
  3.     remaining = subscribers;  
  4.     publish();  
  5.     waitForCompletion();  
  6.     return System.currentTimeMillis() - start;  
  7. }  

 

 

Java代碼   收藏代碼
  1. private void publish() throws Exception {  
  2.   
  3.     // send events  
  4.     BytesMessage msg = session.createBytesMessage();  
  5.     msg.writeBytes(payload);  
  6.     for (int i = 0; i < messages; i++) {  
  7.         publisher.send(msg);  
  8.         if ((i + 1) % 1000 == 0) {  
  9.             System.out.println("Sent " + (i + 1) + " messages");  
  10.         }  
  11.     }  
  12.   
  13.     // request report  
  14.     publisher.send(session.createTextMessage("REPORT"));  
  15. }  

 

而後堵塞線程,開始等待;

 

Java代碼   收藏代碼
  1. private void waitForCompletion() throws Exception {  
  2.     System.out.println("Waiting for completion...");  
  3.     synchronized (mutex) {  
  4.         while (remaining > 0) {  
  5.             mutex.wait();//賭賽線程  
  6.         }  
  7.     }  
  8. }  
 


最後經過 onMessage()方法,接收到訂閱者反饋的「REPORT」類信息後,才
print 反饋信息並解除線程堵塞,進入下一輪。

 

Java代碼   收藏代碼
  1. public void onMessage(Message message) {  
  2.     synchronized (mutex) {  
  3.         System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");  
  4.         if (remaining == 0) {  
  5.             mutex.notify();//喚醒線程  
  6.         }  
  7.     }  
  8. }  
 

注:可同時運行多個訂閱者測試查看此模式效果

相關文章
相關標籤/搜索