ActiveMq 總結(二)

4.2.6 MessageConsumer

MessageConsumer是一個由Session建立的對象,用來從Destination接收消息。html

4.2.6.1 建立MessageConsumer

Java客戶端:java

ActiveMQSession方法:ios

MessageConsumer createConsumer(Destination destination);apache

MessageConsumer createConsumer(Destination destination, String messageSelector);  session

MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal); 併發

TopicSubscriber createDurableSubscriber(Topic topic, String name); app

TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);異步

其中messageSelector爲消息選擇器;noLocal標誌默認爲false,當設置爲true時限制消費者只能接收和本身相同的鏈接(Connection)所發佈的消息,此標誌只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時須要設置此參數。socket

例如:async

MessageConsumer consumer = session.createConsumer(destination);

C++客戶端:

函數原型:

cms::MessageConsumer* ActiveMQSession::createConsumer(

    const cms::Destination* destination );

cms::MessageConsumer* ActiveMQSession::createConsumer(

    const cms::Destination* destination,

    const std::string& selector )

        throw ( cms::CMSException );

cms::MessageConsumer* ActiveMQSession::createConsumer(

    const cms::Destination* destination,

    const std::string& selector,

    bool noLocal )

        throw ( cms::CMSException );

cms::MessageConsumer* ActiveMQSession::createDurableConsumer(

    const cms::Topic* destination,

    const std::string& name,

    const std::string& selector,

    bool noLocal )

        throw ( cms::CMSException );

例如:

MessageConsumer* consumer = session->createConsumer( destination );

4.2.6.2消息的同步和異步接收

消息的同步接收是指客戶端主動去接收消息,客戶端能夠採用MessageConsumer 的receive方法去接收下一個消息。
    消息的異步接收是指當消息到達時,ActiveMQ主動通知客戶端。客戶端能夠經過註冊一個實現MessageListener 接口的對象到MessageConsumer。MessageListener只有一個必須實現的方法 —— onMessage,它只接收一個參數,即Message。在爲每一個發送到Destination的消息實現onMessage時,將調用該方法。

Java客戶端:

ActiveMQMessageConsumer方法:

Message receive()

Message receive(long timeout)

Message receiveNoWait()

其中timeout爲等待時間,單位爲毫秒。

或者

實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。

例如:

Message message = consumer.receive();

C++客戶端:

函數原型:

cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )

cms::Message* ActiveMQConsumer::receive( int millisecs )

    throw ( cms::CMSException );

cms::Message* ActiveMQConsumer::receiveNoWait(void)

    throw ( cms::CMSException );

或者

實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。

例如:

Message *message = consumer->receive();

或者

consumer->setMessageListener( this );

virtual void onMessage( const Message* message ){

        //process message

}

4.2.6.3消息選擇器

JMS提 供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可以使用基於這些屬性的選擇標 準來代表對消息是否感興趣。這就簡化了客戶端的工做,並避免了向不須要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增長了一些 額外開銷。

消息選擇器是用於MessageConsumer的過濾器,能夠用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並肯定是否將實際消費該消息。按照JMS文檔的說法,消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。能夠將消息選擇器做爲MessageConsumer建立的一部分。

Java客戶端:

例如:

public final String SELECTOR = 「JMSType = ‘TOPIC_PUBLISHER’」;

該選擇器檢查了傳入消息的JMSType屬性,並肯定了這個屬性的值是否等於TOPIC_PUBLISHER。若是相等,則消息被消費;若是不相等,那麼消息會被忽略。

4.2.7 Message

JMS程序的最終目的是生產和消費的消息能被其餘程序使用,JMS的 Message是一個既簡單又不乏靈活性的基本格式,容許建立不一樣平臺上符合非JMS程序格式的消息。Message由如下幾部分組成:消息頭,屬性和消息體。

Java客戶端:

ActiveMQSession方法:

BlobMessage createBlobMessage(File file)

BlobMessage createBlobMessage(InputStream in)

BlobMessage createBlobMessage(URL url)

BlobMessage createBlobMessage(URL url, boolean deletedByBroker)

BytesMessage createBytesMessage()

MapMessage createMapMessage()

Message createMessage()

ObjectMessage createObjectMessage()

ObjectMessage createObjectMessage(Serializable object)

TextMessage createTextMessage()

TextMessage createTextMessage(String text)

例如:

下例演示建立併發送一個TextMessage到一個隊列:

TextMessage message = queueSession.createTextMessage();

message.setText(msg_text); // msg_text is a String

queueSender.send(message);

下例演示接收消息並轉換爲合適的消息類型:

Message m = queueReceiver.receive();

if (m instanceof TextMessage) {

               TextMessage message = (TextMessage) m;
               System.out.println("Reading message: " + message.getText());
} else {
               // Handle error
}

C++客戶端:

函數原型:

cms::Message* ActiveMQSession::createMessage(void)

    throw ( cms::CMSException )

cms::BytesMessage* ActiveMQSession::createBytesMessage(void)

    throw ( cms::CMSException )

cms::BytesMessage* ActiveMQSession::createBytesMessage(

    const unsigned char* bytes,

    unsigned long long bytesSize )

        throw ( cms::CMSException )

