什麼是EventBus
EventBus是對發佈-訂閱模式的一種實現。其以一種很是優雅的方式實現了組件間的解耦與通訊,在Android開發、DDD等領域都有很是普遍的應用。
java
事件流大體以下:git
注:一個組件便可以是Producer,也能夠是Consumer。github
分佈式服務間的EventBus
在分佈式系統中,事件在服務之間的傳遞要比單機EventBus複雜不少。有沒有一種適用於分佈式服務之間的,而且事件傳遞就像單機同樣簡單的EventBus呢?在GitHub上搜索了JAVA實現的EventBus,排名前十的幾乎都是用於Android或JAVA的單機事件總線。良久以後...仍是本身動手吧。集羣環境下的EventBus比單機版須要多考慮一些問題,好比:spring
解決方案:緩存
部分關鍵源碼分佈式
一、事件消息的定義ide
public abstract class EventArg implements IEventArg{ private Date eventTime; public EventArg(){ eventTime = new Date(); } public Date getEventTime() { return eventTime; } public void setEventTime(Date eventTime) { this.eventTime = eventTime; } }
事件消息默認記錄建立時間,可擴展其餘字段,好比發送時間、標識等。ui
二、使用spring-kafka發送消息this
/** * kafka事件註冊器,向kafka隊列中push消息 */ @Component public class KafkaRegister implements IEventRegister { @Autowired(required = false) private KafkaTemplate<String,IEventArg> kafkaTemplate; /** * 事件註冊 * * @param eventArg 事件參數 */ @Override public void regist(IEventArg eventArg) { kafkaTemplate.send(getTopic(eventArg),eventArg); } /** * 獲取kafka的topic * * * @param eventArg * @return topic */ private String getTopic(IEventArg eventArg){ return eventArg.getClass().getName(); } }
三、消費kafka消息並執行當前服務中全部訂閱了該消息的事件spa
/** * kafka事件監聽器 */ public class KafkaEventArgListener implements MessageListener<String,EventArg> { @Autowired private IEventHandlerFactory eventHandlerFactory; @Override public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) { if (consumerRecord == null) return; EventArg value = consumerRecord.value(); Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value); if (handlers == null) return; for (IEventHandler handler : handlers) { handler.HandleEvent(value); } } }
EventBus的使用
一、事件的定義。全部事件均繼承於上文EventArg抽象類,示例以下:
public class TestEventArg extends EventArg{ private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
二、事件發佈。示例代碼:
eventBus.push(new TestEventArg());
三、事件訂閱。一個服務發佈事件以後,任何服務中的任何`xxxServiceImpl`類都可訂閱該事件,實現`IEventHandler<TEventArg>`接口便可完成事件的訂閱,示例以下:
public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> { @Override public void HandleEvent(TestEventArg eventArg) { System.out.println("notify zero======"); } }
總體來講,使用仍是很簡單的,EventBus實現與使用示例收錄於bird-java項目中,項目地址:https://github.com/liuxx001/bird-java。