閱讀源碼的過程,就像是在像武俠小說裏閱讀武功祕籍同樣,分析高手的一招一式,提煉出精髓,來加強本身的內力。
以前的帖子說了一下微服務的雪崩效應和常見的解決方案,太水,沒有上代碼怎麼叫解決方案。github
上有不少開源的庫來解決雪崩問題
,比較出名的是Netflix
的開源庫hystrix。集流量控制
、熔斷
、容錯
等於一身的java
語言的庫。今天分析的源碼庫是 hystrix-go,他是hystrix的的go
語言版,應該是說簡化版本,用不多的代碼量實現了主要功能。很推薦朋友們有時間讀一讀。html
hystrix
的使用是很是簡單的,同步執行,直接調用Do
方法。java
err := hystrix.Do("my_command", func() error { // talk to other services return nil }, func(err error) error { // do this when services are down return nil })
異步執行Go
方法,內部實現是啓動了一個gorouting
,若是想獲得自定義方法的數據,須要你傳channel
來處理數據,或者輸出。返回的error
也是一個channel
git
output := make(chan bool, 1) errors := hystrix.Go("my_command", func() error { // talk to other services output <- true return nil }, nil) select { case out := <-output: // success case err := <-errors: // failure
大概的執行流程圖
github
其實方法Do
和Go
方法內部都是調用了hystrix.GoC
方法,只是Do
方法處理了異步的過程算法
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error { done := make(chan struct{}, 1) r := func(ctx context.Context) error { err := run(ctx) if err != nil { return err } done <- struct{}{} return nil } f := func(ctx context.Context, e error) error { err := fallback(ctx, e) if err != nil { return err } done <- struct{}{} return nil } var errChan chan error if fallback == nil { errChan = GoC(ctx, name, r, nil) } else { errChan = GoC(ctx, name, r, f) } select { case <-done: return nil case err := <-errChan: return err } }
在調用Do
Go
等方法以前咱們能夠先自定義一些配置docker
hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{ Timeout: int(time.Second * 3), MaxConcurrentRequests: 100, SleepWindow: int(time.Second * 5), RequestVolumeThreshold: 30, ErrorPercentThreshold: 50, }) err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error { // ... return nil }, func(i context.Context, e error) error { // ... return e })
我大要說了一下CommandConfig
第個字段的意義:json
默認時間是1000毫秒
默認值是10
默認值是5000毫秒
默認值是20
RequestVolumeThreshold
而且錯誤率到達這個百分比後就會啓動熔斷
默認值是50
固然若是不配置他們,會使用默認值
併發
講完了怎麼用,接下來就是分析源碼了。我是從下層到上層的順序分析代碼和執行流程異步
每個Command都會有一個默認統計控制器,固然也能夠添加多個自定義的控制器。
默認的統計控制器DefaultMetricCollector
保存着熔斷器
的全部狀態,調用次數
,失敗次數
,被拒絕次數
等等微服務
type DefaultMetricCollector struct { mutex *sync.RWMutex numRequests *rolling.Number errors *rolling.Number successes *rolling.Number failures *rolling.Number rejects *rolling.Number shortCircuits *rolling.Number timeouts *rolling.Number contextCanceled *rolling.Number contextDeadlineExceeded *rolling.Number fallbackSuccesses *rolling.Number fallbackFailures *rolling.Number totalDuration *rolling.Timing runDuration *rolling.Timing }
最主要的仍是要看一下rolling.Number
,rolling.Number
纔是狀態最終保存的地方
Number
保存了10秒內的Buckets
數據信息,每個Bucket
的統計時長爲1秒
type Number struct { Buckets map[int64]*numberBucket Mutex *sync.RWMutex } type numberBucket struct { Value float64 }
字典字段Buckets map[int64]*numberBucket
中的Key
保存的是當前時間
可能你會好奇Number
是如何保證只保存10秒內的數據的。每一次對熔斷器
的狀態進行修改時,Number
都要先獲得當前的時間(秒級)的Bucket
不存在則建立。
func (r *Number) getCurrentBucket() *numberBucket { now := time.Now().Unix() var bucket *numberBucket var ok bool if bucket, ok = r.Buckets[now]; !ok { bucket = &numberBucket{} r.Buckets[now] = bucket } return bucket }
修改完後去掉10秒外的數據
func (r *Number) removeOldBuckets() { now := time.Now().Unix() - 10 for timestamp := range r.Buckets { // TODO: configurable rolling window if timestamp <= now { delete(r.Buckets, timestamp) } } }
好比Increment
方法,先獲得Bucket
再刪除舊的數據
func (r *Number) Increment(i float64) { if i == 0 { return } r.Mutex.Lock() defer r.Mutex.Unlock() b := r.getCurrentBucket() b.Value += i r.removeOldBuckets() }
統計控制器是最基層和最重要的一個實現,上層全部的執行判斷都是基於他的數據進行邏輯處理的
斷路器-->執行-->上報執行狀態信息-->保存到相應的Buckets
每一次斷路器邏輯的執行都會上報執行過程當中的狀態,
// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard. func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error { // ... circuit.mutex.RLock() o := circuit.open circuit.mutex.RUnlock() if eventTypes[0] == "success" && o { circuit.setClose() } var concurrencyInUse float64 if circuit.executorPool.Max > 0 { concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max) } select { case circuit.metrics.Updates <- &commandExecution{ Types: eventTypes, Start: start, RunDuration: runDuration, ConcurrencyInUse: concurrencyInUse, }: default: return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)} } return nil }
circuit.metrics.Updates
這個信道就是處理上報信息的,上報執行狀態自信的結構是metricExchange
,結構體很簡單隻有4個字段。要的就是
channel
字段Updates
他是一個有buffer
的channel
默認的數量是2000
個,全部的狀態信息都在他裏面metricCollectors
字段,就是保存的具體的這個command
執行過程當中的各類信息type metricExchange struct { Name string Updates chan *commandExecution Mutex *sync.RWMutex metricCollectors []metricCollector.MetricCollector } type commandExecution struct { Types []string `json:"types"` Start time.Time `json:"start_time"` RunDuration time.Duration `json:"run_duration"` ConcurrencyInUse float64 `json:"concurrency_inuse"` } func newMetricExchange(name string) *metricExchange { m := &metricExchange{} m.Name = name m.Updates = make(chan *commandExecution, 2000) m.Mutex = &sync.RWMutex{} m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name) m.Reset() go m.Monitor() return m }
在執行newMetricExchange
的時候會啓動一個協程 go m.Monitor()
去監控Updates
的數據,而後上報給metricCollectors
保存執行的信息數據好比前面提到的調用次數
,失敗次數
,被拒絕次數
等等
func (m *metricExchange) Monitor() { for update := range m.Updates { // we only grab a read lock to make sure Reset() isn't changing the numbers. m.Mutex.RLock() totalDuration := time.Since(update.Start) wg := &sync.WaitGroup{} for _, collector := range m.metricCollectors { wg.Add(1) go m.IncrementMetrics(wg, collector, update, totalDuration) } wg.Wait() m.Mutex.RUnlock() } }
更新調用的是go m.IncrementMetrics(wg, collector, update, totalDuration)
,裏面判斷了他的狀態
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) { // granular metrics r := metricCollector.MetricResult{ Attempts: 1, TotalDuration: totalDuration, RunDuration: update.RunDuration, ConcurrencyInUse: update.ConcurrencyInUse, } switch update.Types[0] { case "success": r.Successes = 1 case "failure": r.Failures = 1 r.Errors = 1 case "rejected": r.Rejects = 1 r.Errors = 1 // ... } // ... collector.Update(r) wg.Done() }
hystrix-go
對流量控制的代碼是很簡單的。用了一個簡單的令牌算法,能獲得令牌的就能夠執行後繼的工做,執行完後要返還令牌。得不到令牌就拒絕,拒絕後調用用戶設置的callback
方法,若是沒有設置就不執行。
結構體executorPool
就是hystrix-go
流量控制
的具體實現。字段Max
就是每秒最大的併發值。
type executorPool struct { Name string Metrics *poolMetrics Max int Tickets chan *struct{} }
在建立executorPool
的時候,會根據Max
值來建立令牌
。Max值若是沒有設置會使用默認值10
func newExecutorPool(name string) *executorPool { p := &executorPool{} p.Name = name p.Metrics = newPoolMetrics(name) p.Max = getSettings(name).MaxConcurrentRequests p.Tickets = make(chan *struct{}, p.Max) for i := 0; i < p.Max; i++ { p.Tickets <- &struct{}{} } return p }
注意一下字段 Metrics
他用於統計執行數量,好比:執行的總數量
,最大的併發數
具體的代碼就不貼上來了。這個數量也能夠顯露出,供可視化程序直觀的表現出來。
令牌使用完後是須要返還的,返回的時候纔會作上面所說的統計工做。
func (p *executorPool) Return(ticket *struct{}) { if ticket == nil { return } p.Metrics.Updates <- poolMetricsUpdate{ activeCount: p.ActiveCount(), } p.Tickets <- ticket } func (p *executorPool) ActiveCount() int { return p.Max - len(p.Tickets) }
上面把 統計控制器
、流量控制
、上報執行狀態
講完了,主要的實現也就講的差很少了。最後就是串一次command的執行都經歷了啥:
err := hystrix.Do("my_command", func() error { // talk to other services return nil }, func(err error) error { // do this when services are down return nil })
hystrix
在執行一次command的前面也有提到過會調用GoC
方法,下面我把代碼貼出來來,篇幅問題去掉了一些代碼
,主要邏輯都在。就是在判斷斷路器是否已打開
,獲得Ticket
得不到就限流,執行咱們本身的的方法
,判斷context是否Done或者執行是否超時
固然,每次執行結果都要上報執行狀態
,最後要返還Ticket
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error { cmd := &command{ run: run, fallback: fallback, start: time.Now(), errChan: make(chan error, 1), finished: make(chan bool, 1), } //獲得斷路器,不存在則建立 circuit, _, err := GetCircuit(name) if err != nil { cmd.errChan <- err return cmd.errChan } //... // 返還ticket returnTicket := func() { // ... cmd.circuit.executorPool.Return(cmd.ticket) } // 上報執行狀態 reportAllEvent := func() { err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) // ... } go func() { defer func() { cmd.finished <- true }() // 查看斷路器是否已打開 if !cmd.circuit.AllowRequest() { // ... returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ErrCircuitOpen) reportAllEvent() }) return } // ... // 獲取ticket 若是得不到就限流 select { case cmd.ticket = <-circuit.executorPool.Tickets: ticketChecked = true ticketCond.Signal() cmd.Unlock() default: // ... returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ErrMaxConcurrency) reportAllEvent() }) return } // 執行咱們自已的方法,並上報執行信息 returnOnce.Do(func() { defer reportAllEvent() cmd.runDuration = time.Since(runStart) returnTicket() if runErr != nil { cmd.errorWithFallback(ctx, runErr) return } cmd.reportEvent("success") }) }() // 等待context是否被結束,或執行者超時,並上報 go func() { timer := time.NewTimer(getSettings(name).Timeout) defer timer.Stop() select { case <-cmd.finished: // returnOnce has been executed in another goroutine case <-ctx.Done(): // ... return case <-timer.C: // ... } }() return cmd.errChan }
代碼中StreamHandler
就是把全部斷路器
的狀態以流的方式不斷的推送到dashboard. 這部分代碼我就不用說了,很簡單。
須要在你的服務端加3行代碼,啓動咱們的流服務
hystrixStreamHandler := hystrix.NewStreamHandler() hystrixStreamHandler.Start() go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
dashboard
我使用的是docker
版。
docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
在下面輸入你服務的地址,我是
http://192.168.1.67:81/hystrix.stream
若是是集羣可使用Turbine進行監控,有時間你們本身來看吧