cms::TextMessage* ActiveMQSession::createTextMessage(void)

    throw ( cms::CMSException )

cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )

    throw ( cms::CMSException )

cms::MapMessage* ActiveMQSession::createMapMessage(void)

    throw ( cms::CMSException )

    例如:

下例演示建立併發送一個TextMessage到一個隊列:

TextMessage* message = session->createTextMessage( text ); // text is a string

producer->send( message );

delete message;

下例演示接收消息:

Message *message = consumer->receive();

const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message );

string text = textMessage->getText();

printf( "Received: %s/n", text.c_str() );

delete message;

4.3 可靠性機制

發送消息最可靠的方法就是在事務中發送持久性的消息,ActiveMQ默認發送持久性消息。結束事務有兩種方法:提交或者回滾。當一個事務提交,消息被處理。若是事務中有一個步驟失敗,事務就回滾,這個事務中的已經執行的動做將被撤銷。

接收消息最可靠的方法就是在事務中接收信息,不論是從PTP模式的非臨時隊列接收消息仍是從Pub/Sub模式持久訂閱中接收消息。

對於其餘程序,低可靠性能夠下降開銷和提升性能,例如發送消息時能夠更改消息的優先級或者指定消息的過時時間。

消 息傳送的可靠性越高,須要的開銷和帶寬就越多。性能和可靠性之間的折衷是設計時要重點考慮的一個方面。能夠選擇生成和使用非持久性消息來得到最佳性能。另 一方面,也能夠經過生成和使用持久性消息並使用事務會話來得到最佳可靠性。在這兩種極端之間有許多選擇,這取決於應用程序的要求。

4.3.1 基本可靠性機制

4.3.1.1 控制消息的簽收(Acknowledgment

客戶端成功接收一條消息的標誌是這條消息被簽收。成功接收一條消息通常包括以下三個階段:

1.客戶端接收消息;

2.客戶端處理消息;

3.消息被簽收。簽收能夠由ActiveMQ發起,也能夠由客戶端發起,取決於Session簽收模式的設置。

在帶事務的Session中,簽收自動發生在事務提交時。若是事務回滾,全部已經接收的消息將會被再次傳送。

在不帶事務的Session中,一條消息什麼時候和如何被簽收取決於Session的設置。

1.Session.AUTO_ACKNOWLEDGE

當客戶端從receive或onMessage成功返回時,Session自動簽收客戶端的這條消息的收條。在AUTO_ACKNOWLEDGE的Session中,同步接收receive是上述三個階段的一個例外,在這種狀況下,收條和簽收緊隨在處理消息以後發生。

2.Session.CLIENT_ACKNOWLEDGE

    客戶端經過調用消息的acknowledge方法簽收消息。在這種狀況下,簽收發生在Session層面:簽收一個已消費的消息會自動地簽收這個Session全部已消費消息的收條。

3.Session.DUPS_OK_ACKNOWLEDGE

   此選項指示Session沒必要確保對傳送消息的簽收。它可能引發消息的重複,可是下降了Session的開銷,因此只有客戶端能容忍重複的消息,纔可以使用(若是ActiveMQ再次傳送同一消息,那麼消息頭中的JMSRedelivered將被設置爲true)。

Java客戶端:

簽收模式分別爲:

1.  Session.AUTO_ACKNOWLEDGE

2.  Session.CLIENT_ACKNOWLEDGE

3.  Session.DUPS_OK_ACKNOWLEDGE

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

例如:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

C++客戶端:

簽收模式分別爲:

1.  Session::AUTO_ACKNOWLEDGE

2.  Session::CLIENT_ACKNOWLEDGE

3.  Session::DUPS_OK_ACKNOWLEDGE

4.  Session::SESSION_TRANSACTED

函數原型:

cms::Session* ActiveMQConnection::createSession(

   cms::Session::AcknowledgeMode ackMode )

      throw ( cms::CMSException )

例如:

Session* session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

對隊列來講,若是當一個Session終止時它接收了消息可是沒有簽收,那麼ActiveMQ將保留這些消息並將再次傳送給下一個進入隊列的消費者。

對主題來講,若是持久訂閱用戶終止時,它已消費未簽收的消息也將被保留,直到再次傳送給這個用戶。對於非持久訂閱,AtiveMQ在用戶Session關閉時將刪除這些消息。

若是使用隊列和持久訂閱,而且Session沒有使用事務,那麼可使用Session的recover方法中止Session,再次啓動後將收到它第一條沒有簽收的消息,事實上,重啓後Session一系列消息的傳送都是以上一次最後一條已簽收消息的下一條爲起點。若是這時有消息過時或者高優先級的消息到來,那麼這時消息的傳送將會和最初的有所不一樣。對於非持久訂閱用戶,重啓後,ActiveMQ有可能刪除全部沒有簽收的消息。

4.3.1.2 指定消息傳送模式

ActiveMQ支持兩種消息傳送模式:PERSISTENT和NON_PERSISTENT兩種。

1.PERSISTENT(持久性消息)

        這是ActiveMQ的 默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另外一個重要方面是確保持久性消息傳送至目 標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由 致使失敗,它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。

2.NON_PERSISTENT(非持久性消息)

    保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。

有兩種方法指定傳送模式:

1.使用setDeliveryMode方法,這樣全部的消息都採用此傳送模式;

2.使用send方法爲每一條消息設置傳送模式;

Java客戶端:

傳送模式分別爲:

1.  DeliveryMode.PERSISTENT

2.  DeliveryMode.NON_PERSISTENT

ActiveMQMessageProducer方法:

void setDeliveryMode(int newDeliveryMode);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

C++客戶端:

傳送模式分別爲:

1.  DeliveryMode::PERSISTANT

2.  DeliveryMode::NON_PERSISTANT

函數原型:

void setDeliveryMode( int mode );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,

    int priority,

    long long timeToLive )

    throw ( cms::CMSException );

