1,Java消息服務-JMS

一,消息服務

消息服務指的是兩個應用程序之間進行異步通訊的API,它爲標準消息協議和消息服務提供了一組通用接口,包括建立、發送、讀取消息等,用於支持應用程序開發。在Java中,當兩個應用程序使用JMS進行通訊時,它們之間並非直接相連的,而是經過一個共同的消息收發服務鏈接起來,能夠達到解耦的效果。 java

二,JMS

2.1,簡介 編程

JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM-分佈式系統的集成)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。 服務器

JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它相似於JDBC(Java Database Connectivity)。 session

2.2,體系架構 架構

JMS由如下元素組成: 併發

JMS提供者異步

鏈接面向消息中間件的,JMS接口的一個實現。提供者能夠是Java平臺的JMS實現,也能夠是非Java平臺的面向消息中間件的適配器。socket

JMS客戶分佈式

生產或消費基於消息的Java的應用程序或對象。ide

JMS生產者

建立併發送消息的JMS客戶。

JMS消費者

接收消息的JMS客戶。

JMS消息

包括能夠在JMS客戶之間傳遞的數據的對象。

JMS隊列

一個容納那些被髮送的等待閱讀的消息的區域。與隊列名字所暗示的意思不一樣,消息的接受順序並不必定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。

JMS主題

一種支持發送消息給多個訂閱者的機制。

2.3,JMS對象模型

ConnectionFactory

建立Connection對象的工廠,針對兩種不一樣的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。能夠經過JNDI來查找ConnectionFactory對象。

Connection

Connection表示在客戶端和JMS系統之間創建的連接(對TCP/IP socket的包裝)。Connection能夠產生一個或多個Session。跟ConnectionFactory同樣,Connection也有兩種類型:QueueConnection和TopicConnection。

Session

Session是操做消息的接口。能夠經過session建立生產者、消費者、消息等。Session提供了事務的功能。當須要使用session發送/接收多個消息時,能夠將這些發送/接收動做放到一個事務中。一樣,也分QueueSession和TopicSession。

MessageProducer

消息生產者由Session建立,並用於將消息發送到Destination。一樣,消息生產者分兩種類型:QueueSender和TopicPublisher。能夠調用消息生產者的方法(send或publish方法)發送消息。

MessageConsumer

消息消費者由Session建立,用於接收被髮送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別經過session的createReceiver(Queue)或createSubscriber(Topic)來建立。固然,也能夠session的creatDurableSubscriber方法來建立持久化的訂閱者。

Destination

Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來講,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來講,它的Destination也是某個隊列或主題(即消息來源)。

2.4,JMS消息模型

在JMS標準中,有兩種消息模型PTP(Point to Point),Publish/Subscribe(Pub/Sub)。

2.4.1,PTP模式-點對點消息傳送模型

在點對點消息傳送模型中,應用程序由消息隊列,發送者,接收者組成。每個消息發送給一個特殊的消息隊列,該隊列保存了全部發送給它的消息(除了被接收者消費掉的和過時的消息)。

PTP的特色

1,每一個消息只有一個消費者(Consumer)(即一旦被消費,消息就再也不在消息隊列中)。

2,發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息以後,無論接收者有沒有正在運行,它不會影響到消息被髮送到隊列。

3,接收者在成功接收消息以後需向隊列發送確認收到通知(acknowledgement)。

2.4.2,Pub/Sub-發佈/訂閱消息傳遞模型

在發佈/訂閱消息模型中,發佈者發佈一個消息,該消息經過topic傳遞給全部的客戶端。在這種模型中,發佈者和訂閱者彼此不知道對方,是匿名的且能夠動態發佈和訂閱topic。

在發佈/訂閱消息模型中,目的地被稱爲主題(topic),topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。

Pub/Sub特色

1,每一個消息能夠有多個消費者。

2,發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個或多個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。

3,爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。

2.5,接收消息

在JMS中,消息的接收可使用如下兩種方式:

同步

使用同步方式接收消息的話,消息訂閱者調用receive()方法。在receive()中,消息未到達或在到達指定時間以前,方法會阻塞,直到消息可用。

異步

使用異步方式接收消息的話,消息訂閱者需註冊一個消息監聽者,相似於事件監聽器,只要消息到達,JMS服務提供者會經過調用監聽器的onMessage()遞送消息。

2.6,JMS消息結構(Message)

Message主要由三部分組成,分別是Header,Properties,Body, 詳細以下:

Header

消息頭,全部類型的這部分格式都是同樣的

Properties

屬性,按類型能夠分爲應用設置的屬性,標準屬性和消息中間件定義的屬性

Body

消息正文,指咱們具體須要消息傳輸的內容。

