[譯] 使用 Go 實現一個簡單的事件總線模式

事件驅動架構是計算機科學中一種高度可擴展的範例。它容許咱們能夠多方系統異步處理事件。git

事件總線是發佈/訂閱模式的實現,其中發佈者發佈數據,而且感興趣的訂閱者能夠監聽這些數據並基於這些數據做出處理。這使發佈者與訂閱者鬆耦合。發佈者將數據事件發佈到事件總線,總線負責將它們發送給訂閱者。github

傳統的實現事件總線的方法會涉及到使用回調。訂閱者一般實現接口,而後事件總線經過接口傳播數據。golang

使用 go 的併發模型,咱們知道在大多數地方可使用 channel 來替代回調。在本文中,咱們將重點介紹如何使用 channel 來實現事件總線。數組

咱們專一於基於主題(topic)的事件。發佈者發佈到主題,訂閱者能夠收聽它們。數據結構

定義數據結構

爲了實現事件總線,咱們須要定義要傳遞的數據結構。咱們可使用 struct 簡單地建立一個新的數據類型。咱們定義一個 DataEvent 的結構體以下:架構

type DataEvent struct {
   Data interface{}
   Topic string
}
複製代碼

在這裏,咱們已經將基礎數據定義爲接口,這意味着它能夠是任何值。咱們還將主題定義爲結構的成員。訂閱者可能會收聽多個主題,所以,咱們經過主題來讓訂閱者能夠區分不一樣的事件的作法是不錯的。併發

介紹 channels

如今咱們已經爲事件總線定義了咱們主要的數據結構,咱們還須要一種方法來傳遞它。爲此,咱們能夠定義一個能夠傳播 DataEventDataChannel 類型。app

// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice [] DataChannel
複製代碼

DataChannelSlice 的建立是爲了保留 DataChannel 的切片並輕鬆引用它們。異步

事件總線

// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm sync.RWMutex
}
複製代碼

EventBussubscribers,這是一個包含 DataChannelSlices 的 map。咱們使用互斥鎖來保護併發訪問的讀寫。函數

經過使用 map 和定義 topics ,它容許咱們輕鬆地組織事件。主題被視爲 map 的鍵。當有人發佈它時,咱們能夠經過鍵輕鬆找到主題,而後將事件傳播到 channel 中以進行進一步處理。

訂閱主題

對於訂閱主題,使用 channel。它就像傳統方法中的回調同樣。當發佈者向主題發佈數據時,channel將接收數據。

func (eb *EventBus)Subscribe(topic string, ch DataChannel)  {
   eb.rm.Lock()
   if prev, found := eb.subscribers[topic]; found {
      eb.subscribers[topic] = append(prev, ch)
   } else {
      eb.subscribers[topic] = append([]DataChannel{}, ch)
   }
   eb.rm.Unlock()
}
複製代碼

簡單地說,咱們將訂閱者添加到 channel 切片中而後給該結構加鎖,最後在操做後將其解鎖。

發佈主題

要發佈事件,發佈者須要提供廣播給訂閱者所須要的主題和數據。

func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // 這樣作是由於切片引用相同的數組,即便它們是按值傳遞的
      // 所以咱們正在使用咱們的元素建立一個新切片,從而能正確地保持鎖定
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}
複製代碼

在此方法中,首先咱們檢查主題是否存在任何訂閱者。而後咱們只是簡單地遍歷與主題相關的 channel 切片並把事件發佈給它們。

請注意,咱們在發佈方法中使用了 goroutine 來避免阻塞發佈者

開始

首先,咱們須要建立一個事件總線的實例。在實際場景中,你能夠從包中導出單個 EventBus使其像單例同樣運行

var eb = &EventBus{
   subscribers: map[string]DataChannelSlice{},
}
複製代碼

爲了測試新建立的事件總線,咱們將建立一個以隨機間隔時間發佈到指定主題的方法。

func publisTo(topic string, data string)  {
   for {
      eb.Publish(topic, data)
      time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   }
}
複製代碼

接下來,咱們須要一個能夠收聽主題的 main 函數。它使用輔助方法打印出事件的數據。

