ActiveMQ入門系列二:入門代碼實例(點對點模式)

在上一篇《ActiveMQ入門系列一:認識並安裝ActiveMQ(Windows下)》中,大體介紹了ActiveMQ和一些概念,並下載、安裝、啓動他,還訪問了他的控制檯頁面。html

這篇,就用代碼實例說下如何實現消息的生產和消費。java

1、理論基礎git

RabbitMQ同樣,ActiveMQ中也是有兩種模式:github

  • 點對點模式(Point to Point,簡寫爲PTP)
  • 發佈/訂閱模式(Publish & Subscribe,簡寫爲Pub & Sub)

經過上一篇咱們知道了製造消息的應用叫生產者(Producer),生產者在生產了消息後會發送消息到目的地(Destination),到達消費和處理消息的應用(也就是消費者Consumer)。這裏的兩種模式就經過對應不一樣的消息目的地(Destination)來實現,PTP對應Queue(隊列)、Pub&Sub對應Topic(主題)。apache

今天就詳細介紹下PTP和Queue,下一篇介紹Pub & Sub和Topic。session

在PTP模式的示意圖:maven

 

  • 消息生產者生產消息發送到queue中,而後消息消費者從queue中取出而且消費消息。
  • 消息被消費之後,queue中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息。
  • Queue支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費、其它的則不能消費此消息了。
  • 當消費者不存在時,消息會一直保存,直到有消費消費。

在PTP中,代碼實現有兩種方式:消費者主動消費和消費者監聽消費,下面就分別說下。tcp

2、消費者主動消費ide

