本篇是「事件驅動的微服務」系列的第二篇,主要講述事件驅動設計。若是想要了解整體設計,請看第一篇"事件驅動的微服務-整體設計"git
咱們經過一個具體的例子來說解事件驅動設計。 本文中的程序有兩個微服務,一個是訂單服務(Order Service), 另外一個是支付服務(Payment Service)。用戶調用訂單服務的用例createOrder()來建立訂單,建立以後的訂單暫時尚未支付信息,訂單服務而後發佈命令(Command)給支付服務,支付服務完成支付,發送支付完成(Payment Created)消息。訂單服務收到消息(Event),在Order表裏增長Payment_Id並修改訂單狀態爲「已付款」。
下面就是組件圖:github
事件分紅內部事件和外部事件,內部事件是存在於一個微服務內部的事件,不與其餘微服務共享。若是用DDD的語言來描述就是在有界上下文(Bounded Context)內的域事件(Domain Event)。外部事件是從一個微服務發佈,而被其餘微服務接收的事件。若是用DDD的語言來描述就是在不一樣有界上下文(Bounded Context)之間傳送的域事件(Domain Event)。這兩種事件的處理方式不一樣。編程
對於內部事件的處理早已有了成熟的方法,它的基本思路是建立一個事件總線(Event Bus),由它來監聽事件。而後註冊不一樣的事件處理器(Event Handler)來處理事件。這種思路被普遍地用於各類領域。架構
下面就是事件總線(Event Bus)的接口,它有兩個函數,一個是發佈事件(Publish Event),另外一個是添加事件處理器(Event Handler)。一個事件能夠有一個或多個事件處理器。框架
type EventBus interface { PublishEvent(EventMessage) AddHandler(EventHandler, ...interface{}) }
事件總線的代碼的關鍵部分是加載事件處理器。咱們以訂單服務爲例,下面就是加載事件處理器(Event Handler)的代碼,它是初始化容器代碼的一部分。在這段代碼中,它只註冊了一個事件,支付完成事件(PaymentCreateEvent),和與之相對應的事件處理器-支付完成事件處理器(PaymentCreatedEventHandler)。函數
func loadEventHandler(c servicecontainer.ServiceContainer) error { var value interface{} var found bool rluf, err := containerhelper.BuildModifyOrderUseCase(&c) if err != nil { return err } pceh := event.PaymentCreatedEventHandler{rluf} if value, found = c.Get(container.EVENT_BUS); !found { message := "can't find key=" + container.EVENT_BUS + " in container " return errors.New(message) } eb := value.(ycq.EventBus) eb.AddHandler(pceh,&event.PaymentCreatedEvent{}) return nil }
因爲在處理事件時要調用相應的用例,所以須要把用例注入到事件處理器中。在上段代碼中,首先從容器中得到用例,而後建立事件處理器,最後把事件和與之對應的處理器加入到事件總線中。微服務
事件的發佈是經過調用事件總線的PublishEvent()來實現的。下面的例子就是在訂單服務中經過消息中間件來監聽來自外部的支付完成事件(PaymentCreatedEvent),收到後,把它轉化成內部事件,而後發送到事件總線上,這樣已經註冊的事件處理器就能處理它了。ui
eb := value.(ycq.EventBus) subject := config.SUBJECT_PAYMENT_CREATED _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) { cpm := pce.NewPaymentCreatedDescriptor() logger.Log.Debug("payload:",pce) eb.PublishEvent(cpm) })
那麼事件是怎樣被處理的呢?關鍵就在PublishEvent函數。當一個事件發佈時,事件總線會把全部註冊到該事件的事件處理器的Handle()函數依次調用一遍, 下面就是PublishEvent()的代碼。這樣每一個事件處理器只要實現Handle()函數就能夠了。spa
func (b *InternalEventBus) PublishEvent(event EventMessage) { if handlers, ok := b.eventHandlers[event.EventType()]; ok { for handler := range handlers { handler.Handle(event) } } }
下面就是PaymentCreatedEventHandler的代碼。它的邏輯比較簡單,就是從Event裏得到須要的支付信息,而後調用相應的用例來完成UpdatePayment()功能。.net
type PaymentCreatedEventHandler struct { Mouc usecase.ModifyOrderUseCaseInterface } func(pc PaymentCreatedEventHandler) Handle (message ycq.EventMessage) { switch event := message.Event().(type) { case *PaymentCreatedEvent: status := model.ORDER_STATUS_PAID err := pc.Mouc.UpdatePayment(event.OrderNumber, event.Id,status) if err != nil { logger.Log.Errorf("error in PaymentCreatedEventHandler:", err) } default: logger.Log.Errorf("event type mismatch in PaymentCreatedEventHandler:") } }
我在這裏用到了一個第三方庫"jetbasrawi/go.cqrs"來處理Eventbus。Jetbasrawi是一個事件溯源(Event Sourcing)的庫。事件溯源與事件驅動很容易搞混,它們看起來有點像,但其實是徹底不一樣的兩個東西。事件驅動是微服務之間的一種調用方式,存在於微服務之間,與RPC的調用方式相對應;而事件溯源是一種編程模式,你能夠在微服務內部使用它或不使用它。但我一時找不到事件驅動的庫,就先找一個事件溯源的庫來用。其實本身寫一個也很簡單,但我不以爲能寫的比jetbasrawi更好,那就仍是先用它把。不過事件溯源要比事件驅動複雜,所以用Jetbasrawi可能有點大材小用了。
外部事件的不一樣之處是它要在微服務之間進行傳送,所以須要消息中間件。我定義了一個通用接口,這樣能夠支持不一樣的消息中間件。它的最重要的兩個函數是publish()和Subscribe()。
package gmessaging type MessagingInterface interface { Publish(subject string, i interface{}) error Subscribe(subject string, cb interface{} ) (interface{}, error) Flush() error // Close will close the decorated connection (For example, it could be the coded connection) Close() // CloseConnection will close the connection to the messaging server. If the connection is not decorated, then it is // the same with Close(), otherwise, it is different CloseConnection() }
因爲定義了通用接口,它能夠支持多種消息中間件,我這裏選的是"NATS"消息中間件。當初選它是由於它是雲原生計算基金會("CNCF")的項目,並且功能強大,速度也快。若是想了解雲原生概念,請參見"雲原生的不一樣解釋及正確含義"
下面的代碼就是NATS的實現,若是你想換用別的消息中間件,能夠參考下面的代碼。
type Nat struct { Ec *nats.EncodedConn } func (n Nat) Publish(subject string, i interface{}) error { return n.Ec.Publish(subject,i) } func (n Nat) Subscribe(subject string, i interface{} ) (interface{}, error) { h := i.(nats.Handler) subscription, err :=n.Ec.Subscribe(subject, h) return subscription, err } func (n Nat) Flush() error { return n.Ec.Flush() } func (n Nat) Close() { n.Ec.Close() } func (n Nat) CloseConnection() { n.Ec.Conn.Close() }
「Publish(subject string, i interface{})」有兩個參數,「subject」是消息中間件的隊列(Queue)或者是主題(Topic)。第二個參數是要發送的信息,它通常是JSON格式。使用消息中間件時須要一個連接(Connection),這裏用的是「*nats.EncodedConn」, 它是一個封裝以後的連接,它裏面含有一個JSON解碼器,能夠支持在結構(struct)和JSON之間進行轉換。當你調用發佈函數時,發送的是結構(struct),解碼器自動把它轉換成JSON文本再發送出去。「Subscribe(subject string, i interface{} )」也有兩個參數,第一個與Publish()的同樣,第二個是事件驅動器。當接收到JSON文本後,解碼器自動把它轉換成結構(struct),而後調用事件處理器。
我把與消息中間件有關的代碼寫成了一個單獨的第三方庫,這樣不論你是否使用本框架均可以使用這個庫。詳細信息參見"jfeng45/gmessaging"
命令(Command)在代碼實現上和事件(Event)很是類似,但他們在概念上徹底不一樣。例如支付申請(Make Payment)是命令,是你主動要求第三方(支付服務)去作一件事情,並且你知道這個第三方是誰。支付完成(Payment Created)是事件,是你在彙報一件事情已經作完,而其餘第三方程序可能會根據它的結果來決定是否要作下一步的動做,例如訂單服務當收到支付完成這個事件時,就能夠更改本身的訂單狀態爲「已支付」。這裏,事件的發送方並不知道誰會對這條消息感興趣,所以這個發送是廣播式發送。並且這個動做(支付)已經完成,而命令是還沒有完成的動做,所以接收方能夠選擇拒絕執行一條命令。咱們日常常常講的事件驅動是鬆耦合,而RPC是緊耦合,這裏指的是事件方式,而不是命令方式。採用命令方式時,因爲你已經知道了要發給誰,所以是緊耦合的。
在實際應用中,咱們所看到的大部分的命令都是在一個微服務內部使用,不多有在微服務之間發送命令的,微服務之間傳遞的主要是事件。但因爲事件和命令很容易混淆,有很多在微服務之間傳遞的「事件」其實是「命令」。所以並非使用事件驅動方式就能把程序變成鬆耦合的,而要進一步檢查你是否將「命令」錯用成了「事件」。在本程序中會嚴格區分它們。
下面就是命令總線(Dispatcher)的接口,除了函數名字不同外,其餘與事件總線幾乎如出一轍。
type Dispatcher interface { Dispatch(CommandMessage) error RegisterHandler(CommandHandler, ...interface{}) error }
咱們徹底能夠把它定義成下面的樣子,是否是就與事件總線很像了?下面的接口和上面的是等值的。
type CommandBus interface { PublishCommand(CommandMessage) error AddHandler(CommandHandler, ...interface{}) error }
事件和命令的其餘方面,例如定義方式,處理流程,實現方式,傳送方式也幾乎如出一轍。詳細的我就不講了,你能夠本身看代碼進行比較。那咱們可不能夠只用一個事件總線同時處理時間和命令呢?理論上來說是沒有問題的。我開始的時候也是這麼想的,但因爲如今的接口("jetbasrawi/go.cqrs")不支持,若是要改的話須要從新定義接口,所以就暫時放棄了。另外,他們兩個在概念上仍是很不一樣的,因此在實現上定義不一樣的接口也是有必要的。
下面來說解在設計事件驅動時應注意的問題。
事件驅動模式與RPC相比增長的部分是事件和命令。所以首先要考慮的是要對RPC的程序結構作哪些擴充和怎樣擴充。「Event」和「command」從本質上來說是業務邏輯的一部分,所以應屬於領域層。所以在程序結構上也增長了兩個目錄「Event」和「command」分別用來存放事件和命令。結構以下圖所示。
如今的代碼在處理外部事件時,在發送端和接收端的方式是不同的。
下面就是發送端的代碼(代碼在支付服務項目裏),整個代碼功能是建立支付,完成以後再發布「支付完成」消息。它直接經過消息中間件接口把事件發送出去。
type MakePaymentUseCase struct { PaymentDataInterface dataservice.PaymentDataInterface Mi gmessaging.MessagingInterface } func (mpu *MakePaymentUseCase) MakePayment(payment *model.Payment) (*model.Payment, error) { payment, err := mpu.PaymentDataInterface.Insert(payment) if err!= nil { return nil, errors.Wrap(err, "") } pce := event.NewPaymentCreatedEvent(*payment) err = mpu.Mi.Publish(config.SUBJECT_PAYMENT_CREATED, pce) if err != nil { return nil, err } return payment, nil }
下面就是接收端的代碼例子。是它是先用消息接口接收時間,再把外部事件轉化爲內部事件,而後調用事件總線的接口在微服務內部發布事件。
eb := value.(ycq.EventBus) subject := config.SUBJECT_PAYMENT_CREATED _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) { cpm := pce.NewPaymentCreatedDescriptor() logger.Log.Debug("payload:",pce) eb.PublishEvent(cpm) })
爲何會有這種不一樣?在接收時,可不能夠不生成內部事件,而是直接調用用例來處理外部事件呢?在發送時,若是沒有別的內部事件處理器,那麼直接調用消息中間件來發送是最簡單的方法(這個發送過程是輕量級的,耗時很短)。而接收時可能須要處理比較複雜的業務邏輯。所以你但願把這個過程分紅接收和處理兩個部分,讓複雜的業務邏輯在另一個過程裏處理,這樣能夠儘可能縮短接收時間,提升接收效率。
如今的設計是每一個事件和事件驅動器都有一個單獨的文件。我見過有些人只用一個文件例如PaymentEvent來存放全部與Payment相關的事件,事件驅動器也是同樣。這兩種辦法都是可行的。如今看來,生成單獨的文件比較清晰,但若是之後事件很是多,也許一個文件存放多個事件會比較容易管理,不過到那時再改也不遲。
對於一個好的設計來說,全部的業務邏輯都應該集中在一塊兒,這樣便於管理。在如今的架構裏,業務邏輯是放在用例(Use Case)裏的,但事件處理器裏也須要有業務邏輯,應該怎麼辦?支付事件處理器的主要功能是修改訂單中的付款信息,這部分的業務邏輯已經體如今修改訂單(Modify Order)用例裏,所以支付事件處理器只要調用修改訂單的MakePayment()函數就能夠了。實際上全部的事件處理器都應該這樣設計,它們自己不該包含業務邏輯,而只是一個簡單的封裝,去調用用例裏的業務邏輯。那麼可不能夠直接在用例裏定義Handle()函數,這樣用例就變成了事件處理器?這樣的設計確實可行,但我以爲把事件處理器作成一個單獨的文件,這樣邏輯上更清晰。由於修改訂單付款功能你是必定要有的,但事件處理器只有在事件驅動模式下才有,它們是屬於兩個不一樣層面的東西,只有分開放置才層次清晰。
完整的源程序連接:
3 "CNCF"
5 "NATS"