觀察者模式與Guava EventBus

觀察者模式

結構圖安全

 

代碼實現數據結構

public abstract class Subject {

    private List<Observer> observerList = new ArrayList<Observer>();

    /**
     * 註冊觀察者
     * @param observer
     */
    public void register(Observer observer) {
        observerList.add(observer);
    }

    /**
     * 註銷觀察者
     *
     * @param observer
     */
    public void unregister(Observer observer) {
        observerList.remove(observer);
    }

    /**
     * 通知觀察者更新
     */
    public void post() {
        for (Observer observer : observerList) {
            observer.update();
        }
    }

    /**
     * 獲取被通知事件
     *
     * @return
     */
    public abstract Object getEvent();
}


public class ConcreteSubject1 extends Subject {

    /** 個性化的定製內容 */
    private String subjectState;

    public ConcreteSubject1(String subjectState) {
        this.subjectState = subjectState;
    }

    @Override
    public Object getEvent() {
        System.out.println("Custom ConcreteSubject1");
        return subjectState;
    }
}


public class ConcreteSubject2 extends Subject {

    private int subjectState;

    public ConcreteSubject2(int subjectState) {
        this.subjectState = subjectState;
    }

    @Override
    public Object getEvent() {
        System.out.println("Custom ConcreteSubject2");
        return subjectState;
    }
}


public abstract class Observer {

    /** 用於觀察者獲取被通知的事件 */
    protected Subject subject;

    /**
     * 用於給Subject通知時調用的更新方法
     */
    public abstract void update();
}


public class ConcreteObserver1 extends Observer {

    public ConcreteObserver1(Subject subject) {
        this.subject = subject;
    }

    @Override
    public void update() {
        System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
    }
}


public class ConcreteObserver1 extends Observer {

    public ConcreteObserver1(Subject subject) {
        this.subject = subject;
    }

    @Override
    public void update() {
        System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
    }
}


public class ConcreteObserver1 extends Observer {

    public ConcreteObserver1(Subject subject) {
        this.subject = subject;
    }

    @Override
    public void update() {
        System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
    }
}

 

輸出:多線程

Custom ConcreteSubject1
Subject Sub1 ConcreteObserver1
Custom ConcreteSubject1
Subject Sub1 ConcreteObserver2併發

 

 

EventBus簡單示例

EventBus是Guava提供的消息發佈-訂閱類庫,它的工做機制相似於觀察者模式,經過通知者去註冊觀察者,最後由通知者想觀察者發佈消息,示例代碼app

public class MsgCenter {

    /** EventBus的定位跟接近於消息中心,而他的post()方法跟接近於一個自定義的Subject */
    public static EventBus eventBus = new EventBus();
}


public class Observer1 {

    /**
     * 只有經過@Subscribe註解的方法纔會被註冊進EventBus
     * 並且方法有且只能有1個參數
     * @param msg
     */
    @Subscribe
    public void ob1Mthod1(String msg) {
        System.out.println(msg + " test1!");
    }

    @Subscribe
    public void ob1Method2(String msg) {
        System.out.println(msg + " test2!");
    }
}


public class Observer2 {
    @Subscribe
    public void ob2Method1(String msg) {
        System.out.println(msg + " test3!");
    }

    // 錯誤的基本型參數
    // @Subscribe
    // public void ob2Method2(int msg) {
    // System.out.println(msg + " test4!");
    // }
    /**
     * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,不然handlersByType的Class會是int而不是Intege
     * 而傳入的int msg參數在post(int msg)的時候會被包裝成Integer,致使沒法匹配到
     */
    @Subscribe
    public void ob2Method2(Integer msg) {
        System.out.println(msg + " test4!");
    }
}



public class Test {

    public static void main(String[] args) throws InterruptedException {
        EventBus eventBus = new EventBus();
        Observer1 observer1 = new Observer1();
        Observer2 observer2 = new Observer2();

        eventBus.register(observer1);
        eventBus.register(observer2);

        // 只有註冊的參數類型爲String的方法會被調用
        eventBus.post("post string method");

        // 註銷observer2
        eventBus.unregister(observer2);
        eventBus.post("post string method after unregister");
    }
}

 

