最近使用guava的eventBus,記錄下。java
List-1.1併發
import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.Subscribe; import org.junit.Before; import org.junit.Test; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class EventBusTest { private AsyncEventBus asyncEventBus; @Before public void before(){ asyncEventBus=new AsyncEventBus(Executors.newFixedThreadPool(3)); asyncEventBus.register(this); } @Subscribe @AllowConcurrentEvents public void subscribe(Object object){ System.out.println("收到:"+object); } @Test public void test_sendMsg() throws InterruptedException { System.out.println("開始發送消息"); asyncEventBus.post("這是消息"); System.out.println("開始睡眠"); TimeUnit.SECONDS.sleep(5L); } }
List-1.1中,方法subscribe是接收者,方法test_sendMsg中post消息後,方法subscribe就會收到消息。這是由於方法subscribe上有註解Subscribe。異步
爲何要在方法subscribe上加上註解AllowConcurrentEvents,加上這個才能達到真正的異步,這要看底層源碼,下面咱們會來分析。async
AsyncEventBus的構造方法以下List-2.1所示ide
List-2.1post
public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); }
來看下register方法的實現,以下圖2.1所示,步驟3中,會找到全部方法上有Subscribe註解的方法,在步驟6中,會判斷方法上是否註解AllowConcurrentEvents,若是有,則返回Subscriber,若是沒有則返回SynchronizedSubscriber。this
圖2.1 AsyncEventBus的register方法google
SynchronizedSubscriber和Subscriber的區別以下,SynchronizedSubscriber重複了父類的invokeSubscriberMethod,並加上了鎖關鍵字synchronized,因此List-1.1中的方法上若是沒有註解AllowConcurrentEvents,那麼是不會真正的併發的,我看了網上的例子,不少描述的不全面。spa
List-2.2code
@VisibleForTesting static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } @Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } } }
圖3.1 AsyncEventBus的post實現
步驟六、7的代碼以下List-3.1,能夠看到List-2.2中涉及的invokeSubscriberMethod在這裏使用。
List-3.1
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
guava提供了三個Dispatcher,上面使用了LegacyAsyncDispatcher,LegacyAsyncDispatcher的dispatch實現以下List-3.2所示,能夠看到,是將event和subscriber放入到ConcurrentLinkedQueue中,以後再從queue中poll出來,再調用subscribe的dispatchEvent方法。爲何先放到queue中,以後在poll出來,這是有考慮的,是爲了應用總體的吞吐量考慮。
List-3.2
private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }