事件驅動,Do you know?

我的理解: 事件驅動(even-driven),字面理解即:由事件去觸發某個或者一系列動做。java

百度百科: 從事件角度說,事件驅動程序的基本結構是由一個事件收集器、一個事件發送器和一個事件處理器組成。  事件收集器專門負責收集全部事件,包括來自用戶的(如鼠標、鍵盤事件等)、來自硬件的(如時鐘事件等)和來自軟件的(如操做系統、應用程序自己等)。  事件發送器負責將收集器收集到的事件分發到目標對象中。  事件處理器作具體的事件響應工做。git

舉個栗子:起牀鬧鐘響了驅動咱們該起牀了,上課鈴聲響了驅動咱們進教室上課、放學鈴聲響了驅動咱們上課結束了能夠愛幹嗎幹嗎去了。github

或許這個栗子符合你,可是對我來講根本不可能🙈,鬧鐘響了壓根不會起牀、不起牀聽不到上課鈴、不去上課也聽不到下課鈴,GGspring

轉入正題,事件驅動的應用無處不在好比:spring、netty、zookeeper、mq,並且事件【event】和監聽器【listener】都是成對存在的,單個存在是沒有意義的,下面手動實現一個事件驅動模型。服務器

那麼事件驅動模型的組成是怎樣的呢?網絡

1、事件驅動模型

先去百度了一張模型圖,噹噹噹當~app

說實話,圖不是本身畫的就是不滿意,可是 MAC 上畫圖工具都找不到好用的,就懶得畫了,把事件收集器 ==> **事件中心 **,事件和監聽器的關係能夠是1:一、1:N、N:M,取決於本身的需求。框架

說下流程:dom

1. 系統啓動,監聽器把本身註冊到事件中心,與某種事件進行綁定
2. 事件發送器發送事件到事件中心,事件中心去查找與處理該事件的監聽器
複製代碼

過程很是簡單,爲何要用事件驅動呢?恰好百度百科有個栗子,看完相信優秀的你就明白了:異步

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:

(1)每收到一個請求,建立一個新的進程,來處理該請求;

(2)每收到一個請求,建立一個新的線程,來處理該請求;

(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)種方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。

第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。

第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式

實際上,事件驅動模型的核心就是 線程池 !!! 來實現異步非阻塞。

2、Simple實現

OK,到這裏就是動手實踐的過程,好記性不如爛筆頭,讀百遍不如敲一遍。看下工程結構

項目地址忘了放 event-driven

理論部分說到事件和監聽器的關係能夠是1:一、1:N、N:M,這裏基於 spring boot 編寫一個事件:監聽器=1:N的實現,老套路,跟着上面的流程分析走:

  1. 系統啓動,監聽器把本身註冊到事件中心,與某種事件進行綁定
  2. 事件發送器發送事件到事件中心,事件中心去查找與處理該事件的監聽器

寫一個監聽器接口EventListener 和兩個實現類OrderCancelListenerOrderCreateListener

package com.glmapper.event.driven.listener;

/** * 監聽器接口 * @author: Jerry * @date: 2018/7/1 */
public interface EventListener{

    /** * 事件觸發時調用 * * @param event */
    void trigger(OrderEvent event);
}
複製代碼
package com.glmapper.event.driven.listener;

/** * 訂單取消事件監聽 * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class OrderCancelListener implements EventListener {

    @Autowired
    private EventCenter eventCenter;

    @PostConstruct
    private void registry() {
        eventCenter.registry(this, OrderCancelEvent.class);
    }

    @Override
    public void trigger(OrderEvent event) {
        log.info("取消訂單,訂單id={}", event.getOrderId());
    }
}
複製代碼
package com.glmapper.event.driven.listener;

/** * 訂單建立事件監聽 * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class OrderCreateListener implements EventListener {

    @Autowired
    private EventCenter eventCenter;

    @PostConstruct
    private void registry() {
        eventCenter.registry(this, OrderCreateEvent.class);
    }

    @Override
    public void trigger(OrderEvent event) {
        log.info("建立訂單,訂單id={}", event.getOrderId());
    }
}
複製代碼

@PostConstruct 至關於org.springframework.beans.factory.InitializingBean#afterPropertiesSet的功能,在構造方法完成後會調用@PostConstruct註解的 registry()方法把本身註冊到 EventCenter 事件中心。

重點類 EventCenter 事件中心

package com.glmapper.event.driven;
/** * @author: Jerry * @date: 2018/7/1 */
public class EventCenter {

    /** * 事件類型和監聽器的綁定映射 */
    private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();

    private final Executor executor;

    public EventCenter(Executor executor) {
        this.executor = executor;
    }
    /** * 綁定 監聽器與事件類型 * * @param eventListener * @param clazz */
    public void registry(EventListener eventListener, Class<?> clazz) {
        List<EventListener> listeners = subscribers.get(clazz);
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        listeners.add(eventListener);
        subscribers.put(clazz, listeners);
    }
    /** * 向事件中心發送消息 * * @param orderEvent */
    public void post(OrderEvent orderEvent) {
        List<EventListener> listeners = subscribers.get(orderEvent.getClass());
        if (listeners == null || listeners.size() == 0) {
            throw new EventException("找不到該事件的監聽器");
        }
        for (EventListener listener : listeners) {
            //線程池異步處理
            executor.execute(() -> listener.trigger(orderEvent));
        }
    }
}
複製代碼

他的實例化在配置類裏面

package com.glmapper.event.driven;

/** * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@SpringBootApplication
public class EventDrivenApplication {

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext applicationContext = SpringApplication.run(EventDrivenApplication.class, args);
        EventSender eventSender = applicationContext.getBean(EventSender.class);
        while (true) {
            long orderId = ThreadLocalRandom.current().nextLong();
            eventSender.post(new OrderCreateEvent(orderId));
            log.info("有一個新訂單,訂單id={}", orderId);

            orderId = ThreadLocalRandom.current().nextLong();
            eventSender.post(new OrderCancelEvent(orderId));
            log.info("有一個訂單取消,訂單id={}", orderId);

            Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 10000));
        }
    }

    @Bean
    public EventCenter eventCenter() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("event-bus-%d").build();
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        int maximumPoolSize = corePoolSize * 2;
        // 建立一個線程池
        Executor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
                namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        return new EventCenter(pool);
    }
}
複製代碼

系統啓動後,事件發送器不停的向消息中心發送事件,事件中心再把事件委派給對應的監聽器處理

其餘類

剩下的就是事件發送器EventSender、一個事件接口、兩個具體的事件類

package com.glmapper.event.driven.sender;

/** * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class EventSender {

    @Autowired
    private EventCenter eventCenter;

    public void post(OrderEvent event) {
        eventCenter.post(event);
    }
}
===========================================分割線=================================================

package com.glmapper.event.driven.event;

/** * @author: Jerry * @date: 2018/7/1 */
public interface OrderEvent {

    Long getOrderId();

    OrderEventType getEventType();
}
===========================================分割線=================================================
package com.glmapper.event.driven.event;

/** * @author: Jerry * @date: 2018/7/1 */
public class OrderCreateEvent implements OrderEvent {

    private Long orderId;

    public OrderCreateEvent(Long orderId) {
        this.orderId = orderId;
    }

    @Override
    public Long getOrderId() {
        return this.orderId;
    }

    @Override
    public OrderEventType getEventType() {
        return OrderEventType.CREATE;
    }
}
===========================================分割線=================================================
package com.glmapper.event.driven.event;

/** * @author: Jerry * @date: 2018/7/1 */
public class OrderCancelEvent implements OrderEvent {

    private Long orderId;

    public OrderCancelEvent(Long orderId) {
        this.orderId = orderId;
    }

    @Override
    public Long getOrderId() {
        return this.orderId;
    }

    @Override
    public OrderEventType getEventType() {
        return OrderEventType.CANCEL;
    }
}
複製代碼

剩下的這些類就很簡單了,不解釋。最後看下結果

固然了,這是一個簡易的事件驅動實現,若是要在框架中實現必然還要考慮更多的因素如:事件中心定義異常處理器,用於消費方處理事件發生的異常等,原本準備考慮實現這些場景的,出於時間限制就。。。或許是太懶了😝,那麼推薦一個好用的事件驅動類 google guavaEventBus ,這是一個考慮很是完善的事件驅動實現了。

可能有人說了如今都用 MQ 了沒人會用這個了,那麼它存在必然有他存在的道理,說實話一些小型項目壓根都不用 MQ,就用這個 EventBus 就可以解決節約成本,話雖如此,爲了便於擴展仍是推薦 MQMQ 是在應用外進行解耦、經過網絡傳輸,EventBus在應用內實現解耦、直接在同一個虛擬機中完成,沒必要考慮網絡不可用的問題。EventBus仍是很是有學習參考意義的

終於寫完了,感受得睡到下午兩三點了。。。。

相關文章
相關標籤/搜索