本文主要學習的目標有兩個:java
雖然不是全部的系統都要求像銀行同樣對消息可靠投遞有很是嚴格的要求,但確保消息被接收和投遞是很是重要的。RabbitMQ基於AMQP規範,後者提供消息發佈中的事務以及消息持久化選項,以提供比自身普通消息發佈更高級的可靠消息通訊機制。git
在RabbitMQ中,建立可靠性投遞的每一個機制都會對性能產生必定的影響。單獨使用時可能不太會注意到吞吐量的差別,可是當它們組合使用時,吞吐量就會由明顯不一樣,只有經過執行本身的性能基準測試,才能肯定性能與可靠性投遞之間能夠接受的平衡。github
下面從左到右依次說明這些機制會產生哪些性能影響。spring
另外,會使用Spring提供的RabbitTemplate客戶端工具(使用過RabbitTemplate,後續可能不會介紹RabbitTemplate),對每種機制進行配置,併發送消息到RabbitMQ。bash
代碼在Github:https://github.com/XuePeng87/rabbitmq-example服務器
在完美世界裏,無須任何額外的配置或操做,RabbitMQ就能夠可靠的投遞消息。併發
不幸的是,當墨菲定律肆虐咱們的程序時,完美世界並不存在。dom
在非核心應用中,發佈的消息沒必要處理每一個可能的故障點,例如發一些容許丟棄的消息,那麼咱們能夠不使用任何保障機制,直接使用Basic.Publish發送消息。異步
使用RabbitTemplate時,能夠在配置文件中設置:函數
spring: #消息隊列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / publisher-returns: false publisher-confirms: false connection-timeout: 5000ms
將publisher-returns和publisher-confirms設置爲false。
設置mandatory後,RabbitMQ將不接受不可路由的消息。
mandatory標誌是一個與Basic.Publish命令一塊兒傳遞的參數,該參數會告訴RabbitMQ,若是消息不可路由,它應該經過Basic.Return命令將消息返回給發佈者。設置mandatory標誌能夠被認爲是開啓故障檢測模式,它只會讓RabbitMQ向你通知失敗,而不會通知成功。若是消息路由正確,你的發佈者將不會收到通知。
/** * 定製AmqpTemplate對象。 * 可根據須要定製多個。 * * @return AmqpTemplate對象。 */ @Bean public AmqpTemplate amqpTemplate() { rabbitTemplate.setEncoding("UTF-8"); // 設置不接受不可路由的消息,須要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 發送失敗,應答碼:{},緣由:{},交換器: {},路由鍵:{}", correlationId, replyCode, replyText, exchange, routingKey); }); return rabbitTemplate; }
如上面的配置,咱們設置了mandatory等於true,同時將配置文件中的publisher-returns也設置爲true,這樣就打開了失敗通知。下面作個測試:
/** * 發送direct消息。 * 交換器存在,但隊列不存在,爲了測試Mandatory與ReturnCallback。 * * @param message 消息內容。 */ public void directNotExistQueue(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData); }
咱們建立了交換器DIRECT_EXCHANGE,可是使用一個不存在的RoutingKey,這就等於發送消息到交換器成功,可是沒法路由到某一個隊列,執行測試用例,觀察結果:
/** * 發送direct消息,但消息路由不存在。 * 交換器存在,但隊列不存在,爲了測試Mandatory與ReturnCallback。 */ @Test public void testDirectNotExistQueue() { messageProducer.directNotExistQueue("{}"); }
結果以下:
ReturnCallback -> 消息 null 發送失敗,應答碼:312,緣由:NO_ROUTE,交換器: DIRECT_EXCHANGE,路由鍵:DIRECT_ROUTING_KEY_NOT_EXIST
Basic.Return調用是一個RabbitMQ的異步調用,而且在消息發佈後的任什麼時候候均可能發生。
若是代碼中沒有設置setReturnCallback,那麼該調用將被忽略。
其實setReturnCallback就是處理Basic.Return的回調方法,RabbitTemplate接收到Basic.Return命令後,調用該方法。
發佈者確認模式是AMQP規範的擴展功能,只能用在支持這個特定擴展的客戶端,RabbitTemplate支持這個模式。
在協議層,發佈任何消息以前,消息發佈者必須向RabbitMQ發送Confirm.Select請求,並等待Confirm.SelectOk響應以獲知投遞確認已經被啓動。在這一點上,對於發佈者發送給RabbitMQ的每條消息,服務器會發送一個確認響應(Basic.Ack)或否認確認響應(Basic.Nack)。
在RabbitTemplate中,要使用發佈者確認,須要在配置文件中配置:
publisher-confirms: true
而後在設置回調函數:
/** * 定製AmqpTemplate對象。 * 可根據須要定製多個。 * * @return AmqpTemplate對象。 */ @Bean public AmqpTemplate amqpTemplate() { // 設置消息轉換器爲Jackson rabbitTemplate.setEncoding("UTF-8"); // 設置不接受不可路由的消息,須要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 發送失敗,應答碼:{},緣由:{},交換器: {},路由鍵:{}", correlationId, replyCode, replyText, exchange, routingKey); }); // 設置消息發佈確認功能,須要在yml中配置:publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("ConfirmCallback -> 消息發佈到交換器成功,id:{}", correlationData); } else { log.warn("ConfirmCallback -> 消息發佈到交換器失敗,錯誤緣由爲:{}", cause); } }); // 開啓事務模式,須要在yml中配置:publisher-confirms: false // rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }
調用setConfirmCallback方法,設置回調函數,每次發送消息到RabbitMQ,服務器都會返回響應,能夠經過判斷ack來肯定是否發送成功。
當成功發送到交換器後,ConfirmCallback會接收到ack爲true的響應,若是沒有成功發送到交換器,則會接收到ack爲false的響應。
具體測試代碼以下:
/** * 發送direct消息。 * 交換器不存在,隊列也不存在,爲了測試ConfirmCallback。 * * @param message 消息內容。 */ public void directNotExistExchangeAndQueue(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE_NOT_EXIST", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData); }
首先向不存在的交換器發送消息,結果爲:
/** * 發送direct消息,交換器和路由都不存在。 * 交換器不存在,隊列也不存在,爲了測試ConfirmCallback。 */ @Test public void testDirectNotExistExchangeAndQueue() { messageProducer.directNotExistExchangeAndQueue("{}"); }
ConfirmCallback -> 消息發佈到交換器失敗,錯誤緣由爲:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE_NOT_EXIST' in vhost '/', class-id=60, method-id=40)
而後在使用失敗通知模式的測試用例測試一下,即能發送到交換器,可是沒法路由到隊列:
ReturnCallback -> 消息 null 發送失敗,應答碼:312,緣由:NO_ROUTE,交換器: DIRECT_EXCHANGE,路由鍵:DIRECT_ROUTING_KEY_NOT_EXIST ConfirmCallback -> 消息發佈到交換器成功,id:CorrelationData [id=9282dbe9-4fe9-4b85-af06-79305f4c99e1]
不管是否使用發佈者確認模式,若是你發佈消息到不存在的交換器,那麼發佈用的信道將會被RabbitMQ關閉。
發佈者確認模式不能與事務模式一塊兒工做,此外,做爲對Basic.Publish請求的異步響應,它並不能保證什麼時候會收到確認。
備用交換器是RabbitMQ對AMQP的另外一種擴展,用於處理沒法路由的消息。備用交換器在第一次聲明交換器時被指定,用來提供一種預先存在的交換器,即若是交換器沒法路由消息,那麼消息就會被路由到這個新的備用交換器。
若是將消息發送到具備備用交換器的交換器(設置了mandatory=true)上, 那麼一旦預期的交換器沒法正常路由消息,Basic.Return就不會發給發佈者。由於消息成功的發佈到了備用交換器。
RabbitTemplate聲明備用交換器的代碼以下:
/** * 聲明Direct交換器。 * 同時指定備用交換器。 * * @return Exchange對象。 */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE") .durable(false) .withArgument("alternate-exchange", "UN_ROUTE_EXCHANGE") .build(); }
在聲明交換器時,調用withArgument函數,key爲alternate-exchange,value爲備用交換器的名稱,這裏是UN_ROUTE_EXCHANGE(備用服務器也須要建立)。
下面進行測試,發送一個沒法路由的消息到DIRECT_EXCHANGE,這個消息將不能被路由,但不會回調ReturnCallback,而是會進入到UN_ROUTE_EXCHANGE交換器中:
AMQP事務提供了一種機制,經過這種機制,消息能夠批量發佈到RabbitMQ,而後提交到隊列或者回滾。
在RabbitTemplate中,使用事務就不能使用ReturnConfime模式,因此要把publisher-confimes設置爲false,具體代碼以下:
/** * 定製AmqpTemplate對象。 * 可根據須要定製多個。 * * @return AmqpTemplate對象。 */ @Bean public AmqpTemplate amqpTemplate() { // 設置消息轉換器爲Jackson rabbitTemplate.setEncoding("UTF-8"); // 設置不接受不可路由的消息,須要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 發送失敗,應答碼:{},緣由:{},交換器: {},路由鍵:{}", correlationId, replyCode, replyText, exchange, routingKey); }); // 開啓事務模式,須要在yml中配置:publisher-confirms: false rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }
代碼中,要設置setChannelTransacted爲true,而後聲明RabbitMQ的事務管理器:
/** * 聲明RabbitMQ事務管理器。 * * @param connectionFactory 鏈接工廠。 * @return PlatformTransactionManager對象。 */ @Bean public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
到這裏,事務的配置準備工做就作好了,接下來,基於事務模式發送消息:
/** * 在事務模式下,發送direct消息。 * <p> * 第一次發送,消息能夠正常路由到隊列。 * 第二次發送,消息不能路由到隊列。 */ @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData); }
代碼中,加入了@Transactional修飾方法,前後發送兩條消息到交換器,第一次發送的消息會正常路由到隊列,第二次發送的消息則不會發送到隊列,下面是測試代碼和結果:
/** * 在事務模式下,發送direct消息。 * 第一次發送,消息能夠正常路由到隊列。 * 第二次發送,消息不能路由到隊列。 */ @Test public void testDirectOnTransaction() { messageProducer.directOnTransaction("{}"); }
org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction
因爲發生了異常,執行了回滾,因此第一條消息也沒有被髮送到隊列:
若是兩條數據都會成功發送到RabbitMQ,則會成功提交兩條消息。
若是不用@Transactional修飾方法,那麼會有一條消息進入RabbitMQ,另外一條消息丟失,具體測試以下,首先是兩條消息都能發送到RabbitMQ:
/** * 在事務模式下,發送direct消息。 * <p> * 第一次發送,消息能夠正常路由到隊列。 * 第二次發送,消息不能路由到隊列。 */ @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); }
下面把@Transactional修飾去掉,而後一條能夠發送到RabbitMQ,另外一條不能夠:
/** * 在事務模式下,發送direct消息。 * <p> * 第一次發送,消息能夠正常路由到隊列。 * 第二次發送,消息不能路由到隊列。 */ // @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData); }
執行後結果以下:
org.springframework.amqp.AmqpIOException: java.io.IOException
能夠看到程序依舊拋出了異常,但第一條消息發送到了RabbitMQ中:
在協議層,當RabbitMQ因爲錯誤而沒法路由時,它將發送一個Basic.Return響應,但願終止事務的發佈者應該發送TX.Rollback請求,並等待TX.RollbackOk響應,而後繼續工做。
RabbitMQ只在每一個發出的命令做用於單個隊列時才執行原子事務。若是不僅一個隊列受到事務中任何命令的影響,則提交就不具有原子性。
推薦使用發佈確認模式用做輕量級替代方案,由於它的速度快,能夠同時提供確定或否認的確認。
高可用隊列(HA隊列)時RabbitMQ的一項加強功能,它容許隊列在多個服務器上擁有冗餘副本。
當消息發送到高可用隊列是,消息會發送到集羣中的每臺服務器,一旦消息在集羣中的任何節點都完成消費,那麼消息全部副本將當即從其餘節點刪除。
HA隊列中有一個節點是主節點,其餘全部節點都是輔助節點。當主節點發生故障,會在輔助節點中選擇一個接管主節點的角色。若是HA節點中的一個輔助節點故障了,其餘節點將照常工做。
當一個故障節點恢復了,或者新添加進來一個輔助節點,它將不包含任何已經存在於現有節點中的消息,當現有節點的消息被消費後,故障節點或新節點則開始接收消息,並執行同步操做。
若是使用事務或消息確認機制,則消息須要在HA隊列中全部活動節點肯定後,RabbitMQ纔會發送成功響應。
高可用隊列的配置在後面會單獨寫一篇。
若是將一個消息的delivery-mode設置爲1,RabbitMQ會被告知不須要將消息存儲到磁盤,而消息會一直保存在內存中。
爲了使消息在RabbitMQ重啓後仍然存在,除了將delivery-mode設置爲2,還須要在建立隊列時設置durable,使隊列變爲持久化隊列。
在發佈消息時,RabbitTemplate默認採用持久化策略,若是但願持久化存儲消息,須要在發送消息時作以下設置:
/** * 發送direct非持久化消息。 * RabbitTemplate默認採用消息持久化存儲。 * * @param message 消息內容。 */ public void directNonPersistent(String message) { rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); msg.getMessageProperties().setCorrelationId(UUID.randomUUID().toString()); return msg; } ); }
setDeliveryMode爲非持久化模式後,發送的消息將只保存在RabbitMQ的內存中。
在I/O密集型服務器中,經過操做系統在存儲設備之間傳輸數據時,操做系統將阻塞I/O操做的進程。當RabbitMQ服務器正在嘗試執行I/O操做,並等待存儲設備響應時,操做系統內核發生阻塞,那麼RabbitMQ能作的就只有等待。
儘管消息持久化時保障消息最終被投遞的最重要的方式之一,但實現它的代價也時最大的。