void ActiveMQProducer::send( const cms::Destination* destination,

    cms::Message* message, int deliveryMode,

    int priority, long long timeToLive)

        throw ( cms::CMSException );

例如:

producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );

若是不指定傳送模式,那麼默認是持久性消息。若是容忍消息丟失,那麼使用非持久性消息能夠改善性能和減小存儲的開銷。

4.3.1.3 設置消息優先級

一般,能夠確保將單個會話向目標發送的全部消息按其發送順序傳送至消費者。然而,若是爲這些消息分配了不一樣的優先級,消息傳送系統將首先嚐試傳送優先級較高的消息。

有兩種方法設置消息的優先級:

1.使用setDeliveryMode方法,這樣全部的消息都採用此傳送模式;

2.使用send方法爲每一條消息設置傳送模式;

Java客戶端:

ActiveMQMessageProducer方法:

void setPriority(int newDefaultPriority);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setPriority(4);

C++客戶端:

函數原型:

void setPriority( int priority );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,

    int priority,

    long long timeToLive )

        throw ( cms::CMSException );

void ActiveMQProducer::send( const cms::Destination* destination,

    cms::Message* message, int deliveryMode,

    int priority, long long timeToLive)

        throw ( cms::CMSException );

例如:

producer-> setPriority(4);

消息優先級從0-9十個級別,0-4是普通消息,5-9是加急消息。若是不指定優先級,則默認爲4。JMS不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達。

4.3.1.4 容許消息過時

默認狀況下,消息永不會過時。若是消息在特定週期內失去意義,那麼能夠設置過時時間。

有兩種方法設置消息的過時時間,時間單位爲毫秒:

1.使用setTimeToLive方法爲全部的消息設置過時時間;

2.使用send方法爲每一條消息設置過時時間;

Java客戶端:

ActiveMQMessageProducer方法:

void setTimeToLive(long timeToLive);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode爲傳送模式,priority爲消息優先級,timeToLive爲消息過時時間。

例如:

producer.setTimeToLive(1000);

C++客戶端:

函數原型:

void setTimeToLive( long long time );

或者

void ActiveMQProducer::send( cms::Message* message, int deliveryMode,

    int priority,

    long long timeToLive )

        throw ( cms::CMSException );

void ActiveMQProducer::send( const cms::Destination* destination,

    cms::Message* message, int deliveryMode,

    int priority, long long timeToLive)

        throw ( cms::CMSException );

例如:

Producer->setTimeToLive(1000);

消息過時時間,send 方法中的timeToLive 值加上發送時刻的GMT 時間值。若是timeToLive值等於零,則JMSExpiration 被設爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。

4.3.1.5 建立臨時目標

ActiveMQ經過createTemporaryQueue和createTemporaryTopic建立臨時目標,這些目標持續到建立它的Connection關閉。只有建立臨時目標的Connection所建立的客戶端才能夠從臨時目標中接收消息,可是任何的生產者均可以向臨時目標中發送消息。若是關閉了建立此目標的Connection,那麼臨時目標被關閉,內容也將消失。

Java客戶端:

ActiveMQSession方法:

TemporaryQueue createTemporaryQueue();

TemporaryTopic createTemporaryTopic();

C++客戶端:

函數原型:

cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)

    throw ( cms::CMSException );

cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)

    throw ( cms::CMSException );

某些客戶端須要一個目標來接收對發送至其餘客戶端的消息的回覆。這時可使用臨時目標。Message的屬性之一是JMSReplyTo屬性,這個屬性就是用於這個目的的。能夠建立一個臨時的Destination,並把它放入Message的JMSReplyTo屬性中,收到該消息的消費者能夠用它來響應生產者。

Java客戶端:

以下所示代碼段,將建立臨時的Destination,並將它放置在TextMessage的JMSReplyTo屬性中:

// Create a temporary queue for replies...

Destination tempQueue = session.createTemporaryQueue();

// Set ReplyTo to temporary queue...

msg.setJMSReplyTo(tempQueue);

消費者接收這條消息時,會從JMSReplyTo字段中提取臨時Destination,而且會經過應用程序構造一個MessageProducer,以便將響應消息發送回生產者。這展現瞭如何使用JMS Message的屬性,並顯示了私有的臨時Destination的有用之處。它還展現了客戶端能夠既是消息的生產者,又能夠是消息的消費者。

// Get the temporary queue from the JMSReplyTo

// property of the message...

