最近收到好幾個相似的問題:使用Spring Cloud Stream操做RabbitMQ或Kafka的時候,出現消息重複消費的問題。經過溝通與排查下來主要仍是用戶對消費組的認識不夠。其實,在以前的博文以及《Spring Cloud微服務實戰》一書中都有提到關於消費組的概念以及做用。java
那麼什麼是消費組呢?爲何要用消費組?它解決什麼問題呢?摘錄一段以前博文的內容,來解答這些疑問:git
一般在生產環境,咱們的每一個服務都不會以單節點的方式運行在生產環境,當同一個服務啓動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認狀況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每一個消費者實例接收和處理(出現上述重複消費問題)。可是有些業務場景之下,咱們但願生產者產生的消息只被其中一個實例消費,這個時候咱們須要爲這些消費者設置消費組來實現這樣的功能。
詳細也可查看原文:消息驅動的微服務(消費組)。github
下面,經過一個例子來看看如何使用消費組:spring
第一步:建立綁定接口,綁定example-topic
輸入通道(默認狀況下,會綁定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。app
interface ExampleBinder { String NAME = "example-topic"; @Input(NAME) SubscribableChannel input(); }
第二步:對上述輸入通道建立監聽與處理邏輯。負載均衡
@EnableBinding(ExampleBinder.class) public class ExampleReceiver { private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class); @StreamListener(ExampleBinder.NAME) public void receive(String payload) { logger.info("Received: " + payload); } }
第三步;建立應用主類和配置文件微服務
@SpringBootApplication public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } }
spring.application.name=stream-consumer-group server.port=0
這裏設置server.port=0
,以方便在本地啓動多實例來重現問題。測試
完成上述操做以後,啓動兩個該應用的實例,以備後續調用。ui
比較簡單,須要注意的是,使用@Output
建立一個同名的輸出綁定,這樣發出的消息才能被上述啓動的實例接收到。具體實現以下:spa
@RunWith(SpringRunner.class) @EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class}) public class ExampleApplicationTests { @Autowired private ExampleBinder exampleBinder; @Test public void exampleBinderTester() { exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build()); } public interface ExampleBinder { String NAME = "example-topic"; @Output(NAME) MessageChannel output(); } }
啓動上述測試用例以後,能夠發現以前啓動的兩個實例都收到的消息,並在日誌中打印了:Received: Produce a message from : http://blog.didispace.com
。消息重複消費的問題成功重現!
如何解決上述消息重複消費的問題呢?咱們只須要在配置文件中增長以下配置便可:
spring.cloud.stream.bindings.example-topic.group=aaa
當咱們指定了某個綁定所指向的消費組以後,往當前主題發送的消息在每一個訂閱消費組中,只會有一個訂閱者接收和消費,從而實現了對消息的負載均衡。只因此以前會出現重複消費的問題,是因爲默認狀況下,任何訂閱都會產生一個匿名消費組,因此每一個訂閱實例都會有本身的消費組,從而當有消息發送的時候,就造成了廣播的模式。
另外,須要注意上述配置中example-topic
是在代碼中@Output
和@Input
中傳入的名字。
本文示例讀者能夠經過查看下面倉庫的中的stream-consumer-group
項目:
若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!