一般在生產環境,咱們的每一個服務都不會以單節點的方式運行在生產環境,當同一個服務啓動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。html
默認狀況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每一個消費者實例接收和處理,可是有些業務場景之下,咱們但願生產者產生的消息只被其中一個實例消費,這個時候咱們須要爲這些消費者設置消費組來實現這樣的功能,實現的方式很是簡單,咱們只須要在服務消費者端設置spring.cloud.stream.bindings.input.group
屬性便可,好比咱們能夠這樣實現:spring
SinkReceiver
,實現了greetings
主題上的輸入通道綁定,它的實現以下: @EnableBinding(value = {Sink.class}) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(User user) { logger.info("Received: " + user); } }
SinkReceiver
的輸入通道目標設置爲greetings
主題,以及將該服務的實例設置爲同一個消費組,作以下設置: spring.cloud.stream.bindings.input.group=Service-A spring.cloud.stream.bindings.input.destination=greetings
經過spring.cloud.stream.bindings.input.group
屬性指定了該應用實例都屬於Service-A
消費組,而spring.cloud.stream.bindings.input.destination
屬性則指定了輸入通道對應的主題名。負載均衡
SinkSender
,具體以下: @EnableBinding(value = {Source.class}) public class SinkSender { private static Logger logger = LoggerFactory.getLogger(SinkSender.class); @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000")) public MessageSource<String> timerMessageSource() { return () -> new GenericMessage<>("{\"name\":\"didi\", \"age\":30}"); } }
SinkSender
作一些設置,讓它的輸出通道綁定目標也指向greetings
主題,具體以下: spring.cloud.stream.bindings.output.destination=greetings
到這裏,對於消費分組的示例就已經完成了。spa
分別運行上面實現的生產者與消費者,其中消費者咱們啓動多個實例。經過控制檯,咱們能夠發現每一個生產者發出的消息,會被啓動的消費者以輪詢的方式進行接收和輸出。源碼來源code