ActiveMQ 學習記錄 之 兩種基本通訊方式

在前面一篇文章裏討論過幾種應用系統集成的方式,發現實際上面向消息隊列的集成方案算是一個整體比較合理的選擇。這裏,咱們先針對具體的一個消息隊列Activemq的基本通訊方式進行探討。activemq是JMS消息通訊規範的一個實現。總的來講,消息規範裏面定義最多見的幾種消息通訊模式主要有發佈-訂閱、點對點這兩種。另外,經過結合這些模式的具體應用,咱們在處理某些應用場景的時候也衍生出來了一種請求應答的模式。下面,咱們針對這幾種方式一一討論一下。java

 

1、 基礎流程

 

在討論具體方式的時候,咱們先看看使用activemq須要啓動服務的主要過程。apache

    按照JMS的規範,咱們首先須要得到一個JMS connection factory.,經過這個connection factory來建立connection.session

在這個基礎之上咱們再建立session, destination, producer和consumer。所以主要的幾個步驟以下:tcp

1. 得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。ide

2. 利用factory構造JMS connection函數

3. 啓動connection優化

4. 經過connection建立JMS session.spa

5. 指定JMS destination.線程

6. 建立JMS producer或者建立JMS message並提供destination.code

7. 建立JMS consumer或註冊JMS message listener.

8. 發送和接收JMS message.

9. 關閉全部JMS資源,包括connection, session, producer, consumer等。


2、publish-subscribe發佈訂閱模式

 

   發佈訂閱模式有點相似於咱們平常生活中訂閱報紙。每一年到年尾的時候,郵局就會發一本報紙集合讓咱們來選擇訂閱哪個。

在這個表裏頭列了全部出版發行的報紙,那麼對於咱們每個訂閱者來講,咱們能夠選擇一份或者多份報紙。好比北京日報、

瀟湘晨報等。那麼這些個咱們訂閱的報紙,就至關於發佈訂閱模式裏的topic。有不少我的訂閱報紙,也有人可能和我訂閱了相同的

報紙。那麼,在這裏,至關於咱們在同一個topic裏註冊了。對於一份報紙發行方來講,它和全部的訂閱者就構成了一個1對多的關係。

這種關係以下圖所示:

 

 

如今,假定咱們用前面討論的場景來寫一個簡單的示例。咱們首先須要定義的是publisher.

 

publisher

 

      publisher是屬於發佈信息的一方,它經過定義一個或者多個topic,而後給這些topic發送消息。

    publisher的構造函數以下:

public class JMSPublisher {  
  
    private static final String USER=ActiveMQConnection.DEFAULT_USER;  
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;  
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;  
      
    private static final int SENDNUM=10;  
      
    public static void main(String[] args) throws JMSException {  
        ConnectionFactory factory;  
        Connection conn;  
        Session session;  
        Destination des;  
        MessageProducer producer;  
          
        //一、獲取連接工廠  
        factory = new org.apache.activemq.ActiveMQConnectionFactory(JMSPublisher.USER, JMSPublisher.PASSWORD, JMSPublisher.BROKEURL);  
        //二、獲取連接  
        conn = factory.createConnection();  
        //三、開啓連接  
        conn.start();  
        //四、獲取會話  
        session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
        //五、建立隊列  
        des=session.createTopic("topic1");  
        //六、建立消息生產者  
        producer=session.createProducer(des);  
          
        //七、發送消息  
        for(int i=0;i<JMSPublisher.SENDNUM;i++){  
            TextMessage message=session.createTextMessage("生產者生產消息:   第"+i+"條==========");  
            System.out.println("生產者生產消息:   第"+i+"條==========");  
            producer.send(message);  
        }  
          
        //八、提交  
        session.commit();  
    }  
      
}

 

 

 

  Consumer1

 

Consumer的代碼也很相似,具體的步驟無非就是1.初始化資源。 2. 接收消息。 3. 必要的時候關閉資源。

public class JMSConsumer {  
      
    private static final String USER=ActiveMQConnection.DEFAULT_USER;  
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;  
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;  
      
    private static final int SENDNUM=10;  
      
