摘要: 消息訂閱與分發機制在系統開發中有很是多的使用場景,有進程內的實現(觀察者模式實現,Event, Listener),也系統間的實現(例如 各類 message queue)。Guava提供了一個進程內很是輕量級的實現 EventBus,能夠很好的實現模塊之間的解耦。而且提供了同步和異步的實現版本。java
EventBus 看到這個名字,相信你已經大概明白是個什麼東東了。
Event就是事件,Bus 這裏不是巴士汽車的意思,在計算機領域通常翻譯爲總線。那麼EventBus就是裝載Event,而後作分發的。若作過java swing開發,確定記得XXEvent,以及XXEventListener接口,都會有一個方法onEvent(XXEvent event)。沒錯EventBus乾的就是相同的事情,但藉着採用jdk1.5引入的註解,使得開發消息發佈與訂閱系統很是簡潔方便,無需實現接口或繼承基類。
關於消息發佈與訂閱系統的應用場景,其實在服務端系統開發中也常常用到,遊戲中的任務系統或者一些電商系統的活動系統中常常會用到。例如,遊戲中收集幾個道具就完成某個任務,獲取獎勵。或者不少平臺的簽到系統,累計簽到多少天得到獎勵。這裏就使用Guava中的EventBus開發一個簽到系統。緩存
public class SignInEvent { // 簽到天數 private int count; public SignInEvent(int count) { this.count = count; } public int getCount() { return count; } } public class SignInProcessor { @Subscribe public void signIn(SignInEvent event) { int count = event.getCount(); // TODO 根據簽到的天數發放獎勵 System.out.println("簽到" + count + "天"); } } public class AppTest { @Test public void eventBusTest() { EventBus signInEventBus = new EventBus("SignInEventBus"); SignInProcessor processor = new SignInProcessor(); signInEventBus.register(processor); signInEventBus.post(new SignInEvent(2)); } }
事件處理類SignInProcessor並不須要實現某個接口,只須要在須要處理的方法上加上@Subscribe註解,這裏也並無限制SignInProcessor只能處理SignInEvent,要在SignInProcessor添加其餘事件的處理邏輯也只須要添加一個方法,添加註解@Subscribe,第一個參數傳入須要處理的事件實例。安全
public class SignInProcessor { @Subscribe public void signIn(SignInEvent event) { int count = event.getCount(); // TODO 根據簽到的天數發放獎勵 System.out.println("簽到" + count + "天"); } @Subscribe public void logout(LogoutEvent event) { // 獲取登出的時間 Date date = event.getTime(); // TODO } }
這裏先說下,EventBus是如何肯定一個類中的某個方法來處理相應的事件的呢,Guava中提供了一個接口多線程
interface SubscriberFindingStrategy { Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object source); }
並提供了一個機遇註解的實現併發
class AnnotatedSubscriberFinder implements SubscriberFindingStrategy
findAllSubscribers的實現邏輯app
@Override public Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, EventSubscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; EventSubscriber subscriber = makeSubscriber(listener, method); methodsInListener.put(eventType, subscriber); } return methodsInListener; }
1. 獲取listener對象類,以及父類中全部被@Subscriber註解的方法,並且這個方法有且只有一個參數
2. 獲取這些方法的第一個參數的類型
3. 建立EventSubscriber,保存listener實例,以及對應的Method實例,方便之後反射調用方法處理事件
講完如何查找處理方法後,再看下具體的事件對象是如何發佈出去的呢?這裏的具體邏輯就在EventBus中的post方法中異步
public void post(Object event) { // 獲取事件對象的全部父類以及父類實現的接口 // 獲取父類的目的爲的是能夠把event發佈給某些接收父類Event的處理方法 Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass()); boolean dispatched = false; for (Class<?> eventType : dispatchTypes) { // subscribersByType 是一個非線程安全的集合,因此在操做的時候須要添加鎖 subscribersByTypeLock.readLock().lock(); try { Set<EventSubscriber> wrappers = subscribersByType.get(eventType); if (!wrappers.isEmpty()) { dispatched = true; for (EventSubscriber wrapper : wrappers) { // 放入當前線程對應的Queue中,這裏使用到ThreadLocal變量 enqueueEvent(event, wrapper); } } } finally { subscribersByTypeLock.readLock().unlock(); } } // 若未能找到對應Event的處理器並且當前事件的類型不是DeadEvent就把傳入的事件包裝成DeadEvent if (!dispatched && !(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } // 從當前線程的對應的隊列中事件處理器和事件,並處理事件 dispatchQueuedEvents(); }
從EventBus中的post方法處理邏輯來看,事件的分發和處理是在同一個線程中同步處理的。
可是不少時候事件的處理邏輯比較複雜耗時,須要將事件的分發和處理異步。事件的處理不阻塞分發的主線程。
Guava提供了AsyncEventBus,就是將分發和處理異步化。AsyncEventBus的實現並不複雜。
AsyncEventBus 繼承自 EventBus, 將EventBus中的存放待分發的事件隊列eventsToDispatch從ThreadLocal<Queue<EventWithSubscriber>>換成了ConcurrentLinkedQueue<EventWithSubscriber> 支持多個線程併發訪問獲取事件處理,AsyncEventBus的構造函數須要傳入一個Executor,能夠根據實際須要傳入定製的線程池。
某些場景下,在事件處理類的實例中須要保存事件相關狀態,多線程併發訪問的時候可能出現問題。Guava提供了註解@AllowConcurrentEvents,它的用途標記多個線程可否同時調用同一個事件處理器的處理方法來處理相依的事件。
具體處理邏輯在AnnotatedSubscriberFinder類中的ide
private static EventSubscriber makeSubscriber(Object listener, Method method) { EventSubscriber wrapper; // 這裏判斷事件處理方法是否有被@AllowConcurrentEvents註解 if (methodIsDeclaredThreadSafe(method)) { wrapper = new EventSubscriber(listener, method); } else { // 若沒有被@AllowConcurrentEvents註解,多個線程在處理的時候就須要同步調用該處理器來處理事件 wrapper = new SynchronizedEventSubscriber(listener, method); } return wrapper; }
好了,Guava中的EventBus相關使用及實現基本講完了。其實並不複雜。須要你對下面的相關類和處理機制比較熟悉
1. ThreadLocal
2. ConcurrentLinkedQueue
3. 反射獲取類的父類和接口,這裏使用Guava中的TypeToken封裝類,已經反射方法調用
4. ReentrantReadWriteLock
5. Cache,Guava對經常使用的緩存作了一些封裝,下一篇將會講到函數