RabbitMQ 是一個由erlang語言編寫的、開源的、在AMQP基礎上完整的、可複用的企業消息系統。支持多種語言,包括java、Python、ruby、PHP、C/C++等。java
AMQP:advanced message queuing protocol ,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息並不受客戶端/中間件不一樣產品、不一樣開發語言等條件的限制。算法
AMQP模型圖
spring
發佈者(Publisher)發佈消息(Message),經由交換機(Exchange)。json
交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。緩存
最後 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。安全
一、發佈者、交換機、隊列、消費者均可以有多個。同時由於 AMQP 是一個網絡協議,因此這個過程當中的發佈者,消費者,消息代理 能夠分別存在於不一樣的設備上。springboot
二、發佈者發佈消息時能夠給消息指定各類消息屬性(Message Meta-data)。有些屬性有可能會被消息代理(Brokers)使用,然而其餘的屬性則是徹底不透明的,它們只能被接收消息的應用所使用。ruby
三、從安全角度考慮,網絡是不可靠的,又或是消費者在處理消息的過程當中意外掛掉,這樣沒有處理成功的消息就會丟失。基於此緣由,AMQP 模塊包含了一個消息確認(Message Acknowledgements)機制:當一個消息從隊列中投遞給消費者後,不會當即從隊列中刪除,直到它收到來自消費者的確認回執(Acknowledgement)後,才徹底從隊列中刪除。服務器
四、在某些狀況下,例如當一個消息沒法被成功路由時(沒法從交換機分發到隊列),消息或許會被返回給發佈者並被丟棄。或者,若是消息代理執行了延期操做,消息會被放入一個所謂的死信隊列中。此時,消息發佈者能夠選擇某些參數來處理這些特殊狀況。網絡
交換機是用來發送消息的 AMQP 實體。交換機拿到一個消息以後將它路由給一個或零個隊列。它使用哪一種路由算法是由交換機類型和綁定(Bindings)規則所決定的。常見的交換機有以下幾種:
AMQP 中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。
rabbitmq遵循AMQP協議,用在實時的對可靠性要求比較高的消息傳遞上。kafka主要用於處理活躍的流式數據,大數據量的數據處理上。主要體如今:
RabbitMQ經常使用的Exchange Type有fanout、direct、topic、headers這四種。
direct類型的Exchange路由規則很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。
前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但支持模糊匹配:
fanout類型的Exchange路由規則很是簡單,它會把全部發送到fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的全部Queue上。
Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。因此,Fanout Exchange 轉發消息是最快的。
headers類型的Exchange也不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。
嚴格來講,Default Exchange 並不該該和上面四個交換機在一塊兒,由於它不屬於獨立的一種交換機類型,而是屬於Direct Exchange 直連交換機。
默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。
它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
舉個例子:當你聲明瞭一個名爲 「search-indexing-online」 的隊列,AMQP 代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是爲 「search-indexing-online」。因此當你但願將消息投遞給「search-indexing-online」的隊列時,指定投遞信息包括:交換機名稱爲空字符串,路由鍵爲「search-indexing-online」便可。
所以 direct exchange 中的 default exchange 用法,體現出了消息隊列的 point to point,感受像是直接投遞消息給指定名字的隊列。
雖然咱們要避免系統宕機,可是這種「不可抗力」總會有可能發生。rabbitmq若是宕機了,再啓動即是了,大不了有短暫時間不可用。但若是你啓動起來後,發現這個rabbitmq服務器像是被重置了,之前的exchange,queue和message數據都沒了,那就太使人崩潰了。不光業務系統由於無對應exchange和queue受影響,丟失的不少message數據更是致命的。因此如何保證rabbitmq的持久化,在服務使用前必須得考慮到位。
持久化能夠提升RabbitMQ的可靠性,以防在異常狀況(重啓、關閉、宕機等)下的數據丟失。RabbitMQ的持久化分爲三個部分:交換器的持久化、隊列的持久化和消息的持久化。
exchange交換器的持久化是在聲明交換器的時候,將durable設置爲true。
若是交換器不設置持久化,那麼在RabbitMQ交換器服務重啓以後,相關的交換器信息會丟失,不過消息不會丟失,可是不能將消息發送到這個交換器。
spring中建立exchange時,構造方法默認設置爲持久化。
隊列的持久化在聲明隊列的時候,將durable設置爲true。
若是隊列不設置持久化,那麼RabbitMQ交換器服務重啓以後,相關的隊列信息會丟失,同時隊列中的消息也會丟失。
exchange和queue,若是一個是非持久化,另外一個是持久化,中bind時會報錯。
spring中建立exchange時,構造方法默認設置爲持久化。
要確保消息不會丟失,除了設置隊列的持久化,還須要將消息設置爲持久化。經過將消息的投遞模式(BasicProperties中的deliveryMode屬性)設置爲2便可實現消息的持久化。
若是將全部的消息都進行持久化操做,這樣會影響RabbitMQ的性能。寫入磁盤的速度比寫入內存的速度慢很,因此要在可靠性和吞吐量之間作權衡。
在spring中,BasicProperties中的deliveryMode屬性,對應的是MessageProperties中的deliveryMode。平時使用的RabbitTemplate.convertAndSend()方法默認設置爲持久化,deliveryMode=2。若是須要設置非持久化發送消息,須要手動設置:
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
這裏講解實現消息持久化的完整方案。
1、exchange、queue、message
要保證消息的持久化,在rabbitmq自己的結構上須要實現下面這些:
2、發佈確認
前面是保證了消息在投遞到rabbitmq中,如何保證rabbit中消息的持久化。
那麼還須要保證生產者能成功發佈消息,如交換機名字寫錯了等等。能夠在發佈消息時設置投遞成功的回調,肯定消息能成功投遞到目標隊列中。
3、接收確認
對於消費者來講,若是在訂閱消息的時候,將autoAck設置爲true,那麼消費者接收到消息後,尚未處理,就出現了異常掛掉了,此時,隊列中已經將消息刪除,消費者不可以在收到消息。
這種狀況能夠將autoAck設置爲false,進行手動確認。
4、鏡像隊列集羣
在持久化後的消息存入RabbitMQ以後,還須要一段時間才能存入磁盤。RabbitMQ並不會爲每條消息都進行同步存盤,可能僅僅是保存到操做系統緩存之中而不是物理磁盤。若是在這段時間,服務器宕機或者重啓,消息還沒來得及保存到磁盤當中,從而丟失。對於這種狀況,能夠引入RabiitMQ鏡像隊列機制。
這裏強調是鏡像隊列集羣,而非普通集羣。由於出於同步效率考慮,普通集羣只會同步隊列的元數據,而不會同步隊列中的消息。只有升級成鏡像隊列集羣后,才能也同步消息。
每一個鏡像隊列由一個master和一個或多個mirrors組成。主節點位於一個一般稱爲master的節點上。每一個隊列都有本身的主節點。給定隊列的全部操做首先應用於隊列的主節點,而後傳播到鏡像。這包括隊列發佈(enqueueing publishes)、向消費者傳遞消息、跟蹤消費者的確認等等。
發佈到隊列的消息將複製到全部鏡像。無論消費者鏈接到哪一個節點,都會鏈接到master,鏡像會刪除在master上已確認的消息。所以,隊列鏡像提升了可用性,但不會在節點之間分配負載。 若是承載隊列master的節點出現故障,則最舊的鏡像將升級爲新的master,只要它已同步。根據隊列鏡像參數,也能夠升級未同步的鏡像。
java開發上,這裏以spring-boot-starter-amqp
爲例,記錄在springboot中使用rabbitmq的一些關注點。pom.xml中引用爲:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
一個簡單的示例,僅限於文本消息的發佈和接收。
ProducerController.java
@RestController public class ProducerController { private static final String HEADER_KEY_UID="uid"; @Autowired private ProducerService producerService; @PostMapping("/sendText") public void sendText(@RequestParam("uid")String uid,@RequestParam("msg")String msg){ MessageProperties messageProperties=new MessageProperties(); messageProperties.setHeader(HEADER_KEY_UID,uid); producerService.sendText(msg,messageProperties); } }
ProducerService.java
@Service public class ProducerService { private static final String EXCHANGE_NAME="direct.exchange.a"; private static final String ROUTING_KEY_NAME="direct.routingKey.a"; @Resource private RabbitTemplate rabbitTemplate; /** * 發送 消息文本 * @param data 文本消息 * @param messageProperties 消息屬性 */ public void sendText(String data, MessageProperties messageProperties) { Message message = rabbitTemplate.getMessageConverter().toMessage(data, messageProperties); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
消息發送的經常使用方法:
MessageListener.java
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "true"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { MessageConverter messageConverter = rabbitTemplate.getMessageConverter(); String msg = (String) messageConverter.fromMessage(message); log.info("消費端 Body: " + msg); } }
rabbitmq中消息的序列化依賴於MessageConvert,這是一個接口,用於消息內容的序列化。
RabbitConfig.java
public class RabbitConfig { @Bean("jsonRabbitTemplate") public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean("defaultRabbitTemplate") public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
ProducerService.java
@Service public class ProducerService { private static final String EXCHANGE_NAME="direct.exchange.a"; private static final String ROUTING_KEY_NAME="direct.routingKey.a"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; @Resource(name = "jsonRabbitTemplate") private RabbitTemplate jsonRabbitTemplate; /** * 發送 消息對象 json * * @param data * @param messageProperties */ public void sendObject(Object data, MessageProperties messageProperties) { Message message = jsonRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); jsonRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } /** * 發送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
MessageListener.java
@Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json對象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消費端Payload: " + bodyText); } }
生產者發送對象消息時,咱們使用Jackson2JsonMessageConverter,並用其toMessage方法封裝。可是在消費者接收對象消息時,咱們卻沒有用Jackson2JsonMessageConverter的fromMessage方法,而是使用ObjectMapper來反序列化Json對象。是由於rabbitmq在發送Jackson2JsonMessageConverter的序列化對象時,會在包含類的包名信息,消費者在使用fromMessage反序列化時,必須建立一個和生產者中包名等如出一轍的類。明顯不太現實。
ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調。
使用方式在於:
ProducerService.java
@Slf4j @Service public class ProducerService { private static final String EXCHANGE_NAME = "direct.exchange.a"; private static final String ROUTING_KEY_NAME = "direct.routingKey.ab"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; /** * ConfirmCallback * * 投遞對象:exchange * 回調觸發:不管成功或失敗,都會觸發回調。 * 投遞成功:ack=true * 投遞失敗:ack=false */ RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("ack: " + ack); if (!ack) { log.info("投遞exchange失敗!....能夠進行日誌記錄、異常處理、補償處理等"); } else { log.info("投遞exchange成功!"); } }; /** * 發送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); //confirmCallback defaultRabbitTemplate.setConfirmCallback(confirmCallback); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
配置文件須要設置:
spring.rabbitmq.publisher-confirm-type = correlated
ReturnCallback接口用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。
使用方式在於:
ProducerService.java
@Slf4j @Service public class ProducerService { private static final String EXCHANGE_NAME = "direct.exchange.a"; private static final String ROUTING_KEY_NAME = "direct.routingKey.ab"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; /** * ReturnCallback * * 投遞對象:queue * 回調觸發:只有投遞失敗,纔會觸發回調。 */ RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> { log.info("投遞到queue失敗! exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); }; /** * 發送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); //returnCallback defaultRabbitTemplate.setMandatory(true); defaultRabbitTemplate.setReturnCallback(returnCallback); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
須要在配置文件中配置:
spring.rabbitmq.publisher-returns = true
上一節講解的是,如何在生產者發佈消息時,確認消息發佈到rabbitmq的交換機和隊列中。那麼這一節講解的是,如何保障消費者能徹底「消費」了消息。
一般狀況下,rabbitmq做爲消息中間件,它把message推送給消費者就完成了它的使命,該message就自動被「簽收」了。而消費者在接收到message後,再去實現關於該message的業務邏輯。可若是在實現該業務邏輯過程當中發生了錯誤,須要從新執行,那就難辦了。由於message一旦被「簽收」後,就從rabbitmq中被刪除,不可能從新再發送。
若是消費者能手動控制message的「簽收」操做,只有當關於message的業務邏輯執行完成後再「簽收」,message再從rabbitmq中刪除,不然可讓message重發就行了。這一節就講這個。
Acknowledge意思是「確認」,消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠手動去 ACK 或自動 ACK。
使用手動應答消息,有一點須要特別注意,那就是不能忘記應答消息,由於對於RabbitMQ來講處理消息沒有超時,只要不該答消息,他就會認爲仍在正常處理消息,致使消息隊列出現阻塞,影響業務執行。若是不想處理,能夠reject丟棄該消息。
消息確認模式有:
默認是自動確認,能夠經過RabbitListenerContainerFactory 中進行開啓手動ack,或者中配置文件中開啓:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
MessageListener.java
@Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json對象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消費端Payload: " + bodyText); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
設置爲手動確認後,有3種確認操做:
如示例代碼中的 basicAck 方法,須要注意的是,要傳遞兩個參數:
除了上述手動確認的方式,還有一種不太經常使用的方式,能夠實現重複發送消息。在開啓異常重試的前提下,在消費者代碼中拋出異常,會自動重發消息。
application.properties
spring.rabbitmq.listener.simple.retry.enabled=true 是否開啓消費者重試 spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重試次數 spring.rabbitmq.listener.simple.retry.initial-interval=5000 重試間隔時間(單位毫秒) spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數超過上面的設置以後是否丟棄
MessageListener.java
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json對象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消費端Payload: " + bodyText); throw new RuntimeException("重試啦"); }
在RabbitMQ中消費者有2種方式獲取隊列中的消息:
對比來講,若是有持續消費的需求,建議用push的方式,經過監聽器來訂閱。若是隻是特定時刻須要從隊列中,一次性取些數據,能夠用pull方式。
咱們知道不管是生產者仍是消費者,都須要和 RabbitMQ Broker 創建鏈接,這個鏈接就是一條 TCP 鏈接,也就是 Connection。一旦 TCP 鏈接創建起來,客戶端緊接着能夠建立一個 AMQP 信道(Channel),每一個信道都會被指派一個惟一的 ID。
信道是創建在 Connection 之上的虛擬鏈接,RabbitMQ 處理的每條 AMQP 指令都是經過信道完成的。
咱們徹底可使用 Connection 就能完成信道的工做,爲何還要引入信道呢?試想這樣一個場景,一個應用程序中有不少個線程須要從 RabbitMQ 中消費消息,或者生產消息,那麼必然須要創建不少個 Connection,也就是多個 TCP 鏈接。然而對於操做系統而言,創建和銷燬 TCP 鏈接是很是昂貴的開銷,若是遇到使用高峯,性能瓶頸也隨之顯現。
RabbitMQ 採用相似 NIO(Non-blocking I/O)的作法,選擇 TCP 鏈接複用,不只能夠減小性能開銷,同時也便於管理。
每一個線程把持一個信道,因此信道複用了 Connection 的 TCP 鏈接。同時 RabbitMQ 能夠確保每一個線程的私密性,就像擁有獨立的鏈接同樣。當每一個信道的流量不是很大時,複用單一的 Connection 能夠在產生性能瓶頸的狀況下有效地節省 TCP 鏈接資源。可是信道自己的流量很大時,這時候多個信道複用一個 Connection 就會產生性能瓶頸,進而使總體的流量被限制了。此時就須要開闢多個 Connection,將這些信道均攤到這些 Connection 中,至於這些相關的調優策略須要根據業務自身的實際狀況進行調節。
信道在 AMQP 中是一個很重要的概念,大多數操做都是在信道這個層面展開的。好比 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相關的 API 與 AMQP 緊密相連,好比 channel.basicPublish 對應 AMQP 的 Basic.Publish 命令。
針對push方式,RabbitMQ能夠設置basicQoS(Consumer Prefetch)來對consumer進行流控,從而限制未Ack的消息數量。
前提包括,消息確認模式必須是手動確認。
basicQos(int var1, boolean var2)