Rabbitmq中文Java客戶端API指南

Java客戶端API指南

本指南涵蓋了RabbitMQ Java客戶端及其公共API。它假定使用最近的主要版本的客戶端,讀者熟悉基礎知識html

該庫的5.x版本系列須要JDK 8,用於編譯和運行時。在Android上,這意味着只支持Android 7.0或更高版本。4.x版本系列支持7.0以前的JDK 6和Android版本。java

該庫是開源的,在GitHub上開發,並在三重許可下android

· Apache公共許可證2.0git

· Mozilla公共許可證github

· GPL 2.0spring

這意味着用戶能夠考慮使用上述列表中的任何許可證進行許可。例如,用戶能夠選擇Apache Public License 2.0並將該客戶端包含到商業產品中。根據GPLv2許可的代碼庫能夠選擇GPLv2等。apache

還有一些 與Java客戶端一塊兒提供的命令行工具編程

客戶端API在AMQP 0-9-1協議模型上進行了嚴格建模,並提供了更多的抽象以便於使用。後端

一個API參考(JavaDoc的)是單獨提供的。api

概觀

RabbitMQ Java客戶端使用com.rabbitmq.client做爲其頂層包。關鍵類和接口是:

· Channel:表示AMQP 0-9-1通道,並提供大部分操做(協議方法)。

· Connection:表明AMQP 0-9-1鏈接

· ConnectionFactory:構造鏈接實例

· Consumer:表明消息消費者

· DefaultConsumer:消費者經常使用的基類

· BasicProperties:消息屬性(元數據)

· BasicProperties.Builder:建設者BasicProperties

協議操做可經過 Channel接口得到。Connection用於打開Channel,註冊鏈接生命週期事件處理程序,並關閉再也不須要的鏈接。 Connection經過ConnectionFactory實例化,這就是您如何配置各類鏈接設置,如虛擬主機或用戶名。

鏈接和頻道

核心API類是Connection 和Channel,分別表明AMQP 0-9-1 connection 和Channel。它們一般在使用前進口:

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

鏈接到RabbitMQ

如下代碼使用給定參數(host name, port number, etc)鏈接到RabbitMQ節點:

ConnectionFactory factory = new ConnectionFactory();

//  默認帳號密碼|「guest,限於本地鏈接 

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

 

Connection conn = factory.newConnection();

全部這些參數都對本地運行的RabbitMQ節點具備合理的默認值。若是在建立鏈接以前屬性保持未分配狀態,將使用屬性的默認值:

屬性

默認值

Username

"guest"

Password

"guest"

Virtual host

"/"

Hostname

"localhost"

port

5672用於常規鏈接, 5671用於使用TLS的鏈接

或者,可使用URI

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");

Connection conn = factory.newConnection();

全部這些參數都對本地運行的RabbitMQ服務器有合理的默認值。

請注意,用戶guest只能默認從本地主機鏈接。這是爲了限制生產系統中衆所周知的憑證使用。

使用Connection接口建立channel:

Channel channel = conn.createChannel();

如今可使用channel發送和接收消息,如後面的部分所述。

在服務器節點日誌中 能夠觀察到成功和不成功的客戶端鏈接事件。

斷開與RabbitMQ的鏈接

要斷開鏈接,只需關閉通道和鏈接:

channel.close();

conn.close();

請注意,關閉頻道可能被認爲是很好的作法,但在這裏並非必須的 - 當底層鏈接關閉時,它將自動完成。

客戶端斷開事件能夠在服務器節點日誌中觀察到

ConnectionChannel週期

Connection意味着週期較長。底層協議針對長時間運行的鏈接進行設計和優化。這意味着每一個操做打開一個新的鏈接,例如發佈的消息是沒必要要的,而且強烈不鼓勵,由於它會引入大量的網絡往返和開銷。

Channel也意味着週期較長,但因爲許多可恢復的協議錯誤會致使頻道關閉,因此頻道使用壽命可能會比鏈接頻率短。每次操做關閉和打開新頻道一般是沒必要要的,但能夠適當。若有疑問,請考慮重複使用channel第一。

Channel級異常(例如嘗試從不存在的隊列中消耗)將致使通道關閉。已關閉的頻道不能再使用,而且不會再收到來自服務器的更多事件(如消息傳遞)。Channel級異常將由RabbitMQ記錄並啓動通道的關閉序列(見下文)。

