RabbitMQ的開發應用

1.介紹

RabbitMQ 是一個由erlang語言編寫的、開源的、在AMQP基礎上完整的、可複用的企業消息系統。支持多種語言,包括java、Python、ruby、PHP、C/C++等。java

1.1.AMQP模型

AMQP:advanced message queuing protocol ,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息並不受客戶端/中間件不一樣產品、不一樣開發語言等條件的限制。算法

AMQP模型圖
amqp模型.jpgspring

1.1.1.工做過程

發佈者(Publisher)發佈消息(Message),經由交換機(Exchange)。json

交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。緩存

最後 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。安全

一、發佈者、交換機、隊列、消費者均可以有多個。同時由於 AMQP 是一個網絡協議,因此這個過程當中的發佈者,消費者,消息代理 能夠分別存在於不一樣的設備上。springboot

二、發佈者發佈消息時能夠給消息指定各類消息屬性(Message Meta-data)。有些屬性有可能會被消息代理(Brokers)使用,然而其餘的屬性則是徹底不透明的,它們只能被接收消息的應用所使用。ruby

三、從安全角度考慮,網絡是不可靠的,又或是消費者在處理消息的過程當中意外掛掉,這樣沒有處理成功的消息就會丟失。基於此緣由,AMQP 模塊包含了一個消息確認(Message Acknowledgements)機制:當一個消息從隊列中投遞給消費者後,不會當即從隊列中刪除,直到它收到來自消費者的確認回執(Acknowledgement)後,才徹底從隊列中刪除。服務器

四、在某些狀況下,例如當一個消息沒法被成功路由時(沒法從交換機分發到隊列),消息或許會被返回給發佈者並被丟棄。或者,若是消息代理執行了延期操做,消息會被放入一個所謂的死信隊列中。此時,消息發佈者能夠選擇某些參數來處理這些特殊狀況。網絡

1.1.2.Exchange交換機

交換機是用來發送消息的 AMQP 實體。交換機拿到一個消息以後將它路由給一個或零個隊列。它使用哪一種路由算法是由交換機類型和綁定(Bindings)規則所決定的。常見的交換機有以下幾種:

  1. direct 直連交換機:Routing Key==Binding Key,嚴格匹配。
  2. fanout 扇形交換機:把發送到該 Exchange 的消息路由到全部與它綁定的 Queue 中。
  3. topic 主題交換機:Routing Key==Binding Key,模糊匹配。
  4. headers 頭交換機:根據發送的消息內容中的 headers 屬性進行匹配。
    具體有關這五種交換機的說明和用法,後續會有章節詳細介紹。

1.1.3.Queue隊列

AMQP 中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。

  • Durable(消息代理重啓後,隊列依舊存在)
  • Exclusive(只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除)
  • Auto-delete(當最後一個消費者退訂後即被刪除)
  • Arguments(一些消息代理用他來完成相似與 TTL 的某些額外功能)

1.2.rabbitmq和kafka對比

rabbitmq遵循AMQP協議,用在實時的對可靠性要求比較高的消息傳遞上。kafka主要用於處理活躍的流式數據,大數據量的數據處理上。主要體如今:

1.2.1.架構

  1. rabbitmq:RabbitMQ遵循AMQP協議RabbitMQ的broker由Exchange,Binding,queue組成,其中exchange和binding組成了消息的路由鍵;客戶端Producer經過鏈接channel和server進行通訊,Consumer從queue獲取消息進行消費(長鏈接,queue有消息會推送到consumer端,consumer循環從輸入流讀取數據)。rabbitMQ以broker爲中心。
  2. kafka:kafka聽從通常的MQ結構,producer,broker,consumer,以consumer爲中心,消息的消費信息保存的客戶端consumer上,consumer根據消費的點,從broker上批量pull數據。

1.2.2.消息確認

  1. rabbitmq:有消息確認機制。
  2. kafka:無消息確認機制。

