本文翻譯彙總自rabbitmq的官方文檔。 java
翻譯使用谷歌翻譯後簡單修改,部份內容讀起來仍然比較晦澀,不過意思傳達到了。 數據庫
本頁介紹瞭如何使用AMQP和RabbitMQ的各類功能來實現可靠的傳送 - 確保消息始終被傳遞,甚至在系統的任何部分遇到故障。 express
網絡問題多是最多見的失敗類。網絡不只可能出現故障,防火牆能夠中斷空閒鏈接,而且不會當即檢測到網絡故障。 服務器
除了鏈接故障以外,broker和客戶端應用程序可能會隨時遇到硬件故障(或軟件崩潰)。此外,即便客戶端應用程序持續運行,邏輯錯誤也可能致使通道或鏈接錯誤,迫使客戶端創建新的通道或鏈接,並從問題中恢復。 網絡
在鏈接失敗的狀況下,客戶端將須要與broker創建新的鏈接。之前鏈接中打開的任何通道都將自動關閉,這些通道也須要從新打開。 併發
通常來講,當鏈接失敗時,客戶端將被鏈接引起異常(或相似的語言結構)通知。官方Java和.NET客戶端還提供了回調方法,讓您聽到其餘上下文中的鏈接失敗 - Java在Connection和Channel類上都提供了ShutdownListener回調,.NET客戶端提供了IConnection.ConnectionShutdown和IModel.ModelShutdown事件目的。 異步
當鏈接失敗時,消息可能在客戶端和服務器之間傳輸 - 它們可能處於被解析或生成的中間,在OS緩衝區或電線上。傳輸中的消息將丟失 - 它們將須要重傳。Acknowledgements讓服務器和客戶端知道什麼時候這樣作。 性能
Acknowledgements能夠在兩個方向使用 - 容許消費者向服務器指示它已經接收/處理了消息,並容許服務器向生產者指示相同的東西。 RabbitMQ將後一種狀況稱爲"confirm"。 測試
固然,TCP確保已經接收到數據包,而且將從新發送,直到它們 - 但這只是網絡層。Acknowledgements和confirm代表已收到消息並採起行動。confirm信號表示接收到消息,而且轉讓全部權,接收方承擔所有責任。 優化
Acknowledgements所以具備語義 - 消費的應用程序不該該confirm消息,直到它完成了與它們須要的任何操做 - 將它們記錄在數據庫中,轉發它們,將它們打印到紙張或其餘任何東西上。一旦這樣作,broker能夠自由地忘記該消息。
一樣,broker一旦承擔責任,就會confirm消息(見這裏是什麼意思)。
confirm的使用保證至少一次Delivery。沒有confirm,在發佈和消費操做期間可能發生消息丟失,而且只有最多的一次Delivery才能獲得保證。
用心跳檢測死TCP鏈接
在某些類型的網絡故障中,數據包丟失可能意味着中斷的TCP鏈接須要較長時間(例如,在Linux上使用默認配置約11分鐘)才能被操做系統檢測到。 AMQP 0-9-1提供心跳功能,以確保應用程序層及時發現鏈接中斷(以及徹底無響應的對等體)。心跳也能夠防止可能終止"空閒"TCP鏈接的某些網絡設備。有關詳細信息,請參閱心跳。
爲了不在broker中丟失消息,咱們須要應對broker從新啓動,broker硬件故障,甚至是甚至broker崩潰。
爲了確保從新啓動時消息和broker定義生效,咱們須要確保它們在磁盤上。 AMQP標準具備交換,隊列和持久消息的耐久性概念,要求持久對象或持久消息將在從新啓動後生存。有關持久性和持久性的具體標誌的更多詳細信息,請參見"AMQP概念指南"。
若是咱們須要確保咱們的broker倖存硬件故障,咱們可使用RabbitMQ的集羣。在RabbitMQ集羣中,全部定義(交換,綁定,用戶等)都跨整個集羣鏡像。隊列的行爲方式不一樣,默認狀況下只駐留在單個節點上,但能夠跨多個或全部節點進行鏡像。隊列保持可見,而且能夠從全部節點訪問,不管它們位於何處。
鏡像隊列在全部已配置的集羣節點之間複製其內容,能夠無縫地容忍節點故障,而且不會丟失消息(儘管請參閱非同步從站上的此註釋)。然而,消費應用程序須要注意,當隊列失敗時,消費者將被取消,他們將須要從新考慮 - 有關詳細信息,請參閱文檔。
當使用confirms時,從通道恢復的生產者或鏈接故障應重發任何還沒有從broker收到confirm的消息。這裏存在消息重複的可能性,由於broker可能發送了一個從未到達生產者的confirm(因爲網絡故障等)。所以,消費者應用程序將須要以冪等(重複執行的效果一致)方式執行重複數據刪除或處理傳入的消息。
在某些狀況下,生產者可能很重要的是確保他們的消息被路由到隊列(儘管並不老是 - 在公共子系統生產者只會發佈的狀況下,若是沒有消費者感興趣,那麼消息是正確的丟棄)。
爲了確保消息被路由到一個已知的隊列,生產者只能聲明一個目標隊列並直接發佈給它。若是消息可能以更復雜的方式進行路由,可是生產者仍然須要知道他們是否到達了至少一個隊列,則能夠在basic.publish上設置mandatory標誌,確保basic.return(包含回覆碼和一些文本解釋)將被髮送回客戶端,若是沒有隊列被適當地綁定。
在網絡故障(或節點崩潰)的狀況下,可能消息重複,消費者必須準備好處理它們。若是可能,最簡單的方法是確保您的消費者以冪等方式處理消息,而不是明確處理重複數據消除。
若是消費者肯定它不能處理消息,那麼它可使用basic.reject(或basic.nack)拒絕它,要求服務器從新啓動它(在這種狀況下,服務器可能被配置爲死信)代替。
介紹
使用消息傳遞broker(如RabbitMQ)的系統按照定義分佈。因爲發送的協議方法(消息)不能保證到達對等體或被其成功處理,因此發佈者和消費者都須要一種用於傳送和處理confirm的機制。 RabbitMQ支持的幾種消息協議提供了這樣的功能。
(消費者)DeliveryAcknowledgements
當RabbitMQ向消費者發送消息時,須要知道什麼時候成功發送消息。什麼樣的邏輯優化取決於系統。所以,它主要是應用程序的決定。
在咱們繼續討論其餘主題以前,重要的是要解釋Delivery是如何被識別的(並且confirm代表他們各自的Delivery)。當消費者(訂閱)註冊時,消息將由RabbitMQ使用basic.deliver方法傳遞(推送)。該方法攜帶Deliverytags,其惟一地標識信道上的傳遞。
Deliverytags是單調增加的正整數,並由客戶端庫呈現。認可Delivery的客戶端庫方法將Deliverytags做爲參數。
因爲消息以異步方式發送(推送)到客戶端,所以一般在任何給定時刻一般會有多個消息"在飛行中"。此外,客戶端的手動confirm本質上也是異步的。因此有一個未被confirm的Deliverytags的滑動窗口。開發人員一般會傾向於限制此窗口的大小,以免消費者端端的無限緩衝區問題。這是經過使用basic.qos方法設置"預取計數"值來完成的。該值定義了通道上容許的未confirmDelivery的最大數量。一旦數量達到配置的計數,RabbitMQ將中止在通道上傳遞更多消息,除非至少有一個未confirm的消息被confirm。
例如,鑑於在通道Ch上未confirm的Deliverytags5,6,7和8設置爲4,RabbitMQ不會再推送任何更多的Delivery,除非至少有一個未完成的Delivery被confirm。當經過delivery_tag設置爲8的confirm幀到達該通道時,RabbitMQ將會注意並傳遞一條消息。
值得重申的是,Delivery流程和手動客戶端confirm徹底是異步的。所以,若是在飛行中已經有Delivery時改變了預取值,則出現天然競爭條件,而且可能暫時超過在通道上預取計數未confirm的消息。
能夠爲通道或消費者配置QoS設置。有關詳細信息,請參閱消費者預取。
即便在手動confirm模式下,QoS設置也不會影響使用basic.get("pull API")獲取的消息。
使用標準AMQP 0-9-1,保證消息不丟失的惟一方法是使用事務 - 使信道事務發佈,發佈消息,提交。在這種狀況下,交易是沒必要要的重量級,並將吞吐量下降250倍。爲了彌補這一點,引入了confirm機制。它模仿了協議中已經存在的消費者confirm機制。
要啓用confirm,客戶端發送confirm.select方法。根據是否設置不等待,broker能夠經過confirm.select-ok進行回覆。一旦在通道上使用了confirm.select方法,就被認爲處於confirm模式。事務通道不能進入confirm模式,一旦通道處於confirm模式,則不能進行事務處理。
一旦一個通道處於confirm模式,broker和客戶端都會計數消息(從第一個confirm.select開始計數)。而後,broker經過在同一個頻道上發送basic.ack來confirm消息。發送tags字段包含已confirm消息的序列號。broker還能夠在basic.ack中設置多個字段,以指示全部到達幷包含具備序列號的消息的消息已被處理。
下面是Java中以confirm模式向通道發佈大量消息並等待confirm的示例。
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 // ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see // LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, // please see LICENSE-APACHE2. // // This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, // either express or implied. See the LICENSE file for specific language governing // rights and limitations of this software. // // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com.
package com.rabbitmq.examples;
import java.io.IOException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.rabbitmq.client.QueueingConsumer;
public class ConfirmDontLoseMessages { static int msgCount = 10000; final static String QUEUE_NAME = "confirm-test"; static ConnectionFactory connectionFactory;
public static void main(String[] args) throws IOException, InterruptedException { if (args.length > 0) { msgCount = Integer.parseInt(args[0]); }
connectionFactory = new ConnectionFactory();
// Consume msgCount messages. (new Thread(new Consumer())).start(); // Publish msgCount messages and wait for confirms. (new Thread(new Publisher())).start(); }
@SuppressWarnings("ThrowablePrintedToSystemOut") static class Publisher implements Runnable { public void run() { try { long startTime = System.currentTimeMillis();
// Setup Connection conn = connectionFactory.newConnection(); Channel ch = conn.createChannel(); ch.queueDeclare(QUEUE_NAME, true, false, false, null); ch.confirmSelect();
// Publish for (long i = 0; i < msgCount; ++i) { ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "nop".getBytes()); }
ch.waitForConfirmsOrDie();
// Cleanup ch.queueDelete(QUEUE_NAME); ch.close(); conn.close();
long endTime = System.currentTimeMillis(); System.out.printf("Test took %.3fs\n", (float)(endTime - startTime)/1000); } catch (Throwable e) { System.out.println("foobar :("); System.out.print(e); } } }
static class Consumer implements Runnable { public void run() { try { // Setup Connection conn = connectionFactory.newConnection(); Channel ch = conn.createChannel(); ch.queueDeclare(QUEUE_NAME, true, false, false, null);
// Consume QueueingConsumer qc = new QueueingConsumer(ch); ch.basicConsume(QUEUE_NAME, true, qc); for (int i = 0; i < msgCount; ++i) { qc.nextDelivery(); }
// Cleanup ch.close(); conn.close(); } catch (Throwable e) { System.out.println("Whoosh!"); System.out.print(e); } } } } |
在特殊狀況下,當broker沒法成功處理消息時,代替basic.ack,broker將發送一個basic.nack。在這種狀況下,basic.nack的字段具備與basic.ack中相應的含義相同的含義,而且請求字段應該被忽略。broker表示沒法處理消息,拒絕對其發送一則或多封消息;在這一點上,客戶端可能會選擇從新發布消息。
通道置於confirm模式後,全部後續發佈的消息將被confirm或不存在一次。不能保證消息被confirm多久。沒有任何消息將被confirm和否認。
若是在負責隊列的Erlang進程中發生內部錯誤,則只會傳遞basic.nack。
當消息被從新排隊時,若是可能,它將被置於其隊列中的原始位置。若是沒有(因爲多個消費者共享隊列時因爲其餘消費者的併發Delivery和confirm),該消息將被從新排列到更接近隊列頭的位置。
對於不可路由的消息,一旦交換驗證消息將不會路由到任何隊列(返回空列表的隊列),broker將發出confirm。若是消息也被髮布爲強制性,則basic.return將在basic.ack以前發送給客戶端。否認的confirm也是如此(basic.nack)。
對於可路由消息,當全部隊列接受消息時,發送basic.ack。對於路由到持久隊列的持久消息,這意味着持續到磁盤。對於鏡像隊列,這意味着全部鏡像都已接受該消息。
在將消息持續存儲到磁盤後,將發送一個持久消息的basic.ack路由到持久化隊列。 RabbitMQ消息存儲在間隔(幾百毫秒)以後分批地將消息存儲到磁盤,以最小化fsync(2)調用的數量,或者當隊列空閒時。這意味着在一個恆定的負載下,basic.ack的延遲能夠達到幾百毫秒。爲了提升吞吐量,強烈建議應用程序異步處理confirm(做爲流)或發佈批次的消息,並等待未完成的confirm。客戶端庫之間的具體API有所不一樣。
在大多數狀況下,RabbitMQ將按照發布的相同順序向Producerconfirm消息(這適用於在單個頻道上發佈的消息)。然而,發佈者的confirm是異步發出的,能夠confirm一個消息或一組消息。發出confirm的確切時刻取決於消息的傳遞模式(持久與瞬態)以及消息被路由到的隊列的屬性(見上文)。也就是說,不一樣的消息能夠被認爲是準備好在不一樣的時間進行confirm。這意味着與其各自的消息相比,confirm能夠以不一樣的順序到達。應用程序不該該依賴於confirm的順序。
若是在全部消息寫入磁盤以前崩潰,broker將丟失持久的消息。在某些狀況下,這將致使broker以驚人的方式表現。
例如,考慮這種狀況:
客戶端向持久隊列發佈持久消息
客戶端從隊列中消耗消息(指出消息是持久的,隊列持久的),可是尚未肯定,
broker死亡並從新啓動,
客戶端從新鏈接並開始消費消息。
在這一點上,客戶端能夠合理地假設該消息將被再次發送。不是這樣:從新啓動致使broker丟失該消息。爲了保證持久性,客戶應該使用confirm。若是Producer的頻道處於confirm模式,Producer將不會收到丟失的消息的confirm(由於該消息還沒有寫入磁盤)。
限制
Deliverytags是一個64位長的值,所以其最大值爲9223372036854775807.因爲每一個渠道的Deliverytags是範圍限定的,因此Producer或消費者在實踐中不太可能超過此值。
上面的描述很是複雜,我總結來講,有一下幾種狀況須要在開發中注意:
消息可能終止在了傳送的層面,如操做系統緩衝層,或者網絡傳輸層,或者是在Broker接受以後,因爲內部故障不能處理,如exchang故障,也不會發送confirm給Producter。因此,在咱們的系統中,咱們在producter端實際上是有數據庫表存儲須要發送的消息的,咱們一次批量發送100條消息,一旦收到confirm,就會刪除這部分消息,因此沒有接收到confirm的話,就不刪除相應的數據。
還要保證消息的冪等。如此就能夠保證在producter層面不會丟失消息。
設置消息和exchang和queue都爲持久的。
咱們的程序不會出現這種狀況。
這種狀況下,消息會在queue中擠壓,也不會丟失。
broker將在下面的狀況中對消息進行confirm:
批量發送消息,並批量接收確認的例子:
// 發送持久化消息,消息內容爲helloWorld for (long i = 0; i < msgCount; ++i) { ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "helloWorld".getBytes()); } // 等待全部消息都被ack或者nack,若是某個消息被nack,則拋出IOException ch.waitForConfirmsOrDie(); |
網上有人作的測試,使用這種批量確認的模式,和使用異步的方式,性能差的不是太多。可是若是使用單條確認,性能將差異數倍。