JMS ActiveMQ研究文檔

 

1. 背景html

當前,CORBA、DCOM、RMI等RPC中間件技術已普遍應用於各個領域。可是面對規模和複雜度都愈來愈高的分佈式系統,這些技術也顯示出其侷限性:(1)同步通訊:客戶發出調用後,必須等待服務對象完成處理並返回結果後才能繼續執行;(2)客戶和服務對象的生命週期緊密耦合:客戶進程和服務對象進程 都必須正常運行;若是因爲服務對象崩潰或者網絡故障致使客戶的請求不可達,客戶會接收到異常;(3)點對點通訊:客戶的一次調用只發送給某個單獨的目標對象。java

面向消息的中間件(Message Oriented Middleware,MOM)較好的解決了以上問題。發送者將消息發送給消息服務器,消息服務器將消息存放在若干隊列中,在合適的時候再將消息轉發給接收者。這種模式下,發送和接收是異步的,發送者無需等待;兩者的生命週期未必相同:發送消息的時候接收者不必定運行,接收消息的時候發送者也不必定運行; 一對多通訊:對於一個消息能夠有多個接收者。web

已有的MOM系統包括IBM的MQSeries、Microsoft的MSMQ和BEA的MessageQ等。因爲沒有一個通用的標準,這些系統很難實現互操做和無縫鏈接。Java Message Service(JMS)是SUN提出的旨在統一各類MOM系統接口的規範,它包含點對點(Point to Point,PTP)和發佈/訂閱(Publish/Subscribe,pub/sub)兩種消息模型,提供可靠消息傳輸、事務和消息過濾等機制。apache

2.JMS概述編程

2.1 JMS規範服務器

JAVA 消息服務(JMS)定義了Java 中訪問消息中間件的接口。JMS 只是接口,並無給予實現,實現JMS 接口的消息中間件稱爲JMS Provider,例如ActiveMQ。網絡

2.2 術語session

JMS Provider:實現JMS 接口的消息中間件;併發

PTP:Point to Point,即點對點的消息模型;app

Pub/Sub:Publish/Subscribe,即發佈/訂閱的消息模型;

Queue:隊列目標;

Topic:主題目標;

ConnectionFactory:鏈接工廠,JMS 用它建立鏈接;

Connection:JMS 客戶端到JMS Provider 的鏈接;

Destination:消息的目的地;

Session:會話,一個發送或接收消息的線程;

MessageProducer:由Session 對象建立的用來發送消息的對象;

MessageConsumer:由Session 對象建立的用來接收消息的對象;

Acknowledge:簽收;

Transaction:事務。

2.3 JMS編程模型

在 JMS 編程模型中,JMS 客戶端(組件或應用程序)經過 JMS 消息服務交換消息。消息生產者將消息發送至消息服務,消息消費者則從消息服務接收這些消息。這些消息傳送操做是使用一組實現 JMS 應用編程接口 (API) 的對象(由 JMS Provide提供)來執行的。

在 JMS 編程模型中,JMS 客戶端使用 ConnectionFactory 對象建立一個鏈接,向消息服務發送消息以及從消息服務接收消息均是經過此鏈接來進行。Connection 是客戶端與消息服務的活動鏈接。建立鏈接時,將分配通訊資源以及驗證客戶端。這是一個至關重要的對象,大多數客戶端均使用一個鏈接來進行全部的消息傳送。

鏈接用於建立會話。Session 是一個用於生成和使用消息的單線程上下文。它用於建立發送的生產者和接收消息的消費者,併爲所發送的消息定義發送順序。會話經過大量確認選項或經過事務來支持可靠傳送。

客戶端使用 MessageProducer 向指定的物理目標(在 API 中表示爲目標身份對象)發送消息。生產者可指定一個默認傳送模式(持久性消息與非持久性消息)、優先級和有效期值,以控制生產者向物理目標發送的全部消息。

一樣,客戶端使用 MessageConsumer 對象從指定的物理目標(在 API 中表示爲目標對象)接收消息。消費者可以使用消息選擇器,藉助它,消息服務能夠只向消費者發送與選擇標準匹配的那些消息。

消費者能夠支持同步或異步消息接收。異步使用可經過向消費者註冊 MessageListener 來實現。當會話線程調用 MessageListener 對象的 onMessage 方法時,客戶端將使用消息。

2.4 JMS編程域

JMS 支持兩種大相徑庭的消息傳送模型:PTP(即點對點模型)和Pub/Sub(即發佈/訂閱模型),分別稱做:PTP Domain 和Pub/Sub Domain。

PTP(使用Queue即隊列目標) 消息從一個生產者傳送至一個消費者。在此傳送模型中,目標是一個隊列。消息首先被傳送至隊列目標,而後根據隊列傳送策略,從該隊列將消息傳送至向此隊列進行註冊的某一個消費者,一次只傳送一條消息。能夠向隊列目標發送消息的生產者的數量沒有限制,但每條消息只能發送至、並由一個消費者成功使用。若是沒有已經向隊列目標註冊的消費者,隊列將保留它收到的消息,並在某個消費者向該隊列進行註冊時將消息傳送給該消費者。

Pub/Sub(使用Topic即主題目標) 消息從一個生產者傳送至任意數量的消費者。在此傳送模型中,目標是一個主題。消息首先被傳送至主題目標,而後傳送至全部已訂閱此主題的活動消費者。能夠向主題目標發送消息的生產者的數量沒有限制,而且每一個消息能夠發送至任意數量的訂閱消費者。主題目標也支持持久訂閱的概念。持久訂閱表示消費者已向主題目標進行註冊,但在消息傳送時此消費者能夠處於非活動狀態。當此消費者再次處於活動狀態時,它將接收此信息。若是沒有已經向主題目標註冊的消費者,主題不保留其接收到的消息,除非有非活動消費者註冊了持久訂閱。

這兩種消息傳送模型使用表示不一樣編程域的 API 對象(其語義稍有不一樣)進行處理,以下所示:

基本類型(統一域) PTP域

Pub/Sub域

ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Session QueueSession TopicPublisher
Destination(Queue或 Topic) Queue Topic
MessageProducer QueueSender  
MessageConsumer QueueReceiver,QueueBrowser TopicSubscriber

使用圖表第一列中列出的統一域對象編寫點對點和發佈/訂閱消息傳送。這是首選方法(JMS 1.1規範)。然而,爲了符合早期的 JMS 1.02b 規範,可使用PTP域對象編寫點對點消息傳送,使用Pub/Sub域對象編制發佈/訂閱消息傳送。

2.5 JMS消息結構

JMS 消息由如下幾部分組成:消息頭,屬性和消息體。

2.5.1 消息頭(Header)

消息頭包含消息的識別信息和路由信息,消息頭包含一些標準的屬性如:JMSDestination,JMSMessageID等。

消息頭
由誰設置
JMSDestination send方法
JMSDeliveryMode send方法
JMSExpiration send方法
JMSPriority send方法
JMSMessageID send方法
JMSTimestamp send方法
JMSCorrelationID 客戶
JMSReplyTo 客戶
JMSType 客戶
JMSRedelivered JMS Provider

2.5.2 屬性(Properties)

除了消息頭中定義好的標準屬性外,JMS 提供一種機制增長新屬性到消息頭中,這種新屬性包含如下幾種:

1. 應用須要用到的屬性;

2. 消息頭中原有的一些可選屬性;

3. JMS Provider 須要用到的屬性。

標準的JMS 消息頭包含如下屬性:

 消息頭

 描述

