RabbitMQ消息可靠性分析和應用

RabbitMQ流程簡介(帶Exchange)

       RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認及發佈確認等。服務器

       先看如下這個圖:網絡

 

       P爲生產者,X爲中轉站(Exchange),紅色部分爲消息隊列,C一、C2爲消費者。併發

       整個流程分紅三部分:第一,生產者生產消息,發送到中轉站;第二,中轉站按定義的規則轉發消息到消息隊列;第三,消費者從消息隊列獲取消息進行消費(處理)。spa

RabbitMQ消息可靠性分析和應用

       應用代碼均使用C#客戶端代碼實現。code

1、發佈確認

       生產者生產消息,發送到中轉站的過程當中,可能會由於網絡丟包、網絡故障等問題形成消息丟失。爲了確保生產者發送的消息不會丟失,RabbitMQ提供了發佈確認(Publisher Confirms)機制,從而提升消息的可靠性(注意:發佈確認機制不能和事務機制一塊兒使用)。blog

       單條消息發佈確認:rabbitmq

1隊列

2事務

3內存

4

5

6

7

8

9

10

channel.ConfirmSelect();//發佈確認機制

string message = "msg";

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: null,

        body: body

        );

bool isPublished = channel.WaitForConfirms();//通道(channel)裏消息發送成功返回true

       使用channel.ConfirmSelect,一旦信道進入確認模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始)。消息被投遞到全部匹配的隊列以後,RabbitMQ就會發送(Basic.Ack)給生產者(包含消息的惟一ID),生產者從而知道消息發送成功。

       多條消息發佈確認:

1

2

3

4

5

6

7

8

9

10

11

12

13

channel.ConfirmSelect();//發佈確認機制

foreach (var itemMsg in lstMsg)

{

    byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);

    //發佈消息

    channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: null,

        body: sendBytes

        );

}

bool isAllPublished = channel.WaitForConfirms();//通道(channel)裏全部消息均發送才返回true 

       注意:多消息發佈確認機制狀況下,假若要發送100條消息,發送90條後,忽然網絡故障,後面的消息發送失敗了,那麼isAllPublished返回的是false,而前面90條消息已經發送到消息隊列了。咱們還不知道哪些消息是發送失敗的,因此不少條消息發佈確認,建議分幾回發送或多通道發送。

       此外,須要確保在中轉站(Exchange)的消息能夠順利到達消息隊列。

       (1)首先須要定義匹配的Exchange和Queue,根據Exchange的類型和routingKey肯定轉發的關係。

       (2)設置BasicPublish方法中mandatory參數爲true,而後監聽Exchange中沒有匹配的隊列的消息,而後進行相操做。

       (3)確保消息隊列有足夠內存存儲消息。

       RabbitMQ默認配置vm_memory_high_watermark爲0.4。意思是控制消息佔40%內存左右。vm_memory_high_watermark_paging_ratio爲0.5,當消息佔用內存超過50%,RabbitMQ會把消息轉移到磁盤上以釋放內存。當磁盤剩餘空間小於閥值disk_free_limit(默認爲50M),全部生產者阻塞,避免充滿磁盤,致使全部的寫操做失敗。

       RabbitMQ配置文件通常在%APPDATA%\RabbitMQ\rabbitmq.config.

       %APPDATA% 通常爲 C:\Users\%USERNAME%\AppData\Roaming(Windows環境)

2、持久化

       消息存放到消息隊列後,在不配置消息持久化的狀況下,若服務器重啓、關閉或宕機等,消息都會丟失。配置持久化能夠有效提升消息的可靠性。持久化須要同時配置消息持久化和隊列持久化。單配置消息持久化,隊列消失了,消息沒有地方存放;單配置隊列持久化,隊列還在,消息沒了。

       隊列持久化在定義隊列時候配置

1

2

3

4

5

6

7

8

//定義隊列

channel.QueueDeclare(

    queue: "Mark_Queue"//隊列名稱

    durable: true//隊列磁盤持久化                  

    exclusive: false,//是否排他的,false。若是一個隊列聲明爲排他隊列,該隊列首次聲明它的鏈接可見,並在鏈接斷開時自動刪除

    autoDelete: false,//是否自動刪除,通常設成false

    arguments: null

    );

  消息持久化在發佈消息時候配置

1

2

3

4

5

6

7

8

9

10

//消息持久化,把DeliveryMode設成2

IBasicProperties properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2;

    //發佈消息

    channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: properties,

        body: sendBytes

        );

       如何配置了事務機制或發佈確認(publisher confirm)機制,服務端的返回Basic.Ack是在消息落盤以後執行的,進一步的提升了消息的可靠性。

       爲了防止磁盤損壞帶來的消息丟失,能夠配置鏡像隊列,這裏不做介紹。

3、消費確認

       爲了確保消息被消費者消費,RabbitMQ提供消費確認模式(consumer Acknowledgements)。自動確認模式,當消費者成功接收到消息後,自動通知RabbitMQ,把消息隊列中相應消息刪除。這很大程度上知足不了咱們,假如消費者接收到消息後,服務器宕機,消息還沒處理完成,這樣就會形成消息丟失。手動確認模式,當消費者成功處理完消息後,手動發消息通知RabbitMQ,把消息隊列中相應消息刪除。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

consumer.Received += (model, ea) =>

{

    var body = ea.Body;

    var message = Encoding.UTF8.GetString(body);

    var routingKey = ea.RoutingKey;

    Console.WriteLine(" [x] Received '{0}':'{1}'",

                      routingKey,

                      message);

 

//確認該消息已被消費,發刪除消息給RabbitMQ,把消息隊列中的消息刪除

channel.BasicAck(ea.DeliveryTag, false);

//消費消息失敗,拒絕此消息,重回隊列,讓它能夠繼續發送到其餘消費者

//channel.BasicReject(ea.DeliveryTag, true);

//消費消息失敗,拒絕多條消息,重回隊列,讓它們能夠繼續發送到其餘消費者

//channel.BasicNack(ea.DeliveryTag, true, true);

};

//手動確認消息,把autoAck設成false

channel.BasicConsume(queue: "Mark_Queue",

                     autoAck: false,

                     consumer: consumer);

       這裏值得注意的是,消息處理完成後,必定要把處理完成的消息發送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),否則RabbitMQ會一直等待,從而形成內存泄露。若處理消息過程當中發生異常,能夠使用channel.BasicReject(ea.DeliveryTag, true)來拒絕此消息,讓它重回隊列。若RabbitMQ收不到消費者任何確認消息的信號(包括確認信號,拒絕信號燈),直到此消費者斷開鏈接,消息才能重回隊列,繼續發送到其餘消費者。

       提醒一下,假如消費者消費消息的方法不支持併發(取決於需求),能夠限制消費者每次只接收一條消息。

1

channel.BasicQos(0, 1, false);

相關文章
相關標籤/搜索