用Spring cloud Stream來開發基於MQ消息驅動的微服務

Spring boot對MQ類如RabbitMQ、kafka支持都很好,可是仍然要寫一些模板代碼。Spring cloud stream進一步掩蓋了這個差別,僅僅使用配置就能夠完成。spring

 

Spring cloud Stream 用了基於topic-subsriber的模式,雖然不支持所有MQ的特性,但絕大多數應用來講,這樣就足夠用了,畢竟方便不少。具體用法以下:數據庫

包含的包

        <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

MQ接口

public interface MyMQInterface{ @Input("通道名") SubscribableChannel inputChannel(); @Output("通道名") MessageChannel outputChanel(); }
  • 通道名發送方和接受方必須是一致的
  • 一個應用不能既是同一個通道的發送方和接收方,不然會警告。這實際上也沒有意義。
  • 聲明後,啓用綁定
  @EnableBinding(MyMQInterface.class)
  • 能夠綁定多個接口

發送MQ消息

 @Autowired MyMQInterface myInterface; ..... myInterface.inputChannel().send(MessageBuilder....);

接受消息

@SystemListener("通道名") public void onReceive(Message content)

應用集羣問題

  • 若是某個應用起來多個實例,如上面的配置,會致使每條消息每一個實例都會收到,若是你不想這麼作,請在配置裏面加上:
spring.cloud.stream.bindings.testOrders.group=分組名
  • 每一個應用定義一個惟一的分組名,很差和其餘應用重複。

消息處理異常

  • 若是收到消息處理有問題,好比寫入數據庫失敗,請拋出RuntimeException異常,MQ會重試,不太重試幾回後會失敗,這個要注意。
相關文章
相關標籤/搜索