摘要: 介紹confirm的工做機制。使用spring-amqp介紹事務以及發佈確認的使用方式。由於事務以及發佈確認是針對channel來說,因此在一個鏈接中兩個channel,一個channel可使用事務,另外一個channel可使用發佈確認,並介紹了何時該使用事務,何時該使用發佈確認java
Confirms是增長的一個確認機制的類,繼承自標準的AMQP。這個類只包含了兩個方法:confirm.select和confirm.select-ok。另外,basic.ack方法被髮送到客戶端。spring
confirm.select是在一個channel中啓動發佈確認。注意:一個具備事務的channel不能放入到確認模式,一樣確認模式下的channel不能用事務。typescript
當confirm.select被髮送/接收。發佈者/broker開始計數(首先是發佈而後confirm.select被記爲1)。一旦channel爲確認模式,發佈者應該指望接收到basic.ack方法,delivery-tag屬性顯示確認消息的數量。數據庫
當broker確認了一個消息,會通知發佈者消息被成功處理;express
basic的規則是這樣的:服務器
一個未被路由的具備manadatory或者immediate的消息被正確確認後觸發basic.return;網絡
另外,一個瞬時態的消息被確認目前已經入隊;less
持久化的消息在持久化到磁盤或者每一個隊列的消息被消費以後被確認。異步
關於confirm會有一些問題:ide
首先,broker不能保證消息會被confirm,只知道將會進行confirm。
第二,當未被確認的消息堆積時消息處理緩慢,對於確認模式下的發佈,broker會作幾個操做,日誌記錄未被確認的消息
第三,若是發佈者與broker之間的鏈接刪除了未能獲得確認,它不必定知道消息丟失,因此可能會發布重複的消息。
最後,若是在broker中發生壞事會致使消息丟失,將會basic.nack那些消息
總之,Confirms給客戶端一種輕量級的方式,可以跟蹤哪些消息被broker處理,哪些可能由於broker宕掉或者網絡失敗的狀況而從新發布。
確認而且保證消息被送達,提供了兩種方式:發佈確認和事務。(二者不可同時使用)在channel爲事務時,不可引入確認模式;一樣channel爲確認模式下,不可以使用事務。
Spring AMQP作的不只僅是回滾事務,並且能夠手動拒絕消息,如當監聽容器發生異常時是否從新入隊。
持久化的消息是應該在broker重啓前都有效。若是在消息有機會寫入到磁盤以前broker宕掉,消息仍然會丟失。在某些狀況下,這是不夠的,發佈者須要知道消息是否處理正確。簡單的解決方案是使用事務,即提交每條消息。
案例:
RabbitTemplate的使用案例(同步),由調用者提供外部事務,在模板中配置了channe-transacted=true。一般是首選,由於它是非侵入性的(低耦合)
<rabbit:template id="rabbitTemplate" connection-factory="cachingConnectionFactory" exchange="sslexchange" channel-transacted="true"/>
@Transactional
public void doSomething() { ApplicationContext context = new GenericXmlApplicationContext("spring-amqp-test.xml"); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); String incoming = (String) rabbitTemplate.receiveAndConvert(); // do some more database processing... String outgoing = processInDatabaseAndExtractReply(incoming); //數據庫操做中若是失敗了,outgoing這條消息不會被髮送,incoming消息也會返回到broker服務器中,由於這是一條事務鏈。 //可作XA事務,在消息傳送與數據庫訪問中共享事務。 rabbitTemplate.convertAndSend(outgoing); } private String processInDatabaseAndExtractReply(String incoming){ return incoming; }
異步使用案例(外部事務)
<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> </bean> <rabbit:listener-container connection-factory="cachingConnectionFactory" transaction-manager="rabbitTxManage" channel-transacted="true"> <rabbit:listener ref="foo" method="onMessage" queue-names="rabbit-ssl-test"/> </rabbit:listener-container>
在容器中配置事務時,若是提供了transactionManager,channelTransaction必須爲true;若是爲false,外部的事務仍然能夠提供給監聽容器,形成的影響是在回滾的業務操做中也會提交消息傳輸的操做。
使用事務有兩個問題:
Ø 一是會阻塞,發佈者必須等待broker處理每一個消息。若是發佈者知道在broker死掉以前哪些消息沒有被處理就足夠了。
Ø 第二個問題是事務是重量級的,每次提交都須要fsync(),須要耗費大量的時間。
confirm模式下,broker將會確認消息並處理。這種模式下是異步的,生產者能夠流水式的發佈而不用等待broker,broker能夠批量的往磁盤寫入。
發佈確認必須配置在CachingConnectionFactory上
<bean id="cachingConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="host" value="192.168.111.128"></property> <property name="port" value="5672"></property> <property name="username" value="admin"/> <property name="password" value="admin"/> <property name="publisherConfirms" value="true"/> <property name="publisherReturns" value="true"/> </bean>
若使用confirm-callback或return-callback,必需要配置publisherConfirms或publisherReturns爲true
每一個rabbitTemplate只能有一個confirm-callback和return-callback
//確認消息是否到達broker服務器,也就是隻確認是否正確到達exchange中便可,只要正確的到達exchange中,broker便可確認該消息返回給客戶端ack。 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息確認成功"); } else { //處理丟失的消息(nack) System.out.println("消息確認失敗"); } } });
使用return-callback時必須設置mandatory爲true,或者在配置中設置mandatory-expression的值爲true,可針對每次請求的消息去肯定’mandatory’的boolean值,只能在提供’return -callback’時使用,與mandatory互斥。
rabbitTemplate.setMandatory(true); //確認消息是否到達broker服務器,也就是隻確認是否正確到達exchange中便可,只要正確的到達exchange中,broker便可確認該消息返回給客戶端ack。 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //從新發布 RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey"); Throwable cause = new Exception(new Exception("route_fail_and_republish")); recoverer.recover(message,cause); System.out.println("Returned Message:"+replyText); } }); errorTemplate配置: <rabbit:queue id="errorQueue" name="errorQueue" auto-delete="false" durable="true"> <rabbit:queue-arguments> <entry key="x-ha-policy" value="all"/> <entry key="ha-params" value="1"/> <entry key="ha-sync-mode" value="automatic"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:direct-exchange id="errorExchange" name="errorExchange" auto-delete="false" durable="true"> <rabbit:bindings> <rabbit:binding queue="errorQueue" key="errorRoutingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="200" /> <property name="maxInterval" value="30000" /> </bean> </property> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="5"/> </bean> </property> </bean> <rabbit:template id="errorTemplate" connection-factory="cachingConnectionFactory" exchange="errorExchange" queue="errorQueue" routing-key="errorRoutingKey" retry-template="retryTemplate" />
private RabbitTemplate rabbitTemplate; private TransactionTemplate transactionTemplate; @Before public void init() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("192.168.111.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); template = new RabbitTemplate(connectionFactory); template.setChannelTransacted(true); RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory); transactionTemplate = new TransactionTemplate(transactionManager); connectionFactory.setPublisherConfirms(true); rabbitTemplate = new RabbitTemplate(connectionFactory); }
//發佈確認測試 @Test public void testPublishConfirm(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息確認成功"); }else{ System.out.println("消息確認失敗"); } } }); //發送到一個不存在的exchange,則會觸發發佈確認 rabbitTemplate.convertAndSend("asd","aaa","message"); String message = (String) rabbitTemplate.receiveAndConvert(ROUTE); assertEquals("message",message); }
//事務測試 @Test public void testSendAndReceiveInTransaction() throws Exception { //因爲有spring的事務參與,而發送操做在提交事務時,是不容許除template的事務有其餘事務的參與,因此這裏不會提交 //隊列中就沒有消息,因此在channel.basicGet時命令返回的是basic.get-empty(隊列中沒有消息時),而有消息時,返回basic.get-ok String result = transactionTemplate.execute(new TransactionCallback<String>() { @Override public String doInTransaction(TransactionStatus status) { template.convertAndSend(ROUTE, "message"); return (String) template.receiveAndConvert(ROUTE); } }); //spring事務完成,對其中的操做須要提交,發送與接收操做被認爲是一個事務鏈而提交 assertEquals(null, result); //這裏的執行不受spring事務的影響 result = (String) template.receiveAndConvert(ROUTE); assertEquals("message", result); }
轉載:https://my.oschina.net/lzhaoqiang/blog/670749
學習:http://www.kancloud.cn/longxuan/rabbitmq-arron/117518