繼上一篇 乾貨|Spring Cloud Stream 體系及原理介紹 以後,本期咱們來了解下 Spring Cloud 體系中的另一個組件 Spring Cloud Bus (建議先熟悉 Spring Cloud Stream,否則沒法理解 Spring Cloud Bus 內部的代碼)。html
Spring Cloud Bus 對本身的定位是 Spring Cloud 體系內的消息總線,使用 message broker 來鏈接分佈式系統的全部節點。Bus 官方的 Reference 文檔比較簡單,簡單到連一張圖都沒有。node
這是最新版的 Spring Cloud Bus 代碼結構(代碼量比較少):git
在分析 Bus 的實現以前,咱們先來看兩個使用 Spring Cloud Bus 的簡單例子。github
Bus 的例子比較簡單,由於 Bus 的 AutoConfiguration 層都有了默認的配置,只須要引入消息中間件對應的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴便可,以後全部啓動的應用都會使用同一個 Topic 進行消息的接收和發送。spring
Bus 對應的 Demo 已經放到了 github 上:https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo 。 該 Demo 會模擬啓動 5 個節點,只須要對其中任意的一個實例新增配置項,全部節點都會新增該配置項。json
訪問任意節點提供的 Controller 提供的獲取配置的地址(key爲hangzhou
):併發
curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
全部節點返回的結果都是 unknown,由於全部節點的配置中沒有 hangzhou
這個 key。app
Bus 內部提供了 EnvironmentBusEndpoint
這個 Endpoint 經過 message broker 用來新增/更新配置。dom
訪問任意節點該 Endpoint 對應的 url /actuator/bus-env?name=hangzhou&value=alibaba
進行配置項的新增(好比訪問 node1 的url):curl
curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
而後再次訪問全部節點 /bus/env
獲取配置:
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' unknown% ~ ⌚ $ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json' ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' alibaba%
能夠看到,全部節點都新增了一個 key 爲 hangzhou
的配置,且對應的 value 是 alibaba
。這個配置項是經過 Bus 提供的 EnvironmentBusEndpoint
完成的。
這裏引用 程序猿DD 畫的一張圖片,Spring Cloud Config 配合 Bus 完成全部節點配置的刷新來描述以前的實例(本文實例不是刷新,而是新增配置,可是流程是同樣的):
好比在 node1 上指定 destination 爲 rocketmq-bus-node2 (node2 配置了 spring.cloud.bus.id 爲 rocketmq-bus-node2:10002
,能夠匹配上) 進行配置的修改:
curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
訪問 /bus/env
獲取配置(因爲在 node1 上發送消息,Bus 也會對發送方的節點 node1 進行配置修改):
~ ⌚ $ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json' ~ ⌚ $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ⌚ $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' xihu% ~ ⌚ $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' xihu%
能夠看到,只有 node1 和 node2 修改了配置,其他的 3 個節點配置未改變。
事件
Bus 中定義了遠程事件 RemoteApplicationEvent
,該事件繼承了 Spring 的事件 ApplicationEvent
,並且它目前有 4 個具體的實現:
Map<String, String>
類型的數據並更新到 Spring 上下文中 Environment
中的事件。文中的實例就是使用這個事件並配合 EnvironmentBusEndpoint
和 EnvironmentChangeListener
完成的。AckRemoteApplicationEvent
確認事件進行確認。@RefreshScope
以及全部的 @ConfigurationProperties
註解修飾的配置類的動態刷新。Bus 內部還存在一個非 RemoteApplicationEvent
事件 - SentApplicationEvent
消息發送事件,配合 Trace 進行遠程消息發送的記錄。
這些事件會配合 ApplicationListener
進行操做,好比 EnvironmentChangeRemoteApplicationEvent
配了 EnvironmentChangeListener
進行配置的新增/修改:
public class EnvironmentChangeListener implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> { private static Log log = LogFactory.getLog(EnvironmentChangeListener.class); @Autowired private EnvironmentManager env; @Override public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) { Map<String, String> values = event.getValues(); log.info("Received remote environment change request. Keys/values to update " + values); for (Map.Entry<String, String> entry : values.entrySet()) { env.setProperty(entry.getKey(), entry.getValue()); } } }
收到其它節點發送來的 EnvironmentChangeRemoteApplicationEvent
事件以後調用 EnvironmentManager#setProperty
進行配置的設置,該方法內部針對每個配置項都會發送一個 EnvironmentChangeEvent
事件,而後被 ConfigurationPropertiesRebinder
所監聽,進行 rebind 操做新增/更新配置。
Actuator Endpoint
Bus 內部暴露了 2 個 Endpoint,分別是 EnvironmentBusEndpoint
和 RefreshBusEndpoint
,進行配置的新增/修改以及全局配置刷新。它們對應的 Endpoint id 即 url 是 bus-env
和 bus-refresh
。
配置
Bus 對於消息的發送一定涉及到 Topic、Group 之類的信息,這些內容都被封裝到了 BusProperties
中,其默認的配置前綴爲 spring.cloud.bus
,好比:
spring.cloud.bus.refresh.enabled
用於開啓/關閉全局刷新的 Listener。spring.cloud.bus.env.enabled
用於開啓/關閉配置新增/修改的 Endpoint。spring.cloud.bus.ack.enabled
用於開啓開啓/關閉 AckRemoteApplicationEvent
事件的發送。spring.cloud.bus.trace.enabled
用於開啓/關閉消息記錄 Trace 的 Listener。消息發送涉及到的 Topic 默認用的是 springCloudBus
,能夠配置進行修改,Group 能夠設置成廣播模式或使用 UUID 配合 offset 爲 lastest 的模式。
每一個 Bus 應用都有一個對應的 Bus id,官方取值方式較複雜:
${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
建議手動配置 Bus id,由於 Bus 遠程事件中的 destination 會根據 Bus id 進行匹配:
spring.cloud.bus.id=${spring.application.name}-${server.port}
Bus 的底層分析無非牽扯到這幾個方面:
BusAutoConfiguration
自動化配置類被 @EnableBinding(SpringCloudBusClient.class)
所修飾。
@EnableBinding
的用法在上期文章 乾貨|Spring Cloud Stream 體系及原理介紹 中已經說明,且它的 value 爲 SpringCloudBusClient.class
,會在 SpringCloudBusClient
中基於代理建立出 input 和 output 的 DirectChannel
:
public interface SpringCloudBusClient { String INPUT = "springCloudBusInput"; String OUTPUT = "springCloudBusOutput"; @Output(SpringCloudBusClient.OUTPUT) MessageChannel springCloudBusOutput(); @Input(SpringCloudBusClient.INPUT) SubscribableChannel springCloudBusInput(); }
springCloudBusInput 和 springCloudBusOutput 這兩個 Binding 的屬性能夠經過配置文件進行修改(好比修改 topic):
spring.cloud.stream.bindings: springCloudBusInput: destination: my-bus-topic springCloudBusOutput: destination: my-bus-topic
消息的接收的發送:
// BusAutoConfiguration @EventListener(classes = RemoteApplicationEvent.class) // 1 public void acceptLocal(RemoteApplicationEvent event) { if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) { // 2 this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3 } } @StreamListener(SpringCloudBusClient.INPUT) // 4 public void acceptRemote(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) { // 5 this.applicationEventPublisher.publishEvent(event); } // If it's an ACK we are finished processing at this point return; } if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { // 6 if (!this.serviceMatcher.isFromSelf(event)) { // 7 this.applicationEventPublisher.publishEvent(event); } if (this.bus.getAck().isEnabled()) { // 8 AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9 // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } }
RemoteApplicationEvent
遠程事件(好比 bus-env
會在本地發送 EnvironmentChangeRemoteApplicationEvent
事件,bus-refresh
會在本地發送 RefreshRemoteApplicationEvent
事件,這些事件在這裏都會被監聽到)。AckRemoteApplicationEvent
遠程確認事件(否則會死循環,一直接收消息,發送消息...)以及該事件是應用自身發送出去的(事件發送方是應用自身),若是都知足執行步驟 3。@StreamListener
註解消費 Spring Cloud Stream 構造的 Binding name 爲 springCloudBusInput 的 MessageChannel,接收的消息爲遠程消息。AckRemoteApplicationEvent
遠程確認事件而且應用開啓了消息追蹤 trace 開關,同時該遠程事件不是應用自身發送的(事件發送方不是應用自身,表示事件是其它應用發送過來的),那麼本地發送 AckRemoteApplicationEvent
遠程確認事件表示應用確認收到了其它應用發送過來的遠程事件,流程結束。AckRemoteApplicationEvent
遠程確認事件的開關,構造 AckRemoteApplicationEvent
事件並在遠程和本地都發送該事件(本地發送是由於步驟 5 沒有進行本地 AckRemoteApplicationEvent
事件的發送,也就是自身應用對自身應用確認; 遠程發送是爲了告訴其它應用,自身應用收到了消息)。SentApplicationEvent
事件bus-env
觸發後全部節點的 EnvironmentChangeListener
監聽到了配置的變化,控制檯都會打印出如下信息:
o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}
若是在本地監聽遠程確認事件 AckRemoteApplicationEvent
,都會收到全部節點的信息,好比 node5 節點的控制檯監聽到的 AckRemoteApplicationEvent
事件以下:
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
那麼回到本章節開頭提到的 4 個問題,咱們分別作一下解答:
BusAutoConfiguration#acceptLocal
方法中經過 Spring Cloud Stream 發送事件到 springCloudBus
topic 中。BusAutoConfiguration#acceptRemote
方法中經過 Spring Cloud Stream 接收 springCloudBus
topic 的消息。BusAutoConfiguration#acceptRemote
方法中接收遠程事件方法裏對 destination 進行匹配。RemoteApplicationEvent
具體的實現事件再作下一步的動做(好比 EnvironmentChangeListener
接收了 EnvironmentChangeRemoteApplicationEvent
事件, RefreshListener
接收了 RefreshRemoteApplicationEvent
事件)。Spring Cloud Bus 自身內容仍是比較少的,不過仍是須要提早了解 Spring Cloud Stream 體系以及 Spring 自身的事件機制,在此基礎上,才能更好地理解 Spring Cloud Bus 對本地事件和遠程事件的處理邏輯。
目前 Bus 內置的遠程事件較少,大多數爲配置相關的事件,咱們能夠繼承 RemoteApplicationEvent
並配合 @RemoteApplicationEventScan
註解構建自身的微服務消息體系。
本文做者:中間件小哥
本文爲雲棲社區原創內容,未經容許不得轉載。