輸出less

post string method test2!
post string method test1!
post string method test3!
post string method after unregister test2!
post string method after unregister test1!ide

 

實際上EventBus要表達的意圖很簡單,就是將post(Object arg)這裏的arg當作參數傳入到已註冊的方法(被@Subscribe)的方法裏,並調用該方法,因此當post(String)的時候,調用的參數類型爲String的註冊方法,當post(int)的時候,調用則是參數類型爲Integer的註冊方法post

 

EventBus的實現方式

eventbus的實現方式實際上相似於上例寫的簡單的觀察者模式,不一樣點在於它實現了泛化的註冊方法以及泛化的方法調用,另外還考慮到了多線程的問題,對多線程使用時作了一些優化優化

register(Object listener)

register()方法註冊一個任意類型的實例並將其使用了@Subscribe的方法註冊到一個Map中,這個Map使用方法的參數類型Class爲Key,值爲一個Set,這個Set包含了全部參數類型爲Key的EventHandler,Eventhandler是EventBus定義的一個數據結構,由listener(方法擁有者instance,例如上例中的observer1)和這個listener的@Subscribe方法構成ui

]這個Map的結構示意圖以下

 

其實現代碼以下

  /**
   * Registers all handler methods on {@code object} to receive events.
   * Handler methods are selected and classified using this EventBus's
   * {@link HandlerFindingStrategy}; the default strategy is the
   * {@link AnnotatedHandlerFinder}.
   *
   * @param object  object whose handler methods should be registered.
   */
  public void register(Object object) {
    handlersByType.putAll(finder.findAllHandlers(object));
  }



  /**
   * {@inheritDoc}
   *
   * This implementation finds all methods marked with a {@link Subscribe} annotation.
   */
  @Override
  public Multimap<Class<?>, EventHandler> findAllHandlers(Object listener) {
    Multimap<Class<?>, EventHandler> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    Set<? extends Class<?>> supers = TypeToken.of(clazz).getTypes().rawTypes();

    for (Method method : clazz.getMethods()) {
      /*
       * Iterate over each distinct method of {@code clazz}, checking if it is annotated with
       * @Subscribe by any of the superclasses or superinterfaces that declare it.
       */
      for (Class<?> c : supers) {
        try {
          Method m = c.getMethod(method.getName(), method.getParameterTypes());
          if (m.isAnnotationPresent(Subscribe.class)) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 1) {
              throw new IllegalArgumentException("Method " + method
                  + " has @Subscribe annotation, but requires " + parameterTypes.length
                  + " arguments.  Event handler methods must require a single argument.");
            }
            Class<?> eventType = parameterTypes[0];
            EventHandler handler = makeHandler(listener, method);

            methodsInListener.put(eventType, handler);
            break;
          }
        } catch (NoSuchMethodException ignored) {
          // Move on.
        }
      }
    }
    return methodsInListener;
  }

 

post(Object event)

post(Object event)方法用於向已註冊的方法傳遞一個參數,並以此調用參數類型爲event.class的全部方法,調用的時候會使用一個ThreadLocal的Queue來進行任務分發,這樣的結果就是在多線程狀況下,線程間共享註冊方法的Map(上面提到那個),當時在發送消息時線程會保有本身獨立的一個Post任務的Queue,保證了線程執行post()方法時候的獨立性而不會相互影響,下面是多線程執行post()時的示意圖