Destination tempQueue = msg.getJMSReplyTo();

...

// create a Sender for the temporary queue

MessageProducer Sender = session.createProducer(tempQueue);

TextMessage msg = session.createTextMessage();

msg.setText(REPLYTO_TEXT);

...

// Send the message to the temporary queue...

sender.send(msg);

4.3.2 高級可靠性機制

4.3.2.1 建立持久訂閱

經過爲發佈者設置PERSISTENT傳送模式,爲訂閱者時使用持久訂閱,這樣能夠保證Pub/Sub程序接收全部發布的消息。

消息訂閱分爲非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處於激活狀態,也就是和ActiveMQ保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。持久訂閱時,客戶端向ActiveMQ註冊一個識別本身身份的ID,當這個客戶端處於離線時,ActiveMQ會爲這個ID 保存全部發送到主題的消息,當客戶端再次鏈接到ActiveMQ時,會根據本身的ID 獲得全部當本身處於離線時發送到主題的消息。持久訂閱會增長開銷,同一時間在持久訂閱中只有一個激活的用戶。

創建持久訂閱的步驟:

1.  爲鏈接設置一個客戶ID

2.  爲訂閱的主題指定一個訂閱名稱;

上述組合必須惟一。

4.3.2.1.1 建立持久訂閱

Java客戶端:

ActiveMQConnection方法:

void setClientID(String newClientID)

ActiveMQSession方法:

TopicSubscriber createDurableSubscriber(Topic topic, String name)

TopicSubscriber createDurableSubscriber(Topic topic, String name, String

messageSelector, boolean noLocal)

其中messageSelector爲消息選擇器;noLocal標誌默認爲false,當設置爲true時限制消費者只能接收和本身相同的鏈接(Connection)所發佈的消息,此標誌只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時須要設置此參數。

C++客戶端:

函數原型:

virtual void setClientId( const std::string& clientId );

cms::MessageConsumer* ActiveMQSession::createDurableConsumer(

        const cms::Topic* destination,

        const std::string& name,

        const std::string& selector,

        bool noLocal )

        throw ( cms::CMSException )

4.3.2.1.2 刪除持久訂閱

Java客戶端:

ActiveMQSession方法:

void unsubscribe(String name);

4.3.2.2 使用本地事務

 

在事務中生成或使用消息時,ActiveMQ跟蹤各個發送和接收過程,並在客戶端發出提交事務的調用時完成這些操做。若是事務中特定的發送或接收操做失敗,則出現異常。客戶端代碼經過忽略異常、重試操做或回滾整個事務來處理異常。在事務提交時,將完成全部成功的操做。在事務進行回滾時,將取消全部成功的操做。

本地事務的範圍始終爲一個會話。也就是說,能夠將單個會話的上下文中執行的一個或多個生產者或消費者操做組成一個本地事務。

不但單個會話能夠訪問 Queue 或 Topic (任一類型的 Destination ), 並且單個會話實例能夠用來操縱一個或多個隊列以及一個或多個主題,一切都在單個事務中進行。這意味着單個會話能夠(例如)建立隊列和主題中的生產者,而後 使用單個事務來同時發送隊列和主題中的消息。由於單個事務跨越兩個目標,因此,要麼隊列和主題的消息都獲得發送,要麼都未獲得發送。相似地,單個事務能夠 用來接收隊列中的消息並將消息發送到主題上,反過來也能夠。

因爲事務的範圍只能爲單個的會話,所以不存在既包括消息生成又包括消息使用的端對端事務。(換句話說,至目標的消息傳送和隨後進行的至客戶端的消息傳送不能放在同一個事務中。)

4.3.2.2.1 使用事務

Java客戶端:

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted爲使用事務標識,acknowledgeMode爲簽收模式。

例如:

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

C++客戶端:

函數原型:

cms::Session* ActiveMQConnection::createSession(

   cms::Session::AcknowledgeMode ackMode );

其中AcknowledgeMode ackMode需指定爲SESSION_TRANSACTED。

例如:

Session* session = connection->createSession( Session:: SESSION_TRANSACTED );

4.3.2.2.2 提交

Java客戶端:

ActiveMQSession方法:

void commit();

例如:

try {

        producer.send(consumer.receive());

        session.commit();

}

catch (JMSException ex) {

        session.rollback();

}

C++客戶端:

函數原型:

void ActiveMQSession::commit(void) throw ( cms::CMSException )

4.3.2.2.3 回滾

Java客戶端:

ActiveMQSession方法:

void rollback();

C++客戶端:

函數原型:

void ActiveMQSession::rollback(void) throw ( cms::CMSException )

4.4 高級特徵

4.4.1 異步發送消息

ActiveMQ支持生產者以同步或異步模式發送消息。使用不一樣的模式對send方法的反應時間有巨大的影響,反映時間是衡量ActiveMQ吞吐量的重要因素,使用異步發送能夠提升系統的性能。

在默認大多數狀況下,AcitveMQ是以異步模式發送消息。例外的狀況:在沒有使用事務的狀況下,生產者以PERSISTENT傳送模式發送消息。在這種狀況下,send方法都是同步的,而且一直阻塞直到ActiveMQ發回確認消息:消息已經存儲在持久性數據存儲中。這種確認機制保證消息不會丟失,但會形成生產者阻塞從而影響反應時間。