1.2.3.吞吐量

  1. rabbitmq:rabbitMQ在吞吐量方面稍遜於kafka,他們的出發點不同,rabbitMQ支持對消息的可靠的傳遞,支持事務,不支持批量的操做;基於存儲的可靠性的要求存儲能夠採用內存或者硬盤。
  2. kafka:kafka具備高的吞吐量,內部採用消息的批量處理,zero-copy機制,數據的存儲和獲取是本地磁盤順序批量操做,具備O(1)的複雜度,消息處理的效率很高。
    (備註:kafka零拷貝,經過sendfile方式。(1)普通數據讀取:磁盤->內核緩衝區(頁緩存 PageCache)->用戶緩衝區->內核緩衝區->網卡輸出;(2)kafka的數據讀取:磁盤->內核緩衝區(頁緩存 PageCache)->網卡輸出。

1.2.4.可用性

  1. rabbitmq(1)普通集羣:在多臺機器上啓動多個rabbitmq實例,每一個機器啓動一個。可是你建立的queue,只會放在一個rabbtimq實例上,可是每一個實例都同步queue的元數據。完了你消費的時候,實際上若是鏈接到了另一個實例,那麼那個實例會從queue所在實例上拉取數據過來。(2)鏡像集羣:跟普通集羣模式不同的是,你建立的queue,不管元數據仍是queue裏的消息都會存在於多個實例上,而後每次你寫消息到queue的時候,都會自動把消息到多個實例的queue裏進行消息同步。這樣的話,好處在於,一個機器宕機了,沒事兒,別的機器均可以用。壞處在於,第一,這個性能開銷太大了,消息同步全部機器,致使網絡帶寬壓力和消耗很重。第二,這麼玩兒,就沒有擴展性可言了,若是某個queue負載很重,你加機器,新增的機器也包含了這個queue的全部數據,並無辦法線性擴展你的queue
  2. kafka:kafka是由多個broker組成,每一個broker是一個節點;每建立一個topic,這個topic能夠劃分爲多個partition,每一個partition能夠存在於不一樣的broker上,每一個partition就放一部分數據。這就是自然的分佈式消息隊列,就是說一個topic的數據,是分散放在多個機器上的,每一個機器就放一部分數據。每一個partition的數據都會同步到其餘機器上,造成本身的多個replica副本,而後全部replica會選舉一個leader出來,主從結構。

1.2.5.集羣負載均衡

  1. rabbitmq:rabbitMQ的負載均衡須要單獨的loadbalancer進行支持,如HAProxy和Keepalived等。
  2. kafka:kafka採用zookeeper對集羣中的broker、consumer進行管理,能夠註冊topic到zookeeper上;經過zookeeper的協調機制,producer保存對應topic的broker信息,能夠隨機或者輪詢發送到broker上;而且producer能夠基於語義指定分片,消息發送到broker的某分片上。

2.結構

2.1.交換機模式

RabbitMQ經常使用的Exchange Type有fanout、direct、topic、headers這四種。

2.1.1.Direct Exchange

direct類型的Exchange路由規則很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。

2.1.2.Topic Exchange

前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但支持模糊匹配:

  • routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
  • binding key與routing key同樣也是句點號「. 」分隔的字符串
  • binding key中能夠存在兩種特殊字符"*"與「#」,用於作模糊匹配,其中" * "用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)

2.1.3.Fanout Exchange

fanout類型的Exchange路由規則很是簡單,它會把全部發送到fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的全部Queue上。
Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。因此,Fanout Exchange 轉發消息是最快的。

2.1.4.Headers Exchange

headers類型的Exchange也不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。

2.1.5.Default Exchange 默認

嚴格來講,Default Exchange 並不該該和上面四個交換機在一塊兒,由於它不屬於獨立的一種交換機類型,而是屬於Direct Exchange 直連交換機。

默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。

它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

舉個例子:當你聲明瞭一個名爲 「search-indexing-online」 的隊列,AMQP 代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是爲 「search-indexing-online」。因此當你但願將消息投遞給「search-indexing-online」的隊列時,指定投遞信息包括:交換機名稱爲空字符串,路由鍵爲「search-indexing-online」便可。

所以 direct exchange 中的 default exchange 用法,體現出了消息隊列的 point to point,感受像是直接投遞消息給指定名字的隊列。

2.2.持久化

雖然咱們要避免系統宕機,可是這種「不可抗力」總會有可能發生。rabbitmq若是宕機了,再啓動即是了,大不了有短暫時間不可用。但若是你啓動起來後,發現這個rabbitmq服務器像是被重置了,之前的exchange,queue和message數據都沒了,那就太使人崩潰了。不光業務系統由於無對應exchange和queue受影響,丟失的不少message數據更是致命的。因此如何保證rabbitmq的持久化,在服務使用前必須得考慮到位。

持久化能夠提升RabbitMQ的可靠性,以防在異常狀況(重啓、關閉、宕機等)下的數據丟失。RabbitMQ的持久化分爲三個部分:交換器的持久化、隊列的持久化和消息的持久化。

2.2.1.exchange持久化

exchange交換器的持久化是在聲明交換器的時候,將durable設置爲true

若是交換器不設置持久化,那麼在RabbitMQ交換器服務重啓以後,相關的交換器信息會丟失,不過消息不會丟失,可是不能將消息發送到這個交換器

spring中建立exchange時,構造方法默認設置爲持久化。

2.2.2.queue持久化

隊列的持久化在聲明隊列的時候,將durable設置爲true

若是隊列不設置持久化,那麼RabbitMQ交換器服務重啓以後,相關的隊列信息會丟失,同時隊列中的消息也會丟失

exchange和queue,若是一個是非持久化,另外一個是持久化,中bind時會報錯。

spring中建立exchange時,構造方法默認設置爲持久化。

2.2.3.message持久化

要確保消息不會丟失,除了設置隊列的持久化,還須要將消息設置爲持久化。經過將消息的投遞模式(BasicProperties中的deliveryMode屬性)設置爲2便可實現消息的持久化

  • 持久化的消息在到達隊列時就被寫入到磁盤,而且若是能夠,持久化的消息也會在內存中保存一份備份,這樣能夠提升必定的性能,只有在內存吃緊的時候纔會從內存中清除。
  • 非持久化的消息通常只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。

若是將全部的消息都進行持久化操做,這樣會影響RabbitMQ的性能。寫入磁盤的速度比寫入內存的速度慢很,因此要在可靠性和吞吐量之間作權衡。

在spring中,BasicProperties中的deliveryMode屬性,對應的是MessageProperties中的deliveryMode。平時使用的RabbitTemplate.convertAndSend()方法默認設置爲持久化,deliveryMode=2。若是須要設置非持久化發送消息,須要手動設置:

messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

2.2.4.完整方案

這裏講解實現消息持久化的完整方案。

1、exchange、queue、message

要保證消息的持久化,在rabbitmq自己的結構上須要實現下面這些:

  • exchange交換機的durable設置爲true。
  • queue隊列的durable設置爲true。
  • message消息的投遞模式deliveryMode設置爲2。

2、發佈確認
前面是保證了消息在投遞到rabbitmq中,如何保證rabbit中消息的持久化。
那麼還須要保證生產者能成功發佈消息,如交換機名字寫錯了等等。能夠在發佈消息時設置投遞成功的回調,肯定消息能成功投遞到目標隊列中。

3、接收確認
對於消費者來講,若是在訂閱消息的時候,將autoAck設置爲true,那麼消費者接收到消息後,尚未處理,就出現了異常掛掉了,此時,隊列中已經將消息刪除,消費者不可以在收到消息。

這種狀況能夠將autoAck設置爲false,進行手動確認。

4、鏡像隊列集羣
在持久化後的消息存入RabbitMQ以後,還須要一段時間才能存入磁盤。RabbitMQ並不會爲每條消息都進行同步存盤,可能僅僅是保存到操做系統緩存之中而不是物理磁盤。若是在這段時間,服務器宕機或者重啓,消息還沒來得及保存到磁盤當中,從而丟失。對於這種狀況,能夠引入RabiitMQ鏡像隊列機制。

這裏強調是鏡像隊列集羣,而非普通集羣。由於出於同步效率考慮,普通集羣只會同步隊列的元數據,而不會同步隊列中的消息。只有升級成鏡像隊列集羣后,才能也同步消息。

每一個鏡像隊列由一個master和一個或多個mirrors組成。主節點位於一個一般稱爲master的節點上。每一個隊列都有本身的主節點。給定隊列的全部操做首先應用於隊列的主節點,而後傳播到鏡像。這包括隊列發佈(enqueueing publishes)、向消費者傳遞消息、跟蹤消費者的確認等等。

發佈到隊列的消息將複製到全部鏡像。無論消費者鏈接到哪一個節點,都會鏈接到master,鏡像會刪除在master上已確認的消息。所以,隊列鏡像提升了可用性,但不會在節點之間分配負載。 若是承載隊列master的節點出現故障,則最舊的鏡像將升級爲新的master,只要它已同步。根據隊列鏡像參數,也能夠升級未同步的鏡像。

3.開發

java開發上,這裏以spring-boot-starter-amqp爲例,記錄在springboot中使用rabbitmq的一些關注點。pom.xml中引用爲:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.1.簡單示例

一個簡單的示例,僅限於文本消息的發佈和接收。

3.1.1.生產者

ProducerController.java
@RestController
public class ProducerController {
    private static final String HEADER_KEY_UID="uid";
    @Autowired
    private ProducerService producerService;

    @PostMapping("/sendText")
    public void sendText(@RequestParam("uid")String uid,@RequestParam("msg")String msg){
        MessageProperties messageProperties=new MessageProperties();
        messageProperties.setHeader(HEADER_KEY_UID,uid);
        producerService.sendText(msg,messageProperties);
    }
}
ProducerService.java
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";
    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 發送 消息文本
     * @param data 文本消息
     * @param messageProperties 消息屬性
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = rabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

消息發送的經常使用方法:

  • rabbitTemplate.send(message); //發消息,參數類型爲org.springframework.amqp.core.Message
  • rabbitTemplate.convertAndSend(object); //轉換併發送消息。 將參數對象轉換爲org.springframework.amqp.core.Message後發送
  • rabbitTemplate.convertSendAndReceive(message) //轉換併發送消息,且等待消息者返回響應消息。

3.1.2.消費者

MessageListener.java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "true"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        String msg = (String) messageConverter.fromMessage(message);
        log.info("消費端 Body: " + msg);
    }
}
  • @RabbitListener 能夠標註在類上面,需配合 @RabbitHandler 註解一塊兒使用
  • @RabbitListener 標註在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪一個方法處理,根據 MessageConverter 轉換後的參數類型

