Spring Cloud Stream在同一通道根據消息內容分發不一樣的消費邏輯

  應用場景html

  有的時候,咱們對於同一通道中的消息處理,會經過判斷頭信息或者消息內容來作一些差別化處理,好比:可能在消息頭信息中帶入消息版本號,而後經過if判斷來執行不一樣的處理邏輯,其代碼結構多是這樣的:spring

  @StreamListener(value = TestTopic.INPUT)app

  public void receiveV1(String payload, @Header("version") String version) {優化

  if("1.0".equals(version)) {ui

  // Version 1.0spa

  }.net

  if("2.0".equals(version)) {日誌

  // Version 2.0htm

  }接口

  }

  那麼當消息處理邏輯複雜的時候,這段邏輯就會變得特別複雜。針對這個問題,在@StreamListener註解中提供了一個不錯的屬性condition,能夠用來優化這樣的處理結構。

  動手試試

  下面經過編寫一個簡單的例子來具體體會一下這個屬性的用法:

  @EnableBinding(TestApplication.TestTopic.class)

  @SpringBootApplication

  public class TestApplication {

  public static void main(String[] args) {

  SpringApplication.run(TestApplication.class, args);

  }

  @RestController

  static class TestController {

  @Autowired

  private TestTopic testTopic;

  /**

  * 消息生產接口

  *

  * @param message

  * @return

  */

  @GetMapping("/sendMessage")

  public String messageWithMQ(@RequestParam String message) {

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());

  return "ok";

  }

  }

  /**

  * 消息消費邏輯

  */

  @Slf4j無錫婦科醫院哪家好 http://wapyyk.39.net/wx/zonghe/fc96e.html/

  @Component

  static class TestListener {

  @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")

  public void receiveV1(String payload, @Header("version") String version) {

  log.info("Received v1 : " + payload + ", " + version);

  }

  @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")

  public void receiveV2(String payload, @Header("version") String version) {

  log.info("Received v2 : " + payload + ", " + version);

  }

  }

  interface TestTopic {

  String OUTPUT = "example-topic-output";

  String INPUT = "example-topic-input";

  @Output(OUTPUT)

  MessageChannel output();

  @Input(INPUT)

  SubscribableChannel input();

  }

  }

  內容很簡單,既包含了消息的生產,也包含了消息消費。在/sendMessage接口的定義中,發送了兩條消息,一條消息的頭信息中包含version=1.0,另一條消息的頭信息中包含version=2.0。在消息監聽類TestListener中,對TestTopic.INPUT通道定義了兩個@StreamListener,這兩個監聽邏輯有不一樣的condition,這裏的表達式表示會根據消息頭信息中的version值來作不一樣的處理邏輯分發。

  在啓動應用以前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名),好比:

  spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  spring.cloud.stream.bindings.example-topic-input.group=stream-content-route

  spring.cloud.stream.bindings.example-topic-output.destination=test-topic

  完成了上面配置以後,就能夠啓動應用,並嘗試訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中了。此時能夠看到相似下面的日誌:

  2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0

  2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0

  從日誌中能夠看到,兩條帶有不一樣頭信息的消息,分別經過不一樣的監聽處理邏輯輸出了對應的日誌打印。

相關文章
相關標籤/搜索