高性能的程序通常都能容忍在故障狀況下丟失少許數據。若是編寫這樣的程序,能夠經過使用異步發送來提升吞吐量(甚至在使用PERSISTENT傳送模式的狀況下)。

Java客戶端:

使用Connection URI配置異步發送:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

在ConnectionFactory層面配置異步發送:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

在Connection層面配置異步發送,此層面的設置將覆蓋ConnectionFactory層面的設置:

((ActiveMQConnection)connection).setUseAsyncSend(true);

4.4.2 消費者特點

4.4.2.1 消費者異步分派

在ActiveMQ4中,支持ActiveMQ以同步或異步模式向消費者分派消息。這樣的意義:能夠以異步模式向處理消息慢的消費者分配消息;以同步模式向處理消息快的消費者分配消息。

ActiveMQ默認以同步模式分派消息,這樣的設置能夠提升性能。可是對於處理消息慢的消費者,須要以異步模式分派。

Java客戶端:

在ConnectionFactory層面配置同步分派:

((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);

在Connection層面配置同步分派,此層面的設置將覆蓋ConnectionFactory層面的設置:

((ActiveMQConnection)connection).setDispatchAsync(false);

在消費者層面以Destination URI配置同步分派,此層面的設置將覆蓋ConnectionFactory和Connection層面的設置:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");

consumer = session.createConsumer(queue);

4.4.2.2 消費者優先級

在ActveMQ分佈式環境中,在有消費者存在的狀況下,若是更但願ActveMQ發送消息給消費者而不是其餘的ActveMQ到ActveMQ的傳送,能夠以下設置:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");

consumer = session.createConsumer(queue);

4.4.2.3 獨佔的消費者

ActiveMQ維護隊列消息的順序並順序把消息分派給消費者。可是若是創建了多個Session和MessageConsumer,那麼同一時刻多個線程同時從一個隊列中接收消息時就並不能保證處理時有序。

有時候有序處理消息是很是重要的。ActiveMQ4支持獨佔的消費。ActiveMQ挑選一個MessageConsumer,並把一個隊列中全部消息按順序分派給它。若是消費者發生故障,那麼ActiveMQ將自動故障轉移並選擇另外一個消費者。能夠以下設置:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

consumer = session.createConsumer(queue);

4.4.2.4 再次傳送策略

在如下三種狀況中,消息會被再次傳送給消費者:

1.在使用事務的Session中,調用rollback()方法;

2.在使用事務的Session中,調用commit()方法以前就關閉了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式,而且調用了recover()方法。

能夠經過設置ActiveMQConnectionFactory和ActiveMQConnection來定製想要的再次傳送策略。

屬性

 

默認值

 

描述

 

collisionAvoidanceFactor

 

0.15

 

maximumRedeliveries

6

 

initialRedeliveryDelay

1000L

 

useCollisionAvoidance

false

 

useExponentialBackOff

false

 

  backOffMultiplier

 

5

 

The back-off multiplier

 

4.4.3 目標特點

4.4.3.1 複合目標

在1.1版本以後,ActiveMQ支持混合目標技術。它容許在一個JMS目標中使用一組JMS目標。

例如能夠利用混合目標在同一操做中用向12個隊列發送同一條消息或者在同一操做中向一個主題和一個隊列發送同一條消息。

在混合目標中,經過「,」來分隔不一樣的目標。

Java客戶端:

例如:

// send to 3 queues as one logical operation

Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");

producer.send(queue, someMessage);

若是在一個目標中混合不一樣類別的目標,能夠經過使用「queue://」和「topic://」前綴來識別不一樣的目標。

例如:

// send to queues and topic one logical operation

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

producer.send(queue, someMessage);

4.4.3.2 目標選項

屬性

 

默認值

 

描述

 

consumer.prefetchSize

 

variable

 

consumer.maximumPendingMessageLimit

0

 

consumer.noLocal

false

 

consumer.dispatchAsync

false

 

consumer.retroactive

false

 

consumer.selector

null

 

JMS Selector used with the consumer.

 

consumer.exclusive

 

false

 

Is this an Exclusive Consumer.

 

consumer.priority

 

0

 

Allows you to configure a Consumer Priority.

 

Java客戶端:

例如:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

4.4.4 消息預取

ActiveMQ的目標之一就是高性能的數據傳送,因此ActiveMQ使用「預取限制」來控制有多少消息能及時的傳送給任何地方的消費者。

一旦預取數量達到限制,那麼就不會有消息被分派給這個消費者直到它發回簽收消息(用來標識全部的消息已經被處理)。

能夠爲每一個消費者指定消息預取。若是有大量的消息而且但願更高的性能,那麼能夠爲這個消費者增大預取值。若是有少許的消息而且每條消息的處理都要花費很長的時間,那麼能夠設置預取值爲1,這樣同一時間,ActiveMQ只會爲這個消費者分派一條消息。

Java客戶端:

在ConnectionFactory層面爲全部消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.all=50

在ConnectionFactory層面爲隊列消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

使用「目標選項」爲一個消費者配置預取值:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

4.4.5 配置鏈接URL

ActiveMQ支持經過Configuration URI明確的配置鏈接屬性。

例如:當要設置異步發送時,能夠經過在Configuration URI中使用jms.$PROPERTY來設置。

tcp://localhost:61616?jms.useAsyncSend=true

如下的選項在URI必須以「jms.」爲前綴。

屬性

 

默認值

 

描述

 

alwaysSessionAsync

 

true

 

clientID

null

 

closeTimeout

15000 (milliseconds)

 

Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker.

copyMessageOnSend

 

true

 

disableTimeStampsByDefault

false

 

dispatchAsync

false

 

Should the broker dispatch messages asynchronously to the consumer.

 

nestedMapAndListEnabled

 

true

 

Enables/disables whether or not Structured Message Properties and MapMessages are supported so that Message properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards

 

objectMessageSerializationDefered

 

false

 

When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk.

 

optimizeAcknowledge

 

false

 

Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNING enabling this issue could cause some issues with auto-acknowledgement on reconnection

 

optimizedMessageDispatch

 

true

 

If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers

 

useAsyncSend

 

false

 

Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss.

 

useCompression

 

false

 

Enables the use of compression of the message bodies

 

useRetroactiveConsumer

 

false

 

Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.

 

4.5 優化

    優化部分請參閱:http://devzone.logicblaze.com/site/how-to-tune-activemq.html

5. ActiveMQ配置

5.1 配置文件

ActiveMQ配置文件:$AcrtiveMQ/conf/activemq.xml

5.2 配置ActiveMQ服務IP和端口

    <transportConnectors>

       <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>

       <transportConnector name="ssl"     uri="ssl://localhost:61617"/>

       <transportConnector name="stomp"   uri="stomp://localhost:61613"/>

    </transportConnectors>

在transportConnectors標識中配置ActiveMQ服務IP和端口,其中name屬性指定協議的名稱,uri屬性指定協議所對應的協議名,IP地址和端口號。上述IP地址和端口能夠根據實際須要指定。Java客戶端默認使用openwire協議,因此ActiveMQ服務地址爲tcp://localhost:61616;目前C++客戶端僅支持stomp協議,因此ActiveMQ服務地址爲tcp://localhost:61613。

5.3 分佈式部署

分佈式部署請參閱:http://activemq.apache.org/networks-of-brokers.html

5.4 監控ActiveMQ

本節將使用JXM和JXM控制檯(JDK1.5控制檯)監控ActiveMQ。

5.4.1 配置JXM

<broker brokerName="emv219" useJmx="true" xmlns="http://activemq.org/config/1.0">

<managementContext>

          <managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>

</managementContext>

</broker>

配置JXM步驟以下:

1. 設置broker標識的useJmx屬性爲true;

2. 取消對managementContext標識的註釋(系統默認註釋managementContext標識),監控的默認端口爲1099。

5.4.2 Windows平臺監控

進入%JAVA_HOME%/bin,雙擊jconsole.exe即出現以下畫面,在對話框中輸入ActiveMQ服務主機的地址,JXM的端口和主機登錄賬號。

 

 

 

 

 

 

 

 

6. 目前存在問題

6.1 C++客戶端丟失消息問題

ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT

C++客戶端版本:ActiveMQ CPP 1.1 Release

測試中發現,當C++客戶端異常退出時(即沒有正常調用close函數關閉鏈接),ActiveMQ並不能檢測到C++客戶端的鏈接已經中斷,這時若是向隊列中發送消息,那麼第一條消息就會丟失,這時ActiveMQ才能檢測到這個鏈接是中斷的。

在ActiveMQ論壇反應此問題後,開發人員答覆並建議使用CLIENT_ACKNOWLEDGE簽收模式。可是此模式會形成消息重複接收。

    測試ActiveMQ 4.2SNAPSHOT時並未發現上述問題。

6.2 隊列消息堆積過多後有可能阻塞程序

默認activemq.xml中配置的內存是20M,這就意味着當消息堆積超過20M後,程序可能出現問題。在mial list中其餘用戶對此問題的描述是:send方法會阻塞或拋出異常。ActiveMQ開發人員的答覆:The memory model is different for ActiveMQ 4.1 in that for Queues, only small references to the Queue messages are held in memory. This means that the Queue depth can be considerably bigger than for ActiveMQ 3.2.x.However, our next major release (5.0 nee 4.2) has a more robust model in that Queue messages are paged in from storage only when space is available - hence Queue depth is now limited by how much disk space you have.

6.3 目前版本的C++客戶端僅支持stomp協議

目前版本的C++客戶端程序(ActiveMQ CPP 1.1 Release)僅支持stomp協議,所以傳輸消息的速度應該沒有使用openwire協議的Java客戶端快。ActiveMQ網站顯示不久將會有支持openwire協議的C++客戶端程序發佈。

6.4 分佈式部署問題

ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT和ActiveMQ 4.2SNAPSHOT

測試選用上述兩個未正式發佈的版本,未選用正式發佈的ActiveMQ 4.1.0 Release版本是由於此版本bug較多。

在測試中發現,若是重啓其中一臺機器上的ActiveMQ,其餘機器的ActiveMQ有可能會打印:

    java.io.EOFException

        at java.io.DataInputStream.readInt(DataInputStream.java:358)

        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)

        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)

        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)

        at java.lang.Thread.run(Thread.java:595)

