在發佈/訂閱消息模型中,發佈者發佈一個消息,該消息經過topic傳遞給全部的客戶端。在這種模型中,發佈者和訂閱者彼此不知道對方,是匿名的且能夠動態發佈和訂閱topic。topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。 session
發佈訂閱模型就像訂閱報紙。咱們能夠選擇一份或者多份報紙,好比:北京日報、人民日報。這些報紙就至關於發佈訂閱模型中的topic。若是有不少人訂閱了相同的報紙,那咱們就在同一個topic中註冊,對於報紙發行方,它就和全部的訂閱者造成了一對多的關係。以下: ide
1,每一個消息能夠有多個消費者。 測試
2,發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個或多個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。 spa
3,爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。 code
3.1,發佈者 對象
/** * * @類名稱:ActiveMQpubsubProducter * @類描述:發佈者-發佈訂閱模型 */ public class ActiveMQpubsubProducter { //會話對象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(true, Session.AUTO_ACKNOWLEDGE); if(session != null){ //建立一個主題 Topic messageTopic = session.createTopic(name); //建立消息發佈者 MessageProducer messageProducer = session.createProducer(messageTopic); for (int i = 0; i < 5 ; i++) { TextMessage t = session.createTextMessage("ActiveMQ發佈消息:" + i); messageProducer.send(t); } session.commit(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQpubsubProducter.sendMessages("topicDemo"); } }
3.2,訂閱者 blog
----------------------------訂閱者1---------------------------- 隊列
/** * * @類名稱:ActiveMQpubsubComsumer1 * @類描述:訂閱者1-發佈訂閱模型 */ public class ActiveMQpubsubComsumer1 { //會話對象 private static Session session = null; public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE); if(session != null){ Topic topic = session.createTopic(name); //建立消費者 MessageConsumer messageConsumer = session.createConsumer(topic); //註冊消息監聽 messageConsumer.setMessageListener(new MyListener1()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQpubsubComsumer1.getMessages("topicDemo"); } }
----------------------------訂閱者2---------------------------- get
/** * * @類名稱:ActiveMQpubsubComsumer2 * @類描述:訂閱者2-發佈訂閱模型 */ public class ActiveMQpubsubComsumer2 { //會話對象 private static Session session = null; public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE); if(session != null){ Topic topic = session.createTopic(name); //建立消費者 MessageConsumer messageConsumer = session.createConsumer(topic); //註冊消息監聽 messageConsumer.setMessageListener(new MyListener2()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQpubsubComsumer2.getMessages("topicDemo"); } }
3.3,監聽類 it
----------------------------訂閱者1-監聽類----------------------------
/** * * @類名稱:MyListener1 * @類描述:訂閱者1-監聽類 */ public class MyListener1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("訂閱者一,收到的消息:" + ((TextMessage)message).getText()); //簽收 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }
----------------------------訂閱者2-監聽類----------------------------
/** * * @類名稱:MyListener2 * @類描述:訂閱者2-監聽類 */ public class MyListener2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("訂閱者二,收到的消息:" + ((TextMessage)message).getText()); //簽收 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }
在點對點的要先啓動生產者,生產者要生產消息。而發佈訂閱模型,要先啓動訂閱者,訂閱者先訂閱topic,再發布消息。
第一步:運行ActiveMQpubsubComsumer1,ActiveMQpubsubComsumer2
第二步:運行ActiveMQpubsubProducter
ActiveMQpubsubComsumer1:
ActiveMQpubsubComsumer2:
ActiveMQpubsubProducter:
ActiveMQ控制檯截圖:
Number Of Consumers |
在該隊列上還有多少消費者在等待接受消息。 |
Messages Dequeued |
消費了多少條消息,記作C。 |
Messages Enqueued |
生產了多少條消息,記作P。 |
發佈者發佈了5條數據,可是出隊的有10條,由於有兩個訂閱者。
發佈者向一個特定的消息主題發佈消息,0或者多個訂閱者可能接收到來自特定消息主題的消息感興趣。其中發佈者和訂閱者不知道對方的存在。