EventBus是經過使用發佈者/訂閱者模式來實現解耦的Android和Java開源庫。在Android開發中一般使用EventBus實現Activities, Fragments, Threads, Services等組件之間的通訊。但EventBus不能實現跨進程間的通訊。java
UML類圖 android
源碼分析 EventBus.java數組
package de.greenrobot.event;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
/**
*
Class based event bus, optimized for Android. By default, subscribers will handle events in methods named "onEvent".
*
* @author Markus Junginger, greenrobot
*/
public class EventBus {
/**
* Log tag, apps may override it.
*/
public static String TAG = "Event";
private static final EventBus defaultInstance = new EventBus();//默認實例
public enum ThreadMode {
/**
* Subscriber will be called in the same thread, which is posting the event.
*/
PostThread,//發佈事件所在線程訂閱
/**
* Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */ MainThread,//主線程訂閱 /* BackgroundThread */ } //表示某個類中同一個方法名的全部重載方法。(也就是一個類中全部的訂閱方法),緩存,目的:提升性能 private static final Map<String, List<Method>> methodCache = new HashMap<String, List<Method>>(); //保存事件類型的全部的事件(包括父類和接口),懶加載的,緩存,目的:了提升性能 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>(); //一個事件類型的全部訂閱者:發送事件時使用到(時間複雜度爲:O(1)) //用CopyOnWriteArrayList是爲了讀取是線程安全的。 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; //一個訂閱對象的全部訂閱事件: // 訂閱(register)時是加入 // 取消訂閱(unregister)時使用:獲取當前訂閱對象的全部訂閱事件,而後在訂閱事件中將這個訂閱者刪除。 private final Map<Object, List<Class<?>>> typesBySubscriber; //每一個線程的事件隊列 private final ThreadLocal<List<Object>> currentThreadEventQueue = new ThreadLocal<List<Object>>() { @Override protected List<Object> initialValue() { return new ArrayList<Object>(); } }; //每一個線程的隊列是否在運轉中 private final ThreadLocal<BooleanWrapper> currentThreadIsPosting = new ThreadLocal<BooleanWrapper>() { @Override protected BooleanWrapper initialValue() { return new BooleanWrapper(); } }; private String defaultMethodName = "onEvent"; private PostViaHandler mainThreadPoster; public static EventBus getDefault() { return defaultInstance; } public EventBus() { subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>(); typesBySubscriber = new HashMap<Object, List<Class<?>>>(); mainThreadPoster = new PostViaHandler(Looper.getMainLooper()); } public void register(Object subscriber) {//註冊 register(subscriber, defaultMethodName, ThreadMode.PostThread);//默認方法名,即onEvent, } public void registerForMainThread(Object subscriber) { register(subscriber, defaultMethodName, ThreadMode.MainThread); } //這裏應該加同步synchronized public void register(Object subscriber, String methodName, ThreadMode threadMode) {//傳入訂閱者,遍歷查找其訂閱的全部事件 List<Method> subscriberMethods = findSubscriberMethods(subscriber.getClass(), methodName); for (Method method : subscriberMethods) { Class<?> eventType = method.getParameterTypes()[0];//參數的類型 //查找的時候是根據參數的類型來肯定訂閱者的 subscribe(subscriber, method, eventType, threadMode); } } //若是當前類含有父類,是否查找父類中的全部含指定名稱的方法 private List<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {//查找某個類中全部指定名稱的方法 String key = subscriberClass.getName() + '.' + methodName; //類名+方法名爲Key //key相同的同時進入,會出現兩次建立 List<Method> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);//方法名稱同樣的存在多個重載方法 } if (subscriberMethods != null) { return subscriberMethods; } //同時進入 subscriberMethods = new ArrayList<Method>(); Class<?> clazz = subscriberClass; HashSet<Class<?>> eventTypesFound = new HashSet<Class<?>>();// 同一個類參數相同的方法只加入一次(就是重寫的方法) while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {//過濾掉系統類 // Skip system classes, this just degrades performance break; } Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodName)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) {//參數爲1個 if (eventTypesFound.add(parameterTypes[0])) { // Only add if not already found in a sub class subscriberMethods.add(method); } } } } clazz = clazz.getSuperclass();//查找父類,也就是父類方法也會被調用 } if (subscriberMethods.isEmpty()) {//沒有訂閱方法,拋出異常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } } public void register(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件類型 register(subscriber, defaultMethodName, ThreadMode.PostThread, eventType, moreEventTypes); } public void registerForMainThread(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件類型註冊 register(subscriber, defaultMethodName, ThreadMode.MainThread, eventType, moreEventTypes); } public synchronized void register(Object subscriber, String methodName, ThreadMode threadMode, Class<?> eventType, Class<?>... moreEventTypes) { Class<?> subscriberClass = subscriber.getClass(); Method method = findSubscriberMethod(subscriberClass, methodName, eventType); subscribe(subscriber, method, eventType, threadMode); for (Class<?> anothereventType : moreEventTypes) { method = findSubscriberMethod(subscriberClass, methodName, anothereventType); subscribe(subscriber, method, anothereventType, threadMode); } } //應該在對象lock中調用 private void subscribe(Object subscriber, Method subscriberMethod, Class<?> eventType, ThreadMode threadMode) { //調用方:register(Object subscriber, String methodName, ThreadMode threadMode)沒有加鎖 //併發的問題:多個線程同時進入,不一樣頁面都含有同一事件 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions);//事件類型爲key,訂閱者爲value,一對多 } else { for (Subscription subscription : subscriptions) {//同一事件的訂閱者,屢次設置 if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } subscriberMethod.setAccessible(true); Subscription subscription = new Subscription(subscriber, subscriberMethod, threadMode); subscriptions.add(subscription); List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);//訂閱者爲key,訂閱的事件爲value,一對多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } /** * Class.getMethod is slow on Android 2.3 (and probably other versions), so use getDeclaredMethod and go up in the * class hierarchy if neccessary. */ private Method findSubscriberMethod(Class<?> subscriberClass, String methodName, Class<?> eventType) { Class<?> clazz = subscriberClass; while (clazz != null) { try { return clazz.getDeclaredMethod(methodName, eventType); } catch (NoSuchMethodException ex) { clazz = clazz.getSuperclass(); } } throw new RuntimeException("Method " + methodName + " not found in " + subscriberClass + " (must have single parameter of event type " + eventType + ")"); } /** * Unregisters the given subscriber for the given event classes. */ public synchronized void unregister(Object subscriber, Class<?>... eventTypes) { if (eventTypes.length == 0) { throw new IllegalArgumentException("Provide at least one event class"); } List<Class<?>> subscribedClasses = typesBySubscriber.get(subscriber); if (subscribedClasses != null) { for (Class<?> eventType : eventTypes) { unubscribeByEventType(subscriber, eventType); subscribedClasses.remove(eventType); } if (subscribedClasses.isEmpty()) { typesBySubscriber.remove(subscriber); } } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } /** * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */ private void unubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } /** * Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//刪除全部的訂閱事件 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType);//刪除事件類型 } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } //post(Object event) //post(Object event) /** * Posts the given event to the event bus. */ public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每一個線程一個隊列,保證同一個線程發送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//當前線程隊列正在運轉 return; } else {//沒有運轉開始運轉 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //開始發送事件 } } finally { isPosting.value = false; } } } private void postSingleEvent(Object event) throws Error { List<Class<?>> eventTypes = findEventTypes(event.getClass()); //找到全部的事件類型(須要給父事件和接口事件也發送) boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {//只能保證Map的原子性 subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null) { for (Subscription subscription : subscriptions) {//由於是CopyOnWriteArrayList,因此寫入時讀取是線程安全的 if (subscription.threadMode == ThreadMode.PostThread) {//在發送線程中執行 postToSubscribtion(subscription, event); } else if (subscription.threadMode == ThreadMode.MainThread) {//在主線程中執行 mainThreadPoster.enqueue(event, subscription);//在主線程中執行 } else { throw new IllegalStateException("Unknown thread mode: " + subscription.threadMode); } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } //查找全部的事件類型:當前類型和全部父類型,全部接口,包括父類型 // /** * Finds all Class objects including super classes and interfaces. */ private List<Class<?>> findEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); //緩存中存在,直接返回 if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) {//事件類型,父事件 eventTypes.add(clazz); //自己是一個事件類型 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } /** * Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } static void postToSubscribtion(Subscription subscription, Object event) throws Error { try { subscription.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); if (cause instanceof Error) { throw (Error) cause; } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } final static class Subscription { final Object subscriber; final Method method; final ThreadMode threadMode; Subscription(Object subscriber, Method method, ThreadMode threadMode) { this.subscriber = subscriber; this.method = method; this.threadMode = threadMode; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; // Super slow (improve once used): http://code.google.com/p/android/issues/detail?id=7811 return subscriber == otherSubscription.subscriber && method.equals(otherSubscription.method); } else { return false; } } @Override public int hashCode() { // Check performance once used return subscriber.hashCode() + method.hashCode(); } } /** * For ThreadLocal, much faster to set than storing a new Boolean. */ final static class BooleanWrapper {//提升性能,使用Boolean,修改值時須要每次set boolean value; } final static class PostViaHandler extends Handler {//主線程發送隊列 PostViaHandler(Looper looper) { super(looper); } void enqueue(Object event, Subscription subscription) { PendingPost pendingPost = PendingPost.obtainPendingPost(event, subscription); //待發送對象 Message message = obtainMessage(); message.obj = pendingPost; if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); postToSubscribtion(subscription, event); } } } 複製代碼
PendingPost.java緩存
package de.greenrobot.event;
import java.util.ArrayList;
import java.util.List;
import de.greenrobot.event.EventBus.Subscription;
final class PendingPost {//使用了享元模式
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Object event, Subscription subscription) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
synchronized (pendingPostPool) {
pendingPostPool.add(pendingPost);
}
}
}
複製代碼
優勢安全
1.在做者的註釋中(Class based event bus[指Google guava庫中的 event bus], optimized for Android. By default, subscribers will handle events in methods named "onEvent".)能夠看到,爲了優化性能,棄用了註解,使用固定或指定訂閱方法的方式,性能優化
2.輕巧,不依賴Context。一些處理細節上也作了相應的優化,如:使用BooleanWrapper代替Boolean,這樣就不須要每次更改值都調用ThreadLocal的set方法;PendingPost的建立採用了享元模式提升了獲取其實例的性能,不須要頻繁建立和銷燬;利用CopyOnWriteArrayList在寫入安全的狀況下提升了讀取性能。bash
3.能指定在主線程中訂閱事件。由於一般狀況下,咱們會在非UI線程中發送事件來更新UI,而更新UI須要在主線程中執行。併發
不足app
1.爲了性能最求性能的極致,失去了訂閱方法的靈活性,只能指定onEvent方法才能監聽,或者註冊時調用指定方法名,沒有使用註解那麼靈活。ide
2.訂閱者接收事件的線程不能根據訂閱事件設置。由於常常咱們在一個類中會訂閱多個事件,而每一個事件基本上是沒有相關性的,在哪一個線程上訂閱是根據事件自己來決定的。
三、強制使用事件的傳遞性,如:咱們發送某個事件,那麼這個事件全部的父類及接口都會被強制觸發,最好有設置開關,讓用戶決定。
解讀
public void post(Object event) {//String Object
List<Object> eventQueue = currentThreadEventQueue.get(); //每一個線程一個隊列,保證同一個線程發送事件的有序性
eventQueue.add(event);
BooleanWrapper isPosting = currentThreadIsPosting.get();
if (isPosting.value) {//當前線程隊列正在運轉
//在同一個線程中何時會走到這裏呢?若是走不到這裏那麼這個隊列就沒有意義
return;
} else {//沒有運轉開始運轉
isPosting.value = true;
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0)); //開始發送事件
}
} finally {
isPosting.value = false;
}
}
}
若是咱們能找到隊列正在運轉的狀況下,那麼隊列就有意義,能保證事件處理的有序性。
舉例:
public class RefreshEvent {}
public class Activity1 extends AppCompatActivity{
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);
super.onDestroy();
}
public void onEvent(TestEvent event) {
EventBus.getDefault().post(new RefreshEvent());
}
}
public class ActivityB extends AppCompatActivity{
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);
super.onDestroy();
}
public void onEvent(TestEvent event) {
}
}
咱們在後臺業務方法中發送了刷新事件:
EventBus.getDefault().post(new RefreshEvent());
上面的例子中:一、一個事件被多個訂閱者訂閱;二、訂閱者收到事件後再次發送相同事件。
在這種狀況下,若是咱們不使用隊列,那麼在同一個線程中,先發送的事件訂閱者就可能會被後收到。
這裏使用ThreadLocal+隊列,就很巧妙地維護了在同一個線程中事件處理的有序性。
複製代碼
bug
public void register(Object subscriber, String methodName, ThreadMode threadMode) 方法存在線程安全的問題,除非調用者都加了鎖,但代碼中最經常使用的調用public void register(Object subscriber, String methodName, ThreadMode threadMode)卻沒有加鎖。
第一步,實現主邏輯:
EventBus.java 代碼
package org.hjb.eventbus;
import android.util.Log;
import androidx.annotation.NonNull;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventBus {
private static final String TAG = "EventBus";
private static volatile EventBus mDefaultEventBus;
private static final String DEFAULT_METHOD_NAME = "onEvent";
//key:訂閱事件
//value:訂閱者信息,訂閱者信息包括:訂閱對象,訂閱方法
private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
@Override
protected ArrayList<Object> initialValue() {
return new ArrayList<>();
}
};
private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
public EventBus() {
}
public static EventBus getDefault() {
if (mDefaultEventBus == null) {
synchronized (EventBus.class) {
if (mDefaultEventBus == null) {
mDefaultEventBus = new EventBus();
}
}
}
return mDefaultEventBus;
}
/**
* 訂閱
*
* @param subscriber 訂閱者
*/
public synchronized void register(@NonNull Object subscriber) {//訂閱
//保存當前訂閱者的全部訂閱事件
//由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
//查找當前訂閱對象的全部訂閱方法
ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
for (Method method : methods) {
//訂閱事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能訪問
subscriptions.add(new Subscription(subscriber, method));
}
}
/**
* 須要查找當前對象全部帶onEvent的重載方法,包括父類
* <p>
* 注意點:
* 一、覆蓋的方法不要重複添加
* 二、過濾掉系統類,java/javax/android開頭的
* 三、只查找單個參數的方法
* 四、私有方法也查找
*
* @param subscriber 訂閱者
* @return 全部訂閱對象
*/
private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
ArrayList<Method> subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriber.getClass();
while (clazz != null) {
//過濾系統類
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目標方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
return subscriberMethods;
}
/**
* 發送事件
* <p>
* 須要給全部訂閱者發送事件
* <p>
* 注意:咱們要發送的不只是當前對象的訂閱者,同時包括當前父類對象和接口對象的訂閱者。
* <p>
* 解決有有序性的問題:咱們須要保證同一個線程中
*
* @param event 事件
*/
public void post(Object event) {
List<Object> eventQueue = currentThreadEventQueue.get();
eventQueue.add(event);
Boolean isPosting = currentThreadIsPosting.get();
if (!isPosting.booleanValue()) {
currentThreadIsPosting.set(Boolean.TRUE);
while (!eventQueue.isEmpty()) {
postEvent(eventQueue.remove(0));
}
currentThreadIsPosting.set(Boolean.FALSE);
} else {
Log.i("hejunbin", "正在運轉");
}
}
/**
* 發送事件
*
* @param event 待發送事件
*/
private void postEvent(Object event) {
//一、找到當前對象的全部事件類型
ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
//二、找到此事件全部的訂閱者
boolean subscriptionFound = false; //是否找到了訂閱者
for (Class<?> eventType : eventTypes) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventType);
}
if (subscriptions != null && subscriptions.size() > 0) {
subscriptionFound = true;
for (Subscription subscription : subscriptions) {
try {
subscription.method.invoke(subscription.subscriber, event);
} catch (Exception e) {
Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
}
}
}
}
if (!subscriptionFound) {
Log.d(TAG, "No subscripers registered for event " + event.getClass());
}
}
/**
* 找到當前對象的全部事件類型(包括父類和接口)
*
* @param eventClass 事件類型
* @return 全部事件類型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在層級
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
return eventTypes;
}
private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
/**
* 取消訂閱
*
* @param subscriber 訂閱者
*/
public synchronized void unregister(Object subscriber) {//取消訂閱
for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
int size = entry.getValue().size();
for (int i = 0; i < size; i++) {
if (entry.getValue().get(i).subscriber == subscriber) {
entry.getValue().remove(i);
i--;
size--;
}
}
}
}
/**
* 訂閱對象
*/
static class Subscription {
Object subscriber;
Method method;
public Subscription(Object subscriber, Method method) {
this.subscriber = subscriber;
this.method = method;
}
}
}
複製代碼
能夠在主線程中訂閱事件
在Android利用Handler的機制,將事件發送到主隊列中執行。
由於須要將訂閱信息(Subscription)和事件(event)都須要傳遞,而Handler中的Message只能傳遞單個Object,因此本身建立了PendingPost包裝類。
EventBus.java代碼
package org.hjb.eventbus;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
import androidx.annotation.MainThread;
import androidx.annotation.NonNull;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventBus {
private static final String TAG = "EventBus";
private static volatile EventBus mDefaultEventBus;
private static final String DEFAULT_METHOD_NAME = "onEvent";
public enum ThreadMode {
PostThread, //post所在線程
MainThread,//主線程
}
//key:訂閱事件
//value:訂閱者信息,訂閱者信息包括:訂閱對象,訂閱方法
private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
@Override
protected ArrayList<Object> initialValue() {
return new ArrayList<>();
}
};
private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
private MainThreadHandler mainThreadHandler;
public EventBus() {
mainThreadHandler = new MainThreadHandler();
}
public static EventBus getDefault() {
if (mDefaultEventBus == null) {
synchronized (EventBus.class) {
if (mDefaultEventBus == null) {
mDefaultEventBus = new EventBus();
}
}
}
return mDefaultEventBus;
}
/**
* 訂閱
*
* @param subscriber 訂閱者
*/
public synchronized void register(@NonNull Object subscriber) {//訂閱
subscribe(subscriber, ThreadMode.PostThread);
}
public synchronized void registerForMainThread(Object subscriber) {
subscribe(subscriber, ThreadMode.MainThread);
}
public synchronized void register(Object subscriber, ThreadMode mode) {
subscribe(subscriber, mode);
}
private void subscribe(Object subscriber, ThreadMode mode) {//須要在線程安全的方法中調用
//保存當前訂閱者的全部訂閱事件
//由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
//查找當前訂閱對象的全部訂閱方法
ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
for (Method method : methods) {
//訂閱事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能訪問
subscriptions.add(new Subscription(subscriber, method, mode));
}
}
/**
* 須要查找當前對象全部帶onEvent的重載方法,包括父類
* <p>
* 注意點:
* 一、覆蓋的方法不要重複添加
* 二、過濾掉系統類,java/javax/android開頭的
* 三、只查找單個參數的方法
* 四、私有方法也查找
*
* @param subscriber 訂閱者
* @return 全部訂閱對象
*/
private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
ArrayList<Method> subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriber.getClass();
while (clazz != null) {
//過濾系統類
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目標方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
return subscriberMethods;
}
/**
* 發送事件
* <p>
* 須要給全部訂閱者發送事件
* <p>
* 注意:咱們要發送的不只是當前對象的訂閱者,同時包括當前父類對象和接口對象的訂閱者。
* <p>
* 解決有有序性的問題:咱們須要保證同一個線程中
*
* @param event 事件
*/
public void post(Object event) {
List<Object> eventQueue = currentThreadEventQueue.get();
eventQueue.add(event);
Boolean isPosting = currentThreadIsPosting.get();
if (!isPosting.booleanValue()) {
currentThreadIsPosting.set(Boolean.TRUE);
while (!eventQueue.isEmpty()) {
postEvent(eventQueue.remove(0));
}
currentThreadIsPosting.set(Boolean.FALSE);
} else {
Log.i("hejunbin", "正在運轉");
}
}
/**
* 發送事件
*
* @param event 待發送事件
*/
private void postEvent(Object event) {
//一、找到當前對象的全部事件類型
ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
//二、找到此事件全部的訂閱者
boolean subscriptionFound = false; //是否找到了訂閱者
for (Class<?> eventType : eventTypes) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventType);
}
if (subscriptions != null && subscriptions.size() > 0) {
subscriptionFound = true;
for (Subscription subscription : subscriptions) {
if (subscription.mode == ThreadMode.PostThread) {
postEvent(subscription, event);
} else if (subscription.mode == ThreadMode.MainThread) {//在主線程中發送
mainThreadHandler.enqueue(subscription, event);
} else {//非法
throw new IllegalStateException("Unknown thread mode: " + subscription.mode);
}
}
}
}
if (!subscriptionFound) {
Log.d(TAG, "No subscripers registered for event " + event.getClass());
}
}
private static void postEvent(Subscription subscription, Object event) {
try {
subscription.method.invoke(subscription.subscriber, event);
} catch (Exception e) {
Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
}
}
/**
* 找到當前對象的全部事件類型(包括父類和接口)
*
* @param eventClass 事件類型
* @return 全部事件類型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在層級
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
return eventTypes;
}
private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
/**
* 取消訂閱
*
* @param subscriber 訂閱者
*/
public synchronized void unregister(Object subscriber) {//取消訂閱
for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
int size = entry.getValue().size();
for (int i = 0; i < size; i++) {
if (entry.getValue().get(i).subscriber == subscriber) {
entry.getValue().remove(i);
i--;
size--;
}
}
}
}
/**
* 訂閱對象
*/
static class Subscription {
Object subscriber;
Method method;
ThreadMode mode;
public Subscription(Object subscriber, Method method, ThreadMode mode) {
this.subscriber = subscriber;
this.method = method;
this.mode = mode;
}
}
static class MainThreadHandler extends Handler {
MainThreadHandler() {
super(Looper.getMainLooper());
}
void enqueue(Subscription subscription, Object event) {
Message message = Message.obtain();
message.obj = new PendingPost(subscription, event);
//如今有兩個對象須要傳遞,但Message只能傳遞單個對象,而Java沒有元祖類型,因此須要本身建立對象或者放入數組中
if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting.
throw new RuntimeException("Could not send handler message");
}
}
@Override
public void handleMessage(Message msg) {
PendingPost pendingPost = (PendingPost) msg.obj;
postEvent(pendingPost.subscription, pendingPost.event);
}
}
}
複製代碼
PendingPost.java代碼
package de.greenrobot.event;
import java.util.ArrayList;
import java.util.List;
import de.greenrobot.event.EventBus.Subscription;
final class PendingPost {
Object event;
Subscription subscription;
PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
複製代碼
性能優化
findSubscriberMethods 每次都要循環遍歷查找,能夠第一次查找後緩存起來,下次直接從緩存中獲取。(咱們常常在進入頁面時register,在返回頁面時unregister,當第二次進入時就可使用緩存中獲取了)
實現代碼爲:
private static final Map<String, ArrayList<Method>> methodCache = new HashMap<>();
/**
* 須要查找當前對象全部帶onEvent的重載方法,包括父類
* <p>
* 注意點:
* 一、覆蓋的方法不要重複添加
* 二、過濾掉系統類,java/javax/android開頭的
* 三、只查找單個參數的方法
* 四、私有方法也查找
*
* @param subscriberClass 訂閱者類
* @return 全部訂閱對象
*/
private ArrayList<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {
String key = subscriberClass.getName() + '.' + methodName; //類名+方法名爲Key
ArrayList<Method> subscriberMethods = methodCache.get(key);
if (subscriberMethods != null) {
return subscriberMethods;
}
synchronized (methodCache) {
subscriberMethods = methodCache.get(key);
if (subscriberMethods != null) {
return subscriberMethods;
}
subscriberMethods = new ArrayList<>();
Class<?> clazz = subscriberClass;
while (clazz != null) {
//過濾系統類
String name = clazz.getName();
if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
break;
}
//查找目標方法
for (Method method : clazz.getDeclaredMethods()) {
if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
int i = 0, size = subscriberMethods.size();
for (; i < size; i++) {
if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
break;
}
}
if (i == size) {//表示不存在,加入
subscriberMethods.add(method);
}
}
}
clazz = clazz.getSuperclass();
}
if (subscriberMethods.isEmpty()) {//沒有訂閱方法,拋出異常
throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName);
} else {
methodCache.put(key, subscriberMethods);
return subscriberMethods;
}
}
}
複製代碼
findEventTypes 每次發送事件都會調用,而內部實現也須要循環遍歷,也能夠經過緩存來提升性能。
修改代碼爲:
private static final Map<Class<?>, ArrayList<Class<?>>> eventTypesCache = new HashMap<>();
/**
* 找到當前對象的全部事件類型(包括父類和接口)
*
* @param eventClass 事件類型
* @return 全部事件類型
*/
private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
ArrayList<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes != null) {
return eventTypes;
}
synchronized (eventTypesCache) {
eventTypes = eventTypesCache.get(eventClass);
if (eventTypes != null) {
return eventTypes;
}
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
//接口自己自己存在層級
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
return eventTypes;
}
}
複製代碼
unregister方法,無論有沒有訂閱過,都須要雙層遍歷,時間複雜度爲O(n2),能夠創建以subscriber爲key的哈希表,將時間複雜度降爲O(n)。
修改代碼爲:
//存儲一個訂閱對象的全部訂閱事件(key:訂閱對象,value:全部訂閱事件)
private final HashMap<Object, List<Class<?>>> eventTypesBySubscriber = new HashMap<>();
private void subscribe(Object subscriber, ThreadMode mode) {//須要在線程安全的方法中調用
//保存當前訂閱者的全部訂閱事件
//由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
//查找當前訂閱對象的全部訂閱方法
ArrayList<Method> methods = findSubscriberMethods(subscriber.getClass(), DEFAULT_METHOD_NAME);
for (Method method : methods) {
//訂閱事件
Class<?> eventType = method.getParameterTypes()[0];
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
for (Subscription subscription : subscriptions) {
if (subscription.subscriber == subscriber) {
throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
}
method.setAccessible(true);//防止私有方法不能訪問
subscriptions.add(new Subscription(subscriber, method, mode));
//訂閱對象的所訂閱的數據類型
List<Class<?>> subscribedEvents = eventTypesBySubscriber.get(subscriber);//訂閱者爲key,訂閱的事件爲value,一對多
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
eventTypesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
}
}
/**
* 取消訂閱
*
* @param subscriber 訂閱者
*/
public synchronized void unregister(Object subscriber) {//取消訂閱
List<Class<?>> eventTypes = eventTypesBySubscriber.get(subscriber);
if (eventTypes == null) {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
return;
}
for (Class<?> eventType : eventTypes) { //雖然這裏也是循環,可是這裏的只是單個訂閱者中的事件,而不是全局的事件的遍歷
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size =subscriptions.size();
for (int i = 0; i < size; i++) {
if (subscriptions.get(i).subscriber == subscriber) {
subscriptions.remove(i);
i--;
size--;
}
}
}
}
eventTypesBySubscriber.remove(subscriber);
}
複製代碼
PendingPost使用享元模式(重複利用對象)
修改代碼爲:
package org.hjb.eventbus;
import java.util.ArrayList;
import java.util.List;
import org.hjb.eventbus.EventBus.Subscription;
public class PendingPost { //使用了享元模式
private static final List<PendingPost> pendingPostPool = new ArrayList<>(0);
Subscription subscription;
Object event;
private PendingPost(EventBus.Subscription subscription, Object event) {
this.subscription = subscription;
this.event = event;
}
static PendingPost obtain(Subscription subscription, Object event) {
synchronized (pendingPostPool) {//看池子裏是否有對象,若是有,使用池子裏但對象
// if (!pendingPostPool.isEmpty()) {
// PendingPost pendingPost = pendingPostPool.remove(0);
// pendingPost.subscription = subscription;
// pendingPost.event = event;
// return pendingPost;
// }
//上面註釋的代碼使用第一個元素,應該使用最後一個元素,
//由於使用的是數組存儲,若是remove第一個的話,那麼每次取出的操做都涉及到數組的搬移操做時間複雜度爲O(n)
//若是刪除最後一個元素,不涉及搬移操做時間複雜度爲O(1)
//因此應該寫成
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.subscription = subscription;
pendingPost.event = event;
return pendingPost;
}
}
return new PendingPost(subscription, event);//沒有,建立
}
void recycle() {
synchronized (pendingPostPool) {
pendingPostPool.add(this);
}
}
}
複製代碼
使用時也須要修改,代碼爲:
static class MainThreadHandler extends Handler {
MainThreadHandler() {
super(Looper.getMainLooper());
}
void enqueue(Subscription subscription, Object event) {
Message message = Message.obtain();
//如今有兩個對象須要傳遞,但Message只能傳遞單個對象,而Java沒有元祖類型,因此須要本身建立對象或者放入數組中
message.obj = PendingPost.obtain(subscription, event);
if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting.
throw new RuntimeException("Could not send handler message");
}
}
@Override
public void handleMessage(Message msg) {
PendingPost pendingPost = (PendingPost) msg.obj;
postEvent(pendingPost.subscription, pendingPost.event);
pendingPost.recycle();//回收
}
}
複製代碼
咱們能夠看出,1.0.1版本仍是有些bug和不足的,後面我將分析2.x和3.x版本。