結構圖安全
代碼實現數據結構
public abstract class Subject { private List<Observer> observerList = new ArrayList<Observer>(); /** * 註冊觀察者 * @param observer */ public void register(Observer observer) { observerList.add(observer); } /** * 註銷觀察者 * * @param observer */ public void unregister(Observer observer) { observerList.remove(observer); } /** * 通知觀察者更新 */ public void post() { for (Observer observer : observerList) { observer.update(); } } /** * 獲取被通知事件 * * @return */ public abstract Object getEvent(); } public class ConcreteSubject1 extends Subject { /** 個性化的定製內容 */ private String subjectState; public ConcreteSubject1(String subjectState) { this.subjectState = subjectState; } @Override public Object getEvent() { System.out.println("Custom ConcreteSubject1"); return subjectState; } } public class ConcreteSubject2 extends Subject { private int subjectState; public ConcreteSubject2(int subjectState) { this.subjectState = subjectState; } @Override public Object getEvent() { System.out.println("Custom ConcreteSubject2"); return subjectState; } } public abstract class Observer { /** 用於觀察者獲取被通知的事件 */ protected Subject subject; /** * 用於給Subject通知時調用的更新方法 */ public abstract void update(); } public class ConcreteObserver1 extends Observer { public ConcreteObserver1(Subject subject) { this.subject = subject; } @Override public void update() { System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1"); } } public class ConcreteObserver1 extends Observer { public ConcreteObserver1(Subject subject) { this.subject = subject; } @Override public void update() { System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1"); } } public class ConcreteObserver1 extends Observer { public ConcreteObserver1(Subject subject) { this.subject = subject; } @Override public void update() { System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1"); } }
輸出:多線程
Custom ConcreteSubject1
Subject Sub1 ConcreteObserver1
Custom ConcreteSubject1
Subject Sub1 ConcreteObserver2併發
EventBus是Guava提供的消息發佈-訂閱類庫,它的工做機制相似於觀察者模式,經過通知者去註冊觀察者,最後由通知者想觀察者發佈消息,示例代碼app
public class MsgCenter { /** EventBus的定位跟接近於消息中心,而他的post()方法跟接近於一個自定義的Subject */ public static EventBus eventBus = new EventBus(); } public class Observer1 { /** * 只有經過@Subscribe註解的方法纔會被註冊進EventBus * 並且方法有且只能有1個參數 * @param msg */ @Subscribe public void ob1Mthod1(String msg) { System.out.println(msg + " test1!"); } @Subscribe public void ob1Method2(String msg) { System.out.println(msg + " test2!"); } } public class Observer2 { @Subscribe public void ob2Method1(String msg) { System.out.println(msg + " test3!"); } // 錯誤的基本型參數 // @Subscribe // public void ob2Method2(int msg) { // System.out.println(msg + " test4!"); // } /** * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,不然handlersByType的Class會是int而不是Intege * 而傳入的int msg參數在post(int msg)的時候會被包裝成Integer,致使沒法匹配到 */ @Subscribe public void ob2Method2(Integer msg) { System.out.println(msg + " test4!"); } } public class Test { public static void main(String[] args) throws InterruptedException { EventBus eventBus = new EventBus(); Observer1 observer1 = new Observer1(); Observer2 observer2 = new Observer2(); eventBus.register(observer1); eventBus.register(observer2); // 只有註冊的參數類型爲String的方法會被調用 eventBus.post("post string method"); // 註銷observer2 eventBus.unregister(observer2); eventBus.post("post string method after unregister"); } }
輸出less
post string method test2!
post string method test1!
post string method test3!
post string method after unregister test2!
post string method after unregister test1!ide
實際上EventBus要表達的意圖很簡單,就是將post(Object arg)這裏的arg當作參數傳入到已註冊的方法(被@Subscribe)的方法裏,並調用該方法,因此當post(String)的時候,調用的參數類型爲String的註冊方法,當post(int)的時候,調用則是參數類型爲Integer的註冊方法post
eventbus的實現方式實際上相似於上例寫的簡單的觀察者模式,不一樣點在於它實現了泛化的註冊方法以及泛化的方法調用,另外還考慮到了多線程的問題,對多線程使用時作了一些優化優化
register()方法註冊一個任意類型的實例並將其使用了@Subscribe的方法註冊到一個Map中,這個Map使用方法的參數類型Class爲Key,值爲一個Set,這個Set包含了全部參數類型爲Key的EventHandler,Eventhandler是EventBus定義的一個數據結構,由listener(方法擁有者instance,例如上例中的observer1)和這個listener的@Subscribe方法構成ui
]這個Map的結構示意圖以下
其實現代碼以下
/** * Registers all handler methods on {@code object} to receive events. * Handler methods are selected and classified using this EventBus's * {@link HandlerFindingStrategy}; the default strategy is the * {@link AnnotatedHandlerFinder}. * * @param object object whose handler methods should be registered. */ public void register(Object object) { handlersByType.putAll(finder.findAllHandlers(object)); } /** * {@inheritDoc} * * This implementation finds all methods marked with a {@link Subscribe} annotation. */ @Override public Multimap<Class<?>, EventHandler> findAllHandlers(Object listener) { Multimap<Class<?>, EventHandler> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); Set<? extends Class<?>> supers = TypeToken.of(clazz).getTypes().rawTypes(); for (Method method : clazz.getMethods()) { /* * Iterate over each distinct method of {@code clazz}, checking if it is annotated with * @Subscribe by any of the superclasses or superinterfaces that declare it. */ for (Class<?> c : supers) { try { Method m = c.getMethod(method.getName(), method.getParameterTypes()); if (m.isAnnotationPresent(Subscribe.class)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length != 1) { throw new IllegalArgumentException("Method " + method + " has @Subscribe annotation, but requires " + parameterTypes.length + " arguments. Event handler methods must require a single argument."); } Class<?> eventType = parameterTypes[0]; EventHandler handler = makeHandler(listener, method); methodsInListener.put(eventType, handler); break; } } catch (NoSuchMethodException ignored) { // Move on. } } } return methodsInListener; }
post(Object event)方法用於向已註冊的方法傳遞一個參數,並以此調用參數類型爲event.class的全部方法,調用的時候會使用一個ThreadLocal的Queue來進行任務分發,這樣的結果就是在多線程狀況下,線程間共享註冊方法的Map(上面提到那個),當時在發送消息時線程會保有本身獨立的一個Post任務的Queue,保證了線程執行post()方法時候的獨立性而不會相互影響,下面是多線程執行post()時的示意圖
真正在執行post(Object event)的時候,會將msg與全部event.class對應的set裏的全部method組合成一個EventBus.EventWithHandler對象並加入到ThreadLocal的Queue中,最後再將Queue出隊依次執行這些方法,最後清空ThreadLocal的Queue,EventBus的實現代碼以下
/** * Posts an event to all registered handlers. This method will return * successfully after the event has been posted to all handlers, and * regardless of any exceptions thrown by handlers. * * <p>If no handlers have been subscribed for {@code event}'s class, and * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a * DeadEvent and reposted. * * @param event event to post. */ @SuppressWarnings("deprecation") // only deprecated for external subclasses public void post(Object event) { Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass()); boolean dispatched = false; // 將event和全部event.class對應的方法組合成EventWithHandler併入隊 for (Class<?> eventType : dispatchTypes) { Set<EventHandler> wrappers = getHandlersForEventType(eventType); if (wrappers != null && !wrappers.isEmpty()) { dispatched = true; for (EventHandler wrapper : wrappers) { enqueueEvent(event, wrapper); } } } if (!dispatched && !(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } dispatchQueuedEvents(); } /** * Queue the {@code event} for dispatch during * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence * so they can be dispatched in the same order. */ void enqueueEvent(Object event, EventHandler handler) { eventsToDispatch.get().offer(new EventWithHandler(event, handler)); } /** * Drain the queue of events to be dispatched. As the queue is being drained, * new events may be posted to the end of the queue. * * @deprecated This method should not be overridden outside of the eventbus package. It is * scheduled for removal in Guava 14.0. */ @Deprecated protected void dispatchQueuedEvents() { // don't dispatch if we're already dispatching, that would allow reentrancy // and out-of-order events. Instead, leave the events to be dispatched // after the in-progress dispatch is complete. if (isDispatching.get()) { return; } isDispatching.set(true); try { while (true) { EventWithHandler eventWithHandler = eventsToDispatch.get().poll(); if (eventWithHandler == null) { break; } dispatch(eventWithHandler.event, eventWithHandler.handler); } } finally { isDispatching.set(false); } }
斜體加粗部分即爲關鍵部分
public class Test2 { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread() { @Override public void run() { System.out.println(Thread.currentThread()); System.out.println("start"); Observer1 observer1 = new Observer1(); MsgCenter.eventBus.register(observer1); MsgCenter.eventBus.post("post string"); System.out.println("end"); System.out.println(); } }; Thread t2 = new Thread() { @Override public void run() { System.out.println(Thread.currentThread()); System.out.println("start"); Observer2 observer2 = new Observer2(); MsgCenter.eventBus.register(observer2); MsgCenter.eventBus.post("post string2"); System.out.println("end"); System.out.println(); } }; ExecutorService exec = Executors.newFixedThreadPool(2); exec.execute(t1); // 爲什麼忽略多線程是run()方法的線程安全問題,讓兩個任務分開執行 TimeUnit.MILLISECONDS.sleep(500); exec.execute(t2); exec.shutdown(); } }
輸出
Thread[pool-1-thread-1,5,main]
start
post string test1!
post string test2!
endThread[pool-1-thread-2,5,main]
start
post string2 test1!
post string2 test2!
post string2 test3!
end
這裏爲了讓執行結果更清晰,並無讓兩個線程併發執行,但能夠清楚看到他們是共享同一個註冊方法的Map的,而因爲post()分發消息時間的Queue是ThreadLocal的,這個Queue由每一個線程獨有
對於這裏使用ThreadLocal的Queue的我的理解就是假如不使用ThreadLocal,共享同一個隊列,就有可能由Thread1入隊的EventWithHandler會由Thread2來執行,而由Thread2入隊的EventWithHandler又有可能會由Thread1來執行,而形成執行秩序混亂.
而使用ThreadLocal的Queue,則這些EventWithHandler示例是由哪一個線程入隊就由哪一個線程執行,同時不須要去考慮共享隊列的入隊和出隊時的線程安全問題,也能夠提高效率