JMSDestination 消息發送的目的地
JMSDeliveryMode 傳送模式, 有兩種模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示該消息必定要被送到目的地,不然會致使應用錯誤。NON_PERSISTENT 表示偶然丟失該消息是被容許的,這兩種模式使開發者能夠在消息傳送的可靠性和吞吐量之間找到平衡點。
JMSExpiration 消息過時時間,等於Destination 的send 方法中的timeToLive 值加上發送時刻的GMT 時間值。若是timeToLive值等於零,則JMSExpiration 被設爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。
JMSPriority 消息優先級,從0-9 十個級別,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達。
JMSMessageID 惟一識別每一個消息的標識,由JMS Provider 產生。
JMSTimestamp 一個消息被提交給JMS Provider 到消息被髮出的時間。
JMSCorrelationID 用來鏈接到另一個消息,典型的應用是在回覆消息中鏈接到原消息。
JMSReplyTo 提供本消息回覆消息的目的地址
JMSType 消息類型的識別符。
JMSRedelivered 若是一個客戶端收到一個設置了JMSRedelivered 屬性的消息,則表示可能客戶端曾經在早些時候收到過該消息,但並無簽收(acknowledged)。

2.5.3 消息體(Body)

JMS API 定義了5種消息體格式,也叫消息類型,可使用不一樣形式發送接收數據並能夠兼容現有的消息格式,下面描述這5種類型:

消息類型

消息體

TextMessage java.lang.String對象,如xml文件內容
MapMessage 名/值對的集合,名是String對象,值類型能夠是Java任何基本類型
BytesMessage 字節流
StreamMessage Java中的輸入輸出流
ObjectMessage Java中的可序列化對象
Message 沒有消息體,只有消息頭和屬性

2.6 PTP模型

PTP(Point-to-Point)模型是基於隊列的,生產者發消息到隊列,消費者從隊列接收消息,隊列的存在使得消息的異步傳輸成爲可能。和郵件系統中的郵箱同樣,隊列能夠包含各類消息,JMS Provider 提供工具管理隊列的建立、刪除。JMS PTP 模型定義了客戶端如何向隊列發送消息,從隊列接收消息,瀏覽隊列中的消息。

下面描述JMS PTP 模型中的主要概念和對象:

名稱

描述

ConnectionFactory 客戶端用ConnectionFactory 建立Connection 對象。
Connection 一個到JMS Provider 的鏈接,客戶端能夠用Connection 建立Session 來發送和接收消息。
Session 客戶端用Session 建立MessageProducer 和MessageConsumer對象。若是在Session 關閉時,有一些消息已經被收到,但尚未被簽收(acknowledged),那麼,當消費者下次鏈接到相同的隊列時,這些消息還會被再次接收。
Destination(Queue或 TemporaryQueue) 客戶端用Session 建立Destination 對象。此處的目標爲隊列,隊列由隊列名識別。臨時隊列只能由建立它的Connection 所建立的消費者消費,可是任何生產者均可向臨時隊列發送消息。
MessageProducer 客戶端用MessageProducer 發送消息到隊列。
MessageConsumer 客戶端用MessageConsumer 接收隊列中的消息,若是用戶在receive方法中設定了消息選擇條件,那麼不符合條件的消息會留在隊列中,不會被接收到。
可靠性(Reliability) 隊列能夠長久地保存消息直到消費者收到消息。消費者不須要由於擔憂消息會丟失而時刻和隊列保持激活的鏈接狀態,充分體現了異步傳輸模式的優點。

2.7 PUB/SUB模型

JMS Pub/Sub 模型定義瞭如何向一個內容節點發布和訂閱消息,這些節點被稱做主題(topic)。

主題能夠被認爲是消息的傳輸中介,發佈者(publisher)發佈消息到主題,訂閱者(subscribe) 從主題訂閱消息。主題使得消息訂閱者和消息發佈者保持互相獨立,不須要接觸便可保證消息的傳送。

下面描述JMS Pub/Sub 模型中的主要概念和對象:

名稱

描述

訂閱(subscription) 消息訂閱分爲非持久訂閱(non-durable subscription)和持久訂閱(durable subscrip-tion),非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider 保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。持久訂閱時,客戶端向JMS 註冊一個識別本身身份的ID,當這個客戶端處於離線時,JMS Provider 會爲這個ID 保存全部發送到主題的消息,當客戶再次鏈接到JMS Provider時,會根據本身的ID 獲得全部當本身處於離線時發送到主題的消息。
ConnectionFactory 客戶端用ConnectionFactory 建立Connection 對象。
Connection 一個到JMS Provider 的鏈接,客戶端能夠用Connection 建立Session 來發送和接收消息。
Session 客戶端用Session 建立MessageProducer 和MessageConsumer對象。它還提供持久訂閱主題,或使用unsubscribe 方法取消消息的持久訂閱。
Destination(Topic和TemporaryTopic) 客戶端用Session 建立Destination 對象。此處的目標爲主題,主題由主題名識別。臨時主題只能由建立它的Connection所建立的消費者消費。臨時主題不能提供持久訂閱功能。JMS 沒有給出主題的組織和層次結構的定義,由JMS Provider 本身定義。
MessageProducer 客戶端用MessageProducer發佈消息到主題。
MessageConsumer 客戶端用MessageConsumer 接收發布到主題上的消息。能夠在receive 中設置消息過濾功能,這樣,不符合要求的消息不會被接收。
恢復和從新派送(Recovery and Redelivery) 非持久訂閱狀態下,不能恢復或從新派送一個未簽收的消息。只有持久訂閱才能恢復或從新派送一個未簽收的消息。
可靠性(Reliability) 當全部的消息必須被接收,則用持久訂閱模式。當丟失消息可以被容忍,則用非持久訂閱模式。

2.8 JMS支持併發

JMS對象

是否支持併發

Destination
ConnectionFactory
Connection
Session
MessageProducer
MessageConsumer

3. ActiveMQ安裝

3.1 版本

jdk版本:jdk1.5.0_11

ActiveMQ版本:ActiveMQ 4.2測試版

C++客戶端版本:ActiveMQ CPP 1.1 Release

3.2 ActiveMQ二進制安裝

gunzip apache-activemq-4.2-20070328.130210-35.tar.gz

tar xvf apache-activemq-4.2-20070328.130210-35.tar

設置ActiveMQ環境變量:ACTIVEMQ_HOME=安裝目錄

設置CLASSPATH環境變量, CLASSPATH=$CLASSPATH:$ACTIVEMQ_HOME/ apache-activemq-4.2-SNAPSHOT.jar

3.3 ActiveMQ移植

只需將$ACTIVEMQ_HOME打包移植到新機器便可。

3.4 C++客戶端編譯

安裝perl-5.8.8.tar.gz

tar xzvf perl-5.8.8.tar.gz

cd perl-5.8.8

rm -f config.sh Policy.sh

sh Configure -de -Dprefix=/usr

make

make test

make install

reboot

系統從新啓動,登陸系統後能夠執行 perl -v 查看Perl版本信息

檢查/usr/bin/perl或/usr/local/bin/perl是否指向新版本的perl

安裝m4-1.4.8.tar.gz

tar xzvf m4-1.4.8.tar.gz

cd m4-1.4.8

./configure

make

make install

安裝autoconf-2.59.tar.gz

tar xzvf autoconf-2.59.tar.gz

cd autoconf-2.59

./configure

make

make install

安裝automake-1.9.6.tar.gz

tar xzvf automake-1.9.6.tar.gz

cd automake-1.9.6

./configure

make

make install

安裝libtool-1.5.22.tar.gz

tar xzvf libtool-1.5.22.tar.gz

cd libtool-1.5.22

./configure

make

make install

安裝cppunit-1.10.2.tar.gz

