事件驅動模型也就是咱們常說的觀察者,或者發佈-訂閱模型;理解它的幾個關鍵點:java
好比說訂單狀態變化的時候,可以通知到郵件服務,短信服務,積分變化等等; 若是你是個新手,想象一下你去實現這個業務的代碼怎麼去實現?確定是一個OrderService裏面引入積分Service,短信Service,郵件Service,還有不少不少Service,可能還要調用第三方接口。是否是發現問題所在了,Service耦合嚴重,又是還會出現循環引用的問題,代碼超長,以致於不方便維護。spring
從如上例子能夠看出,應該使用一個觀察者來解耦這些Service之間的依賴關係,如圖:
設計模式
圖中增長了一個Listener來解耦OrderService和其餘Service,即註冊成功後,只須要通知相關的監聽器,不須要關心它們如何處理,處理起來很是容易。這就是一個典型的事件處理模型-觀察者模式,解耦目標對象和它的依賴對象,目標只須要通知它的依賴對象,具體怎麼處理,依賴對象本身決定。好比是異步仍是同步,延遲仍是非延遲等。
其實上邊其實也使用了DIP(依賴倒置原則),依賴於抽象,而不是具體。
仍是就是使用了IOC思想,即之前主動去建立它依賴的Service,如今只是被動等待別人註冊進來。
主要目的是:鬆散耦合對象間的一對多的依賴關係。併發
設計模式裏面的觀察者模式app
JDK觀察者模式異步
JavaBean事件驅動ide
spring事件驅動post
......測試
JavaBean規範提供了一種監聽屬性變化的事件驅動模型,提供操做JavaBean屬性的類PropertyChangeSupport,PropertyEditorSupport以及PropertyChangeListener支持,PropertyEditorSupport就是目標,而PropertyChangeListener就是監聽器。ui
具體表明者是ApplicationEvent,其下有一個ApplicationContextEvent,表示Spring容器事件,且其又有以下實現:
注:org.springframework.context.support.AbstractApplicationContext抽象類實現了LifeCycle的start和stop回調併發布ContextStartedEvent和ContextStoppedEvent事件。
事件發佈具體表明者是:ApplicationEventPublisher及ApplicationEventMulticaster。
一、ApplicationContext接口繼承了ApplicationEventPublisher,並在AbstractApplicationContext實現了具體代碼,實際執行是委託給ApplicationEventMulticaster(能夠認爲是多播),咱們經常使用的ApplicationContext都繼承自AbstractApplicationContext,如ClassPathXmlApplicationContext、XmlWebApplicationContext等,因此自動擁有這個功能。
二、ApplicationContext自動到本地容器裏找一個名字爲」applicationEventMulticaster「的ApplicationEventMulticaster實現,若是沒有本身new一個SimpleApplicationEventMulticaster,
監聽器具體表明者是:ApplicationListener。其繼承自JDK的EventListener,JDK要求全部監聽器將繼承它。
1. 它只提供了onApplicationEvent方法,咱們須要在該方法實現內部判斷事件類型來處理,或者指定某一個事件類型(泛型,實現ApplicationListener)。
2. 它沒有提供按順序觸發監聽器的語義,因此Spring提供了另外一個接口SmartApplicationListener,該接口支持判斷的事件類型、目標類型,及執行順序。
因爲在上一篇文章:使用事件驅動進行代碼解耦-Spring篇,已經介紹了基於spring事件方式進行代碼解耦的示例,這裏主要介紹基於google guava來進行示例展現。
在guava中,事件處理器被稱爲事件總線EventBus,具體表明有EventBus類和AsyncEventBus類(異步);事件監聽者被稱爲訂閱者Subscriber。
1. 註冊訂閱者:新建一個事件總線EventBus/AsyncEventBus,就能夠向其中註冊訂閱者,訂閱者其實就是一個被標註了註解@com.google.common.eventbus.Subscribe的方法,向事件總線EventBus/AsyncEventBus註冊訂閱者的代碼邏輯若下:
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } } private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; } private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<SubscriberRegistry.MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); SubscriberRegistry.MethodIdentifier ident = new SubscriberRegistry.MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
從「if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic())」這一句代碼能夠明顯的知道,一個bean向事件總線EventBus/AsyncEventBus進行註冊,註冊的並非bean自己,而是該bean中被全部標註了註解@Subscribe的方法,一個被標註了註解@Subscribe的方法就是一個訂閱者。當有事件發佈到事件總線中,事件總線會遍歷全部的訂閱者進行事件處理。
2. 向事件總線EventBus和AsyncEventBus發佈事件,直接調用事件總線的post方法便可
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
本示例模擬訂單生成與更新時會發送短信與站內信簡單業務場景demo
1. 自定義一些接口以及POJO
/** * 業務事件發佈者 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public interface BizEventPublisher { /** * 發佈同步事件 * @param eventData */ void publishEvent(Object eventData); /** * 發佈異步事件 * @param eventData */ void publishEventAsync(Object eventData); } /** * 業務事件發佈者 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public interface BizEventPublisher { /** * 發佈同步事件 * @param eventData */ void publishEvent(Object eventData); /** * 發佈異步事件 * @param eventData */ void publishEventAsync(Object eventData); } /** * 業務事件類型 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public enum BizEventType { ORDER_CREATE("訂單-建立"), ORDER_UPDATE("訂單-修改"), ; String describe; BizEventType(String describe) { this.describe = describe; } } /** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 測試訂單類 */ @Data public class Order { private long orderId; private long userId; public Order(long orderId, long userId) { this.setOrderId(orderId); this.setUserId(userId); } }
2. 自定義業務數據,包含事件通用數據屬性
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 業務事件數據 */ @Data @AllArgsConstructor public class BizEventData<S> implements EventExecutor<S> { private BizEventType eventType; private S data; @Override public void executeEvent(Consumer<S> executor) { executor.accept(data); } public static<S> BizEventData of(BizEventType eventType, S data) { return new BizEventData(eventType, data); } }
3. 業務事件發佈配置管理
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 業務事件發佈配置管理 */ @Slf4j @Component public class BizEventPublisherConfiguration implements BizEventPublisher { /** 同步事件總線 */ private EventBus eventBus = new EventBus(); /** 異步事件總線 */ private AsyncEventBus eventBusAsync = new AsyncEventBus( new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("guava-event-executor-pool-%d").build() ), (Throwable e, SubscriberExceptionContext exceptionContext) -> { log.error("", e); } ); @Override public void publishEvent(Object eventData) { eventBus.post(eventData); } @Override public void publishEventAsync(Object eventData) { eventBusAsync.post(eventData); } /** * 構造器,註冊監聽者 * @param beanFactory */ public BizEventPublisherConfiguration (ListableBeanFactory beanFactory) { // 獲取全部帶有 @BizEventListener 的 bean,將他們註冊爲監聽者 beanFactory.getBeansWithAnnotation(BizEventListener.class) .forEach((beanName, listener) -> { eventBusAsync.register(listener); eventBus.register(listener); }); } }
4. 業務事件監聽配置管理
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 業務事件監聽配置管理 */ @Slf4j @Component @BizEventListener public class BizEventListenerConfiguration { @Autowired private TestMsgService msgService; @Subscribe public void executeEvent(BizEventData bizEventData) { switch (bizEventData.getEventType()) { // 訂單建立,發送短信,....... case ORDER_CREATE: bizEventData.executeEvent((data) -> { msgService.sendPhoneMsg((Order) data); });break; // 訂單修改,站內信提醒,....... case ORDER_UPDATE: bizEventData.executeEvent((data) -> { msgService.sendWebMsg((Order) data); });break; default: bizEventData.executeEvent((data) -> { log.info("executeEvent bizEventData = {}", data); }); } } }
5. 定義兩個測試service,TestOrderService 和 TestMsgService
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 */ @Slf4j @Service public class TestOrderService { @Autowired private BizEventPublisher bizEventPublisher; public void create(Order order) { ...... log.info("TestOrderService create order = {}", order); bizEventPublisher.publishEvent(BizEvent.of(BizEventType.ORDER_CREATE, order)); } public void update(Order order) { ...... log.info("TestOrderService update order = {}", order); bizEventPublisher.publishEventAsync(BizEvent.of(BizEventType.ORDER_UPDATE, order)); } } /** * @author dongsilin * @version 1.0 * @date 2019/1/24 */ @Slf4j @Service public class TestMsgService { public void sendPhoneMsg(Order order) { ...... log.info("TestMsgService sendPhoneMsg order = {}", order); } public void sendWebMsg(Order order) { ...... log.info("TestMsgService sendWebMsg order = {}", order); } }
此處TestOrderService 中並無強制依賴注入TestMsgService,而是經過發佈事件的方式將事件數據發佈到事件管理中心,由事件管理者來統一管理事件處理方式,解決了各個業務service嚴重耦合的場景,實現軟件開發中的「高內聚-低耦合」原則。
經過如上,大致瞭解了google guava的事件機制,可使用該機制很是簡單的完成事件流程。