上篇介紹了guava包中的event bus的使用, 本篇深刻源碼瞭解實現細節數據庫
EventBus
瞭解
首先固然要看EventBus類, 類上的註釋說了不少, 總結以下:
1 EventBus保證在多線程環境下, 不會同時訪問訂閱者, 除非訂閱者標註了AllowConcurrentEvents註解. 若是沒有使用這個註解, 訂閱者不須要擔憂消息重入的問題.緩存
2 消息接收方法只能接收一個參數, 並且會根據消息類型路由消息到指定的訂閱者. 若是消息類型存在父子關係, 那麼發送子類消息, 父類訂閱者也會收到.安全
3 當執行call方法時, 全部該事件的訂閱者會順序執行, 若是一個訂閱者處理耗時(好比數據庫操做), 那麼整個線程會阻塞, 爲了解決這個問題, 咱們一來能夠儘可能減小訂閱者的耗時操做, 好比能夠異步. 另外一種就是可使用AsyncEventBus, AsyncEventBus採用多線程執行訂閱者方法.多線程
4 發出消息後, 若是沒有任何消費者消費, 那麼認爲這個消息是"dead". 咱們定義了DeadEvent類來專門處理這些消息, 咱們能夠監聽這類的消息. 若是訂閱者監聽的消息是Object, 那麼會收到全部的消息, 就不存在"dead" 消息了.app
EventBus 屬性以下less
// event bus 標識, 就是個名字 private final String identifier; // jdk executor, 用來執行線程 private final Executor executor; // 異常處理器 private final SubscriberExceptionHandler exceptionHandler; // 用於註冊訂閱者 private final SubscriberRegistry subscribers = new SubscriberRegistry(this); // 消息分發器 private final Dispatcher dispatcher;
大概瞭解後, 咱們從簡單例子出發異步
@Test public void 科一() { AwesomeEventBusDriver.register(new AwesomeStudent()); AwesomeEventBusDriver.publishAnything("經過~"); }
這裏省略了一步EventBus建立過程 EventBus eventBus = new EventBus();
源碼以下, 很簡單, 設置EventBus的名字, 建立默認的一些類. 這些類咱們在使用到的地方再看ide
/** Creates a new EventBus named "default". */ public EventBus() { this("default"); } public EventBus(String identifier) { this( identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE); }
註冊
建立EventBus後開始註冊事件post
public void register(Object object) { subscribers.register(object); }
註冊由SubscriberRegistry處理, 原理就是掃描listener, 而後把符合要求的(有訂閱註解)方法存儲到集合中, 集合的key是listener方法中的入參類型, 也就是後面消息路由的依據. 每個key(消息類型)對應一個list(多個訂閱者). 那咱們就先來看下SubscriberRegistry類. 註釋就一句話, 該類是用來註冊訂閱者到一個事件總線上的. 事件總線在實例化SubscriberRegistry類的時候指定學習
SubscriberRegistry(EventBus bus) { this.bus = checkNotNull(bus); }
SubscriberRegistry類屬性除了EventBus還有一個集合類, 用於存儲註冊的訂閱者
/** * All registered subscribers, indexed by event type. * * <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an * immutable snapshot of all current subscribers to an event without any locking. */ private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
爲了線程安全, 集合使用ConcurrentMap, 內部使用CopyOnWriteArraySet存儲相同路由的訂閱者, 註釋中解釋, 使用CopyOnWriteArraySet能夠在不是用鎖的狀況下, 很簡單並且相對輕量級的獲取訂閱一個事件(路由)的所有訂閱者的不可變快照. COW(Copy On Write)讀取不用加鎖是由於在修改數據的時候會複製一份, 讀取一份, 修改一份, 修改以後再替換以前的內容, 因此讀取的內容不會受修改數據影響. 在註冊階段, SubscriberRegistry會將訂閱者封裝爲Subscriber對象, 那咱們就來看下訂閱者的抽象Subscriber類.
按照慣例先看註釋, 一共就兩句話 1 這是一個訂閱者類(...) 2 若是兩個訂閱者都是同一個對象的同一個方法, 那麼這兩個訂閱者是相等的. 也就是說每一個訂閱者的座標是@Subscribe修飾方法以及方法所在的對象, 每一個座標惟一肯定一個訂閱者. 這樣能夠保證, 訂閱者重複註冊也只會收到一份消息.
好, 接下來是屬性, 很簡單.
/** 訂閱者註冊的事件總線 */ @Weak private EventBus bus; /** 訂閱者方法所在對象 */ @VisibleForTesting final Object target; /** 訂閱者方法. */ private final Method method; /** Executor to use for dispatching events to this subscriber. */ private final Executor executor;
比較有意思的是訂閱者的建立, 代碼以下(個人代碼怎麼都是public class呢...看着就low)
class Subscriber { /** Creates a {@code Subscriber} for {@code method} on {@code listener}. */ static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); } ... private Subscriber(EventBus bus, Object target, Method method) { this.bus = bus; this.target = checkNotNull(target); this.method = method; method.setAccessible(true); this.executor = bus.executor(); } ... static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } ... } }
訂閱者類提供了靜態的建立方法, 方法中會根據條件建立兩種對象, 經過名字咱們能夠看出來, 一個是普通的, 一個是同步的. 不知道你們是否還記得EventBus類開頭註釋中有一段話. EventBus保證在多線程環境下, 不會同時訪問訂閱者, 除非訂閱者標註了AllowConcurrentEvents註解. 若是沒有使用這個註解, 訂閱者不須要擔憂消息重入的問題. 其中的原理就是這裏, 首先根據@AllowConcurrentEvents註解生成不一樣的兩個對象, 註解修飾的是普通訂閱者, 沒有修飾的是同步訂閱者. 也就是說默認狀況下, 訂閱者是線程安全的.
private static boolean isDeclaredThreadSafe(Method method) { return method.getAnnotation(AllowConcurrentEvents.class) != null; }
同步訂閱者由一個內部類實現, 惟一特殊的就是調用訂閱者方法時使用了同步關鍵詞
@Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { // 裏面內容仍是調用的父類方法 super.invokeSubscriberMethod(event); } }
測試代碼以下
@Test public void 多線程開車() throws InterruptedException { MultiThreadSub threadSub = new MultiThreadSub(); AwesomeEventBusDriver.register(threadSub); int count = 10000; Thread t1 = new Thread(() -> { int c1 = count; while (c1 > 0) { c1--; AwesomeEventBusDriver.publishAnything(1); } }); Thread t2 = new Thread(() -> { int c1 = count; while (c1 > 0) { c1--; AwesomeEventBusDriver.publishAnything(1); } }); t1.start(); t2.start(); t1.join(); t2.join(); threadSub.print(); } public class MultiThreadSub { private int allowConcurrentSum = 0; private int noConcurrentSum = 0; @Subscribe @AllowConcurrentEvents public void addAllow(Integer i) { allowConcurrentSum += i; } @Subscribe public void addNo(Integer i) { noConcurrentSum += i; } public void print() { System.out.println("allowConcurrentSum: " + allowConcurrentSum); System.out.println("noConcurrentSum: " + noConcurrentSum); } }
結果
而後就是掃描註冊類了, 經過下面的方法獲取到註冊類全部的訂閱方法
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) { return subscriberMethodsCache.getUnchecked(clazz); }
getUnchecked方法調用層級比較深, 細節先不用關心, 最終會調用下面這個方法
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); // 就是這裏在檢查參數的個數 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); } // 方法入口 /** * A thread-safe cache that contains the mapping from each class to all methods in that class and * all super-classes, that are annotated with {@code @Subscribe}. The cache is shared across all * instances of this class; this greatly improves performance if multiple EventBus instances are * created and objects of the same class are registered on all of them. */ private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder() .weakKeys() .build( new CacheLoader<Class<?>, ImmutableList<Method>>() { @Override public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { return getAnnotatedMethodsNotCached(concreteClass); } });
該方法會掃描註冊類, 以及註冊類的全部父類, 將其中有@Subscribe的方法緩存下來, key是方法的參數類型. 這樣的好處就是同一個類不會掃描屢次.
方法中有一句註釋TODO(cgdecker): Should check for a generic parameter type and error out, 大概的意思是說後面版本應該檢查下泛型參數, 而且拋出異常. 那就來看一下對於泛型參數是如何處理的.
public class GenericObject<T> { @Subscribe public void TestGeneric(T obj) { System.out.println("generic " + obj); } } @Test public void 泛型() { AwesomeEventBusDriver.register(new GenericObject<String>()); AwesomeEventBusDriver.publishAnything("string"); AwesomeEventBusDriver.publishAnything(90); AwesomeEventBusDriver.publishAnything(100L); } // 執行結果 generic string generic 90 generic 100
從結果能夠看出來, 泛型沒有起做用, 其實也好理解, 泛型在編譯後會被擦除, 在運行時泛型類型會被認爲Object. 測試了泛型立刻就想到了基礎類型, debug到上述代碼, 能夠看到(方法參數類型爲int)註冊的方法參數爲int.class . 可是若是咱們發送一個數字, 好比5, 會自動裝箱爲Integer類型, 因此發送的時候就找不到對應的訂閱者. 可是若是把int類型換成爲Integer就能夠接收到消息了.
註冊階段到這裏就差很少, 原理很簡單, 細節你們能夠看看代碼.
發送
代碼以下
/** * Posts an event to all registered subscribers. This method will return successfully after the * event has been posted to all subscribers, and regardless of any exceptions thrown by * subscribers. * * <p>If no subscribers have been subscribed for {@code event}'s class, and {@code event} is not * already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted. * * @param event event to post. */ public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
先看註釋, 兩句話 1 發送方法會將事件發送給全部的訂閱者, 只要訂閱者收到了事件, 那麼發送方法就會放回成功, 不論在收到事件後執行是否報錯. 也就是說訂閱者執行報錯, 那麼本次的事件消息也就丟失了, 你們在使用的時候要考慮好場景. 2 若是該事件沒有訂閱者, 並且不是DeadEvent類, 則會把該事件包裝爲DeadEvent類從新發送.
若是註冊階段看的比較明白, 那發送的邏輯就很是簡單了.
首先根據發送的消息類型拿到對應的訂閱者, 而後調用Dispatcher類的dispatch方法. Dispatcher實例是在實例化EventBus時候建立的, 同步使用的是PerThreadQueuedDispatcher, 異步使用的是LegacyAsyncDispatcher. 先來看同步狀況.
/** Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; /** Per-thread dispatch state, used to avoid reentrant event dispatching. */ private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); // 當前線程保存的一個隊列, 該隊列就是後面的發送隊列 Queue<Event> queueForThread = queue.get(); // 將要分發的事件加入隊列 queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { // 若是沒有正在發送的話就發送 dispatching.set(true); // 設置正在發送(? 多線程臨界值) try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { // 拿到訂閱者類Subscriber, 並執行發送方法 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
經過類名字PerThreadQueuedDispatcher, 大概知道這是"一個線程一個隊列"發送器(...). 看到該類的兩個ThreadLocal屬性, 大概也明白PreThread的意思了. 發送時先從ThreadLocal中拿到隊列, 而後將要發送的內容插入到隊列中, 接着根據另外一個ThreadLocal對象dispatching判斷當前是否正在發送, 若是正在發送, 由於消息已經進入隊列了, 因此就不用管了. 若是沒有發送, 則執行發送邏輯. 從隊列中取出數據, 而後執行數據訂閱者的dispatchEvent方法. 訂閱者的同步發送方法前面講過, 最後就是一個反射. 再來看下異步發送, 異步EventBus實例化的時候, 惟一的不一樣就是入參給了一個線程池, 並指定Dispatcher爲LegacyAsyncDispatcher, 在發送邏輯上也很簡單, 與同步不一樣就是隊列使用了線程安全的類來保存. 其餘沒有什麼大的區別.
若是你是一個敏感的男銀, 可能已經發現還有一個Dispatcher類ImmediateDispatcher, 可是做者沒有找到它在包中使用過. 該類更簡單了, 從名字就知道, 它不用隊列存儲消息, 而是直接將消息發送給訂閱者.
總結
至此EventBus的學習就結束了, 本文從源碼入手從頭過了一遍事件總線的執行過程,做者沒有一行一行解釋源碼, 而只是講解大致流程, 而後拿出比較有意思的部分, 講了下本身的看法. 後面若是碰到有意思的點再繼續補充.