RabbitMQ消息和隊列的TTL以及死信隊列和延遲隊列

TTL:Time To Live的簡稱,即過時時間。RabbitMQ能夠對消息和隊列設置TTL。app

設置消息的TTL

目前有兩種方法設置消息的TTL,第一種方法是經過隊列的屬性設置,隊列中的全部消息都有相同的過時時間。第二種方法是對消息自己進行單獨設置,每條消息的TTL能夠不一樣。若是兩種方法一塊兒使用那麼消息的TTL以二者之間較小的那個數值爲準。消息在隊列中的生存時間一旦超過設置的TTL值時,就會變成「死信」(Dead Message),消費者將沒法再收到該消息(這點不是絕對的)。優化

經過隊列的屬性設置

經過隊列屬性設置消息的TTL方法是在channel.queueDeclare方法中加入x-message-ttl參數實現的,這個參數的單位是毫秒(ms)。示例部分代碼以下:ui

        Map<String,Object> arg = new HashMap<String, Object>();
        arg.put("x-message-ttl",6000);
        channel.queueDeclare("normalQueue",true,false,false,arg);    

同時也能夠經過Policy的方式來設置TTL,示例以下:spa

rabbitmqctl set_policy TTL ".*"  '{''message-ttl":6000}' --apply-to queuescode

若是不設置TTL,則表示此消息不會過時;若是設置爲0,則表示除非此時能夠直接將消息投遞給消費者,不然該消息會被當即丟棄。orm

每條消息設置TTL

針對每條消息設置TTL的方法是在channel.basicPublish方法中加入expiration的屬性參數,單位爲毫秒。關鍵代碼以下:對象

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.deliveryMode(2);//持久化消息
        builder.expiration("6000");//設置過時時間
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish("exchange","routingkey",true,properties,"hello".getBytes());

對於經過隊列設置的TTL屬性的方法,一旦消息過時,就會從隊列中抹去,而對於每條消息的設置TTL屬性,即便消息過時,也不會立刻從隊列中抹去,由於每條消息是否過時是在即將投遞到消費者以前判斷的。blog

爲何這兩種方式的處理不同呢?rabbitmq

  由於在經過隊列設置的TTL屬性的方法裏,對於已過時的消息確定是在隊列的頭部,RabbitMQ只是須要按期從隊列頭開始掃描是否有過時消息便可。而在消息設置TTL的方法裏,每條消息的過時時間不一樣,若是要刪除全部的過時消息勢必要掃描整個隊列,因此不如等到此消息即將被消費時再判斷是否過時,若是過時再進行刪除便可。隊列

設置隊列的TTL

經過channel.queueDeclare方法中的x-expires參數能夠控制隊列被自動刪除前處於未使用狀態的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被從新聲明,而且在過時時間段內也未調用過Basic.Get命令。

RabbitMQ會確保在過時時間到達後將隊列刪除,可是不保障刪除的動做有多及時。在RabbitMQ重啓後,持久化的隊列的過時時間會被從新計算。

用於表示過時時間的x-expires參數以毫秒爲單位,而且服從和x-message-ttl同樣的約束條件,不過不能設置爲0,好比該參數設置爲1000,則表示該隊列若是在1秒內未使用則會被刪除。下面舉例一個建立過時時間爲30分鐘的隊列:

        Map<String,Object> ars = new HashMap<String, Object>();
        ars.put("x-expires",1800000);
        channel.queueDeclare("myqueue",false,false,false,ars);

死信隊列

DLX,全稱爲Dead-Letter-Exchange,能夠稱之爲死信交換器,也有人稱爲死信郵箱。當消息在一個隊列中變爲死信以後,它能被從新被髮送到另外一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之爲死信隊列。

消息變成死信通常是由如下幾種狀況形成的:

  ❤ 消息被拒絕,而且設置requeue參數爲false

  ❤ 消息過時

  ❤ 隊列達到最大長度

DLX也是一個正常的交換器,和通常的交換器沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列存在死信時,RabbitMQ就會自動的將這個消息從新發布到設置的DLX上去,進而被路由到另外一個隊列,即死信隊列。能夠監聽這個隊列中的消息以進行相應的處理。

