rabbitmq可靠性

本文翻譯彙總自rabbitmq的官方文檔。 java

翻譯使用谷歌翻譯後簡單修改,部份內容讀起來仍然比較晦澀,不過意思傳達到了。 數據庫

可靠性指南

 

本頁介紹瞭如何使用AMQP和RabbitMQ的各類功能來實現可靠的傳送 - 確保消息始終被傳遞,甚至在系統的任何部分遇到故障。 express

 

什麼能夠失敗?

 

網絡問題多是最多見的失敗類。網絡不只可能出現故障,防火牆能夠中斷空閒鏈接,而且不會當即檢測到網絡故障。 服務器

 

除了鏈接故障以外,broker和客戶端應用程序可能會隨時遇到硬件故障(或軟件崩潰)。此外,即便客戶端應用程序持續運行,邏輯錯誤也可能致使通道或鏈接錯誤,迫使客戶端創建新的通道或鏈接,並從問題中恢復。 網絡

 

鏈接失敗

 

在鏈接失敗的狀況下,客戶端將須要與broker創建新的鏈接。之前鏈接中打開的任何通道都將自動關閉,這些通道也須要從新打開。 併發

 

通常來講,當鏈接失敗時,客戶端將被鏈接引起異常(或相似的語言結構)通知。官方Java和.NET客戶端還提供了回調方法,讓您聽到其餘上下文中的鏈接失敗 - Java在Connection和Channel類上都提供了ShutdownListener回調,.NET客戶端提供了IConnection.ConnectionShutdown和IModel.ModelShutdown事件目的。 異步

 

Acknowledgements和confirm

 

當鏈接失敗時,消息可能在客戶端和服務器之間傳輸 - 它們可能處於被解析或生成的中間,在OS緩衝區或電線上。傳輸中的消息將丟失 - 它們將須要重傳。Acknowledgements讓服務器和客戶端知道什麼時候這樣作。 性能

 

Acknowledgements能夠在兩個方向使用 - 容許消費者向服務器指示它已經接收/處理了消息,並容許服務器向生產者指示相同的東西。 RabbitMQ將後一種狀況稱爲"confirm"。 測試

 

固然,TCP確保已經接收到數據包,而且將從新發送,直到它們 - 但這只是網絡層。Acknowledgements和confirm代表已收到消息並採起行動。confirm信號表示接收到消息,而且轉讓全部權,接收方承擔所有責任。 優化

 

Acknowledgements所以具備語義 - 消費的應用程序不該該confirm消息,直到它完成了與它們須要的任何操做 - 將它們記錄在數據庫中,轉發它們,將它們打印到紙張或其餘任何東西上。一旦這樣作,broker能夠自由地忘記該消息。

 

一樣,broker一旦承擔責任,就會confirm消息(見這裏是什麼意思)。

 

confirm的使用保證至少一次Delivery。沒有confirm,在發佈和消費操做期間可能發生消息丟失,而且只有最多的一次Delivery才能獲得保證。

 

用心跳檢測死TCP鏈接

 

在某些類型的網絡故障中,數據包丟失可能意味着中斷的TCP鏈接須要較長時間(例如,在Linux上使用默認配置約11分鐘)才能被操做系統檢測到。 AMQP 0-9-1提供心跳功能,以確保應用程序層及時發現鏈接中斷(以及徹底無響應的對等體)。心跳也能夠防止可能終止"空閒"TCP鏈接的某些網絡設備。有關詳細信息,請參閱心跳。

 

在broker

 

爲了不在broker中丟失消息,咱們須要應對broker從新啓動,broker硬件故障,甚至是甚至broker崩潰。

 

爲了確保從新啓動時消息和broker定義生效,咱們須要確保它們在磁盤上。 AMQP標準具備交換,隊列和持久消息的耐久性概念,要求持久對象或持久消息將在從新啓動後生存。有關持久性和持久性的具體標誌的更多詳細信息,請參見"AMQP概念指南"。

羣集和高可用性

 

若是咱們須要確保咱們的broker倖存硬件故障,咱們可使用RabbitMQ的集羣。在RabbitMQ集羣中,全部定義(交換,綁定,用戶等)都跨整個集羣鏡像。隊列的行爲方式不一樣,默認狀況下只駐留在單個節點上,但能夠跨多個或全部節點進行鏡像。隊列保持可見,而且能夠從全部節點訪問,不管它們位於何處。

 

鏡像隊列在全部已配置的集羣節點之間複製其內容,能夠無縫地容忍節點故障,而且不會丟失消息(儘管請參閱非同步從站上的此註釋)。然而,消費應用程序須要注意,當隊列失敗時,消費者將被取消,他們將須要從新考慮 - 有關詳細信息,請參閱文檔。

 