使用交換機(Exchanges隊列(Queues 

客戶端應用程序與協議的高級構建塊交換和隊列一塊兒工做。這些必須在可使用以前進行聲明。聲明任何一種類型的對象只是確保其中一個名稱存在,並在必要時建立它。

繼續前面的例子,下面的代碼聲明瞭一個exchange和一個queue,而後將它們綁定在一塊兒。

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

這將主動聲明如下對象,這兩個對象均可以經過使用其餘參數進行定製。這裏他們都沒有任何特別的論點。

1. 一個持久的,非自動刪除的「direct」類型的交換

2. 一個具備生成名稱的非持久,獨佔,自動刪除隊列

上面的函數調用而後使用給定的路由密鑰(routing key)將隊列綁定到交換機。

請注意,當只有一個客戶端想要使用它時,這將是一種典型的聲明方式:它不須要知名的名稱,沒有其餘客戶端可使用它(獨佔),而且會自動清除(自動刪除)。若是有幾個客戶想共享一個知名名稱的隊列,那麼這個代碼將是合適的:

channel.exchangeDeclare(exchangeName,「direct」,true);

channel.queueDeclare(queueName,true,false,false,null);

channel.queueBind(queueName,exchangeName,routingKey);

聲明:

1. 一個持久的,非自動刪除的「direct」類型的交換

2. 一個持久的,非獨佔,自動刪除隊列

許多Channel API方法被重載。這些便捷的ExchangeDeclare,queueDeclare和queueBind短格式 使用合理的默認值。還有更多的參數更多的表單,能夠根據須要覆蓋這些默認值,在須要的地方提供徹底控制。

這種「簡單形式,長形式」模式在客戶端API使用中使用。

一些常見操做還有一個「不等待」版本,不會等待服務器響應。例如,要聲明一個隊列並指示服務器不發送任何響應,請使用

channel.queueDeclareNoWait(queueName,true,false,false,null);

「不等待」版本更高效,但提供較低的安全保證,例如,它們更依賴於檢測失敗操做的心跳機制。若有疑問,請從標準版開始。只有在高拓撲(隊列,綁定)流失的場景中才須要「無等待」版本。

隊列或交換能夠被明確刪除:

channel.queueDelete("queue-name")

只有在隊列爲空時才能刪除隊列:

channel.queueDelete(「queue-name」,false,true)

或者若是沒有使用(沒有任何消費者):

channel.queueDelete(「queue-name」,true,false)

能夠清除隊列(刪除全部消息):

channel.queuePurge(「隊列名稱」)

發佈消息

要將消息發佈到交易所,請按以下方式使用Channel.basicPublish:

byte[] messageBodyBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

爲了進行良好的控制,您可使用重載的變體來指定強制標誌,或使用預設的消息屬性發送消息:

channel.basicPublish(exchangeName, routingKey, mandatory,

                    MessageProperties.PERSISTENT_TEXT_PLAIN,

                     messageBodyBytes);

這將發送帶有交付模式2(持久性),優先級1和內容類型「text / plain」的消息。你可使用一個Builder類來構建你本身的消息屬性對象,只要你喜歡就能夠提供許多屬性,例如:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .contentType("text/plain")

               .deliveryMode(2)

               .priority(1)

               .userId("bob")

               .build()),

               messageBodyBytes);

本示例使用自定義標題發佈消息:

Map<String, Object> headers = new HashMap<String, Object>();

headers.put("latitude",  51.5252949);

headers.put("longitude", -0.0905493);

 

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .headers(headers)

               .build()),

               messageBodyBytes);

本示例發佈包含過時(expiration)的消息:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .expiration("60000")

               .build()),

               messageBodyBytes);

咱們沒有在這裏說明全部的可能性。

請注意,BasicProperties是自動生成的持有AMQP的內部類。

Channel#basicPublish的 調用最終會阻止 資源驅動型警報生效。

通道Channels 和併發注意事項(線程安全)

做爲一個經驗法則,在線程之間共享Channel實例是須要避免的。應用程序應該更喜歡使用每一個線程的通道,而不是在多個線程之間 共享同一個通道

儘管通道上的某些操做能夠安全地同時調用,但有些操做不會而且會致使不正確的幀交錯,雙重確認等。

