事件驅動是一種靈活的系統設計方法,在事件驅動的系統中,當數據發生變化時系統會產生、發佈一個對應的事件,其它對這個事件感興趣的部分會接收到通知,並進行相應的處理。事件驅動設計最大的好處在我看來有兩點:一是它爲系統提供了很好的擴展能力,好比咱們能夠對某類事件增長一個訂閱者來對系統進行擴展,最主要的是咱們並不須要修改任何已有的代碼,它徹底符合開閉原則;二是它實現了模塊間的低偶合,系統間各個部分不是強依賴關係,而是經過事件把整個系統串聯起來。
固然,任何事務都有兩面性,事件驅動也有其很差的方面。首先,實現一套這樣的系統複雜度就很高,對開發人員的要求也很高;再次,對系統的總體把控會很困難,想象一下面對幾百個類別的事件,而且沒有一個統一的地方可讓咱們看到整個業務處理流程,會是什麼心情?因此當咱們決定採用事件驅動實現系統中,必定要維護好相關的文檔,並保持它們的有效性。
咱們再來看看事件驅動架構的一些其它的優勢:
更好的響應性
事件驅動中,事件的響應是異步處理的,因此它具備更好的響應性。
更好的容錯性
業務主流程在發佈事件以後便結束了,擴展流程的延後處理能夠異步不斷的失敗重試,直到成功爲止,系統總體容錯性更強。
設計篇
首先,咱們須要定義什麼是事件?從業務角度看,事件包括如下屬性:
事件的定義屬性字段類型說明標識IDstring系統內部每一個事件都須要一個惟一的標識。類型eventTypestring數據發生變化產生事件,不一樣類型的數據變化產生不一樣類型的事件。好比會員下單、會員註冊、用戶修改手機號等等。時間eventTimedatetime即數據發生變化的時間。上下文contextstring事件發生時的上下文信息。好比會員修改手機號事件,須要原號碼和新號碼,會員 ID 等信息。
接下來,咱們看看如何設計一套基於事件驅動的系統,你知道設計模式中的觀察者模式嗎?
觀察者模式 :定義了一種一對多的依賴關係,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態發生變化時,會通知全部觀察者對象,使它們可以自動更新本身。觀察者模式 :定義了一種一對多的依賴關係,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態發生變化時,會通知全部觀察者對象,使它們可以自動更新本身。
觀察者模式天生就是事件驅動的一個實現,可是直接使用它有不少的弊端。首先,它是基於主題的,有多少類事件就須要多少個主題類,這可能會致使類爆炸;其次,觀察者模式是同步實現的,這樣咱們可能會犧牲掉響應性和容錯性等優點。
因此咱們須要對觀察者模式稍做改進,咱們分別從事件發發布和消費兩個方面來分析。
事件的發佈
本文的標題是《基於 Kafka 實現事件驅動架構》,很明顯,咱們使用 kafka 做爲消息中間件來傳遞事件消息。因此,像修改會員手機號碼的代碼可能實現以下:
@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidchangePhoneNumber(String newNumber){
userDao.updatePhone(this.getUserId(), newNumber);// 本地數據庫修改java
// 發佈 用戶手機號碼變動 事件
Event event =newEvent(...);// 建立一個事件對象,表示用戶修改手機號碼
ProducerRecord record =newProducerRecord(...event);// 根據 event 生成 kakfa recordmysql
Future<RecordMetadata> f = kafkaProducer.send(record);
try{
f.get();
}catch(InterruptedException | ExecutionException e) {
thrownewRuntimeException(e);
}
}算法
這段代碼正確嗎?從邏輯上看,它徹底正確。但從可靠性角度看它是有問題的。Kafka 和數據庫是兩個異構系統,咱們不能僅僅經過一個本地事務保證他們之間的數據一致性。例如,推送 Kafka 成功了,可是在提交 DB 事務的時候失敗了呢(好比說事務超時滾)?這樣 kafka 中就會存在一個髒數據,由於本地數據庫事務已經回滾了。
分佈式系統數據一致性一直就是複雜的問題,經常使用的方案有兩階段提交、三階段提交、zookeeper 的 zab 協議、proxs、raft 等算法,這不是本文的重點。咱們採用一個簡單易懂的方式來解決上面的問題。咱們引入一張 DB 事件表,在發佈事件時將事件信息存入這個事件表,將事件的發佈和業務處理包裝在同一個本地事務中。
createtableifnotexistsevent_queue
(id
bigintnotnullauto_incrementcomment'主鍵',event_id
char(32)notnullcomment'事件 ID',event_type
char(12)notnullcomment'事件類型',event_time
datetimenotnullcomment'事件發生時間',context
mediumtextnotnullcomment'事件內容',
primarykey(id
),
uniquekey(event_id
)
)engine=innodbdefaultcharset=utf8comment='事件隊列表';sql
發佈事件,就是向這個事件表中增長一條記錄,修改會員手機號碼的代碼如今變成了:
@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidchangePhoneNumber(String newNumber){
userDao.updatePhone(this.getUserId(), newNumber);// 本地數據庫修改數據庫
// 發佈 用戶手機號碼變動 事件
Event event =newEvent(...);// 建立一個事件對象,表示用戶修改手機號碼
eventDao.insert(event);// 向事件表建立一條新記錄。
}json
因爲事件消息如今被暫存進了 DB,咱們還須要將它取出來推到 Kafka,爲此咱們須要起一個線程,不斷的讀取事件表中的記錄發送給 Kafka,並在成功發送以後將記錄從 DB 中刪除。若是刪除 DB 的時候失敗了,那麼消息會被從新推送到 kafka,意爲着咱們實現的是 At least once 的遞交語義,對於業務上不接受重複的場景,在消費端須要作好冪等處理。
講到這裏,關於事件的分佈已經接近尾聲,但還有一個問題: 性能 。若是一個系統的負載很高,一秒內產生成千上萬個事件,那咱們的事件表就會成爲瓶頸,由於只用了一個線程來處理事件表向 Kafka 的推送,集羣中只有一個實例能發輝做用,沒法實現彈性。爲了解決這個問題,咱們能夠對事件表進行分表,並使用多線程併發處理,並且這些線程能夠分佈在不一樣的集羣實例中。但這樣使設計變得更復雜了,如今咱們須要解決一個新的問題: 如何保證一個事件表,最多隻被一個線程處理? 咱們須要保證一個事件表同一時刻只能被一個線程處理,同時在實例宕機後,其它實例能夠起線程接替它的工做。這句話咱們換一種方式來描述更容易理解:
集羣有 M 個實例,須要進行 N 個任務(任務是把事件分表中的事件信息推送到 kafka)
一個任務最多能夠分配給 1 個實例,1 個實例能夠同時執行多個任務。
若是一個實例宕機了,分配給它的任務須要從新在其它實例上分配。
N 個任務固定不變,實例能夠動態增長或減小,須要實現實例之間的均衡負載。
若是你熟悉像 HBase、ES 這類分佈式系統的話,不難理解咱們須要在集羣中選出一個實例做爲 Master,由它來負責任務在集羣中的分配工做。咱們藉助 Kookeeper,全部實例在啓動時建立一個 EPHEMERAL 類型的 master 節點,建立成功的實例成爲 Master,其它實例則監聽 master 節點,當 Master 實例宕機後從新競選。設計模式
每一個實例啓動後,會在 workers 節點下建立一個臨時節點,表示本身做爲一個 Worker 加入集羣;Worker 同時會監聽本身建立的子節點,接收由 Master 分配給本身的任務。Master 會監聽 workers 下子節點的變化,當實例下線或有新的實例加入集羣中時,Master 會收到通知並從新進行任務的分配。分配的具體信息保存在 Worker 實例建立的子節點中,Master 經過直接修改這些子節點的內容實現分配。
從事件的發佈來看,系統的架構是這樣的:多線程
這裏有個細節須要說明: 由於 Kafka 只保證 partition 級別的有序性,咱們的事件分表數必須大於或等於 partition 的數量,不然事件的順序得不到保證 。
事件的消費
由於咱們使用了 Kafka 做爲事件消息中間件,事件的消費簡單不少。每一個實例在啓動時啓一個 Kafka Consumer 便可,像實例間的負載、可用性、故障轉移等等問題,Kafka 已經幫咱們解決了,咱們只須要從 Kafka 中獲取事件消息,並通知相應的訂閱者便可。架構
訂閱者須要實現 BaseSubscriber 接口,另外在啓動時,須要把事件與訂閱者的關係維護在 SubscriberConfig 類中:
BaseSubscriber sub = ...// your implementation
SubscriberConfig.instance().addSubscriber("event_type", sub);併發
系統總體的設計是面向擴展的,咱們能夠經過調整集羣應用實例數、事件表分表數量和 kafka partitions 數量來提升系統總體的吞吐量。事件表分表越多,事件消息從 DB 到 kafka 的延遲就更低;應用實例越多,系統單位時間內能承受的事件上限也越多,另外也能更好的負載 kafka 消息的消費。
每個應用,做爲事件發佈者,其產生的事件最終都被推送到一個 Kafka Topic;但做爲消費者,能夠訂閱不一樣的 Topic,這些 Topic 能夠是本身的推送的,也能夠是其它應用推送的事件。
實現篇
這裏咱們只對部分核心代碼做一個簡單的介紹:
SimpleLock 是一個基於 Zookeeper 的簡單分佈式鎖實現,咱們使用 SimpleLock 來實現 Master 的競選。
EventSubmitter 是一個線程,負責把事件表中的事件信息推送到 Kafka broker。初始化時須要傳入一個 int 參數,表示處理哪個事件分表。它被實現成一個響應中斷的線程,由於當 Master 從新分配任務後,Worker 須要先停掉當前進行中的任務。
Master 類是 Master 實例的主要實現。實例在啓動時會調 Master 類的 start 方法,Master 實例監聽 workers 節點,當有新實例加入或實例下線時,Master 實例會調用 onWorkerChange 方法進行從新分配, onWorkerChange 方法實現了一個簡單的分配算法,只有任務變動的 Worker 實例會收到分配通知。
Worker 類是 Worker 實例的主要實現,實例在啓動時會調 Worker 類的 start 方法。集羣中的每個實例都是 Worker,會在 workers 節點下建立一個臨時的節點表示本身,同時監聽該節點,接受 Master 分配給本身的任務。當 Worker 接收到分配通知時,會先中止當前在運行的全部任務,再根據 worker 節點的內容開始執行新分配的任務。
示例
來看一個具體的事例,假設咱們要以天爲維度,統計天天的下單量和下單金額。如今,咱們已經有了訂單表:
createtableifnotexistsorder
(order_id
bigintnotnullauto_incrementcomment'主鍵',user_id
bigintnotnullcomment'客戶 id',order_time
datetimenotnullcomment'訂單時間',order_amount
intnotnullcomment'訂單金額,單位:分',
primarykey(order_id
)
)engine=innodbdefaultcharset=utf8comment='訂單表';
這個需求咱們能夠簡單的使用 sql 來作,好比:
selectdate(order_time)as day,count(*)as total_num,sum(order_amount)as total_amount from order
group bydate(order_time)
{1}
可是在生產環境中這麼作每每不現實,好比性能問題、或者咱們對訂單表作了分表、或者幾個月前的數據庫了備份,而你正好須要查詢這些數據,等等。實現這個需求更好的方式是採用事件驅動,在下單的時候發佈一個事件,而後異步的維護一個查詢表,這樣之間的種種問題都將不復存在。先建立一個查詢表,以下:
createtableifnotexistsdaily_order_report
(id
bigintnotnullauto_incrementcomment'主鍵',day
datenotnullcomment'統計日',order_num
bigintnotnullcomment'訂單數量',order_total
bigintnotnullcomment'訂單總金額,單位:分',
primarykey(id
),
uniquekey(day
)
)engine=innodbdefaultcharset=utf8comment='訂單日報表';
在下單的時候,咱們須要發佈一個 下單事件 :
@Transactional(readOnly =false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)br/>@Override
publicvoidcreateOrder(Order order){
orderDao.insert(order);
// 發佈下單事件
publisher.publish("order_created",newDate(), order.json(), order.getUserId().intValue() % Configuration.instance().getNumOfEventTables());
}
以後,咱們須要實現一個訂閱者,在接收到 下單事件 後,根據訂單的日期作相應的統計:br/>@Component
publicclassDailyOrderReportSubscriberimplementsBaseSubscriber{
@Autowired
privateOrderRepos repos;
@Override
publicvoidonEvent(Event e){
Order order = Order.fromJson(e.getContext());
DailyOrderReport report = repos.selectDailyOrderReportByKey(newjava.sql.Date(order.getOrderTime().getTime()));
if(null== report) {
report =newDailyOrderReport();
report.setDay(newjava.sql.Date(order.getOrderTime().getTime()));
report.setOrderNum(1l);
report.setOrderTotal(newLong(order.getOrderAmount()));
repos.createDailyOrderReport(report);
}else{
report.setOrderNum(report.getOrderNum() +1);
report.setOrderTotal(report.getOrderTotal() + order.getOrderAmount());
repos.updateDailyOrderReport(report);
}
}
}
隨機建立 10 個訂單後,咱們的報表狀況以下:
mysql>select*fromorder
;
+----------+---------+---------------------+--------------+
| order_id | user_id | order_time | order_amount |
+----------+---------+---------------------+--------------+
| 21 | 3 | 2018-09-24 01:06:43 | 251 |
| 22 | 2 | 2018-09-24 01:06:43 | 371 |
| 23 | 5 | 2018-09-24 01:06:43 | 171 |
| 24 | 0 | 2018-09-24 01:06:43 | 904 |
| 25 | 3 | 2018-09-24 01:06:43 | 55 |
| 26 | 5 | 2018-09-24 01:06:44 | 315 |
| 27 | 8 | 2018-09-24 01:06:44 | 543 |
| 28 | 8 | 2018-09-24 01:06:44 | 537 |
| 29 | 2 | 2018-09-24 01:06:44 | 123 |
| 30 | 3 | 2018-09-24 01:06:45 | 938 |
+----------+---------+---------------------+--------------+
10 rows inset(0.00sec)
mysql>select*fromdaily_order_report;
+----+------------+-----------+-------------+
| id | day | order_num | order_total |
+----+------------+-----------+-------------+
| 2 | 2018-09-24 | 10 | 4208 |
+----+------------+-----------+-------------+
1 row inset(0.00sec)
mysql>