註解,能夠標註哪一個方法能夠被註冊和通知。它要求被註解的方法有且只有一個參數,而且該參數就是要註冊監聽的事件,例如:java
class EventBusChangeRecorder {
@Subscribe
public void recordCustomerChange(ChangeEvent e) {
recordChange(e.getChange());
}
}
複製代碼
註冊時,無須明確指定事件git
eventBus.register(new EventBusChangeRecorder());
複製代碼
被@Subscribe註解的方法,能夠被調用執行,至關於事件處理器github
private EventBus bus;
final Object target;
private final Method method;
private final Executor executor;
final void dispatchEvent(final Object event) {
this.executor.execute(new Runnable() {
public void run() {
try {
Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
}
});
}
複製代碼
target,事件註冊的實例編程
method,Java反射中的方法實例安全
dispatchEvent,對外暴露的調用(事件分發)方法併發
事件的註冊表類,主要提供了註冊方法,取消註冊方法框架
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//引入了CopyOnWriteArraySet解決集合的線程安全問題
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
複製代碼
void unregister(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
throw new IllegalArgumentException();
}
}
}
複製代碼
從代碼中咱們能夠看到,這兩個方法都實現了線程安全,經過使用CopyOnWriteArraySet巧妙解決了註冊/取消註冊時的線程安全問題。ide
CopyOnWriteArraySet,在寫入數據的時候,會建立一個新的 set,而且將原始數據 clone 到新的 set 中,在新的 set 中寫入數據完成以後,再用新的 set 替換老的 set。這樣就能保證在寫入數據的時候,不影響數據的讀取操做,以此來解決讀寫併發問題。post
後續有時間解析一下CopyOnWriteArraySet的源碼this
事件總線的組合類,組合了事件分發器(後面要提的Dispatcher)、事件註冊表等,統一對外提供註冊、分發等功能
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
public void register(Object object) {
subscribers.register(object);
}
public void unregister(Object object) {
subscribers.unregister(object);
}
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
複製代碼
總結一下,以上幾個模塊構成了EventBus的主體框架,基於這個框架解決了咱們上一章節提到的
線程安全問題:經過引入線程安全集合CopyOnWriteArraySet
顯式註冊問題:經過定義註解類Subscribe,標記能夠被註冊的方法,而且將要監聽的Event做爲方法的惟一參數,再利用Java反射的特性,實現了隱式註冊
EventBus提供專門的事件分發器,而且爲事件的分發提供了兩種策略,一種是廣度優先,一種是深度優先。
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue;
private final ThreadLocal<Boolean> dispatching;
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
}
複製代碼
上面的分發器,在被同一個線程分發(同一個線程調用post)時,可以保證事件分發的有序性。同時也引入了Queue實現了廣度優先,下面咱們看一下另一個深度優先的實現
private static final class ImmediateDispatcher extends Dispatcher {
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}
複製代碼
上章節中咱們一樣提到了調用時異常處理的問題,guava一樣給處理解決辦法。guava定義了一個接口
public interface SubscriberExceptionHandler {
/** Handles exceptions thrown by subscribers. */
void handleException(Throwable exception, SubscriberExceptionContext context);
}
複製代碼
在調用出現異常時,回調這個接口(完整代碼請參考Subscriber的dispatchEvent方法)
try {
Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
//出現異常,調用自定義異常處理
Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
複製代碼
同時在實例化EventBus時,可傳入自定義異常處理
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
複製代碼
以上,結合上章節提出的問題,對guava的EventBus作了分析,咱們能看到Google實現的Eventbus代碼很優雅,程序也很健壯,他們在設計的時候會考慮到不少方面,這對咱們本身編程以及代碼框架會有很多啓發。
有關EventBus的整個系列都寫完了,在寫做的過程當中,我不斷回看guava的源碼,收穫甚多。建議你們也去讀一讀guava源碼,瞭解一下世界上頂級的Java開發者是如何寫代碼的。