在共享通道上同時發佈可能會致使連線上的幀錯誤交錯,觸發鏈接級別的協議異常並由代理當即關閉鏈接。所以它須要在應用程序代碼中進行明確的同步(Channel#basicPublish必須在關鍵部分中調用)。在線程之間共享頻道也會干擾發佈商確認。最好避免在共享通道上同時發佈,例如經過使用每一個線程的通道。

可使用通道池來避免在共享通道上同時發佈:一旦線程完成一個通道的處理,它就會將其返回到池中,從而使該通道可用於另外一個線程。通道池能夠被認爲是一個特定的同步解決方案。建議使用現有的共享庫來代替自行開發的解決方案。例如,Spring AMQP 具備即用型通道池功能。

通道消耗資源,在大多數狀況下,應用程序在同一個JVM進程中不多須要超過幾百個開放通道。若是咱們假設應用程序對每一個通道都有一個線程(由於不該該同時使用通道),那麼單個JVM的數千個線程已經有至關可觀的開銷,這多是能夠避免的。此外,一些快速發佈商能夠輕鬆地使網絡接口和代理節點飽和:發佈涉及的工做量少於路由,存儲和傳遞消息的工做量。

要避免的典型反模式是爲每一個發佈的消息打開一個頻道。渠道應該是至關長壽的,打開一個新渠道是一個網絡往返,這使得這種模式很是低效。

在一個線程中使用並在共享通道上的另外一個線程中發佈多是安全的。

服務器推送的交付(請參見下面的部分)與保證每通道排序被保留的保證同時進行分派。調度機制使用java.util.concurrent.ExecutorService,每一個鏈接一個。能夠提供一個自定義執行程序,該自定義執行程序將由使用 ConnectionFactory#setSharedExecutor設置程序的單個ConnectionFactory生成的全部鏈接共享。

使用手動確認時,重要的是要考慮哪些線程進行確認。若是它與接收交付的線程不一樣(例如Consumer#handleDelivery 委託交付處理到另外一個線程),則將multiple參數設置爲true進行確認是不安全的,將致使雙重確認,並所以致使通道級協議異常關閉頻道。一次確認一條消息多是安全的。

經過訂閱接收消息(「推送API」)

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

接收消息的最有效方式是使用Consumer 界面設置訂閱。消息將在到達時自動發送,而不是必須明確要求。

在調用與Consumer相關的API方法時 ,我的訂閱始終由其消費者標籤引用。消費者標籤是消費者標識符,能夠是客戶端或服務器生成的。要讓RabbitMQ生成節點範圍的惟一標記,請使用Channel#basicConsume覆蓋,該覆蓋不會接收使用者標記參數,也不會傳遞消費者標記的空字符串,並使用Channel#basicConsume返回的值。消費者標籤用於取消消費者。

不一樣的消費者實例必須具備不一樣的消費者標籤。強烈建議在鏈接上重複使用消費者標籤,而且可能會致使自動鏈接恢復問題,並在監控消費者時混淆監控數據。

實現Consumer的最簡單方法是爲便利類DefaultConsumer建立子類。該子類的一個對象能夠經過basicConsume 調用來設置訂閱:

boolean autoAck = false;

channel.basicConsume(queueName, autoAck, "myConsumerTag",

     new DefaultConsumer(channel) {

         @Override

         public void handleDelivery(

String consumerTag,

                        Envelope envelope,

                        AMQP.BasicProperties properties,

                        byte[] body)

             throws IOException

         {

             String routingKey = envelope.getRoutingKey();

             String contentType = properties.getContentType();

             long deliveryTag = envelope.getDeliveryTag();

             //(處理消息組件在這裏...) 

             channel.basicAck(deliveryTag, false);

         }

     });

在這裏,由於咱們指定了autoAck = false,確認傳遞給消費者的消息,最簡單的 方法是在handleDelivery方法中完成,如圖中所示。

更復雜的消費者將須要覆蓋更多的方法。特別是,handleShutdownSignal 當通道和鏈接關閉被調用,handleConsumeOk傳遞消費者標籤的任何其餘回調到以前消費者被調用。

消費者也能夠分別實現 handleCancelOk和handleCancel 方法來通知顯式和隱式取消。

您可使用 Channel.basicCancel明確取消特定的消費者:

channel.basicCancel(consumerTag);

經過消費者標籤。

就像出版商同樣,爲消費者考慮併發危害安全也很重要。

對消費者的回調被調度到與實例化其通道的線程分離的線程池中 。這意味着消費者能夠安全地調用Connection或Channel上的阻塞方法 ,例如 Channel#queueDeclare或 Channel#basicCancel。

每一個通道都有本身的調度線程。對於每一個 頻道一個消費者最多見的使用狀況,這意味着消費者不支持其餘消費者。若是每一個頻道有多個消費者,請注意,長時間運行的消費者可能會阻止向該頻道上的 其餘消費者發送回調 。

有關併發性和併發性危害安全性的其餘主題,請參閱併發注意事項(線程安全性)部分。

檢索單個消息(「Pull API」)

要顯式檢索消息,請使用 Channel.basicGet。返回的值是GetResponse的一個實例,從中能夠提取標題信息(屬性)和消息正文:

boolean autoAck = false ;

GetResponse response = channel.basicGet(queueName, autoAck);

if (response == null) {

     //沒有檢索到消息。

} else {

    AMQP.BasicProperties props = response.getProps();

    byte[] body = response.getBody();

    long deliveryTag = response.getEnvelope().getDeliveryTag();

    ...

 

    

而且因爲上面的autoAck = false,您還必須調用Channel.basicAck來確認您已成功接收消息:

    ...

    channel.basicAck(method.deliveryTag,false); //確認收到消息 

}

處理不可路由的消息

若是消息發佈時設置了「強制(mandatory)」標誌,但沒法路由,代理會將其返回給發送客戶端(經過AMQP.Basic.Return 命令)。

通知這樣的回報,客戶能夠實現ReturnListener 接口並調用Channel.addReturnListener。若是客戶端還沒有配置特定通道的返回偵聽器,則相關的返回消息將被靜默放棄。

channel.addReturnListener(new ReturnListener() {

    public void handleReturn(int replyCode,

                                  String replyText,

                                  String exchange,

                                  String routingKey,

                                  AMQP.BasicProperties properties,

                                  byte[] body)

    throws IOException {

        ...

    }

});

例如,若是客戶端發佈的消息的「mandatory」標誌設置爲未綁定到隊列的「direct」類型的交換,則會調用返回監聽器。

關機協議

客戶端關機過程概述

AMQP 0-9-1鏈接和通道共享相同的通常方法來管理網絡故障,內部故障和明確的本地關閉。

AMQP 0-9-1鏈接和通道具備如下生命週期狀態:

· open:對象已準備好使用

· closing:對象已明確通知本地關閉,已向任何支持的下層對象發出關閉請求,而且正在等待其關閉過程完成

· closed:對象已收到來自任何底層對象的全部關閉完成通知,所以已關閉

這些對象老是處於關閉狀態,不管致使關閉的緣由如應用程序請求,內部客戶端庫故障,遠程網絡請求仍是網絡故障。

AMQP鏈接和通道對象具備如下與關機相關的方法:

· addShutdownListener(ShutdownListener listener)和 removeShutdownListener(ShutdownListener listener)來管理任何偵聽器,當對象轉換到關閉(closing)狀態時將會觸發這些偵聽器 。請注意,將ShutdownListener添加到已關閉的對象將當即觸發偵聽器

· getCloseReason(),以容許調查對象關閉的緣由

· isOpen(),用於測試對象是否處於打開狀態

· close(int closeCode,String closeMessage),以顯式通知要關閉的對象

監聽的簡單用法以下所示:

import com.rabbitmq.client.ShutdownSignalException;import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {

    public void shutdownCompleted(ShutdownSignalException cause)

    {

        ...

    }

});

關於關機狀況的信息

能夠經過顯式調用getCloseReason() 方法或使用ShutdownListener類的服務中的cause參數(ShutdownSignalException cause) 方法來檢索 ShutdownSignalException,其中包含關於關閉緣由的全部可用信息。

該ShutdownSignalException類提供方法來分析關機的緣由。經過調用isHardError()方法,咱們能夠得到有關鏈接或通道錯誤的信息,getReason()以AMQP方法的形式返回有關緣由的信息 -AMQP.Channel.Close或 AMQP.Connection.Close(若是緣由是庫中的某個異常(例如網絡通訊故障),則返回null,在這種狀況下,可使用getCause()檢索異常。

public void shutdownCompleted(ShutdownSignalException cause){

  if (cause.isHardError())

  {

    Connection conn = (Connection)cause.getReference();

    if (!cause.isInitiatedByApplication())

    {

      Method reason = cause.getReason();

      ...

    }

    ...

  } else {

    Channel ch = (Channel)cause.getReference();

    ...

  }

}

  

原子性和使用isOpen()方法

不建議在生產代碼中使用通道和鏈接對象 的isOpen()方法,由於方法返回的值取決於關閉緣由的存在。如下代碼說明了競爭條件的可能性:

public void brokenMethod(Channel channel){

    if (channel.isOpen())

    {

 //下面的代碼依賴於處於打開狀態的通道。//可是沒有在信道狀態變化的可能性ISOPEN()和basicQos(1)呼叫之間// 

       ...

        channel.basicQos(1);

    }

}  

相反,咱們一般應該忽略這種檢查,並簡單地嘗試所需的行動。若是在代碼的執行過程當中鏈接的通道關閉,則會引起ShutdownSignalException異常,指示對象處於無效狀態。咱們還應該捕獲 由SocketException引發的IOException,當代理意外關閉鏈接時,或者在代理啓動clean close時發生ShutdownSignalException。

public  void  validMethod (Channel channel) {

     try {

        ...

        channel.basicQos( 1);

    } catch(ShutdownSignalException sse){

         //可能檢查頻道是否被關閉

        //當咱們開始操做時,

        //關閉它的緣由 

        ...

    } catch(IOException ioe){

         //檢查鏈接關閉的緣由 

        ...

    }

}

高級鏈接選項

消費者線程池

消費者線程(請參閱下面的接收)默認狀況下會自動分配到新的ExecutorService線程池中。若是須要更大的控制權,請在newConnection()方法上 提供ExecutorService,以便使用此線程池。下面是一個例子,其中提供了比一般分配的更大的線程池:

 ExecutorService es = Executors.newFixedThreadPool(20);

  Connection conn = factory.newConnection(es);

不管執行人及的ExecutorService類中的java.util.concurrent包。

當鏈接關閉時,默認的ExecutorService 將被shutdown(),但用戶提供的 ExecutorService(如上面的es) 不會被shutdown()。提供定製ExecutorService的客戶端必須確保它最終關閉(經過調用其shutdown() 方法),不然池的線程可能會阻止JVM終止。

同一個執行者服務能夠在多個鏈接之間共享,或者在從新鏈接時被重複使用,可是在關閉後它不能使用()。

若是有證據代表消費者 回調處理中存在嚴重瓶頸,則應僅考慮使用此功能。若是沒有消費者回調執行,或者不多,默認分配綽綽有餘。開銷最小,而且分配的總線程資源是有界的,即便偶爾會出現一連串的消費者活動。

使用主機列表

能夠將Address數組傳遞給newConnection()。的地址是簡單地在一個方便的類com.rabbitmq.client包與主機 和端口組件。例如:

 Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)};

  Connection conn = factory.newConnection(addrArr);