    public static void main(String[] args) throws JMSException {  
        ConnectionFactory factory;  
        Connection conn;  
        Session session;  
        Destination des;  
        MessageConsumer consumer;  
          
        //一、實例化鏈接工廠  
        factory=new ActiveMQConnectionFactory(JMSConsumer.USER, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);  
        //二、獲取連接  
        conn=factory.createConnection();  
        //三、開啓連接  
        conn.start();  
        //四、建立會話  
        session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
        //五、消息目的地  
        des=session.createTopic("topic1");  
        //六、建立消息消費者  
        consumer = session.createConsumer(des);  
        //七、獲取消息  
        consumer.setMessageListener(new MyTopicListener1());  
    }  
}

 

 

Consumer2

 

public class JMSConsumer2 {  
  
    private static final String USER=ActiveMQConnection.DEFAULT_USER;  
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;  
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;  
      
    public static void main(String[] args) throws JMSException {  
        ConnectionFactory connFactory;  
        Connection conn;  
        Session session;  
        Destination des;  
        MessageConsumer consumer;  
          
        //一、鏈接工廠  
        connFactory = new ActiveMQConnectionFactory(JMSConsumer2.USER, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);  
        //二、鏈接  
        conn = connFactory.createConnection();  
        //三、開啓鏈接  
        conn.start();  
        //四、會話  
        session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
        //五、目的  
        des = session.createTopic("topic_2");  
        //六、消費者  
        consumer = session.createConsumer(des);  
          
        //七、監聽  
        consumer.setMessageListener(new MyMessageListener2());  
          
    }  
}

 

 

在發佈訂閱模式中,消費者要主動監聽生產者的生產消息

 

MessageListener1:

 

 Listener對象的職責很簡單,主要就是處理接收到的消息:

 

 

public class MyMessageListener implements MessageListener{  
  
    public void onMessage(Message message) {  
        if(message!=null){  
            try {  
                System.out.println("111111111111消費者 接收消息 : "+((TextMessage)message).getText()+"<<<<============");  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
              
        }  
    }  
  
}

 

 

MessageListener2:

public class MyMessageListener2 implements MessageListener{  
  
    public void onMessage(Message message) {  
        if(message!=null){  
            try {  
                System.out.println("22222222222消費者 接收消息 : "+((TextMessage)message).getText()+"<<<<============");  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
              
        }  
    }  
  
}


結果:

 

 

 

它實現了MessageListener接口,裏面的onMessage方法就是在接收到消息以後會被調用的方法。

    如今,經過實現前面的publisher和consumer咱們已經實現了pub-sub模式的一個實例。仔細回想它的步驟的話,

主要就是要二者設定一個共同的topic,有了這個topic以後他們能夠實現一方發消息另一方接收。另外,爲了鏈接到具體的

message server,這裏是使用了鏈接tcp://localhost:16161做爲定義ActiveMQConnectionFactory的路徑。在publisher端經過

session建立producer,根據指定的參數建立destination,而後將消息和destination做爲producer.send()方法的參數發消息。

在consumer端也要建立相似的connection,session。經過session獲得destination,再經過session.createConsumer(destination)

來獲得一個MessageConsumer對象。有了這個MessageConsumer咱們就能夠自行選擇是直接同步的receive消息仍是註冊listener了。

 

 

3、P2P

 

p2p的過程則理解起來更加簡單。它比如是兩我的打電話,這兩我的是獨享這一條通訊鏈路的。一方發送消息,另一方接收,

就這麼簡單。在實際應用中由於有多個用戶對使用p2p的鏈路,它的通訊場景以下圖所示:

 

 

 

咱們再來看看一個p2p的示例:

    在p2p的場景裏,相互通訊的雙方是經過一個相似於隊列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個發送者

和多個接收者,而在p2p裏一個queue只有一個發送者和一個接收者。

 

 

Producer:

 

 

public class JMSProducer {  
      
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認鏈接用戶名  
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認鏈接用戶名  
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認鏈接地址  
    private static final int SENDNUM = 10;//發送消息數量  
      
    public static void main(String[] args) throws Exception {  
        ConnectionFactory connFactory;//鏈接工廠  
        Connection conn;//鏈接  
        Session session;//會話    接受或者發送消息的線程  
        Destination destination;//消息目的地  
        MessageProducer producer;//消息生產者  
          
        //一、實例化鏈接工廠  
        connFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);  
        //二、獲取鏈接  
        conn = connFactory.createConnection();  
        //三、啓動鏈接  
        conn.start();  
        //四、建立會話--是否開啓事務--消息確認方式  
        session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE );  
        //五、建立消息隊列  
        destination = session.createQueue("FirstQueue1");  
        //六、建立消息生產者  
        producer = session.createProducer(destination);  
          