真正在執行post(Object event)的時候,會將msg與全部event.class對應的set裏的全部method組合成一個EventBus.EventWithHandler對象並加入到ThreadLocal的Queue中,最後再將Queue出隊依次執行這些方法,最後清空ThreadLocal的Queue,EventBus的實現代碼以下

    /**
     * Posts an event to all registered handlers.  This method will return
     * successfully after the event has been posted to all handlers, and
     * regardless of any exceptions thrown by handlers.
     *
     * <p>If no handlers have been subscribed for {@code event}'s class, and
     * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a
     * DeadEvent and reposted.
     *
     * @param event  event to post.
     */
    @SuppressWarnings("deprecation") // only deprecated for external subclasses
    public void post(Object event) {
        Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

        boolean dispatched = false;
        // 將event和全部event.class對應的方法組合成EventWithHandler併入隊
 for (Class<?> eventType : dispatchTypes) { Set<EventHandler> wrappers = getHandlersForEventType(eventType); if (wrappers != null && !wrappers.isEmpty()) { dispatched = true; for (EventHandler wrapper : wrappers) { enqueueEvent(event, wrapper); } } } if (!dispatched && !(event instanceof DeadEvent)) {
            post(new DeadEvent(this, event));
        }

        dispatchQueuedEvents();
    }

    /**
     * Queue the {@code event} for dispatch during
     * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
     * so they can be dispatched in the same order.
     */
    void enqueueEvent(Object event, EventHandler handler) {
        eventsToDispatch.get().offer(new EventWithHandler(event, handler));
    }

    /**
     * Drain the queue of events to be dispatched. As the queue is being drained,
     * new events may be posted to the end of the queue.
     *
     * @deprecated This method should not be overridden outside of the eventbus package. It is
     *     scheduled for removal in Guava 14.0.
     */
    @Deprecated
    protected void dispatchQueuedEvents() {
        // don't dispatch if we're already dispatching, that would allow reentrancy
        // and out-of-order events. Instead, leave the events to be dispatched
        // after the in-progress dispatch is complete.
        if (isDispatching.get()) {
            return;
        }

        isDispatching.set(true);
        try {
            while (true) {
                EventWithHandler eventWithHandler = eventsToDispatch.get().poll();
                if (eventWithHandler == null) {
                    break;
                }

                dispatch(eventWithHandler.event, eventWithHandler.handler);
            }
        } finally {
            isDispatching.set(false);
        }
    }

斜體加粗部分即爲關鍵部分

 

EventBus多線程使用示例

public class Test2 {

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread());
                System.out.println("start");
                Observer1 observer1 = new Observer1();
                MsgCenter.eventBus.register(observer1);
                MsgCenter.eventBus.post("post string");
                System.out.println("end");
                System.out.println();
            }
        };

        Thread t2 = new Thread() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread());
                System.out.println("start");
                Observer2 observer2 = new Observer2();
                MsgCenter.eventBus.register(observer2);
                MsgCenter.eventBus.post("post string2");
                System.out.println("end");
                System.out.println();
            }
        };

        ExecutorService exec = Executors.newFixedThreadPool(2);

        exec.execute(t1);

        // 爲什麼忽略多線程是run()方法的線程安全問題,讓兩個任務分開執行
        TimeUnit.MILLISECONDS.sleep(500);

        exec.execute(t2);

        exec.shutdown();
    }
}

輸出

Thread[pool-1-thread-1,5,main]
start
post string test1!
post string test2!
end

Thread[pool-1-thread-2,5,main]
start
post string2 test1!
post string2 test2!
post string2 test3!
end

這裏爲了讓執行結果更清晰,並無讓兩個線程併發執行,但能夠清楚看到他們是共享同一個註冊方法的Map的,而因爲post()分發消息時間的Queue是ThreadLocal的,這個Queue由每一個線程獨有

對於這裏使用ThreadLocal的Queue的我的理解就是假如不使用ThreadLocal,共享同一個隊列,就有可能由Thread1入隊的EventWithHandler會由Thread2來執行,而由Thread2入隊的EventWithHandler又有可能會由Thread1來執行,而形成執行秩序混亂.

而使用ThreadLocal的Queue,則這些EventWithHandler示例是由哪一個線程入隊就由哪一個線程執行,同時不須要去考慮共享隊列的入隊和出隊時的線程安全問題,也能夠提高效率

相關文章
相關標籤/搜索