在Producer

 

當使用confirms時,從通道恢復的生產者或鏈接故障應重發任何還沒有從broker收到confirm的消息。這裏存在消息重複的可能性,由於broker可能發送了一個從未到達生產者的confirm(因爲網絡故障等)。所以,消費者應用程序將須要以冪等(重複執行的效果一致)方式執行重複數據刪除或處理傳入的消息。

 

確保消息路由

 

在某些狀況下,生產者可能很重要的是確保他們的消息被路由到隊列(儘管並不老是 - 在公共子系統生產者只會發佈的狀況下,若是沒有消費者感興趣,那麼消息是正確的丟棄)。

 

爲了確保消息被路由到一個已知的隊列,生產者只能聲明一個目標隊列並直接發佈給它。若是消息可能以更復雜的方式進行路由,可是生產者仍然須要知道他們是否到達了至少一個隊列,則能夠在basic.publish上設置mandatory標誌,確保basic.return(包含回覆碼和一些文本解釋)將被髮送回客戶端,若是沒有隊列被適當地綁定。

 

在消費者

 

在網絡故障(或節點崩潰)的狀況下,可能消息重複,消費者必須準備好處理它們。若是可能,最簡單的方法是確保您的消費者以冪等方式處理消息,而不是明確處理重複數據消除。

 

不能處理的消息

 

若是消費者肯定它不能處理消息,那麼它可使用basic.reject(或basic.nack)拒絕它,要求服務器從新啓動它(在這種狀況下,服務器可能被配置爲死信)代替。

 

 

消費者Acknowledgements和Producerconfirm

 

介紹

 

使用消息傳遞broker(如RabbitMQ)的系統按照定義分佈。因爲發送的協議方法(消息)不能保證到達對等體或被其成功處理,因此發佈者和消費者都須要一種用於傳送和處理confirm的機制。 RabbitMQ支持的幾種消息協議提供了這樣的功能。

 

(消費者)DeliveryAcknowledgements

 

當RabbitMQ向消費者發送消息時,須要知道什麼時候成功發送消息。什麼樣的邏輯優化取決於系統。所以,它主要是應用程序的決定。

 

在咱們繼續討論其餘主題以前,重要的是要解釋Delivery是如何被識別的(並且confirm代表他們各自的Delivery)。當消費者(訂閱)註冊時,消息將由RabbitMQ使用basic.deliver方法傳遞(推送)。該方法攜帶Deliverytags,其惟一地標識信道上的傳遞。

 

Deliverytags是單調增加的正整數,並由客戶端庫呈現。認可Delivery的客戶端庫方法將Deliverytags做爲參數。

 

頻道預取設置(QoS)

 

因爲消息以異步方式發送(推送)到客戶端,所以一般在任何給定時刻一般會有多個消息"在飛行中"。此外,客戶端的手動confirm本質上也是異步的。因此有一個未被confirm的Deliverytags的滑動窗口。開發人員一般會傾向於限制此窗口的大小,以免消費者端端的無限緩衝區問題。這是經過使用basic.qos方法設置"預取計數"值來完成的。該值定義了通道上容許的未confirmDelivery的最大數量。一旦數量達到配置的計數,RabbitMQ將中止在通道上傳遞更多消息,除非至少有一個未confirm的消息被confirm。

 

例如,鑑於在通道Ch上未confirm的Deliverytags5,6,7和8設置爲4,RabbitMQ不會再推送任何更多的Delivery,除非至少有一個未完成的Delivery被confirm。當經過delivery_tag設置爲8的confirm幀到達該通道時,RabbitMQ將會注意並傳遞一條消息。

 

值得重申的是,Delivery流程和手動客戶端confirm徹底是異步的。所以,若是在飛行中已經有Delivery時改變了預取值,則出現天然競爭條件,而且可能暫時超過在通道上預取計數未confirm的消息。

 

能夠爲通道或消費者配置QoS設置。有關詳細信息,請參閱消費者預取。

 

即便在手動confirm模式下,QoS設置也不會影響使用basic.get("pull API")獲取的消息。

 

 

Producer confirm

 

使用標準AMQP 0-9-1,保證消息不丟失的惟一方法是使用事務 - 使信道事務發佈,發佈消息,提交。在這種狀況下,交易是沒必要要的重量級,並將吞吐量下降250倍。爲了彌補這一點,引入了confirm機制。它模仿了協議中已經存在的消費者confirm機制。

 