下面是Message接口的部分定義,它顯示了JMS消息頭使用的全部方法:

public interface Message {
    public Destination getJMSDestination() throws JMSException;
    public void setJMSDestination(Destination destination) throws JMSException;
    public int getJMSDeliveryMode() throws JMSException
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    public String getJMSMessageID() throws JMSException;
    public void setJMSMessageID(String id) throws JMSException;
    public long getJMSTimestamp() throws JMSException'
    public void setJMSTimestamp(long timestamp) throws JMSException;
    public long getJMSExpiration() throws JMSException;
    public void setJMSExpiration(long expiration) throws JMSException;
    public boolean getJMSRedelivered() throws JMSException;
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    public int getJMSPriority() throws JMSException;
    public void setJMSPriority(int priority) throws JMSException;
    public Destination getJMSReplyTo() throws JMSException;
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    public String getJMScorrelationID() throws JMSException;
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    public String getJMSType() throws JMSException;
    public void setJMSType(String type) throws JMSException;
}

2.6.1,Header

header中的各個屬性,能夠分爲兩大類:

2.6.1.1,自動分配的消息頭:

這裏這些JMS消息頭是自動分配的。

在傳送消息時,消息頭的值由JMS提供者來設置,所以開發者使用setJMSxxx()方法分配的值就被忽略了。換句話說,對於大多數自動分配的消息頭來講,使用賦值函數方法顯然是徒勞的。不過,這並不是意味着開發者沒法控制這些消息頭的值。一些自動分配的消息頭能夠在建立Session和MessageProducer(也就是TopicPublisher)時,由開發者經過編程方式來設置。

屬性名稱        

說明        

設置者

JMSDeliveryMode

消息的發送模式,分爲NON_PERSISTENTPERSISTENT,即非持久性模式的和持久性模式。默認設置爲PERSISTENT(持久性)。

一條持久性消息應該被傳送一次(就一次),這就意味着若是JMS提供者出現故障,該消息並不會丟失; 它會在服務器恢復正常以後再次傳送。

一條非持久性消息最多隻會傳送一次,這意味着若是JMS提供者出現故障,該消息可能會永久丟失。

在持久性和非持久性這兩種傳送模式中,消息服務器都不會將一條消息向同一消息者發送一次以上(成功算一次)。

//在消息生產者上設置JMS傳送模式

TopicPublisher topicPublisher = topicSession.createPublisher(topic);

topicPubiisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

send

JMSMessageID

消息ID,須要以ID:開頭,用於惟一地標識了一條消息

send

JMSTimestamp

消息發送時的時間。這條消息頭用於肯定發送消息和它被消費者實際接收的時間間隔。時間戳是一個以毫秒來計算的Long類型時間值(自1970年1月1日算起)。

send

JMSExpiration

消息的過時時間,以毫秒爲單位,用來防止把過時的消息傳送給消費者。任何直接經過編程方式來調用setJMSExpiration()方法都會被忽略。

TopicPublisher topicPublisher = topicSession.createPublisher(topic);

//將生存時間設置爲1小時(1000毫秒 *60 60

topicPublisher.setTimeToLive(3600000);

send

JMSRedelivered

消息是否重複發送過,若是該消息以前發送過,那麼這個屬性的值須要被設置爲true, 客戶端能夠根據這個屬性的值來確認這個消息是否重複發送過,以免重複處理。

Provider

JMSPriority

消息的優先級,0-4爲普通的優化級,而5-9爲高優先級,一般狀況下,高優化級的消息須要優先發送。任何直接經過編程方式調用setJMSPriority()方法都將被忽略。

TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);

//設置消息的優先級

topicPublisher.setPriority(9);

send

JMSDestination

消息發送的目的地,是一個Topic或Queue

send

2.6.1.2, 開發者分配的消息頭:

屬性名稱        

說明        

設置者

JMSCorrelationID

關聯的消息ID,這個一般用在須要回傳消息的時候

client

JMSReplyTo

消息回覆的目的地,其值爲一個Topic或Queue, 這個由發送者設置,可是接收者能夠決定是否響應

client

JMSType

由消息發送者設置的消息類型,表明消息的結構,有的消息中間件可能會用到這個,但這個並非是批消息的種類,好比TextMessage之類的

client

從上表中咱們能夠看到,系統提供的標準頭信息一共有10個屬性,其中有6個是由send方法在調用時設置的,有三個是由客戶端(client)設置的,還有一個是由消息中間件(Provider)設置的。

須要注意的是,這裏的客戶端(client)不是指消費者,而是指使用JMS的客戶端,即開發者所寫的應用程序,即在生產消息時,這三個屬性是能夠由應用程序來設定的,而其它的header要麼由消息中間件設置,要麼由發送方法來決定,開發者即便設置了,也是無效的。測試以下:

