前面幾篇記錄了收發消息的demo,今天記錄下關於 消息確認方面的 問題.html
下面是幾個問題:java
1.爲何要進行消息確認?git
2.rabbitmq消息確認 機制是什麼樣的?github
3.發送方如何確認消息發送成功?什麼樣纔算發送成功?spring
4.消費方如何告知rabbitmq消息消費成功或失敗?json
5.使用spring的代碼示例app
1.爲何要進行消息確認?異步
常常會聽到丟消息的字眼, 對於前面的demo來講,就存在丟消息的隱患.ide
發送者無法確認是否發送成功,消費者處理失敗也沒法反饋.性能
沒有消息確認機制,就會出現消息莫名其妙的沒了,也不知道什麼狀況.
2.rabbitmq消息確認 機制是什麼樣的?
首先看官網對消息確認的介紹http://www.rabbitmq.com/confirms.html
網上會有不少總結的博客(包括如今看的),不少就是對官網的翻譯.因此看資料首先要去官網看看,這很關鍵.
看上圖官網的介紹.惟一保證消息不丟失的是使用事務,可是性能太差,做爲補償,有了消息確認機制.
並說明了開啓方法,以及和事務模式不共存.
還寫了一個例子,可是點進去那個連接已經失效了,新版的源碼上也沒有這個例子,我找了最近一版是3.6.7上面還有.
點這裏看官方的例子
3.發送的消息什麼樣纔算成功或失敗? 如何確認?
判斷消息成功或失敗,其實就是看進行消息確認的時機,由於成功或失敗後就會把結果告訴發送方.仍是看官方解釋:
意思以下:
確認消息不能路由時(exchange確認不能路由到任何queue),進行確認操做(確認失敗).若是發送方設置了mandatory模式,則會先調用basic.return方法.
消息能夠路由時,當須要發送的隊列都發送成功後,進行消息確認.對於持久化的隊列,意味着已經寫入磁盤,對於鏡像隊列,意味着全部鏡像都接受成功.
至於如何確認的問題,上面已經寫了 basic.ack方法
4.消費方如何告知rabbitmq消息消費成功或失敗?
如圖可知,根據消費方不一樣的確認模式,確認時機也不一樣.
自動確認會在消息發送給消費者後當即確認,若是手動則當消費者調用ack,nack,reject幾種方法時進行確認.
通常會設置手動模式,業務失敗後能夠進行一些操做.
5.使用spring的代碼示例
下面是一個使用spring整合的代碼示例:
首先是rabbitmq的配置文件:
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
-
-
- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-
- <rabbit:connection-factory
- id="connectionFactory"
- host="${rabbit.host}"
- port="${rabbit.port}"
- username="${rabbit.username}"
- password="${rabbit.password}"
- publisher-confirms="true"
- />
-
- <rabbit:admin connection-factory="connectionFactory" />
-
-
- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
- confirm-callback="confirmCallBackListener"
- return-callback="returnCallBackListener"
- mandatory="true"
- />
-
- <rabbit:queue name="CONFIRM_TEST" />
-
- <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
- <rabbit:bindings>
- <rabbit:binding queue="CONFIRM_TEST" />
- </rabbit:bindings>
- </rabbit:direct-exchange>
-
-
- <rabbit:listener-container
- connection-factory="connectionFactory" acknowledge="manual" >
- <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
- </rabbit:listener-container>
-
- </beans>
而後發送方:
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- @Service("publishService")
- public class PublishService {
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- public void send(String exchange, String routingKey, Object message) {
- amqpTemplate.convertAndSend(exchange, routingKey, message);
- }
- }
消費方:
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
- import org.springframework.stereotype.Service;
-
- import com.rabbitmq.client.Channel;
-
- @Service("receiveConfirmTestListener")
- public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try{
- System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }catch(Exception e){
- e.printStackTrace();
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
- }
- }
- }
確認後回調:
- import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.stereotype.Service;
-
- @Service("confirmCallBackListener")
- public class ConfirmCallBackListener implements ConfirmCallback{
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
- }
- }
失敗後return回調:
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
- import org.springframework.stereotype.Service;
-
- @Service("returnCallBackListener")
- public class ReturnCallBackListener implements ReturnCallback{
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
- }
- }
測試類:
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
- import com.dingcheng.confirms.publish.PublishService;
-
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration(locations = {"classpath:application-context.xml"})
- public class TestConfirm {
- @Autowired
- private PublishService publishService;
-
- private static String exChange = "DIRECT_EX";
-
- @Test
- public void test1() throws InterruptedException{
- String message = "currentTime:"+System.currentTimeMillis();
- System.out.println("test1---message:"+message);
-
- publishService.send(exChange,"CONFIRM_TEST",message);
- Thread.sleep(1000);
- }
-
- @Test
- public void test2() throws InterruptedException{
- String message = "currentTime:"+System.currentTimeMillis();
- System.out.println("test2---message:"+message);
-
- publishService.send(exChange+"NO","CONFIRM_TEST",message);
- Thread.sleep(1000);
- }
-
- @Test
- public void test3() throws InterruptedException{
- String message = "currentTime:"+System.currentTimeMillis();
- System.out.println("test3---message:"+message);
-
- publishService.send(exChange,"",message);
- }
-
- @Test
- public void test4() throws InterruptedException{
- String message = "currentTime:"+System.currentTimeMillis();
- System.out.println("test4---message:"+message);
-
- publishService.send(exChange+"NO","CONFIRM_TEST",message);
- Thread.sleep(1000);
- }
- }
測試結果:
- test1---message:currentTime:1483786948506
- test2---message:currentTime:1483786948532
- consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506
- test3---message:currentTime:1483786948536
- confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
- confirm--:correlationData:null,ack:false,cause:Channel closed by application
- [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
- return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
- confirm--:correlationData:null,ack:true,cause:null
- test4---message:currentTime:1483786948546
- confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
- [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
-
代碼和配置裏面,已經都有註釋,就不在多說明了.(callback是異步的,因此測試中sleep1秒鐘等待下)
總結下就是:
若是消息沒有到exchange,則confirm回調,ack=false
若是消息到達exchange,則confirm回調,ack=true
exchange到queue成功,則不回調return
exchange到queue失敗,則回調return(需設置mandatory=true,不然不回回調,消息就丟了)
備註:須要說明,spring-rabbit和原生的rabbit-client ,表現是不同的.
測試的時候,原生的client,exchange錯誤的話,直接就報錯了,是不會到confirmListener和returnListener的