一,消息中間件的簡單介紹:java
消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成mysql
二.JMSsql
JMS(Java Messaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。數據庫
JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。apache
JMS 定義了五種不一樣的消息正文格式:瀏覽器
· TextMessage--一個字符串對象session
· MapMessage--一套名稱-值對maven
· ObjectMessage--一個序列化的 Java 對象tcp
· BytesMessage--一個字節的數據流分佈式
· StreamMessage -- Java 原始值的數據流
JMS消息傳遞類型
點對點模式:即一個生產者和一個消費者一一對應;
發佈/訂閱模式:即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收
--------------------------------------------------------
ActiveMQ官方網站下載:http://activemq.apache.org/
ActiveMQ安裝完成並啓動完成以後,在瀏覽器訪問http://IP地址:8161/ 便可進入ActiveMQ管理頁面 u/p:admin/admin
點對點模式Demo
建立Maven工程(jar)
pom.xml引入依賴
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java編譯插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
生產者:
public class QueueProducer {
public static void main(String[] args) throws JMSException {
//1.建立鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取Session(參數1爲是否啓動事務,第二個參數:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立隊列對象
Queue queue = session.createQueue("test-queue");
//6.建立消息生產者
MessageProducer producer = session.createProducer(queue);
//7.建立消息
TextMessage textMessage = session.createTextMessage("AvtiveDemo測試");
//8.發送消息
producer.send(textMessage);
//9.關閉資源
producer.close();
}
}
消費者:
public class QueueConsumer {
public static void main(String[] args) throws JMSException, IOException {
//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立隊列對象
Queue queue = session.createQueue("test-queue");
//6.建立消息消費
MessageConsumer consumer = session.createConsumer(queue);
//7.監聽消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
}
消費者:
public class QueueConsumer {
public static void main(String[] args) throws JMSException, IOException {
//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立隊列對象
Queue queue = session.createQueue("test-queue");
//6.建立消息消費
MessageConsumer consumer = session.createConsumer(queue);
//7.監聽消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
}
發佈/訂閱模式
生產者
public static void main(String[] args) throws JMSException {
//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主題對象
Topic topic = session.createTopic("test-topic");
//6.建立消息生產者
MessageProducer producer = session.createProducer(topic);
//7.建立消息
TextMessage textMessage = session.createTextMessage("topic測試");
//8.發送消息
producer.send(textMessage);
//9.關閉資源
producer.close();
session.close();
connection.close();
}
消費者:消費者能夠建多個運行接收信息
public static void main(String[] args) throws JMSException, IOException {
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題對象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); }