WARN  TransportConnection            - Unexpected extra broker info command received: BrokerInfo {commandId = 6, responseRequired = false, brokerId = ID:emv219n-33945-1174458770157-1:0, brokerURL = tcp://emv219n:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = emv219, connectionId = 0}.

INFO  FailoverTransport              - Transport failed, attempting to automatically reconnect due to: java.io.EOFException。

這時分佈式的消息傳輸就會出現問題,此問題目前還沒找到緣由。

7. 附錄

7.1 完整的Java客戶端例子

import org.apache.activemq.ActiveMQConnectionFactory;

 

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

/**

 * Hello world!

 */

public class App {

 

    public static void main(String[] args) throws Exception {

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        Thread.sleep(1000);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        Thread.sleep(1000);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldProducer(), false);

        Thread.sleep(1000);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldConsumer(), false);

        thread(new HelloWorldProducer(), false);

    }

 

    public static void thread(Runnable runnable, boolean daemon) {

        Thread brokerThread = new Thread(runnable);

        brokerThread.setDaemon(daemon);

        brokerThread.start();

    }

 

    public static class HelloWorldProducer implements Runnable {

        public void run() {

            try {

                // Create a ConnectionFactory

                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

 

                // Create a Connection

                Connection connection = connectionFactory.createConnection();

                connection.start();

 

                // Create a Session

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

                // Create the destination (Topic or Queue)

                Destination destination = session.createQueue("TEST.FOO");

 

                // Create a MessageProducer from the Session to the Topic or Queue

                MessageProducer producer = session.createProducer(destination);

                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

 

                // Create a messages

                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();

                TextMessage message = session.createTextMessage(text);

 

                // Tell the producer to send the message

                System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());

                producer.send(message);

 

                // Clean up

                session.close();

                connection.close();

            }

            catch (Exception e) {

                System.out.println("Caught: " + e);

                e.printStackTrace();

            }

        }

    }

 

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {

        public void run() {

            try {

 

                // Create a ConnectionFactory

                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

 

                // Create a Connection

                Connection connection = connectionFactory.createConnection();

                connection.start();

 

                connection.setExceptionListener(this);

 

                // Create a Session

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

                // Create the destination (Topic or Queue)

                Destination destination = session.createQueue("TEST.FOO");

 

                // Create a MessageConsumer from the Session to the Topic or Queue

                MessageConsumer consumer = session.createConsumer(destination);

 

                // Wait for a message

                Message message = consumer.receive(1000);

 

                if (message instanceof TextMessage) {

                    TextMessage textMessage = (TextMessage) message;

                    String text = textMessage.getText();

                    System.out.println("Received: " + text);

                } else {

                    System.out.println("Received: " + message);

                }

 

                consumer.close();

                session.close();

                connection.close();

            } catch (Exception e) {

                System.out.println("Caught: " + e);

                e.printStackTrace();

            }

        }

 

        public synchronized void onException(JMSException ex) {

            System.out.println("JMS Exception occured.  Shutting down client.");

        }

    }

}

