1、JMS基本概念服務器
2、 jms的消息結構session
消息頭包含消息的識別信息和路由信息,消息頭包含一些標準的屬性以下
1.JMS Destination:由send方法設置
2.JMSDeliveryMode:由send方法設置
3.JMSExpiration:由send方法設置
4.JMSPriority:由send方法設置
5.JMSMessageID:由send方法設置
6.JMSTimestamp:由客戶端設置
7.JMSCorre1ationID:由客戶端設置
8.JMSReplyTo:由客戶端設置
9.JMSType:由客戶端設置
10.JMSRedelivered:由JMS Provider設置
標準的JMS消息頭包含如下屬性:
1:JMSDestination:消息發送的目的地:主要是指Queue和Topic,自動分配
2:JMSDe1iveryMode:傳送模式。有兩種:持久模式和非持久模式。一條持久性的消息應該被傳送「一次僅僅一次」,這就意味者若是JMS提供者出現故障,該消息並不會丟失,它會在服務器恢復以後再次傳遞。一條非持久的消息最多會傳送一次,這味這服務器出現故障,該消息將永遠丟失。自動分配
3:JMSExpiration:消息過時時間,等於Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。若是timeToLive值等於零,則JMSExpiration被設爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。自動分配
4:JMSPriority:消息優先級,從0-9十個級別,0到是普通消息,5一9是加急消息。JMS不要求JMS Provider嚴格接照這十個優先級發送消息,但必須保證加尋消息要先於普通消息到達。默認是4級。自動分配
5:JMSMessageID:惟一識別每一個消息的標識,由JMS Provider產生。自動分配
6:JMSTimestamp:一個JMS Provider在調用send()方法時自動設置的,它是消息被髮送和消費者實際接收的時間差。自動分配
7:JMSCorre1ationID:用來鏈接到另一個消息,典型的應用是在回覆消息中鏈接到原消息。在大多數狀況下,JMSCorre1ationID用於將一條消息標記爲對JMSMessageID標示的上條消息的應答,不過,JMSCorre1ationID能夠是任何值,不只僅是JMSMessageID。由開發者設置
8:JMSReplyTo:提供本消息回覆消息的目的地。由開發者設置
9:JMSType:消息類類型的識別符。由開發者設置
10:JHSRedelivered:若是一個客戶端收到一個設置了JMSRede1ivered屬性的消息,則表示可能客戶端曾經在早些時候收到過該消息,但並無簽收(acknowledged),若是該消息被從新傳送,JMSRede1ivered=true反之,
JMSRedelivered=false。自動設置dom
JMS定義的屬性以下:
1:JMSXUserID:發送消息的用戶標識,發送時提供商設置
2:JMSXAppID:發送消息的應用標識,發送時提供商設置
3:JMSXDe1iveryCount:轉發消息重試次數,第一次是1,第二次是2,...。發送時提供商沒置
4:JMSXGroupID:消息所在消息組的標識,由客戶端設置
5:JMSXGroupSeq::組內消息的序號,第一個小時是1,第二個是2,...。由客戶端設置
6:JMSXProducerTXID:產生消息的事務的事務標識,發送時提供商設置。
7:JMSXConsumerTXID:消費消息的事務的事務標識,接收時提供商設置
8:JMSXRcvTimestamp:JMS轉發消息到消費者的時間,接收時提供商設置
9:JMSXState:假定存在一個消息倉庫,它存儲了每一個消息的的單獨拷貝,且這些消息從原始消息被髮送時開始。每一個拷貝的狀態有:1(等待),2(準備〕,3(到期)或,4(保留)。因爲狀態與生產者和消費者無關,因此它不是由它們來提供。它只和在倉庫中查找消息相關,所以JMS沒有提以這種API。由提供商設置。異步
3、JMS的可靠性機制tcp
持久訂閱
首先消息生產者必須使用PERSISTENT提交消息。客戶能夠經過會話上的
createDurab1eSubscriber方法來建立一個持久訂閱,該方法的第一個參數必須
是一個topic。第二個參數是訂閱的名稱。
JMS provider會存儲發佈到持久訂閱對應的topic上的消息。若是最初建立
持久訂閱的客戶或者任何其它客戶,使用相同的鏈接工廠和鏈接的客戶ID,相同的主題和相同的到訂閱名,再次調用會話上的createDurab1eSubscriber方法,那麼持久訂閱就會被激活。JMS provider會向客戶發送客戶處於非激活狀態時所發佈的消息。
持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在建立以後會一直保留,直到應用程序調用會話上的unsubscribe方法。ide
4、JMS的PTP模型工具
5、JMS的Pub/Sub模型測試
7、JMS應用開發的基本步驟線程
8、非持久的Topic消息示例code
對於非持久的Topic消息的發送
基本跟前面發送隊列信息是同樣的,只是把建立Destination的地方,由創
建隊列替換成建立Topic,例如:
Destination destination = session.createTopic(MyTopic");
3:因爲不知道客戶端發送多少信息,所以改爲while循環的方式了,例如:
Message message = consumer.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println(「收到消息:」+txtMsg.getText()); message = consumer.receive(1000L); }
完整示例
public class NoPersistenceSender { public static void main(String[] args) throws Exception{ //鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); //帶事務的session Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---"+i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
public class NoPersistenceReceive { public static void main(String[] args) throws Exception{ //鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); //帶事務的session final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println("收到消息:"+txtMsg.getText()); message = consumer.receive(1000L); } session.commit(); session.close(); connection.close(); } }
注意:對於非持久性的Topic消息,則須要接收者保持運行狀態,否則消息發送者發出的消息會接收不到。
9、持久的Topic消息示例
1:須要在鏈接上設置消費者ID,用來識別消費者
2:須要建立TopicSubscriber來訂閱
3:要設置好了事後再start這個connnection
4:消費者必定要先運行一次,等於向消息服務中間件註冊這個消費者,而後再運行客戶端發送消息,這個時候不管消費者是否在線,都會接收到,下次鏈接的時候,會把沒有收過的消息都接收下來。
public class PersistenceSender { public static void main(String[] args) throws Exception{ //鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); //帶事務的session Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---"+i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
public class PersistenceReceive { public static void main(String[] args) throws Exception{ //鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.setClientID("cc1"); //帶事務的session final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("myTopic"); TopicSubscriber ts = session.createDurableSubscriber(destination,"T1"); connection.start(); Message message = ts.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println("收到消息:"+txtMsg.getText()); message = ts.receive(1000L); } session.commit(); session.close(); connection.close(); } }
10、總結