go-event是一個在Docker項目中使用到的一個事件分發組件,實現了常規的廣播,隊列等事件分發模型,代碼簡潔明瞭,也適合初學者對Go語言的入門,對channel用來同步,通訊也會加深理解。程序員
Eventdocker
type Event interface{}
Event被封裝爲一個空接口,接受任意類型。在go-events表示一個能夠被分發的事件。 json
interface{}的底層相似於c語言中的void*,但比void*強大不少,好比interface{}保存了指向對象的指針和類型,而c程序員使用void*時,必須本身去保證對象的類型是正確的)數據結構
Sinkapp
type Sink interface { Write(event Event) error Close() error }
Sink是一個用來分發事件(Event)的結構。能夠看成事件的處理者,使用接口的方式聲明。只要對象實現了這兩個方法,就能夠被看成一個Sink。
核型方法函數
Write(event Event) errorpost
Close() errorurl
go-event核心就是圍繞Sink作文章,docker官方給出了一個http的例子,就是當調用Write時,發起一次post請求。:設計
func (h *httpSink) Write(event Event) error { p, err := json.Marshal(event) if err != nil { return err } body := bytes.NewReader(p) resp, err := h.client.Post(h.url, "application/json", body) if err != nil { return err } defer resp.Body.Close() if resp.Status != 200 { return errors.New("unexpected status") } return nil } // implement (*httpSink).Close()
到此爲止,sink定義了事件分發的基本單位。在go-event中,封裝了廣播,消息隊列兩種消息分發的模型,具體來講,就是實現了Sink接口的兩個結構體。指針
type Broadcaster struct { sinks []Sink //所包含的Sink events chan Event// 同步Event的channel adds chan configureRequest //adds和remove必須保證thread-safe,因此採用channel同步 removes chan configureRequest shutdown chan struct{} closed chan struct{} once sync.Once }
Boardcaster由多個Sink組成,當Boardcaster接收到一個事件時,會調用自身包含的全部Sink的Write()方法
go-events設計之初就實現協程之間的消息分發,須要保證thread-safe,因此對event的處理,添加,移除Sink都使用管道來通訊。這也是Go的一個使用原則:
使用通訊來共享內存,而不是經過共享內存來通訊
在Broadcaster中全部的臨界資源(sinks,event)都經過自身的run()函數統一管理,外界則經過相應的channel 同步給Broadcaster
例如Write()
func (b *Broadcaster) Write(event Event) error { select { case b.events <- event: case <-b.closed: return ErrSinkClosed } return nil }
能夠看到增減sink都是經過向對應的channel寫入數據進行的。
func (b *Broadcaster) Add(sink Sink) error { return b.configure(b.adds, sink) // will be block until ch can be writen } func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error { response := make(chan error, 1) for { select { case ch <- configureRequest{ sink: sink, response: response}: ch = nil // ? case err := <-response: return err case <-b.closed: return ErrSinkClosed } } }
核心run函數的實現,監聽Boardcast管道上的相應事件,並做出處理。
func (b *Broadcaster) run() { defer close(b.closed) //將remove封裝了一下,由於下面兩處都會用到 remove := func(target Sink) { for i, sink := range b.sinks { if sink == target { b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) break } } } // 輪訓處理channel上的事件 for { select { case event := <-b.events: //有事件到來,進行廣播 for _, sink := range b.sinks { if err := sink.Write(event); err != nil { if err == ErrSinkClosed { // remove closed sinks remove(sink) continue } logrus.WithField("event", event).WithField("events.sink", sink).WithError(err). Errorf("broadcaster: dropping event") } } case request := <-b.adds: //增長sink事件 // while we have to iterate for add/remove, common iteration for // send is faster against slice. var found bool for _, sink := range b.sinks { if request.sink == sink { found = true break } } if !found { b.sinks = append(b.sinks, request.sink) } // b.sinks[request.sink] = struct{}{} request.response <- nil // 喚醒阻塞的configure()函數 case request := <-b.removes://刪除sink事件 remove(request.sink) request.response <- nil case <-b.shutdown: // close all the underlying sinks for _, sink := range b.sinks { if err := sink.Close(); err != nil && err != ErrSinkClosed { logrus.WithField("events.sink", sink).WithError(err). Errorf("broadcaster: closing sink failed") } } return } } }
queue使用contaienr/list實現了典型的生產消費者模型
type Queue struct { dst Sink events *list.List cond *sync.Cond mu sync.Mutex closed bool }
核心函數run(),在隊列中取出下一個event,交給自身的sink處理,在沒有事件隊列的狀況下,eq.next()老是阻塞的(使用條件變量進行同步)
func (eq *Queue) run() { for { event := eq.next() if event == nil { return // nil block means event queue is closed. } if err := eq.dst.Write(event); err != nil { logrus.WithFields(logrus.Fields{ "event": event, "sink": eq.dst, }).WithError(err).Debug("eventqueue: dropped event") } } }
生產者:q.next()
消費者:write()
func (eq *Queue) Write(event Event) error { eq.mu.Lock() defer eq.mu.Unlock() if eq.closed { return ErrSinkClosed } eq.events.PushBack(event) eq.cond.Signal() // signal waiters return nil } func (eq *Queue) next() Event { eq.mu.Lock() defer eq.mu.Unlock() for eq.events.Len() < 1 { if eq.closed { eq.cond.Broadcast() return nil } eq.cond.Wait() } front := eq.events.Front() block := front.Value.(Event) eq.events.Remove(front) return block }