【Java】事件驅動模型和觀察者模式

你有一件事情,作這件事情的過程包含了許多職責單一的子過程。這樣的狀況及其常見。當這些子過程有以下特色時,咱們應該考慮設計一種合適的框架,讓框架來完成一些業務無關的事情,從而使得各個子過程的開發能夠專一於本身的業務。java

  •  這些子過程有必定的執行次序;
  •  這些子過程之間須要較靈活的跳轉;
  •  這些子過程也許須要圍繞同一個上下文作操做;

此時能夠考慮使用事件驅動的方式來組織這些子過程,此時這些子過程能夠被稱之爲事件處理器(或監聽器),而將事件處理器組織起來的管理者,叫作事件中心。最顯而易見的實現方式,是觀察者模式,或者監聽者模式。做爲一個例子,考慮一個消息轉發系統,它從上游接收消息,而後轉發給正確的下游用戶。整個過程能夠拆分爲消息解析、消息存儲、消息發送等步驟。spring

 

事件Event安全

首先定義事件Event。事件將做爲一個基本元素,在處理器和事件中心之間創建其連線。這裏爲了可以統一處理異常。以及針對異常打出日誌,除了業務相關的事件,還增長了異常事件和日誌事件。固然相應的也應該新增與之對應的事件處理器。框架

 1 package me.test.eventcenter;
 2 
 3 /**
 4  * Created by chng on 2015/12/18.
 5  */
 6 public class EventName {
 7 
 8     private final String name;
 9     public EventName(String name) {
10         this.name = name;
11     }
12 
13     public static EventName msg_received = new EventName("msg_received");
14     public static EventName msg_resolved = new EventName("msg_resolved");
15     public static EventName msg_stored = new EventName("msg_stored");
16     public static EventName msg_pushed = new EventName("msg_pushed");
17     public static EventName exception_occured = new EventName("exception_occured");
18     public static EventName end_and_log = new EventName("end_and_log");
19 
20     public String getName() {
21         return name;
22     }
23 }

 

事件處理器 EventHandleride

隨後,定義一個簡單的事件處理器的抽象類,其中包含一個單例的事件中心,每一個處理器經過持有這個事件中心來執行註冊本身(即訂閱一個事件)和呼起下一個事件的操做。測試

package me.test.eventcenter.handler;

import me.test.eventcenter.EventCenter;
import org.springframework.beans.factory.InitializingBean;

import javax.annotation.Resource;

/**
 * Created by chng on 2015/12/18.
 */
public abstract class EventHandler implements InitializingBean {
    @Resource
    EventCenter eventCenter;

    public abstract void handle(Object ... param);
}

 

事件中心 EventCenterthis

有了事件和事件處理器,接下來定義一個事件中心,將兩者粘起來。google

package me.test.eventcenter;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import me.test.eventcenter.handler.EventHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class EventCenter {

    private Map<EventName, List<EventHandler>> regTable = Maps.newHashMap();

    /**
     * 向事件中心廣播一個時間,驅使事件中心執行該事件的處理器
     * @param eventName
     * @param param
     */
    public void fire(EventName eventName, Object ... param) {
        System.out.println(eventName.getName());
        List<EventHandler> handlerList = regTable.get(eventName);
        if(CollectionUtils.isEmpty(handlerList)) {
            // log
            return;
        }
        for(EventHandler handler: handlerList) {
            try {
                handler.handle(param);
            } catch (Exception e) {
                fire(EventName.exception_occured, e);
            }
        }
    }

    /**
     * 將本身註冊爲事件中心的某個事件的處理器
     * @param eventName
     * @param handler
     */
    public void register(EventName eventName, EventHandler handler) {

        List<EventHandler> handlerList = regTable.get(eventName);
        if(null == handlerList) {
            handlerList = Lists.newLinkedList();
        }

        handlerList.add(handler);
        regTable.put(eventName, handlerList);
    }
}

在事件中心中,事件和處理器之間的關係表示爲一個HashMap,每一個事件能夠被多個處理器監聽,而一個處理器只能監聽一個事件(這樣的關係並不是是固定的,也可在運行時動態地改變)。當呼起一個事件時,事件中心找到該事件的監聽者,逐個調用他們的處理方法。將各子模塊的執行集中在這裏管理,還有兩個額外的好處:spa

