做者 | 洛夜 來源 | 阿里巴巴雲原生公衆號node
在 Spring 生態中玩轉 RocketMQ 系列文章:git
- 《如何在 Spring 生態中玩轉 RocketMQ?》
- 《羅美琪和春波特的故事...》
- 《RocketMQ-Spring 畢業兩週年,爲何能成爲 Spring 生態中最受歡迎的 messaging 實現?》
- 《使用 rocketmq-spring-boot-starter 來配置、發送和消費 RocketMQ 消息》
- 《Spring Cloud Stream 體系及原理介紹》
本文配套可交互教程已登陸阿里雲知行動手實驗室,PC 端登陸 start.aliyun.com 在瀏覽器中當即體驗。github
Spring Cloud Bus 對本身的定位是 Spring Cloud 體系內的消息總線,使用 message broker 來鏈接分佈式系統的全部節點。Bus 官方的 Reference 文檔 比較簡單,簡單到連一張圖都沒有。spring
這是最新版的 Spring Cloud Bus 代碼結構(代碼量比較少):json
Bus 實例演示
在分析 Bus 的實現以前,咱們先來看兩個使用 Spring Cloud Bus 的簡單例子。瀏覽器
1. 全部節點的配置新增
Bus 的例子比較簡單,由於 Bus 的 AutoConfiguration 層都有了默認的配置,只須要引入消息中間件對應的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴便可,以後全部啓動的應用都會使用同一個 Topic 進行消息的接收和發送。併發
Bus 對應的 Demo 已經放到了 github 上, 該 Demo 會模擬啓動 5 個節點,只須要對其中任意的一個實例新增配置項,全部節點都會新增該配置項。app
Demo 地址:https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demodom
訪問任意節點提供的 Controller 提供的獲取配置的地址(key 爲hangzhou):curl
curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
全部節點返回的結果都是 unknown,由於全部節點的配置中沒有hangzhou這個 key。
Bus 內部提供了EnvironmentBusEndpoint這個 Endpoint 經過 message broker 用來新增/更新配置。
訪問任意節點該 Endpoint 對應的 url: /actuator/bus-env?name=hangzhou&value=alibaba 進行配置項的新增(好比訪問 node1 的url):
curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
而後再次訪問全部節點/bus/env獲取配置:
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json' ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' alibaba%
能夠看到,全部節點都新增了一個 key 爲hangzhou的配置,且對應的 value 是alibaba。這個配置項是經過 Bus 提供的 EnvironmentBusEndpoint 完成的。
這裏引用 程序猿DD 畫的一張圖片,Spring Cloud Config 配合 Bus 完成全部節點配置的刷新來描述以前的實例(本文實例不是刷新,而是新增配置,可是流程是同樣的):
2. 部分節點的配置修改
好比在 node1 上指定 destination 爲 rocketmq-bus-node2 ( node2 配置了 spring.cloud.bus.id 爲rocketmq-bus-node2:10002,能夠匹配上) 進行配置的修改:
curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
訪問/bus/env 獲取配置(因爲在 node1 上發送消息,Bus 也會對發送方的節點 node1 進行配置修改):
~ ⌚ $ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json' ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' xihu% ~ ⌚ $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' xihu%
能夠看到,只有 node1 和 node2 修改了配置,其他的 3 個節點配置未改變。
Bus 的實現
1. Bus 概念介紹
1)事件
Bus 中定義了遠程事件RemoteApplicationEvent,該事件繼承了 Spring 的事件ApplicationEvent,並且它目前有 4 個具體的實現:
- EnvironmentChangeRemoteApplicationEvent:遠程環境變動事件。主要用於接收一個 Map<String,String> 類型的數據並更新到 Spring 上下文中 Environment 中的事件。文中的實例就是使用這個事件並配合 EnvironmentBusEndpoint 和 EnvironmentChangeListener 完成的。
- AckRemoteApplicationEvent:遠程確認事件。Bus 內部成功接收到遠程事件後會發送回AckRemoteApplicationEvent確認事件進行確認。
- RefreshRemoteApplicationEvent: 遠程配置刷新事件。配合 @RefreshScope 以及全部的 @ConfigurationProperties註解修飾的配置類的動態刷新。
- UnknownRemoteApplicationEvent:遠程未知事件。Bus 內部消息體進行轉換遠程事件的時候若是發生異常會統一包裝成該事件。
Bus 內部還存在一個非RemoteApplicationEvent事件 -SentApplicationEvent消息發送事件,配合 Trace 進行遠程消息發送的記錄。
這些事件會配合ApplicationListener進行操做,好比EnvironmentChangeRemoteApplicationEvent配了EnvironmentChangeListener進行配置的新增/修改:
public class EnvironmentChangeListener implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> { private static Log log = LogFactory.getLog(EnvironmentChangeListener.class); @Autowired private EnvironmentManager env; @Override public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) { Map<String, String> values = event.getValues(); log.info("Received remote environment change request. Keys/values to update " + values); for (Map.Entry<String, String> entry : values.entrySet()) { env.setProperty(entry.getKey(), entry.getValue()); } } }
收到其它節點發送來EnvironmentChangeRemoteApplicationEven事件以後調用EnvironmentManager#setProperty進行配置的設置,該方法內部針對每個配置項都會發送一個EnvironmentChangeEvent事件,而後被ConfigurationPropertiesRebinder所監聽,進行 rebind 操做新增/更新配置。
2)Actuator Endpoint
Bus 內部暴露了 2 個 Endpoint,分別是EnvironmentBusEndpoint和RefreshBusEndpoint,進行配置的新增/修改以及全局配置刷新。它們對應的 Endpoint id 即 url 是 bus-env和bus-refresh。
3)配置
Bus 對於消息的發送一定涉及到 Topic、Group 之類的信息,這些內容都被封裝到了BusProperties中,其默認的配置前綴爲spring.cloud.bus,好比:
- spring.cloud.bus.refresh.enabled用於開啓/關閉全局刷新的 Listener。
- spring.cloud.bus.env.enabled 用於開啓/關閉配置新增/修改的 Endpoint。
- spring.cloud.bus.ack.enabled 用於開啓開啓/關閉AckRemoteApplicationEvent事件的發送。
- spring.cloud.bus.trace.enabled 用於開啓/關閉息記錄 Trace 的 Listener。
消息發送涉及到的 Topic 默認用的是springCloudBus,能夠配置進行修改,Group 能夠設置成廣播模式或使用 UUID 配合 offset 爲 lastest 的模式。
每一個 Bus 應用都有一個對應的 Bus id,官方取值方式較複雜:
${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
建議手動配置 Bus id,由於 Bus 遠程事件中的 destination 會根據 Bus id 進行匹配:
spring.cloud.bus.id=${spring.application.name}-${server.port}
2. Bus 底層分析
Bus 的底層分析無非牽扯到這幾個方面:
- 消息是如何發送的
- 消息是如何接收的
- destination 是如何匹配的
- 遠程事件收到後如何觸發下一個 action
BusAutoConfiguration自動化配置類被@EnableBinding(SpringCloudBusClient.class)所修飾。
@EnableBinding的用法在文章《Spring Cloud Stream 體系及原理介紹》中已經說明,且它的 value 爲SpringCloudBusClient.class,會在SpringCloudBusClient中基於代理建立出 input 和 output 的DirectChannel:
public interface SpringCloudBusClient { String INPUT = "springCloudBusInput"; String OUTPUT = "springCloudBusOutput"; @Output(SpringCloudBusClient.OUTPUT) MessageChannel springCloudBusOutput(); @Input(SpringCloudBusClient.INPUT) SubscribableChannel springCloudBusInput(); }
springCloudBusInput 和 springCloudBusOutput 這兩個 Binding 的屬性能夠經過配置文件進行修改(好比修改 topic):
spring.cloud.stream.bindings: springCloudBusInput: destination: my-bus-topic springCloudBusOutput: destination: my-bus-topic
消息的接收和發送:
// BusAutoConfiguration @EventListener(classes = RemoteApplicationEvent.class) // 1 public void acceptLocal(RemoteApplicationEvent event) { if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) { // 2 this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3 } } @StreamListener(SpringCloudBusClient.INPUT) // 4 public void acceptRemote(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) { // 5 this.applicationEventPublisher.publishEvent(event); } // If it's an ACK we are finished processing at this point return; } if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { // 6 if (!this.serviceMatcher.isFromSelf(event)) { // 7 this.applicationEventPublisher.publishEvent(event); } if (this.bus.getAck().isEnabled()) { // 8 AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9 // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } }
-
利用 Spring 事件的監聽機制監聽本地全部的RemoteApplicationEvent遠程事件(好比bus-env會在本地發送EnvironmentChangeRemoteApplicationEvent事件,bus-refresh會在本地發送RefreshRemoteApplicationEvent事件,這些事件在這裏都會被監聽到)。
-
判斷本地接收到的事件不是AckRemoteApplicationEvent遠程確認事件(否則會死循環,一直接收消息,發送消息...)以及該事件是應用自身發送出去的(事件發送方是應用自身),若是都知足執行步驟 3。
-
構造 Message 並將該遠程事件做爲 payload,而後使用 Spring Cloud Stream 構造的 Binding name 爲 springCloudBusOutput 的 MessageChannel 將消息發送到 broker。
4.@StreamListener註解消費 Spring Cloud Stream 構造的 Binding name 爲 springCloudBusInput 的 MessageChannel,接收的消息爲遠程消息。
-
若是該遠程事件是AckRemoteApplicationEvent遠程確認事件而且應用開啓了消息追蹤 trace 開關,同時該遠程事件不是應用自身發送的(事件發送方不是應用自身,表示事件是其它應用發送過來的),那麼本地發送AckRemoteApplicationEvent遠程確認事件表示應用確認收到了其它應用發送過來的遠程事件,流程結束。
-
若是該遠程事件是其它應用發送給應用自身的(事件的接收方是應用自身),那麼進行步驟 7 和 8,不然執行步驟 9。
-
該遠程事件不是應用自身發送(事件發送方不是應用自身)的話,將該事件以本地的方式發送出去。應用自身一開始已經在本地被對應的消息接收方處理了,無需再次發送。
-
若是開啓了AckRemoteApplicationEvent遠程確認事件的開關,構造AckRemoteApplicationEvent事件並在遠程和本地都發送該事件(本地發送是由於步驟 5 沒有進行本地AckRemoteApplicationEvent事件的發送,也就是自身應用對自身應用確認; 遠程發送是爲了告訴其它應用,自身應用收到了消息)。
-
若是開啓了消息記錄 Trace 的開關,本地構造併發送SentApplicationEvent事件。
bus-env觸發後全部節點的EnvironmentChangeListener監聽到了配置的變化,控制檯都會打印出如下信息:
o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}
若是在本地監聽遠程確認事件 AckRemoteApplicationEvent,都會收到全部節點的信息,好比 node5 節點的控制檯監聽到的 AckRemoteApplicationEvent事件以下:
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
那麼回到本章節開頭提到的 4 個問題,咱們分別作一下解答:
- 消息是如何發送的: 在BusAutoConfiguration#acceptLocal方法中經過 Spring Cloud Stream 發送事件到springCloudBustopic 中。
- 消息是如何接收的: 在BusAutoConfiguration#acceptRemote方法中經過 Spring Cloud Stream 接收springCloudBustopic 的消息。
- destination 是如何匹配的: 在BusAutoConfiguration#acceptRemote方法中接收遠程事件方法裏對 destination 進行匹配。
- 遠程事件收到後如何觸發下一個 action: Bus 內部經過 Spring 的事件機制接收本地的RemoteApplicationEvent具體的實現事件再作下一步的動做(好比EnvironmentChangeListener接收了EnvironmentChangeRemoteApplicationEvent事件,RefreshListener接收了RefreshRemoteApplicationEvent事件)。
總結
Spring Cloud Bus 自身內容仍是比較少的,不過仍是須要提早了解 Spring Cloud Stream 體系以及 Spring 自身的事件機制,在此基礎上,才能更好地理解 Spring Cloud Bus 對本地事件和遠程事件的處理邏輯。
目前 Bus 內置的遠程事件較少,大多數爲配置相關的事件,咱們能夠繼承RemoteApplicationEvent並配合@RemoteApplicationEventScan註解構建自身的微服務消息體系。
做者簡介
方劍(花名:洛夜),GitHub ID @fangjian0423,開源愛好者,阿里巴巴高級開發工程師,阿里雲產品 EDAS 開發,Spring Cloud Alibaba 開源項目負責人之一。