解構EventBus框架(四)Google guava的實現

EventBus的主要模塊

Subscribe

註解,能夠標註哪一個方法能夠被註冊和通知。它要求被註解的方法有且只有一個參數,而且該參數就是要註冊監聽的事件,例如:java

class EventBusChangeRecorder {
  @Subscribe 
  public void recordCustomerChange(ChangeEvent e) {
    recordChange(e.getChange());
  }
}
複製代碼

註冊時,無須明確指定事件git

eventBus.register(new EventBusChangeRecorder());
複製代碼

Subscriber

被@Subscribe註解的方法,能夠被調用執行,至關於事件處理器github

private EventBus bus;
final Object target;
private final Method method;
private final Executor executor;

final void dispatchEvent(final Object event) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}
複製代碼

target,事件註冊的實例編程

method,Java反射中的方法實例安全

dispatchEvent,對外暴露的調用(事件分發)方法併發

SubscriberRegistry

事件的註冊表類,主要提供了註冊方法,取消註冊方法框架

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      //引入了CopyOnWriteArraySet解決集合的線程安全問題
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
        
      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      }
	  
      eventSubscribers.addAll(eventMethodsInListener);
    }
  }
複製代碼
void unregister(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> listenerMethodsForType = entry.getValue();

      CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
      if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
        throw new IllegalArgumentException();
      }
    }
  }
複製代碼

從代碼中咱們能夠看到,這兩個方法都實現了線程安全,經過使用CopyOnWriteArraySet巧妙解決了註冊/取消註冊時的線程安全問題。ide

CopyOnWriteArraySet,在寫入數據的時候,會建立一個新的 set,而且將原始數據 clone 到新的 set 中,在新的 set 中寫入數據完成以後,再用新的 set 替換老的 set。這樣就能保證在寫入數據的時候,不影響數據的讀取操做,以此來解決讀寫併發問題。post

後續有時間解析一下CopyOnWriteArraySet的源碼this

EventBus

事件總線的組合類,組合了事件分發器(後面要提的Dispatcher)、事件註冊表等,統一對外提供註冊、分發等功能

private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;

private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;

public void register(Object object) {
   subscribers.register(object);
}

public void unregister(Object object) {
   subscribers.unregister(object);
}

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}
複製代碼

總結一下,以上幾個模塊構成了EventBus的主體框架,基於這個框架解決了咱們上一章節提到的

  • 線程安全問題:經過引入線程安全集合CopyOnWriteArraySet

  • 顯式註冊問題:經過定義註解類Subscribe,標記能夠被註冊的方法,而且將要監聽的Event做爲方法的惟一參數,再利用Java反射的特性,實現了隱式註冊

其它模塊

Dispatcher

EventBus提供專門的事件分發器,而且爲事件的分發提供了兩種策略,一種是廣度優先,一種是深度優先。

private static final class PerThreadQueuedDispatcher extends Dispatcher {

  private final ThreadLocal<Queue<Event>> queue;
  private final ThreadLocal<Boolean> dispatching;

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    Queue<Event> queueForThread = queue.get();
    queueForThread.offer(new Event(event, subscribers));

    if (!dispatching.get()) {
      dispatching.set(true);
      try {
        Event nextEvent;
        while ((nextEvent = queueForThread.poll()) != null) {
          while (nextEvent.subscribers.hasNext()) {
            nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
          }
        }
      } finally {
        dispatching.remove();
        queue.remove();
      }
    }
  }
}
複製代碼

上面的分發器,在被同一個線程分發(同一個線程調用post)時,可以保證事件分發的有序性。同時也引入了Queue實現了廣度優先,下面咱們看一下另一個深度優先的實現

private static final class ImmediateDispatcher extends Dispatcher {

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    while (subscribers.hasNext()) {
      subscribers.next().dispatchEvent(event);
    }
  }
}
複製代碼

SubscriberExceptionHandler

上章節中咱們一樣提到了調用時異常處理的問題,guava一樣給處理解決辦法。guava定義了一個接口

public interface SubscriberExceptionHandler {
  /** Handles exceptions thrown by subscribers. */
  void handleException(Throwable exception, SubscriberExceptionContext context);
}
複製代碼

在調用出現異常時,回調這個接口(完整代碼請參考Subscriber的dispatchEvent方法)

try {
    Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
    //出現異常,調用自定義異常處理
    Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
複製代碼

同時在實例化EventBus時,可傳入自定義異常處理

public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
複製代碼

以上,結合上章節提出的問題,對guava的EventBus作了分析,咱們能看到Google實現的Eventbus代碼很優雅,程序也很健壯,他們在設計的時候會考慮到不少方面,這對咱們本身編程以及代碼框架會有很多啓發。

最後

有關EventBus的整個系列都寫完了,在寫做的過程當中,我不斷回看guava的源碼,收穫甚多。建議你們也去讀一讀guava源碼,瞭解一下世界上頂級的Java開發者是如何寫代碼的。

github.com/google/guav…

相關文章
相關標籤/搜索