使用事件驅動進行代碼解耦-Guava篇

什麼是事件驅動模型

事件驅動模型也就是咱們常說的觀察者,或者發佈-訂閱模型;理解它的幾個關鍵點:java

  1. 首先是一種對象間的一對多的依賴關係;
  2. 當一個對象的狀態發生變化時,觀察者(訂閱者)都獲得通知並作相應的處理;
  3. 觀察者如何處理,目標無需干涉,鬆散耦合了它們之間的關係。

常見問題

好比說訂單狀態變化的時候,可以通知到郵件服務,短信服務,積分變化等等; 若是你是個新手,想象一下你去實現這個業務的代碼怎麼去實現?確定是一個OrderService裏面引入積分Service,短信Service,郵件Service,還有不少不少Service,可能還要調用第三方接口。是否是發現問題所在了,Service耦合嚴重,又是還會出現循環引用的問題,代碼超長,以致於不方便維護。spring

從如上例子能夠看出,應該使用一個觀察者來解耦這些Service之間的依賴關係,如圖:
設計模式

圖中增長了一個Listener來解耦OrderService和其餘Service,即註冊成功後,只須要通知相關的監聽器,不須要關心它們如何處理,處理起來很是容易。這就是一個典型的事件處理模型-觀察者模式,解耦目標對象和它的依賴對象,目標只須要通知它的依賴對象,具體怎麼處理,依賴對象本身決定。好比是異步仍是同步,延遲仍是非延遲等。
其實上邊其實也使用了DIP(依賴倒置原則),依賴於抽象,而不是具體。
仍是就是使用了IOC思想,即之前主動去建立它依賴的Service,如今只是被動等待別人註冊進來。
主要目的是:鬆散耦合對象間的一對多的依賴關係。併發

經常使用事件驅動模型

  1. 設計模式裏面的觀察者模式app

  2. JDK觀察者模式異步

  3. JavaBean事件驅動ide

  4. spring事件驅動post

  5. ......測試

JavaBean事件驅動

JavaBean規範提供了一種監聽屬性變化的事件驅動模型,提供操做JavaBean屬性的類PropertyChangeSupport,PropertyEditorSupport以及PropertyChangeListener支持,PropertyEditorSupport就是目標,而PropertyChangeListener就是監聽器。ui

Spring提供的事件

具體表明者是ApplicationEvent,其下有一個ApplicationContextEvent,表示Spring容器事件,且其又有以下實現:

  • ContextStartedEvent:Spring容器啓動後觸發的事件;
  • ContextStoppedEvent:Spring容器中止後觸發的事件;
  • ContextRefreshedEvent:Spring容器初始化或刷新完成後觸發的事件;
  • ContextClosedEvent:Spring容器關閉後觸發的事件;

注:org.springframework.context.support.AbstractApplicationContext抽象類實現了LifeCycle的start和stop回調併發布ContextStartedEvent和ContextStoppedEvent事件。

事件發佈具體表明者是:ApplicationEventPublisher及ApplicationEventMulticaster。
一、ApplicationContext接口繼承了ApplicationEventPublisher,並在AbstractApplicationContext實現了具體代碼,實際執行是委託給ApplicationEventMulticaster(能夠認爲是多播),咱們經常使用的ApplicationContext都繼承自AbstractApplicationContext,如ClassPathXmlApplicationContext、XmlWebApplicationContext等,因此自動擁有這個功能。
二、ApplicationContext自動到本地容器裏找一個名字爲」applicationEventMulticaster「的ApplicationEventMulticaster實現,若是沒有本身new一個SimpleApplicationEventMulticaster,

監聽器具體表明者是:ApplicationListener。其繼承自JDK的EventListener,JDK要求全部監聽器將繼承它。
1. 它只提供了onApplicationEvent方法,咱們須要在該方法實現內部判斷事件類型來處理,或者指定某一個事件類型(泛型,實現ApplicationListener)。
2. 它沒有提供按順序觸發監聽器的語義,因此Spring提供了另外一個接口SmartApplicationListener,該接口支持判斷的事件類型、目標類型,及執行順序。

