原文:https://blog.csdn.net/linpeng_1/article/details/80505828html
Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。java
RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,spring
一個RabbitTemplate僅能支持一個ReturnCallback 。數據庫
爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.json
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
server.port=8083 #服務器配置 spring.application.name=rabbitmq-hello-sending #rabbitmq鏈接參數 spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集羣或單機 均可配置 spring.rabbitmq.username=linpeng spring.rabbitmq.password=123456 # rabbitmq服務器的虛擬主機名,能夠在後臺管理系統上查看和新建 spring.rabbitmq.virtual-host=/test # 鏈接超時 spring.rabbitmq.connection-timeout=5s # 發送方 # 開啓發送確認(未到達MQ服務器) spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回(未找到對應queue) spring.rabbitmq.publisher-returns=true # 消費方 開啓手動ACK(坑:當序列化爲JSON時,此配置會失效,見下文) spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual # 消費方 spring.rabbitmq.listener.concurrency=2 //最小消息監聽線程數 spring.rabbitmq.listener.max-concurrency=2 //最大消息監聽線程數 #消費者每次從隊列獲取的消息數量 (默認一次250個) #經過查看後臺管理器中queue的unacked數量 spring.rabbitmq.listener.simple.prefetch= 5 #消費者自動啓動 spring.rabbitmq.listener.simple.auto-startup=true #消費失敗,自動從新入隊 spring.rabbitmq.listener.simple.default-requeue-rejected= true #啓用發送重試 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
默認一個RabbitTemplate在RabbitMQ中至關於一個connection,每發送一次消息至關於channel,MQ接收消息後釋放channel。每一個connection最多支持2048個channel,加入從一個connection同時超過2048個線程併發發送,channel超過2048,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。數組
測試啓動publisher-confirms後,400個線程經過一個RabbitTemplate併發發送10000消息,同時就可能產生1000左右的channel。由於channel等在confirm。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒。緩存
後臺管理頁面查看connection+channel
springboot
此connection中有10個線程併發發送消息,監控到10個channel生成,MQ完成接收後釋放channel。若是是publisher-confirms模式,channel會保持到confirm回調完成再釋放,影響併發性能。每一個connection最多支持2048個channel。bash
測試啓動publisher-confirms後,500個線程併發發送,部分消息報AmqpResourceNotAvailableException。400個線程經過一個RabbitTemplate併發發送10000消息,最高同時就可能產生1000多的channel。由於channel在等待執行confirm回調。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒,此時全部channel所有釋放。服務器
若在rabbitmq的管理頁面手動建立隊列和交換機,則能夠再也不代碼中聲明
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue QueueA() { return new Queue("hello"); } @Bean public Queue QueueB() { return new Queue("helloObj"); } /** * Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。 * @return */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("ABExchange"); } @Bean DirectExchange Exchange() { return new DirectExchange("DExchange"); } @Bean Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueB).to(fanoutExchange); } @Bean Binding bindingExchange() { return BindingBuilder.bind(QueueA()).to(Exchange()).with("TEST");//routingKey } }
ConfirmCallback :ACK=true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, ACK=false標示消息因爲Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回ACK=false。
ReturnCallback:當消息發送出去找不到對應路由隊列時,將會把消息退回 。若是有任何一個路由隊列接收投遞消息成功,則不會退回消息。MQ成功接收,可是未找到對應隊列觸發
經過以上異步確認機制,增長降級、補償處理。好比發送時保存信息和消息ID,ConfirmCallback 經過ID找到對應信息重發,注意要保證冪等性。
package com.example.demo; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.util.Date; //RabbitTemplate.ConfirmCallback @Service public class HelloSender implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "你好如今是 " + new Date() +""; System.out.println("HelloSender發送內容 : " + context); //消息序列化設置 //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //自身實現ReturnCallback接口 設置異步回調對象爲this this.rabbitTemplate.setReturnCallback(this); //如果當前類實現RabbitTemplate.ConfirmCallback接口,則能夠設置爲this //發送前給RabbitTemplate設置一個異步回調對象 RabbitTemplate.ConfirmCallback接口的匿名類 this.rabbitTemplate.setConfirmCallback((correlationData, confirm, cause) -> { //若發送時沒有CorrelationData,則這裏correlationData==null if (!confirm) { System.out.println("HelloSender消息發送失敗" + cause + correlationData.getId() ); //correlationData.getReturnedMessage(); Message //correlationData.toString(); } else { System.out.println("HelloSender 消息發送成功 "); } }); //this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate.convertAndSend("hello", context); //這裏指定路由鍵,注意不是隊列名 //發送時 能夠指定消息ID,方便在ConfirmCallback時候二次處理消息 rabbitTemplate.convertAndSend("DExchange","QueueRoutingKey", context, new CorrelationData("自定義消息ID")); } public void sendObj() { MessageObj obj = new MessageObj(); obj.setACK(false); obj.setId(123); obj.setName("zhangsan"); obj.setValue("data"); System.out.println("發送 : " + obj); this.rabbitTemplate.convertAndSend("helloObj", obj); } @Override public void returnedMessage(Message message, int i, String cause, String exchange, String queue) { //沒有找到queue //Message中的成員,Body爲消息內容 //(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) } // @Override // public void confirm(CorrelationData correlationData, boolean confirm, String cause) { // System.out.println("sender success"); // } }
測試發送:
使用Spring默認的rabbitTemplate發送消息,CorrelationData能夠重複。
交換機+路由鍵+消息Object+CorrelationData
rabbitTemplate.convertAndSend("TEST.EX","TEST","String:message",new CorrelationData("111"));
在rabbitmq控制檯上getmessage查看 ,rabbitTemplate默認發送deliverymode=2消息,已經設置了消息持久化。
測試速度:
測試100個線程同時併發向同一隊列發送簡單消息(15左右長度的字符串)。從發送到100個消息所有完成ConfirmCallback,用時爲600ms左右。此過程不計入消費速度。
400個線程經過一個RabbitTemplate併發發送10000消息,同時就可能產生1000左右的channel。由於channel等在confirm。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒。
測試ConfirmCallback回調:
public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;
confirm==true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, confirm==false標示消息因爲Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回confirm==false。
在此方法中針對confirm==false的消息實現降級/補償處理:重發、本地緩存、計入數據庫/Redis等、更新狀態.....
測試環境:實例化一個ConfirmCallback接口對象,做爲rabbitTemplate共用回調處理對象。
回調測試結果:
1 先發送到MQ的消息,先完成confirm回調。
2 ConfirmCallback默認是由同一個線程執行回調,打印線程名能夠看到線程名爲【AMQP Connection rabbitmqIp:port】
3 若發送時沒有攜帶CorrelationData,回調時這裏correlationData==null
4.設置消息確認會影響併發性能,每一個線程發送生成一個channel,channel會保持到confirm回調完成再釋放。由於每一個connection最多支持2048個channel,當channel達到2048時,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
測試ReturnCallback 回調:
public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;
MQ成功接收消息,可是未找到對應路由鍵的隊列後回調。實現降級/補償處理。
測試環境:實例化一個ReturnCallback接口對象,做爲rabbitTemplate共用回調處理對象。
回調測試結果:
默認是由同一個線程執行回調,打印線程名能夠看到線程名爲【AMQP Connection rabbitmqIp:port】
message=返回的Message對象中的成員,Body爲發送時的消息內容 ,receivedDeliveryMode=PERSISTENT=2 爲持久化消息。spring_returned_message_correlation=發送時的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
cause=NO_ROUTE
exchange、queue 爲發送時的配置
設置QOS,避免觸發流控機制
#消費者每次從隊列獲取的消息數量 (默認一次250個)
spring.rabbitmq.listener.simple.prefetch= 5
當QUEUE達到5條Unacked消息時,不會再推送消息給Consumer。查看後臺管理器中queue的unacked數量
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Bean; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; import java.util.Map; @Component public class HelloReceiver { @RabbitListener(queues = "hello") //這裏是隊列名,不是路由鍵 public void process(String msg,Channel channel, Message message) throws IOException { System.out.println("HelloReceiver收到 : " + msg +"收到時間"+new Date()); try { //告訴MQ服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("receiver success"); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); } } }
msg是消息內容,至關於Message對象中的body。
Message對象的成員:
能夠看到有消息信息BODY,發送方生成的消息CorrelationData,還有執行的Method對象(@RabbitListener標註的方法),目標BEAN
備註:咱們用註解的方式來接受消息 就不要用 本身建立對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局裏面配置麻煩,直接用@RabbitListener(queues = "hello")最簡單
消息確認 由於我在屬性配置文件裏面開啓了ACK確認 因此若是代碼沒有執行ACK確認 你在RabbitMQ的後臺會看到消息會一直留在隊列裏面未消費掉 只要程序一啓動開始接受該隊列消息的時候 又會收到
方法參數詳解:http://www.javashuo.com/article/p-dwmtzvog-ey.html
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); deliveryTag:該消息的index,由發送方生成 multiple:是否批量.true:將一次性ack全部小於deliveryTag的消息。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); deliveryTag:該消息的index multiple:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。 requeue:被拒絕的是否從新入隊列,true 放在隊首,false 消息進入綁定的DLX。必定注意:若此消息一直Nack重入隊會致使的死循環
channel.basicNack 與 channel.basicReject 的區別在於basicNack能夠拒絕多條消息,而basicReject一次只能拒絕一條消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); deliveryTag:該消息的index requeue:被拒絕的是否從新入隊列。false 消息進入綁定的DLX
ShutdownSignalException
1 隊列名找不到
2 代碼中有ack,可是沒有配置手動ACK
消費超時,queue中unacked的消息會退回到queue中,且消費者ACK時會失敗。
@Component public class MessageHandler { //獲取消息的頭屬性和body屬性 @RabbitListener(queues = "zhihao.miao.order") public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){ System.out.println("====消費消息===handleMessage"); System.out.println(headers); System.out.println(body); } }
@Component @RabbitListener(queues = "consumer_queue") public class Receiver { @RabbitHandler public void processMessage1(String message) { System.out.println(message); } @RabbitHandler public void processMessage2(byte[] message) { System.out.println(new String(message)); } }
當中默認的序列化類爲SimpleMessageConverter。
僅僅有調用了convertAndSend方法纔會使用對應的MessageConvert進行消息的序列化與反序列化。
SimpleMessageConverter對於要發送的消息體body爲字節數組時。不進行處理。
對於假設是String。則將String轉成字節數組。
對於假設是Java對象,則使用jdk序列化Serializable將消息轉成字節數組。轉出來的結果較大,含class類名。類對應方法等信息。所以性能較差。
當使用RabbitMq做爲中間件時,數據量比較大,此時就要考慮使用相似Jackson2JsonMessageConverter。hessian等序列化形式。以此提升性能。
https://www.jianshu.com/p/911d987b5f11
發送
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; }
User user = new User("linyuan"); rabbitTemplate.convertAndSend("topic.exchange","queue1",user);
接收
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//json序列化時,若想手動ACK,則必須配置 return factory; }
@Component @RabbitListener(queues = "queue1") public class Receiver { @RabbitHandler public void processMessage1(@Payload User user) { System.out.println(user.getName()); } }
解決方案: https://blog.csdn.net/m912595719/article/details/83787486
這是springboot集成RabbitMQ的一個大坑。當消費者配置JSON反序列化時,配置文件中的手動ACK會失效,消費者會變成自動ACK模式。spring.rabbitmq.listener.direct.acknowledge-mode=manual,spring.rabbitmq.listener.simple.acknowledge-mode=manual 配置失效。
解決方法是消費者配置RabbitListenerContainerFactory這個Bean時(見上),設置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消費者強制轉換爲手動ACK。
若是配置失效切換爲自動ACK,可是代碼中又使用channel.basicAck手動ACK。這樣會形成雙ACK的ERROR,接着信道會重啓重連。以下:
o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
unknown delivery tag 1表示當前Channel中找不到delivery-tag=1的消息,實際上是這個消息已經自動ACK了,basicAck時就會出錯。測試顯示,消息並不會丟失而是在出現ERROR異常後走向Nack後從新入隊,再屢次重複消費後最終ACK成功,嚴重下降消費者的執行效率。
Delivery Tags投遞的標識
當一個消費者向RabbitMQ註冊後,RabbitMQ會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它在一個channel中惟一表明瞭一次投遞。delivery tag的惟一標識範圍限於channel. delivery tag是單調遞增的正整數,客戶端獲取投遞的方法用用dellivery tag做爲一個參數。
@Autowired private HelloSender helloSender; /** * 單生產者-單個消費者 */ @RequestMapping("/test") public void hello() throws Exception { helloSender.send(); }
發送消息
ACK場景測試
咱們把HelloReceiver的ACK確認代碼註釋掉 ,那消息就算程序收到了, 可是未確認ACK致使消息服務器覺得他是未成功消費的,若此時消費者斷開則消息返回隊列,後續還會再發。