我的理解: 事件驅動(even-driven),字面理解即:由事件去觸發某個或者一系列動做。java
百度百科: 從事件角度說,事件驅動程序的基本結構是由一個事件收集器、一個事件發送器和一個事件處理器組成。 事件收集器專門負責收集全部事件,包括來自用戶的(如鼠標、鍵盤事件等)、來自硬件的(如時鐘事件等)和來自軟件的(如操做系統、應用程序自己等)。 事件發送器負責將收集器收集到的事件分發到目標對象中。 事件處理器作具體的事件響應工做。git
舉個栗子:起牀鬧鐘響了驅動咱們該起牀了,上課鈴聲響了驅動咱們進教室上課、放學鈴聲響了驅動咱們上課結束了能夠愛幹嗎幹嗎去了。
github
或許這個栗子符合你,可是對我來講根本不可能🙈,鬧鐘響了壓根不會起牀、不起牀聽不到上課鈴、不去上課也聽不到下課鈴,GGspring
轉入正題,事件驅動的應用無處不在好比:spring、netty、zookeeper、mq,並且事件【event】和監聽器【listener】都是成對存在的,單個存在是沒有意義的,下面手動實現一個事件驅動模型。服務器
那麼事件驅動模型的組成是怎樣的呢?網絡
先去百度了一張模型圖,噹噹噹當~app
說實話,圖不是本身畫的就是不滿意,可是 MAC
上畫圖工具都找不到好用的,就懶得畫了,把事件收集器 ==> **事件中心 **,事件和監聽器的關係能夠是1:一、1:N、N:M
,取決於本身的需求。框架
說下流程:dom
1. 系統啓動,監聽器把本身註冊到事件中心,與某種事件進行綁定
2. 事件發送器發送事件到事件中心,事件中心去查找與處理該事件的監聽器
複製代碼
過程很是簡單,爲何要用事件驅動呢?恰好百度百科有個栗子,看完相信優秀的你就明白了:異步
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)種方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
實際上,事件驅動模型的核心就是 線程池 !!! 來實現異步非阻塞。
OK,到這裏就是動手實踐的過程,好記性不如爛筆頭,讀百遍不如敲一遍。看下工程結構
項目地址忘了放 event-driven理論部分說到事件和監聽器的關係能夠是1:一、1:N、N:M
,這裏基於 spring boot 編寫一個事件:監聽器=1:N
的實現,老套路,跟着上面的流程分析走:
- 系統啓動,監聽器把本身註冊到事件中心,與某種事件進行綁定
- 事件發送器發送事件到事件中心,事件中心去查找與處理該事件的監聽器
EventListener
和兩個實現類OrderCancelListener
、OrderCreateListener
package com.glmapper.event.driven.listener;
/** * 監聽器接口 * @author: Jerry * @date: 2018/7/1 */
public interface EventListener{
/** * 事件觸發時調用 * * @param event */
void trigger(OrderEvent event);
}
複製代碼
package com.glmapper.event.driven.listener;
/** * 訂單取消事件監聽 * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class OrderCancelListener implements EventListener {
@Autowired
private EventCenter eventCenter;
@PostConstruct
private void registry() {
eventCenter.registry(this, OrderCancelEvent.class);
}
@Override
public void trigger(OrderEvent event) {
log.info("取消訂單,訂單id={}", event.getOrderId());
}
}
複製代碼
package com.glmapper.event.driven.listener;
/** * 訂單建立事件監聽 * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class OrderCreateListener implements EventListener {
@Autowired
private EventCenter eventCenter;
@PostConstruct
private void registry() {
eventCenter.registry(this, OrderCreateEvent.class);
}
@Override
public void trigger(OrderEvent event) {
log.info("建立訂單,訂單id={}", event.getOrderId());
}
}
複製代碼
@PostConstruct
至關於org.springframework.beans.factory.InitializingBean#afterPropertiesSet
的功能,在構造方法完成後會調用@PostConstruct
註解的 registry()
方法把本身註冊到 EventCenter
事件中心。
EventCenter
事件中心package com.glmapper.event.driven;
/** * @author: Jerry * @date: 2018/7/1 */
public class EventCenter {
/** * 事件類型和監聽器的綁定映射 */
private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();
private final Executor executor;
public EventCenter(Executor executor) {
this.executor = executor;
}
/** * 綁定 監聽器與事件類型 * * @param eventListener * @param clazz */
public void registry(EventListener eventListener, Class<?> clazz) {
List<EventListener> listeners = subscribers.get(clazz);
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(eventListener);
subscribers.put(clazz, listeners);
}
/** * 向事件中心發送消息 * * @param orderEvent */
public void post(OrderEvent orderEvent) {
List<EventListener> listeners = subscribers.get(orderEvent.getClass());
if (listeners == null || listeners.size() == 0) {
throw new EventException("找不到該事件的監聽器");
}
for (EventListener listener : listeners) {
//線程池異步處理
executor.execute(() -> listener.trigger(orderEvent));
}
}
}
複製代碼
他的實例化在配置類裏面
package com.glmapper.event.driven;
/** * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@SpringBootApplication
public class EventDrivenApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext applicationContext = SpringApplication.run(EventDrivenApplication.class, args);
EventSender eventSender = applicationContext.getBean(EventSender.class);
while (true) {
long orderId = ThreadLocalRandom.current().nextLong();
eventSender.post(new OrderCreateEvent(orderId));
log.info("有一個新訂單,訂單id={}", orderId);
orderId = ThreadLocalRandom.current().nextLong();
eventSender.post(new OrderCancelEvent(orderId));
log.info("有一個訂單取消,訂單id={}", orderId);
Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 10000));
}
}
@Bean
public EventCenter eventCenter() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("event-bus-%d").build();
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
// 建立一個線程池
Executor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
return new EventCenter(pool);
}
}
複製代碼
系統啓動後,事件發送器不停的向消息中心發送事件,事件中心再把事件委派給對應的監聽器處理
剩下的就是事件發送器EventSender
、一個事件接口、兩個具體的事件類
package com.glmapper.event.driven.sender;
/** * @author: Jerry * @date: 2018/7/1 */
@Slf4j
@Component
public class EventSender {
@Autowired
private EventCenter eventCenter;
public void post(OrderEvent event) {
eventCenter.post(event);
}
}
===========================================分割線=================================================
package com.glmapper.event.driven.event;
/** * @author: Jerry * @date: 2018/7/1 */
public interface OrderEvent {
Long getOrderId();
OrderEventType getEventType();
}
===========================================分割線=================================================
package com.glmapper.event.driven.event;
/** * @author: Jerry * @date: 2018/7/1 */
public class OrderCreateEvent implements OrderEvent {
private Long orderId;
public OrderCreateEvent(Long orderId) {
this.orderId = orderId;
}
@Override
public Long getOrderId() {
return this.orderId;
}
@Override
public OrderEventType getEventType() {
return OrderEventType.CREATE;
}
}
===========================================分割線=================================================
package com.glmapper.event.driven.event;
/** * @author: Jerry * @date: 2018/7/1 */
public class OrderCancelEvent implements OrderEvent {
private Long orderId;
public OrderCancelEvent(Long orderId) {
this.orderId = orderId;
}
@Override
public Long getOrderId() {
return this.orderId;
}
@Override
public OrderEventType getEventType() {
return OrderEventType.CANCEL;
}
}
複製代碼
剩下的這些類就很簡單了,不解釋。最後看下結果
固然了,這是一個簡易的事件驅動實現,若是要在框架中實現必然還要考慮更多的因素如:事件中心定義異常處理器,用於消費方處理事件發生的異常等,原本準備考慮實現這些場景的,出於時間限制就。。。或許是太懶了😝,那麼推薦一個好用的事件驅動類 google guava
的 EventBus
,這是一個考慮很是完善的事件驅動實現了。
可能有人說了如今都用 MQ
了沒人會用這個了,那麼它存在必然有他存在的道理,說實話一些小型項目壓根都不用 MQ
,就用這個 EventBus
就可以解決節約成本,話雖如此,爲了便於擴展仍是推薦 MQ
。MQ
是在應用外進行解耦、經過網絡傳輸,EventBus
在應用內實現解耦、直接在同一個虛擬機中完成,沒必要考慮網絡不可用的問題。EventBus
仍是很是有學習參考意義的
終於寫完了,感受得睡到下午兩三點了。。。。