本篇文章爲系列文章,未讀第一集的同窗請猛戳這裏:Spring Cloud 系列之 Stream 消息驅動(一)java
本篇文章講解 Stream 如何實現消息分組和消息分區。spring
點擊連接觀看:Stream 消息分組視頻(獲取更多請關注公衆號「哈嘍沃德先生」)shell
若是有多個消息消費者,那麼消息生產者發送的消息會被多個消費者都接收到,這種狀況在某些實際場景下是有很大問題的,好比在以下場景中,訂單系統作集羣部署,都會從 RabbitMQ 中獲取訂單信息,若是一個訂單消息同時被兩個服務消費,系統確定會出現問題。爲了不這種狀況,Stream 提供了消息分組來解決該問題。express
在 Stream 中處於同一個 group
中的多個消費者是競爭關係,可以保證消息只會被其中一個應用消費。不一樣的組是能夠消費的,同一個組會發生競爭關係,只有其中一個能夠消費。經過 spring.cloud.stream.bindings.<bindingName>.group
屬性指定組名。segmentfault
在 stream-demo
項目下建立 stream-consumer02
子項目。 api
項目代碼使用入門案例中消息消費者的代碼。服務器
單元測試代碼以下:app
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }
運行單元測試發送消息,兩個消息消費者控制檯打印結果以下:ide
stream-consumer 的控制檯:單元測試
message = hello spring cloud stream
stream-consumer02 的控制檯:
message = hello spring cloud stream
經過結果能夠看到消息被兩個消費者同時消費了,緣由是由於它們屬於不一樣的分組,默認狀況下分組名稱是隨機生成的,經過 RabbitMQ 也能夠得知:
stream-consumer 的分組配置爲:group-A
。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A
stream-consumer02 的分組配置爲:group-A
。
server: port: 8003 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A
運行單元測試發送消息,此時多個消息消費者只有其中一個能夠消費。RabbitMQ 結果以下:
點擊連接觀看:Stream 消息分區視頻(獲取更多請關注公衆號「哈嘍沃德先生」)
經過消息分組能夠解決消息被重複消費的問題,但在某些場景下分組還不能知足咱們的需求。好比,同時有多條同一個用戶的數據發送過來,咱們須要根據用戶統計,可是消息被分散到了不一樣的集羣節點上了,這時咱們就能夠考慮使用消息分區了。
當生產者將消息發送給多個消費者時,保證同一消息始終由同一個消費者實例接收和處理。消息分區是對消息分組的一種補充。
先給你們演示一下消息未分區的效果,單元測試代碼以下:
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { for (int i = 1; i <= 10; i++) { messageProducer.send("hello spring cloud stream"); } } }
運行單元測試發送消息,兩個消息消費者控制檯打印結果以下:
stream-consumer 的控制檯:
message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream
stream-consumer02 的控制檯:
message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream message = hello spring cloud stream
假設這 10 條消息都來自同一個用戶,正確的方式應該都由一個消費者消費全部消息,不然系統確定會出現問題。爲了不這種狀況,Stream 提供了消息分區來解決該問題。
消息生產者配置分區鍵的表達式規則和消息分區的數量。
server: port: 8001 # 端口 spring: application: name: stream-producer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: bindings: # 消息發送通道 # 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同 output: destination: stream.message # 綁定的交換機名稱 producer: partition-key-expression: payload # 配置分區鍵的表達式規則 partition-count: 2 # 配置消息分區的數量
經過 partition-key-expression
參數指定分區鍵的表達式規則,用於區分每一個消息被髮送至對應分區的輸出 channel
。
該表達式做用於傳遞給 MessageChannel
的 send
方法的參數,該參數實現 org.springframework.messaging.Message
接口的 GenericMessage
類。
源碼 MessageChannel.java
package org.springframework.messaging; @FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1L; default boolean send(Message<?> message) { return this.send(message, -1L); } boolean send(Message<?> var1, long var2); }
源碼 GenericMessage.java
package org.springframework.messaging.support; import java.io.Serializable; import java.util.Map; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; public class GenericMessage<T> implements Message<T>, Serializable { private static final long serialVersionUID = 4268801052358035098L; private final T payload; private final MessageHeaders headers; ... }
若是 partition-key-expression
的值是 payload
,將會使用全部放在 GenericMessage
中的數據做爲分區數據。payload
是消息的實體類型,能夠爲自定義類型好比 User
,Role
等等。
若是 partition-key-expression
的值是 headers["xxx"]
,將由 MessageBuilder
類的 setHeader()
方法完成賦值,好比:
package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生產者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 發送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build()); } }
消息消費者配置消費者總數和當前消費者的索引並開啓分區支持。
stream-consumer 的 application.yml
server: port: 8002 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: instance-count: 2 # 消費者總數 instance-index: 0 # 當前消費者的索引 bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A consumer: partitioned: true # 開啓分區支持
stream-consumer02 的 application.yml
server: port: 8003 # 端口 spring: application: name: stream-consumer # 應用名稱 rabbitmq: host: 192.168.10.101 # 服務器 IP port: 5672 # 服務器端口 username: guest # 用戶名 password: guest # 密碼 virtual-host: / # 虛擬主機地址 cloud: stream: instance-count: 2 # 消費者總數 instance-index: 1 # 當前消費者的索引 bindings: # 消息接收通道 # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同 input: destination: stream.message # 綁定的交換機名稱 group: group-A consumer: partitioned: true # 開啓分區支持
運行單元測試發送消息,此時多個消息消費者只有其中一個能夠消費全部消息。RabbitMQ 結果以下:
至此 Stream 消息驅動全部的知識點就講解結束了。
本文采用 知識共享「署名-非商業性使用-禁止演繹 4.0 國際」許可協議
。
你們能夠經過 分類
查看更多關於 Spring Cloud
的文章。
🤗 您的點贊
和轉發
是對我最大的支持。
📢 掃碼關注 哈嘍沃德先生
「文檔 + 視頻」每篇文章都配有專門視頻講解,學習更輕鬆噢 ~