- 原文地址:levelup.gitconnected.com/lets-write-…
- 原文做者:Kasun Vithanage
- 譯文地址:github.com/watermelo/d…
- 譯者:咔嘰咔嘰
- 譯者水平有限,若有翻譯或理解謬誤,煩請幫忙指出
事件驅動架構是計算機科學中一種高度可擴展的範例。它容許咱們能夠多方系統異步處理事件。git
事件總線是發佈/訂閱模式的實現,其中發佈者發佈數據,而且感興趣的訂閱者能夠監聽這些數據並基於這些數據做出處理。這使發佈者與訂閱者鬆耦合。發佈者將數據事件發佈到事件總線,總線負責將它們發送給訂閱者。github
傳統的實現事件總線的方法會涉及到使用回調。訂閱者一般實現接口,而後事件總線經過接口傳播數據。golang
使用 go 的併發模型,咱們知道在大多數地方可使用 channel
來替代回調。在本文中,咱們將重點介紹如何使用 channel
來實現事件總線。數組
咱們專一於基於主題(topic)的事件。發佈者發佈到主題,訂閱者能夠收聽它們。數據結構
爲了實現事件總線,咱們須要定義要傳遞的數據結構。咱們可使用 struct
簡單地建立一個新的數據類型。咱們定義一個 DataEvent
的結構體以下:架構
type DataEvent struct {
Data interface{}
Topic string
}
複製代碼
在這裏,咱們已經將基礎數據定義爲接口,這意味着它能夠是任何值。咱們還將主題定義爲結構的成員。訂閱者可能會收聽多個主題,所以,咱們經過主題來讓訂閱者能夠區分不一樣的事件的作法是不錯的。併發
如今咱們已經爲事件總線定義了咱們主要的數據結構,咱們還須要一種方法來傳遞它。爲此,咱們能夠定義一個能夠傳播 DataEvent
的 DataChannel
類型。app
// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent
// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice [] DataChannel
複製代碼
DataChannelSlice
的建立是爲了保留 DataChannel
的切片並輕鬆引用它們。異步
// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
複製代碼
EventBus
有 subscribers
,這是一個包含 DataChannelSlices
的 map。咱們使用互斥鎖來保護併發訪問的讀寫。函數
經過使用 map
和定義 topics
,它容許咱們輕鬆地組織事件。主題被視爲 map
的鍵。當有人發佈它時,咱們能夠經過鍵輕鬆找到主題,而後將事件傳播到 channel
中以進行進一步處理。
對於訂閱主題,使用 channel
。它就像傳統方法中的回調同樣。當發佈者向主題發佈數據時,channel
將接收數據。
func (eb *EventBus)Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
複製代碼
簡單地說,咱們將訂閱者添加到 channel
切片中而後給該結構加鎖,最後在操做後將其解鎖。
要發佈事件,發佈者須要提供廣播給訂閱者所須要的主題和數據。
func (eb *EventBus) Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
// 這樣作是由於切片引用相同的數組,即便它們是按值傳遞的
// 所以咱們正在使用咱們的元素建立一個新切片,從而能正確地保持鎖定
channels := append(DataChannelSlice{}, chans...)
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
複製代碼
在此方法中,首先咱們檢查主題是否存在任何訂閱者。而後咱們只是簡單地遍歷與主題相關的 channel
切片並把事件發佈給它們。
請注意,咱們在發佈方法中使用了 goroutine 來避免阻塞發佈者
首先,咱們須要建立一個事件總線的實例。在實際場景中,你能夠從包中導出單個 EventBus
,使其像單例同樣運行。
var eb = &EventBus{
subscribers: map[string]DataChannelSlice{},
}
複製代碼
爲了測試新建立的事件總線,咱們將建立一個以隨機間隔時間發佈到指定主題的方法。
func publisTo(topic string, data string) {
for {
eb.Publish(topic, data)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}
複製代碼
接下來,咱們須要一個能夠收聽主題的 main 函數。它使用輔助方法打印出事件的數據。
func printDataEvent(ch string, data DataEvent) {
fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main() {
ch1 := make(chan DataEvent)
ch2 := make(chan DataEvent)
ch3 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
eb.Subscribe("topic2", ch2)
eb.Subscribe("topic2", ch3)
go publisTo("topic1", "Hi topic 1")
go publisTo("topic2", "Welcome to topic 2")
for {
select {
case d := <-ch1:
go printDataEvent("ch1", d)
case d := <-ch2:
go printDataEvent("ch2", d)
case d := <-ch3:
go printDataEvent("ch3", d)
}
}
}
複製代碼
咱們建立了三個能夠訂閱主題的 channels
訂閱者(ch1,ch2,ch3)。其中 ch2 和 ch3 這兩個監聽同一事件。
咱們使用 select 語句從最快返回的 channel
中獲取數據。而後它使用另外一個 goroutine 打印輸出數據。用 goroutine 也不是必需的。但在某些狀況下,你必須對事件進行一些繁重的操做處理。爲了防止阻塞 select,咱們使用了 goroutine。
示例輸出將以下所示
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...
複製代碼
你能夠看到事件總線經過 channel
分發事件。
基於簡單 channel
的事件總線的源代碼。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type DataEvent struct {
Data interface{}
Topic string
}
// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent
// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice []DataChannel
// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
func (eb *EventBus) Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
// 這樣作是由於切片引用相同的數組,即便它們是按值傳遞的
// 所以咱們正在使用咱們的元素建立一個新切片,從而正確地保持鎖定
channels := append(DataChannelSlice{}, chans...)
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
var eb = &EventBus{
subscribers: map[string]DataChannelSlice{},
}
func printDataEvent(ch string, data DataEvent) {
fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func publisTo(topic string, data string) {
for {
eb.Publish(topic, data)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}
func main() {
ch1 := make(chan DataEvent)
ch2 := make(chan DataEvent)
ch3 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
eb.Subscribe("topic2", ch2)
eb.Subscribe("topic2", ch3)
go publisTo("topic1", "Hi topic 1")
go publisTo("topic2", "Welcome to topic 2")
for {
select {
case d := <-ch1:
go printDataEvent("ch1", d)
case d := <-ch2:
go printDataEvent("ch2", d)
case d := <-ch3:
go printDataEvent("ch3", d)
}
}
}
複製代碼
傳統的回調方式要求實現某種接口。
例如,
type Subscriber interface {
onData(event Event)
}
複製代碼
使用回調的話,若是你想訂閱一個事件,你須要實現該接口,以便事件總線能夠傳播它。
type MySubscriber struct {
}
func (m MySubscriber) onData(event Event) {
// 處理事件
}
複製代碼
而 channel
容許你在沒有接口的狀況下在一個簡單的函數中註冊訂閱者。
func main() {
ch1 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
fmt.Println((<-ch1).Data)
...
}
複製代碼
本文的目的是指出編寫事件總線的不一樣實現方法。
這可能不是理想的解決方案。
例如,channel
被阻塞直到有人消費它們。這有必定的侷限性。
我已經使用切片來存儲主題的全部訂閱者。這用於簡化文章。這須要用 SET 替換,以致於列表中不存在重複的訂閱者。
傳統的回調方法可使用提供的相同的原理去簡單地實現。你能夠輕鬆地在 goroutine 中進行異步裝飾發佈事件。
我很想聽聽你對這篇文章的見解。 :)