Guava之eventBus異步事件總線的使用及源碼分析

    最近使用guava的eventBus,記錄下。java

1、如何使用

    List-1.1併發

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventBusTest {
    private AsyncEventBus asyncEventBus;

    @Before
    public void before(){
        asyncEventBus=new AsyncEventBus(Executors.newFixedThreadPool(3));
        asyncEventBus.register(this);
    }

    @Subscribe
    @AllowConcurrentEvents
    public void subscribe(Object object){
        System.out.println("收到:"+object);
    }

    @Test
    public void test_sendMsg() throws InterruptedException {
        System.out.println("開始發送消息");
        asyncEventBus.post("這是消息");
        System.out.println("開始睡眠");
        TimeUnit.SECONDS.sleep(5L);
    }
}

    List-1.1中,方法subscribe是接收者,方法test_sendMsg中post消息後,方法subscribe就會收到消息。這是由於方法subscribe上有註解Subscribe。異步

    爲何要在方法subscribe上加上註解AllowConcurrentEvents,加上這個才能達到真正的異步,這要看底層源碼,下面咱們會來分析。async

2、register方法底層實現

    AsyncEventBus的構造方法以下List-2.1所示ide

    List-2.1post

public AsyncEventBus(Executor executor) {
  super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}

    來看下register方法的實現,以下圖2.1所示,步驟3中,會找到全部方法上有Subscribe註解的方法,在步驟6中,會判斷方法上是否註解AllowConcurrentEvents,若是有,則返回Subscriber,若是沒有則返回SynchronizedSubscriber。this

      

                  圖2.1 AsyncEventBus的register方法google

    SynchronizedSubscriber和Subscriber的區別以下,SynchronizedSubscriber重複了父類的invokeSubscriberMethod,並加上了鎖關鍵字synchronized,因此List-1.1中的方法上若是沒有註解AllowConcurrentEvents,那麼是不會真正的併發的,我看了網上的例子,不少描述的不全面。spa

    List-2.2code

@VisibleForTesting
static final class SynchronizedSubscriber extends Subscriber {

  private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
    super(bus, target, method);
  }

  @Override
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    synchronized (this) {
      super.invokeSubscriberMethod(event);
    }
  }
}

3、post方法底層實現

      

                  圖3.1 AsyncEventBus的post實現

    步驟六、7的代碼以下List-3.1,能夠看到List-2.2中涉及的invokeSubscriberMethod在這裏使用。

    List-3.1

final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

    guava提供了三個Dispatcher,上面使用了LegacyAsyncDispatcher,LegacyAsyncDispatcher的dispatch實現以下List-3.2所示,能夠看到,是將event和subscriber放入到ConcurrentLinkedQueue中,以後再從queue中poll出來,再調用subscribe的dispatchEvent方法。爲何先放到queue中,以後在poll出來,這是有考慮的,是爲了應用總體的吞吐量考慮。

    List-3.2

private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
    Queues.newConcurrentLinkedQueue();

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}

 

Reference:

  1. google guava版本27.0-jre源碼
相關文章
相關標籤/搜索