傳統上,Java的進程內事件分發都是經過發佈者和訂閱者之間的顯式註冊實現的。設計EventBus就是爲了取代這種顯示註冊方式,使組件間有了更好的解耦。EventBus不是通用型的發佈-訂閱實現,不適用於進程間通訊。html
架構圖以下:git
。github
步驟以下:json
1.構造一個事件總線
2.構造一個事件監聽器
3.把事件監聽器註冊到事件總線上
4.事件總線發佈事件,觸發監聽器方法
主測試類以下:數組
1 package guava.eventbus; 2 3 import com.google.common.eventbus.EventBus; 4 5 /** 6 * @Description 主測試類 7 * @author denny 8 * @date 2018/7/18 上午9:54 9 */ 10 public class MainTest { 11 12 13 public static void main(String[] args) { 14 // 1.構造一個事件總線 15 EventBus eventBus = new EventBus("test"); 16 17 // 2.構造一個事件監聽器 18 EventListener listener = new EventListener(); 19 20 // 3.把事件監聽器註冊到事件總線上 21 eventBus.register(listener); 22 23 // 4.事件總線發佈事件,觸發監聽器方法 24 eventBus.post(new TestEvent1(1)); 25 eventBus.post(new TestEvent2(2)); 26 // 事件3是事件2子類,雖然監聽器只訂閱了父類事件2,同樣能夠監聽到子類 27 eventBus.post(new TestEvent3(3)); 28 // 未被訂閱的事件,用DeadEvent可訂閱 29 eventBus.post(new TestEvent4(4)); 30 } 31 }
事件監聽器以下:緩存
1 package guava.eventbus; 2 3 import com.alibaba.fastjson.JSON; 4 import com.google.common.eventbus.DeadEvent; 5 import com.google.common.eventbus.Subscribe; 6 7 /** 8 * @Description 事件監聽器 9 * @author denny 10 * @date 2018/7/18 上午9:53 11 */ 12 public class EventListener { 13 14 private int message = 0; 15 16 /** 17 * @Description 訂閱事件1 18 * @param event 事件1 19 * @return void 20 * @author denny 21 * @date 2018/7/18 上午9:46 22 */ 23 @Subscribe 24 public void onEvent1(TestEvent1 event) { 25 message = event.getMessage(); 26 System.out.println("EventListener onEvent1 監聽器接收到消息:"+message); 27 } 28 29 /** 30 * @Description 訂閱事件2 31 * @param event 事件2 32 * @return void 33 * @author denny 34 * @date 2018/7/18 上午9:59 35 */ 36 @Subscribe 37 public void onEvent2(TestEvent2 event) { 38 message = event.getMessage(); 39 System.out.println("EventListener onEvent2 監聽器接收到消息:"+message); 40 } 41 42 /** 43 * @Description 死亡事件(該事件沒有被訂閱會觸發) 44 * @param event 未訂閱事件 45 * @return void 46 * @author denny 47 * @date 2018/7/18 上午9:59 48 */ 49 @Subscribe 50 public void onDeadEvent(DeadEvent event) { 51 System.out.println("EventListener DeadEvent 有消費沒有被訂閱!!!!event="+ event.toString()); 52 } 53 }
事件類:安全
1 package guava.eventbus; 2 3 /** 4 * @Description 事件1 5 * @author denny 6 * @date 2018/7/18 上午9:54 7 */ 8 public class TestEvent1 { 9 10 private final int message; 11 12 /** 13 * 構造方法 14 * @param message 15 */ 16 public TestEvent1(int message) { 17 this.message = message; 18 System.out.println("TestEvent1 事件message:"+message); 19 } 20 21 public int getMessage() { 22 return message; 23 } 24 } 25 26 27 /** 28 * @Description 事件2 29 * @author denny 30 * @date 2018/7/18 上午9:54 31 */ 32 public class TestEvent2 { 33 34 private final int message; 35 36 /** 37 * 構造方法 38 * @param message 39 */ 40 public TestEvent2(int message) { 41 this.message = message; 42 System.out.println("TestEvent2 事件message:"+message); 43 } 44 45 public int getMessage() { 46 return message; 47 } 48 } 49 50 /** 51 * @Description 事件3 52 * @author denny 53 * @date 2018/7/18 上午9:54 54 */ 55 public class TestEvent3 extends TestEvent2{ 56 57 private final int message; 58 59 /** 60 * 構造方法 61 * @param message 62 */ 63 public TestEvent3(int message) { 64 super(message); 65 this.message = message; 66 System.out.println("TestEvent2 事件message:"+message); 67 } 68 69 @Override 70 public int getMessage() { 71 return message; 72 } 73 } 74 75 /** 76 * @Description 事件4 77 * @author denny 78 * @date 2018/7/18 上午9:54 79 */ 80 public class TestEvent4 { 81 82 private final int message; 83 84 /** 85 * 構造方法 86 * @param message 87 */ 88 public TestEvent4(int message) { 89 this.message = message; 90 System.out.println("TestEvent4 事件message:"+message); 91 } 92 93 public int getMessage() { 94 return message; 95 } 96 }
運行結果以下:架構
TestEvent1 事件message:1 EventListener onEvent1 監聽器接收到消息:1 ---》觸發訂閱的事件1 TestEvent2 事件message:2 EventListener onEvent2 監聽器接收到消息:2---》觸發訂閱的事件2(一個監聽器能夠訂閱多個事件) TestEvent2 事件message:3 TestEvent2 事件message:3 EventListener onEvent2 監聽器接收到消息:3---》訂閱事件2,可觸發訂閱子類事件3 TestEvent4 事件message:4 EventListener DeadEvent 有消費沒有被訂閱!!!!event="DeadEvent{source=EventBus{test}, event=guava.eventbus.TestEvent4@19e1023e}"---》事件4沒有被訂閱,觸發DeadEvent死亡事件。
注意:併發
1.事件總線EventBus:異步
不提供單列,用戶本身看着用~
2.監聽器Listener:
1)監聽器使用@Subscribe標記的方法(參數爲自定義事件),便可實現事件的監聽。要監聽多個事件,就寫多個方法(每一個方法都用@Subscribe標記)便可。
2)注意必定要把Listener註冊到eventbus上。
源碼版本爲:guava-22.0.jar,先來回顧下第一節的樣例代碼:
1 public static void main(String[] args) { 2 // 1.構造一個事件總線 3 EventBus eventBus = new EventBus("test"); 4 5 // 2.構造一個事件監聽器 6 EventListener listener = new EventListener(); 7 8 // 3.把事件監聽器註冊到事件總線上 9 eventBus.register(listener); 10 11 // 4.事件總線發佈事件,觸發監聽器方法 12 eventBus.post(new TestEvent1(1)); 13 eventBus.post(new TestEvent2(2)); 14 // 事件3是事件2子類,雖然監聽器只訂閱了父類事件2,同樣能夠監聽到子類 15 eventBus.post(new TestEvent3(3)); 16 // deadEvent未被訂閱的事件,供用戶自行處理 17 eventBus.post(new TestEvent4(4)); 18 }
如上圖,雖然是google封裝的事件總線,可是依然是觀察者模式,那麼核心就是發佈、訂閱。下面就從這兩個方面來看一下源碼,看看有沒有值得借鑑的地方。
核心方法
register:把監聽器中申明的全部訂閱事件方法註冊到SubscriberRegistry(訂閱者註冊器)中。
post:發佈事件給全部已註冊過的訂閱者,最終開啓線程完成訂閱方法。
具體以下圖:
1 @Beta 2 public class EventBus { 6 private final String identifier;//事件總線標識:用於自定義標識這個事件總線 7 private final Executor executor;//默認的線程執行器,用於把事件轉發給訂閱者 10 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱註冊器 11 private final Dispatcher dispatcher;//事件轉發器 12 15 //構造器:使用默認字符串 16 public EventBus() { 17 this("default"); 18 } 19 //構造器:使用自定義字符串 26 public EventBus(String identifier) { 27 this( 28 identifier, 29 MoreExecutors.directExecutor(), 30 Dispatcher.perThreadDispatchQueue(), 31 LoggingHandler.INSTANCE); 32 } 58 93 //註冊監聽者中申明的全部訂閱方法(@Subscribe標記的),用以接收事件 97 public void register(Object object) { 98 subscribers.register(object); 99 } 100 // 解除訂閱 107 public void unregister(Object object) { 108 subscribers.unregister(object); 109 } 110 111 //發佈事件給全部已註冊過的訂閱者 121 public void post(Object event) {
// 找到事件的全部訂閱者 122 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 123 if (eventSubscribers.hasNext()) {
// 事件轉發器,把事件轉發給訂閱者 124 dispatcher.dispatch(event, eventSubscribers); 125 } else if (!(event instanceof DeadEvent)) { 126 // 若是該事件即沒有訂閱者,也沒事DeadEvent,那麼封裝成DeadEvent並從新發布 127 post(new DeadEvent(this, event)); 128 }
...省略非重要方法167 }
核心方法:dispatchEvent使用executor線程執行器,單獨開啓線程執行訂閱方法。
1 class Subscriber { 2 3 /** 4 * 構造 5 */ 6 static Subscriber create(EventBus bus, Object listener, Method method) { 7 return isDeclaredThreadSafe(method) 8 ? new Subscriber(bus, listener, method) 9 : new SynchronizedSubscriber(bus, listener, method); 10 } 11 12 /** 訂閱者所屬的事件總線*/ 13 @Weak private EventBus bus; 14 15 /** 監聽器 listener*/ 16 @VisibleForTesting final Object target; 17 18 /** 訂閱者方法 */ 19 private final Method method; 20 21 /** 線程執行器,用來分發事件給訂閱者 */ 22 private final Executor executor;
23 /** 構造器:使用事件總線、監聽器、訂閱方法 */ 24 private Subscriber(EventBus bus, Object target, Method method) { 25 this.bus = bus; 26 this.target = checkNotNull(target); 27 this.method = method; 28 method.setAccessible(true); 29 30 this.executor = bus.executor(); 31 } 32 33 /** 34 * 使用executor線程執行器,執行訂閱方法*/ 36 final void dispatchEvent(final Object event) { 37 executor.execute( 38 new Runnable() { 39 @Override 40 public void run() { 41 try { 42 invokeSubscriberMethod(event); 43 } catch (InvocationTargetException e) { 44 bus.handleSubscriberException(e.getCause(), context(event)); 45 } 46 } 47 }); 48 } 49 50 /** 51 * 調用訂閱者方法*/ 54 @VisibleForTesting 55 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 56 try { 57 method.invoke(target, checkNotNull(event)); 58 } catch (IllegalArgumentException e) { 59 throw new Error("Method rejected target/argument: " + event, e); 60 } catch (IllegalAccessException e) { 61 throw new Error("Method became inaccessible: " + event, e); 62 } catch (InvocationTargetException e) { 63 if (e.getCause() instanceof Error) { 64 throw (Error) e.getCause(); 65 } 66 throw e; 67 } 68 }
...
120 }
本節咱們瞭解了2個核心類EventBus(註冊監聽器、發佈事件)、Subscriber訂閱者(執行訂閱方法),下面咱們從源碼流程上來串連一遍。
咱們從註冊監聽器開始看,eventBus.register(listener); 以下圖所示:
1 public void register(Object object) { 2 subscribers.register(object); 3 } 4 5 /** 6 * 把listener中申明的全部訂閱方法都註冊 7 */ 8 void register(Object listener) {
// 獲取該監聽器類型對應的全部訂閱方法,key是事件類型,value是訂閱者集合 9 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); 10 // 遍歷map Map<K, Collection<V>> 11 for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { 12 //key:事件類型
Class<?> eventType = entry.getKey();
//value:訂閱者集合 13 Collection<Subscriber> eventMethodsInListener = entry.getValue(); 14 //從subscribers併發map 中獲取事件對應的事件訂閱者set,subscribers:private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); 15 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); 16 //不存在,就構造 17 if (eventSubscribers == null) { 18 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); 19 eventSubscribers = 20 MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); 21 } 22 //存在,把訂閱者集合添加進subscribers 23 eventSubscribers.addAll(eventMethodsInListener); 24 } 25 }
1.調用SubscriberRegistry的register(listener)來執行註冊監聽器。
2.register步驟以下:
EventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以維護事件和訂閱者的映射
1). findAllSubscribers從緩存中獲取該監聽器類型對應的全部訂閱方法,key是event class,value是Subscriber集合
2).遍歷map,把訂閱者集合添加進SubscriberRegistry-》subscribers。
其中findAllSubscribers詳細代碼以下:
12 /** 13 * 返回全部該監聽器訂閱者,以事件分組 14 */ 15 private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { 16 Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); 17 Class<?> clazz = listener.getClass();
//從緩存中獲取該監聽器類型對應的全部訂閱方法,遍歷塞進Multimap 18 for (Method method : getAnnotatedMethods(clazz)) { 19 Class<?>[] parameterTypes = method.getParameterTypes(); 20 Class<?> eventType = parameterTypes[0]; 21 methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); 22 } 23 return methodsInListener; 24 }
如上圖,
1.方法getAnnotatedMethods:從緩存中取 listener中訂閱方法的不可變列表,
2.遍歷塞進Multimap:一個key,多個value,每次put進去,往Collection<V>中add(value)。
getAnnotatedMethods源碼以下:
1 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) { 2 return subscriberMethodsCache.getUnchecked(clazz); 3 } 4 5 private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = 6 CacheBuilder.newBuilder() 7 .weakKeys() 8 .build( 9 new CacheLoader<Class<?>, ImmutableList<Method>>() { 10 @Override 11 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { 12 return getAnnotatedMethodsNotCached(concreteClass); 13 } 14 });
如上圖,咱們發現這裏用到了google cache來作緩存,關於google cache飛機票。
這個cache的源碼註釋翻譯以下:一個線程安全的緩存,包含從每一個類到該類中的全部方法和全部超類的映射,這些超類都用{@code @Subscribe}註釋。緩存是跨該類的全部實例共享的;若是建立了多個EventBus實例,而且在全部這些實例上註冊了同一個類的對象,這將大大提升性能。
下面來看看具體怎麼獲取的訂閱事件方法(監聽器@Subscribe註解的訂閱事件方法),核心方法getAnnotatedMethodsNotCached以下:
1 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
//獲取超類class集合 2 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); 3 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
//遍歷超類 4 for (Class<?> supertype : supertypes) {
5 //遍歷超類中的全部定義的方法
for (Method method : supertype.getDeclaredMethods()) {
//若是方法上有@Subscribe註解 6 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { 7 // 方法的參數類型數組 8 Class<?>[] parameterTypes = method.getParameterTypes(); 9 // 校驗:事件訂閱方法必須只能有一個參數,即事件類
checkArgument( 10 parameterTypes.length == 1, 11 "Method %s has @Subscribe annotation but has %s parameters." 12 + "Subscriber methods must have exactly 1 parameter.", 13 method, 14 parameterTypes.length); 15 // 封裝方法定義對象 16 MethodIdentifier ident = new MethodIdentifier(method);
// 去重並添加進map 17 if (!identifiers.containsKey(ident)) { 18 identifiers.put(ident, method); 19 } 20 } 21 } 22 }
// map轉ImmutableList 23 return ImmutableList.copyOf(identifiers.values()); 24 }
eventBus.post(new TestEvent1(1));
調用事件轉發器Dispatcher,分發事件給訂閱者,
1 public void post(Object event) {
//從註冊器中獲取當前事件對應的訂閱者集合:eventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 dispatcher.dispatch(event, eventSubscribers); 5 } else if (!(event instanceof DeadEvent)) { 6 //若是該事件即沒有訂閱者,也沒事DeadEvent,那麼封裝成DeadEvent並從新發布
7 post(new DeadEvent(this, event));
8 }
9 }
Dispatcher是個抽象類,有多個內部類複寫不一樣dispatch方法。EventBus默認構造時使用PerThreadQueuedDispatcher,即每一個線程一個待轉發事件隊列。以下圖所示:
1 private static final class PerThreadQueuedDispatcher extends Dispatcher { 2 3 // This dispatcher matches the original dispatch behavior of EventBus. 4 5 /** 6 * 每一個線程待轉發事件隊列 7 */ 8 private final ThreadLocal<Queue<Event>> queue = 9 new ThreadLocal<Queue<Event>>() { 10 @Override 11 protected Queue<Event> initialValue() { 12 return Queues.newArrayDeque(); 13 } 14 }; 15 16 /** 17 * 每一個線程的轉發狀態,用於避免重入事件轉發,初始化狀態爲fasle,即不在轉發。 18 */ 19 private final ThreadLocal<Boolean> dispatching = 20 new ThreadLocal<Boolean>() { 21 @Override 22 protected Boolean initialValue() { 23 return false; 24 } 25 }; 26 27 @Override 28 void dispatch(Object event, Iterator<Subscriber> subscribers) { 29 checkNotNull(event); 30 checkNotNull(subscribers); 31 Queue<Event> queueForThread = queue.get();
// 事件添加進隊列 32 queueForThread.offer(new Event(event, subscribers)); 33 // 當前不在轉發中,開始轉發 34 if (!dispatching.get()) { 35 dispatching.set(true); 36 try { 37 Event nextEvent;
// 迭代從線程隊列中取事件 38 while ((nextEvent = queueForThread.poll()) != null) {
// 迭代事件的Iterator<Subscriber> subscribers,調用訂閱者轉發事件 39 while (nextEvent.subscribers.hasNext()) { 40 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 41 } 42 } 43 } finally { 44 dispatching.remove(); 45 queue.remove(); 46 } 47 } 48 } 49 50 private static final class Event { 51 private final Object event; 52 private final Iterator<Subscriber> subscribers; 53 54 private Event(Object event, Iterator<Subscriber> subscribers) { 55 this.event = event; 56 this.subscribers = subscribers; 57 } 58 } 59 }
可見核心方法在dispatchEvent,調用訂閱者轉發事件
1 final void dispatchEvent(final Object event) { 2 executor.execute( 3 new Runnable() { 4 @Override 5 public void run() { 6 try { 7 invokeSubscriberMethod(event); 8 } catch (InvocationTargetException e) { 9 bus.handleSubscriberException(e.getCause(), context(event)); 10 } 11 } 12 }); 13 } 14 15 /** 16 * 執行訂閱者方法18 */ 19 @VisibleForTesting 20 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 21 try { 22 method.invoke(target, checkNotNull(event)); 23 } catch (IllegalArgumentException e) { 24 throw new Error("Method rejected target/argument: " + event, e); 25 } catch (IllegalAccessException e) { 26 throw new Error("Method became inaccessible: " + event, e); 27 } catch (InvocationTargetException e) { 28 if (e.getCause() instanceof Error) { 29 throw (Error) e.getCause(); 30 } 31 throw e; 32 } 33 }
本文咱們先快速瞭解Google EventBus整體架構,而後從一個簡單的應用入手知道如何使用,再深刻剖析源碼完全瞭解原理,並分析了有哪些值得借鑑的地方,最後咱們來看一下傳統觀察者模式和EventBus的區別:
監聽者管理 |
監聽特定事件 |
把監聽者註冊到生產者 |
按事件超類監聽 | 檢測沒有監聽者的事件 | 分發事件 | |
傳統觀察者模式 |
用列表管理監聽者,還要考慮線程同步;或者使用工具類 | 定義相應的事件監聽者類 | 調用事件生產者的register方法,開發者必須知道全部事件生產者的類型,才能正確地註冊監聽者 |
很困難,須要開發者本身去實現匹配邏輯 | 在每一個事件分發方法中添加邏輯代碼 | 開發者本身寫代碼,包括事件類型匹配、異常處理、異步分發 |
EventBus | 內部已經實現了監聽者管理 | 以自定義Event爲惟一參數建立方法, 並用Subscribe註解標記。 |
EventBus.register(Object) | EventBus自動把事件分發給事件超類的監聽者 | EventBus會把全部發布後沒有監聽者處理的事件包裝爲DeadEvent | EventBus.post(Object) 異步分發能夠直接用EventBus的子類AsyncEventBus |
==參考==
官方介紹https://github.com/google/guava/wiki/EventBusExplained