Vert.x系列(二)--EventBusImpl源碼分析

前言:Vert.x 實現了2種完成不一樣的eventBus:node

EventBusImpl(A local event bus implementation)和 它的子類 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。這裏介紹下EventBusImpl設計模式

 

EventBusImpl 原理:調用consumer方法時,以address-handler做爲k-v存在一個map的容器中。接着調用send方法時,把message,DeploymentOptions等內容封裝成對象(MessageIml,命令模式),從以address爲k從map裏取出handler.把MessageIml做爲參數傳遞給handler運行。安全

 

一.初始化: 數據結構

初始化過程就是new  EventBusImpl,並修改狀態變量started。app

首先,在VertxImpl的構造方法框架

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)

中進行初始化。以  options.isClustered()爲判斷條件,調用createAndStartEventBus(options, resultHandler);oop

其次createAndStartEventBus中作了2件事測試

1.以options.isClustered()判斷條件,new出了ClusteredEventBus/ EventBusImpl. new時並無業務邏輯。(額外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互擁有對方的引用,是很常見的寫法。)ui

2.調用EventBusImpl的初始化方法start(),並返回結果給最外層resultHandler的。start()更沒作什麼事,只是EventBusImpl裏面有個狀態變量started。把它置爲true.this

 

二. consumer訂閱

EventBusImpl維護了

protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>()

成員變量。

Handlers 是一個handler的List的封裝類,上面能夠理解爲 ConcurrentMap<String, List<Handler>>這種數據結構。consumer方法以address爲k,以handler做v的list的一員,存放在handlerMap中。

因此重點關注對handlerMap的操做。

 

調用vertx.eventBus().consumer("Address1", ar -> {});發生了什麼?

查看代碼發現,先new HandlerRegistration這裏也有相互引用。再調用HandlerRegistration .handler,那裏面又會調用eventBusImpl.addRegistration()。在HandlerRegistration這個類兜了一圈,又回到eventBusImpl裏。

(相關代碼截斷以下:  EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);

核心邏輯在addRegistration() 和 addLocalRegistration()中。個人理解是,前個方法明顯有問題。最後一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的參數都沒有使用,應該能夠省略,修改成addRegistration(registration::setResult);就能夠。不多在Vert.x框架中看到這樣不合規範的代碼。若是讀者有好的看法,歡迎留言。

 

// 調用 addLocalRegistration

// 註冊完成

protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
                                   boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
  addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

 

/** *

* 初始化 或 獲取原 Contex

初始化 或 獲取原 Handlers

* 新建  HandlerHolder

* Handlers 裏添加  HandlerHolder

**/

protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
                                           boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  Context context = Vertx.currentContext();
  boolean hasContext = context != null;
  if (!hasContext) {
    // Embedded
    context = vertx.getOrCreateContext();
  }
  registration.setHandlerContext(context);

  boolean newAddress = false;

  HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);

  Handlers handlers = handlerMap.get(address);
  if (handlers == null) {
    handlers = new Handlers();
    Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
    if (prevHandlers != null) {
      handlers = prevHandlers;
    }
    newAddress = true;
  }
  handlers.list.add(holder);

  if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  return newAddress;
}

新出現的幾個類的做用:

Context 線程調度--Vert.x框架的優勢是線程安全,就是經過Context實現。

HandlerHolder--對HandlerRegistration的封裝,外加Context。

Handlers--上面HandlerHolder 的集合封裝,外加平衡輪詢邏輯。

 

handlers.list.add(holder);這句做爲壓軸(戲曲名詞,指一場摺子戲演出的倒數第二個劇目)出場完成整個功能的核心註冊操做。

至於後面的那段代碼,我以爲有點問題。

if (hasContext) {
 HandlerEntry entry = new HandlerEntry<>(address, registration);
 context.addCloseHook(entry);
 }

做用是在context上註冊關閉事件,由DeploymentManager在unploy的時候調用,對應的核心邏輯在 CloseHooks.run()方法中。但這個這個判斷條件案例只有第2次添加consumer的時候纔有效果。或者是上面的代碼boolean hasContext = context != null;給人的誤導? 以上consumer的流程還被reply方法使用。

 

三. Send/Publish發送

多個send重載方法最後定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但這個核心方法的最終卻調用了一個

名爲sendOrPubInternal的方法,不禁得在讓人想起寫程序最難的事之一是命名。正如開頭說的這個使用了設計模式中的命令模式,把參數封裝成MessageImpl對象發送到後面的方法。

sendOrPubInternal作了3個事情,

1.createReplyHandlerRegistration -- 有replyHandler.reply()這步纔有意義

2.new SendContextImpl -- 從Context類判斷,SendContextImpl能夠綁定線程

