在上一篇《Spring Cloud Stream如何處理消息重複消費》中,咱們經過消費組的配置解決了多實例部署狀況下消息重複消費這一入門時的常見問題。本文將繼續說說在另一個被常常問到的問題:若是微服務生產的消息本身也想要消費一份,應該如何實現呢?java
在放出標準答案前,先放出一個常見的錯誤姿式和告警信息(以便您能夠經過搜索引擎找到這裏^_^)。如下錯誤基於Spring Boot 2.0.五、Spring Cloud Finchley SR1。react
首先,根據入門示例,爲了生產和消費消息,須要定義兩個通道:一個輸入、一個輸出。好比下面這樣:git
public interface TestTopic { String OUTPUT = "example-topic"; String INPUT = "example-topic"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }
經過INPUT
和OUTPUT
使用相同的名稱,讓生產消息和消費消息指向相同的Topic,從而實現消費本身發出的消息。github
接下來,建立一個HTTP接口,並經過上面定義的輸出通道觸來生產消息,好比:web
@Slf4j @RestController public class TestController { @Autowired private TestTopic testTopic; @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } }
已經有生產消息的實現,下面來建立對輸入通道的監聽,以實現消息的消費邏輯。spring
@Slf4j @Component public class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received: " + payload); throw new RuntimeException("BOOM!"); } }
最後,在應用主類中,使用@EnableBinding註解來開啓它,好比:json
@EnableBinding(TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } }
看似完美無缺的操做,然而在啓動的瞬間,你可能收到了下面這樣的錯誤:bash
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=null at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE] at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]
根據錯誤提示:Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists
,沒有啓動成功的緣由是已經存在了一個名爲example-topic
的Bean,那麼爲何會重複建立這個Bean呢?app
實際上,在F版的Spring Cloud Stream中,當咱們使用@Output
和@Input
註解來定義消息通道時,都會根據傳入的通道名稱來建立一個Bean。而在上面的例子中,咱們定義的@Output
和@Input
名稱是相同的,由於咱們系統輸入和輸出是同一個Topic,這樣才能實現對本身生產消息的消費。spring-boot
既然這樣,咱們定義相同的通道名是行不通了,那麼咱們只能經過定義不一樣的通道名,併爲這兩個通道配置相同的目標Topic來將這一對輸入輸出指向同一個實際的Topic。對於上面的錯誤程序,只須要作以下兩處改動:
第一步:修改通道名,使用不一樣的名字
public interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }
第二步:在配置文件中,爲這兩個通道設置相同的Topic名稱,好比:
spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic
這樣,這兩個輸入輸出通道就會都指向名爲aaa-topic
的Topic了。
最後,再啓動該程序,沒有報錯。而後訪問接口:localhost:8080/sendMessage?message=hello-didi
,能夠在控制檯中看到以下信息:
2018-11-17 23:24:10.425 INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672] 2018-11-17 23:24:10.453 INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#266753da:0/SimpleConnection@627fba83 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60752] 2018-11-17 23:24:10.458 INFO 32039 --- [ctor-http-nio-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 2018-11-17 23:24:10.483 INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener : Received: hello-didi
消費本身生產的消息成功了!讀者也還能夠訪問一下應用的/actuator/beans
端點,看看當前Spring上下文中有哪些Bean,應該能夠看到有下面Bean,也就是上面分析的兩個通道的Bean對象
"example-topic-output": { "aliases": [], "scope": "singleton", "type": "org.springframework.integration.channel.DirectChannel", "resource": null, "dependencies": [] }, "example-topic-input": { "aliases": [], "scope": "singleton", "type": "org.springframework.integration.channel.DirectChannel", "resource": null, "dependencies": [] },
其實大部分開發者在使用Spring Cloud Stream時候碰到的問題都源於對Spring Cloud Stream的核心概念仍是不夠理解。因此,仍是推薦讀一下下面的文章和示例:
本文示例讀者能夠經過查看下面倉庫的中的stream-consumer-self
項目:
若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!