tar xzvf cppunit-1.10.2.tar.gz

cd cppunit-1.10.2

./configure

make

make install

安裝e2fsprogs-1.38.tar.gz

注:此項僅Solaris8須要安裝,Solaris9已自帶此uuid頭文件和庫文件

tar xzvf e2fsprogs-1.38.tar.gz

cd e2fsprogs-1.38

./configure

make

make install

安裝gcc-3.4.6

注:無需自行編譯,直接從http://www.sunfreeware.com下載對應solaris版本和cpu的gcc-3.4.6包文件便可

gunzip gcc-3.4.6-sol8-sparc-local.gz

pkgadd –d gcc-3.4.6-sol8-sparc-local

安裝activemq-cpp-1.1.tar.gz

tar xzvf activemq-cpp-1.1.tar.gz

cd activemq-cpp-1.1

./autogen.sh

./configure

make

make install

複製activemq-cpp-1.1/include/activemq-cpp-1.1下文件至/usr/include

複製activemq-cpp-1.1/lib/libactivemq-cpp.a至/usr/lib

3.5 C++客戶端移植

1.在新機器安裝gcc-3.4.6;

2.複製activemq-cpp-1.1/include/activemq-cpp-1.1下文件至新機器/usr/include,複製activemq-cpp-1.1/lib/ libactivemq-cpp.a至新機器/usr/lib;

若是新機器爲Solaris8還需以下操做:複製在Solaris8下編譯e2fsprogs-1.38.tar.gz產生的uuid頭文件和庫文件至新機器相應的目錄(即/usr/include/uuid/uuid.h和/usr/lib/ libuuid.a,此處複製需注意文件層次)

3.6 啓動

cd $ACTIVEMQ_HOME/bin

./activemq

或者

./activemq > activemq.log 2>&1 &

3.6 中止

ps –ef|grep activemq

kill 進程號

4. ActiveMQ編程

ActiveMQ特點:

Supports a variety of Cross Language Clients and Protocols from Java, C, C++, C#, Ruby, Perl, Python, PHP

OpenWire for high performance clients in Java, C, C++, C#

Stomp support so that clients can be written easily in C, Ruby, Perl, Python, PHP to talk to ActiveMQ as well as any other popular Message Broker

Supports many advanced features such as Message Groups, Virtual Destinations, Wildcards and Composite Destinations

Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging

Spring Support so that ActiveMQ can be easily embedded into Spring applications and configured using Spring's XML configuration mechanism

Tested inside popular J2EE servers such as Geronimo, JBoss 4, GlassFish and WebLogic

Includes JCA 1.5 resource adaptors for inbound & outbound messaging so that ActiveMQ should auto-deploy in any J2EE 1.4 compliant server

Supports pluggable transport protocols such as in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports

Supports very fast persistence using JDBC along with a high performance journal

Designed for high performance clustering, client-server, peer based communication

REST API to provide technology agnostic and language neutral web based API to messaging

Ajax to support web streaming support to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric

Axis Support so that ActiveMQ can be easily dropped into Apache Axis runtimes to provide reliable messaging

Can be used as an in memory JMS provider, ideal for unit testing JMS

本文檔僅描述最基本的使用方法,實際使用過程當中請以官方文檔爲準。

1.ActiveMQ官方網站:http://activemq.apache.org

2.JMS官方網站:http://java.sun.com/products/jms

4.1 開發JSM的步驟

廣義上說,一個JMS 應用是幾個JMS 客戶端交換消息,開發JMS 客戶端應用由如下幾步構成:

  • 用JNDI 獲得ConnectionFactory 對象;
  • 用ConnectionFactory 建立Connection 對象;
  • 用Connection 對象建立一個或多個JMS Session;
  • 用JNDI 獲得目標隊列或主題對象,即Destination 對象;
  • 用Session 和Destination 建立MessageProducer 和MessageConsumer;
  • 通知Connection 開始傳送消息。

4.2 編程模版

4.2.1 ConnectionFactory

要初始化JMS,則須要使用鏈接工廠。客戶端經過建立ConnectionFactory創建到ActveMQ的鏈接,一個鏈接工廠封裝了一組鏈接配置參數,這組參數在配置ActiveMQ時已經定義,例如brokerURL參數,此參數傳入的是ActiveMQ服務地址和端口,支持openwire協議的默認鏈接爲tcp://localhost:61616,支持stomp協議的默認鏈接爲tcp://localhost:61613。

注:因爲C++客戶端暫時僅支持stomp協議,因此須要使用tcp://localhost:61613。

ConnectionFactory支持併發。

Java客戶端:

ActiveMQConnectionFactory構造方法:

ActiveMQConnectionFactory();

ActiveMQConnectionFactory(String brokerURL);

ActiveMQConnectionFactory(String userName, String password, String brokerURL) ; 

ActiveMQConnectionFactory(String userName, String password, URI brokerURL) ;

ActiveMQConnectionFactory(URI brokerURL);

其中brokerURL爲ActiveMQ服務地址和端口。

例如:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.135:61616");

或者

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory.setBrokerURL("tcp://192.168.0.135:61616");

C++客戶端:

ActiveMQConnectionFactory構造函數:

ActiveMQConnectionFactory(void); 
ActiveMQConnectionFactory( const std::string& 
 url, const std::string& username = "",

 

const std::string& password = "",
 const std::string& clientId = "" 
 );

例如:

ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.135:61613");

或者

ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory();

connectionFactory->setBrokerURL("tcp://192.168.0.135:61613");

4.2.2 Connection

在成功建立正確的ConnectionFactory後,下一步將是建立一個鏈接,它是JMS定義的一個接口。ConnectionFactory負責返回能夠與底層消息傳遞系統進行通訊的Connection實現。一般客戶端只使用單一鏈接。根據JMS文檔,Connection的目的是「利用JMS提供者封裝開放的鏈接」,以及表示「客戶端與提供者服務例程之間的開放TCP/IP套接字」。該文檔還指出Connection應該是進行客戶端身份驗證的地方,除了其餘一些事項外,客戶端還能夠指定唯一標誌符。

當一個Connection被建立時,它的傳輸默認是關閉的,必須使用start方法開啓。

一個Connection能夠創建一個或多個的Session。

當一個程序執行完成後,必須關閉以前建立的Connection,不然ActiveMQ不能釋放資源,關閉一個Connection一樣也關閉了Session,MessageProducer和MessageConsumer。

Connection支持併發。

4.2.2.1 建立Connection

Java客戶端:

ActiveMQConnectionFactory方法:

Connection createConnection();
Connection createConnection(String userName, String password);

例如:

Connection connection = connectionFactory.createConnection();

C++客戶端:

函數原型:

cms::Connection* ActiveMQConnectionFactory::createConnection(void)

throw ( cms::CMSException );

