Docker組件go-event 源碼學習

go-event是一個在Docker項目中使用到的一個事件分發組件,實現了常規的廣播,隊列等事件分發模型,代碼簡潔明瞭,也適合初學者對Go語言的入門,對channel用來同步,通訊也會加深理解。程序員

核心數據結構

Eventdocker

type Event interface{}

Event被封裝爲一個空接口,接受任意類型。在go-events表示一個能夠被分發的事件。 json

interface{}的底層相似於c語言中的void*,但比void*強大不少,好比interface{}保存了指向對象的指針和類型,而c程序員使用void*時,必須本身去保證對象的類型是正確的)數據結構

Sinkapp

type Sink interface {
    Write(event Event) error
    Close() error
}

Sink是一個用來分發事件(Event)的結構。能夠看成事件的處理者,使用接口的方式聲明。只要對象實現了這兩個方法,就能夠被看成一個Sink。
核型方法函數

  • Write(event Event) errorpost

    • 定義了事件如何被分發的策略。
  • Close() errorurl

    • 當Sink被關閉的處理策略。

go-event核心就是圍繞Sink作文章,docker官方給出了一個http的例子,就是當調用Write時,發起一次post請求。:設計

func (h *httpSink) Write(event Event) error {
    p, err := json.Marshal(event)
    if err != nil {
        return err
    }
    body := bytes.NewReader(p)
    resp, err := h.client.Post(h.url, "application/json", body)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.Status != 200 {
        return errors.New("unexpected status")
    }

    return nil
}

// implement (*httpSink).Close()

實現模型

到此爲止,sink定義了事件分發的基本單位。在go-event中,封裝了廣播,消息隊列兩種消息分發的模型,具體來講,就是實現了Sink接口的兩個結構體。指針

Boadcaster

type Broadcaster struct {
    sinks   []Sink //所包含的Sink
    events  chan Event// 同步Event的channel
    adds    chan configureRequest //adds和remove必須保證thread-safe,因此採用channel同步
    removes chan configureRequest

    shutdown chan struct{}
    closed   chan struct{}
    once     sync.Once
}

Boardcaster由多個Sink組成,當Boardcaster接收到一個事件時,會調用自身包含的全部Sink的Write()方法
go-events設計之初就實現協程之間的消息分發,須要保證thread-safe,因此對event的處理,添加,移除Sink都使用管道來通訊。這也是Go的一個使用原則:

使用通訊來共享內存,而不是經過共享內存來通訊

在Broadcaster中全部的臨界資源(sinks,event)都經過自身的run()函數統一管理,外界則經過相應的channel 同步給Broadcaster
例如Write()

func (b *Broadcaster) Write(event Event) error {
    select {
    case b.events <- event:
    case <-b.closed:
        return ErrSinkClosed
    }
    return nil
}

能夠看到增減sink都是經過向對應的channel寫入數據進行的。

func (b *Broadcaster) Add(sink Sink) error {
    return b.configure(b.adds, sink) //  will be block until ch can be writen
}

func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
    response := make(chan error, 1)

    for {
        select {
        case ch <- configureRequest{
            sink:     sink,
            response: response}:
            ch = nil // ?
        case err := <-response: 
            return err
        case <-b.closed:
            return ErrSinkClosed
        }
    }
}

核心run函數的實現,監聽Boardcast管道上的相應事件,並做出處理。

func (b *Broadcaster) run() {
    defer close(b.closed)
    //將remove封裝了一下,由於下面兩處都會用到
    remove := func(target Sink) {
        for i, sink := range b.sinks {
            if sink == target {
                b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
                break
            }
        }
    }
    // 輪訓處理channel上的事件
    for {
        select {
        case event := <-b.events: //有事件到來,進行廣播
            for _, sink := range b.sinks {
                if err := sink.Write(event); err != nil {
                    if err == ErrSinkClosed {
                        // remove closed sinks
                        remove(sink)
                        continue
                    }
                    logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: dropping event")
                }
            }
        case request := <-b.adds: //增長sink事件
            // while we have to iterate for add/remove, common iteration for
            // send is faster against slice.

            var found bool
            for _, sink := range b.sinks {
                if request.sink == sink {
                    found = true
                    break
                }
            }

            if !found {
                b.sinks = append(b.sinks, request.sink)
            }
            // b.sinks[request.sink] = struct{}{}
            request.response <- nil // 喚醒阻塞的configure()函數
            
        case request := <-b.removes://刪除sink事件
            remove(request.sink)
            request.response <- nil
        case <-b.shutdown:
            // close all the underlying sinks
            for _, sink := range b.sinks {
                if err := sink.Close(); err != nil && err != ErrSinkClosed {
                    logrus.WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: closing sink failed")
                }
            }
            return
        }
    }
}

queue

queue使用contaienr/list實現了典型的生產消費者模型

type Queue struct {
    dst    Sink
    events *list.List
    cond   *sync.Cond 
    mu     sync.Mutex
    closed bool
}

核心函數run(),在隊列中取出下一個event,交給自身的sink處理,在沒有事件隊列的狀況下,eq.next()老是阻塞的(使用條件變量進行同步)

func (eq *Queue) run() {
    for {
        event := eq.next()

        if event == nil {
            return // nil block means event queue is closed.
        }

        if err := eq.dst.Write(event); err != nil {
            logrus.WithFields(logrus.Fields{
                "event": event,
                "sink":  eq.dst,
            }).WithError(err).Debug("eventqueue: dropped event")
        }
    }
}

生產者:q.next()
消費者:write()

func (eq *Queue) Write(event Event) error {
    eq.mu.Lock()
    defer eq.mu.Unlock()

    if eq.closed {
        return ErrSinkClosed
    }

    eq.events.PushBack(event)
    eq.cond.Signal() // signal waiters

    return nil
}

func (eq *Queue) next() Event {
    eq.mu.Lock()
    defer eq.mu.Unlock()

    for eq.events.Len() < 1 {
        if eq.closed {
            eq.cond.Broadcast()
            return nil
        }

        eq.cond.Wait()
    }

    front := eq.events.Front()
    block := front.Value.(Event)
    eq.events.Remove(front)

    return block
}
相關文章
相關標籤/搜索