3.sendContext.next(); -- 在執行方法前,執行攔截器。攔截器極大地豐富開發人員的自定義使用。

原本應該1,2,3順序介紹代碼,可是消息流程通常是:

Sender----(    message  )--->customer;

Sender<---(reply message)---customer;

根據這個流程,得先介紹2.new SendContextImpl 和3.sendContext.next();

再回頭介紹 1.createReplyHandlerRegistration

 

 

先說 2.new SendContextImpl

這個類是整個Send相關類的大封裝。

3.sendContext.next();

根據代碼流程

sendOrPub--》deliverMessageLocally--》deliverMessageLocally

進入到deliverMessageLocally(),這個方法作了2個大事情。

  1. 獲取address所對應的全部handlers
  2. 根據isSend()區分 send (平衡輪詢發一個handler)/publish(遍歷handlers發給全部)

方法的第一句話msg.setBus(this);和reply邏輯有關係。在這個local eventbus下,是重複賦值,沒有做用的。

而後Handlers handlers = handlerMap.get(msg.address());

這句根據以address爲k,取出Handlers。sender的messageImpl 終於和consumer的HandlerHold見面

 

Handler.choose()方法實現了輪詢發送message, 我的認爲這個方法叫作 balanceChoose()更好。

代碼以下:

public HandlerHolder choose() {
  while (true) {
    int size = list.size();
    if (size == 0) {
      return null;
    }
    int p = pos.getAndIncrement();
    if (p >= size - 1) {
      pos.set(0);
    }
    try {
      return list.get(p);
    } catch (IndexOutOfBoundsException e) {
      // Can happen
      pos.set(0);
    }
  }
}

當時我使用Vert.x的時候,就很好奇eventBus的輪詢功能怎麼實現。如今看到其實很是簡單。維護一個 AtomicInteger 的變量,每次調用累加一次。若是超過List的長度,則重置爲0,方法永遠返回 list.get(p)。巧妙!

最後在deliverToHandler()方法裏,在Context的線程控制下,完成message和handler的最終交互。

 

那麼,回到最開始的問題,

Sender----(    message  )--->Customer;

Sender<---(reply message)---Customer;

在上面的流程中,Sender根據address找到Customer從而發送message,那麼Customer的reply是怎麼找到Sender的呢?

答案是一個臨時的replyAddress。經過以 replyAddress爲key,把Sender做爲handler註冊到eventBusImpl上,處理後直接註銷。replyAddress的規律是從1開始的步長爲1的自增數列,因此開發者不該該使用純數字做爲自身業務的Address,避免衝突。

 

最後說說1.createReplyHandlerRegistration

若是sender在發送消息時使用了

send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。

vertx.eventBus().send("address1", "測試消息", ar -> { if (ar.succeeded()) { System.out.println("我是producer1:" + ar.result().body()); } });

而且consumer在接受消息到後,調用了 reply();

vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("consumer reply message ");
});

則會進入createReplyHandlerRegistration的處理邏輯。

使用

protected String generateReplyAddress() {
 return Long.toString(replySequence.incrementAndGet());
}

這裏產生從1開始的步長爲1的自增數列address。

Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler); HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout); registration.handler(simpleReplyHandler);

裏面的this是eventBusImpl,並在handler()方法裏把 boolean replyHander的值置爲true.

這樣,eventBusImpl的handlerMap變量裏,就多了<replyAddress, replyHander>。

在cuomser處調用reply()後,會在eventBusImpl的內部類ReplySendContextImpl<T> extends SendContextImpl 的參與下,走相似send()的流程。區別是最後在deliverToHandler()方法裏,會判斷boolean  replyHander的值,若是是true調用完畢就註銷.

 

錯誤代碼測驗:

vertx.eventBus().consumer("1", ar -> { System.out.println("我不該該在這裏" + ar.body()); ar.reply("對不起,其實我是阿杜。"); }); vertx.eventBus().consumer("address1", ar -> { System.out.println("consumer:" + ar.body()); ar.reply("我是高帥富"); }); vertx.eventBus().send("address1", "測試消息", ar -> { if (ar.succeeded()) { System.out.println("sender:接收收到的迴應是:"+ar.result().body()); }else{ System.out.println("發送失敗"); } }); 

存在consumer("1", ar -> {})的Console:

consumer:測試消息

我不該該在這裏我是高帥富

20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096

發送失敗

能夠看到上面的輸出徹底不是設想的結果。

 

若是不存在consumer("1", ar -> {})address爲1的Console:

consumer:測試消息 sender:接收收到的迴應是:我是高帥富

最後,再次提醒:使用eventBus時,不要使用純數字做爲自身業務的address。

相關文章
相關標籤/搜索