cms::Connection* ActiveMQConnectionFactory::createConnection(

const std::string& username,
const std::string& password,
const std::string& clientId )
throw ( cms::CMSException 

例如:

Connection* connection = connectionFactory->createConnection();

4.2.2.2 開啓Connection

Java客戶端:

ActiveMQConnection方法:

void start();

例如:

Connection.start();

C++客戶端:

函數原型:

void ActiveMQConnection::start(void) throw ( cms::CMSException );

例如:

connection->start();

4.2.2.3 關閉Connection

Java客戶端:

ActiveMQConnection方法:

void close();

例如:

Connection.close();

C++客戶端:

函數原型:

void ActiveMQConnection::close(void) throw ( cms::CMSException );

例如:

connection->close();

4.2.3 Session

一旦從ConnectionFactory中得到一個Connection,就必須從Connection中建立一個或者多個Session。Session是一個發送或接收消息的線程,可使用Session建立MessageProducer,MessageConsumer和Message。

Session能夠被事務化,也能夠不被事務化,一般,能夠經過向Connection上的適當建立方法傳遞一個布爾參數對此進行設置。

Java客戶端:

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted爲使用事務標識,acknowledgeMode爲簽收模式。

例如:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

C++客戶端:

函數原型:

cms::Session* ActiveMQConnection::createSession(void);

cms::Session* ActiveMQConnection::createSession(
cms::Session::AcknowledgeMode ackMode );

例如:

Session* session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

4.2.4 Destination

Destination是一個客戶端用來指定生產消息目標和消費消息來源的對象。

在PTP模式中,Destination被稱做Queue即隊列;在Pub/Sub模式,Destination被稱做Topic即主題。在程序中可使用多個Queue和Topic。

Java客戶端:

ActiveMQSession方法:

Queue createQueue(String queueName);
TemporaryQueue createTemporaryQueue();
Topic createTopic(String topicName);
TemporaryTopic createTemporaryTopic();

例如:

Destination destination = session.createQueue("TEST.FOO");

或者

Destination destination = session.createTopic("TEST.FOO");

C++客戶端:

函數原型:

cms::Queue* ActiveMQSession::createQueue( const std::string& queueName )
throw ( cms::CMSException );
cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
throw ( cms::CMSException );
cms::Topic* ActiveMQSession::createTopic( const std::string& topicName )
throw ( cms::CMSException );
cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
throw ( cms::CMSException );

例如:

Destination* destination = session->createQueue( "TEST.FOO" );

或者

Destination* destination = session->createTopic( "TEST.FOO" );

4.2.5 MessageProducer

MessageProducer是一個由Session建立的對象,用來向Destination發送消息。

4.2.5.1 建立MessageProducer

Java客戶端:

ActiveMQSession方法:

MessageProducer createProducer(Destination destination);

例如:

MessageProducer producer = session.createProducer(destination);

C++客戶端:

函數原型:

cms::MessageProducer* ActiveMQSession::createProducer(

const cms::Destination* destination );

例如:

MessageProducer* producer = session->createProducer( destination );

4.2.5.2 發送消息

Java客戶端:

ActiveMQMessageProducer方法:

void send(Destination destination, Message message);
void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive);
void send(Message message);
void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.send(message);

C++客戶端:

函數原型:

void ActiveMQProducer::send( cms::Message* message )
throw ( cms::CMSException );
void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message) throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );

例如:

producer->send( message );

4.2.6 MessageConsumer

MessageConsumer是一個由Session建立的對象,用來從Destination接收消息。

4.2.6.1 建立MessageConsumer

Java客戶端:

ActiveMQSession方法:

MessageConsumer createConsumer(Destination destination);
MessageConsumer createConsumer(Destination destination, String messageSelector); 
MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
TopicSubscriber createDurableSubscriber(Topic topic, String name);
TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

其中messageSelector爲消息選擇器;noLocal標誌默認爲false,當設置爲true時限制消費者只能接收和本身相同的鏈接(Connection)所發佈的消息,此標誌只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時須要設置此參數。

例如:

MessageConsumer consumer = session.createConsumer(destination);

C++客戶端:

函數原型:

cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination );
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector )
throw ( cms::CMSException );
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException );
cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException );

例如:

MessageConsumer* consumer = session->createConsumer( destination );

4.2.6.2消息的同步和異步接收

消息的同步接收是指客戶端主動去接收消息,客戶端能夠採用MessageConsumer 的receive方法去接收下一個消息。

息的異步接收是指當消息到達時,ActiveMQ主動通知客戶端。客戶端能夠經過註冊一個實現MessageListener 接口的對象到MessageConsumer。MessageListener只有一個必須實現的方法 —— onMessage,它只接收一個參數,即Message。在爲每一個發送到Destination的消息實現onMessage時,將調用該方法。

Java客戶端:

ActiveMQMessageConsumer方法:

Message receive()
Message receive(long timeout)
Message receiveNoWait()

其中timeout爲等待時間,單位爲毫秒。

或者

實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。

例如:

Message message = consumer.receive();

C++客戶端:

函數原型:

cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )

cms::Message* ActiveMQConsumer::receive( int millisecs )
throw ( cms::CMSException );
cms::Message* ActiveMQConsumer::receiveNoWait(void)
throw ( cms::CMSException );

或者

實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。

例如:

Message *message = consumer->receive();

或者

consumer->setMessageListener( this );

virtual void onMessage( const Message* message ){

//process message

}

4.2.6.3消息選擇器

JMS提供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可以使用基於這些屬性的選擇標準來代表對消息是否感興趣。這就簡化了客戶端的工做,並避免了向不須要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增長了一些額外開銷。

消息選擇器是用於MessageConsumer的過濾器,能夠用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並肯定是否將實際消費該消息。按照JMS文檔的說法,消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。能夠將消息選擇器做爲MessageConsumer建立的一部分。

Java客戶端:

例如:

public final String SELECTOR = 「JMSType = ‘TOPIC_PUBLISHER’」;

該選擇器檢查了傳入消息的JMSType屬性,並肯定了這個屬性的值是否等於TOPIC_PUBLISHER。若是相等,則消息被消費;若是不相等,那麼消息會被忽略。

4.2.7 Message

JMS程序的最終目的是生產和消費的消息能被其餘程序使用,JMS的 Message是一個既簡單又不乏靈活性的基本格式,容許建立不一樣平臺上符合非JMS程序格式的消息。Message由如下幾部分組成:消息頭,屬性和消息體。

Java客戶端:

ActiveMQSession方法:

BlobMessage createBlobMessage(File file)
BlobMessage createBlobMessage(InputStream in)
BlobMessage createBlobMessage(URL url)
BlobMessage createBlobMessage(URL url, boolean deletedByBroker)
BytesMessage createBytesMessage()
MapMessage createMapMessage()
Message createMessage()
ObjectMessage createObjectMessage()
ObjectMessage createObjectMessage(Serializable object)
TextMessage createTextMessage()
TextMessage createTextMessage(String text)

例如:

下例演示建立併發送一個TextMessage到一個隊列:

TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);

下例演示接收消息並轉換爲合適的消息類型:

Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}

C++客戶端:

函數原型:

cms::Message* ActiveMQSession::createMessage(void)
throw ( cms::CMSException )
cms::BytesMessage* ActiveMQSession::createBytesMessage(void)
throw ( cms::CMSException )
cms::BytesMessage* ActiveMQSession::createBytesMessage(
const unsigned char* bytes,
unsigned long long bytesSize )
throw ( cms::CMSException )
cms::TextMessage* ActiveMQSession::createTextMessage(void)
throw ( cms::CMSException )
cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )
throw ( cms::CMSException )
cms::MapMessage* ActiveMQSession::createMapMessage(void)
throw ( cms::CMSException )

例如:

下例演示建立併發送一個TextMessage到一個隊列:

TextMessage* message = session->createTextMessage( text ); // text is a string
producer->send( message );
delete message;

下例演示接收消息:

Message *message = consumer->receive();
const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Received: %s/n", text.c_str() );
delete message;

4.3 可靠性機制

發送消息最可靠的方法就是在事務中發送持久性的消息,ActiveMQ默認發送持久性消息。結束事務有兩種方法:提交或者回滾。當一個事務提交,消息被處理。若是事務中有一個步驟失敗,事務就回滾,這個事務中的已經執行的動做將被撤銷。

接收消息最可靠的方法就是在事務中接收信息,無論是從PTP模式的非臨時隊列接收消息仍是從Pub/Sub模式持久訂閱中接收消息。

