Guava源碼解析之EventBus

最近看Elastic-Job源碼,看到它裏面實現的任務運行軌跡的持久化,使用的是Guava的AsyncEventBus,一個內存級別的異步事件總線服務,實現了簡單的生產-消費者模式,從而在不影響任務執行效率的基礎上,將任務執行和任務軌跡記錄解耦,大大提升了EJ的性能。java

EventBus在Elastic-Job中的使用

EventBus的使用方法不難,具體能夠參考EJ裏面幾個相關的類:JobEventListener、JobEventBus和LiteJobFacade。主要的流程以下:緩存

  • JobEventListener主要是消費者。定義須要監聽的方法,目前主要定義了兩個listen方法,注意想監聽到的話,須要在方法前加上註解:@Subscribe和@AllowConcurrentEvents。看字面意思就是訂閱和容許併發事件。若是不加上後面那個註解,則會致使效率問題,這個我們後續分析。目前這個接口只有一個實現類JobEventRdbListener,實現了日誌寫入DB的操做。
  • JobEventBus參考的EventBus源碼,提供了register和post方法,去掉了unregister方法。主要的功能就是註冊監聽器和生產消息。他的構造方法中,默認使用的是Guava的AsyncEventBus,初始化中同時包含了註冊動做。
  • LiteJobFacade主要是JobEventBus的使用者。主要調用的是JobEventBus的post方法。
@Override
    public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
        jobEventBus.post(jobExecutionEvent);
    }
    
    @Override
    public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
        TaskContext taskContext = TaskContext.from(taskId);
        jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
                taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
        if (!Strings.isNullOrEmpty(message)) {
            log.trace(message);
        }
    }

EventBus源碼分析

言歸正傳,咱們來看看EventBus究竟是如何實現觀察者模式的。他的主要實現類都在com.google.common.eventbus這個包下面。安全

主要類概念分析

咱們首先來看一下里面比較重要的幾個類,同時理解一些概念。多線程

  • EventBus:這個類的做用有兩個,一個是做爲一個總線通道,另外一個做用是消息的廣播。
  • AsyncEventBus:異步的EventBus,功能與EventBus相似,只不過實現方式有所差別。
  • Subscriber:能夠按照字面理解是訂閱者,也能夠說是監聽器。
  • SubscriberRegistry:訂閱註冊表。主要存儲的是Subcriber和Event之間的關係,用於消息分發時能夠迅速根據Event的類型找到Subscriber。
  • Dispatcher:事件分發器,定義了一些分發的策略,裏面包含三種分發器。
  • 兩個重要的註解@Subscribe和@AllowConcurrentEvents。第一個是標識監聽器的方法,第二個與第一個配合使用,標識容許多線程執行。
  • DeadEvent:死信對象,標識沒有訂閱者關注的事件。
  • SubscribeExceptionHandler:訂閱者拋出異常的處理器。SubscribeExceptionContext:訂閱者拋出異常的上下文對象。

EventBus

這個類有幾個屬性:併發

private final String identifier;//惟一標識,默認爲default
  private final Executor executor;//多線程處理器,默認MoreExecutors.directExecutor()
  private final SubscriberExceptionHandler exceptionHandler;//異常處理器

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱註冊表
  private final Dispatcher dispatcher;//消息分發器,默認爲Dispatcher.perThreadDispatchQueue(),單線程消息分發隊列

其中,identifier表示,同一個應用中,能夠根據identifier來區分不一樣的事件總線,只不過默認爲default而已。異步

EventBus主要定義了幾個方法:async

註冊

public void register(Object object) {
    subscribers.register(object);
}

註冊的是本身定義的監聽器,也就是listener。ide

取消註冊

public void unregister(Object object) {
    subscribers.unregister(object);
}

相似於註冊。源碼分析

消息廣播

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}

這塊主要是根據event事件類型,來獲取事件的訂閱者,而後進行事件消息的分發。固然,若是沒有訂閱者,也就是event的類型是DeadEvent,也會進行對應的處理。post

AsyncEventBus

