消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。而Java中的消息中間件則是JMS---Java Message Service.在常見的消息中間件類型ActiveMQ無疑是不錯的選擇。接下來咱們先簡單介紹一下什麼是JMS及ActiveMQ然後咱們介紹一下ActiveMQ中的兩種模式 -- 隊列模型以及主題模式。java
JMS基本概念
JMS(Java Message Service)是訪問企業消息系統的標準API,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。數據庫
JMS應用由如下幾部分組成:
JMS provider :是一個消息系統,它實現了JMS 接口並提供管理和控制的功能。
JMS clients :是用Java語言寫的一些程序和組件,它們產生和使用消息。
Messages :是在JMS clients之間傳遞的消息的對象。
Administered objects :是由使用JMS clients 的人生成的預選設置好的JMS 對象。有兩種這樣的對象:
destinations和connection factories。apache
JMS基本功能
JMS是用於和麪向消息的中間件相互通訊的應用程序接口。它既支持點對點(point-to-point)的域,又支持發佈/訂閱 (publish/subscribe)類型的域,而且提供對下列類型的支持:經承認的消息傳遞,事務型消息的傳遞,一致性消息和具備持久性的訂閱者支 持。JMS還提供了另外一種方式來對您的應用與舊的後臺系統相集成。服務器
消息服務類型
1) point-to-point (PTP)方式:點到點的模型。消息由一個JMS客戶機(發佈者)發送到服務器上的一個目的地,即一個隊列(queue)。而另外一個JMS客戶機(訂閱者)則能夠訪問這個隊列,並從該服務器獲取這條消息。
2) publish/subscribe (pub/sub)方式:發佈-訂閱模型。這裏仍然是由一個JMS客戶機將一條消息發佈到服務器上的一個目的地上,可是此次這個目的地叫作一個主題 (topic),可有多個訂閱者去訪問該消息。消息將一直維持在主題中,直到這個主題的全部訂閱者都取走了該消息的一個副本。消息也包括了一個參數,用於 定義了該消息的耐久性(它可以在服務器上等待訂閱者多長時間)。session
ActiveMQ是Apache所提供的一個開源的消息系統,徹底採用Java來實現,所以,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規範。JMS是一組Java應用程序接口,它提供消息的建立、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,相似於Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序可以與不一樣廠商的消息組件很好地進行通訊。
JMS支持兩種消息發送和接收模型。一種稱爲P2P(Ponit to Point)模型,即採用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱爲可能,P2P模型在點對點的狀況下進行消息傳遞時採用。下面兩張圖來描述一下這兩種模型:
異步
【1】隊列模型tcp
隊列模型之生產者端分佈式
package com.imooc.jms.queue;
import java.awt.font.TextMeasurer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;ide
public class AppProducer {url
private static final String url = "tcp://192.168.133.1:61616"; private static final String queueName = "queue-test"; public static void main(String[] args) throws JMSException { //1.建立connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.建立connection Connection connection = connectionFactory.createConnection(); //3. 啓動連接 connection.start(); //4. 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立目標 Destination destination = session.createQueue(queueName); //6. 建立生產者向目標發送消息 MessageProducer producer= session.createProducer(destination); for(int i = 0 ;i<100;i++){ //7. 建立消息 TextMessage textMessage = session.createTextMessage("test"+i); //8. 發佈消息 producer.send(textMessage); System.out.println("發送消息:"+textMessage.getText()); } //9. 關閉鏈接 connection.close(); }
}
說明:此爲隊列模型中的生產者端,下面就該端代碼進行說明:
new 一個ConnectionFactory --》 new 一個connection ---》啓動這個鏈接----》建立一個會話----》建立消息發送目的地---》建立一個消息生產者向目的地發送消息--》發送消息----》關閉鏈接
隊列模型之消費者端
package com.imooc.jms.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AppConsumer {
private static final String url = "tcp://192.168.133.1:61616"; private static final String queueName = "queue-test"; public static void main(String[] args) throws JMSException { //1.建立connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.建立connection Connection connection = connectionFactory.createConnection(); //3. 啓動連接 connection.start(); //4. 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立目標 Destination destination = session.createQueue(queueName); //6. 建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); //7. 建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接受消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //9. 關閉鏈接 //connection.close(); }
}
說明:此爲隊列模型中的消費者端,下面就該端代碼進行說明:
new 一個ConnectionFactory --》 new 一個connection ---》啓動這個鏈接----》建立一個會話----》建立消息發送目的地---》建立一個消息消費者--》建立一個消息監聽器監聽消息----》關閉鏈接
【2】主題模型
主題模型之生產者端
package com.imooc.jms.topic;
import java.awt.font.TextMeasurer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AppProducer {
private static final String url = "tcp://192.168.133.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { //1.建立connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.建立connection Connection connection = connectionFactory.createConnection(); //3. 啓動連接 connection.start(); //4. 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立目標 Destination destination = session.createTopic(topicName); //6. 建立生產者向目標發送消息 MessageProducer producer= session.createProducer(destination); for(int i = 0 ;i<100;i++){ //7. 建立消息 TextMessage textMessage = session.createTextMessage("test"+i); //8. 發佈消息 producer.send(textMessage); System.out.println("發送消息:"+textMessage.getText()); } //9. 關閉鏈接 connection.close(); }
}
主題模型之消費者端
package com.imooc.jms.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AppConsumer {
private static final String url = "tcp://192.168.133.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { //1.建立connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.建立connection Connection connection = connectionFactory.createConnection(); //3. 啓動連接 connection.start(); //4. 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立目標 Destination destination = session.createTopic(topicName); //6. 建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); //7. 建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接受消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //9. 關閉鏈接 //connection.close(); }
}
最後建立項目源碼地址:
連接:http://pan.baidu.com/s/1bo1PSib 密碼:khyc