beehive 源碼閱讀- go 語言的自動化機器

beehive

Overview

看一下做者本人的註釋api

// Package bees is Beehive's central module system.tcp

beehive 很是有趣的在於各邏輯的解耦設計,這不只讓自己功能操做簡單,也讓擴展變得關注點少了不少,只須要一點學習成本就能夠擴展本身的 beehivesvg

首先解釋一下 bee hive 中 的概念函數

bee 表明的是咱們常見的 Worker 也就是說,實際的行爲是由這些 小蜜蜂執行的。他們就相似於採蜜的工人,採集到了以後統一放回來處理工具

hive 是蜂房,也就是咱們常見的 WorkerPool 不一樣的是,她更像一個 Facotry ,什麼意思呢?她能夠建立專屬的 bee 。在極少的配置下,好比只須要配置上一個 token 便可。就能夠生成一隻 bee 專門針對某一種蜜工做了。oop

chain 又是什麼? chain 就是連接事件與處理的工具,咱們認爲 bee 採回蜜是一個事件,總不可能採回來啥都不幹吧。針對不一樣的 蜜 咱們就有不一樣的反應,就有不一樣的 action
好比某人的 blog 更新了 ,rss bee 接收到了以後飛回來,咱們就能夠再要求 email bee 把這其中的信息經過郵件發給咱們或者咱們想發給的人。
這就須要 chain 來聯繫 event 和 action 了
學習

Landscape

API Register

beehive-api (1).svg
成組的 API 實現了 Resource 接口註冊到 Container 的路由 Route 中。this

幫助理解

在文件中寫好了 做者 實現的 Handle 的實現來註冊 http 請求
image.pngspa

API 只是供調用,邏輯重點在 bees 這個包裏的實現。debug

Bee

首先是有的接口

// BeeInterface is an interface all bees implement.
type BeeInterface interface {
    // Name of the bee
    Name() string
    // Namespace of the bee
    Namespace() string

    // Description of the bee
    Description() string
    // SetDescription sets a description
    SetDescription(s string)

    // Config returns this bees config
    Config() BeeConfig
    // Options of the bee
    Options() BeeOptions
    // SetOptions to configure the bee
    SetOptions(options BeeOptions)

    // ReloadOptions gets called after a bee's options get updated
    ReloadOptions(options BeeOptions)

    // Activates the bee
    Run(eventChannel chan Event)
    // Running returns the current state of the bee
    IsRunning() bool
    // Start the bee
    Start()
    // Stop the bee
    Stop()

    LastEvent() time.Time
    LogEvent()
    LastAction() time.Time
    LogAction()

    Logln(args ...interface{})
    Logf(format string, args ...interface{})
    LogErrorf(format string, args ...interface{})
    LogFatal(args ...interface{})

    SetSigChan(c chan bool)
    WaitGroup() *sync.WaitGroup

    // Handles an action
    Action(action Action) []Placeholder
}

和他的基礎實現

// Bee is the base-struct to be embedded by bee implementations.
type Bee struct {
    config BeeConfig

    lastEvent  time.Time
    lastAction time.Time

    Running   bool
    SigChan   chan bool
    waitGroup *sync.WaitGroup
}

這裏須要注意的是 Run 接口,在 Base-Struct Bee 中該方法 是空的實現,由於 Run 是 Bee 的生命週期開始處,是自動開始的。

WebBee 實現

簡單的看某一個實現便可

// WebBee is a Bee that starts an HTTP server and fires events for incoming
// requests.
type WebBee struct {
    bees.Bee

    addr string

    eventChan chan bees.Event
}

能夠很清楚的指導,這個 WebBee 中的 eventChan 正是通知的地方,也就是上文所說的 Chain 的開始處。注意的是因爲鬆耦合的設計,任何 Bee 均可以成爲 Chain 上的一環,只要它能觸發事件。或者監聽事件。

func (mod *WebBee) Run(cin chan bees.Event)
// Run executes the Bee's event loop.
func (mod *WebBee) Run(cin chan bees.Event) {
    mod.eventChan = cin

    srv := &http.Server{Addr: mod.addr, Handler: mod}
    l, err := net.Listen("tcp", mod.addr)
    if err != nil {
        mod.LogErrorf("Can't listen on %s", mod.addr)
        return
    }
    defer l.Close()

    go func() {
        err := srv.Serve(l)
        if err != nil {
            mod.LogErrorf("Server error: %v", err)
        }
        // Go 1.8+: srv.Close()
    }()

    select {
    case <-mod.SigChan:
        return
    }
}

同時 WebBee 也有一個方法 ServeHTTP 來實現 http.Handle 來處理請求。
這裏也就是前文所說的 註冊的那些 API 的部分來源,每個 bee 自身實現的自動註冊暴露給外界調用。

func (mod *WebBee) ServeHTTP(w http.ResponseWriter, req *http.Request)

Event

package:beehive/bees/event.go
剛纔講到了 觸發事件 event 的 WebBee 實現,如今咱們來看 event 的實現

其實是經過 這個函數實現的

// handleEvents handles incoming events and executes matching Chains.
func handleEvents() {
    for {
        event, ok := <-eventsIn
        ···
        bee := GetBee(event.Bee)
        (*bee).LogEvent()

        ···
        go func() {
            defer func() {
                if e := recover(); e != nil {
                    log.Printf("Fatal chain event: %s %s", e, debug.Stack())
                }
            }()

            execChains(&event)
        }()
    }
}

省略了 日誌部分。能夠看到 handleEvents 經過接受通道里的 event,並檢查 event 中的 Bee 做爲 標誌找到對應的 Bee 喚醒。

這裏咱們能夠看到 最後進入了 Chains 中執行,即上文所說的 Chain 將 Event 和 Action 連接了起來,讓 Bee 之間可以協做。

chain

package:beehive/bees/chains.go
chain 中其實是調用 Actions 經過下面的 execActions 函數

for _, el := range c.Actions {
            action := GetAction(el)
            if action == nil {
                log.Println("\t\tERROR: Unknown action referenced!")
                continue
            }
            execAction(*action, m)
        }

咱們來看看 Action 的執行。

Action Exec

package: beehive/bees/actions.go
actions 既能夠運行設置中的 options 也能夠直接在 運行函數中傳入須要運行的 options
func execAction(action Action, opts map[string]interface{}) bool

bee-actions (1).svg

Summary

整個執行邏輯是如此了,其餘還有一些 

  • 日誌處理:用於跟蹤 bee 和 hive 的狀況
  • Config:保存配置文件,每一次啓動能夠從新放飛之前的 Bee 們
  • Signal:beehive 攔截了一些 Signal 的 Kill 等信號來執行優雅退出,避免了 Config 等的丟失。
  • Run Flag:執行的附帶參數,設定 Beehive 整個應用的監聽端口和版本配置。
相關文章
相關標籤/搜索