將嘗試鏈接到hostname1:portnumber1,而且若是沒法鏈接到hostname2:portnumber2。返回的鏈接是數組中的第一個成功(不拋出 IOException)。這徹底等同於重複設置工廠的主機和端口,每次都調用factory.newConnection(),直到其中一個成功。

若是還提供了ExecutorService(使用表單factory.newConnection(es,addrArr)),則線程池將與(第一個)成功鏈接相關聯。

若是您想要更多地控制主機鏈接到,請參閱 對服務發現的支持

使用AddressResolver接口進行服務發現

從版本3.6.6開始,可讓AddressResolver的實現 在建立鏈接時選擇鏈接的位置:

  Connection conn = factory.newConnection(addressResolver);

該AddressResolver接口是這樣的:

  public interface AddressResolver {

    List<Address> getAddresses() throws IOException;

  }

就像主機列表同樣,返回的第一個地址將首先嚐試,而後第二個地址返回,若是客戶端沒法鏈接到第一個地址,依此類推。

若是還提供了ExecutorService(使用表單factory.newConnection(es,addressResolver)),則線程池將與(第一個)成功鏈接相關聯。

該AddressResolver是實現定製服務發現邏輯,這是一個動態的基礎設施特別有用的理想場所。結合自動恢復功能,客戶端能夠自動鏈接到第一次啓動時還沒有達到的節點。親和性和負載平衡是其中可使用自定義AddressResolver的其餘場景。