要啓用confirm,客戶端發送confirm.select方法。根據是否設置不等待,broker能夠經過confirm.select-ok進行回覆。一旦在通道上使用了confirm.select方法,就被認爲處於confirm模式。事務通道不能進入confirm模式,一旦通道處於confirm模式,則不能進行事務處理。

 

一旦一個通道處於confirm模式,broker和客戶端都會計數消息(從第一個confirm.select開始計數)。而後,broker經過在同一個頻道上發送basic.ack來confirm消息。發送tags字段包含已confirm消息的序列號。broker還能夠在basic.ack中設置多個字段,以指示全部到達幷包含具備序列號的消息的消息已被處理。

 

下面是Java中以confirm模式向通道發佈大量消息並等待confirm的示例。

// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.

//

// This software, the RabbitMQ Java client library, is triple-licensed under the

// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2

// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see

// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,

// please see LICENSE-APACHE2.

//

// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,

// either express or implied. See the LICENSE file for specific language governing

// rights and limitations of this software.

//

// If you have any questions regarding licensing, please contact us at

// info@rabbitmq.com.

 

 

package com.rabbitmq.examples;

 

import java.io.IOException;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.MessageProperties;

import com.rabbitmq.client.QueueingConsumer;

 

public class ConfirmDontLoseMessages {

static int msgCount = 10000;

final static String QUEUE_NAME = "confirm-test";

static ConnectionFactory connectionFactory;

 

public static void main(String[] args)

throws IOException, InterruptedException

{

if (args.length > 0) {

msgCount = Integer.parseInt(args[0]);

}

 

connectionFactory = new ConnectionFactory();

 

// Consume msgCount messages.

(new Thread(new Consumer())).start();

// Publish msgCount messages and wait for confirms.

(new Thread(new Publisher())).start();

}

 

@SuppressWarnings("ThrowablePrintedToSystemOut")

static class Publisher implements Runnable {

public void run() {

try {

long startTime = System.currentTimeMillis();

 

// Setup

Connection conn = connectionFactory.newConnection();

Channel ch = conn.createChannel();

ch.queueDeclare(QUEUE_NAME, true, false, false, null);

ch.confirmSelect();

 

// Publish

for (long i = 0; i < msgCount; ++i) {

ch.basicPublish("", QUEUE_NAME,

MessageProperties.PERSISTENT_BASIC,

"nop".getBytes());

}

 

ch.waitForConfirmsOrDie();

 

// Cleanup

ch.queueDelete(QUEUE_NAME);

ch.close();

conn.close();

 

long endTime = System.currentTimeMillis();

System.out.printf("Test took %.3fs\n",

(float)(endTime - startTime)/1000);

} catch (Throwable e) {

System.out.println("foobar :(");

System.out.print(e);

}

}

}

 

static class Consumer implements Runnable {

public void run() {

try {

// Setup

Connection conn = connectionFactory.newConnection();

Channel ch = conn.createChannel();

ch.queueDeclare(QUEUE_NAME, true, false, false, null);

 

// Consume

QueueingConsumer qc = new QueueingConsumer(ch);

ch.basicConsume(QUEUE_NAME, true, qc);

for (int i = 0; i < msgCount; ++i) {

qc.nextDelivery();

}

 

// Cleanup

ch.close();

conn.close();

} catch (Throwable e) {

System.out.println("Whoosh!");

System.out.print(e);

}

}

}

}

否認confirm

 

在特殊狀況下,當broker沒法成功處理消息時,代替basic.ack,broker將發送一個basic.nack。在這種狀況下,basic.nack的字段具備與basic.ack中相應的含義相同的含義,而且請求字段應該被忽略。broker表示沒法處理消息,拒絕對其發送一則或多封消息;在這一點上,客戶端可能會選擇從新發布消息。

 

通道置於confirm模式後,全部後續發佈的消息將被confirm或不存在一次。不能保證消息被confirm多久。沒有任何消息將被confirm和否認。

 

若是在負責隊列的Erlang進程中發生內部錯誤,則只會傳遞basic.nack。

 

當消息被從新排隊時,若是可能,它將被置於其隊列中的原始位置。若是沒有(因爲多個消費者共享隊列時因爲其餘消費者的併發Delivery和confirm),該消息將被從新排列到更接近隊列頭的位置。

 

何時confirm message?

 

對於不可路由的消息,一旦交換驗證消息將不會路由到任何隊列(返回空列表的隊列),broker將發出confirm。若是消息也被髮布爲強制性,則basic.return將在basic.ack以前發送給客戶端。否認的confirm也是如此(basic.nack)。

 

