guava eventbus源碼解析

說在前面java

本文轉自「天河聊技術」微信公衆號spring

事件驅動模型設計是一種優雅的程序設計方式,實現有不少,原理都是發佈與訂閱,觀察者設計模式實現,java自帶的實現、spring ioc的事件驅動模型,還有guava的實現,今天介紹guava eventbus的源碼實現,看過這篇文章你本身也能夠實現實現一套了。數據庫

 

guava event源碼解析設計模式

先上一個demo實現,瞭解車的原理以前先上去感覺下緩存

/**
 * 事件
 * weifeng.jiang 2018-06-11 19:06
 */
public class HelloEvent {

    private String message;

    public HelloEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
/**
 * 訂閱者
 * weifeng.jiang 2018-06-11 19:11
 */
public class EventListener {

    @Subscribe
    public void listen(HelloEvent helloEvent){
        System.out.println(helloEvent.getMessage());
    }
}
/**
 * 客戶端
 * weifeng.jiang 2018-06-11 19:12
 */
public class Main {

    public static void main(String[] args) {
//        建立事件總線
        EventBus eventBus = new EventBus("test");
//        建立訂閱者
        EventListener listener = new EventListener();
//       註冊訂閱者
        eventBus.register(listener);
//        發佈事件
        eventBus.post(new HelloEvent("asdasd"));
        eventBus.post(new HelloEvent("asdasdasdas"));
        eventBus.post(new HelloEvent("asdasdasdasd"));
    }
}

實現原理架構圖服務器

怎麼成爲一個訂閱者接受事件呢微信

接受事件的對象應有一個public方法,用@Subscribe註解標記這個方法,將接受事件對象傳遞給EventBus實例的register(Object)方法,參考圖一和圖三架構

 

怎麼發佈事件呢併發

只須要調用EventBus實例的post方法,參考圖三異步

 

guava eventbus事件總線有兩種,同步的實現EventBus,異步的實現AsyncEventBus,若是訂閱者在接收事件後進行長時間的邏輯處理,好比和數據庫交互,這時候就須要用異步事件了,若是是簡單處理,同步實現就能夠。

 

這裏以EventBus事件總線同步實現爲例進行源碼解析。

 

成爲訂閱者的源碼實現

和@Subscribe註解配合使用的還有一個@AllowConcurrentEvents註解,這個註解是能夠容許事件併發的執行,看下建立訂閱者對象的源碼實現,以下

/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 爲監聽器上的方法建立一個訂閱服務器。*/
static Subscriber create(EventBus bus, Object listener, Method method) {
  return isDeclaredThreadSafe(method)
      ? new Subscriber(bus, listener, method)
      : new SynchronizedSubscriber(bus, listener, method);
}

能夠容許併發事件,在這個類中

@VisibleForTesting
  static final class SynchronizedSubscriber extends Subscriber {

    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
      super(bus, target, method);
    }

    @Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }
  }
}

執行事件的時候是同步實現。

 

事件總線訂閱源碼實現

com.google.common.eventbus.SubscriberRegistry#register

void register(Object listener) {
//    查找全部訂閱者,維護了一個key是事件類型,value是定訂閱這個事件類型的訂閱者集合的一個map
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//      獲取事件類型
      Class<?> eventType = entry.getKey();
//      獲取這個事件類型的訂閱者集合
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

//      從緩存中按事件類型查找訂閱者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
//        從緩存中取不到,更新緩存
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

事件和訂閱事件的訂閱者集合是在com.google.common.eventbus.SubscriberRegistry這裏維護的

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
    Maps.newConcurrentMap();

到這裏,訂閱者已經準備好了,準備接受事件了。

 

發佈事件源碼實現

com.google.common.eventbus.EventBus#post

public void post(Object event) {
//    獲取事件的訂閱者集合
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
//      轉發事件
      dispatcher.dispatch(event, eventSubscribers);
//      若是不是死亡事件,從新包裝成死亡事件從新發布
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
Iterator<Subscriber> getSubscribers(Object event) {
//    獲取事件類型類的超類集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
//      獲取事件類型的訂閱者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

事件轉發器有三種實現

第一種是當即轉發,實時性比較高,其餘兩種都是隊列實現。

執行訂閱方法都是異步實現

final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

 

說到最後

本次源碼解析到這裏,僅供參考。

相關文章
相關標籤/搜索