3,ActiveMQ-入門(基於JMS發佈訂閱模型)

1、Pub/Sub-發佈/訂閱消息傳遞模型

在發佈/訂閱消息模型中,發佈者發佈一個消息,該消息經過topic傳遞給全部的客戶端。在這種模型中,發佈者和訂閱者彼此不知道對方,是匿名的且能夠動態發佈和訂閱topic。topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。 session

發佈訂閱模型就像訂閱報紙。咱們能夠選擇一份或者多份報紙,好比:北京日報、人民日報。這些報紙就至關於發佈訂閱模型中的topic。若是有不少人訂閱了相同的報紙,那咱們就在同一個topic中註冊,對於報紙發行方,它就和全部的訂閱者造成了一對多的關係。以下: ide

二,Pub/Sub特色

1,每一個消息能夠有多個消費者。 測試

2,發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個或多個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。 spa

3,爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。 code

三,發佈訂閱模型的實現

3.1,發佈者 對象

/**
 * 
 * @類名稱:ActiveMQpubsubProducter
 * @類描述:發佈者-發佈訂閱模型
 */
public class ActiveMQpubsubProducter {
    //會話對象
    private static Session session = null;
    
    public static void sendMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(true, Session.AUTO_ACKNOWLEDGE);
            if(session != null){
                //建立一個主題
                Topic messageTopic = session.createTopic(name);
                //建立消息發佈者  
                MessageProducer messageProducer = session.createProducer(messageTopic);
                for (int i = 0; i < 5 ; i++) {
                    TextMessage  t = session.createTextMessage("ActiveMQ發佈消息:" + i);
                    messageProducer.send(t);
                }
                session.commit();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQpubsubProducter.sendMessages("topicDemo");
    }
}

3.2,訂閱者 blog

----------------------------訂閱者1---------------------------- 隊列

/**
 * 
 * @類名稱:ActiveMQpubsubComsumer1
 * @類描述:訂閱者1-發佈訂閱模型
 */
public class ActiveMQpubsubComsumer1 {
    //會話對象
    private static Session session = null;
    
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE);
            if(session != null){
                Topic topic = session.createTopic(name);
                //建立消費者  
                MessageConsumer messageConsumer = session.createConsumer(topic);
                //註冊消息監聽  
                messageConsumer.setMessageListener(new MyListener1());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQpubsubComsumer1.getMessages("topicDemo");
    }
}

----------------------------訂閱者2---------------------------- get

/**
 * 
 * @類名稱:ActiveMQpubsubComsumer2
 * @類描述:訂閱者2-發佈訂閱模型
 */
public class ActiveMQpubsubComsumer2 {
    //會話對象
    private static Session session = null;
    
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE);
            if(session != null){
                Topic topic = session.createTopic(name);
                //建立消費者  
                MessageConsumer messageConsumer = session.createConsumer(topic);
                //註冊消息監聽  
                messageConsumer.setMessageListener(new MyListener2());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQpubsubComsumer2.getMessages("topicDemo");
    }
}

3.3,監聽類 it

----------------------------訂閱者1-監聽類----------------------------

/**
 * 
 * @類名稱:MyListener1
 * @類描述:訂閱者1-監聽類
 */
public class MyListener1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一,收到的消息:" + ((TextMessage)message).getText());
            //簽收
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

----------------------------訂閱者2-監聽類----------------------------

/**
 * 
 * @類名稱:MyListener2
 * @類描述:訂閱者2-監聽類
 */
public class MyListener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者二,收到的消息:" + ((TextMessage)message).getText());
            //簽收
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

四,測試

在點對點的要先啓動生產者,生產者要生產消息。而發佈訂閱模型,要先啓動訂閱者,訂閱者先訂閱topic,再發布消息。

第一步:運行ActiveMQpubsubComsumer1,ActiveMQpubsubComsumer2

第二步:運行ActiveMQpubsubProducter

五,結果

ActiveMQpubsubComsumer1:

ActiveMQpubsubComsumer2:

ActiveMQpubsubProducter:

ActiveMQ控制檯截圖:

Number Of Consumers

在該隊列上還有多少消費者在等待接受消息。

Messages Dequeued

消費了多少條消息,記作C。

Messages Enqueued

生產了多少條消息,記作P。

發佈者發佈了5條數據,可是出隊的有10條,由於有兩個訂閱者。

六,總結

發佈者向一個特定的消息主題發佈消息,0或者多個訂閱者可能接收到來自特定消息主題的消息感興趣。其中發佈者和訂閱者不知道對方的存在。

相關文章
相關標籤/搜索