對於可路由消息,當全部隊列接受消息時,發送basic.ack。對於路由到持久隊列的持久消息,這意味着持續到磁盤。對於鏡像隊列,這意味着全部鏡像都已接受該消息。

 

持久化消息的Ack延遲

 

在將消息持續存儲到磁盤後,將發送一個持久消息的basic.ack路由到持久化隊列。 RabbitMQ消息存儲在間隔(幾百毫秒)以後分批地將消息存儲到磁盤,以最小化fsync(2)調用的數量,或者當隊列空閒時。這意味着在一個恆定的負載下,basic.ack的延遲能夠達到幾百毫秒。爲了提升吞吐量,強烈建議應用程序異步處理confirm(做爲流)或發佈批次的消息,並等待未完成的confirm。客戶端庫之間的具體API有所不一樣。

 

Producerconfirm的訂購注意事項

 

在大多數狀況下,RabbitMQ將按照發布的相同順序向Producerconfirm消息(這適用於在單個頻道上發佈的消息)。然而,發佈者的confirm是異步發出的,能夠confirm一個消息或一組消息。發出confirm的確切時刻取決於消息的傳遞模式(持久與瞬態)以及消息被路由到的隊列的屬性(見上文)。也就是說,不一樣的消息能夠被認爲是準備好在不一樣的時間進行confirm。這意味着與其各自的消息相比,confirm能夠以不一樣的順序到達。應用程序不該該依賴於confirm的順序。

 

Producer confirm和保證Delivery

 

若是在全部消息寫入磁盤以前崩潰,broker將丟失持久的消息。在某些狀況下,這將致使broker以驚人的方式表現。

 

例如,考慮這種狀況:

 

客戶端向持久隊列發佈持久消息

客戶端從隊列中消耗消息(指出消息是持久的,隊列持久的),可是尚未肯定,

broker死亡並從新啓動,

客戶端從新鏈接並開始消費消息。

在這一點上,客戶端能夠合理地假設該消息將被再次發送。不是這樣:從新啓動致使broker丟失該消息。爲了保證持久性,客戶應該使用confirm。若是Producer的頻道處於confirm模式,Producer將不會收到丟失的消息的confirm(由於該消息還沒有寫入磁盤)。

限制

 

最大Deliverytags

 

Deliverytags是一個64位長的值,所以其最大值爲9223372036854775807.因爲每一個渠道的Deliverytags是範圍限定的,因此Producer或消費者在實踐中不太可能超過此值。

 

 

我的理解:

上面的描述很是複雜,我總結來講,有一下幾種狀況須要在開發中注意:

  • Producter發送消息以後,沒有收到Broker的confirm:

消息可能終止在了傳送的層面,如操做系統緩衝層,或者網絡傳輸層,或者是在Broker接受以後,因爲內部故障不能處理,如exchang故障,也不會發送confirm給Producter。因此,在咱們的系統中,咱們在producter端實際上是有數據庫表存儲須要發送的消息的,咱們一次批量發送100條消息,一旦收到confirm,就會刪除這部分消息,因此沒有接收到confirm的話,就不刪除相應的數據。

還要保證消息的冪等。如此就能夠保證在producter層面不會丟失消息。

  • broker接收到消息以後,在exchang或者queue中丟失:

設置消息和exchang和queue都爲持久的。

  • 找不到消息對應的queue

咱們的程序不會出現這種狀況。

  • queue沒有對應的consumer

這種狀況下,消息會在queue中擠壓,也不會丟失。

  • 消息可能會重複發送,因此須要保證消息處理的冪等性。

 

 

broker將在下面的狀況中對消息進行confirm:

  • broker發現當前消息沒法被路由到指定的queues中(若是設置了mandatory屬性,則broker會發送basic.return) 
  • 非持久屬性的消息到達了其所應該到達的全部queue中(和鏡像queue中)
  • 持久消息到達了其所應該到達的全部queue中(和鏡像中),並被持久化到了磁盤(fsync) 
  • 持久消息從其所在的全部queue中被consume了(若是必要則會被ack)

 

批量發送消息,並批量接收確認的例子:

// 發送持久化消息,消息內容爲helloWorld for (long i = 0; i < msgCount; ++i)

{

ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "helloWorld".getBytes());

}

// 等待全部消息都被ack或者nack,若是某個消息被nack,則拋出IOException

ch.waitForConfirmsOrDie();

網上有人作的測試,使用這種批量確認的模式,和使用異步的方式,性能差的不是太多。可是若是使用單條確認,性能將差異數倍。

相關文章
相關標籤/搜索