註解位置辨析

隨着對消息隊列的應用日益推廣,在分佈式系統中的使用能夠極大的下降對各個組件間的耦合度,從而提升組件的處理效率。由於消息隊列的存在,可使咱們對任務進行異步處理,這樣能夠減小請求響應時間和解耦。同時因爲使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不須要彼此聯繫,也不須要受對方的影響,即解耦和。
所謂解耦,就是說A 系統產生一條數據,發送到 MQ 裏面去,哪一個系統須要數據本身去 MQ 裏面消費。若是新系統須要數據,直接從 MQ 裏消費便可;若是某個系統不須要這條數據了,就取消對 MQ 消息的消費便可。這樣下來,A 系統壓根兒不須要去考慮要給誰發送數據,不須要維護這個代碼,也不須要考慮人家是否調用成功、失敗超時等狀況。
註解位置辨析
所謂異步,那麼 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到返回響應給用戶,總時長是 3 + 5 = 8ms,對於用戶而言,其實感受上就是點個按鈕,8ms 之後就直接返回了,爽!網站作得真好,真快!
註解位置辨析
此外使用消息隊列還有削峯的優點。所謂削峯,即在某些時刻,用戶會大量的對咱們的服務發起請求,咱們的數據庫有時候須要對這些請求進行寫入,可是呢,mysql的吞吐量頂破天就5000,剩下的就要慢慢等了,並且當併發量太高的時候,數據庫的各類異常也會讓人抓狂,可是呢,咱們使用消息隊列就不同了,用戶的各類請求統統塞入消息隊列裏面,以後由消息隊列返回處理結果,而請求存儲在隊列裏面,一個個按順序消費,使請求寫入不出現高峯低谷
註解位置辨析
基於這些有點,咱們開發團隊最近在spring boot的開發過程當中,因爲項目的須要咱們進行消息隊列的接入改造。
在改造過程當中遇到了這樣的問題,起初我將註解寫在了class上,在運行的過程當中會出現異常,如下是異常的詳細內容:html

2019-05-31 17:42:36.798 WARN 30544 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.java

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omittedmysql

至於說開發的源碼,我是這麼寫的,我在class這裏進行註解,這個時候我猜想,應該是註解的位置不對spring

@Component
@RabbitListener(queues = "xx.yy.zz")
public class Receiver {
    @RabbitHandler
    public void process(MSGSTO message) {
        System.out.println("消費消息");
        System.out.println(message.toString());
    }
}

事實上,確實是位置不對,但更加專業的解答方式是,這個listener註解是方法級別上的,而不能用在class上面,咱們不妨來看下RabbitListener的源碼,從根本上理解這個方法的使用。sql

package org.springframework.amqp.rabbit.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
   String id() default "";

   String containerFactory() default "";

   String[] queues() default {};

   Queue[] queuesToDeclare() default {};

   boolean exclusive() default false;

   String priority() default "";

   String admin() default "";

   QueueBinding[] bindings() default {};

   String group() default "";

   String returnExceptions() default "";

   String errorHandler() default "";

   String concurrency() default "";

   String autoStartup() default "";

}

因爲業務須要,咱們確實是須要對消息進行異步處理,而異步接收消息的最簡單的方法是使用帶註解的監聽端點基礎結構。簡而言之,它容許將託管bean的方法公開爲Rabbit listener的端點。br/>在這裏,使用queues屬性時,能夠指定關聯的容器能夠監聽多個隊列。可使用@Header註釋來建立POJO方法可接收消息的隊列名稱。
這裏我經過queues來指定監聽的隊列數據庫

@Component
public class Receiver {

    @RabbitListener(queues = "xx.yy.zz")
    @RabbitHandler
    public void process(MSGSTO message) {
        System.out.println("消費消息");
        System.out.println(message.toString());

至於說配置方式,我是經過application.yml的形式進行接入配置的,例如併發

rabbitmq:
  addresses: 127.0.0.1
  port: 5672
  username: guest
  password: guest
  publisher-confirms: true
  publisher-returns: true
  virtual-host: dev
  listener:
      simple:
          concurrency: 10
          max-concurrency: 20

這些屬性會被注入到RabbitProperties屬性中,如app

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    …
}

挺有趣的對吧:)異步

參考資料:分佈式

  1. 爲何使用消息隊列,https://www.javazhiyin.com/22897.html
相關文章
相關標籤/搜索