隨着對消息隊列的應用日益推廣,在分佈式系統中的使用能夠極大的下降對各個組件間的耦合度,從而提升組件的處理效率。由於消息隊列的存在,可使咱們對任務進行異步處理,這樣能夠減小請求響應時間和解耦。同時因爲使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不須要彼此聯繫,也不須要受對方的影響,即解耦和。
所謂解耦,就是說A 系統產生一條數據,發送到 MQ 裏面去,哪一個系統須要數據本身去 MQ 裏面消費。若是新系統須要數據,直接從 MQ 裏消費便可;若是某個系統不須要這條數據了,就取消對 MQ 消息的消費便可。這樣下來,A 系統壓根兒不須要去考慮要給誰發送數據,不須要維護這個代碼,也不須要考慮人家是否調用成功、失敗超時等狀況。
所謂異步,那麼 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到返回響應給用戶,總時長是 3 + 5 = 8ms,對於用戶而言,其實感受上就是點個按鈕,8ms 之後就直接返回了,爽!網站作得真好,真快!
此外使用消息隊列還有削峯的優點。所謂削峯,即在某些時刻,用戶會大量的對咱們的服務發起請求,咱們的數據庫有時候須要對這些請求進行寫入,可是呢,mysql的吞吐量頂破天就5000,剩下的就要慢慢等了,並且當併發量太高的時候,數據庫的各類異常也會讓人抓狂,可是呢,咱們使用消息隊列就不同了,用戶的各類請求統統塞入消息隊列裏面,以後由消息隊列返回處理結果,而請求存儲在隊列裏面,一個個按順序消費,使請求寫入不出現高峯低谷
基於這些有點,咱們開發團隊最近在spring boot的開發過程當中,因爲項目的須要咱們進行消息隊列的接入改造。
在改造過程當中遇到了這樣的問題,起初我將註解寫在了class上,在運行的過程當中會出現異常,如下是異常的詳細內容:html
2019-05-31 17:42:36.798 WARN 30544 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.java
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omittedmysql
至於說開發的源碼,我是這麼寫的,我在class這裏進行註解,這個時候我猜想,應該是註解的位置不對spring
@Component @RabbitListener(queues = "xx.yy.zz") public class Receiver { @RabbitHandler public void process(MSGSTO message) { System.out.println("消費消息"); System.out.println(message.toString()); } }
事實上,確實是位置不對,但更加專業的解答方式是,這個listener註解是方法級別上的,而不能用在class上面,咱們不妨來看下RabbitListener的源碼,從根本上理解這個方法的使用。sql
package org.springframework.amqp.rabbit.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.messaging.handler.annotation.MessageMapping; @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(RabbitListeners.class) public @interface RabbitListener { String id() default ""; String containerFactory() default ""; String[] queues() default {}; Queue[] queuesToDeclare() default {}; boolean exclusive() default false; String priority() default ""; String admin() default ""; QueueBinding[] bindings() default {}; String group() default ""; String returnExceptions() default ""; String errorHandler() default ""; String concurrency() default ""; String autoStartup() default ""; }
因爲業務須要,咱們確實是須要對消息進行異步處理,而異步接收消息的最簡單的方法是使用帶註解的監聽端點基礎結構。簡而言之,它容許將託管bean的方法公開爲Rabbit listener的端點。br/>在這裏,使用queues屬性時,能夠指定關聯的容器能夠監聽多個隊列。可使用@Header註釋來建立POJO方法可接收消息的隊列名稱。
這裏我經過queues來指定監聽的隊列數據庫
@Component public class Receiver { @RabbitListener(queues = "xx.yy.zz") @RabbitHandler public void process(MSGSTO message) { System.out.println("消費消息"); System.out.println(message.toString());
至於說配置方式,我是經過application.yml的形式進行接入配置的,例如併發
rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest publisher-confirms: true publisher-returns: true virtual-host: dev listener: simple: concurrency: 10 max-concurrency: 20
這些屬性會被注入到RabbitProperties屬性中,如app
@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { … }
挺有趣的對吧:)異步
參考資料:分佈式