消息中間件JMS

一.什麼是消息中間件  

   消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。apache

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。centos

  

  對於消息的傳遞有兩種類型:瀏覽器

    一種是點對點的,即一個生產者和一個消費者一一對應;服務器

    

    另外一種是發佈/ 訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。 session

    

二.ActiveMQ下載與安裝

  1.官方網站下載:http://activemq.apache.org/tcp

  2.centos上安裝ActiveMQ
分佈式

    1)將apache-activemq-5.12.0-bin.tar.gz 上傳至服務器ide

    2)解壓測試

      tar  zxvf  apache-activemq-5.12.0-bin.tar.gz網站

    3)爲apache-activemq-5.12.0目錄賦權

      chmod 777 apache-activemq-5.12.0

    4)進入apache-activemq-5.12.0\bin目錄,賦與執行權限

     chmod 755 activemq 

  3.啓動  

    ./activemq start

    啓動成功出現以下信息

    

  4.打開瀏覽器輸入服務器地址

    http://192.168.56.102:8161

    便可訪問管理界面

    

    點擊Manage ActiveMQ broker登陸,登陸名和密碼均爲admin

  5.點對點消息列表Queues  

    

    Number Of Pending Messages  :等待消費的消息 這個是當前未出隊列的數量。
    Number Of Consumers  :消費者 這個是消費者端的消費者數量
    Messages Enqueued  :進入隊列的消息  進入隊列的總數量,包括出隊列的。
    Messages Dequeued  :出了隊列的消息  能夠理解爲是消費這消費掉的數量。

三.測試案例

  1.點對點模式

    1.1消息生產者

    1)建立Maven工程,引入依賴

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>

    2)建立類QueueProducer  main方法代碼以下:

    //1.建立鏈接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
    //2.獲取鏈接
    Connection connection = connectionFactory.createConnection();
    //3.啓動鏈接
    connection.start();
    //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
    //5.建立隊列對象
    Queue queue = session.createQueue("test-queue");
    //6.建立消息生產者
    MessageProducer producer = session.createProducer(queue);
    //7.建立消息
    TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ世界");
    //8.發送消息
    producer.send(textMessage);
    //9.關閉資源
    producer.close();
    session.close();
    connection.close();    

    上述代碼中第4步建立session  的兩個參數: 

    第2個參數 消息的確認模式

  • AUTO_ACKNOWLEDGE = 1    自動確認
  • CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
  • DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
  • SESSION_TRANSACTED = 0    事務提交併確認

    運行後經過ActiveMQ管理界面查詢 

  

    1.2 消息消費者

    建立類QueueConsumer ,main方法代碼以下:

    //1.建立鏈接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取鏈接
    Connection connection = connectionFactory.createConnection();
    //3.啓動鏈接
    connection.start();
    //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立隊列對象
    Queue queue = session.createQueue("test-queue");
    //6.建立消息消費
    MessageConsumer consumer = session.createConsumer(queue);
    
    //7.監聽消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });    
    //8.等待鍵盤輸入
    System.in.read();    
    //9.關閉資源
    consumer.close();
    session.close();
    connection.close();    

    執行後看到控制檯輸出:歡迎來到神奇的ActiveMq世界

    1.3 運行測試

    同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現只有一個消費者會接收到消息。

  2.發佈訂閱模式

    2.1消息生產者

    建立類TopicProducer ,main方法代碼以下:

  

//1.建立鏈接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取鏈接
    Connection connection = connectionFactory.createConnection();
    //3.啓動鏈接
    connection.start();
    //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立主題對象
    Topic topic = session.createTopic("test-topic");
    //6.建立消息生產者
    MessageProducer producer = session.createProducer(topic);
    //7.建立消息
    TextMessage textMessage = session.createTextMessage("歡迎來到神奇的ActiveMq世界");
    //8.發送消息
    producer.send(textMessage);
    //9.關閉資源
    producer.close();
    session.close();
    connection.close();

    2.2消息消費者

    建立類TopicConsumer ,main方法代碼以下:

    

    //1.建立鏈接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取鏈接
    Connection connection = connectionFactory.createConnection();
    //3.啓動鏈接
    connection.start();
    //4.獲取session  (參數1:是否啓動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立主題對象
    //Queue queue = session.createQueue("test-queue");
    Topic topic = session.createTopic("test-topic");
    //6.建立消息消費
    MessageConsumer consumer = session.createConsumer(topic);
    
    //7.監聽消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
    //8.等待鍵盤輸入
    System.in.read();
    //9.關閉資源
    consumer.close();
    session.close();
    connection.close();    

    2.3 運行測試

同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現每一個消費者會接收到消息。 

相關文章
相關標籤/搜索