3.2.消息序列化

rabbitmq中消息的序列化依賴於MessageConvert,這是一個接口,用於消息內容的序列化。

  • Message分爲body和MessageProperties兩部分。RabbitMQ的序列化是指Message的 body 屬性,即咱們真正須要傳輸的內容,RabbitMQ 抽象出一個MessageConvert 接口處理消息的序列化,其實現有SimpleMessageConverter(默認)、Jackson2JsonMessageConverter等。
  • 當調用了 convertAndSend方法時,方法內部會使用MessageConvert進行消息的序列化。
  • MessageConvert是在RabbitTemplate中定義的屬性,若是項目中須要使用多種MessageConvert。由於Spring中RabbitTemplate是單例模式注入,建議每種MessageConvert單獨定義一種RabbitTemplate。

3.2.1.生產者

RabbitConfig.java
public class RabbitConfig {
    
    @Bean("jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("defaultRabbitTemplate")
    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
ProducerService.java
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;
    @Resource(name = "jsonRabbitTemplate")
    private RabbitTemplate jsonRabbitTemplate;

    /**
     * 發送 消息對象 json
     *
     * @param data
     * @param messageProperties
     */
    public void sendObject(Object data, MessageProperties messageProperties) {
        Message message = jsonRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        jsonRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }

    /**
     * 發送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

3.2.2.消費者

MessageListener.java
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json對象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消費端Payload: " + bodyText);
    }  
}

生產者發送對象消息時,咱們使用Jackson2JsonMessageConverter,並用其toMessage方法封裝。可是在消費者接收對象消息時,咱們卻沒有用Jackson2JsonMessageConverter的fromMessage方法,而是使用ObjectMapper來反序列化Json對象。是由於rabbitmq在發送Jackson2JsonMessageConverter的序列化對象時,會在包含類的包名信息,消費者在使用fromMessage反序列化時,必須建立一個和生產者中包名等如出一轍的類。明顯不太現實。

3.3.發佈確認(生產者)

3.3.1.ConfirmCallback

ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調。

  • 投遞對象:exchange
  • 回調觸發:不管成功或失敗,都會觸發回調。
  • 投遞成功:ack=true
  • 投遞失敗:ack=false

使用方式在於:

  • 設置 publisher-confirm-type 爲 correlated。
  • 實現RabbitTemplate.ReturnCallback 的函數式接口,並使用。
ProducerService.java
@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ConfirmCallback
     *
     * 投遞對象:exchange
     * 回調觸發:不管成功或失敗,都會觸發回調。
     * 投遞成功:ack=true
     * 投遞失敗:ack=false
     */
    RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
        log.info("ack: " + ack);
        if (!ack) {
            log.info("投遞exchange失敗!....能夠進行日誌記錄、異常處理、補償處理等");
        } else {
            log.info("投遞exchange成功!");
        }
    };

 
    /**
     * 發送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        
        //confirmCallback
        defaultRabbitTemplate.setConfirmCallback(confirmCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

配置文件須要設置:

spring.rabbitmq.publisher-confirm-type = correlated

3.3.2.ReturnCallback

ReturnCallback接口用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。

  • 投遞對象:queue
  • 回調觸發:只有投遞失敗,纔會觸發回調。

使用方式在於:

  • 設置 publisher-returns 爲 true。
  • 設置 mandatory 爲 true。
  • 實現RabbitTemplate.ReturnCallback的函數式接口,並使用。
ProducerService.java
@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ReturnCallback
     *
     * 投遞對象:queue
     * 回調觸發:只有投遞失敗,纔會觸發回調。
     */
    RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText,
                                                    String exchange, String routingKey) -> {
        log.info("投遞到queue失敗! exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
    };

    /**
     * 發送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        //returnCallback
        defaultRabbitTemplate.setMandatory(true);
        defaultRabbitTemplate.setReturnCallback(returnCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

須要在配置文件中配置:

spring.rabbitmq.publisher-returns = true

3.4.接收確認(消費者)

上一節講解的是,如何在生產者發佈消息時,確認消息發佈到rabbitmq的交換機和隊列中。那麼這一節講解的是,如何保障消費者能徹底「消費」了消息。

一般狀況下,rabbitmq做爲消息中間件,它把message推送給消費者就完成了它的使命,該message就自動被「簽收」了。而消費者在接收到message後,再去實現關於該message的業務邏輯。可若是在實現該業務邏輯過程當中發生了錯誤,須要從新執行,那就難辦了。由於message一旦被「簽收」後,就從rabbitmq中被刪除,不可能從新再發送。

若是消費者能手動控制message的「簽收」操做,只有當關於message的業務邏輯執行完成後再「簽收」,message再從rabbitmq中刪除,不然可讓message重發就行了。這一節就講這個。

3.4.1.AcknowledgeMode

Acknowledge意思是「確認」,消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠手動去 ACK 或自動 ACK。

使用手動應答消息,有一點須要特別注意,那就是不能忘記應答消息,由於對於RabbitMQ來講處理消息沒有超時,只要不該答消息,他就會認爲仍在正常處理消息,致使消息隊列出現阻塞,影響業務執行。若是不想處理,能夠reject丟棄該消息。

消息確認模式有:

  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據狀況確認
  • AcknowledgeMode.MANUAL:手動確認

默認是自動確認,能夠經過RabbitListenerContainerFactory 中進行開啓手動ack,或者中配置文件中開啓:

spring.rabbitmq.listener.simple.acknowledge-mode = manual
MessageListener.java
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json對象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消費端Payload: " + bodyText);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

3.4.2.Ack/Nack/Reject

設置爲手動確認後,有3種確認操做:

  • Ack:確認收到消息,而後消息從隊列中刪除。
  • Nack:確認沒有收到消息,消息從新回到隊列中發送。
  • Reject:拒絕該消息,直接丟棄該消息,不會回到隊列中。

如示例代碼中的 basicAck 方法,須要注意的是,要傳遞兩個參數:

  • deliveryTag(惟一標識 ID):當一個消費者向 RabbitMQ 註冊後,會創建起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它表明了 RabbitMQ 向該 Channel 投遞的這條消息的惟一標識 ID,是一個單調遞增的正整數,delivery tag 的範圍僅限於 Channel
  • multiple:爲了減小網絡流量,手動確承認以被批處理,當該參數爲 true 時,則能夠一次性確認 delivery_tag 小於等於傳入值的全部消息

3.4.3.異常重試

除了上述手動確認的方式,還有一種不太經常使用的方式,能夠實現重複發送消息。在開啓異常重試的前提下,在消費者代碼中拋出異常,會自動重發消息。

application.properties

spring.rabbitmq.listener.simple.retry.enabled=true 是否開啓消費者重試
spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重試次數
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重試間隔時間(單位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數超過上面的設置以後是否丟棄
MessageListener.java
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json對象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消費端Payload: " + bodyText);
        throw new RuntimeException("重試啦");
    }

3.5.消費模式

在RabbitMQ中消費者有2種方式獲取隊列中的消息:

  • push:basic.consume命令訂閱某一個隊列中的消息,channel會自動在處理完上一條消息以後,接收下一條消息。(同一個channel消息處理是串行的)。除非關閉channel或者取消訂閱,不然客戶端將會一直接收隊列的消息。
  • pull:basic.get命令主動獲取隊列中的消息,可是絕對不能夠經過循環調用basic.get來代替basic.consume,這是由於basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,而後檢索第一條消息,而後再取消訂閱。若是是高吞吐率的消費者,最好仍是建議使用basic.consume。

對比來講,若是有持續消費的需求,建議用push的方式,經過監聽器來訂閱。若是隻是特定時刻須要從隊列中,一次性取些數據,能夠用pull方式。

4.名詞概念

4.1.channel

咱們知道不管是生產者仍是消費者,都須要和 RabbitMQ Broker 創建鏈接,這個鏈接就是一條 TCP 鏈接,也就是 Connection。一旦 TCP 鏈接創建起來,客戶端緊接着能夠建立一個 AMQP 信道(Channel),每一個信道都會被指派一個惟一的 ID。

信道是創建在 Connection 之上的虛擬鏈接,RabbitMQ 處理的每條 AMQP 指令都是經過信道完成的。

咱們徹底可使用 Connection 就能完成信道的工做,爲何還要引入信道呢?試想這樣一個場景,一個應用程序中有不少個線程須要從 RabbitMQ 中消費消息,或者生產消息,那麼必然須要創建不少個 Connection,也就是多個 TCP 鏈接。然而對於操做系統而言,創建和銷燬 TCP 鏈接是很是昂貴的開銷,若是遇到使用高峯,性能瓶頸也隨之顯現。

RabbitMQ 採用相似 NIO(Non-blocking I/O)的作法,選擇 TCP 鏈接複用,不只能夠減小性能開銷,同時也便於管理。

每一個線程把持一個信道,因此信道複用了 Connection 的 TCP 鏈接。同時 RabbitMQ 能夠確保每一個線程的私密性,就像擁有獨立的鏈接同樣。當每一個信道的流量不是很大時,複用單一的 Connection 能夠在產生性能瓶頸的狀況下有效地節省 TCP 鏈接資源。可是信道自己的流量很大時,這時候多個信道複用一個 Connection 就會產生性能瓶頸,進而使總體的流量被限制了。此時就須要開闢多個 Connection,將這些信道均攤到這些 Connection 中,至於這些相關的調優策略須要根據業務自身的實際狀況進行調節。

信道在 AMQP 中是一個很重要的概念,大多數操做都是在信道這個層面展開的。好比 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相關的 API 與 AMQP 緊密相連,好比 channel.basicPublish 對應 AMQP 的 Basic.Publish 命令。

4.2.QoS

針對push方式,RabbitMQ能夠設置basicQoS(Consumer Prefetch)來對consumer進行流控,從而限制未Ack的消息數量。

前提包括,消息確認模式必須是手動確認。

basicQos(int var1, boolean var2)
  • 第一個參數是限制未Ack消息的最大數量。
  • 第二個參數是布爾值,(1)爲true時,說明是針對channel作的流控限制;(2)爲false時,說明是針對整個消費者作的流控限制。
相關文章
相關標籤/搜索