前言:Vert.x 實現了2種完成不一樣的eventBus:node
EventBusImpl(A local event bus implementation)和 它的子類 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。這裏介紹下EventBusImpl設計模式
EventBusImpl 原理:調用consumer方法時,以address-handler做爲k-v存在一個map的容器中。接着調用send方法時,把message,DeploymentOptions等內容封裝成對象(MessageIml,命令模式),從以address爲k從map裏取出handler.把MessageIml做爲參數傳遞給handler運行。安全
一.初始化: 數據結構
初始化過程就是new EventBusImpl,並修改狀態變量started。app
首先,在VertxImpl的構造方法框架
VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)
中進行初始化。以 options.isClustered()爲判斷條件,調用createAndStartEventBus(options, resultHandler);oop
其次createAndStartEventBus中作了2件事測試
1.以options.isClustered()判斷條件,new出了ClusteredEventBus/ EventBusImpl. new時並無業務邏輯。(額外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互擁有對方的引用,是很常見的寫法。)ui
2.調用EventBusImpl的初始化方法start(),並返回結果給最外層resultHandler的。start()更沒作什麼事,只是EventBusImpl裏面有個狀態變量started。把它置爲true.this
二. consumer訂閱
EventBusImpl維護了
protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();
成員變量。
Handlers 是一個handler的List的封裝類,上面能夠理解爲 ConcurrentMap<String, List<Handler>>這種數據結構。consumer方法以address爲k,以handler做v的list的一員,存放在handlerMap中。
因此重點關注對handlerMap的操做。
調用vertx.eventBus().consumer("Address1", ar -> {});發生了什麼?
查看代碼發現,先new HandlerRegistration(這裏也有相互引用)。再調用HandlerRegistration .handler,那裏面又會調用eventBusImpl.addRegistration()。在HandlerRegistration這個類兜了一圈,又回到eventBusImpl裏。
(相關代碼截斷以下: EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);)
核心邏輯在addRegistration() 和 addLocalRegistration()中。個人理解是,前個方法明顯有問題。最後一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的參數都沒有使用,應該能夠省略,修改成addRegistration(registration::setResult);就能夠。不多在Vert.x框架中看到這樣不合規範的代碼。若是讀者有好的看法,歡迎留言。
// 調用 addLocalRegistration
// 註冊完成
protected <T> void addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(registration.getHandler(), "handler"); boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly); addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult); }
/** *
* 初始化 或 獲取原 Contex
初始化 或 獲取原 Handlers
* 新建 HandlerHolder
* Handlers 裏添加 HandlerHolder
**/
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(address, "address"); Context context = Vertx.currentContext(); boolean hasContext = context != null; if (!hasContext) { // Embedded context = vertx.getOrCreateContext(); } registration.setHandlerContext(context); boolean newAddress = false; HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context); Handlers handlers = handlerMap.get(address); if (handlers == null) { handlers = new Handlers(); Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers); if (prevHandlers != null) { handlers = prevHandlers; } newAddress = true; } handlers.list.add(holder); if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); } return newAddress; }
新出現的幾個類的做用:
Context 線程調度--Vert.x框架的優勢是線程安全,就是經過Context實現。
HandlerHolder--對HandlerRegistration的封裝,外加Context。
Handlers--上面HandlerHolder 的集合封裝,外加平衡輪詢邏輯。
handlers.list.add(holder);這句做爲壓軸(戲曲名詞,指一場摺子戲演出的倒數第二個劇目)出場完成整個功能的核心註冊操做。
至於後面的那段代碼,我以爲有點問題。
if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); }
做用是在context上註冊關閉事件,由DeploymentManager在unploy的時候調用,對應的核心邏輯在 CloseHooks.run()方法中。但這個這個判斷條件案例只有第2次添加consumer的時候纔有效果。或者是上面的代碼boolean hasContext = context != null;給人的誤導? 以上consumer的流程還被reply方法使用。
三. Send/Publish發送
多個send重載方法最後定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但這個核心方法的最終卻調用了一個
名爲sendOrPubInternal的方法,不禁得在讓人想起寫程序最難的事之一是命名。正如開頭說的這個使用了設計模式中的命令模式,把參數封裝成MessageImpl對象發送到後面的方法。
sendOrPubInternal作了3個事情,
1.createReplyHandlerRegistration -- 有replyHandler.reply()這步纔有意義
2.new SendContextImpl -- 從Context類判斷,SendContextImpl能夠綁定線程
3.sendContext.next(); -- 在執行方法前,執行攔截器。攔截器極大地豐富開發人員的自定義使用。
原本應該1,2,3順序介紹代碼,可是消息流程通常是:
Sender----( message )--->customer;
Sender<---(reply message)---customer;
根據這個流程,得先介紹2.new SendContextImpl 和3.sendContext.next();
再回頭介紹 1.createReplyHandlerRegistration
先說 2.new SendContextImpl
這個類是整個Send相關類的大封裝。
3.sendContext.next();
根據代碼流程
sendOrPub--》deliverMessageLocally--》deliverMessageLocally
進入到deliverMessageLocally(),這個方法作了2個大事情。
方法的第一句話msg.setBus(this);和reply邏輯有關係。在這個local eventbus下,是重複賦值,沒有做用的。
而後Handlers handlers = handlerMap.get(msg.address());
這句根據以address爲k,取出Handlers。sender的messageImpl 終於和consumer的HandlerHold見面
Handler.choose()方法實現了輪詢發送message, 我的認爲這個方法叫作 balanceChoose()更好。
代碼以下:
public HandlerHolder choose() { while (true) { int size = list.size(); if (size == 0) { return null; } int p = pos.getAndIncrement(); if (p >= size - 1) { pos.set(0); } try { return list.get(p); } catch (IndexOutOfBoundsException e) { // Can happen pos.set(0); } } }
當時我使用Vert.x的時候,就很好奇eventBus的輪詢功能怎麼實現。如今看到其實很是簡單。維護一個 AtomicInteger 的變量,每次調用累加一次。若是超過List的長度,則重置爲0,方法永遠返回 list.get(p)。巧妙!
最後在deliverToHandler()方法裏,在Context的線程控制下,完成message和handler的最終交互。
那麼,回到最開始的問題,
Sender----( message )--->Customer;
Sender<---(reply message)---Customer;
在上面的流程中,Sender根據address找到Customer從而發送message,那麼Customer的reply是怎麼找到Sender的呢?
答案是一個臨時的replyAddress。經過以 replyAddress爲key,把Sender做爲handler註冊到eventBusImpl上,處理後直接註銷。replyAddress的規律是從1開始的步長爲1的自增數列,因此開發者不該該使用純數字做爲自身業務的Address,避免衝突。
最後說說1.createReplyHandlerRegistration
若是sender在發送消息時使用了
send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。
vertx.eventBus().send("address1", "測試消息", ar -> { if (ar.succeeded()) { System.out.println("我是producer1:" + ar.result().body()); } });
而且consumer在接受消息到後,調用了 reply();
vertx.eventBus().consumer("address1", ar -> {
System.out.println("consumer:" + ar.body());
ar.reply("consumer reply message ");
});
則會進入createReplyHandlerRegistration的處理邏輯。
使用
protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}
這裏產生從1開始的步長爲1的自增數列address。
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler); HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout); registration.handler(simpleReplyHandler);
裏面的this是eventBusImpl,並在handler()方法裏把 boolean replyHander的值置爲true.
這樣,eventBusImpl的handlerMap變量裏,就多了<replyAddress, replyHander>。
在cuomser處調用reply()後,會在eventBusImpl的內部類ReplySendContextImpl<T> extends SendContextImpl 的參與下,走相似send()的流程。區別是最後在deliverToHandler()方法裏,會判斷boolean replyHander的值,若是是true調用完畢就註銷.
錯誤代碼測驗:
vertx.eventBus().consumer("1", ar -> { System.out.println("我不該該在這裏" + ar.body()); ar.reply("對不起,其實我是阿杜。"); }); vertx.eventBus().consumer("address1", ar -> { System.out.println("consumer:" + ar.body()); ar.reply("我是高帥富"); }); vertx.eventBus().send("address1", "測試消息", ar -> { if (ar.succeeded()) { System.out.println("sender:接收收到的迴應是:"+ar.result().body()); }else{ System.out.println("發送失敗"); } });
存在consumer("1", ar -> {})的Console:
consumer:測試消息 我不該該在這裏我是高帥富 20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 發送失敗
能夠看到上面的輸出徹底不是設想的結果。
若是不存在consumer("1", ar -> {})address爲1的Console:
consumer:測試消息 sender:接收收到的迴應是:我是高帥富
最後,再次提醒:使用eventBus時,不要使用純數字做爲自身業務的address。