何爲事件監聽者模式 ?java
第一就是爲啥我強調事件二字 ,由於他是目標 . 在咱們開發中絕對見到過一堆後綴是
Listener
的類, 這個就是監聽者模式, 監聽者模式是一種CS開發架構
,很好的作了一種設計的解耦,監聽者註冊到一個郵局中,訂閱某種事件(提早說好了), 郵局會按需求發佈消息, 監聽者會及時收到消息來處理 . 其中整個Java開發環境中 , JDK已經幫咱們定義好了接口 , Spring就是基於JDK接口下實現的, Guava則是另外一種實現方式, 各有千秋 , 你們看看吧 , 究竟是回調好仍是阻塞, 仍是Guava這種方式呢, 他和觀察者模式有何區別呢 ? 咱們有機會在講spring
事件對象 , 他須要一個事件源 , 用構造函數傳遞的設計模式
public class EventObject implements java.io.Serializable {
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
........... 其餘省略
}
複製代碼
事件監聽者,他是負責監聽事件的 , JAVA提供的是一個空接口, 讓咱們根絕需求寫安全
public interface EventListener {
}
複製代碼
咱們發現 java 提供的只提供了一個事件對象 ,和一個事件監聽器 ,因此須要咱們遵照這個規範去開發多線程
通常狀況下 都會設置成一個 Object 類型的 , 不須要咱們去設計一個,爲了體現設計模式的角色,咱們就設計了一個架構
@ToString
@Setter
@Getter
public class EventSource {
private String name;
private String info;
}
複製代碼
這裏咱們繼承了 EventObject , 只是簡單的實現了一下 , 並無作過多的包裝app
public class CoreEventObject extends EventObject {
public CoreEventObject(EventSource source) {
super(source);
}
}
複製代碼
這裏咱們真正的監聽者 , 通常狀況下都須要設計成一個 函數式接口 , 我這個是和Spring框架學習的 , 由於函數式接口才能體現回調 ,框架
@FunctionalInterface
public interface CoreEventListener<E extends CoreEventObject> extends EventListener {
void onEventObject(E event);
}
複製代碼
事件發佈者 ,由於沒有發佈的事件對象, 哪來的監聽異步
public class EventPublisher<E extends CoreEventObject> {
private CoreEventListener<E> listener;
public EventPublisher(CoreEventListener<E> listener) {
this.listener = listener;
}
public void publish(E object){
System.out.println("發佈事件 : " + object);
// 傳給 CoreEventListener
listener.onEventObject(object);
}
}
複製代碼
public class TestDemo {
public static void main(String[] args) {
// 1. 建立一個事件發佈者
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() {
@Override
public void onEventObject(CoreEventObject event) {
System.out.println("接收到事件源 : " + event.getSource() + " , 當前線程 : " + Thread.currentThread().getName());
}
});
// 2. 發佈一個事件對象
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
..... 此處省略
return eventObject;
}
}
複製代碼
輸出結果 :ide
發佈事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)]
接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 當前線程 : main
複製代碼
咱們發現咱們成功的接收到了事件對象 和 事件源 , 這個就是鉤子函數的魅力 . 其實你只是作了一個事件發佈你無意觀察其餘的東西 , 只須要一個監聽者就能夠作到監聽了 , 這樣你的事件發佈 和 監聽 徹底就解耦了 .其實底層就是一個地址引用 .
不少場景下,咱們的發佈事件和監聽事件徹底在兩個線程中,那麼咱們如何拿到事件對象呢 ?
若是咱們簡單使用一下 , 會這麼寫 ?
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
@Override
public void onEventObject(CoreEventObject object) {
// 賦值給成員變量
this.object = object;
}
// 獲取成員變量
public CoreEventObject getObject() {
return object;
}
}
複製代碼
測試一下 :
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
CoreEventObject object = listener.getObject();
// 先去拿 ,後去發佈
System.out.println(object.getSource());
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
....
return eventObject;
}
}
複製代碼
輸出結果
Exception in thread "main" java.lang.NullPointerException
at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28)
複製代碼
有些人就會說 , 你這不對哇 ,你固然拿不到了 ,由於人家還沒發佈了 ,可是在多線程 ,在解耦的狀況下 ,你哪知道對面什麼時候發佈結束了 , 你再去拿呢 ? 那就須要java的多線程知識了 ,Future 給咱們帶來了提醒 , 就是阻塞的思想 , 只有監聽者真正的收到對象 , 咱們才能去拿 .
瞭解過我前面提到的那一節 FutureTask
是如何實現的 ,我以爲問題就迎刃而解了 .
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
/** * 當 X = 0 ,表明 obj尚未初始化了 * 當 x = 1 , 表明 obj 以及初始化了 , 已經接收到了 */
private static volatile int x = 0;
@Override
public void onEventObject(CoreEventObject object) {
this.object = object;
// 收到改爲 1
x = 1;
}
public CoreEventObject getObject() {
while (true) {
if (x == 1) {
break;
}
}
// 拿到對象,再設置爲1
x = 0;
return object;
}
}
複製代碼
因爲這個解決方案,會使得執行getObject()
的線程一直的阻塞下去,就是死循環下去,咱們必須一個線程去執行這個方法 ,
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
// 新建一個線程去接收
Thread thread = new Thread(() -> {
System.out.println("我開始接收對象 : " + System.currentTimeMillis());
CoreEventObject object = listener.getObject();
System.out.println("成功接收對象 : "+object.getSource());
});
thread.start();
// 新建一個線程去發佈
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
new Thread(()->{
publisher.publish(getCoreEventObject());
}).start();
}
private static CoreEventObject getCoreEventObject(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
EventSource source = new EventSource();
source.setName("事件源");
source.setInfo("" + System.currentTimeMillis());
return new CoreEventObject(source);
}
}
複製代碼
輸出結果 :
我開始接收對象 : 1573282924555
發佈事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)]
成功接收對象 : EventSource(name=事件源, info=1573282925590)
複製代碼
咱們咱們接收的時候是在1573282924555的時間搓 , 而真正拿到的對象確實在1573282925590發佈的 , 這個就徹底在倆時間軸上,因此咱們成功的解決了問題 .
Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.
此類被全部的
application events
所繼承 。抽象的緣由是由於直接發佈這個ApplicationEvent
是沒有意義的。
Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.
這個接口被全部的
application event listeners.
所實現 , 基於Java的java.util.EventListener
接口規範
咱們有一個需求就是 ,咱們有一個服務會從遠程不斷的去拉去配置信息 ,一旦有改變就會發布配置信息 .
@ToString
@Setter
@Getter
public class Config {
private String namespace;
private Map<String, Object> info;
}
複製代碼
// 這個註解,咱們是根據Spring源碼看到的 , 因此一致性,我就加了
@SuppressWarnings("serial")
public class ConfigEvent extends ApplicationEvent {
public ConfigEvent(Config source) {
super(source);
}
}
複製代碼
@Component
public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
}
// 保證執行順序 , 多個 ConfigEventListener就須要實現這個接口
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
// 初始化之後要作什麼 ?
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化當前ConfigEventListener");
}
}
複製代碼
@Service
public class ConfigServer {
// 注入applicationContext,由於只有他才能夠執行發佈事件
@Autowired
private ApplicationContext applicationContext;
// 這個是開啓異步 ,後面會說到
// @Async
public void publishConfig(){
// 須要發佈 --- > 改變的事件
System.out.println("發佈事件成功 , 當前線程 : "+Thread.currentThread().getName());
applicationContext.publishEvent(getChange());
}
public ConfigEvent getChange(){
Config config = new Config();
config.setNamespace("application");
HashMap<String, Object> conf = new HashMap<>();
conf.put("server.port", 8088);
config.setInfo(conf);
return new ConfigEvent(config);
}
}
複製代碼
@SpringBootApplication
public class SpringListenerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringListenerApplication.class, args);
}
@Autowired
private ConfigServer server;
@Override
public void run(String... args) throws Exception {
server.publishConfig();
}
}
複製代碼
輸出結果 :
.....
初始化當前ConfigEventListener
....
發佈事件成功 , 當前線程 : main
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : main
複製代碼
因此一個Spring-Boot
的事件監聽仍是很簡單的 ,類比到 Spring
一個道理,相信懂得人都知道 . 可是又一個問題是咱們的 發佈和監聽都是 main線程 ,很差吧 ,玩意有不少事件了 ?
須要兩個註解 @EnableAsync
啓動Async功能 , 和 @Async
某個方法使用異步執行
發佈事件成功 , 當前線程 : SimpleAsyncTaskExecutor-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : SimpleAsyncTaskExecutor-1
複製代碼
咱們發現就出現了線程池執行 , 這個理的線程池 ,是能夠進行配置的 , 只須要咱們顯式的注入下面這個SimpleAsyncTaskExecutor
Bean 就能夠了
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
// 須要傳入一個 ThreadFactory實現類 , 因此看過我前面寫的文章應該會寫這個,好比 JUC- Executor那節
executor.setThreadFactory(new MyThreadFactory("anthony"));
return executor;
}
複製代碼
輸出結果 :
發佈事件成功 , 當前線程 : anthony-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1
複製代碼
你能夠跟我同樣選擇實現 ApplicationListener
和Ordered
,或者你能夠直接實現 SmartApplicationListener
都同樣的哈,沒有哪一個好哪一個很差
監聽器 一 :
@Component
public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化當前監聽器 : " + this.toString());
}
}
複製代碼
監聽器 二 :
@Component
public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE-1;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化當前監聽器 : " + this.toString());
}
}
複製代碼
輸出結果 :
初始化當前監聽器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f
初始化當前監聽器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289
.....
發佈事件成功 , 當前線程 : anthony-1
ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1
ConfigEventListenerEnd 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1
複製代碼
Guava的EventBus 就是一個很好的事件註冊發佈的管理工具 , 他屬於一種推送的模式 , 跟spring的很類似,
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
複製代碼
主要的對象就是 ,
EventBus
事件總線, 他是管理全部監聽者(或者叫作訂閱者) ,經過EventBus#register
或者EventBus#unRegister
來管理的 , 同時監聽者要有監聽的事件 , 這裏是基於方法級別的 (注意方法只能有一個參數,就是監聽的事件), 須要在方法上加上@Subscribe
註解來表示監聽 ,EventBus
能夠經過EventBus#post
方法來發布事件 , 對應類型的監聽者就會收到 . 同時EventBus
能夠處理異常
public class QuicklyStart {
public static void main(String[] args) {
// 建立一個 事件總線
EventBus bus = new EventBus(new SubscriberExceptionHandler() {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
// 處理訂閱者異常信息
System.out.println("異常信息 : "+exception.getMessage() + ", 異常事件 : " + context.getEvent());
}
});
// 註冊你的監聽器 , 其實更加準確來講是訂閱者 , 他屬於一種發佈訂閱模式
bus.register(new EventListener());
// 事件總線發佈事件
bus.post("sb");
// 事件總線發佈事件
bus.post(new Event("hello Guava"));
}
}
/** * 事件源 */
class Event {
String msg;
public Event(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Event{" + "msg='" + msg + '\'' + '}';
}
}
/** * 監聽器 */
class EventListener {
/** * {@link Subscribe} 一個這個表明一個訂閱者,EventBus會將符合的事件發佈到對應的訂閱者上 , 可是不支持java的基本數據類型, int 之類的 * * @param event */
@Subscribe
public void onEvent(Event event) {
System.out.println("當前線程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
@Subscribe
public void onStringEvent(String event) {
error(); // 模擬異常
System.out.println("當前線程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
private void error() {
int i = 1 / 0;
}
}
複製代碼
輸出 :
異常信息 : / by zero, 異常事件 : sb
當前線程 : main, 接收到事件 : Event{msg='hello Guava'}
複製代碼
其實很簡單 , 第一註冊的時候 :
/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
// 其實就是註解掃描 , 而後把元信息都給整出來
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// 查找有沒有
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
// 沒有建立一個對象
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
// 添加進去 ,實際上是 Subscriber對象 , 把method信息, 對象信息所有封裝進去了
eventSubscribers.addAll(eventMethodsInListener);
}
}
複製代碼
第二就是 發佈
public void post(Object event) {
// 根據事件獲取對應的 Subscriber
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 發佈出去
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 獲取當前線程的隊列 ,用ThreadLocal維護的線程安全, 實際上是爲了安全
Queue<Event> queueForThread = queue.get();
// 建立一個事件對象
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 處理
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
// 關鍵點 ,就是這
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
// 出現異常直接就 ... 調用異常回調方法了
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// 原來如此 , .........method.invok 真.... 因此能夠抓取異常
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
複製代碼
因此這個玩意很簡單 , 原理一看就分析出來了
咱們發現咱們的本身實現的監聽者和 Spring
和Guava
這倆種實現有啥區別 , 無非就是咱們本身實現的監聽者模式, 對於 listener 的管理,沒有作 , 咱們只是一個 Publisher 一個 Listener,一對一的關係 , 這樣子就很很差, 100個監聽者就須要100個發佈者 , 不符合設計模式的原則 , 因此參考Guava
,咱們發現他無非作的就是一個對於Listener 的管理 , 可是有一個細節但願你們知道, 對於監聽者模式 , 萬一事件發佈失敗了 , 咱們如何知道, 因此Guava
至少幫咱們作了 , 他不是基於回調機制的, 而是使用了Java
的 Method#invoke
,看需求而定吧 , 只不過回調更加輕量級,