因爲在上一篇文章:使用事件驅動進行代碼解耦-Spring篇,已經介紹了基於spring事件方式進行代碼解耦的示例,這裏主要介紹基於google guava來進行示例展現。

Guava事件機制

在guava中,事件處理器被稱爲事件總線EventBus,具體表明有EventBus類和AsyncEventBus類(異步);事件監聽者被稱爲訂閱者Subscriber。

1. 註冊訂閱者:新建一個事件總線EventBus/AsyncEventBus,就能夠向其中註冊訂閱者,訂閱者其實就是一個被標註了註解@com.google.common.eventbus.Subscribe的方法,向事件總線EventBus/AsyncEventBus註冊訂閱者的代碼邏輯若下:

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
        Class<?> eventType = entry.getKey();
        Collection<Subscriber> eventMethodsInListener = entry.getValue();

        CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

        if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
        }

        eventSubscribers.addAll(eventMethodsInListener);
    }
}

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
}

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<SubscriberRegistry.MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
        for (Method method : supertype.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
                // TODO(cgdecker): Should check for a generic parameter type and error out
                Class<?>[] parameterTypes = method.getParameterTypes();
                checkArgument(
                    parameterTypes.length == 1,
                    "Method %s has @Subscribe annotation but has %s parameters."
                        + "Subscriber methods must have exactly 1 parameter.",
                    method,
                    parameterTypes.length);

                SubscriberRegistry.MethodIdentifier ident = new SubscriberRegistry.MethodIdentifier(method);
                if (!identifiers.containsKey(ident)) {
                    identifiers.put(ident, method);
                }
            }
        }
    }
    return ImmutableList.copyOf(identifiers.values());
}

從「if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic())」這一句代碼能夠明顯的知道,一個bean向事件總線EventBus/AsyncEventBus進行註冊,註冊的並非bean自己,而是該bean中被全部標註了註解@Subscribe的方法,一個被標註了註解@Subscribe的方法就是一個訂閱者。當有事件發佈到事件總線中,事件總線會遍歷全部的訂閱者進行事件處理。
2. 向事件總線EventBus和AsyncEventBus發佈事件,直接調用事件總線的post方法便可

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        // the event had no subscribers and was not itself a DeadEvent
        post(new DeadEvent(this, event));
    }
}

 

Guava事件機制示例

本示例模擬訂單生成與更新時會發送短信與站內信簡單業務場景demo

1. 自定義一些接口以及POJO

/**
 * 業務事件發佈者
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 */
public interface BizEventPublisher {

     /**
      * 發佈同步事件
      * @param eventData
      */
     void publishEvent(Object eventData);

     /**
      * 發佈異步事件
      * @param eventData
      */
     void publishEventAsync(Object eventData);

}


/**
 * 業務事件發佈者
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 */
public interface BizEventPublisher {

     /**
      * 發佈同步事件
      * @param eventData
      */
     void publishEvent(Object eventData);

     /**
      * 發佈異步事件
      * @param eventData
      */
     void publishEventAsync(Object eventData);

}

/**
 * 業務事件類型
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 */
public enum BizEventType {

    ORDER_CREATE("訂單-建立"),
    ORDER_UPDATE("訂單-修改"),
    ;

    String describe;
    BizEventType(String describe) {
        this.describe = describe;
    }

}

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 * 測試訂單類
 */
@Data
public class Order {
    private long orderId;
    private long userId;
    public Order(long orderId, long userId) {
        this.setOrderId(orderId);
        this.setUserId(userId);
    }
}

2. 自定義業務數據,包含事件通用數據屬性

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 * 業務事件數據
 */
@Data
@AllArgsConstructor
public class BizEventData<S> implements EventExecutor<S> {

