最近看Elastic-Job源碼,看到它裏面實現的任務運行軌跡的持久化,使用的是Guava的AsyncEventBus,一個內存級別的異步事件總線服務,實現了簡單的生產-消費者模式,從而在不影響任務執行效率的基礎上,將任務執行和任務軌跡記錄解耦,大大提升了EJ的性能。java
EventBus的使用方法不難,具體能夠參考EJ裏面幾個相關的類:JobEventListener、JobEventBus和LiteJobFacade。主要的流程以下:緩存
@Override public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { jobEventBus.post(jobExecutionEvent); } @Override public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { TaskContext taskContext = TaskContext.from(taskId); jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); if (!Strings.isNullOrEmpty(message)) { log.trace(message); } }
言歸正傳,咱們來看看EventBus究竟是如何實現觀察者模式的。他的主要實現類都在com.google.common.eventbus這個包下面。安全
咱們首先來看一下里面比較重要的幾個類,同時理解一些概念。多線程
這個類有幾個屬性:併發
private final String identifier;//惟一標識,默認爲default private final Executor executor;//多線程處理器,默認MoreExecutors.directExecutor() private final SubscriberExceptionHandler exceptionHandler;//異常處理器 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱註冊表 private final Dispatcher dispatcher;//消息分發器,默認爲Dispatcher.perThreadDispatchQueue(),單線程消息分發隊列
其中,identifier表示,同一個應用中,能夠根據identifier來區分不一樣的事件總線,只不過默認爲default而已。異步
EventBus主要定義了幾個方法:async
public void register(Object object) { subscribers.register(object); }
註冊的是本身定義的監聽器,也就是listener。ide
public void unregister(Object object) { subscribers.unregister(object); }
相似於註冊。源碼分析
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
這塊主要是根據event事件類型,來獲取事件的訂閱者,而後進行事件消息的分發。固然,若是沒有訂閱者,也就是event的類型是DeadEvent,也會進行對應的處理。post
繼承自EventBus,主要區別在於分發器,使用的是Dispatcher.legacyAsync()。這個後續我們再分析。
乍看這個類,就是訂閱者,其實咱們看源碼就能理解,當一個訂閱類的多個方法用@Subscribe註解時,每一個被註解的方法對應的是一個訂閱者。
這個類只是package內可見,沒有定義爲public,能夠經過靜態方法create來建立它。
static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
這裏傳入的method就是使用了@Subscribe註解的方法,這塊會先判斷這個方法是否線程安全,便是否使用@AllowConcurrentEvent來進行註解,來建立不一樣的Subscriber。惟一的差異是SynchronizedSubscriber中一個方法使用了synchronized來修飾。
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
調用多線程來處理event。
@VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
調用訂閱者的方法。
咱們以前在講到EventBus時,裏面有兩個方法register和unregister,調用的就是這個類的方法。這個類的做用也講到,是存儲event和對應的訂閱者的關係的。咱們來看一下這個類的設計。
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); @Weak private final EventBus bus;
這個類有兩個屬性。
註冊監聽器。
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
主要的邏輯是:
實現與register相似,先根據listener找到subscriber,找到須要監聽的方法,而後根據事件類型去移除subscriber。
獲取監聽器中全部的監聽方法。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }
findAllSubscribers用於查找事件類型以及事件處理器的對應關係。查找註解須要涉及到反射,經過反射來獲取標註在方法上的註解。由於Guava針對EventBus的註冊採起的是「隱式契約」而非接口這種「顯式契約」。而類與接口是存在繼承關係的,全部頗有可能某個訂閱者其父類(或者父類實現的某個接口)也訂閱了某個事件。所以這裏的查找須要順着繼承鏈向上查找父類的方法是否也被註解標註。
獲取event的訂閱者。
Iterator<Subscriber> getSubscribers(Object event) { ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) { CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
分發器,用於將event分發給subscriber。它內部實現了三種不一樣類型的分發器,用於不一樣的狀況下事件的順序性。它的核心方法是:
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
它的三種實現:
EventBus默認使用的分發器。它的實現是經過ThreadLocal來實現一個事件隊列,每一個線程包含一個這樣的內部隊列。
它的分發代碼以下:
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
嵌套兩層循環,第一層事件不爲空,第二層該事件下的訂閱者不爲空,則分發事件下去。
AsyncEventBus使用的分發器。它在內部經過一個ConcurrentLinkedQueue
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }
是一前一後兩個循環。前面一個是遍歷事件訂閱處理器,並構建一個事件實體對象存入隊列。後一個循環是遍歷該事件實體對象隊列,取出事件實體對象中的事件進行分發。
同步分發器。
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { subscribers.next().dispatchEvent(event); } }
Elastic-Job使用的EventBus,能夠說很好的對任務的運行和軌跡記錄進行了解耦,借鑑了Guava的思想,將代碼優雅發揮到了新的境界。固然,Guava對EventBus的設計思想是咱們須要進行學習和使用的。