對於其餘程序,低可靠性能夠下降開銷和提升性能,例如發送消息時能夠更改消息的優先級或者指定消息的過時時間。

消息傳送的可靠性越高,須要的開銷和帶寬就越多。性能和可靠性之間的折衷是設計時要重點考慮的一個方面。能夠選擇生成和使用非持久性消息來得到最佳性能。另外一方面,也能夠經過生成和使用持久性消息並使用事務會話來得到最佳可靠性。在這兩種極端之間有許多選擇,這取決於應用程序的要求。

4.3.1 基本可靠性機制

4.3.1.1 控制消息的簽收(Acknowledgment)

客戶端成功接收一條消息的標誌是這條消息被簽收。成功接收一條消息通常包括以下三個階段:

1.客戶端接收消息;

2.客戶端處理消息;

3.消息被簽收。簽收能夠由ActiveMQ發起,也能夠由客戶端發起,取決於Session簽收模式的設置。

在帶事務的Session中,簽收自動發生在事務提交時。若是事務回滾,全部已經接收的消息將會被再次傳送。

在不帶事務的Session中,一條消息什麼時候和如何被簽收取決於Session的設置。

1.Session.AUTO_ACKNOWLEDGE

當客戶端從receive或onMessage成功返回時,Session自動簽收客戶端的這條消息的收條。在AUTO_ACKNOWLEDGE的Session中,同步接收receive是上述三個階段的一個例外,在這種狀況下,收條和簽收緊隨在處理消息以後發生。

2.Session.CLIENT_ACKNOWLEDGE

客戶端經過調用消息的acknowledge方法簽收消息。在這種狀況下,簽收發生在Session層面:簽收一個已消費的消息會自動地簽收這個Session全部已消費消息的收條。

3.Session.DUPS_OK_ACKNOWLEDGE

此選項指示Session沒必要確保對傳送消息的簽收。它可能引發消息的重複,可是下降了Session的開銷,因此只有客戶端能容忍重複的消息,纔可以使用(若是ActiveMQ再次傳送同一消息,那麼消息頭中的JMSRedelivered將被設置爲true)。

Java客戶端:

簽收模式分別爲:

1. Session.AUTO_ACKNOWLEDGE

2. Session.CLIENT_ACKNOWLEDGE

3. Session.DUPS_OK_ACKNOWLEDGE

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

例如:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

C++客戶端:

簽收模式分別爲:

1. Session::AUTO_ACKNOWLEDGE

2. Session::CLIENT_ACKNOWLEDGE

3. Session::DUPS_OK_ACKNOWLEDGE

4. Session::SESSION_TRANSACTED

函數原型:

cms::Session* ActiveMQConnection::createSession(
cms::Session::AcknowledgeMode ackMode )
throw ( cms::CMSException )

例如:

Session* session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

對隊列來講,若是當一個Session終止時它接收了消息可是沒有簽收,那麼ActiveMQ將保留這些消息並將再次傳送給下一個進入隊列的消費者。

對主題來講,若是持久訂閱用戶終止時,它已消費未簽收的消息也將被保留,直到再次傳送給這個用戶。對於非持久訂閱,AtiveMQ在用戶Session關閉時將刪除這些消息。

若是使用隊列和持久訂閱,而且Session沒有使用事務,那麼可使用Session的recover方法中止Session,再次啓動後將收到它第一條沒有簽收的消息,事實上,重啓後Session一系列消息的傳送都是以上一次最後一條已簽收消息的下一條爲起點。若是這時有消息過時或者高優先級的消息到來,那麼這時消息的傳送將會和最初的有所不一樣。對於非持久訂閱用戶,重啓後,ActiveMQ有可能刪除全部沒有簽收的消息。

4.3.1.2 指定消息傳送模式

ActiveMQ支持兩種消息傳送模式:PERSISTENT和NON_PERSISTENT兩種。

1.PERSISTENT(持久性消息)

這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另外一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗,它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。

2.NON_PERSISTENT(非持久性消息)

保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。

有兩種方法指定傳送模式:

1.使用setDeliveryMode方法,這樣全部的消息都採用此傳送模式;

2.使用send方法爲每一條消息設置傳送模式;

Java客戶端:

傳送模式分別爲:

1. DeliveryMode.PERSISTENT

2. DeliveryMode.NON_PERSISTENT

ActiveMQMessageProducer方法:

void setDeliveryMode(int newDeliveryMode);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

C++客戶端:

傳送模式分別爲:

1. DeliveryMode::PERSISTANT

2. DeliveryMode::NON_PERSISTANT

函數原型:

void setDeliveryMode( int mode );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );

例如:

producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );

若是不指定傳送模式,那麼默認是持久性消息。若是容忍消息丟失,那麼使用非持久性消息能夠改善性能和減小存儲的開銷。

4.3.1.3 設置消息優先級

一般,能夠確保將單個會話向目標發送的全部消息按其發送順序傳送至消費者。然而,若是爲這些消息分配了不一樣的優先級,消息傳送系統將首先嚐試傳送優先級較高的消息。

有兩種方法設置消息的優先級:

1.使用setDeliveryMode方法,這樣全部的消息都採用此傳送模式;

2.使用send方法爲每一條消息設置傳送模式;

Java客戶端:

ActiveMQMessageProducer方法:

void setPriority(int newDefaultPriority);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setPriority(4);

C++客戶端:

函數原型:

void setPriority( int priority );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );

例如:

producer-> setPriority(4);

消息優先級從0-9十個級別,0-4是普通消息,5-9是加急消息。若是不指定優先級,則默認爲4。JMS不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達。

4.3.1.4 容許消息過時

默認狀況下,消息永不會過時。若是消息在特定週期內失去意義,那麼能夠設置過時時間。

有兩種方法設置消息的過時時間,時間單位爲毫秒:

1.使用setTimeToLive方法爲全部的消息設置過時時間;

2.使用send方法爲每一條消息設置過時時間;

Java客戶端:

ActiveMQMessageProducer方法:

void setTimeToLive(long timeToLive);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setTimeToLive(1000);

C++客戶端:

函數原型:

void setTimeToLive( long long time );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );

例如:

Producer->setTimeToLive(1000);

消息過時時間,send 方法中的timeToLive 值加上發送時刻的GMT 時間值。若是timeToLive值等於零,則JMSExpiration 被設爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。

4.3.1.5 建立臨時目標

ActiveMQ經過createTemporaryQueue和createTemporaryTopic建立臨時目標,這些目標持續到建立它的Connection關閉。只有建立臨時目標的Connection所建立的客戶端才能夠從臨時目標中接收消息,可是任何的生產者均可以向臨時目標中發送消息。若是關閉了建立此目標的Connection,那麼臨時目標被關閉,內容也將消失。

Java客戶端:

ActiveMQSession方法:

TemporaryQueue createTemporaryQueue();
TemporaryTopic createTemporaryTopic();

C++客戶端:

函數原型:

cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
throw ( cms::CMSException );
cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
throw ( cms::CMSException );

某些客戶端須要一個目標來接收對發送至其餘客戶端的消息的回覆。這時可使用臨時目標。Message的屬性之一是JMSReplyTo屬性,這個屬性就是用於這個目的的。能夠建立一個臨時的Destination,並把它放入Message的JMSReplyTo屬性中,收到該消息的消費者能夠用它來響應生產者。

Java客戶端:

以下所示代碼段,將建立臨時的Destination,並將它放置在TextMessage的JMSReplyTo屬性中:

// Create a temporary queue for replies...
Destination tempQueue = session.createTemporaryQueue();
// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);