Java客戶端隨附如下實現(有關詳細信息,請參閱javadoc):

1. DnsRecordIpAddressResolver:給定主機的名稱,返回其IP地址(針對平臺DNS服務器的分辨率)。這對於簡單的基於DNS的負載平衡或故障轉移頗有用。

2. DnsSrvRecordAddressResolver:給定服務的名稱,返回主機名/端口對。搜索被實現爲DNS SRV請求。當使用像HashiCorp Consul這樣的服務註冊表時,這可能頗有用 。

心跳超時

有關檢測信號以及如何在Java客戶端中配置它們的更多信息,請參閱Heartbeats指南

自定義線程工廠

諸如Google App Engine(GAE)等環境能夠限制直接線程實例化。要在這樣的環境中使用RabbitMQ Java客戶端,有必要配置一個自定義的ThreadFactory,它使用適當的方法來實例化線程,例如GAE的ThreadManager。如下是Google App Engine的一個示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();

cf.setThreadFactory(ThreadManager.backgroundThreadFactory();

支持Java非阻塞IO

Java客戶端4.0版爲Java非阻塞IO(又名Java NIO)帶來實驗性支持。NIO不必定比堵塞IO更快,它只是容許更容易地控制資源(在這種狀況下,線程)。

在默認的阻塞IO模式下,每一個鏈接使用一個線程從網絡套接字讀取。使用NIO模式,您能夠控制從網絡套接字讀寫的線程數。

若是Java進程使用許多鏈接(數十或數百),請使用NIO模式。您應該使用比使用默認阻止模式更少的線程。經過設置適當的線程數量,您不該該嘗試下降性能,特別是在鏈接不太忙時。

NIO必須明確啓用:

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.useNio();

NIO模式能夠經過NioParams類來配置:

connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO模式使用合理的默認值,但您可能須要根據您本身的工做負載進行更改。其中一些設置是:使用的IO線程總數,緩衝區大小,用於IO循環的服務執行程序,內存寫入隊列的參數(寫請求在網絡上發送以前已排隊)。請閱讀Javadoc瞭解詳情和默認值。

從網絡故障中自動恢復

鏈接恢復

客戶端和RabbitMQ節點之間的網絡鏈接可能會失敗。RabbitMQ Java客戶端支持鏈接和拓撲(隊列,交換,綁定和使用者)的自動恢復。許多應用程序的自動恢復過程遵循如下步驟:

1. 從新鏈接

2. 還原鏈接偵聽器

3. 從新開放頻道

4. 還原通道偵聽器

5. 恢復頻道basic.qos設置,發行商確認和交易設置

拓撲恢復包括爲每一個通道執行的如下操做

1. 從新申報交換機(除了預約義的)

2. 從新申報隊列

3. 恢復全部綁定

4. 恢復全部消費者

從Java客戶端的4.0.0版開始,默認狀況下啓用自動恢復(所以也是拓撲恢復)。

拓撲恢復依賴於實體(隊列,交換,綁定,使用者)的每一個鏈接緩存。當鏈接聲明一個隊列時,它將被添加到緩存中。當它被刪除或計劃刪除(例如,由於它被自動刪除)它將被刪除。這個模型有一些侷限在下面。

要禁用或啓用自動鏈接恢復,請使用factory.setAutomaticRecoveryEnabled(boolean) 方法。如下片斷顯示瞭如何顯式啓用自動恢復(例如,對於Java 4.0.0以前的客戶端):

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

factory.setAutomaticRecoveryEnabled(true);

//鏈接會自動恢復 

Connection conn = factory.newConnection();

若是因爲異常致使恢復失敗(例如,RabbitMQ節點仍然沒法訪問),它將在固定時間間隔後重試(默認爲5秒)。間隔能夠配置:

ConnectionFactory factory = new ConnectionFactory();

//每10秒嘗試恢復一次 

factory.setNetworkRecoveryInterval(10000);

當提供地址列表時,列表會被混淆,而且全部地址都會在下一個地址以後被嘗試:

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};

