在上篇文章中咱們給你們介紹了Stream的消息分組,能夠實現消息的重複消費的問題,但在某些場景下分組還不能知足咱們的需求,好比,同時有多條同一個用戶的數據,發送過來,咱們須要根據用戶統計,可是消息被分散到了不一樣的集羣節點上了,這時咱們就能夠考慮消息分區了。
當生產者將消息數據發送給多個消費者實例時,保證同一消息數據始終是由同一個消費者實例接收和處理。java
將咱們上篇文章中的分組的三個項目,拷貝一份修更名稱及服務名稱git
發送多條消息查看效果github
@RunWith(SpringRunner.class) @SpringBootTest(classes=StreamSenderStart.class) public class StreamTest { @Autowired private ISendeService sendService; @Test public void testStream(){ Product p = new Product(999, "stream test ...999"); // 將須要發送的消息封裝爲Message對象 Message message = MessageBuilder .withPayload(p) .build(); for (int i = 0; i < 10; i++) { // 發送多條消息到隊列中 sendService.send().send(message ); } } }
10條消息被隨機的分散到了兩個消費者中:spring
咱們能夠看到A中6條消息,B中4條消息,並且這是隨機的,下次執行的結果確定不同。app
spring.application.name=stream-partition-sender server.port=9060 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange outputProduct自定義的信息 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct #經過該參數指定了分區鍵的表達式規則 spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload #指定了消息分區的數量。 spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
服務A測試
spring.application.name=stream-partition-receiverA server.port=9070 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange 和消息發送者的 交換器是同一個 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列 inputProduct 自定義 spring.cloud.stream.bindings.inputProduct.group=groupProduct999 #開啓消費者分區功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了當前消費者的總實例數量 spring.cloud.stream.instanceCount=2 #設置當前實例的索引號,從 0 開始 spring.cloud.stream.instanceIndex=0
服務Bui
spring.application.name=stream-partition-receiverB server.port=9071 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange 和消息發送者的 交換器是同一個 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列 inputProduct 自定義 spring.cloud.stream.bindings.inputProduct.group=groupProduct999 #開啓消費者分區功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了當前消費者的總實例數量 spring.cloud.stream.instanceCount=2 #設置當前實例的索引號,從 1 開始 spring.cloud.stream.instanceIndex=1
啓動服務測試3d
10個消息都被消費者A給消費了,說明到達了咱們須要的效果。
案例源碼:https://github.com/q279583842q/springcloud-e-bookcode