MQ消息中間件⼴泛應⽤在應⽤解耦合、異步消息處理、流量削峯等場景中。java
不一樣的MQ消息中間件內部機制包括使⽤⽅式都會有所不一樣,⽐如RabbitMQ中有Exchange(交換機/交換器)這⼀概念, kafka有Topic、 Partition分區這些概念, MQ消息中間件的差別性不利於咱們上層的開發應⽤,當咱們的系統但願從原有的RabbitMQ切換到Kafka時,咱們會發現⽐較困難,不少要操做可能重來(由於應⽤程序和具體的某⼀款MQ消息中間件耦合在⼀起了)。git
Spring Cloud Stream進⾏了很好的上層抽象,可讓咱們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差別,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀來,咱們學習、開發、維護MQ都會變得輕鬆。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。spring
本質: 屏蔽掉了底層不一樣MQ消息中間件之間的差別,統⼀了MQ的編程模型,下降了學習、開發、維護MQ的成本sql
Spring Cloud Stream 是⼀個構建消息驅動微服務的框架。應⽤程序經過inputs(至關於消息消費者consumer)或者outputs(至關於消息⽣產者producer)來與Spring Cloud Stream中的binder對象交互,⽽Binder對象是⽤來屏蔽底層MQ細節的,它負責與具體的消息中間件交互。數據庫
說⽩了:對於咱們來講,只須要知道如何使⽤Spring Cloud Stream與Binder對象交互便可編程
註解 | 描述 |
---|---|
@Input(在消費者⼯程中使⽤) | 註解標識輸⼊通道,經過該輸⼊通道接收到的消息進⼊應⽤程序 |
@Output(在⽣產者⼯程中使⽤) | 註解標識輸出通道,發佈的消息將經過該通道離開應⽤程序 |
@StreamListener(在消費者⼯程中使⽤,監聽message的到來) | 監聽隊列,⽤於消費者的隊列的消息的接收(有消息監聽.....) |
@EnableBinding | 把Channel和Exchange(對於RabbitMQ)綁定在⼀起 |
在父工程下新建子模塊lagou-cloud-stream-producer-9090,引入jar:json
<!--spring cloud stream 依賴(rabbit) --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
添加rabbit相關配置:app
spring: application: name: lagou-cloud-stream-producer cloud: stream: binders: # 綁定MQ服務信息(此處咱們是RabbitMQ) lagouRabbitBinder: # 給Binder定義的名稱,⽤於後⾯的關聯 type: rabbit # MQ類型,若是是Kafka的話,此處配置kafka environment: # MQ環境配置(⽤戶名、密碼等) spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 關聯整合通道和binder對象 output: # output是咱們定義的通道名稱,此處不能亂改 destination: lagouExchange # 要使⽤的Exchange名稱(消息隊列主題名稱) content-type: text/plain # application/json # 消息類型設置,⽐如json binder: lagouRabbitBinder # 關聯MQ服務
定義一個發送消息的接口及實現類:框架
public interface IMessageProduder { void sendMessage(String message); }
//綁定的通道,output。springcloud stream內對輸出通道的定義 @EnableBinding(Source.class) public class MessageProducerImpl implements IMessageProduder { @Autowired private Source source; @Override public void sendMessage(String message) { //向mq發送消息(並不直接操做mq,而是操做springcloud stream //使用source指定的output通道向外發送消息 source.output().send(MessageBuilder.withPayload(message).build()); } }
父工程下新建消費者子模塊lagou-cloud-stream-consumer-9091,引入jar座標和服務端同樣,這裏再也不贅述。
application.yml裏面配置與rabbit相關參數,惟一與服務者端不一樣的是input和output參數:
其餘都保持一致。
在消費端定義service類來接受消息:異步
@EnableBinding(Sink.class) public class MessageConsumerService { @StreamListener(Sink.INPUT) public void receiveMessage(Message<String> message){ System.out.println("----接受到的消息---->"+message); } }
咱們在服務提供者端寫一個測試類來發送消息:
@SpringBootTest(classes = {StreamProducerApplication9090.class}) @RunWith(SpringJUnit4ClassRunner.class) public class MessageProducerTest { @Autowired private IMessageProduder iMessageProduder; @Test public void testSendMessage(){ iMessageProduder.sendMessage("hello world----!!"); } }
咱們先啓動服務消費者,而後運行服務提供者端的測試類,看服務消費者端的控制檯輸出了接收到的信息:
咱們將服務消費者複製一份,新消費者的端口是9092,前一個消費者端口是9091。
這樣咱們繼續測試,會發現同一個服務提供者發送的消息,被兩個消費者都接收到並進行處理了。
這明顯是有問題的,好比電商網站的訂單,確定只須要處理一次就行。
爲了解決這個問題,rabbitmq有個消息分組的概念,只要兩個消費者實例處在一個組裏,那麼這個組裏只有一個消費者會處理這個消息。
咱們僅僅須要在服務消費者端設置 spring.cloud.stream.bindings.input.group 屬性,多個消費者實例配置爲同⼀個group名稱(在同⼀個group中的多個消費者只有⼀個能夠獲取到消息並消費)。以下:
擴展:前面咱們都是先啓動服務消費者,而後再啓動服務提供者發送消息,也就是消息是臨時性的,並無持久化存儲。當咱們設置了分組以後,消息就會持久化存儲。咱們先發送消息,而後再啓動服務消費者客戶端,也可以接收到消息。
歡迎訪問個人博客:https://www.liuyj.top