EventBus 源碼學習筆記(三)

EventBus 深刻學習三之Guava小結

上一篇講述了 EventBus 的整個執行流程, 本片則從細節處出發,探討下設計的精妙java

  1. 巧妙的利用緩存, 解決重複耗時的操做
  2. 異步化的操做
  3. 隊列存儲消息, 以及如何避免消息的重複消費
  4. 消費的前後順序
  5. 截斷
  6. 異常處理

1. 緩存

看代碼時,能夠看到不少地方都用到了緩存,如再註冊時, 根據class獲取全部帶註解的方法; 推送消息時,根據事件類型,獲取全部的超類集合緩存

如註冊時,一條完整的調用鏈安全

com.google.common.eventbus.SubscriberRegistry#register ->
com.google.common.eventbus.SubscriberRegistry#findAllSubscribers ->  com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethods    ->
subscriberMethodsCache.getUnchecked(clazz) ->
com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethodsNotCached

2. 根據類查詢全部超類

TypeToken.of(concreteClass).getTypes().rawTypes());

// 咱們本身的實現, 一直到返回null爲止
clz.getSuperClass().getSuperClass();
// 獲取接口
clz.getInterfaces()

3. 異步

異步推送處理Event和同步處理主要的區別點是使用的 Dispatcher不一樣, 同步是使用 PerThreadQueuedDispatcher , 異步是 LegacyAsyncDispatcher異步

異步的消息分發ide

/**
* Global event queue.
*/
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
   Queues.newConcurrentLinkedQueue();
   
@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

同步的消息推送學習

/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
   new ThreadLocal<Queue<Event>>() {
     @Override
     protected Queue<Event> initialValue() {
       return Queues.newArrayDeque();
     }
   };
   
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
 checkNotNull(event);
 checkNotNull(subscribers);
 Queue<Event> queueForThread = queue.get();
 queueForThread.offer(new Event(event, subscribers));

 if (!dispatching.get()) {
   dispatching.set(true);
   try {
     Event nextEvent;
     while ((nextEvent = queueForThread.poll()) != null) {
       while (nextEvent.subscribers.hasNext()) {
         nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
       }
     }
   } finally {
     dispatching.remove();
     queue.remove();
   }
 }
}

private static final class Event {
 private final Object event;
 private final Iterator<Subscriber> subscribers;

 private Event(Object event, Iterator<Subscriber> subscribers) {
   this.event = event;
   this.subscribers = subscribers;
 }
}
}

執行時, 在 AsyncEventBus 是在線程池中執行; 而 EventBus 則是直接執行, 實質上的執行器this

public static Executor directExecutor() {
    return DirectExecutor.INSTANCE;
  }

  /** See {@link #directExecutor} for behavioral notes. */
  private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override public void execute(Runnable command) {
      command.run();
    }
  }

4. 線程安全

5. 異常處理

  • 沒有訂閱者時, 拋一個 DeadEventgoogle

  • 訂閱者接收消息後的,執行異常時 (訂閱者之間的隔離)spa

    • 看下具體的執行,比較清晰, 將異常拋給 EventBus的 ExceptionHandler統一處理
    final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
    }

6. 消費順序 & 截斷

Guava的EventBus不支持定義訂閱者的順序,更談不上截斷線程

相關文章
相關標籤/搜索