上篇博文深刻淺出 JMS(一) – JMS 基本概念,咱們介紹了消息通訊的規範JMS,這篇博文介紹一款開源的 JMS 具體實現—— ActiveMQ。ActiveMQ 是一個易於使用的消息中間件。html
咱們簡單的介紹一下消息中間件,對它有一個基本認識就好,消息中間件有不少的用途和優勢:java
首先簡單的介紹一下 MQ,MQ 英文名 MessageQueue,中文名也就是你們用的消息隊列,幹嗎用的呢,說白了就是一個消息的接受和轉發的容器,可用於消息推送。web
下面進入咱們今天的主題,爲你們介紹 ActiveMQ。apache
Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server. Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.
ActiveMQ 是由 Apache 出品的,一款最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持 JMS1.1 和 J2EE 1.4 規範的 JMS Provider 實現,它很是快速,支持多種語言的客戶端和協議,並且能夠很是容易的嵌入到企業的應用環境中,並有許多高級功能。後端
下面咱們下載一個版本,玩一玩。瀏覽器
ActiveMQ 下載網站:http://activemq.apache.org/activemq-5153-release.html安全
你們如今好以後,將 apache-activemq-5.15.3-bin.zip 解壓縮,咱們能夠看到它的總體目錄結構:服務器
從它的目錄來講,仍是很簡單的:網絡
咱們瞭解 activemq 的基本目錄,下面咱們運行一下 activemq 服務,雙擊 bin 目錄下的 bin/win64/activemq.bat 腳本文件,就能夠看下圖的效果。session
從上圖咱們能夠看到 activemq 的存放地址,以及瀏覽器要訪問的地址。
ActiveMQ 默認使用的 TCP 鏈接端口是 61616, 經過查看該端口的信息能夠測試 ActiveMQ 是否成功啓動
netstat -an | find "61616" TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING
ActiveMQ 默認啓動時,啓動了內置的 jetty 服務器,提供一個用於監控 ActiveMQ 的 admin 應用。地址:http://127.0.0.1:8161/admin/
用戶名和密碼都是 admin
多個項目之間集成
下降系統間模塊的耦合度,解耦
系統先後端隔離
們首先寫一個簡單的 Hello World 示例,讓你們感覺下 Activemq,咱們須要實現接受者和發送者兩部分代碼的編寫。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
public class Producer { public static void main(String[] args) throws JMSException { //1. 建立 ConnectionFactory 鏈接工廠 ConnectionFactory factory = new ActiveMQConnectionFactory( // (1) //ActiveMQConnectionFactory.DEFAULT_USER, //ActiveMQConnectionFactory.DEFAULT_PASSWORD, "admin", "password", "tcp://localhost:61616/" ); //2. 建立 connection,並啓動鏈接 Connection connection = factory.createConnection(); // (2) connection.start(); //3. Session 是一個發送或接收消息的線程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // (3) //4. 指定生產消息目標禾消費消息來源的 Destination 對象 Destination dest = session.createQueue("queue1"); // (4) //5. 建立生產者 MessageProducer producer = session.createProducer(dest); // (5) //6. 指定簽收模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // (6) //7. 建立消息 for (int i = 0; i < 10; i++) { // (7) TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa"); producer.send(message); } if (connection != null) { connection.close(); } } }
建立 ConnectionFactory 實例,ActiveMQConnectionFactory 構造函數傳入三個參數,分別是用戶名,密碼,消息地址,tcp 端口能夠在 conf/activemq.xml 中配製。
建立 connection,並啓動鏈接,Connection 默認是關閉的。注意:使用結束後要關閉 connection.close()
。
建立 session 會話,一個 connection 能夠建立多個 session,Session 是一個發送或接收消息的線程。
參數1:是否開啓事務,若是開啓事務,則在必須提交 session
for (int i = 0; i < 10; i++) { // (7) TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa"); producer.send(message); } session.commit();
參數2:簽收模式,有 Session.AUTO_ACKNOWLEDGE
(自動)、Session.CLIENT_ACKNOWLEDGE
(手動 經常使用)、Session.DUPS_OK_ACKNOWLEDGE
(可能重複簽收),如若選擇 Session.CLIENT_ACKNOWLEDGE
,則必須在消費端確認,不然 ActiveMQ 不認爲消息已經消費。生產中通常使用 Session.CLIENT_ACKNOWLEDGE
簽收,不要相信自動簽收方式。
# 客戶端處理 TextMessage message = (TextMessage) consumer.receive(); message.acknowledge();
經過 session 建立 Destination 對象,指定生產消息目標禾消費消息來源的對象。在 PTP 模式中 Destination 被稱做 Queue 即隊列;在 Pub/sub 模式 Destination 被稱做 Topic 即主題,在程序中可使用多個 Queue 和 Top。
建立消息的發送和接收對象(生產者和消貴者) MessageProducer/MessageConsumer
指定簽收模式。使用 MessageProducer 的 setDeliveryMode 方法爲其設置持久化特性和非持久化特性(DeliveryMode)
發送消息。使用 JMS 規範的 TextMessage 形式建立數據(經過 session 對象),而且 MessageProducer 的 send 方法發送數據。同理客戶端使用 receive 方法進行接收數據。最後不要忘記關閉 Connection 鏈接。
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
destination
能夠指定將不一樣的消息發送到不一樣的 destination,但消費端就必須指定對應的 destination
deliveryMode
是否開啓持久化,默認爲持久性,DeliveryMode.NON_PERSISTENT
、DeliveryMode.PERSISTENT
priority
優先級 0-9,默認爲 4,優先級越高越先消費,機率
timeToLive
ActiveMQ 中消息保留的時間,單位秒,默認永久保存
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616/" ); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("queue1"); // (1) MessageConsumer consumer = session.createConsumer(dest); while (true) { TextMessage message = (TextMessage) consumer.receive(); //message.acknowledge(); if (message == null) break; System.out.println(message.getText()); } if (connection != null) { connection.close(); } } }