ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在爲應用程序提供高效、可擴展、穩定、安全的企業級消息通訊。 它的設計目標是提供標準的、面向消息的、多語言的應用集成消息通訊中間件。ActiveMQ 實現了 JMS 1.1 並提供了不少附加的特性,好比 JMX 管理、主從管理、消息組通訊、消息優先級、延遲接收消息、虛擬接收者、消息持久化、消息隊列監控等等。其主要特性有:html
由於 ActiveMQ 是完整支持 JMS 1.1 的,因此從 Java 使用者的角度其基本概念與 JMS 1.1 規範是一致的。java
點對點模型(Point to Point) 使用隊列(Queue)做爲消息通訊載體,知足生產者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。程序員
發佈訂閱模型(Pub/Sub) 使用主題做爲消息通訊載體,相似於廣播模式,發佈者發佈一條消息,該消息經過主題傳遞給全部的訂閱者,在一條消息廣播以後才訂閱的用戶則是收不到該條消息的。web
ActiveMQ 使用時包含的基本組件各與 JMS 是相同的:spring
因爲這些概念在 JMS 中已介紹過,這裏再也不詳細介紹。數據庫
ActiveMQ Broker 的主要做用是爲客戶端應用提供一種通訊機制,爲此 ActiveMQ 提供了一種鏈接機制,並用鏈接器(connector)來描述這種鏈接機制。ActiveMQ 中鏈接器有兩種,一種是用於客戶端與消息代理服務器(client-to-broker)之間通訊的傳輸鏈接器(transport connector),一種是用於消息代理服務器之間(broker-to-broker)通訊的網絡鏈接器(network connector)。connector 使用 URI(統一資源定位符)來表示,URI 格式爲: <schema name>:<hierarchical part>[?<query>][#<fragment>]
schema name 表示協議, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#noseapache
其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。緩存
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
複製代碼
傳輸鏈接器定義在<transportConnectors>
元素中,一個<transportConnector>
元素定義一個特定的鏈接器,一個鏈接器必須有本身惟一的名字和 URI 屬性,但discoveryUri
屬性是可選的。目前在 ActiveMQ 最新的5.15版本中經常使用的傳輸鏈接器鏈接協議有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等安全
每一個協議的具體配置見官網(http://activemq.apache.org/uri-protocols.html )。除了以上這些基本協議以外 ActiveMQ 還支持一些高級協議也能夠經過 URI 的方式進行配置,好比 Failover 和 Fanout 。bash
如圖所示,服務器 S1 和 S2 經過 NewworkConnector 相連,生產者 P1 發送的消息,消費者 C3 和 C4 均可以接收到,而生產者 P3 發送的消息,消費者 C1 和 C2 也能夠接收到。要使用網絡鏈接器的功能須要在服務器 S1 的 activemq.xml 中的 broker 節點下添加以下配置(假設192.168.11.23:61617 爲 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
複製代碼
若是隻是這樣,S1 能夠將消息發送到 S2,但這只是單方向的通訊,發送到 S2 上的的消息還不能發送到 S1 上。若是想 S1 也收到從 S2 發來的消息須要在 S2 的 activemq.xml 中的 broker 節點下也添加以下配置(假設192.168.11.45:61617爲 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
複製代碼
這樣,S1和S2就能夠雙向通訊了。目前在 ActiveMQ 最新的5.15版本中經常使用的網絡鏈接器協議有 static 和 multicast 兩種。
static://(tcp://ip:61616,tcp://ip2:61616)
對這塊感興趣的話能夠看官方文檔:http://activemq.apache.org/networks-of-brokers.html
JMS 規範中消息的分發方式有兩種:非持久化和持久化。對於非持久化消息 JMS 實現者須保證盡最大努力分發消息,但消息不會持久化存儲;而持久化方式分發的消息則必須進行持久化存儲。非持久化消息經常使用於發送通知或實時數據,當你比較看重系統性能而且即便丟失一些消息並不影響業務正常運做時可選擇非持久化消息。持久化消息被髮送到消息服務器後若是當前消息的消費者並無運行則該消息繼續存在,只有等到消息被處理並被消息消費者確認以後,消息纔會從消息服務器中刪除。
對以上這兩種方式 ActiveMQ 都支持,而且還支持經過緩存在內存中的中間狀態消息的方式來恢復消息。歸納起來看 ActiveMQ 的消息存儲有三種:存儲到內存、存儲到文件、存儲到數據庫。具體使用上 ActiveMQ 提供了一個插件式的消息存儲機制,相似於消息的多點傳播,主要實現了以下幾種:
JMS 規範中傳遞消息的方式有兩種,一種是點對點模型的隊列(Queue)方式,另外一種是發佈訂閱模型的主題(Topic)方式。下面看下用 ActiveMQ 以主題方式傳遞消息的 Java 示例。
Java 工程中須要引入 ActiveMQ 包的依賴,jar 包版本同你安裝 ActiveMQ 版本一致便可:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
複製代碼
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicPublisher {
/**
* 默認用戶名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認密碼
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認鏈接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//建立鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//建立鏈接
Connection connection = connectionFactory.createConnection();
//開啓鏈接
connection.start();
//建立會話,不須要事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立 Topic,用做消費者訂閱消息
Topic myTestTopic = session.createTopic("activemq-topic-test1");
//消息生產者
MessageProducer producer = session.createProducer(myTestTopic);
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("發送消息 " + i);
producer.send(myTestTopic, message);
}
//關閉資源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
複製代碼
在 Topic 模式中消息生產者是用於發佈消息的,絕大部分代碼與 Queue 模式中類似,不一樣的是本例中基於 Session 建立的是主題(Topic),該主題做爲消費者消費消息的目的地。
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicSubscriber {
/**
* 默認用戶名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認密碼
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認鏈接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//建立鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//建立鏈接
Connection connection = connectionFactory.createConnection();
//開啓鏈接
connection.start();
//建立會話,不須要事務
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立 Topic
Topic myTestTopic = session.createTopic("activemq-topic-test1");
MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消費者1 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
messageConsumer2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消費者2 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
messageConsumer3.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消費者3 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//讓主線程休眠100秒,使消息消費者對象能繼續存活一段時間從而能監聽到消息
Thread.sleep(100 * 1000);
//關閉資源
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
爲了展現主題模式中消息廣播給多個訂閱者的功能,這裏建立了三個消費者對象並訂閱了同一個主題,比較特殊的是最後讓主線程休眠了一段時間,這麼作的目的是讓消費者對象能繼續存活,從而使控制檯能打印出監聽到的消息內容。
在 ActiveMQ 的 bin 目錄下直接執行activemq start
即啓動了 ActiveMQ
須要先運行 TopicSubscriber 類的 main 方法,這樣發佈者發佈消息的時候訂閱者才能接收到消息,若是將執行順序倒過來則消息先發布出去但沒有任何訂閱者在運行,則看不到消息被消費了。
接着運行 TopicPublisher 類的 main 方法,向主題中發佈3條消息,而後能夠在 TopicSubscriber 後臺看到接收到的消息內容:
在實際項目中若是使用原生的 ActiveMQ API 開發顯然比較囉嗦,這中間建立鏈接工廠、建立鏈接之類代碼徹底能夠抽取出來由框架統一作,這些事情 Spring 也想到了並幫咱們作了。ActiveMQ 徹底支持基於 Spring 的方式 配置 JMS 客戶端和服務器,下面的例子展現一下在 Spring 中如何使用隊列模式和主題模式傳遞消息。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
複製代碼
工程中除了 activemq 的包以外還要添加 Spring 支持 JMS 的包。因爲 connection、session、producer 的建立會消耗大量系統資源,爲此這裏使用 鏈接池 來複用這些資源,因此還要添加 activemq-pool 的依賴。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan base-package="org.study.mq.activeMQ.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-queue"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
<bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
<bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>
<bean id="queueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testQueue"/>
<property name="messageListener" ref="queueListener"/>
</bean>
<bean id="topic1Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic1Listener"/>
</bean>
<bean id="topic2Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic2Listener"/>
</bean>
</beans>
複製代碼
下面的項目示例中的 Java 代碼採用註解的方式,這也是如今不少程序員的習慣用法,因此在配置文件一開始定義註解掃描包路徑org.study.mq.activeMQ.spring
,您能夠根據本身實際狀況修改包名稱,本例中的全部 Java 代碼都放在該包之下。
接下來定義了一個 JMS 工廠 bean,採用的是池化鏈接工廠類org.apache.activemq.pool.PooledConnectionFactory
,實際就是對內部的 ActiveMQ 鏈接工廠增長了鏈接池的功能,從其內部配置能夠看到就是對org.apache.activemq.ActiveMQConnectionFactory
的功能封裝,而ActiveMQConnectionFactory
類則比較熟悉了,就是上面 Java 訪問 ActiveMQ 示例一開始建立鏈接工廠時使用的類。brokerURL 屬性配置的就是鏈接服務器的協議和服務器地址。接下來的 cachingConnectionFactory 是實際項目代碼中經常使用的,對鏈接工廠的又一層加強,使用鏈接的緩存功能以提高效率,讀者可酌情選擇使用。
jmsTemplate 就是 Spring 解決 JMS 訪問時冗長重複代碼的方案,它須要配置的兩個主要屬性是 connectionFactory 和 messageConverter,經過 connectionFactory 獲取鏈接、會話等對象, messageConverter 則是配置消息轉換器,由於一般消息在發送前和接收後都須要進行一個前置和後置處理,轉換器便進行這個工做。這樣實際代碼直接經過 jmsTemplate 來發送和接收消息,而每次發送接收消息時建立鏈接工廠、建立鏈接、建立會話等工做都由 Spring 框架作了。
有了 JMS 模板還須要知道隊列和主題做爲實際發送和接收消息的目的地,因此接下來定義了 testQueue 和 testTopic 做爲兩種模式的示例。而異步接收消息時則須要提供 MessageListener 的實現類,因此定義了 queueListener 做爲隊列模式下異步接收消息的監聽器,topic1Listener 和 topic2Listener 做爲主題模式下異步接收消息的監聽器,主題模式用兩個監聽器是爲了演示多個消費者時都能收到消息。最後的 queueContainer、topic1Container、topic2Container 用於將消息監聽器綁定到具體的消息目的地上。
下面是使用 JMS 模板處理消息的消息服務類
package org.study.mq.activeMQ.spring;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.*;
@Service
public class MessageService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "testQueue")
private Destination testQueue;
@Resource(name = "testTopic")
private Destination testTopic;
//向隊列發送消息
public void sendQueueMessage(String messageContent) {
jmsTemplate.send(testQueue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 設置消息內容
msg.setText(messageContent);
return msg;
}
});
}
//向主題發送消息
public void sendTopicMessage(String messageContent) {
jmsTemplate.send(testTopic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 設置消息內容
msg.setText(messageContent);
return msg;
}
});
}
}
複製代碼
@Service 將該類聲明爲一個服務,實際項目中不少服務代碼也相似。經過 Resource 註解直接將上面配置文件中定義的 jmsTemplate 引入到 MessageService 類中就能夠直接使用了,testQueue 和 testTopic 也是相似,服務類中直接引入配置文件中定義好的隊列和主題。重點是下面的兩個發送消息的方法,sendQueueMessage 向隊列發送消息,sendTopicMessage 向主題發送消息,兩種模式都使用了 jmsTemplate 的 send 方法,send 方法第1個參數是javax.jms.Destination
類型,表示消息目的地。因爲javax.jms.Queue
和javax.jms.Topic
都繼承了javax.jms.Destination
接口,因此該方法對隊列模式和主題模式都適用。send 方法的第2個參數是org.springframework.jms.core.MessageCreator
,這裏使用了匿名內部類的方式建立對象,從支持的 Session 對象中建立文本消息,這樣就能夠發送消息了。能夠看到不管是隊列仍是主題,經過 Spring 框架來發送消息的代碼比以前的 Java 代碼示例簡潔了不少。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("隊列監聽器接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
}
}
}
複製代碼
隊列消息監聽器在收到消息時校驗是不是文本消息類型,是的話則打印出內容。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic1Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主題監聽器1 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
}
}
}
複製代碼
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic2Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主題監聽器2 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
}
}
}
複製代碼
主題監聽器的代碼與隊列監聽器相似,只是打印時經過不一樣字符串表示當前是不一樣監聽器接收的消息。
爲了演示例子,寫了一個 StartApplication 類,在 main 方法中加載 Spring ,獲取到 MessageService 服務以後調用 sendQueueMessage 和 sendTopicMessage 方法發送消息。
package org.study.mq.activeMQ.spring;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class StartApplication {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
MessageService messageService = (MessageService) ctx.getBean("messageService");
messageService.sendQueueMessage("個人測試消息1");
messageService.sendTopicMessage("個人測試消息2");
messageService.sendTopicMessage("個人測試消息3");
}
}
複製代碼
啓動好 activeMQ 服務以後運行 StartApplication 類,在控制檯看到接收到文本消息:
隊列監聽器監聽到了一條消息,兩個主題監聽器分別監聽到了兩條消息。