        //七、發送消息  
        for(int i=0;i<JMSProducer.SENDNUM;i++){  
            TextMessage textMessage = session.createTextMessage("ActiveMQ 發送消息:"+i);  
            System.out.println("生產者生產消息:"+textMessage.getText());  
            producer.send(textMessage);  
        }  
          
        session.commit();  
        if(conn!=null){  
            conn.close();  
        }  
    }  
}


 

 

Consumer:

 

 

public class JMSConsumer {  
  
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認鏈接用戶名  
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認鏈接用戶名  
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認鏈接地址  
      
    public static void main(String[] args) throws Exception {  
        ConnectionFactory connFactory;//鏈接工廠  
        Connection conn;//鏈接  
        Session session;//會話    接受或者發送消息的線程  
        Destination destination;//消息目的地  
        MessageConsumer consumer;//消息生產者  
          
        //一、實例化鏈接工廠  
        connFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);  
        //二、獲取鏈接  
        conn = connFactory.createConnection();  
        //三、啓動鏈接  
        conn.start();  
        //四、建立會話--是否開啓事務--消息確認方式  
        session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE );  
        //五、建立消息隊列  
        destination = session.createQueue("FirstQueue1");  
        //六、建立消費者  
        consumer = session.createConsumer(destination);  
          
        //七、註冊消息監聽  
        consumer.setMessageListener(new MyMessageListener());  
          
    }  
}


Listener:

public class MyMessageListener implements javax.jms.MessageListener {  
  
    public void onMessage(Message message) {  
        if(message!=null){  
            try {  
                System.out.println("消費者監聽消息:---->"+((TextMessage)message).getText());  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
}


  這裏代碼和前面pub-sub的具體實現代碼很是類似,就再也不贅述。

 

 

      如今若是咱們比較一下pub-sub和p2p模式的具體實現步驟的話,咱們會發現他們基本的處理流程都是相似的,

除了在pub-sub中要經過createTopic來設置topic,而在p2p中要經過createQueue來建立通訊隊列。他們之間存在着不少的重複之處,

在具體的開發過程當中,咱們是否能夠進行一些工程上的優化呢?別急,後面咱們會討論到的。

 

 

  回顧前面三種基本的通訊方式,咱們會發現,他們都存在着必定的共同點,好比說都要初始化ConnectionFactory, Connection,

Session等。在使用完以後都要將這些資源關閉。若是每個實現它的通訊端都這麼寫一通的話,實際上是一種簡單的重複。

從工程的角度來看是徹底沒有必要的。那麼,咱們有什麼辦法能夠減小這種重複呢?

    一種簡單的方式就是經過工廠方法封裝這些對象的建立和銷燬,而後簡單的經過調用工廠方法的方式獲得他們。另外,

既然基本的流程都是在開頭建立資源在結尾銷燬,咱們也能夠採用Template Method模式的思路。經過繼承一個抽象類,

在抽象類裏提供了資源的封裝。全部繼承的類只要實現怎麼去使用這些資源的方法就能夠了。Spring中間的JMSTemplate就提供了

這種相似思想的封裝。

 

 

在上訴的代碼中幾個常量須要寫一下:

 

一、

ActiveMQConnection.DEFAULT_USER 表示默認鏈接用戶名

ActiveMQConnection.DEFAULT_PASSWORD;表示默認鏈接用戶名

ActiveMQConnection.DEFAULT_BROKER_URL;表示默認地址

二、

 

Session.AUTO_ACKNOWLEDGE

當客戶端從 receive 或 onMessage成功返回時,Session 自動簽收客戶端的這條消息的收條。

Session.CLIENT_ACKNOWLEDGE

客戶端經過調用消息的 acknowledge 方法簽收消息。message.acknowledge();

 

客戶經過消息的acknowledge 方法確認消

息。須要注意的是,在這種模式中,確認是在會話層上進行:確認一個被

消費的消息將自動確認全部已被會話消費的消息。例如,若是一個消息消

費者消費了10 個消息,而後確認第5 個消息,那麼全部10 個消息都被確

認。

Session.DUPS_ACKNOWLEDGE。

 

該選擇只是會話遲鈍的確認消息的提交。如

果JMS provider 失敗,那麼可能會致使一些重複的消息。若是是重複的

消息,那麼JMS provider 必須把消息頭的JMSRedelivered 字段設置爲

true。

相關文章
相關標籤/搜索