EventBus源碼學習筆記(二)

EventBus深刻學習二

開始研究源碼的設計思路,從Listener註冊出發,EventBus 如何維護監聽者信息,到Publisher發送消息,消息以怎樣的渠道分發給全部的Listener, 順序如何保證,傳遞性如何保證,出現異常如何處理,找不到監聽者怎麼處理等等html

EventBus

這個類至關於一箇中轉站,Publisher 調用它的 post(Object) 來推送事件;而後將事件一次推送給註冊的Listenerjava

1. 註冊關係的維護

在初始化s時, EventBus對象會維護一個 private final SubscriberRegistry subscribers = new SubscriberRegistry(this); 實例, 這個就是維護訂閱關係的核心類web

註冊方法以下spring

/**
   * Registers all subscriber methods on {@code object} to receive events.
   *
   * @param object  object whose subscriber methods should be registered.
   */
  public void register(Object object) {
    subscribers.register(object);
  }

接着咱們看下這個類的具體實現緩存

SubscriberRegistry.java數據結構

/**
   * All registered subscribers, indexed by event type.
   *
   * <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
   * immutable snapshot of all current subscribers to an event without any locking.
   */
  private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
      

  /**
   * Registers all subscriber methods on the given listener object.
   */
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers = MoreObjects.firstNonNull(
            subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

subscribers : - 對象初始化時建立 - 維護的是EventType -> Listener的映射關係,value爲一個集合,說明一個事件能夠推送給多個Listener - 監聽者,能夠有能夠監聽多個不一樣類型的事件app

註冊流程: - 根據註冊的對象,將其中全部的回調方法都撈出來 - 將上步的結果塞入 subscribers 集合中; key爲 Listener的類名less

註冊流程詳解

註冊目的就是發佈消息後, EventBus 能夠將這個Event傳遞」 Listener(即訂閱方)ide

爲了實現上面的目的,若是要咱們本身實現,會怎麼作?post

  • 將類中,全部包含@Subscribe 註解的方法撈出來
  • 方法的第一個參數就是 Event, 由於註冊的目的是爲了實現回調, 因此封裝一個類,包含這個Listener對象的引用 + 要執行的方法

上面註冊的實際實現和上面的步驟差很少

  1. 獲取全部包含註解的方法

    • 實際的代碼以下

      private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
      Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
      Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
      for (Class<?> supertype : supertypes) {
        for (Method method : supertype.getDeclaredMethods()) {
          if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
            // TODO(cgdecker): Should check for a generic parameter type and error out
            Class<?>[] parameterTypes = method.getParameterTypes();
            checkArgument(parameterTypes.length == 1,
                "Method %s has @Subscribe annotation but has %s parameters."
                    + "Subscriber methods must have exactly 1 parameter.",
                method, parameterTypes.length);
      
            MethodIdentifier ident = new MethodIdentifier(method);
            if (!identifiers.containsKey(ident)) {
              identifiers.put(ident, method);
            }
          }
        }
      }
      return ImmutableList.copyOf(identifiers.values());
      }
    • 看下上面的實現,很是有意思的是,不只將改對象中的全部@Subscribe註解的方法撈出來,連父類中的也不放過;就是這個 TypeToken.of(clazz).getTypes().rawTypes();

    • 從上面的限定,也能夠看出,對於回調方法是有要求的: 有且只能有一個參數, checkArgument(parameterTypes.length == 1,xxx)

    • 過濾重載的回調方法(這點比較有意思,搞了個Map, key由方法名+方法參數確認(MethodIdentifier的equals方法重寫了), 而不是直接用集合的contains方法, 請注意其中的區別)

    • method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic() 這個判斷條件的後一個能夠參考http://www.xue163.com/2122/1/21224778.html

  2. 將上面的方法轉換爲Map, 看這個 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    • key爲事件類型Event.class; value爲一個包含 Listener, Method, EventBus 實例的對象 Subscriber
  3. 將上面的map塞入subscribers 集合

    • subscribers 集合包含的是全部的 (事件 -> 監聽者回調集合)Event -> Set<Listener.Method>
    • 簡單的迭代便可實現塞數據了
    • 根據 subscribers 的數據結構,其實能夠看到,一個Listener對象,若是註冊屢次,最終的效果實際上是同樣的,這個監聽者,並不會被調用屢次; 若是一個Lisntener類,有多個對象,則註冊後,每一個對象的回調都會執行到;
      • 實現緣由: Set 集合存儲 Subscriber對象
      • Subscriber 的 hashcode & equals 方法沒有重寫

到此, 註冊完畢;註銷的方法和上面差很少,惟一的區別是最後一個是向 subscribers 塞數據,一個是從其中刪數據而已

題外話

若是咱們想獲取工程中全部包含某個註解的類能夠怎麼辦?
   
   - 若是是用spring的話, 能夠考慮  `ApplicationContext.getBeansWithAnnotation()`
   
   獲取工程中,全部包含某個註解的方法,除了上面的主動註冊,有什麼其餘的方法?

###2. 推送事件

發佈方,調用EventBus.post(Object) 方法實現消息的推送

預測

正式開始以前,咱們能夠先預測一下,當發佈方調用了這個方法以後,會執行那些action

  1. 根據事件類型,能夠從註冊關係表subscribers中獲取出全部的監聽者,以及對應的回調方法, 放在一個集合中
  2. 由於監聽的前後順序可能有要求,那麼將上面的集合進行排序
  3. 循環遍歷上面,依次執行

上面是正向的操做流程,接着一些異常狀況和邊界也須要考慮下

  1. 若是一個事件找不到訂閱者如何處理
  2. 若是某個監聽者執行完畢以後,但願其以後的監聽者都不能接受這個事件(相似web應用中的攔截器,若是被攔截了,若是被攔截了,後面的攔截器也不必繼續執行)
  3. 某個監聽者很遺憾的拋出了個異常,會不會整調鏈路都掛掉

深刻解讀

帶着上面的臆測,來實際看下EventBus本身是怎麼玩的

/**
   * 將事件推送給全部的監聽者,無論監聽者是否正常處理,都是正確返回
   * Posts an event to all registered subscribers.  This method will return
   * successfully after the event has been posted to all subscribers, and
   * regardless of any exceptions thrown by subscribers.
   *
   * 若是一個事件沒有監聽者,且該事件不是 DeadEvent, 則轉爲 DeadEvent並從新推送
   * <p>If no subscribers have been subscribed for {@code event}'s class, and
   * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
   * DeadEvent and reposted.
   *
   * @param event  event to post.
   */
  public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

上面的解釋比較清楚, 基本上核心的推送就是 dispatcher.dispatch(event, eventSubscribers);

實際的使用的是 PerThreadQueuedDispatcher 推送代碼以下,邏輯比較清晰,將Event塞入隊列, 而後將隊列中的全部消息依次推送給全部的訂閱者

  • 能夠參考下面的設計,優雅的避免重複推送的問題
  • 考慮爲何要先寫入隊列,而後再依次推送隊列中的事件
/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
   new ThreadLocal<Queue<Event>>() {
     @Override
     protected Queue<Event> initialValue() {
       return Queues.newArrayDeque();
     }
   };

/**
* Per-thread dispatch state, used to avoid reentrant event dispatching.
*/
private final ThreadLocal<Boolean> dispatching =
   new ThreadLocal<Boolean>() {
     @Override
     protected Boolean initialValue() {
       return false;
     }
   };

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
 checkNotNull(event);
 checkNotNull(subscribers);
 Queue<Event> queueForThread = queue.get();
 queueForThread.offer(new Event(event, subscribers));

 if (!dispatching.get()) {
   dispatching.set(true);
   try {
     Event nextEvent;
     while ((nextEvent = queueForThread.poll()) != null) {
       while (nextEvent.subscribers.hasNext()) {
         nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
       }
     }
   } finally {
     dispatching.remove();
     queue.remove();
   }
 }
}

最終真正執行推送Event的是這個方法 com.google.common.eventbus.Subscriber#dispatchEvent

/**
   * Dispatches {@code event} to this subscriber using the proper executor.
   */
  final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }
  
  /**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

3. 圖解

上面從源碼的角度,對整個流程順了一遍,下面的圖對幾個主要的類結構進行了抽取,並對上面的幾個方法進行了簡要的說明

圖一, 將上面說明的幾個類屬性 + 方法進行了說明

輸入圖片說明

圖二, 對邏輯進行列舉

輸入圖片說明

輸入圖片說明

4. 新技能

1.根據class,獲取全部超類集合 (EventBus的實際使用中,Event的超類集合都塞入了緩存,加快查詢速度)

TypeToken.of(concreteClass).getTypes().rawTypes());

2.獲取類中標有註解的方法

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz, Class annotationClz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        if (method.isAnnotationPresent(annotationClz) && !method.isSynthetic()) {
          // TODO(cgdecker): Should check for a generic parameter type and error out
          Class<?>[] parameterTypes = method.getParameterTypes();
          
          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }
  
  private static final class MethodIdentifier {

    private final String name;
    private final List<Class<?>> parameterTypes;

    MethodIdentifier(Method method) {
      this.name = method.getName();
      this.parameterTypes = Arrays.asList(method.getParameterTypes());
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(name, parameterTypes);
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (o instanceof MethodIdentifier) {
        MethodIdentifier ident = (MethodIdentifier) o;
        return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes);
      }
      return false;
    }
  }
相關文章
相關標籤/搜索