1 若是發生異常,則呼起異常處理器。這樣,一旦業務模塊發生了不起不終止整個過程的時候,不須要本身寫try/catch子句,而只須要將異常往上拋,直到拋給框架層,由它來作這些統一的事情。然而這並不意味着各業務模塊不折不扣地擺脫了難看的try/catch/finally,運行時發生的異常被catch後,並不是均可以直接END,何去何從仍然視狀況而定,直接將異常吞掉也何嘗不可能。線程

2 打日誌的活兒交給EventCenter就行了,沒人比它更清楚當前執行到了哪一步。而各子模塊裏面,能夠省去許多散佈在各處的日誌語句。對於散彈式日誌的問題,解決方法不止一種,AOP也是個不錯的選擇。

 

測試

爲了讓整個過程跑起來,咱們只須要發起一個初始的事件,將全部的事件處理器都依次驅動起來:

/**
 * Created by OurEDA on 2015/12/18.
 */
public class TestEventCenter extends BaseTest {

    @Resource
    EventCenter eventCenter;

    @Test
    public void test() {
        RawMessage rawMessage = new RawMessage("NotifyType: amq");
        rawMessage.setType(RawMessage.MessageType.amq);
        eventCenter.fire(EventName.msg_received, notify);
    }
}

以測試經過爲目標,咱們開始定義一系列的EventHandler,並將這些Handler註冊到合適的事件上。例如一個消息解析的Handler,對msg_receive事件感興趣,解析完成後將發起msg_store事件,那麼: 

package me.test.eventcenter.handler;

import me.test.eventcenter.*;
import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;
import me.test.resolvers.MsgResolverList;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class MsgResolveHandler extends EventHandler implements InitializingBean {

    @Resource
    private MsgResolverList resolverList;

    @Override
    public void handle(Object... param) {

        /**
         * Resolver
         */
        RawMessage rm = (RawMessage) param[0];
        Message message = resolverList.resolve(rm);
        eventCenter.fire(EventName.msg_resolved, message);
    }

    public void afterPropertiesSet() throws Exception {
        eventCenter.register(EventName.msg_received, this);
    }
}

能夠看到,對象在初始階段把本身(this)註冊到了事件中內心。handler方法則只關心如何解析消息,不須要關係別的事情。針對不一樣類型的消息,解析器能夠寫成Map的形式,一種類型對應一個解析器;若是消息的分類比較複雜,還能夠寫成職責鏈的形式固然這都可有可無,咱們須要知道的是,這個模塊只解析消息,與其餘子模塊之間是徹底解耦的。

 

例如,一種可能的解析器組合體是這樣的:

MsgResolver.java (interface)
package me.test.resolvers;

import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;

/**
 * Created by OurEDA on 2015/12/18.
 */
public interface MsgResolver {

    public boolean canResolve(RawMessage rm);

    public Message resolve(RawMessage rm);

}
MsgResolverList.java
package me.test.resolvers;

import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class MsgResolverList implements MsgResolver{

    //職責鏈
    private List<MsgResolver> resolvers;
    public List<MsgResolver> getResolvers() {
        return resolvers;
    }
    public void setResolvers(List<MsgResolver> resolvers) {
        this.resolvers = resolvers;
    }

    public boolean canResolve(RawMessage rawMessage) {
        return true;
    }

    public Message resolve(RawMessage rawMessage) {
        for(MsgResolver resolver: resolvers) {
            if(resolver.canResolve(rawMessage)) {
                System.out.println("NotifyType: "+rawMessage.type);
                return resolver.resolve(rawMessage);
            }
        }
        return null;
    }
}

 沒必要額外打日誌,用例的輸出是這樣的:

哪一步出了問題,出了什麼問題,統統一目瞭然。

 

其餘:

1 上下文 Context

各個處理器都圍繞一個上下文作處理,此例爲了體現通用性,上下文直接用Object表示。在實際的場景下,則須要一個統一的結構體。不一樣的Handler將對該統一上下文的不一樣內容感興趣。

 

2 線程封閉 ThreadLocal

當有多個線程都在事件中心中進行週轉時,還須要考慮線程安全問題,保證線程的調度不會對事件處理器的呼起次序形成干擾。所以整個事件中心和上下文,都須要作隔離。

 

3 反思

上面這種寫法有兩個明確的缺點:事件的註冊操做寫死在每一個處理器的初始化代碼中,一來缺少靈活性,二來對於各Handler是如何組織起來的,沒有一個統一而清晰的bigmap。

相關文章
相關標籤/搜索