從 EventBus 的介紹中,EventBus 給的定位是:設計模式
Event bus for Android and Java that simplifies communication between Activities, Fragments, Threads, Services, etc. Less code, better quality安全
簡單理解一下就是 Event bus 給 Android 及 Java (固然主要是 Android)的 Activity,Fragment,Threads,Services 之間提供一個簡單的通訊方式,從而能讓咱們用質量高且少的代碼來完成咱們的功能。bash
EventBus 是一個基於發佈/訂閱的設計模式的事件總線,其架構圖以下。網絡
從架構圖上來看,仍是很簡單的,跟咱們所最熟悉的觀察者模式是相似的。大概工做過程就是: (1) Subscriber 也就是訂閱者,訂閱一個 Event,其中 Event 是本身定義的,符合 POJO 的規範便可。 (2) 在須要時,Publisher 也就是發佈者,就可將事件經過 EventBus 分發布相應的訂閱者 Subscriber。數據結構
恩,就是如此簡單。固然,其實現遠不會這麼簡單了,仍是幹了不少髒活和累活的。多線程
EventBus 的官方文檔爲咱們提供了經典的使用 EventBus 的 3 個過程。架構
普通的 POJO 便可框架
public class MessageEvent {
public final String message;
public MessageEvent(String message) {
this.message = message;
}
}
複製代碼
文章是基於 EventBus 3 進行分析,對於訂閱者的方法命名能夠自由命名了,而不須要採用約定命名。異步
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
}
// This method will be called when a SomeOtherEvent is posted
@Subscribe
public void handleSomethingElse(SomeOtherEvent event) {
doSomethingWith(event);
}
複製代碼
準備好了訂閱者以後,咱們還須要向總線註冊這個訂閱者,這個咱們的訂閱者方法才能接收到相應的事件。async
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
EventBus.getDefault().unregister(this);
super.onStop();
}
複製代碼
咱們能夠在任何地方發送事件。
EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
複製代碼
根據前文的 3 步經典使用方法,源碼的分析大概也分紅以下 4 步來進行。
方法getDefault()
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
複製代碼
getDefault() 方法就是一個「懶漢」的單例模式,目的就是讓咱們能在全局對 EventBus 的引用進行使用。值得關注的是,這個懶漢的單例模式實現並不會有多線程的安全問題。由於對於 defaultInstance 的定義是 volatile 的。
static volatile EventBus defaultInstance;
複製代碼
接下來繼續看看構造方法。
構造方法EventBus()
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
複製代碼
構造方法使用了常見的 Builder 模式對 EventBus 中的各個屬性進行了初始化。這裏使用的是默認配置。通常狀況下,咱們也都是使用 getDefault() 及其默認配置來獲取實例的。咱們也能夠經過配置 EventBusBuilder 來 build 出一個本身的實例,可是要注意的是,後面的註冊、註銷以及發送事件都要基於此實例來進行,不然就會發生事件錯亂髮錯的問題。
方法register() 若是不看方法的實現,根據經驗判斷,我想當 register 的發生,應該是將訂閱者中用於接收事件的方法與該事件關聯起來。那是否是這樣呢?
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 1.經過訂閱者類找到其訂閱方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 2. 而後進行逐個訂閱
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
複製代碼
首行來看看找訂閱方法。這裏封裝了一個專門的類 SubscriberMethodFinder,而且經過其方法 findSubscriberMethods() 來進行查找,這裏就不貼 findSubscriberMethods() 的代碼,而是看看其內部實際用於尋找訂閱方法的 findUsingReflection()。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
......
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
......
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
}
}
複製代碼
總結一下,該方法主要就是尋找訂閱者類中的公有方法,且其參數惟一的以及含有註解聲明@Subscribe 的方法。就是咱們在寫代碼時所定義的訂閱者方法了。找到訂閱者方法後,會將其 method(方法的反射類 Method) 、event、thread mode 以及優先級等封裝成一個 SubscribeMethod 而後添加到 FindState 中。
這個 FindState 是 SubscriberMethodFinder 的一個內部類。其用了大小隻有 4 的 FIND_STATE_POOL 來進行管理,這樣避免了 FindState 對象的重複建立。
最後在 find 中會將所找到訂閱者方法添加到 「METHOD_CACHE」 中,這是一個 ConcurrentHashMap 的結構。
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
複製代碼
方法subscribe()
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
......
}
}
// 按優先級進行插入
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件,註冊就會發送
if (subscriberMethod.sticky) {
.....
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
複製代碼
subscribe() 方法完成事件與 subscribe 、 subscribeMethod 的關聯,具體怎麼關聯的呢,請看圖。
這裏分別用了 2 個 HashMap 來描述。subscriptionsByEventType 記錄了一個事件應該調用哪一個訂閱者的訂閱方法來處理。而 typesBySubscriber 記錄了訂閱者訂閱了哪些事件。
至此,訂閱者的註冊完成以後,也就至關因而 EventBus 的初始化完成了,那麼接下來就能夠愉快發送事件了。
public void post(Object event) {
// 獲取發送者線程的狀態管理器
PostingThreadState postingState = currentPostingThreadState.get();
// 從狀態管理器中取出事件隊列,並將事件添加到隊列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 若是當前不是發送狀態,那就進入發送狀態
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
......
}
try {
// 在發送狀態下,經過遍歷事件隊列逐個發送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
複製代碼
post() 方法的主要過程都寫在代碼註釋裏了。這裏主要關注下 currentPostingThreadState,它的定義以下:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
複製代碼
而 PostingThreadState 的定義以下。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
複製代碼
這裏用了 ThreadLocal 來保存 PostingThreadState。ThreadLocal 的主要做用是使得所定義的資源是線程私有的。那麼,也就是說,對於每個發送事件的線程其都有一個惟一的 PostingThreadState 來記錄事件發送的隊列以及狀態。
下圖是 ThreadLocal 在 Thread 中的數據結構描述,而這裏的 PostingThreadState 就是下圖中的 Value。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
......
複製代碼
postSingleEvent 所作的事情主要就是根據要發送的事情收集事件,而後逐個發送。收集的什麼事情呢,很簡單,就是收集它的父類事件。也就是說,當咱們發送一個事件時,它的全部父類事件也同時會被髮送,這裏的父類還包括了 Object 在內。因此,這裏須要開發者們很是注意了:
事件是具備向上傳遞性質的
接下來的 postSingleEventForEventType 就是從 subscriptionsByEventType 找出事件所訂閱的訂閱者以及訂閱者方法,即 CopyOnWriteArrayList。 而後再經過方法 postToSubscription() 將事件逐個逐個向訂閱者進行發送。在 postToSubscription() 方法中會根據不一樣的 ThreadMode 來決定不一樣的線程調度策略,具體在下面講。而無論採用何種策略,最終都將會經過invokeSubscriber() 進行方法的反射調用來進行訂閱方法的調用。
void invokeSubscriber(Subscription subscription, Object event) {
......
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
......
}
複製代碼
至此,事件的發送就完成了。很簡單,其真實的面目就是一個方法的反射調用。而對於事件的發送,這裏還應該關注一下線程的調度策略。
上面說到線程的調度策略在 postToSubscription() 方法中,那麼就來看一看這個方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
複製代碼
針對每一個不一樣的策略,下面來簡單看一看。 POSTING 發送者線程直接發送,也是 ThreadMode 的默認值。 MAIN 若是發送者是主線程,則直接發送。若是不是則交給 mainThreadPoster 來發送。mainThreadPoster 是 HandlerPoster 類型,該類繼承了 Handler 並實現了 Poster 接口,其實例是由 AndroidHandlerMainThreadSupport 所建立的。AndroidHandlerMainThreadSupport 包含了主線程的 Looper ,而 HandlerPoster 包含了一個鏈表結構的隊列 PendingPostQueue,事件也將插入到 PendingPostQueue 中。每個事件都會以一個空 Message 發送到主線程的消息隊列,消息處理完再取出下一個事件,一樣以 Message 的形式來執行,如此反覆執行。直到隊列爲空。 MAIN_ORDERED 通常來講也是經過 mainThreadPoster 來發送,異常狀況下才經過發送者線程來發送。從代碼側來看,它主要就是屬於 MAIN 策略的第二種狀況。 BACKGROUND 若是當前爲主線程,則將其投遞到 backgroundPoster,不然直接發送。這裏的 backgroundPoster 就是一個子線程。 ASYNC 異步模式下,最終就是經過 ExecutorService ,也即線程池來發送的。asyncPoster 即 AsyncPoster,其關於線程調度的關鍵代碼爲:
eventBus.getExecutorService().execute(this);
複製代碼
註銷就比較簡單了,以下代碼,主要就是從 typesBySubscriber 這個 map 中將訂閱者移除掉,而且解決訂閱者和事件的關聯。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
複製代碼
從整個分析過程來看,EventBus 是一個比較簡單的框架,其涉及到的核心知識是 ThreadLocal 以及方法的反射調用,而基於此爲咱們封裝了一套完整的事件 分發-傳遞-響應 機制,固然也是一個很是優秀的框架。總結一下,其主要的特色:
最後,感謝你能讀到並讀完此文章,若是分析的過程當中存在錯誤或者疑問都歡迎留言討論。若是個人分享可以幫助到你,還請記得幫忙點個贊吧,謝謝。