7.2 完整的C++客戶端例子

#include <activemq/concurrent/Thread.h>

#include <activemq/concurrent/Runnable.h>

#include <activemq/core/ActiveMQConnectionFactory.h>

#include <activemq/util/Integer.h>

#include <cms/Connection.h>

#include <cms/Session.h>

#include <cms/TextMessage.h>

#include <cms/ExceptionListener.h>

#include <cms/MessageListener.h>

#include <stdlib.h>

#include <iostream>

 

using namespace activemq::core;

using namespace activemq::util;

using namespace activemq::concurrent;

using namespace cms;

using namespace std;

 

class HelloWorldProducer : public Runnable {

private:

   

    Connection* connection;

    Session* session;

    Destination* destination;

    MessageProducer* producer;

    int numMessages;

    bool useTopic;

 

public:

   

    HelloWorldProducer( int numMessages, bool useTopic = false ){

        connection = NULL;

        session = NULL;

        destination = NULL;

        producer = NULL;

        this->numMessages = numMessages;

        this->useTopic = useTopic;

    }

   

    virtual ~HelloWorldProducer(){

        cleanup();

    }

   

    virtual void run() {

        try {

            // Create a ConnectionFactory

            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61613");

 

            // Create a Connection

            connection = connectionFactory->createConnection();

            connection->start();

 

            // Create a Session

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

 

            // Create the destination (Topic or Queue)

            if( useTopic ) {

                destination = session->createTopic( "TEST.FOO" );

            } else {

                destination = session->createQueue( "TEST.FOO" );

            }

 

            // Create a MessageProducer from the Session to the Topic or Queue

            producer = session->createProducer( destination );

            producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );

           

            // Create the Thread Id String

            string threadIdStr = Integer::toString( Thread::getId() );

           

            // Create a messages

            string text = (string)"Hello world! from thread " + threadIdStr;

           

            for( int ix=0; ix<numMessages; ++ix ){

                TextMessage* message = session->createTextMessage( text );

 

                // Tell the producer to send the message

                printf( "Sent message from thread %s/n", threadIdStr.c_str() );

                producer->send( message );

                

                delete message;

            }

           

        }catch ( CMSException& e ) {

            e.printStackTrace();

        }

    }

   