factory.newConnection(addresses);

什麼時候會觸發鏈接恢復?

自動鏈接恢復(若是啓用)將由如下事件觸發:

· I / O異常在鏈接的I / O循環中拋出

· 套接字讀取操做超時

· 檢測到錯過的服務器心跳(超時)

· 鏈接的I / O循環中會引起任何其餘意外的異常

以先發生者爲準。

通道級別的異常不會觸發任何形式的恢復,由於它們一般表示應用程序中存在語義問題(例如嘗試從不存在的隊列中使用)。

恢復監聽器

能夠在可恢復的鏈接和通道上註冊一個或多個恢復監聽器。當啓用鏈接恢復時,由ConnectionFactory#newConnection和Connection#createChannel返回的 鏈接將 實現com.rabbitmq.client.Recoverable接口,提供兩個具備至關描述性名稱的方法:

· addRecoveryListener

· removeRecoveryListener

請注意,您目前須要將鏈接和頻道投射到Recoverable 才能使用這些方法。

對發佈的影響

鏈接斷開時 使用Channel.basicPublish發佈的消息將丟失。在鏈接恢復後,客戶端不會將它們排隊等待傳遞。爲了確保發佈的消息到達RabbitMQ應用程序須要使用Publisher確認 並考慮鏈接失敗。

拓撲恢復

拓撲恢復涉及恢復交換,隊列,綁定和消費者。當啓用自動恢復功能時,它默認啓用。所以,從Java客戶端4.0.0開始,默認啓用拓撲恢復。

若是須要,能夠顯式禁用拓撲恢復:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

 //啓用自動恢復(例如,先前的Java客戶端4.0.0) 

factory.setAutomaticRecoveryEnabled(true);

//禁用拓撲恢復 

factory.setTopologyRecoveryEnabled(false);

故障檢測和恢復限制

自動鏈接恢復具備許多應用程序開發人員須要注意的侷限性和故意設計決策。

拓撲恢復依賴於實體(隊列,交換,綁定,使用者)的每一個鏈接緩存。當鏈接聲明一個隊列時,它將被添加到緩存中。當它被刪除或計劃刪除(例如,由於它被自動刪除)它將被刪除。這使得能夠在不出現意外結果的狀況下在不一樣頻道上聲明和刪除實體。這也意味着使用自動鏈接恢復的鏈接上的全部通道上的消費者標記(通道專用標識符)必須是惟一的。

當鏈接中斷或丟失時,須要時間來檢測。所以,庫和應用程序都不知道有效的鏈接失敗。在這個時間段內發佈的任何消息都會像往常同樣序列化並寫入TCP套接字。他們只能經過發佈商確認向代理商交付:經過AMQP 0-9-1進行發佈徹底是異步設計。

當啓用了自動恢復功能的鏈接檢測到套接字或I / O操做錯誤時,缺省狀況下會在可配置延遲5秒後進行恢復。這種設計假定即便大量的網絡故障是短暫的而且一般很短暫,但它們不會當即消失。鏈接恢復嘗試將以相同的時間間隔繼續,直到成功打開新鏈接。

