rabbitmq

1、rabbitmq關鍵字

Binding:Exchange和Exchange、Queue之間的鏈接關係
Queue:實際存儲消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如選yes,表明當最後一個監聽被移除以後,該Queue會自動刪除。
Message:服務器和應用程序之間傳送的數據。本質上就是一段數據,由Properties和Payload(Body)組成。經常使用屬性:delivery mode、headers(自定義屬性)
Virtual host:虛擬主機,用於進行邏輯隔離,最上層的消息路由。一個Virtual Host裏面能夠有若干個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或者Queue。

2、rabbitmq高級特性

  1. 消息如何保證100%的投遞成功?segmentfault

    生產端可靠性投遞服務器

    • 保障消息的成功發出
    • 保障MQ節點的成功接收
    • 發送端收到MQ節點(Broker)確認應答
    • 完善的消息進行補償機制

    可靠性投遞解決方案ide

    • 消息落庫,對消息狀態進行打標
    • 消息的延遲投遞,作二次確認,回調檢查
    • 消息集羣鏡像隊列:rabbitmq集羣模式很是經典的就是Mirror鏡像模式,保證100%數據不丟失,在實際工做中也是用的最多的。而且實現集羣很是簡單,通常互聯網公司都會構建這種鏡像集羣模式。https://segmentfault.com/a/11...
  2. 冪等性概念詳解
  3. 在海量訂單產生的業務高峯期,如何避免消息的重複消費問題?
  4. Confirm確認消息、Return返回消息fetch

    理解Confirm消息確認機制:日誌

    • 消息的確認,是指生產者投遞消息後,若是Broker收到消息,則會給咱們生產者一個應答。
    • 生產者進行接受應答,用來肯定這條消息是否正常的發送到了Broker,這種方式也是消息的可靠性投遞的核心保障。

    如何實現Confirm確認消息?code

    • 第一步:在channel上開啓確認模式:channel.confirmSelect()
    • 第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或者記錄日誌等後續處理。

    Return消息機制rabbitmq

    • Return Listener用於處理一些不可路由的消息
    • 咱們的消息生產者,經過制定一個Exchange和Routing Key,把消息送達到某一個隊列中去,而後咱們的消費者監聽隊列,進行消費處理。
    • 可是在某寫狀況下,若是咱們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用Return Listener。
    • Mandatory:若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲false,那麼broker端自動刪除該消息。

  • 自定義消費者
public class MyConsumer extends DefaultConsumer implements Consumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }


    @Override
    public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
        
    }

}
  • 消息的ACK與重回隊列隊列

    消費端手工ACK(應答成功)和NACK(應答失敗)ip

    1. 消費端進行消費的時候,若是因爲業務異常咱們能夠進行日誌的記錄,而後進行補償。
    2. 因爲服務器宕機等嚴重問題,咱們就要手工進行ACK保障消費端消費成功。好比:消費一半的時候宕機了,broker端沒有收到應答,重發消息。

    消費端重回隊列ci

    • 消費端重回隊列是爲了對沒有處理成功的消息,把消息從新回遞給broker。
    • 通常咱們在實際應用中,都會關閉重回隊列,也就是設置爲false。

    channel.basciNack(envelope.getDeliveryTag(),false,true) // 爲true的話,在消費失敗的狀況下會重回隊列放入隊列末端。

  • 消息的限流

    假設有一個場景,RabbitMq服務器上有上萬條未處理的消息,咱們隨便打開一個消費者客戶端,會出現下面的狀況: 巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多數據。RabbitMq提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息(經過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
    void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
    prefetchSize:0
    prefetchCount: 會告訴RabbitMq不要同時給一個消費者推送多於N個消息,即一旦有N個消息尚未被ack,則該consumer將block掉,知道有消息ack。
    global: true/false,是否將上面設置應用於channel,簡答點說,就是上面限制是channel級別的仍是consumer級別。
    // 限流方式,第一件事就是autoAck設置爲false,關閉自動簽收,必須手動簽收
    channel.basicQos(0,3,false); // 3表示若是消息積壓了1000條,先給我推3條,這三條消費結束後,我會給你一個ack表示這三條我已經處理完了,而後再給我推送3條...
    channel.basicConsume(queueName,false,new MyConsumer(channel))
    //在MyConsumer中對消息進行簽收ack
    channel.basicAck(envelope,getDeliveryTag(), false);
  • TTL消息

    TTL:是Time To Live。就是生存時間。

    • RabbitMQ支持消息的過時時間,在消息發送時能夠進行指定。
    • RabbitMQ支持隊列的過時時間,從消息入隊開始計算,只要超過了隊列的超時時間配置,那麼消息會自動清除。
  • 死信隊列

    死信隊列:DLX,Dead-Letter-Exchange。
    利用DLX,當消息在一個隊列中變成了死信(dead message:這條消息沒有消費者去消費)以後,他能被從新publish到另一個Exchange,這個Exchange就是DLX。
    進入死信隊列(進入死信的三種方式)

    • 1.消息被拒絕(basic.reject or basic.nack)而且requeue=false

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

    • 2.消息TTL過時
    • 3.隊列達到最大長度

    備註說明

    • DLX也是一個正常的Exchange,和通常的Exchange沒區別,他能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
    • 當這個隊列中有死信時,RabbitMQ會自動將這個消息從新發送到已經設置了的Exchange上去,進而被路由到另一個隊列上去。
    • 能夠監聽這個隊列中消息作相應的處理。

    死信隊列設置

    • 首先須要設置死信隊列的exchange和queue,而後進行綁定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
    • 而後進行正常聲明交換機、隊列、綁定,只不過咱們須要在隊列加上一個參數便可:arguments.put("x-dead-letter-exchange","dlx.exchange");
相關文章
相關標籤/搜索