一:JMS基本概念html
JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊(用於解決兩個或者多個程序之間的耦合)。它便於消息系統中的 Java 應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。java
也就是說它定義看一系列規範,而後你們按照這種規範來開發本身消息服務,固然,如今有好多開源的來供你們使用了, 好比說Apache ActiveMQ、RabbitMQ、Redis、Jafka/Kafka 等等這些web
1. JMS的目標apache
爲企業級的應用提供一種智能的消息系統,JMS定義了一整套的企業級的消息概念與工具,儘量最小化的Java語言概念去構建最大化企業消息應用。統一已經存在的企業級消息系統功能。windows
2. JMS應用程序, 服務器
一個完整的JMS應用應該實現如下功能:session
l JMS 客戶端 – Java語言開發的接受與發送消息的程序併發
l 非JMS客戶端 – 基於消息系統的本地API實現而不是JMSapp
l 消息 – 應用程序用來相互交流信息的載體webapp
l 被管理對象–預先配置的JMS對象,JMS管理員建立,被客戶端運用。如連接工廠,主題等
l JMS提供者–完成JMS功能與管理功能的消息系統
3. JMS體系結構
描述以下:
l JMS提供者(JMS的實現者,好比activemq jbossmq等)
l JMS客戶(使用提供者發送消息的程序或對象,例如在12306中,負責發送一條購票消息處處理隊列中,用來解決購票高峯問題,那麼,發送消息到隊列的程序和從隊列獲取消息的程序都叫作客戶)
l JMS生產者,JMS消費者(生產者及負責建立併發送消息的客戶,消費者是負責接收並處理消息的客戶)
l JMS消息(在JMS客戶之間傳遞數據的對象)
l JMS隊列(一個容納那些被髮送的等待閱讀的消息的區域)
l JMS主題(一種支持發送消息給多個訂閱者的機制)
4. JMS對象模型
l 鏈接工廠(connectionfactory)客戶端使用鏈接工廠建立一個JMS鏈接(connection)。
l JMS鏈接 表示JMS客戶端和服務器端之間的一個活動的鏈接,是由客戶端經過調用鏈接工廠的方法創建的。
l JMS會話 session 標識JMS客戶端和服務端的會話狀態。會話創建在JMS鏈接上,標識客戶與服務器之間的一個會話進程。
二:JMS的消息模式
1. 點對點的消息模式(Point to Point Messaging)
點對點消息模型:經過一個服務器消息隊列實現,消息的發送者向隊列寫入消息,消息的接收者從隊列取出消息。
下面的JMS對象在點對點消息模式中是必須的:
a. 隊列(Queue) – 一個提供者命名的隊列對象,客戶端將會使用這個命名的隊列對象
b. 隊列連接工廠(QueueConnectionFactory) – 客戶端使用隊列連接工廠建立連接隊列
ConnectionQueue來取得與JMS點對點消息提供者的連接。
c. 連接隊列(ConnectionQueue) – 一個活動的連接隊列存在在客戶端與點對點消息提供者之間,客戶用它建立一個或者多個JMS隊列會話(QueueSession)
d. 隊列會話(QueueSession) – 用來建立隊列消息的發送者與接受者(QueueSenderand
QueueReceiver)
e. 消息發送者(QueueSender 或者MessageProducer)– 發送消息到已經聲明的隊列
f. 消息接受者(QueueReceiver或者MessageConsumer) – 接受已經被髮送到指定隊列的消息
2. 發佈訂閱模式(publish – subscribe Mode)
發佈-訂閱模式:把消息發送到給一個主題(Topic),消息服務器將消息發佈給訂閱器該主題的每個訂閱者。舉個通俗的例子,就比如如一家雜誌社(至關於消息發送者)把一堆雜誌(至關於消息)寄到了郵政(至關於主題),再由郵政將雜誌發給每個有訂閱這本雜誌的讀者(至關於消息接收者)
必須的消息對象:
a. 主題Topic(Destination) – 一個提供者命名的主題對象,客戶端將會使用這個命名的主題對象
b. 主題連接工廠(TopciConnectionFactory) – 客戶端使用主題連接工廠建立連接主題
ConnectionTopic來取得與JMS消息Pub/Sub提供者的連接。
c. 連接主題(ConnectionTopic) – 一個活動的連接主題存在發佈者與訂閱者之間
d. 會話(TopicSession) – 用來建立主題消息的發佈者與訂閱者 (TopicPublisher and
TopicSubscribers)
e. 消息發送者MessageProducer) – 發送消息到已經聲明的主題
f. 消息接受者(MessageConsumer) – 接受已經被髮送到指定主題的消息
3. 區別:
點對點模型每個消息只有一個接收者。
發佈-訂閱消息模式的每個消息能夠有多個接收者。
三:介紹ActiveMQ
ActiveMQ 是 Apache 出品,最流行的、能力強勁的開源消息總線。ActiveMQ 是一個徹底支持 JMS1.1 和 J2EE 1.4 規範的 JMS Provider 實現,能夠很容易內嵌到使用Spring的系統裏面去,因此咱們選擇它。
ActiveMQ擁有如下優勢
1.支持多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
3.對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面
4.徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
5.經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上
6.支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
7.從設計上保證了高性能的集羣,客戶端-服務器,點對點
8.支持Ajax
9.支持與Axis的整合
10.能夠很容易得調用內嵌JMS provider,進行測試
安裝:
ActiveMQ(本文簡稱MQ)要求JDK1.5以上。
下載地址:
http://activemq.apache.org/download.html
解壓:
activemq-all-5.5.0.jar:全部MQ JAR包的集合,用於用戶系統調用
bin:其中包含MQ的啓動腳本
conf:包含MQ的全部配置文件
data:日誌文件及持久性消息數據
example:MQ的示例
lib:MQ運行所需的全部Lib
webapps:MQ的Web控制檯及一些相關的DEMO
啓動MQ:
Linux ./active start | stop
windows: 雙擊bin目錄下的activemq.bat文件便可啓動MQ
登陸地址:
http://IP:8161 (http://172.16.0.15:8161)
四:基於ActiveMQ的(Point to Point)模式Demo程序
消息對象
public class MqBean implements Serializable {
private Integer age;
private String name;
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
4.1 隊列消息的發送:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import javax.jms.*;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* Project Name : ActiveMQ
* User: Jelynn
* Date: 2017/4/10
* Time: 10:07
* Describe:
* Version:1.0
*/
public class Sender {
public static void main(String[] args) {
send();
}
//隊列消息的發送
public static void send() {
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:tcp://172.16.0.15:61616");
connectionFactory.setTrustAllPackages(true);
try {
connection = connectionFactory.createConnection();
connection.start();
//第一個參數是是不是事務型消息,設置爲true,第二個參數無效
//第二個參數是
//Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。異常也會確認消息,應該是在執行以前確認的
//Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。能夠在失敗的
//時候不確認消息,不確認的話不會移出隊列,一直存在,下次啓動繼續接受。接收消息的鏈接不斷開,其餘的消費者也不會接受(正常狀況下隊列模式不存在其餘消費者)
//DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。
//待測試
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = session.createQueue("jelynn-queue"); //點對點模型
producer = session.createProducer(destination);
//NON_PERSISTENT
//PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化,默認就是持久的(未消費的消息會持久化)
//ObjectMessage
MqBean mqBean = new MqBean();
mqBean.setAge(20);
int i = 0;
String str;
while (true){
i++;
// str = "小黃" + i;
// producer.send(session.createTextMessage(str));
mqBean.setName("小黃" + i);
producer.send(session.createObjectMessage(mqBean));
Thread.sleep(1000);
}
// producer.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.2 隊列消息的接收
package com.jelynn.activemq.p2p;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.
* Project Name : ActiveMQ
* User: Jelynn
* Date: 2017/4/10
* Time: 10:07
* Describe:
* Version:1.0
*/
public class Receiver {
public static void main(String[] args) {
receive();
}
//消息隊列接收
public static void receive(){
Connection connection;
Session session;
Destination destination;
MessageConsumer consumer;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover:tcp://172.16.0.15:61616");
connectionFactory.setTrustAllPackages(true);
try {
// 構造從工廠獲得鏈接對象
connection = connectionFactory.createConnection();
// 啓動
connection.start();
// 獲取操做鏈接
//這個最好仍是有事務
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("jelynn-queue");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if(null != message){
MqBean mqBean = (MqBean) ((ObjectMessage)message).getObject();
System.out.println("接收到消息"+mqBean.getName());
}
// if(null != message){
// String str = ((TextMessage)message).getText();
// System.out.println("接收到消息 : "+str);
// }
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
若是針對一個queue,定義有多個Receiver,則一條message只能被一個Receiver消費,其餘的沒法接收到該消息。
注意:若是傳輸的消息爲ObjectMessage,須要進行以下配置:
在${ACTIVEMQ_HOME}/bin/env 的ACTIVEMQ_OPTS參數中添加:
-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=* (*表示全部,也能夠添加具體的包)
5.12.4和5.13.0之後的版本,能夠在客戶端須要信任的包:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(",")))); |
或者信任全部:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setTrustAllPackages(true); |
參考:(http://activemq.apache.org/objectmessage.html)
五:基於ActiveMQ的Publish/subscribe模式Demo程序
5.1訂閱消息的發送
package com.jelynn.activemq.publishsubscribe;
import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Jelynn on 2017/4/10.
* 訂閱消息的發送
*/
public class Publisher {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while(true) {
TextMessage message = session.createTextMessage();
message.setText("message_" + System.currentTimeMillis());
producer.send(message);
System.out.println("Sent message: " + message.getText());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// session.close();
// connection.stop();
// connection.close();
}
}
5.2訂閱消息的接收
package com.jelynn.activemq.publishsubscribe;
import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Jelynn on 2017/4/10.
* 訂閱消息的接收
* <p/>
* Number Of Pending Messages 等待消費的消息這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數
* Messages Enqueued 進入隊列的消息進入隊列的總數量,包括出隊列的。這個數量只增不減
* Messages Dequeued 出了隊列的消息能夠理解爲是消費這消費掉的數量
*/
public class Subscriber {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// session.close();
// connection.stop();
// connection.close();
}
}
能夠定義多個Subscriber,進行訂閱消息的接收,每一個Subscriber都能接收到訂閱消息