Spring Cloud Stream使用細節

上篇文章咱們看了Spring Cloud Stream的基本使用,小夥伴們對Spring Cloud Stream應該也有了一個基本的瞭解,可是上篇文章中的消息咱們是從RabbitMQ的web管理頁面發來的,若是咱們想要從代碼中發送消息呢?本文咱們就來看看Spring Cloud Stream的一些使用細節。web


本文是Spring Cloud系列的第三十篇文章,瞭解前二十九篇文章內容有助於更好的理解本文: spring

1.使用Spring Cloud搭建服務註冊中心
2.使用Spring Cloud搭建高可用服務註冊中心
3.Spring Cloud中服務的發現與消費
4.Eureka中的核心概念
5.什麼是客戶端負載均衡
6.Spring RestTemplate中幾種常見的請求方式
7.RestTemplate的逆襲之路,從發送請求到負載均衡
8.Spring Cloud中負載均衡器概覽
9.Spring Cloud中的負載均衡策略
10.Spring Cloud中的斷路器Hystrix
11.Spring Cloud自定義Hystrix請求命令
12.Spring Cloud中Hystrix的服務降級與異常處理
13.Spring Cloud中Hystrix的請求緩存
14.Spring Cloud中Hystrix的請求合併
15.Spring Cloud中Hystrix儀表盤與Turbine集羣監控
16.Spring Cloud中聲明式服務調用Feign
17.Spring Cloud中Feign的繼承特性
18.Spring Cloud中Feign配置詳解
19.Spring Cloud中的API網關服務Zuul
20.Spring Cloud Zuul中路由配置細節
21.Spring Cloud Zuul中異常處理細節
22.分佈式配置中心Spring Cloud Config初窺
23.Spring Cloud Config服務端配置細節(一)
24.Spring Cloud Config服務端配置細節(二)之加密解密
25.Spring Cloud Config客戶端配置細節
26.Spring Cloud Bus之RabbitMQ初窺
27.Spring Cloud Bus整合RabbitMQ
28.Spring Cloud Bus整合Kafka
29.Spring Cloud Stream初窺緩存


自定義消息通道

上篇文章咱們提到了Sink和Source兩個接口,這兩個接口中分別定義了輸入通道和輸出通道,而Processor經過繼承Source和Sink,同時具備輸入通道和輸出通道。這裏咱們就模仿Sink和Source,來定義一個本身的消息通道。 負載均衡

仍是在上文的基礎上,首先咱們定義一個接口叫作MySink,以下:分佈式

public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}

這裏咱們定義了一個名爲mychannel的消息輸入通道,@Input註解的參數則表示了消息通道的名稱,同時咱們還定義了一個方法返回一個SubscribableChannel對象,該對象用來維護消息通道訂閱者。而後,咱們再定義一個名爲MySource的接口,以下:微服務

public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}

@Output註解中描述了消息通道的名稱,仍是mychannel,而後這裏咱們也定義了一個返回MessageChannel對象的方法,該對象中有一個向消息通道發送消息的方法。 單元測試

最後咱們定義一個消息接收類,以下:測試

@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:" + playload);
    }
}

OK,咱們在這裏綁定消息通道,而後監聽自定義的消息通道,最後來一個單元測試測試一下,以下:ui

@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {

    @Autowired
    private MySource mySource;

    @Test
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}

運行單元測試,咱們能夠看到以下日誌,表示消息發送成功了: 加密

圖片描述

若是想要發送對象也能夠直接發送,不用進行對象轉換,以下:

發送:

Book book = new Book(1l, "三國演義", "羅貫中");
mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

@StreamListener(MySink.INPUT)
public void receive(Book playload) {
    logger.info("Received:" + playload);
}

若是咱們想要在接收成功後給一個回執,也是OK的,以下:

@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定義回執發送的消息通道
public String receive(Book playload) {
    logger.info("Received:" + playload);
    return "receive msg :" + playload;
}

方法的返回值就是回執消息,回執消息在系統默認的output通道中,咱們若是想要接收這個消息,固然就要監聽這個通道,以下:

@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:"+msg);
}

固然要記得Source類也要在@EnableBinding註解中進行綁定。此時運行結果以下:

圖片描述

消費組

因爲咱們的服務可能會有多個實例同時在運行,若是不作任何設置,此時發送一條消息將會被全部的實例接收到,可是有的時候咱們可能只但願消息被一個實例所接收,這個需求咱們能夠經過消息分組來解決。方式很簡單,給項目配置消息組和主題,以下:

spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1

這裏咱們設置該工程都屬於g1消費組,輸入通道的主題名則爲dest1。這裏配置完成以後,咱們在消息發送方作以下配置:

spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主題名爲dest1(若是發送和接收就在同一個應用中,則這裏能夠不配置)。OK,此時咱們將咱們的項目啓動兩個實例,注意兩個實例的端口不同,此時若是咱們再發送消息,則只會被兩個實例中的一個接收到,另一個應用則接收不到,可是究竟是兩個實例中的哪個接收,則是不肯定的。

消息分區

有的時候,咱們可能須要相同特徵的消息可以老是被髮送到同一個消費者上去處理,若是咱們只是單純的使用消費組則沒法實現功能,此時咱們須要藉助於消息分區,消息分區以後,具備相同特徵的消息就能夠老是被同一個消費者處理了,配置方式以下(這裏的配置都是在消費組的配置基礎上完成的):

在消費者上添加以下配置:

spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

關於這個配置我說三點:

1.第一行表示開啓消息分區
2.第二行表示當前消息者的總的實例個數
3.第三行表示當前實例的索引,從0開始,當咱們啓動多個實例時,須要在啓動時在命令行配置索引

而後在消息生產者上添加以下配置:

spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置設置了分區鍵的表達式規則,第二行則設置了消息分區數量。

OK,此時咱們再次啓動多個消費者實例,而後重複發送多條消息,這些消息都將被同一個消費者處理掉。

Spring Cloud Stream使用細節咱們就先說到這裏,有問題歡迎留言討論。

參考資料:
1.《Spring Cloud微服務實戰》

更多JavaEE資料請關注公衆號:

圖片描述

相關文章
相關標籤/搜索