消費者接收這條消息時,會從JMSReplyTo字段中提取臨時Destination,而且會經過應用程序構造一個MessageProducer,以便將響應消息發送回生產者。這展現瞭如何使用JMS Message的屬性,並顯示了私有的臨時Destination的有用之處。它還展現了客戶端能夠既是消息的生產者,又能夠是消息的消費者。

// Get the temporary queue from the JMSReplyTo
// property of the message...
Destination tempQueue = msg.getJMSReplyTo();
// create a Sender for the temporary queue
MessageProducer Sender = session.createProducer(tempQueue);
TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);
...
// Send the message to the temporary queue...
sender.send(msg);

4.3.2 高級可靠性機制

4.3.2.1 建立持久訂閱

經過爲發佈者設置PERSISTENT傳送模式,爲訂閱者時使用持久訂閱,這樣能夠保證Pub/Sub程序接收全部發布的消息。

消息訂閱分爲非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處於激活狀態,也就是和ActiveMQ保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。持久訂閱時,客戶端向ActiveMQ註冊一個識別本身身份的ID,當這個客戶端處於離線時,ActiveMQ會爲這個ID 保存全部發送到主題的消息,當客戶端再次鏈接到ActiveMQ時,會根據本身的ID 獲得全部當本身處於離線時發送到主題的消息。持久訂閱會增長開銷,同一時間在持久訂閱中只有一個激活的用戶。

創建持久訂閱的步驟:

1. 爲鏈接設置一個客戶ID;

2. 爲訂閱的主題指定一個訂閱名稱;

上述組合必須惟一。

4.3.2.1.1 建立持久訂閱

Java客戶端:

ActiveMQConnection方法:

void setClientID(String newClientID)

ActiveMQSession方法:

TopicSubscriber createDurableSubscriber(Topic topic, String name)
TopicSubscriber createDurableSubscriber(Topic topic, String name, String
messageSelector, boolean noLocal)

其中messageSelector爲消息選擇器;noLocal標誌默認爲false,當設置爲true時限制消費者只能接收和本身相同的鏈接(Connection)所發佈的消息,此標誌只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時須要設置此參數。

C++客戶端:

函數原型:

virtual void setClientId( const std::string& clientId );

cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException )

4.3.2.1.2 刪除持久訂閱

Java客戶端:

ActiveMQSession方法:

void unsubscribe(String name);

4.3.2.2 使用本地事務

在事務中生成或使用消息時,ActiveMQ跟蹤各個發送和接收過程,並在客戶端發出提交事務的調用時完成這些操做。若是事務中特定的發送或接收操做失敗,則出現異常。客戶端代碼經過忽略異常、重試操做或回滾整個事務來處理異常。在事務提交時,將完成全部成功的操做。在事務進行回滾時,將取消全部成功的操做。

本地事務的範圍始終爲一個會話。也就是說,能夠將單個會話的上下文中執行的一個或多個生產者或消費者操做組成一個本地事務。

不但單個會話能夠訪問 Queue 或 Topic (任一類型的 Destination ),並且單個會話實例能夠用來操縱一個或多個隊列以及一個或多個主題,一切都在單個事務中進行。這意味着單個會話能夠(例如)建立隊列和主題中的生產者,而後使用單個事務來同時發送隊列和主題中的消息。由於單個事務跨越兩個目標,因此,要麼隊列和主題的消息都獲得發送,要麼都未獲得發送。相似地,單個事務能夠用來接收隊列中的消息並將消息發送到主題上,反過來也能夠。

因爲事務的範圍只能爲單個的會話,所以不存在既包括消息生成又包括消息使用的端對端事務。(換句話說,至目標的消息傳送和隨後進行的至客戶端的消息傳送不能放在同一個事務中。)

4.3.2.2.1 使用事務

Java客戶端:

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted爲使用事務標識,acknowledgeMode爲簽收模式。

例如:

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

C++客戶端:

函數原型:

cms::Session* ActiveMQConnection::createSession(
cms::Session::AcknowledgeMode ackMode );

其中AcknowledgeMode ackMode需指定爲SESSION_TRANSACTED。

例如:

Session* session = connection->createSession( Session:: SESSION_TRANSACTED );

4.3.2.2.2 提交

Java客戶端:

ActiveMQSession方法:

void commit();

例如:

try {
producer.send(consumer.receive());
session.commit();
}

catch (JMSException ex) {
session.rollback();
}

C++客戶端:

函數原型:

void ActiveMQSession::commit(void) throw 
  ( cms::CMSException )

4.3.2.2.3 回滾

Java客戶端:

ActiveMQSession方法:

void rollback();

C++客戶端:

函數原型:

void ActiveMQSession::rollback(void) throw ( cms::CMSException )

4.4 高級特徵

4.4.1 異步發送消息

ActiveMQ支持生產者以同步或異步模式發送消息。使用不一樣的模式對send方法的反應時間有巨大的影響,反映時間是衡量ActiveMQ吞吐量的重要因素,使用異步發送能夠提升系統的性能。

在默認大多數狀況下,AcitveMQ是以異步模式發送消息。例外的狀況:在沒有使用事務的狀況下,生產者以PERSISTENT傳送模式發送消息。在這種狀況下,send方法都是同步的,而且一直阻塞直到ActiveMQ發回確認消息:消息已經存儲在持久性數據存儲中。這種確認機制保證消息不會丟失,但會形成生產者阻塞從而影響反應時間。

高性能的程序通常都能容忍在故障狀況下丟失少許數據。若是編寫這樣的程序,能夠經過使用異步發送來提升吞吐量(甚至在使用PERSISTENT傳送模式的狀況下)。

Java客戶端:

使用Connection URI配置異步發送:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

在ConnectionFactory層面配置異步發送:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

在Connection層面配置異步發送,此層面的設置將覆蓋ConnectionFactory層面的設置:

((ActiveMQConnection)connection).setUseAsyncSend(true);

4.4.2 消費者特點

4.4.2.1 消費者異步分派

在ActiveMQ4中,支持ActiveMQ以同步或異步模式向消費者分派消息。這樣的意義:能夠以異步模式向處理消息慢的消費者分配消息;以同步模式向處理消息快的消費者分配消息。

ActiveMQ默認以同步模式分派消息,這樣的設置能夠提升性能。可是對於處理消息慢的消費者,須要以異步模式分派。

Java客戶端:

在ConnectionFactory層面配置同步分派:

((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);

在Connection層面配置同步分派,此層面的設置將覆蓋ConnectionFactory層面的設置:

((ActiveMQConnection)connection).setDispatchAsync(false);

在消費者層面以Destination URI配置同步分派,此層面的設置將覆蓋ConnectionFactory和Connection層面的設置:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");

consumer = session.createConsumer(queue);

4.4.2.2 消費者優先級

在ActveMQ分佈式環境中,在有消費者存在的狀況下,若是更但願ActveMQ發送消息給消費者而不是其餘的ActveMQ到ActveMQ的傳送,能夠以下設置:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");
consumer = session.createConsumer(queue);

4.4.2.3 獨佔的消費者

ActiveMQ維護隊列消息的順序並順序把消息分派給消費者。可是若是創建了多個Session和MessageConsumer,那麼同一時刻多個線程同時從一個隊列中接收消息時就並不能保證處理時有序。

有時候有序處理消息是很是重要的。ActiveMQ4支持獨佔的消費。ActiveMQ挑選一個MessageConsumer,並把一個隊列中全部消息按順序分派給它。若是消費者發生故障,那麼ActiveMQ將自動故障轉移並選擇另外一個消費者。能夠以下設置:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

4.4.2.4 再次傳送策略

在如下三種狀況中,消息會被再次傳送給消費者:

1.在使用事務的Session中,調用rollback()方法;

2.在使用事務的Session中,調用commit()方法以前就關閉了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式,而且調用了recover()方法。

能夠經過設置ActiveMQConnectionFactory和ActiveMQConnection來定製想要的再次傳送策略。

屬性
默認值
描述
collisionAvoidanceFactor 0.15 The percentage of range of collision avoidance if enabled
maximumRedeliveries 6 Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue
initialRedeliveryDelay 1000L The initial redelivery delay in milliseconds
useCollisionAvoidance false Should the redelivery policy use collision avoidance
useExponentialBackOff false Should exponential back-off be used (i.e. to exponentially increase the timeout)
backOffMultiplier 5 The back-off multiplier

4.4.3 目標特點

4.4.3.1 複合目標

在1.1版本以後,ActiveMQ支持混合目標技術。它容許在一個JMS目標中使用一組JMS目標。

例如能夠利用混合目標在同一操做中用向12個隊列發送同一條消息或者在同一操做中向一個主題和一個隊列發送同一條消息。

在混合目標中,經過「,」來分隔不一樣的目標。

Java客戶端:

例如:

// send to 3 queues as one logical operation
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
producer.send(queue, someMessage);

若是在一個目標中混合不一樣類別的目標,能夠經過使用「queue://」和「topic://」前綴來識別不一樣的目標。

例如:

// send to queues and topic one logical operation
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
producer.send(queue, someMessage);

4.4.3.2 目標選項

屬性
默認值
描述
consumer.prefetchSize variable The number of message the consumer will prefetch.
consumer.maximumPendingMessageLimit 0 Use to control if messages are dropped if a slow consumer situation exists.
consumer.noLocal false Same as the noLocal flag on a Topic consumer. Exposed here so that it can be used with a queue.
consumer.dispatchAsync false Should the brokerdispatch messages asynchronously to the consumer.
consumer.retroactive false Is this a Retroactive Consumer.
consumer.selector null JMS Selector used with the consumer.
consumer.exclusive false Is this an Exclusive Consumer .
consumer.priority 0 Allows you to configure a Consumer Priority

Java客戶端:

例如:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);

4.4.4 消息預取

ActiveMQ的目標之一就是高性能的數據傳送,因此ActiveMQ使用「預取限制」來控制有多少消息能及時的傳送給任何地方的消費者。

一旦預取數量達到限制,那麼就不會有消息被分派給這個消費者直到它發回簽收消息(用來標識全部的消息已經被處理)。

能夠爲每一個消費者指定消息預取。若是有大量的消息而且但願更高的性能,那麼能夠爲這個消費者增大預取值。若是有少許的消息而且每條消息的處理都要花費很長的時間,那麼能夠設置預取值爲1,這樣同一時間,ActiveMQ只會爲這個消費者分派一條消息。

Java客戶端:

在ConnectionFactory層面爲全部消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.all=50

在ConnectionFactory層面爲隊列消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

使用「目標選項」爲一個消費者配置預取值:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

4.4.5 配置鏈接URL

ActiveMQ支持經過Configuration URI明確的配置鏈接屬性。

例如:當要設置異步發送時,能夠經過在Configuration URI中使用jms.$PROPERTY來設置。

tcp://localhost:61616?jms.useAsyncSend=true

如下的選項在URI必須以「jms.」爲前綴。

屬性
默認值
描述
alwaysSessionAsync true If this flag is set then a seperate thread is not used for dispatching messages for each Session in the Connection. However, a separate thread is always used if there is more than one session, or the session isn't in auto acknowledge or dups ok mode
clientID null Sets the JMS clientID to use for the connection
closeTimeout 15000 (milliseconds) Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker.
copyMessageOnSend true Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. This is enabled by default to be compliant with the JMS specification. You can disable it if you do not mutate JMS messages after they are sent for a performance boost.
disableTimeStampsByDefault false Sets whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost.
dispatchAsync false Should the broker dispatch messages asynchronouslyto the consumer.
nestedMapAndListEnabled true Enables/disables whether or not Structured Message Properties and MapMessages are supported so that Message properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards
objectMessageSerializationDefered false When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk.
optimizeAcknowledge false Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster.WARNING enabling this issue could cause some issues with auto-acknowledgement on reconnection  
optimizedMessageDispatch true If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers  
useAsyncSend false Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss.
useCompression false Enables the use of compression of the message bodies
useRetroactiveConsumer false Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.

4.5 優化

優化部分請參閱:http://devzone.logicblaze.com/site/how-to-tune-activemq.html

5. ActiveMQ配置

5.1 配置文件

ActiveMQ配置文件:$AcrtiveMQ/conf/activemq.xml

5.2 配置ActiveMQ服務IP和端口

<transportConnectors>

<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>

<transportConnector name="ssl" uri="ssl://localhost:61617"/>

<transportConnector name="stomp" uri="stomp://localhost:61613"/

</transportConnectors>

 

在transportConnectors標識中配置ActiveMQ服務IP和端口,其中name屬性指定協議的名稱,uri屬性指定協議所對應的協議名,IP地址和端口號。上述IP地址和端口能夠根據實際須要指定。Java客戶端默認使用openwire協議,因此ActiveMQ服務地址爲tcp://localhost:61616;目前C++客戶端僅支持stomp協議,因此ActiveMQ服務地址爲tcp://localhost:61613。

5.3 分佈式部署

分佈式部署請參閱:http://activemq.apache.org/networks-of-brokers.html

5.4 監控ActiveMQ

本節將使用JXM和JXM控制檯(JDK1.5控制檯)監控ActiveMQ。

5.4.1 配置JXM

<broker brokerName="emv219" useJmx="true" xmlns="http://activemq.org/config/1.0">

<managementContext>

<managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>

</managementContext>

</broker>

配置JXM步驟以下:

1. 設置broker標識的useJmx屬性爲true;

2. 取消對managementContext標識的註釋(系統默認註釋managementContext標識),監控的默認端口爲1099。

5.4.2 在Windows平臺監控

進入%JAVA_HOME%/bin,雙擊jconsole.exe即出現以下畫面,在對話框中輸入ActiveMQ服務主機的地址,JXM的端口和主機登錄賬號。

6. 目前存在問題

6.1 C++客戶端丟失消息問題

ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT

C++客戶端版本:ActiveMQ CPP 1.1 Release

測試中發現,當C++客戶端異常退出時(即沒有正常調用close函數關閉鏈接),ActiveMQ並不能檢測到C++客戶端的鏈接已經中斷,這時若是向隊列中發送消息,那麼第一條消息就會丟失,這時ActiveMQ才能檢測到這個鏈接是中斷的。

在ActiveMQ論壇反應此問題後,開發人員答覆並建議使用CLIENT_ACKNOWLEDGE簽收模式。可是此模式會形成消息重複接收。

測試ActiveMQ 4.2SNAPSHOT時並未發現上述問題。

6.2 隊列消息堆積過多後有可能阻塞程序

默認activemq.xml中配置的內存是20M,這就意味着當消息堆積超過20M後,程序可能出現問題。在mial list中其餘用戶對此問題的描述是:send方法會阻塞或拋出異常。ActiveMQ開發人員的答覆:The memory model is different for ActiveMQ 4.1 in that for Queues, only small references to the Queue messages are held in memory. This means that the Queue depth can be considerably bigger than for ActiveMQ 3.2.x.However, our next major release (5.0 nee 4.2) has a more robust model in that Queue messages are paged in from storage only when space is available - hence Queue depth is now limited by how much disk space you have.