當鏈接處於恢復狀態時,任何在其頻道上嘗試發佈的內容都將被拒絕,並有異常。客戶端當前不執行此類傳出消息的任何內部緩衝。應用程序開發者有責任跟蹤這些消息並在恢復成功時從新發布它們。 發佈商確認是一種協議擴展,應該由發佈商不能承受消息丟失的狀況下使用。

因爲通道級別的異常致使通道關閉時,鏈接恢復不會啓動。這種例外一般表示應用程序級別的問題。目前(library)沒法就此狀況作出明智的決定。

即便在鏈接恢復啓動後,閉合通道也不會恢復。這包括明確關閉的通道和上面的通道級異常狀況。

手動確認和自動恢復

當使用手動確認時,在消息傳遞和確認之間,到RabbitMQ節點的網絡鏈接可能會失敗。鏈接恢復後,RabbitMQ將重置全部通道上的交付標籤。這意味着basic.ackbasic.nackbasic.reject 與舊的交付標籤將致使通道異常。爲了不這種狀況,RabbitMQ Java客戶端跟蹤並更新交付標籤,使它們在恢復之間單調增加。 Channel.basicAck, Channel.basicNack和 Channel.basicReject而後將調整後的交付標籤轉換爲RabbitMQ使用的標籤。帶有陳舊交付標籤的確認將不會發送。使用手動確認和自動恢復的應用程序必須可以處理從新投遞。

渠道Channels生命週期和拓撲恢復

對於應用程序開發人員來講,自動鏈接恢復應該儘量透明,這就是爲何Channel實例保持不變,即便多個鏈接失敗並在幕後恢復。從技術上講,當自動恢復打開時,Channel實例充當代理或裝飾器:他們將AMQP業務委託給實際的AMQP通道實現,並在其周圍實施一些恢復機制。這就是爲何當它建立了一些資源(隊列,交換,綁定)以後不該該關閉通道,或者這些資源的拓撲恢復稍後會失敗,由於通道已關閉。相反,應該在應用程序的生命週期中建立通道。

未處理的異常

與鏈接,通道,恢復和消費者生命週期相關的未處理異常委派給異常處理程序。異常處理程序是實現ExceptionHandler接口的任何對象 。默認狀況下,使用DefaultExceptionHandler的一個實例。它將異常詳細信息打印到標準輸出。

可使用ConnectionFactory#setExceptionHandler覆蓋處理程序 。它將用於工廠建立的全部鏈接:

ConnectionFactory factory = new ConnectionFactory();

cf.setExceptionHandler(customHandler);

異常處理程序應該用於異常記錄。

Metrics 性能監控Metrics and monitoring

從版本4.0.0開始,客戶端收集運行時指標(例如已發佈消息的數量)。度量標準集合是可選的,並使用setMetricsCollector(metricsCollector)方法在ConnectionFactory級別進行設置 。此方法須要一個MetricsCollector實例,該實例在客戶端代碼的多個位置中調用。

客戶端支持 Micrometer (截至版本4.3)和 Dropwizard Metrics開箱即用。

如下是收集的指標:

· 打開的鏈接數

· 開放頻道的數量

· 已發佈消息的數量

· 消費的消息數量

· 已確認消息的數量

· 被拒絕的信息數量

Micrometer和Dropwizard指標都提供計數,但也包括平均速率,最後五分鐘速率等與消息相關的指標。他們還支持常見的監控和報告工具(JMX,Graphite,Ganglia,Datadog等)。有關更多詳細信息,請參閱下面的專用章

請注意關於metrics collection的如下內容:

· 在使用Micrometer或Dropwizard指標時,不要忘記將適當的依賴關係(以Maven,Gradle或甚至JAR文件的形式)添加到JVM類路徑。這些是可選的依賴關係,不會隨Java客戶端自動拖動。您可能還須要添加其餘依賴項,具體取決於所使用的報告後端。

· metrics collection是可擴展的。鼓勵爲特定需求實施自定義 MetricsCollector。

· 所述MetricsCollector設置在ConnectionFactory,但能夠在不一樣的實例共享。

· metrics collection不支持事務。例如,若是在事務中發送確認而且事務被回滾,則確認在客戶metrics中被計數(顯然不是broker實體)。請注意,確認實際上發送給代理,而後經過事務回滾取消,所以客戶端指標在發送確認方面是正確的。總而言之,不要將客戶端指標用於關鍵業務邏輯,它們不能保證徹底準確。它們旨在簡化關於正在運行的系統的推理並使操做更高效。

Micrometer 支持

您能夠經過如下方式使用Micrometer 啓用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//得到Micrometer的Counter對象