    private BizEventType eventType;
    private S data;

    @Override
    public void executeEvent(Consumer<S> executor) {
        executor.accept(data);
    }

    public static<S> BizEventData of(BizEventType eventType, S data) {
        return new BizEventData(eventType, data);
    }

}

3. 業務事件發佈配置管理

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 * 業務事件發佈配置管理
 */
@Slf4j
@Component
public class BizEventPublisherConfiguration implements BizEventPublisher {

    /** 同步事件總線 */
    private EventBus eventBus = new EventBus();
    /** 異步事件總線 */
    private AsyncEventBus eventBusAsync = new AsyncEventBus(
        new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            5L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactoryBuilder().setNameFormat("guava-event-executor-pool-%d").build()
        ),
        (Throwable e, SubscriberExceptionContext exceptionContext) -> {
            log.error("", e);
        }
    );

    @Override
    public void publishEvent(Object eventData) {
        eventBus.post(eventData);
    }

    @Override
    public void publishEventAsync(Object eventData) {
        eventBusAsync.post(eventData);
    }

    /**
     * 構造器,註冊監聽者
     * @param beanFactory
     */
    public BizEventPublisherConfiguration (ListableBeanFactory beanFactory) {
        // 獲取全部帶有 @BizEventListener 的 bean,將他們註冊爲監聽者
        beanFactory.getBeansWithAnnotation(BizEventListener.class)
            .forEach((beanName, listener) -> {
                eventBusAsync.register(listener);
                eventBus.register(listener);
            });
    }

}

4. 業務事件監聽配置管理

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 * 業務事件監聽配置管理
 */
@Slf4j
@Component
@BizEventListener
public class BizEventListenerConfiguration {

    @Autowired
    private TestMsgService msgService;

    @Subscribe
    public void executeEvent(BizEventData bizEventData) {
        switch (bizEventData.getEventType()) {
            // 訂單建立,發送短信,.......
            case ORDER_CREATE: bizEventData.executeEvent((data) -> {
                msgService.sendPhoneMsg((Order) data);
            });break;
            // 訂單修改,站內信提醒,.......
            case ORDER_UPDATE: bizEventData.executeEvent((data) -> {
                msgService.sendWebMsg((Order) data);
            });break;

            default: bizEventData.executeEvent((data) -> {
                log.info("executeEvent bizEventData = {}", data);
            });
        }
    }

}

5. 定義兩個測試service,TestOrderService 和 TestMsgService

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 */
@Slf4j
@Service
public class TestOrderService {

    @Autowired
    private BizEventPublisher bizEventPublisher;

    public void create(Order order) {
        ......
        log.info("TestOrderService create order = {}", order);
        bizEventPublisher.publishEvent(BizEvent.of(BizEventType.ORDER_CREATE, order));
    }

    public void update(Order order) {
        ......
        log.info("TestOrderService update order = {}", order);
        bizEventPublisher.publishEventAsync(BizEvent.of(BizEventType.ORDER_UPDATE, order));
    }

}

/**
 * @author dongsilin
 * @version 1.0
 * @date 2019/1/24
 */
@Slf4j
@Service
public class TestMsgService {


    public void sendPhoneMsg(Order order) {
        ......
        log.info("TestMsgService sendPhoneMsg order = {}", order);
    }

    public void sendWebMsg(Order order) {
        ......
        log.info("TestMsgService sendWebMsg order = {}", order);
    }
}

此處TestOrderService 中並無強制依賴注入TestMsgService,而是經過發佈事件的方式將事件數據發佈到事件管理中心,由事件管理者來統一管理事件處理方式,解決了各個業務service嚴重耦合的場景,實現軟件開發中的「高內聚-低耦合」原則。

經過如上,大致瞭解了google guava的事件機制,可使用該機制很是簡單的完成事件流程。

相關文章
相關標籤/搜索