Guava源碼學習(五)EventBus

基於版本:Guava 22.0git

Wiki:EventBusgithub

 

0. EventBus簡介安全

提供了發佈-訂閱模型,能夠方便的在EventBus上註冊訂閱者,發佈者能夠簡單的將事件傳遞給EventBus,EventBus會自動將事件傳遞給相關聯的訂閱者。併發

支持同步/異步模式。異步

只能用於線程間通訊。ide

 

1. EventBus類圖工具

EventBus是同步實現oop

AsyncEventBus是異步實現post

 

2. 代碼實例學習

        EventBus eventBus = new EventBus();
        
        eventBus.register(new Object() {
            @Subscribe
            public void listen(Object subReport) throws Exception {
                System.out.println("receive object event!");
            }

            @Subscribe
            public void listen(Integer subReport) throws Exception {
                System.out.println("receive integer event!");
            }
        });

        eventBus.post(Integer.valueOf(1));

這段代碼的輸出以下:

receive integer event!
receive object event!

能夠看到咱們聲明瞭一個EventBus,而後向其註冊了兩個訂閱者(含有Subscribe註解的方法),而後調用post方法向EventBus發佈了一條消息。

消息類型是Integer,因爲訂閱者的關注對象是Integer與Object,都與這條消息有關,因此兩個訂閱者都收到了通知。

可是這個發佈-訂閱模式是如何實現的呢?咱們下面來逐步分析一下EventBus的源碼。

 

3. EventBus.register()

EventBus.register /**
   * Registers all subscriber methods on {@code object} to receive events.
   *
   * @param object object whose subscriber methods should be registered.
   */
  public void register(Object object) {
    subscribers.register(object);
  }


SubscriberRegistry.register /**
   * Registers all subscriber methods on the given listener object.
   */
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);//獲取傳入的listener中含有的全部的訂閱者

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();//訂閱的事件類型
      Collection<Subscriber> eventMethodsInListener = entry.getValue();//對應的訂閱者自己

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);//線程安全的修改subscribers對象
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }


  /**
   * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
    分析傳入的對象,遍歷其中全部含有Subscribe註解並且只含有一個參數的方法,而後將其包裝成訂閱者並返回。
   */
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {//getAnnotatedMethods方法最終會調用到下面的SubscriberRegistry.getAnnotatedMethodsNotCached方法,這個方法會用反射處理傳入的clazz及其全部的父類,提取出含有Subscribe註解而且有且只有一個參數的方法
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];//獲取這個方法的惟一參數的Class
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));//將EventBus,訂閱者對象,訂閱者方法包裝一下並放入map,後續觸發事件的時候會用到
    }
    return methodsInListener;
  }

SubscriberRegistry.getAnnotatedMethodsNotCached private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();//連clazz的父類也會處理
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {//含有Subscribe註解,並且不是合成的方法
          // TODO(cgdecker): Should check for a generic parameter type and error out
          Class<?>[] parameterTypes = method.getParameterTypes();
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);//方法必須有且只有一個參數

          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }

 從SubscriberRegistry.getAnnotatedMethods到SubscriberRegistry.getAnnotatedMethodsNotCached的跳轉會涉及到Guava的LoadingCache,其餘的邏輯都很是直觀。

register方法的大概流程以下:

a. 解析傳入的Object對象,用反射分析對應的class及其全部父類,找到全部用Subscribe註解修飾,並且只有一個參數的方法。由這個方法能夠生成訂閱者對象。

b. 解析這個方法的參數(訂閱者訂閱的事件類型)

c. 每一個EventBus在初始化時都會與一個SubscriberRegistry關聯,這個SubscriberRegistry內部維護了一個名爲subscribers的ConcurrentMap,這個ConcurrentMap的 key是EventBus關心的事件類型,value是訂閱了這些事件的訂閱者的集合,subscribers在後續發佈事件的流程中很是有用(能夠經過發佈的事件類型,找到這個事件所關聯的全部訂閱者,並通知這些相關的訂閱者)。如今用a,b中的信息來更新subscribers,因爲subscribers可能被併發操做,因此用到了ConcurrentMap.putIfAbsent方法以保證線程安全。

總之,調用register方法會更新subscribers,subscribers中含有事件與訂閱者的關聯關係。

 

4. EventBus.post()

EventBus.post public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);//獲取發佈事件所關聯的全部訂閱者(包括event的全部父類/接口所關聯的訂閱者)
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);//用分派器觸發全部訂閱者的訂閱方法,分派器有同步/異步的不一樣實現
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));//若是這個事件沒有訂閱者,則將這個事件包裝爲DeadEvent事件,而後從新觸發post方法
    }
  }


SubscriberRegistry.getSubscribers /**
   * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
   * the time this method is called.
   */
  Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());//flattenHierarchy方法會解析出event的class,及其全部父類/接口的class

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {//遍歷event關聯的全部class,找到這些class關聯的全部訂閱者,添加到subscriberIterators中
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

SubscriberRegistry.flattenHierarchy /**
   * Flattens a class's type hierarchy into a set of {@code Class} objects including all
   * superclasses (transitively) and all interfaces implemented by these superclasses.
   */
  @VisibleForTesting
  static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) {
    try {
      return flattenHierarchyCache.getUnchecked(concreteClass);//這裏會用Guava的TypeToken工具解析出concreteClass的class,以及全部父類/接口的class並返回
    } catch (UncheckedExecutionException e) {
      throw Throwables.propagate(e.getCause());
    }
  }