Micrometer支持 多種報告後端:Netflix Atlas,Prometheus,Datadog,Influx,JMX等。

您一般會將MeterRegistry的一個實例傳遞 給MicrometerMetricsCollector。這裏是JMX的一個例子:

JmxMeterRegistry registry = new JmxMeterRegistry();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

Dropwizard指標Metrics支持

您能夠經過如下方式使用Dropwizard啓用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

StandardMetricsCollector metrics = new StandardMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//得到Metrics的Meter對象

Dropwizard指標支持 多種報告後端:控制檯,JMX,HTTP,Graphite,Ganglia等。

您一般會將MetricsRegistry的實例傳遞 給StandardMetricsCollector。這裏是JMX的一個例子:

MetricRegistry registry = new MetricRegistry();

StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

 

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

 

JmxReporter reporter = JmxReporter

  .forRegistry(registry)

  .inDomain("com.rabbitmq.client.jmx")

  .build();

reporter.start();

          

Google App Engine上的RabbitMQ Java客戶端

在Google App Engine上使用RabbitMQ Java客戶端(GAE)須要使用自定義線程工廠,使用GAE的ThreadManager實例化線程(請參閱上文)。此外,有必要設置一個低心跳間隔(4-5秒),以免運行到低的InputStream上GAE讀超時:

ConnectionFactory factory = new ConnectionFactory();

cf.setRequestedHeartbeat(5);

        

警告和限制

爲了使拓撲恢復成爲可能,RabbitMQ Java客戶端維護已聲明的隊列,交換和綁定的緩存。緩存是按鏈接的。某些RabbitMQ功能使客戶沒法觀察一些拓撲變化,例如,當因爲TTL而刪除隊列時。RabbitMQ Java客戶端嘗試在最多見的狀況下使緩存條目無效:

· 當隊列被刪除時。

· 交換被刪除時。

· 當綁定被刪除。

· 消費者在自動刪除的隊列上取消時。

· 當隊列或交換機從自動刪除的交易所解除鎖定時。

可是,除了單個鏈接以外,客戶端沒法跟蹤這些拓撲變化。依賴自動刪除隊列或交換機以及隊列TTL(注意:不是消息TTL!)並使用自動鏈接恢復的應用程序應顯式刪除已知未使用或已刪除的實體,以清除客戶端拓撲高速緩存。這是經過促進通道#queueDelete, 通道#exchangeDelete,通道#queueUnbind和通道#exchangeUnbind 是冪等在RabbitMQ的3.3.x(刪除的內容不是有不致使異常)。

RPC(請求/回覆)模式:一個例子

爲了方便編程,Java客戶端API提供了一個類RpcClient,它使用臨時答覆隊列經過AMQP 0-9-1 提供簡單的RPC式通訊工具。

該類不會對RPC參數和返回值施加任何特定的格式。它只是提供了一種機制,使用特定的路由密鑰向給定的交換機發送消息,並等待回覆隊列上的響應。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(這個類如何使用AMQP 0-9-1的實現細節以下:請求消息是在 basic.correlation_id字段被設置爲這個RpcClient實例惟一的值的狀況下發送的,而且basic.reply_to被設置爲回覆隊列。)

一旦建立了此類的實例,就可使用它經過使用如下任何方法發送RPC請求:

byte[] primitiveCall(byte[] message);

String stringCall(String message)

Map mapCall(Map message)

Map mapCall(Object[] keyValuePairs)

該primitiveCall方法傳送原始字節數組做爲請求和響應機構。方法stringCall是一個簡單的primitiveCall簡便包裝器,它將消息體做爲默認字符編碼中的String實例處理。

該mapCall變種是有點更復雜的:它們編碼java.util.Map包含普通的Java值到AMQP 0-9-1二進制表表示,和解碼以一樣的方式迴應。(請注意,這裏可使用哪些值類型有一些限制 - 請參閱javadoc瞭解詳細信息。)

全部的marshalling/unmarshalling便利方法使用primitiveCall做爲傳輸機制,並在其上提供一個包裝層。

TLS支持

能夠使用TLS加密客戶端與代理之間的通訊 。客戶端和服務器認證(又名同行認證)也被支持。如下是對Java客戶端使用加密的最簡單方法:

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(「localhost」);

factory.setPort(5671);

factory.useSslProtocol();

      

請注意,客戶端並未強制執行上述示例中的任何服務器身份驗證(對等證書鏈驗證)做爲缺省值,使用TrustManager的 「信任全部證書」 。這對本地開發很方便,但容易發生中間人攻擊,所以不推薦用於生產。要了解更多關於RabbitMQ中TLS支持的信息,請參閱TLS指南。若是您只想配置Java客戶端(尤爲是對等驗證和信任管理器部分),請閱讀TLS指南的相應部分

相關文章
相關標籤/搜索