private:

 

    void cleanup(){

                   

            // Destroy resources.

            try{                       

                if( destination != NULL ) delete destination;

            }catch ( CMSException& e ) {}

            destination = NULL;

           

            try{

                if( producer != NULL ) delete producer;

            }catch ( CMSException& e ) {}

            producer = NULL;

           

            // Close open resources.

            try{

                if( session != NULL ) session->close();

                if( connection != NULL ) connection->close();

            }catch ( CMSException& e ) {}

 

            try{

                if( session != NULL ) delete session;

            }catch ( CMSException& e ) {}

            session = NULL;

           

            try{

                if( connection != NULL ) delete connection;

            }catch ( CMSException& e ) {}

            connection = NULL;

    }

};

 

class HelloWorldConsumer : public ExceptionListener,

                           public MessageListener,

                           public Runnable {

   

private:

   

    Connection* connection;

    Session* session;

    Destination* destination;

    MessageConsumer* consumer;

    long waitMillis;

    bool useTopic;

       

public:

 

    HelloWorldConsumer( long waitMillis, bool useTopic = false ){

        connection = NULL;

        session = NULL;

        destination = NULL;

        consumer = NULL;

        this->waitMillis = waitMillis;

        this->useTopic = useTopic;

    }

    virtual ~HelloWorldConsumer(){     

        cleanup();

    }

   

    virtual void run() {

               

        try {

 

            // Create a ConnectionFactory

            ActiveMQConnectionFactory* connectionFactory =

                new ActiveMQConnectionFactory( "tcp://127.0.0.1:61613" );

 

            // Create a Connection

            connection = connectionFactory->createConnection();

            delete connectionFactory;

            connection->start();

           

            connection->setExceptionListener(this);

 

            // Create a Session

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

 

            // Create the destination (Topic or Queue)

            if( useTopic ) {

                destination = session->createTopic( "TEST.FOO" );

            } else {

                destination = session->createQueue( "TEST.FOO" );

            }

 

            // Create a MessageConsumer from the Session to the Topic or Queue

            consumer = session->createConsumer( destination );

           

            consumer->setMessageListener( this );

           

            // Sleep while asynchronous messages come in.

            Thread::sleep( waitMillis );       

           

        } catch (CMSException& e) {

            e.printStackTrace();

        }

    }

   

    // Called from the consumer since this class is a registered MessageListener.

    virtual void onMessage( const Message* message ){

       

        static int count = 0;

       

        try

        {

            count++;

            const TextMessage* textMessage =

                dynamic_cast< const TextMessage* >( message );

            string text = textMessage->getText();

            printf( "Message #%d Received: %s/n", count, text.c_str() );

        } catch (CMSException& e) {

            e.printStackTrace();

        }

    }

 

    // If something bad happens you see it here as this class is also been

    // registered as an ExceptionListener with the connection.

    virtual void onException( const CMSException& ex ) {

        printf("JMS Exception occured.  Shutting down client./n");

    }

   

private:

 

    void cleanup(){

       

        //*************************************************

        // Always close destination, consumers and producers before

        // you destroy their sessions and connection.

        //*************************************************

       

        // Destroy resources.

        try{                       

            if( destination != NULL ) delete destination;

        }catch (CMSException& e) {}

        destination = NULL;

       

        try{

            if( consumer != NULL ) delete consumer;

        }catch (CMSException& e) {}

        consumer = NULL;

       

        // Close open resources.

        try{

            if( session != NULL ) session->close();

            if( connection != NULL ) connection->close();

        }catch (CMSException& e) {}

       

        // Now Destroy them

        try{

            if( session != NULL ) delete session;

        }catch (CMSException& e) {}

        session = NULL;

       

        try{

            if( connection != NULL ) delete connection;

        }catch (CMSException& e) {}

        connection = NULL;

    }

};

   

int main(int argc, char* argv[]) {

 

    std::cout << "=====================================================/n";   

    std::cout << "Starting the example:" << std::endl;

    std::cout << "-----------------------------------------------------/n";

 

    //============================================================

    // set to true to use topics instead of queues

    // Note in the code above that this causes createTopic or

    // createQueue to be used in both consumer an producer.

    //============================================================   

    bool useTopics = false; 

 

    HelloWorldProducer producer( 1000, useTopics );

    HelloWorldConsumer consumer( 8000, useTopics );

   

    // Start the consumer thread.

    Thread consumerThread( &consumer );

    consumerThread.start();

   

    // Start the producer thread.

    Thread producerThread( &producer );

    producerThread.start();

 

    // Wait for the threads to complete.

    producerThread.join();

    consumerThread.join();

 

    std::cout << "-----------------------------------------------------/n";   

    std::cout << "Finished with the example, ignore errors from this"

              << std::endl

              << "point on as the sockets breaks when we shutdown."

              << std::endl;

    std::cout << "=====================================================/n";   

}

相關文章
相關標籤/搜索