繼承自EventBus,主要區別在於分發器,使用的是Dispatcher.legacyAsync()。這個後續我們再分析。

Subscriber

乍看這個類,就是訂閱者,其實咱們看源碼就能理解,當一個訂閱類的多個方法用@Subscribe註解時,每一個被註解的方法對應的是一個訂閱者。

構造

這個類只是package內可見,沒有定義爲public,能夠經過靜態方法create來建立它。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
}

這裏傳入的method就是使用了@Subscribe註解的方法,這塊會先判斷這個方法是否線程安全,便是否使用@AllowConcurrentEvent來進行註解,來建立不一樣的Subscriber。惟一的差異是SynchronizedSubscriber中一個方法使用了synchronized來修飾。

dispatchEvent

final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

調用多線程來處理event。

invokeSubscriberMethod

@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
}

調用訂閱者的方法。

SubscriberRegistry

咱們以前在講到EventBus時,裏面有兩個方法register和unregister,調用的就是這個類的方法。這個類的做用也講到,是存儲event和對應的訂閱者的關係的。咱們來看一下這個類的設計。

屬性

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

@Weak private final EventBus bus;

這個類有兩個屬性。

  • 第一個是ConcurrentMap,他的鍵是Class類,也就是Event的類型,值是CopyOnWriteArraySet ,也就是訂閱者。這個ConcurrentMap是Guava定義的併發Map,這個後續我們有機會再分析。
  • 第二個屬性就是EventBus。

register

註冊監聽器。

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
}

主要的邏輯是:

  • 獲取這個類中全部用@Subscribe註解的方法,存儲到Multimap中。
  • 遍歷Multimap,鍵爲eventType,而後根據這個鍵,從緩存中獲取這個事件對應的訂閱者集合。
  • 獲取到以後,判斷集合是否爲空,若是爲空,新建一個集合來存儲。

unregister

實現與register相似,先根據listener找到subscriber,找到須要監聽的方法,而後根據事件類型去移除subscriber。

findAllSubscribers

獲取監聽器中全部的監聽方法。

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
}

findAllSubscribers用於查找事件類型以及事件處理器的對應關係。查找註解須要涉及到反射,經過反射來獲取標註在方法上的註解。由於Guava針對EventBus的註冊採起的是「隱式契約」而非接口這種「顯式契約」。而類與接口是存在繼承關係的,全部頗有可能某個訂閱者其父類(或者父類實現的某個接口)也訂閱了某個事件。所以這裏的查找須要順着繼承鏈向上查找父類的方法是否也被註解標註。

getSubscribes

獲取event的訂閱者。

Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

Dispatcher

分發器,用於將event分發給subscriber。它內部實現了三種不一樣類型的分發器,用於不一樣的狀況下事件的順序性。它的核心方法是:

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

它的三種實現:

PerThreadQueuedDispatcher

EventBus默認使用的分發器。它的實現是經過ThreadLocal來實現一個事件隊列,每一個線程包含一個這樣的內部隊列。

它的分發代碼以下:

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
  Queue<Event> queueForThread = queue.get();
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
 }
}

嵌套兩層循環,第一層事件不爲空,第二層該事件下的訂閱者不爲空,則分發事件下去。

LegacyAsyncDispatcher

AsyncEventBus使用的分發器。它在內部經過一個ConcurrentLinkedQueue 的全局隊列來存儲事件。他和PerThreadQueuedDispatcher的主要區別在於分發循環這塊。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}

是一前一後兩個循環。前面一個是遍歷事件訂閱處理器,並構建一個事件實體對象存入隊列。後一個循環是遍歷該事件實體對象隊列,取出事件實體對象中的事件進行分發。

ImmediateDispatcher

同步分發器。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    subscribers.next().dispatchEvent(event);
  }
}

總結

Elastic-Job使用的EventBus,能夠說很好的對任務的運行和軌跡記錄進行了解耦,借鑑了Guava的思想,將代碼優雅發揮到了新的境界。固然,Guava對EventBus的設計思想是咱們須要進行學習和使用的。

相關文章
相關標籤/搜索