【bird-java】分佈式服務間的事件總線EventBus

什麼是EventBus
EventBus是對發佈-訂閱模式的一種實現。其以一種很是優雅的方式實現了組件間的解耦與通訊,在Android開發、DDD等領域都有很是普遍的應用。
java

事件流大體以下:git

  • Producer向EventBus發送事件。
  • EventBus向全部監聽了該事件的Consumer推送事件。
  • 監聽了該事件的Consumer消費事件。


注:一個組件便可以是Producer,也能夠是Consumer。github

分佈式服務間的EventBus
在分佈式系統中,事件在服務之間的傳遞要比單機EventBus複雜不少。有沒有一種適用於分佈式服務之間的,而且事件傳遞就像單機同樣簡單的EventBus呢?在GitHub上搜索了JAVA實現的EventBus,排名前十的幾乎都是用於Android或JAVA的單機事件總線。良久以後...仍是本身動手吧。集羣環境下的EventBus比單機版須要多考慮一些問題,好比:spring

  1.  服務集羣部署的狀況下,如何保證每一個集羣都可訂閱該事件,且每一個集羣只能消費一次該事件。
  2. 如何實現一個服務內部多個`xxxService`訂閱同一事件。

解決方案:緩存

  1. 使用`kafka`實現集羣間的發佈訂閱(基於`topic`),同一集羣處於同一個kafka的consumer-group來保證每一個集羣只會消費一次該事件。
  2. 服務在啓動時可反射得到全部實現了`IEventHandler<TEventArg>`的類並緩存,服務消費消息時獲取全部註冊了該消息的handler並調用其`HandleEvent`方法。

部分關鍵源碼分佈式

一、事件消息的定義ide

public abstract class EventArg implements IEventArg{

    private Date eventTime;

    public EventArg(){
        eventTime = new Date();
    }

    public Date getEventTime() {
        return eventTime;
    }

    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }
}

事件消息默認記錄建立時間,可擴展其餘字段,好比發送時間、標識等。ui

二、使用spring-kafka發送消息this

/**
 * kafka事件註冊器,向kafka隊列中push消息
 */
@Component
public class KafkaRegister implements IEventRegister {

    @Autowired(required = false)
    private KafkaTemplate<String,IEventArg> kafkaTemplate;

    /**
     * 事件註冊
     *
     * @param eventArg 事件參數
     */
    @Override
    public void regist(IEventArg eventArg) {
        kafkaTemplate.send(getTopic(eventArg),eventArg);
    }

    /**
     * 獲取kafka的topic
     *
     *
     * @param eventArg
     * @return topic
     */
    private String getTopic(IEventArg eventArg){
        return eventArg.getClass().getName();
    }
}

三、消費kafka消息並執行當前服務中全部訂閱了該消息的事件spa

/**
 * kafka事件監聽器
 */
public class KafkaEventArgListener implements MessageListener<String,EventArg> {

    @Autowired
    private IEventHandlerFactory eventHandlerFactory;

    @Override
    public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) {
        if (consumerRecord == null) return;
        EventArg value = consumerRecord.value();

        Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value);
        if (handlers == null) return;
        for (IEventHandler handler : handlers) {
            handler.HandleEvent(value);
        }
    }
}

 

EventBus的使用

一、事件的定義。全部事件均繼承於上文EventArg抽象類,示例以下:

public class TestEventArg extends EventArg{
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

 

二、事件發佈。示例代碼:

eventBus.push(new TestEventArg());

 

三、事件訂閱。一個服務發佈事件以後,任何服務中的任何`xxxServiceImpl`類都可訂閱該事件,實現`IEventHandler<TEventArg>`接口便可完成事件的訂閱,示例以下:

public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> {

    @Override
    public void HandleEvent(TestEventArg eventArg) {
        System.out.println("notify zero======");
    }
}

 總體來講,使用仍是很簡單的,EventBus實現與使用示例收錄於bird-java項目中,項目地址:https://github.com/liuxx001/bird-java

相關文章
相關標籤/搜索