pom依賴:算法
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
配置:spring
spring.rabbitmq.host=192.168.99.100 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
而後就能夠直接使用stream的封裝的方式進行消息收發,代碼層面對rabbitmq無感知。微服務
默認狀況下,sender端綁定的exchange會在rabbitmq上建立一個持久化的exchange,類型是topic,routing key是#.reciver端會自動建立一個臨時的queue(隊列名是自動生成的),綁定到exhange上面。 這樣的話若是有多個消費端節點,那麼一條消息就會被多個消費者同時消費。 所以消息分組無非就是消費端指定固定的queue名稱。這樣多個消費者都會綁定到相同的queque上,那麼一條消息也就只會被一個消費者消費。
消費端配置:3d
# 指定交換機名稱 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 指定隊列名稱 spring.cloud.stream.bindings.inputProduct.group=groupProduct
public interface IReceiveService { String INPUT="inputProduct"; @Input(INPUT) SubscribableChannel receive(); }
這樣,Rabbitmq上會建立一個topic類型,routing key爲#,持久化的echange,名稱爲exchangeProduct。 會建立一個持久化的queue,名稱爲exchangeProduct.groupProduct。code
在rabbitmq中沒有這個概念。 在微服務中,是爲了保證相同的消息被同一個節點接收的問題。 相同消息指的應該是序列化後的內容相同。blog
問題:將相同消息路由到同一節點的使用場景是什麼?索引
生產端配置:rabbitmq
# exchange名稱 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct #經過該參數指定了分區鍵的表達式規則 spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload #指定了消息分區的數量(也就是消費者數量) spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
消費端配置:隊列
# exchange名稱 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 隊列名稱 spring.cloud.stream.bindings.inputProduct.group=groupProduct #開啓消費者分區功能 spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true #指定了當前消費者的總實例數量 spring.cloud.stream.instanceCount=2 #設置當前實例的索引號,從0開始 spring.cloud.stream.instanceIndex=0
這樣配置後,在rabbitmq上,會建立兩個持久化隊列exchangeProduct.groupProduct-0和exchangeProduct.groupProduct-1. 和交換機的routing key分別爲exchangeProduct-0,exchangeProduct-1.ci
可見,消息分區的功能徹底是由stream端實現的。實現大體應該是在生產者端,stream根據序列化後的內容,根據特定的hash算法,將消息路由到特定的routing key,進而發送到對應的queue上面去,從而實現了相同消息會被髮送到相同的節點上。