方法:

  經過在channel.queueDeclare方法中設置x-dead-letter-exchange參數來爲這個隊列添加DLX。示例代碼以下:

      channel.exchangeDeclare("dlx_exchange","direct");
        Map<String,Object> ars = new HashMap<String, Object>();
        ars.put("x-dead-letter-exchange","dlx_exchange");
        //能夠爲這個DLX指定路由鍵,若是沒有特殊指定,則使用原隊列的路由鍵
        ars.put("x-dead-letter-routing-key","dlx-routing-key");
        //爲隊列添加DLX
        channel.queueDeclare("myqueue",false,false,false,ars);

經過Policy的方式設置:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-change":"dlx_exchange"}' --apply-to queues

下面舉個例子,爲其設置TTL和DLX,以下:

     channel.exchangeDeclare("exchange.dlx","direct",true);
        channel.exchangeDeclare("exchange.normal","fanout",true);
        Map<String,Object> ars = new HashMap<String, Object>();
        ars.put("x-dead-letter-exchange","exchange.dlx");
        //能夠爲這個DLX指定路由鍵,若是沒有特殊指定,則使用原隊列的路由鍵
        ars.put("x-dead-letter-routing-key","dlx-routing-key");
        ars.put("x-message-ttl",10000);
        //爲隊列添加DLX
        channel.queueDeclare("queue.normal",true,false,false,ars);
        channel.queueBind("queue.normal","exchange.normal","");
        channel.queueDeclare("queue.dlx",true,false,false,null);
        channel.queueBind("queue.dlx","exchange.dlx","dlx-routing-key");
        channel.basicPublish("exchange.normal","rk",MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes());

上述例子分別建立了兩個交換器exchange.normal和exchange.dlx,分別綁定了兩個隊列queue.normal和queue.dlx。

上述代碼執行過程:

  生產者首先發送一條攜帶路由鍵爲「rk」的消息,而後通過交換器exchange.normal順利的存儲到隊列queue.normal中,因爲隊列queue.normal設置了過時時間爲10s,在這10s內沒有消費者消費這條消息,那麼斷定這條消息過時。因爲設置了DLX,過時之時,消息會被丟給交換器excange.dlx中,這時找到與exchange.dlx匹配的隊列queue.dlx,最後消息被存儲在queue.dlx這個死信隊列中。

  對於RabbitMQ來講,DLX是一個很是有用的特性。它能夠處理異常的狀況下,消息不可以正確的被消費者消費(消費者調用了Basix.Nack或者Basic.Reject)而被置於死信隊列中的狀況,後續分析過程能夠經過消費這個死信隊列中的內容來分析當時所遇到的異常狀況,進而改善和優化系統。

延遲隊列

延遲隊列所存儲的對象是對應的延遲消息,所謂「延遲消息」是指當消息發送後,並不想讓消費者當即拿到消息,而是等待特定時間後,消費者才能拿到這個消息進行消費。

在AMQP協議中,或者RabbitMQ自己沒有直接支持延遲隊列的功能,可是能夠經過DLX和TTL模擬出延遲隊列的功能。

其實在文章的上半段中的最後一個例子就是延遲隊列的用法:對於queue.dlx這個死信隊列來說。假設一個應用中須要將每條消息設置爲10秒的延遲,生產者經過exchange.normal這個交換器將發送的消息存儲在queue.normal這個隊列中。消費者訂閱的並不是是queue.normal這個隊列,而是queue.dlx這個隊列。當消息從queue.normal這個消息隊列中過時以後被存入queue.dlx這個隊列中,消費者就剛好延遲10秒接收到這個消息。

   在真實的應用中,對於延遲隊列能夠根據延遲時間的長短分爲多個等級,通常爲5秒,10秒,30秒,1分鐘,5分鐘,10分鐘,30分鐘,一個小時這幾個維度。

參考:《RabbitMQ實戰指南》 朱忠華 編著; 

相關文章
相關標籤/搜索