生產者:

//建立文本消息
TextMessage textMessage = session.createTextMessage("消息內容" + (i + 1 ));
//消息發送的目的地
textMessage.setJMSDestination(new Queue(){
    @Override
    public String getQueueName() throws JMSException {
        return name;
    }
});
//消息的發送模式
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
//消息ID
textMessage.setJMSMessageID("ID:JMSMessageID");
//消息發送時的時間
textMessage.setJMSTimestamp(1000);
//關聯的消息ID
textMessage.setJMSCorrelationID("100:JMSCorrelationID");
//消息回覆的目的地
textMessage.setJMSReplyTo(new Queue(){
    @Override
    public String getQueueName() throws JMSException {
        return name;
    }
});
//消息是否重複發送過
textMessage.setJMSRedelivered(true);
//消息類型,表明消息的結構
textMessage.setJMSType("type");
//消息的過時時間,以毫秒爲單位
textMessage.setJMSExpiration(36000);
//消息的優先級,0-4爲普通的優化級,而5-9爲高優先級
textMessage.setJMSPriority(5);

消費者:

TextMessage msg = (TextMessage) messageConsumer.receive();
//得到消息的發送模式
int jmsDeliveryMode = msg.getJMSDeliveryMode();
//得到消息ID
String jmsMessageID = msg.getJMSMessageID();
//得到消息發送時的時間
Long jmsTimestamp = msg.getJMSTimestamp();
//得到關聯的消息ID
String jmsCorrelationID = msg.getJMSCorrelationID();
//得到消息回覆的目的地
String jmsReplyTo = ((Queue)msg.getJMSReplyTo()).getQueueName();
//得到消息是否重複發送過
Boolean jmsRedelivered = msg.getJMSRedelivered();
//得到消息類型,表明消息的結構
String jmsType = msg.getJMSType();
//得到消息的過時時間,以毫秒爲單位
Long jmsExpiration = msg.getJMSExpiration();
//得到消息的優先級,0-4爲普通的優化級,而5-9爲高優先級
int jmsPriority = msg.getJMSPriority();
System.out.println("jmsDeliveryMode:" + jmsDeliveryMode);
System.out.println("jmsMessageID:" + jmsMessageID);
System.out.println("jmsTimestamp:" + jmsTimestamp);
System.out.println("jmsCorrelationID:" + jmsCorrelationID);
System.out.println("jmsReplyTo:" + jmsReplyTo);
System.out.println("jmsRedelivered:" + jmsRedelivered);
System.out.println("jmsType:" + jmsType);
System.out.println("jmsExpiration:" + jmsExpiration);
System.out.println("jmsPriority:" + jmsPriority);
System.out.println("----------------------------");

結果:

只有紅框的JmsType,ReplyTo,CorrelationId能夠顯示設置,其它設置了都無效。

public interface Message {
    public Destination getJMSDestination() throws JMSException;
    public void setJMSDestination(Destination destination) throws JMSException;
    public int getJMSDeliveryMode() throws JMSException
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    public String getJMSMessageID() throws JMSException;
    public void setJMSMessageID(String id) throws JMSException;
    public long getJMSTimestamp() throws JMSException'
    public void setJMSTimestamp(long timestamp) throws JMSException;
    public long getJMSExpiration() throws JMSException;
    public void setJMSExpiration(long expiration) throws JMSException;
    public boolean getJMSRedelivered() throws JMSException;
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    public int getJMSPriority() throws JMSException;
    public void setJMSPriority(int priority) throws JMSException;
    public Destination getJMSReplyTo() throws JMSException;
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    public String getJMScorrelationID() throws JMSException;
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    public String getJMSType() throws JMSException;
    public void setJMSType(String type) throws JMSException;
}

2.6.2,消息屬性

消息的屬性就像能夠分配給一條消息的附加消息頭同樣。它們容許開發者添加有關消息的不透明附加信息。它們還用於暴露消息選擇器在消息過濾時使用的數據。

Message接口爲讀取和寫入屬性提供了若干個取值函數和賦值函數方法。消息的屬性值能夠是Stringboolean , byte,shortdoubleint ,longfloat型。Message接口爲每種類型的屬性值都提供了取值函數和賦值方法。以下:

public interface Message {
    public String getStringProperty(String name) throws JMSException,MessageFormatException;
    public void setStringProperty(String name,String value) throws JMSException,MessageNotWriteableException;
    public int getIntProperty(String name) throws JMSException,MessageFormatException;
    public void setIntProperty(String name,int value) throws JMSException,MessageNotWriteableException;
    public boolean getBooleanProperty(String name) throws JMSException,MessageFormatException;
    public void setBooleanProperty(String name,boolean value) throws JMSException,MessageNotWriteableException;
    public double getDoubleProperty(String name) throws JMSException,MessageFormatException;
    public void setDoubleProperty(String name) throws JMSException,MessageFormatException;
    public float getFloatProperty (String name) throws JMSException,MessageFormatExdeption;
    public void setFloatProperty(String name,float value) throws JMSException,MessageNotWriteableException;
    public byte getByteProperty(String name) throws JMSException,MessageFormatException;
    public void setByteProperty(String name) throws JMSException,MessageNotWriteableException;
    public long getLongProperty(String name) throws JMSException,MessageNotWriteableException;
    public void setLongProperty(String name,long value) throws JMSException,MessageNotWriteableException;
    public short getShortProperty(String name) throws JMSException,MessageFormatException;
    public void setShortProperty(String name,short value) throws JMSException,MessageNotWriteableException;
    public Object getObjectProperty(String name) throws JMSException,MessageNotWriteableException;
    public void setObjectProperty(String name,Object value) throws JMSException,MessageNotWriteableException;
    ......
}

消息屬性有3種基本類型

2.6.2.1,應用程序特定的屬性

由應用程序開發者定義的全部屬性均可以做爲一個應用程序特定的屬性。應用程序屬性在消息傳送以前進行設置。並不存在預先定義的應用程序屬性,開發者能夠自由定義可以知足它們須要的任何屬性。例如,在一個應用中,能夠添加一個特定的屬性,該屬性用於標識正在發送消息的用戶:

TextMessage message = pubSession.createTextMessage();

message.setStringProperty("username",username);//自定義屬性

publisher.publish(message);

做爲一個應用程序的特定屬性,username一旦離開該應用程序就變得毫無心義,它專門用於應用程序根據發佈者身份對消息進行過濾。

一旦一條消息發佈或發送之後,它就變成了只讀屬性;消費者或生產者都沒法修改它的屬性。若是消費者試圖設置某個屬性,該方法就會拋出一個javax.jms.MessageNotWriteableException。

2.6.2.2,JMS定義的屬性

JMS定義的屬性具備和應用程序屬性相同的特性,除了前者大多數在消息發送時由JMS提供者來設置以外。JMS定義的屬性能夠做爲可選的JMS消息頭,下面是JMS定義的9個屬性清單:

JMSXUserID

JMSXAppID

JMSXProducerTXID

JMSXConsumerTXID

JMSXRcvTimestamp

JMSXDeliveryCount

JMSXState

JMSXGroupID

JMSXGroupSeq

在這份清單中,只有JMSXGroupID和JMSXGroupSeq須要全部JMS提供者的支持。這些可選屬性用於聚合消息。

請注意:在Message接口中,您將沒法找到對應的setJMSX<PROPERTY>()和getJMSX<PROPERTY>()方法定義,在使用這些方法時,必須使用和應用程序特定屬性相同的方法來設置它們,以下:

message.setStringProperty("JMSXGroupID","GroupID-001");

message.setIntProperty("JMSXGroupSeq",5);

2.6.2.3,提供者特定的屬性

每一個JMS提供者均可以定義一組私有屬性,這些屬性能夠由客戶端或提供者自動設置。提供者特定的屬性必須之前綴JMS開頭,後面緊接着是屬性名稱(JMS<vendor-property-name>),例如:JMSUserID。提供者特定的屬性,其做用就是支持廠商的私有特性。

2.6.3,消息體

爲了適應不一樣場景下的消息,提升消息存儲的靈活性,JMS定義了幾種具體類型的消息,不一樣的子類型的消息體也不同,須要注意的是,Message接口並無提供一個統一的getBody之類的方法。消息子接口定義以下:

TextMessage

最簡單的消息接口,用於發送文本類的消息,設置/獲取其body的方法定義以下setText()/getText()。

StreamMessage

流式消息接口,裏面定義了一系列的對基本類型的set/get方法,消息發送者能夠經過這些方法寫入基本類型的數據,消息接收者須要按發送者的寫入順序來讀取相應的數據。

MapMessage

把消息內容存儲在Map裏,本接口定義了一系列對基本類型的的set/get方法,與StreamMessage不一樣的是,每一個值都對應了一個相應的key,因此消息接收者沒必要按順序去讀取數據。

ObjectMessage

將對象做爲消息的接口,提供了一個set/get 對象的方法,須要注意的是隻能設置一個對象,這個對象能夠是一個Collection,但必須是序列化的。

BytesMessage

以字節的形式來傳遞消息的接口,除了提供了對基本類型的set/get,還提供了按字節方式進行set/get。

相關文章
相關標籤/搜索