post方法的大概流程以下:

a. 用Guava的TypeToken工具解析Event的class,以及Event的全部父類/接口的class,全部訂閱了這些class的訂閱者後續都會收到通知

b. 調用EventBus關聯的Dispatcher的dispatcher方法,將Event發佈給全部關聯的訂閱者。

下一步咱們會解析Dispatcher,以及Dispatcher是如何實現EventBus的同步/異步語義的。

 

5. EventBus的默認Dispatcher

EventBus
  /**
   * Creates a new EventBus named "default".
   */
  public EventBus() {
    this("default");
  }

  /**
   * Creates a new EventBus with the given {@code identifier}.
   *
   * @param identifier a brief name for this bus, for logging purposes. Should be a valid Java
   *     identifier.
   */
  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),//由當前線程直接運行提交任務的「線程池」
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }


Dispatcher.perThreadDispatchQueue()
  static Dispatcher perThreadDispatchQueue() {
    return new PerThreadQueuedDispatcher();
  }

Dispatcher.PerThreadQueuedDispatcher /**
   * Implementation of a {@link #perThreadDispatchQueue()} dispatcher.
   */
  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of EventBus.

    /**
     * Per-thread queue of events to dispatch.
     */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };//爲每一個調用post方法的工做線程維護一個發佈隊列,工做線程獨立操做這個隊列完成時間發佈流程,因此是線程安全的

    /**
     * Per-thread dispatch state, used to avoid reentrant event dispatching.
     */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));//將發佈事件+事件所關聯的全部訂閱者包裝成一個Event對象,並提交至發佈隊列

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {//嘗試從發佈隊列中poll元素
            while (nextEvent.subscribers.hasNext()) {//遍歷發佈事件的全部訂閱者,向訂閱者派發事件,因爲EventBus默認使用的線程池是MoreExecutors.directExecutor(),因此實際上發佈者會串行並且同步的向事件的全部訂閱者派發事件,直到所有派發結束,post方法纔會返回,因此EventBus在默認狀況下是同步的。
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
  }


Subscriber.dispatchEvent /**
   * Dispatches {@code event} to this subscriber using the proper executor.
   */
  final void dispatchEvent(final Object event) {//向訂閱者派發事件
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);//用反射調用訂閱的方法
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }


Subscriber.invokeSubscriberMethod /**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

同步語義的實現關鍵在於

a. 使用的線程池爲MoreExecutors.directExecutor(),這實際上不能算是一個真正的線程池,全部提交到這個池子裏的任務,都是由提交任務的線程自身來執行的。

b. EventBus爲每一個調用post方法的線程維護了一個發佈隊列,工做線程會將事件提交到這個私有隊列裏,而後逐個通知事件所關聯的訂閱者,因爲使用的線程池是MoreExecutors.directExecutor(),因此這個過程其實是徹底串行並且同步執行的,調用post方法的工做線程實際上會在將事件通知給全部訂閱者後纔會返回,從而實現了同步的EventBus語義。

 

6. AsyncEventBus的Dispatcher

AsyncEventBus public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);

Dispatcher.legacyAsync()
  static Dispatcher legacyAsync() {
    return new LegacyAsyncDispatcher();
  }

Dispatcher.LegacyAsyncDispatcher private static final class LegacyAsyncDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of AsyncEventBus.
    //
    // We can't really make any guarantees about the overall dispatch order for this dispatcher in
    // a multithreaded environment for a couple reasons:
    //
    // 1. Subscribers to events posted on different threads can be interleaved with each other
    //    freely. (A event on one thread, B event on another could yield any of
    //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
    // 2. It's possible for subscribers to actually be dispatched to in a different order than they
    //    were added to the queue. It's easily possible for one thread to take the head of the
    //    queue, immediately followed by another thread taking the next element in the queue. That
    //    second thread can then dispatch to the subscriber it took before the first thread does.
    //
    // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
    // that simply loops through the subscribers and dispatches the event to each would actually
    // probably provide a stronger order guarantee, though that order would obviously be different
    // in some cases.

    /**
     * Global event queue.
     */
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue();//公用的,支持併發訪問的發佈隊列

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {//將Event+訂閱者包裝成EventWithSubscriber,放入發佈隊列中
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);//從發佈隊列中提取EventWithSubscriber,而後將事件發佈給訂閱者,這一過程是發送到AsyncEventBus初始化時提供的線程池裏執行的,因此是異步的。也就是說post方法返回的時候,可能發佈過程還沒有完成,還有訂閱者沒有收到消息。
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

異步語義實現的關鍵在於:

a. 發佈事件使用的線程池是建立AsyncEventBus時傳入的線程池,通常來講都是正常的含有多個工做線程的線程池,提交到線程池裏的任務能夠被異步執行

b. LegacyAsyncDispatcher中維護了一個公用的ConcurrentLinkedQueue,這個AsyncEventBus收到的全部Event都會被put到這個隊列中,put完Event的線程同時也會從這個隊列中poll Event,而後submit這些Event到線程池中,若是這個線程池是普通的含有工做線程的線程池,那麼submit完Event以後post方法便可當即返回,傳遞Event給訂閱者的任務會由線程池裏的工做線程完成。這樣就實現了異步語義。

 

 

總的來講,EventBus的源碼仍是比較清晰易懂的,實現手法也很是優雅,值得咱們學習。

相關文章
相關標籤/搜索