JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。java
要學習JMS,有幾個概念必需要搞清楚:sql
l Messaging (消息通知、消息通訊)數據庫
一種應用系統或組件之間相互通訊的方式(進程直接的通信,可爲不一樣計算機)。編程
l Message (消息)服務器
消息即爲消息通訊的載體,消息包括Message Headers, Message properties, Message bodiessession
l JMS有兩種方式進行消息通訊:Point-to-Point (P2P) 和 Publish/Subscriber (PUB/SUB)異步
P2P方式是一對一的,一條消息只有一個接收者,默認狀況下是P2P消息是持久的,也就是說發送者(sender)產生的一條消息(message)發送到消息隊列(queue)之上後,只有等到消息接收者(receiver)接收到它,纔會從消息隊列中刪除,沒有被接收的消息會一直存在JMS容器裏。這種方式有點像郵政通訊,信件只有一個接收者,信件在接收以前,會一直存放在信箱裏。分佈式
PUB/SUB方式的工做流程,首先subscriber(訂閱者)向JMS容器訂閱(Listen to)本身感興趣的topic(主題),多個訂閱者能夠同時對一個主題進行訂閱,消息發佈者發佈一條消息,全部訂閱了該主題的訂閱者都能收到這個消息。默認狀況下,pub/sub方式下的消息不是持久的,這意味着,消息一經發出,無論有沒有人接收,都不會保存下來,並且訂閱者只能接收到自已訂閱以後發佈者發出的消息。這種方式有點像訂閱報刊雜誌,一種報刊能夠有多人同時訂閱,但訂閱者只能收到開始訂閱以後的報社發行的期刊。ide
l JMS(Java Messaging Service)函數
是Java EE中的一種技術,它定義一套完整的接口,來實現不一樣系統或應用之間的消息通訊。這意味着:咱們針對JMS接口編寫的應用程序(客戶程序),在任何一個實現了標準JMS接口的容器下都能運行起來,咱們的應用程序與容器實現了真正的解藕,這也就是面向接口編程的好處之一吧。這點相似JDBC編程。
l JMS提供者(JMS Provider)
JMS提供者,也叫JMS服務器或JMS容器,也就是JMS服務的提供者,主流的J2EE容器通常都提供JMS服務(好比JBoss,BEA WebLogic,IBM WebSphere,Oracle Application Server等都支持)
l 鏈接工廠(Connection Factories)
鏈接工廠是用來建立客戶端到JMS容器之間JMS鏈接的工廠,鏈接工廠有兩種:(QueueConnectionFactory和TopicConnectionFactory),分別用來建立QueueConnection 和 TopicConnection的。
Context ctx = new InitialContext(); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory"); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
l 目的地(Destinations)
目的地是消息生產者(producer)消息發住的目的地,也是消費者(consumer)接收消息的來源地,它有點像信箱,郵遞員把信件投往信箱,收件人從信箱取信件。對P2P方式來講,目的地就是Queue,對pub/sub方式來講,目的地就是Topic。咱們要獲得這個目的地的引用,只能經過JNDI查找(lookup)的方式獲得,由於目的地是註冊在JMS服務器的(後面的章節會講到如何註冊一個目的地)
Topic myTopic = (Topic) ctx.lookup("MyTopic"); Queue myQueue = (Queue) ctx.lookup("MyQueue");
l 鏈接(Connection)
這裏說的鏈接是指客戶端與JMS提供者(容器)之間的鏈接。鏈接也分兩種:QueueConnection和TopicConnection,分別對應於P2P鏈接和Pub/Sub鏈接。
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
鏈接用完以後必須記得關閉,不然鏈接資源不會被釋放掉。關閉鏈接的同時會自動把會話、產生者、消費者都關閉掉。
l 會話(Session)
會話是用來建立消息產生者和消息消費者的單線程環境,你能夠它來建立消息生產者、消費者、消息,用它來維持消息監聽。
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); QueueSession queueSession = queueConnection.createQueueSession(true, 0);
l 消息生產者(Message Producers)
消息生產者也就是消息的產生者或發送者,在P2P方式下它是QueueSender
,在Pub/Sub方式下它是TopicPublisher
。
它是一個由session建立的,用來把把消息發送到目的地的對象。
QueueSender queueSender = queueSession.createSender(myQueue); TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);
一旦你建立好生產者,你就能夠用它來發送消息
queueSender.send(message); topicPublisher.publish(message);
l 消息消費者(Message Consumer)
消息消費者也就是消息的接收者或使用者,在P2P方式下這是QueueReceiver,在Pub/Sub方式下它是TopicSubscriber。這是一個由session來建立的,用來接收來自目的地消息的對象。JMS容器來負責把消息從目的地投遞到註冊了該目的地的消息消費者。
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue); TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);
一旦建立好消息消費者,它就是活動的,你能夠用它來接收消息,你也能夠用close()方法來使它失效(Inactive)。當你調用Connection的start()方法以前,消費者是不會接收到任何消息的。兩種接收者都有一個receive方法,這是一個同步的方法,也就是說程序執行到這個方法會被阻塞,直到收到消息爲止。
queueConnection.start(); Message m = queueReceiver.receive();
topicConnection.start(); Message m = topicSubscriber.receive(1000); // time out after a second
若是咱們不想它被阻塞,就須要異步的接收消息,這時咱們得用消息臨聽器(Message Listener)了。
l 消息監聽器(Message Listener)
消息監聽器是一個充當消息的異步事件處理器的對象,它實現了MessageListener接口,這個接口只有一個方法onMessage,在這個方法裏,你能夠定義當接收到消息以後的要作的操做。你能夠用setMessageListener方法爲消息消費者註冊一個監聽器。
MessageListener listener = new MessageListener( {
public void onMessage(Message msg) { // } });
topicSubscriber.setMessageListener(listener); //註冊監聽 topicConnection.start();
有一點要注意,若是你先調用Connection的start,而後再調用setMessageListener,消息極可能接收不到,正確的作法是先註冊監聽,再啓動Connection。
註冊監聽以後,一旦JMS容器有消費投遞過來,消息消費(接收)者就會自動調用監聽器的onMessage方法。這個方法的帶有一個參數Message,這就接收到的消息。
l 消息選擇器(Message Selectors)
假如你只須要一個對濾器來過濾收到的消息,那麼你可使用消息選擇器,它容許消費者指定只對特定的消息感興趣。消息選擇器只能是工做在JMS容器的,而不是咱們的應用程序上。消息選擇器是一個包含一個表達式的字符串,這個表達式的語法相似SQL的條件表達式,在createReceiver, createSubscriber這些方法裏有一個參數讓你指定一個消息選擇器,由這些方法建立的消費者就只能收到與消息選擇器匹配的消息了。
l 消息(Messages)
JMS消息包括三個部分:消息頭(Header),屬性(Properties),消息體(Body)
其中消息頭是必須的,後兩個是可選的。
1)消息頭裏你能夠指定JMSMessageID, JMSCorrelationID, JMSReplyTo, JMSType等信息。
2)屬性指定一些消息頭沒有包括的附加信息,好比能夠在屬性裏指定消息選擇器。
3)消息體是消息的內容,有5種消息類型:TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage=-
TextMessage message = queueSession.createTextMessage(); message.setText(msg_text); // msg_text is a String queueSender.send(message);
在消費者端,接收到的老是一個通用的Message對象,你須要把它轉型成特定的類型才能提取出裏面的內容。
Message m = queueReceiver.receive(); if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; System.out.println("Reading message: " + message.getText()); } else { // Handle error}
實戰篇
前面對JMS概念的做了一個基本介紹,下面咱們看一個具體的例子程序
Pub/sub方式的消息傳遞的例子:
l HelloPublisher.java
package com.jms.test;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式的消息發送程序
*/
public class HelloPublisher {
TopicConnection topicConnection;// JMS鏈接,屬於Pub/Sub方式的鏈接
TopicSession topicSession; //JMS會話,屬於Pub/Sub方式的會話
TopicPublisher topicPublisher; //消息發佈者
Topic topic; //主題
public HelloPublisher(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//設置好鏈接JMS容器的屬性,不一樣的容器須要的屬性可能不一樣,須要查閱相關文檔
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
//建立鏈接JMS容器的上下文(context)
Context context = new InitialContext(env);
//經過鏈接工廠的JNDI名查找ConnectionFactory
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//用鏈接工廠建立一個JMS鏈接
topicConnection = topicFactory.createTopicConnection();
//經過JMS鏈接建立一個Session
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
//經過上下文查找到一個主題(topic)
topic = (Topic) context.lookup(topicJNDI);
//用session來建立一個特定主題的消息發送者
topicPublisher = topicSession.createPublisher(topic);
}
/**
* 發佈一條文本消息
* @param msg 待發布的消息
* @throws JMSException
*/
public void publish(String msg) throws JMSException {
//用session來建立一個文本類型的消息
TextMessage message = topicSession.createTextMessage();
message.setText(msg);//設置消息內容
topicPublisher.publish(topic, message);//消息發送,發送到特定主題
}
public void close() throws JMSException {
topicSession.close();//關閉session
topicConnection.close();//關閉鏈接
}
public static void main(String[] args)
throws JMSException, NamingException {
HelloPublisher publisher =
new HelloPublisher("ConnectionFactory", "topic/testTopic");
try {
for (int i = 1; i < 11; i++) {
String msg = "Hello World no. " + i;
System.out.println("Publishing message: " + msg);
publisher.publish(msg);
}
publisher.close();//session和connection用完以後必定記得關閉
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
Publishing message: Hello World no. 1
Publishing message: Hello World no. 2
Publishing message: Hello World no. 3
Publishing message: Hello World no. 4
Publishing message: Hello World no. 5
Publishing message: Hello World no. 6
Publishing message: Hello World no. 7
Publishing message: Hello World no. 8
Publishing message: Hello World no. 9
Publishing message: Hello World no. 10
l HelloSubscriber.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式下的消息接收器。注意,這個消息接收器能夠與上面的消息發送器能夠工做
* 在不一樣的JVM中,只要保證它們各自可以連通JMS容器(JMS Provider)
*
*/
public class HelloSubscriber implements MessageListener {
TopicConnection topicConnection;
TopicSession topicSession;
TopicSubscriber topicSubscriber;
Topic topic;
public HelloSubscriber(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//設置好鏈接JMS容器的屬性,不一樣的容器須要的屬性可能不一樣,須要查閱相關文檔
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext();
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//建立鏈接
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);//建立session
topic = (Topic) context.lookup(topicJNDI);//查找到主題
//用session建立一個特定queue的消息接收者
topicSubscriber = topicSession.createSubscriber(topic);
//註冊監聽,這裏設置的監聽是本身,由於本類已經實現了MessageListener接口,
//一旦queueReceiver接收到了消息,就會調用本類的onMessage方法
topicSubscriber.setMessageListener(this);
System.out.println("HelloSubscriber subscribed to topic: "
+ topicJNDI);
topicConnection.start();//啓動鏈接,這時監聽器才真正生效
}
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
//把Message 轉型成 TextMessage 並提取消息內容
String msgTxt = ((TextMessage) msg).getText();
System.out.println("HelloSubscriber got message: " +
msgTxt);
}
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
}
public static void main(String[] args) {
try {
new HelloSubscriber("TopicConnectionFactory",
"topic/testTopic");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
HelloSubscriber subscribed to topic: topic/testTopic
HelloSubscriber got message: Hello World no. 1
HelloSubscriber got message: Hello World no. 2
HelloSubscriber got message: Hello World no. 3
HelloSubscriber got message: Hello World no. 4
HelloSubscriber got message: Hello World no. 5
HelloSubscriber got message: Hello World no. 6
HelloSubscriber got message: Hello World no. 7
HelloSubscriber got message: Hello World no. 8
HelloSubscriber got message: Hello World no. 9
HelloSubscriber got message: Hello World no. 10
P2P方式下的消息傳遞
l HelloQueue.java
package com.jms.test;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.JMSException;
import java.util.Hashtable;
public class HelloQueue {
QueueConnection queueConnection; //queue方式的JMS鏈接
QueueSession queueSession; //queue會話
QueueSender queueSender; //queue消息發送者
Queue queue; //消息隊列
public HelloQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
//鏈接JMS Provider的環境參數
Hashtable<String, String> props = new Hashtable<String, String>();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
//JMS provider的主機和端口
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext(props);
//lookup到鏈接工廠
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();//建立鏈接
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);//建立會話
queue = (Queue) context.lookup(topicJNDI);//lookup到特定的消息隊列
queueSender = queueSession.createSender(queue);//建立隊列消息的發送者
}
public void send(String msg) throws JMSException {
TextMessage message = queueSession.createTextMessage();
message.setText(msg);
queueSender.send(queue, message);
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
try {
HelloQueue queue = new HelloQueue("ConnectionFactory",
"queue/testQueue");
for (int i = 11; i < 21; i++) {
String msg = "Hello World no. " + i;
System.out.println("Hello Queue Publishing message: " + msg);
queue.send(msg);
}
queue.close();
} catch (Exception ex) {
System.err.println("An exception occurred " +
"while testing HelloPublisher25: " + ex);
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
Hello Queue Publishing message: " Hello World no. 11
Hello Queue Publishing message: " Hello World no. 12
Hello Queue Publishing message: " Hello World no. 13
Hello Queue Publishing message: " Hello World no. 14
Hello Queue Publishing message: " Hello World no. 15
Hello Queue Publishing message: " Hello World no. 16
Hello Queue Publishing message: " Hello World no. 17
Hello Queue Publishing message: " Hello World no. 18
Hello Queue Publishing message: " Hello World no. 19
Hello Queue Publishing message: " Hello World no. 20
l HelloRecvQueue.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class HelloRecvQueue implements MessageListener {
QueueConnection queueConnection;
QueueSession queueSession;
QueueReceiver queueReceiver;
Queue queue;
public HelloRecvQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Context context = new InitialContext();
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(topicJNDI);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
System.out.println("HelloReceQueue receiver to queue: " + topicJNDI);
queueConnection.start();
}
public void onMessage(Message m) {
try {
String msg = ((TextMessage) m).getText();
System.out.println("HelloReceQueue got message: " + msg);
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
Public ovid main(String[] args) {
new HelloRecvQueue();
}
}
程序在控制檯輸出:
HelloReceQueue got message: Hello World no. 11
HelloReceQueue got message: Hello World no. 12
HelloReceQueue got message: Hello World no. 13
HelloReceQueue got message: Hello World no. 14
HelloReceQueue got message: Hello World no. 15
HelloReceQueue got message: Hello World no. 16
HelloReceQueue got message: Hello World no. 17
HelloReceQueue got message: Hello World no. 18
HelloReceQueue got message: Hello World no. 19
HelloReceQueue got message: Hello World no. 20
下面咱們來看看是JMS是在JBoss下如何配置的,首先JMS須要一個數據庫來保存其持久化的消息,幸運的是JBoss自帶有一個開源的JAVA數據庫HSQL(www.hsqldb.org)
在這裏簡單地介紹一下這個數據庫,它支持標準的SQL語法和JDBC接口,是一個用純JAVA編寫的數據庫,其實它只有一個jar文件而已:hsqldb.jar,在%JBOSS_HOME%/server/default/lib目錄下你能找到它。
啓動這個數據庫有三種模式:Server模式、進程模式和內存模式,在Server模式下,你能夠用下面的命令讓它啓動起來:
$cd %JBOSS_HOME%/server/default/lib
$ java -cp hsqldb.jar org.hsqldb.Server -database.0 mydb -dbname.0 demoDB
其中mydb是數據庫名,demoDB是數據庫別名,咱們用JDBC連它是就用這個別名,用戶名是sa,密碼默認是空,咱們下列語句就能建立表、插入數據了
create table employee (
employee_id int,
employee_name varchar(50),
age int,
hiredate date
)
insert into employee values(1, 'linyufa', 33, '2007-12-17')
insert into employee values(2, 'scott', 25, '2008-11-23')
insert into employee values(3, 'larry', 35, '2004-11-23')
想進一步瞭解HSQL的知識,網上有不少學習資料,好了,回到咱們討論的JMS話題,有了這個數據庫,那咱們就沒必要去找其餘數據庫了,JMS默認是用內存模式來啓動它的,因此咱們基本上不用去關心它是如何工做的。
1) 在 %JBOSS_HOME%/server/default/deploy/jms目錄下,
打開hsqldb-jdbc-state-service.xml文件,
<depends optional-attribute-name="ConnectionManager">
jboss.jca:service= DataSourceBinding, name=DefaultDS
</depends>
DefaultDS這個名字就是JMS鏈接數據庫的數據源,可讓其保持默認值。
2) 再在同一目錄打開hsqldb-jdbc2-service.xml 文件,
<depends optional-attribute-name="ConnectionManager">
jboss.jca:service=DataSourceBinding,name=DefaultDS
</depends>
DefaultDS這個名字保持和前面一致便可,也可讓其保持默認值。
3) 在同一目錄打開jbossmq-destinations-service.xml文件,找到下面的代碼段:
<mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=testTopic">
<depends optional-attribute-name="DestinationManager">
jboss.mq:service=DestinationManager
</depends>
<depends optional-attribute-name="SecurityManager">
jboss.mq:service=SecurityManager
</depends>
<attribute name="SecurityConf">
<security>
<role name="guest" read="true" write="true"/>
<role name="publisher" read="true" write="true" create="false"/>
<role name="durpublisher" read="true" write="true" create="true"/>
</security>
</attribute>
</mbean>
這是定義一個名叫testTopic的示例,若是你要定義一個新的topic,只須要複製這段代碼,改一下name屬性便可。
一樣找到下面這段的代碼,這是定義一個名叫testQueue的示例,若是要定義一個新的queue,複製這段代碼,改一下名字便可。
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=testQueue">
<depends optional-attribute-name="DestinationManager">
jboss.mq:service=DestinationManager
</depends>
<depends optional-attribute-name="SecurityManager">
jboss.mq:service=SecurityManager
</depends>
<attribute name="MessageCounterHistoryDayLimit">-1</attribute>
<attribute name="SecurityConf">
<security>
<role name="guest" read="true" write="true"/>
<role name="publisher" read="true" write="true" create="false"/>
<role name="noacc" read="false" write="false" create="false"/>
</security>
</attribute>
</mbean>
4) 啓動Jboss後在控制檯看到以下輸出,即說明JMS正常啓動了
09:50:28,390 INFO [A] Bound to JNDI name: queue/A
09:50:28,406 INFO [B] Bound to JNDI name: queue/B
09:50:28,406 INFO [C] Bound to JNDI name: queue/C
09:50:28,406 INFO [D] Bound to JNDI name: queue/D
09:50:28,421 INFO [ex] Bound to JNDI name: queue/ex
09:50:28,437 INFO [testTopic] Bound to JNDI name: topic/testTopic
09:50:28,484 INFO [securedTopic] Bound to JNDI name: topic/securedTopic
09:50:28,484 INFO [testDurableTopic] Bound to JNDI name: topic/testDurableTopic
09:50:28,500 INFO [testQueue] Bound to JNDI name: queue/testQueue
5) 若是是Jboss4.2或以上的版本,在啓動Jboss時必須指定 –b 0.0.0.0參數,不然本機以外的任何主機都沒法鏈接JMS。能夠修改run.bat或run.sh文件,也能夠在運行命令時附帶上這個參數,以下 sh run.sh –b 0.0.0.0
從上面介紹能夠看出,在Jboss下配置JMS是很是簡單的,僅須要copy一段代碼,改個名字便可。若是在WebLogic下,你就要依次配置JMS Module, ConnectionFactory, Topic, Queue, Template,不過好在console都有嚮導,很是直觀,因此配置起來也不是什麼難事。
建立一個JMS Connection、查找ConnectionFactory和Destination都是須要很大的系統開銷的操做,因此咱們的應用程序應避免頻繁地去作這些操做。通常狀況下,咱們能夠把ConnectionFactory,Connection, Topic, Queue定義成類的成員變量,並在類的構造函數裏初始化他們,避免在每次接收和發送JMS消息時去作這些工做。可是所以也帶了一個問題,就是說當Connection不可用了(好比JMS Server重啓了),咱們的應用程序就會開始不工做了,因此咱們要有一種機制去檢測咱們的Connection是否有效,若是已經斷掉,應該試圖去從新鏈接,並通知系統管理員。
JMS的Connection和JDBC的Connection相似,再也不使用後應該關閉,不論是正常退出,仍是異常退出,不然別的客戶程序可能就再也取不到鏈接了。Session也是如此。
由於JMS工做模式是異步的,咱們要意識到調用Connection.start()這個方法,系統已經啓動了一個新的線程在工做,也就是說退出了這行語句所在的方法以後,這個線程還在工做,它會不斷地去偵聽有沒有新的JMS消息,直到這個Connection被關閉或不可用。
要學習JMS,有幾個概念必需要搞清楚:
l Messaging (消息通知、消息通訊)
一種應用系統或組件之間相互通訊的方式。
l Message (消息)
消息即爲消息通訊的載體,消息包括Message Headers, Message properties, Message bodies
l JMS有兩種方式進行消息通訊:Point-to-Point (P2P) 和 Publish/Subscriber (PUB/SUB)
P2P方式是一對一的,一條消息只有一個接收者,默認狀況下是P2P消息是持久的,也就是說發送者(sender)產生的一條消息(message)發送到消息隊列(queue)之上後,只有等到消息接收者(receiver)接收到它,纔會從消息隊列中刪除,沒有被接收的消息會一直存在JMS容器裏。這種方式有點像郵政通訊,信件只有一個接收者,信件在接收以前,會一直存放在信箱裏。
PUB/SUB方式的工做流程,首先subscriber(訂閱者)向JMS容器訂閱(Listen to)本身感興趣的topic(主題),多個訂閱者能夠同時對一個主題進行訂閱,消息發佈者發佈一條消息,全部訂閱了該主題的訂閱者都能收到這個消息。默認狀況下,pub/sub方式下的消息不是持久的,這意味着,消息一經發出,無論有沒有人接收,都不會保存下來,並且訂閱者只能接收到自已訂閱以後發佈者發出的消息。這種方式有點像訂閱報刊雜誌,一種報刊能夠有多人同時訂閱,但訂閱者只能收到開始訂閱以後的報社發行的期刊。
l JMS(Java Messaging Service)
是Java EE中的一種技術,它定義一套完整的接口,來實現不一樣系統或應用之間的消息通訊。這意味着:咱們針對JMS接口編寫的應用程序(客戶程序),在任何一個實現了標準JMS接口的容器下都能運行起來,咱們的應用程序與容器實現了真正的解藕,這也就是面向接口編程的好處之一吧。這點相似JDBC編程。
l JMS提供者(JMS Provider)
JMS提供者,也叫JMS服務器或JMS容器,也就是JMS服務的提供者,主流的J2EE容器通常都提供JMS服務(好比JBoss,BEA WebLogic,IBM WebSphere,Oracle Application Server等都支持)
l 鏈接工廠(Connection Factories)
鏈接工廠是用來建立客戶端到JMS容器之間JMS鏈接的工廠,鏈接工廠有兩種:(QueueConnectionFactory和TopicConnectionFactory),分別用來建立QueueConnection 和 TopicConnection的。
Context ctx = new InitialContext(); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory"); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
l 目的地(Destinations)
目的地是消息生產者(producer)消息發住的目的地,也是消費者(consumer)接收消息的來源地,它有點像信箱,郵遞員把信件投往信箱,收件人從信箱取信件。對P2P方式來講,目的地就是Queue,對pub/sub方式來講,目的地就是Topic。咱們要獲得這個目的地的引用,只能經過JNDI查找(lookup)的方式獲得,由於目的地是註冊在JMS服務器的(後面的章節會講到如何註冊一個目的地)
Topic myTopic = (Topic) ctx.lookup("MyTopic"); Queue myQueue = (Queue) ctx.lookup("MyQueue");
l 鏈接(Connection)
這裏說的鏈接是指客戶端與JMS提供者(容器)之間的鏈接。鏈接也分兩種:QueueConnection和TopicConnection,分別對應於P2P鏈接和Pub/Sub鏈接。
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
鏈接用完以後必須記得關閉,不然鏈接資源不會被釋放掉。關閉鏈接的同時會自動把會話、產生者、消費者都關閉掉。
l 會話(Session)
會話是用來建立消息產生者和消息消費者的單線程環境,你能夠它來建立消息生產者、消費者、消息,用它來維持消息監聽。
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); QueueSession queueSession = queueConnection.createQueueSession(true, 0);
l 消息生產者(Message Producers)
消息生產者也就是消息的產生者或發送者,在P2P方式下它是QueueSender
,在Pub/Sub方式下它是TopicPublisher
。
它是一個由session建立的,用來把把消息發送到目的地的對象。
QueueSender queueSender = queueSession.createSender(myQueue); TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);
一旦你建立好生產者,你就能夠用它來發送消息
queueSender.send(message); topicPublisher.publish(message);
l 消息消費者(Message Consumer)
消息消費者也就是消息的接收者或使用者,在P2P方式下這是QueueReceiver,在Pub/Sub方式下它是TopicSubscriber。這是一個由session來建立的,用來接收來自目的地消息的對象。JMS容器來負責把消息從目的地投遞到註冊了該目的地的消息消費者。
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue); TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);
一旦建立好消息消費者,它就是活動的,你能夠用它來接收消息,你也能夠用close()方法來使它失效(Inactive)。當你調用Connection的start()方法以前,消費者是不會接收到任何消息的。兩種接收者都有一個receive方法,這是一個同步的方法,也就是說程序執行到這個方法會被阻塞,直到收到消息爲止。
queueConnection.start(); Message m = queueReceiver.receive();
topicConnection.start(); Message m = topicSubscriber.receive(1000); // time out after a second
若是咱們不想它被阻塞,就須要異步的接收消息,這時咱們得用消息臨聽器(Message Listener)了。
l 消息監聽器(Message Listener)
消息監聽器是一個充當消息的異步事件處理器的對象,它實現了MessageListener接口,這個接口只有一個方法onMessage,在這個方法裏,你能夠定義當接收到消息以後的要作的操做。你能夠用setMessageListener方法爲消息消費者註冊一個監聽器。
MessageListener listener = new MessageListener( {
public void onMessage(Message msg) { // } });
topicSubscriber.setMessageListener(listener); //註冊監聽 topicConnection.start();
有一點要注意,若是你先調用Connection的start,而後再調用setMessageListener,消息極可能接收不到,正確的作法是先註冊監聽,再啓動Connection。
註冊監聽以後,一旦JMS容器有消費投遞過來,消息消費(接收)者就會自動調用監聽器的onMessage方法。這個方法的帶有一個參數Message,這就接收到的消息。
l 消息選擇器(Message Selectors)
假如你只須要一個對濾器來過濾收到的消息,那麼你可使用消息選擇器,它容許消費者指定只對特定的消息感興趣。消息選擇器只能是工做在JMS容器的,而不是咱們的應用程序上。消息選擇器是一個包含一個表達式的字符串,這個表達式的語法相似SQL的條件表達式,在createReceiver, createSubscriber這些方法裏有一個參數讓你指定一個消息選擇器,由這些方法建立的消費者就只能收到與消息選擇器匹配的消息了。
l 消息(Messages)
JMS消息包括三個部分:消息頭(Header),屬性(Properties),消息體(Body)
其中消息頭是必須的,後兩個是可選的。
1)消息頭裏你能夠指定JMSMessageID, JMSCorrelationID, JMSReplyTo, JMSType等信息。
2)屬性指定一些消息頭沒有包括的附加信息,好比能夠在屬性裏指定消息選擇器。
3)消息體是消息的內容,有5種消息類型:TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage=-
TextMessage message = queueSession.createTextMessage(); message.setText(msg_text); // msg_text is a String queueSender.send(message);
在消費者端,接收到的老是一個通用的Message對象,你須要把它轉型成特定的類型才能提取出裏面的內容。
Message m = queueReceiver.receive(); if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; System.out.println("Reading message: " + message.getText()); } else { // Handle error}
實戰篇
前面對JMS概念的做了一個基本介紹,下面咱們看一個具體的例子程序
Pub/sub方式的消息傳遞的例子:
l HelloPublisher.java
package com.jms.test;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式的消息發送程序
*/
public class HelloPublisher {
TopicConnection topicConnection;// JMS鏈接,屬於Pub/Sub方式的鏈接
TopicSession topicSession; //JMS會話,屬於Pub/Sub方式的會話
TopicPublisher topicPublisher; //消息發佈者
Topic topic; //主題
public HelloPublisher(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//設置好鏈接JMS容器的屬性,不一樣的容器須要的屬性可能不一樣,須要查閱相關文檔
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
//建立鏈接JMS容器的上下文(context)
Context context = new InitialContext(env);
//經過鏈接工廠的JNDI名查找ConnectionFactory
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//用鏈接工廠建立一個JMS鏈接
topicConnection = topicFactory.createTopicConnection();
//經過JMS鏈接建立一個Session
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
//經過上下文查找到一個主題(topic)
topic = (Topic) context.lookup(topicJNDI);
//用session來建立一個特定主題的消息發送者
topicPublisher = topicSession.createPublisher(topic);
}
/**
* 發佈一條文本消息
* @param msg 待發布的消息
* @throws JMSException
*/
public void publish(String msg) throws JMSException {
//用session來建立一個文本類型的消息
TextMessage message = topicSession.createTextMessage();
message.setText(msg);//設置消息內容
topicPublisher.publish(topic, message);//消息發送,發送到特定主題
}
public void close() throws JMSException {
topicSession.close();//關閉session
topicConnection.close();//關閉鏈接
}
public static void main(String[] args)
throws JMSException, NamingException {
HelloPublisher publisher =
new HelloPublisher("ConnectionFactory", "topic/testTopic");
try {
for (int i = 1; i < 11; i++) {
String msg = "Hello World no. " + i;
System.out.println("Publishing message: " + msg);
publisher.publish(msg);
}
publisher.close();//session和connection用完以後必定記得關閉
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
Publishing message: Hello World no. 1
Publishing message: Hello World no. 2
Publishing message: Hello World no. 3
Publishing message: Hello World no. 4
Publishing message: Hello World no. 5
Publishing message: Hello World no. 6
Publishing message: Hello World no. 7
Publishing message: Hello World no. 8
Publishing message: Hello World no. 9
Publishing message: Hello World no. 10
l HelloSubscriber.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式下的消息接收器。注意,這個消息接收器能夠與上面的消息發送器能夠工做
* 在不一樣的JVM中,只要保證它們各自可以連通JMS容器(JMS Provider)
*
*/
public class HelloSubscriber implements MessageListener {
TopicConnection topicConnection;
TopicSession topicSession;
TopicSubscriber topicSubscriber;
Topic topic;
public HelloSubscriber(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//設置好鏈接JMS容器的屬性,不一樣的容器須要的屬性可能不一樣,須要查閱相關文檔
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext();
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//建立鏈接
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);//建立session
topic = (Topic) context.lookup(topicJNDI);//查找到主題
//用session建立一個特定queue的消息接收者
topicSubscriber = topicSession.createSubscriber(topic);
//註冊監聽,這裏設置的監聽是本身,由於本類已經實現了MessageListener接口,
//一旦queueReceiver接收到了消息,就會調用本類的onMessage方法
topicSubscriber.setMessageListener(this);
System.out.println("HelloSubscriber subscribed to topic: "
+ topicJNDI);
topicConnection.start();//啓動鏈接,這時監聽器才真正生效
}
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
//把Message 轉型成 TextMessage 並提取消息內容
String msgTxt = ((TextMessage) msg).getText();
System.out.println("HelloSubscriber got message: " +
msgTxt);
}
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
}
public static void main(String[] args) {
try {
new HelloSubscriber("TopicConnectionFactory",
"topic/testTopic");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
HelloSubscriber subscribed to topic: topic/testTopic
HelloSubscriber got message: Hello World no. 1
HelloSubscriber got message: Hello World no. 2
HelloSubscriber got message: Hello World no. 3
HelloSubscriber got message: Hello World no. 4
HelloSubscriber got message: Hello World no. 5
HelloSubscriber got message: Hello World no. 6
HelloSubscriber got message: Hello World no. 7
HelloSubscriber got message: Hello World no. 8
HelloSubscriber got message: Hello World no. 9
HelloSubscriber got message: Hello World no. 10
P2P方式下的消息傳遞
l HelloQueue.java
package com.jms.test;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.JMSException;
import java.util.Hashtable;
public class HelloQueue {
QueueConnection queueConnection; //queue方式的JMS鏈接
QueueSession queueSession; //queue會話
QueueSender queueSender; //queue消息發送者
Queue queue; //消息隊列
public HelloQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
//鏈接JMS Provider的環境參數
Hashtable<String, String> props = new Hashtable<String, String>();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
//JMS provider的主機和端口
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext(props);
//lookup到鏈接工廠
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();//建立鏈接
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);//建立會話
queue = (Queue) context.lookup(topicJNDI);//lookup到特定的消息隊列
queueSender = queueSession.createSender(queue);//建立隊列消息的發送者
}
public void send(String msg) throws JMSException {
TextMessage message = queueSession.createTextMessage();
message.setText(msg);
queueSender.send(queue, message);
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
try {
HelloQueue queue = new HelloQueue("ConnectionFactory",
"queue/testQueue");
for (int i = 11; i < 21; i++) {
String msg = "Hello World no. " + i;
System.out.println("Hello Queue Publishing message: " + msg);
queue.send(msg);
}
queue.close();
} catch (Exception ex) {
System.err.println("An exception occurred " +
"while testing HelloPublisher25: " + ex);
ex.printStackTrace();
}
}
}
程序在控制檯輸出:
Hello Queue Publishing message: " Hello World no. 11
Hello Queue Publishing message: " Hello World no. 12
Hello Queue Publishing message: " Hello World no. 13
Hello Queue Publishing message: " Hello World no. 14
Hello Queue Publishing message: " Hello World no. 15
Hello Queue Publishing message: " Hello World no. 16
Hello Queue Publishing message: " Hello World no. 17
Hello Queue Publishing message: " Hello World no. 18
Hello Queue Publishing message: " Hello World no. 19
Hello Queue Publishing message: " Hello World no. 20
l HelloRecvQueue.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class HelloRecvQueue implements MessageListener {
QueueConnection queueConnection;
QueueSession queueSession;
QueueReceiver queueReceiver;
Queue queue;
public HelloRecvQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Context context = new InitialContext();
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(topicJNDI);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
System.out.println("HelloReceQueue receiver to queue: " + topicJNDI);
queueConnection.start();
}
public void onMessage(Message m) {
try {
String msg = ((TextMessage) m).getText();
System.out.println("HelloReceQueue got message: " + msg);
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
Public ovid main(String[] args) {
new HelloRecvQueue();
}
}
程序在控制檯輸出:
HelloReceQueue got message: Hello World no. 11
HelloReceQueue got message: Hello World no. 12
HelloReceQueue got message: Hello World no. 13
HelloReceQueue got message: Hello World no. 14
HelloReceQueue got message: Hello World no. 15
HelloReceQueue got message: Hello World no. 16
HelloReceQueue got message: Hello World no. 17
HelloReceQueue got message: Hello World no. 18
HelloReceQueue got message: Hello World no. 19
HelloReceQueue got message: Hello World no. 20
下面咱們來看看是JMS是在JBoss下如何配置的,首先JMS須要一個數據庫來保存其持久化的消息,幸運的是JBoss自帶有一個開源的JAVA數據庫HSQL(www.hsqldb.org)
在這裏簡單地介紹一下這個數據庫,它支持標準的SQL語法和JDBC接口,是一個用純JAVA編寫的數據庫,其實它只有一個jar文件而已:hsqldb.jar,在%JBOSS_HOME%/server/default/lib目錄下你能找到它。
啓動這個數據庫有三種模式:Server模式、進程模式和內存模式,在Server模式下,你能夠用下面的命令讓它啓動起來:
$cd %JBOSS_HOME%/server/default/lib
$ java -cp hsqldb.jar org.hsqldb.Server -database.0 mydb -dbname.0 demoDB
其中mydb是數據庫名,demoDB是數據庫別名,咱們用JDBC連它是就用這個別名,用戶名是sa,密碼默認是空,咱們下列語句就能建立表、插入數據了
create table employee (
employee_id int,
employee_name varchar(50),
age int,
hiredate date
)
insert into employee values(1, 'linyufa', 33, '2007-12-17')
insert into employee values(2, 'scott', 25, '2008-11-23')
insert into employee values(3, 'larry', 35, '2004-11-23')
想進一步瞭解HSQL的知識,網上有不少學習資料,好了,回到咱們討論的JMS話題,有了這個數據庫,那咱們就沒必要去找其餘數據庫了,JMS默認是用內存模式來啓動它的,因此咱們基本上不用去關心它是如何工做的。
1) 在 %JBOSS_HOME%/server/default/deploy/jms目錄下,
打開hsqldb-jdbc-state-service.xml文件,
<depends optional-attribute-name="ConnectionManager">
jboss.jca:service= DataSourceBinding, name=DefaultDS
</depends>
DefaultDS這個名字就是JMS鏈接數據庫的數據源,可讓其保持默認值。
2) 再在同一目錄打開hsqldb-jdbc2-service.xml 文件,
<depends optional-attribute-name="ConnectionManager">
jboss.jca:service=DataSourceBinding,name=DefaultDS
</depends>
DefaultDS這個名字保持和前面一致便可,也可讓其保持默認值。
3) 在同一目錄打開jbossmq-destinations-service.xml文件,找到下面的代碼段:
<mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=testTopic">
<depends optional-attribute-name="DestinationManager">
jboss.mq:service=DestinationManager
</depends>
<depends optional-attribute-name="SecurityManager">
jboss.mq:service=SecurityManager
</depends>
<attribute name="SecurityConf">
<security>
<role name="guest" read="true" write="true"/>
<role name="publisher" read="true" write="true" create="false"/>
<role name="durpublisher" read="true" write="true" create="true"/>
</security>
</attribute>
</mbean>
這是定義一個名叫testTopic的示例,若是你要定義一個新的topic,只須要複製這段代碼,改一下name屬性便可。
一樣找到下面這段的代碼,這是定義一個名叫testQueue的示例,若是要定義一個新的queue,複製這段代碼,改一下名字便可。
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=testQueue">
<depends optional-attribute-name="DestinationManager">
jboss.mq:service=DestinationManager
</depends>
<depends optional-attribute-name="SecurityManager">
jboss.mq:service=SecurityManager
</depends>
<attribute name="MessageCounterHistoryDayLimit">-1</attribute>
<attribute name="SecurityConf">
<security>
<role name="guest" read="true" write="true"/>
<role name="publisher" read="true" write="true" create="false"/>
<role name="noacc" read="false" write="false" create="false"/>
</security>
</attribute>
</mbean>
4) 啓動Jboss後在控制檯看到以下輸出,即說明JMS正常啓動了
09:50:28,390 INFO [A] Bound to JNDI name: queue/A
09:50:28,406 INFO [B] Bound to JNDI name: queue/B
09:50:28,406 INFO [C] Bound to JNDI name: queue/C
09:50:28,406 INFO [D] Bound to JNDI name: queue/D
09:50:28,421 INFO [ex] Bound to JNDI name: queue/ex
09:50:28,437 INFO [testTopic] Bound to JNDI name: topic/testTopic
09:50:28,484 INFO [securedTopic] Bound to JNDI name: topic/securedTopic
09:50:28,484 INFO [testDurableTopic] Bound to JNDI name: topic/testDurableTopic
09:50:28,500 INFO [testQueue] Bound to JNDI name: queue/testQueue
5) 若是是Jboss4.2或以上的版本,在啓動Jboss時必須指定 –b 0.0.0.0參數,不然本機以外的任何主機都沒法鏈接JMS。能夠修改run.bat或run.sh文件,也能夠在運行命令時附帶上這個參數,以下 sh run.sh –b 0.0.0.0
從上面介紹能夠看出,在Jboss下配置JMS是很是簡單的,僅須要copy一段代碼,改個名字便可。若是在WebLogic下,你就要依次配置JMS Module, ConnectionFactory, Topic, Queue, Template,不過好在console都有嚮導,很是直觀,因此配置起來也不是什麼難事。
建立一個JMS Connection、查找ConnectionFactory和Destination都是須要很大的系統開銷的操做,因此咱們的應用程序應避免頻繁地去作這些操做。通常狀況下,咱們能夠把ConnectionFactory,Connection, Topic, Queue定義成類的成員變量,並在類的構造函數裏初始化他們,避免在每次接收和發送JMS消息時去作這些工做。可是所以也帶了一個問題,就是說當Connection不可用了(好比JMS Server重啓了),咱們的應用程序就會開始不工做了,因此咱們要有一種機制去檢測咱們的Connection是否有效,若是已經斷掉,應該試圖去從新鏈接,並通知系統管理員。
JMS的Connection和JDBC的Connection相似,再也不使用後應該關閉,不論是正常退出,仍是異常退出,不然別的客戶程序可能就再也取不到鏈接了。Session也是如此。
由於JMS工做模式是異步的,咱們要意識到調用Connection.start()這個方法,系統已經啓動了一個新的線程在工做,也就是說退出了這行語句所在的方法以後,這個線程還在工做,它會不斷地去偵聽有沒有新的JMS消息,直到這個Connection被關閉或不可用。