【rabbitmq】rabbitmq概念解析--消息確認--示例程序

概述

本示例程序所有來自rabbitmq官方示例程序,rabbitmq-demo
官方共有6個demo,針對不一樣的語言(如 C#,Java,Spring-AMQP等),都有不一樣的示例程序;
本示例程序主要是Spring-AMQP的參考示例,若是須要其餘語言的參考示例,能夠參考官網;html

rabbitmq模擬器

模擬器

rabbitmq簡介

核心架構圖

核心架構圖
數據流轉圖
架構圖

AMQP 0-9-1 Model Explained

重要語法說明

  • producer或publisher: 消息生產者/發佈者,即:產生消息的;
  • Exchange:producer或publisher只會將message發送到Exchange,目前有4種不一樣的Exchange類型;
  • Queue:消息隊列,全部的消費者都是直接從Queue獲取Message並消費;
  • Binging:鏈接Exchange和Queue的紐帶,決定Exchange如何路由消息到不一樣的Queue;
  • routingKey:生產者-->message-->Exchange,須要指定一個key,叫作routingKey;
  • routingKey:Exchange-->Binging-->Queue,Binging有一個Key值,叫routingKey或bingingKey;
  • bingingKey:Exchange-->Binging-->Queue,Binging有一個Key值,bingingKey;

核心理解

4種不一樣的Exchange,對routingKey的解釋都不相同;
對routingKey的不一樣解釋,決定了Exchange路由Message到Queue的不一樣方案;java

  1. direct exchange: 匹配2個routingKey(即routingKey和bingingKey)是否相等,相等時才進行消息路由;
  2. fanout exchange: 忽略routingKey,會將Message路由到全部綁定的Queue;
  3. topic exchange: routingKey格式形如aaa.bbb.xxx*.ccc.dd.#,相似正則表達式匹配;
  4. headers exchange:

jar包說明

  • Java版本:
    Java版本使用以下jar(說明:如果使用):
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

demo1: 單生產者-單消費者

單生產者-單消費者官方示例

spring.profiles.active=hello-world, sender, receiver

demo2: 單生產者-多消費者

Work queues官方示例
workQueue
application.properties配置github

spring.profiles.active=work-queues, sender, receiver
#spring.profiles.active=work-queues, sender
#spring.profiles.active=work-queues, receiver

詳細描述參見:單生產者-多消費者詳細正則表達式


demo3: 發佈/訂閱

Publish/Subscribe官方示例
發佈/訂閱spring

  • 消費廣播到多個消費者進行消費;
  • 使用fanout pattern;

application.properties配置緩存

spring.profiles.active=pub-sub, receiver , sender

詳細描述參見:發佈/訂閱詳細架構


demo4: Routing

Routing官方示例app

Direct exchange 模式進行route結構圖

direct-exchange
a message goes to the queues whose binding key exactly matches the routing key of the message;(相等時才路由)異步

Multiple bindings

Multiple bindings
兩個Queue使用相同的BingingKey(black) ==> 效果相似於:發佈/訂閱模式(demo3);

完整的結構圖

together

application.properties配置

pring.profiles.active=routing, receiver , sender

詳細描述參見:發佈/訂閱詳細


demo5: Topics

Topics官方示例
結構示例圖

  • 使用 Topic exchange實現;
  • 發送到Topic exchange的routingKey必須知足必定要求:用"."分割的words列表,如:*.aaa.bbb.#
  • BingingKey和routingKey有相同的格式要求;
  • * : 能夠匹配一個word;
  • #: 能夠匹配0個或多個words;

application.properties配置

pring.profiles.active=topics, receiver , sender

詳細描述參見:Topics


demo6: RPC over RabbitMQ

RPC官方示例

結構圖

架構圖

application.properties配置

spring.profiles.active=rpc,server
#spring.profiles.active=rpc,client

詳細描述參見:RPC


消費端確認

Delivery Identifiers: Delivery Tags

消費者註冊後,rabbitmq將消息交付給消費者時,都會帶有一個「Delivery Tags」,這個是惟一的ID標識,id以整數的遞增的方式實現。

Acknowledgement Modes(消費端)

自動確認模式

  • 發送以後,就認爲是發送成功(fire-and-forget)
  • 消息不停的發送到消費端消費,無需等待消費端任何確認;

缺點:

  • 可能形成消費端不堪重負;

手動模式

  1. basic.ack: 確定的確認;
  2. basic.nack: 否認的確認(RabbitMQ對AMQP 0-9-1的擴展),支持消息批量確認;
  3. basic.reject:否認的確認,消息消費失敗後,直接從broker中將消息delete不支持批量確認

Acknowledging Multiple Deliveries at Once(消息批量確認)

  • 一次確認多個消息發送,而不是每個消息單獨確認;
  • basic.reject:不具有該功能;
  • basic.nack: 具有該功能;

實現方式

  • multiple field: 設置爲true;

示例

假設:在Channel(ch)上有5,6,7,8這4個delivery tags未確認;

  • 狀況1,delivery_tag=8 & multiple=true: 則5,6,7,8這4個tags都將被確認;
  • 狀況2,delivery_tag=8 & multiple=false:則只有8被確認,而5,6,7將不會被被確認;

Channel Prefetch Count (QoS)[能夠設置消費端消費的速率]

  • 消息消費是異步完成的,手動確認也是異步的;
  • 有一部分消息是被消費了,可是還將來得及確認:但願控制未被確認消息的size,防止無界的緩存
  • prefetch count:使用basic.qos方法設置該值能夠控制未被確認消息的max size;
  • 當達到該最大值時,rabbitmq將中止交付消息進行消費;
  • 僅對basic.qos方法有效,對basic.get方法無效;

示例

假設:在Channel(Ch)上有5,6,7,8共4個未被確認的消息,且ch的prefetch count=4
結果:rabbitmq將不會再交付任何消息到該Channel上,除非有消息被確認;

消費確認選擇,prefetch設置以及吞吐量

  • 狀況1:增大prefetch:提升向消費者傳遞消息的速度;
  • 狀況2:自動確認模式能夠產生最佳的傳送速率;

應避免:

  1. 自動確認模式
  2. 手動確認模式 + 無限制的prefetch

結論:

  • 狀況1狀況2均可能致使交付但將來得及處理的Message增長,增大RAM的消耗;

推薦值:

  • prefetch: 100~300,能夠有效提升吞吐量,並避免RAM消耗過多的風險;

消費失敗或鏈接中斷: 自動從新reQueue

當消息發送給消費端後,若是出現以下狀況,則消息會從新reQueue,會被再次發送;

  1. TCP鏈接中斷;
  2. 消費端掛掉:沒法進行消息確認;

Client Errors: Double Acking and Unknown Tags

消費端沒法對同一個消息確認超過一次,當超過一次以後,將拋出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX

總結

  • 每一個交付給消費端的消息,都有一個惟一的標識delivery tag
  • 自動消息確認;
  • 手動消息確認:每一個消息單獨確認批量消息確認;
  • prefetchCount:能夠控制消息端的吞吐量,避免消費端消費過慢,產生RAM大量消耗;
  • 失敗重傳:TCP鏈接中斷消費端掛掉,都會引發消息從新入隊列,從新消費(手動消息確認時);
  • 沒法對同一個消息進行2次或2次以上的確認,不然會拋出異常;

發送端確認

Channel事務

  • 不推薦使用: 會嚴重下降吞吐量;

在 AMQP 0-9-1中,保證消息不丟失的惟一方法,就是使用事務;

  1. 開啓Channel事務;
  2. 發送消息,提交事務;

相似消費端的應答確認機制

  • confirm.select: 應用於Channel時,表示使用確認模式
  • 事務確認模式沒法共存:兩者只能選擇其一;

確認模式 (confirm.select)

  • 發送端使用confirm.select;
  • broker發送basic.ack來確認Message已被處理;
  • delivery-tag: 消息序列,具備惟一性;
  • multiple=true: 用於設置批量消息確認
  • 沒法保證消息什麼時候被確認;
  • 確認模式:消息要麼被confirmed(OK),要麼被nack(fail),且only once;

Java示例:(發送端發送大量messages,使用確認模式)
程序-確認模式

否認確認

異常狀況時,服務端沒法處理消息,則broker發送basic.nack來進行否認確認

應答延時和持久化消息

  • 僅當消息被持久化到disk以後,纔會發送basic.ack應答;
  • 吞吐量提升建議:異步處理應答批量發送消息;

應答順序

當使用異步發送和持久化消息時,broker對消息的確認順序可能和發送者的消息發送順序不一致;

發送確認 + 保證交付

  • 消息持久化: 並不能保證消息不丟失(在寫入disk前broker就掛掉);

限制

Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.

參考

Consumer Acknowledgements and Publisher Confirms

相關文章
相關標籤/搜索