func printDataEvent(ch string, data DataEvent)  {
   fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main()  {
   ch1 := make(chan DataEvent)
   ch2 := make(chan DataEvent)
   ch3 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   eb.Subscribe("topic2", ch2)
   eb.Subscribe("topic2", ch3)
   go publisTo("topic1", "Hi topic 1")
   go publisTo("topic2", "Welcome to topic 2")
   for {
      select {
      case d := <-ch1:
         go printDataEvent("ch1", d)
      case d := <-ch2:
         go printDataEvent("ch2", d)
      case d := <-ch3:
         go printDataEvent("ch3", d)
      }
   }
}
複製代碼

咱們建立了三個能夠訂閱主題的 channels 訂閱者(ch1,ch2,ch3)。其中 ch2 和 ch3 這兩個監聽同一事件。

咱們使用 select 語句從最快返回的 channel 中獲取數據。而後它使用另外一個 goroutine 打印輸出數據。用 goroutine 也不是必需的。但在某些狀況下,你必須對事件進行一些繁重的操做處理。爲了防止阻塞 select,咱們使用了 goroutine。

示例輸出將以下所示

Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...
複製代碼

你能夠看到事件總線經過 channel 分發事件。

基於簡單 channel 的事件總線的源代碼。

完整的代碼

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type DataEvent struct {
	Data  interface{}
	Topic string
}

// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice []DataChannel

// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
	subscribers map[string]DataChannelSlice
	rm          sync.RWMutex
}

func (eb *EventBus) Publish(topic string, data interface{}) {
	eb.rm.RLock()
	if chans, found := eb.subscribers[topic]; found {
		// 這樣作是由於切片引用相同的數組,即便它們是按值傳遞的
		// 所以咱們正在使用咱們的元素建立一個新切片,從而正確地保持鎖定
		channels := append(DataChannelSlice{}, chans...)
		go func(data DataEvent, dataChannelSlices DataChannelSlice) {
			for _, ch := range dataChannelSlices {
				ch <- data
			}
		}(DataEvent{Data: data, Topic: topic}, channels)
	}
	eb.rm.RUnlock()
}

func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
	eb.rm.Lock()
	if prev, found := eb.subscribers[topic]; found {
		eb.subscribers[topic] = append(prev, ch)
	} else {
		eb.subscribers[topic] = append([]DataChannel{}, ch)
	}
	eb.rm.Unlock()
}

var eb = &EventBus{
	subscribers: map[string]DataChannelSlice{},
}

func printDataEvent(ch string, data DataEvent) {
	fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}

func publisTo(topic string, data string) {
	for {
		eb.Publish(topic, data)
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
}

func main() {
	ch1 := make(chan DataEvent)
	ch2 := make(chan DataEvent)
	ch3 := make(chan DataEvent)

	eb.Subscribe("topic1", ch1)
	eb.Subscribe("topic2", ch2)
	eb.Subscribe("topic2", ch3)

	go publisTo("topic1", "Hi topic 1")
	go publisTo("topic2", "Welcome to topic 2")

	for {
		select {
		case d := <-ch1:
			go printDataEvent("ch1", d)
		case d := <-ch2:
			go printDataEvent("ch2", d)
		case d := <-ch3:
			go printDataEvent("ch3", d)
		}
	}
}
複製代碼

使用 channel 取代回調的理由

傳統的回調方式要求實現某種接口。

例如,

type Subscriber interface {
   onData(event Event)
}
複製代碼

使用回調的話,若是你想訂閱一個事件,你須要實現該接口,以便事件總線能夠傳播它。

type MySubscriber struct {
}
func (m MySubscriber) onData(event Event)  {
   // 處理事件
}
複製代碼

channel 容許你在沒有接口的狀況下在一個簡單的函數中註冊訂閱者。

func main() {
   ch1 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   fmt.Println((<-ch1).Data)
   ...
}
複製代碼

結論

本文的目的是指出編寫事件總線的不一樣實現方法。

這可能不是理想的解決方案。

例如,channel 被阻塞直到有人消費它們。這有必定的侷限性。

我已經使用切片來存儲主題的全部訂閱者。這用於簡化文章。這須要用 SET 替換,以致於列表中不存在重複的訂閱者。

傳統的回調方法可使用提供的相同的原理去簡單地實現。你能夠輕鬆地在 goroutine 中進行異步裝飾發佈事件。

我很想聽聽你對這篇文章的見解。 :)

相關文章
相關標籤/搜索