主動消費是最基本也是最簡單的消費方式,先上代碼:測試

  1. 建立maven工程並引入依賴
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId>
          <version>5.7.0</version>
        </dependency>
  2. 實現生產者
    package com.sam.ptp;
    import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author JAVA開發老菜鳥 * */ public class Producer { public static final String QUEUE_NAME = "ptp-demo";//隊列名 public void producer(String message) throws JMSException { ConnectionFactory factory = null; Connection connection = null; Session session = null; MessageProducer producer = null; try { /** * 1.建立鏈接工廠 * 建立工廠,構造方法有三個參數:分別是用戶名、密碼、鏈接地址 * 無參構造:有默認的鏈接地址,localhost * 一個參數:無驗證模式,無用戶的認證 * 三個參數:有認證和鏈接地址,我這裏使用三個參數的構造方法 */ factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616"); /** * 2.建立鏈接,有兩個方法(我這裏使用無參數的) * 無參數 * 有參數:用戶名、密碼; */ connection = factory.createConnection(); /** * 3.啓動鏈接 * 生產者能夠不用調用start()方法啓動,由於在發送消息的時候回進行檢查 * 若是未啓動鏈接,會自動啓動。 * 若是有特殊配置,須要配置完成後再啓動鏈接 */ connection.start(); /** * 4.用鏈接建立會話 * 有兩個參數:是否須要事務、消息確認機制 * 若是支持事務,對於生產者來講第二個參數就無效了,這個時候第二個參數建議傳入Session.SESSION_TRANSACTED * 若是不支持事務,第二個參數有效且必須傳遞 * * AUTO_ACKNOWLEDGE:自動確認,消息處理後自動確認(商業開發不推薦) * CLIENT_ACKNOWLEDGE:客戶端手動確認,消費者處理後必須手動確認 * DUPS_OK_ACKNOWLEDGE:有副本的客戶端手動確認,消息能夠屢次處理(不建議) */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /** * 5.用會話建立目的地(隊列)、生產者、消息 * 隊列名是隊列的惟一標記 * 建立生產者的時候能夠指定目的地,也能夠在發送消息的時候再指定 */ Destination destination = session.createQueue(QUEUE_NAME); producer = session.createProducer(destination); TextMessage textMessage = session.createTextMessage(message); /** * 6.生產者發送消息到目的地 */ producer.send(textMessage); System.out.println("消息發送成功"); } catch(Exception ex){ throw ex; } finally { /** * 7.釋放資源 */ if(producer != null){ producer.close(); } if(session != null){ session.close(); } if(connection != null){ connection.close(); } } } public static void main(String[] args){ Producer producer = new Producer(); try{ producer.producer("hello, activemq"); } catch (Exception ex){ ex.printStackTrace(); } } }

     

  3. 實現消費者
    package com.sam.ptp;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author JAVA開發老菜鳥
     *
     * 主動消費
     */
    public class Consumer {
    
        public String consumer() throws JMSException {
            ConnectionFactory factory = null;
            Connection connection = null;
            Session session = null;
            MessageConsumer consumer = null;
            try {
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
                connection = factory.createConnection();
                /**
                 * 消費者必須啓動鏈接,不然沒法消費
                 */
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(Producer.QUEUE_NAME);
                consumer = session.createConsumer(destination);
                /**
                 * 獲取隊列消息
                 */
                Message message = consumer.receive();
                String text = ((TextMessage) message).getText();
                return text;
            } catch(Exception ex){
                throw ex;
            } finally {
                /**
                 * 7.釋放資源
                 */
                if(consumer != null){
                    consumer.close();
                }
                
                if(session != null){
                    session.close();
                }
    
                if(connection != null){
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args){
            Consumer consumer = new Consumer();
            try{
                String message = consumer.consumer();
                System.out.println("消息消費成功:" + message);
            } catch (Exception ex){
                ex.printStackTrace();
            }
        }
    }

     

好,這樣代碼就寫好了,咱們來測試下。

1.先運行生產者,我發現報錯了。。。

好吧,原來是我此次沒有啓動ActiveMQ,被本身蠢哭了。。。

啓動ActiveMQ以後,再運行生產者,成功了。

去看下控制檯頁面的變化,隊列裏面多了個「ptp-demo」隊列,這個就是咱們生產者代碼裏面的隊列名,而且能看到該隊列的基本狀況:

從左到右依次爲,有待消費的消息1條、消費者0個、已經發送的消息1條、已經消費的消息0條

2.接下來運行消費者,成功

再去看下控制檯頁面,發現隊列信息變了,從左到右依次爲:有待消費的消息0條、消費者0個、已經發送的消息1條、已經消費的消息1條

也就是說,消息真的被消費了!

 代碼寫完了,也按照預期執行完了,咱們如今再回過頭來分析下消費者的代碼,會發現他在consumer.receive()以後不會再消費其餘消息了,即使後面再有消息被生產出來也不會再消費。也就是說只能在運行後消費一次消息,這個就是主動消費。

若是想要循環消費屢次產生的消息的話,怎麼辦呢?請用下面的監聽消費

 

3、消費者監聽消費

仍是先上代碼,代碼結構同主動消費相似,有細微差異,具體代碼不貼了,能夠到個人GitHub碼雲上獲取源碼

  1. 首先爲了區分,我把隊列名改了
    public static final  String QUEUE_NAME = "ptp-listener-demo";//隊列名

     

  2. 生產者和消費者的消息確認方式都改爲了客戶端手動確認,再也不自動確認,手動確認有個好處就是能夠防止消息沒有被正常消費而丟失,這個同RabbitMQ機制同樣
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

     

  3. 生產者生產消息的時候,爲了方便我改爲了一次性發送10條
    /**
                 * 6.建立消息而且生產者發送消息到目的地
                 */
                for(int num = 0; num < 10; num++){
                    TextMessage textMessage = session.createTextMessage(message + num);
                    producer.send(textMessage);
                    System.out.println("消息發送成功"+textMessage.getText());
                }

     

  4. 關鍵點來了,在消費者上加了一個監聽器
     /**
                 * 註冊監聽器,隊列中的消息變化會自動觸發監聽器,接收並自動處理消息
                 *
                 * 監聽器一旦註冊,永久有效,一直到程序關閉
                 * 監聽器能夠註冊多個,至關於集羣
                 * activemq自動輪詢多個監聽器,實現並行處理
                 */
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
    
                        try {
                            //須要手動確認消息
                            message.acknowledge();
                            TextMessage om = (TextMessage) message;
                            String data = om.getText();
                            System.out.println(data);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });

     

執行生產者:

 

執行消費者,消息所有被消費了:

 

再執行2遍生產者,消息一樣都被消費了。 

控制檯頁面多了個隊列,因爲監聽中的消費者沒有關閉,所以這裏能看到消費者數量爲1,我執行了三遍生產者,所以消息有30條。

還沒完,繼續...

咱們此次先啓動2個消費者,而後啓動生產者

兩個生產者分別消費了消息0,2,4,6,8和1,3,5,7,9

也就是說兩個消費者都監聽到了消息,而且activemq自動輪詢兩個監聽器發送消息。

 

好,到這裏,ActiveMQ的點對點模式就介紹完了。下一篇介紹發佈訂閱模式,敬請期待

相關文章
相關標籤/搜索