6.3 目前版本的C++客戶端僅支持stomp協議

目前版本的C++客戶端程序(ActiveMQ CPP 1.1 Release)僅支持stomp協議,所以傳輸消息的速度應該沒有使用openwire協議的Java客戶端快。ActiveMQ網站顯示不久將會有支持openwire協議的C++客戶端程序發佈。

6.4 分佈式部署問題

ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT和ActiveMQ 4.2SNAPSHOT

測試選用上述兩個未正式發佈的版本,未選用正式發佈的ActiveMQ 4.1.0 Release版本是由於此版本bug較多。

在測試中發現,若是重啓其中一臺機器上的ActiveMQ,其餘機器的ActiveMQ有可能會打印:

java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:358)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
at java.lang.Thread.run(Thread.java:595)
WARN TransportConnection - Unexpected extra broker info command received: BrokerInfo 
{commandId = 6, responseRequired = false, brokerId = ID:emv219n-33945-1174458770157-1:0, 
brokerURL = tcp://emv219n:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, 
networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = emv219, connectionId = 0}.
INFO FailoverTransport - Transport failed, attempting to automatically reconnect due to: java.io.EOFException。

這時分佈式的消息傳輸就會出現問題,此問題目前還沒找到緣由。

7. 附錄

7.1 完整的Java客戶端例子

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Hello world!
 */
public class App {
    public static void main(String[] args) throws Exception {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }
    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }
    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // Create a messages
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);
                // Tell the producer to send the message
                System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
                // Clean up
                session.close();
                connection.close();
            }
            catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start()
                connection.setExceptionListener(this);
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
                // Wait for a message
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}

7.2 完整的C++客戶端例子

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
class HelloWorldProducer : public Runnable {
private:
    Connection* connection;
	Session* session;
	Destination* destination;
	MessageProducer* producer;
	int numMessages;
	 bool useTopic;
	 public:
    HelloWorldProducer( int numMessages, bool useTopic = false ){
        connection = NULL;
        session = NULL;
        destination = NULL
        producer = NULL;
        this->numMessages = numMessages;
        this->useTopic = useTopic;
    }
    virtual ~HelloWorldProducer(){
        cleanup();
    }
    virtual void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61613");
            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start()
            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( "TEST.FOO" );
            } else {
                destination = session->createQueue( "TEST.FOO" );
            }
            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );
            // Create a messages

            string text = (string)"Hello world! from thread " + threadIdStr;

           

            for( int ix=0; ixcreateTextMessage( text );

 

                // Tell the producer to send the message

                printf( "Sent message from thread %s/n", threadIdStr.c_str() );

                producer->send( message );

                

                delete message;

            }

           

        }catch ( CMSException& e ) {

            e.printStackTrace();

        }

    }

   

private:

 

    void cleanup(){

                   

            // Destroy resources.

            try{                       

                if( destination != NULL ) delete destination;

            }catch ( CMSException& e ) {}

            destination = NULL;

           

            try{

                if( producer != NULL ) delete producer;

            }catch ( CMSException& e ) {}

            producer = NULL;

           

            // Close open resources.

            try{

                if( session != NULL ) session->close();

                if( connection != NULL ) connection->close();

            }catch ( CMSException& e ) {}

 

            try{

                if( session != NULL ) delete session;

            }catch ( CMSException& e ) {}

            session = NULL;

           

            try{

                if( connection != NULL ) delete connection;

            }catch ( CMSException& e ) {}

            connection = NULL;

    }

};

 

class HelloWorldConsumer : public ExceptionListener,

                           public MessageListener,

                           public Runnable {

   

private:

   

    Connection* connection;

    Session* session;

    Destination* destination;

    MessageConsumer* consumer;

    long waitMillis;

    bool useTopic;

       

public:

 

    HelloWorldConsumer( long waitMillis, bool useTopic = false ){

        connection = NULL;

        session = NULL;

        destination = NULL;

        consumer = NULL;

        this->waitMillis = waitMillis;

        this->useTopic = useTopic;

    }

    virtual ~HelloWorldConsumer(){     

        cleanup();

    }

   

    virtual void run() {

               

        try {

 

            // Create a ConnectionFactory

            ActiveMQConnectionFactory* connectionFactory =

                new ActiveMQConnectionFactory( "tcp://127.0.0.1:61613" );

 

            // Create a Connection

            connection = connectionFactory->createConnection();

            delete connectionFactory;

            connection->start();

           

            connection->setExceptionListener(this);

 

            // Create a Session

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

 

            // Create the destination (Topic or Queue)

            if( useTopic ) {

                destination = session->createTopic( "TEST.FOO" );

            } else {

                destination = session->createQueue( "TEST.FOO" );

            }

 

            // Create a MessageConsumer from the Session to the Topic or Queue

            consumer = session->createConsumer( destination );

           

            consumer->setMessageListener( this );

           

            // Sleep while asynchronous messages come in.

            Thread::sleep( waitMillis );       

           

        } catch (CMSException& e) {

            e.printStackTrace();

        }

    }

   

    // Called from the consumer since this class is a registered MessageListener.

    virtual void onMessage( const Message* message ){

       

        static int count = 0;

       

        try

        {

            count++;

            const TextMessage* textMessage =

                dynamic_cast< const TextMessage* >( message );

            string text = textMessage->getText();

            printf( "Message #%d Received: %s/n", count, text.c_str() );

        } catch (CMSException& e) {

            e.printStackTrace();

        }

    }

 

    // If something bad happens you see it here as this class is also been

    // registered as an ExceptionListener with the connection.

    virtual void onException( const CMSException& ex ) {

        printf("JMS Exception occured.  Shutting down client./n");

    }

   

private:

 

    void cleanup(){

       

        //*************************************************

        // Always close destination, consumers and producers before

        // you destroy their sessions and connection.

        //*************************************************

       

        // Destroy resources.

        try{                       

            if( destination != NULL ) delete destination;

        }catch (CMSException& e) {}

        destination = NULL;

       

        try{

            if( consumer != NULL ) delete consumer;

        }catch (CMSException& e) {}

        consumer = NULL;

       

        // Close open resources.

        try{

            if( session != NULL ) session->close();

            if( connection != NULL ) connection->close();

        }catch (CMSException& e) {}

       

        // Now Destroy them

        try{

            if( session != NULL ) delete session;

        }catch (CMSException& e) {}

        session = NULL;

       

        try{

            if( connection != NULL ) delete connection;

        }catch (CMSException& e) {}

        connection = NULL;

    }

};

   

int main(int argc, char* argv[]) {

 

    std::cout << "=====================================================/n";   

    std::cout << "Starting the example:" << std::endl;

    std::cout << "-----------------------------------------------------/n";

 

    //============================================================

    // set to true to use topics instead of queues

    // Note in the code above that this causes createTopic or

    // createQueue to be used in both consumer an producer.

    //============================================================   

    bool useTopics = false; 

 

    HelloWorldProducer producer( 1000, useTopics );

    HelloWorldConsumer consumer( 8000, useTopics );

   

    // Start the consumer thread.

    Thread consumerThread( &consumer );

    consumerThread.start();

   

    // Start the producer thread.

    Thread producerThread( &producer );

    producerThread.start();

 

    // Wait for the threads to complete.

    producerThread.join();

    consumerThread.join();

 

    std::cout << "-----------------------------------------------------/n";   

    std::cout << "Finished with the example, ignore errors from this"

              << std::endl

              << "point on as the sockets breaks when we shutdown."

              << std::endl;

    std::cout << "=====================================================/n";   

}

 

 

   
分享到
相關文章
相關標籤/搜索