client-go中的golang技巧

client-go中有不少比較有意思的實現,如定時器,同步機制等,能夠做爲移植使用。下面就遇到的一些技術講解,首先看第一個:html

  • sets.String(k8s.io/apimachinery/pkg/util/sets/string.go)

實現了對golang map的key的處理,如計算交集,並集等。實際中可能會遇到須要判斷兩個map的key是否重合的場景,此時可使用下述方式實現,sets.StringKeySet函數將入參的map的key抽取成一個String類型,這樣就可使用String的方法操做keygolang

ps:更多功能參見源碼算法

package main

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/sets"
)

func main(){
    map1 := map[string]int{"aaa":1,"bbb":2,"ccc":3}
    map2 := map[string]int{"ccc":1,"ddd":2,"eee":3}
    newmap1 := sets.StringKeySet(map1)
    newmap2 := sets.StringKeySet(map2)
    fmt.Println(newmap1.List(),newmap2.List())
    fmt.Println(newmap1.HasAny(newmap2.List()...)) //3個點用於把數組打散爲單個元素
}

結果:true
  • 同步機制
    • sync.Mutex(golang 內置方法),用於數據同步

有2個方法:api

func (m *Mutex) Lock()
func (m *Mutex) Unlock()

相似C語言線程的互斥鎖,用於對數據進行加解鎖操做。當數據被加鎖後,未得到該鎖的程序將沒法讀取被加鎖的數據。從下面例子能夠看出在數據被解鎖前其餘協程沒法對該數據進行讀寫操做。數組

ps: read data的數據也可能爲「data緩存

package main

import (
    "fmt"
    "sync"
)

type LockTest struct {
    l sync.Mutex
    data string
}

func main(){
    lockTest := LockTest{sync.Mutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("sleep begin")
        time.Sleep(time.Second*2)
        fmt.Println("sleep end")
        lockTest.l.Unlock()
    }()
    
    time.Sleep(time.Second*1)
    
    go func() {
        lockTest.l.Lock()
        fmt.Println("read data:",lockTest.data)
        lockTest.l.Unlock()
    }()

    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*5)
}

結果: sleep begin sleep end write data begin write data end read data: new data
    • sync.RWMutex(golang 內置方法),用於數據同步

讀寫鎖,含4個方法,前2個爲讀鎖,後2個爲寫鎖,使用時要一一對應。寫鎖會阻塞讀寫操做,讀鎖不會阻塞寫操做,讀鎖能夠有多個,讀鎖之間不會相互阻塞,適用於讀多寫少的場景。所以若是單純使用RWMutex.Lock/RWMutex.UnLock與使用Mutex.Lock/Mutex.UnLock效果相同網絡

func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

讀寫鎖通常是讀鎖和寫鎖結合使用的。在有寫鎖的時候,讀鎖會被阻塞,等待寫鎖釋放後才能進行讀操做。app

ps:寫鎖內部僅能對共享資源進行讀操做,若是執行寫操做會致使數據異常。sync.Mutex和sync.RWMutex通常都是內置在結構體中使用,用於保護本結構體的數據函數

package main

import (
    "fmt"
    "sync"
)
type LockTest struct {
    l sync.RWMutex
    data string
}

func main(){
    lockTest := LockTest{sync.RWMutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        time.Sleep(time.Second*3)
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*1)

    go func() {
        lockTest.l.RLock()  //阻塞等待寫鎖釋放
        fmt.Println("read begin")
        fmt.Println("read data:",lockTest.data)
        fmt.Println("read begin")
        lockTest.l.RUnlock()
    }()

    time.Sleep(time.Second*5)
}

結果:
write data begin write data end read begin read data:
new data read begin
    • sync.Cond(golang 內置方法),用於條件變量

sync.Cond用於條件等待,在知足某些條件時程序才能繼續執行。它包含以下3個方法:Wait()會掛起其所在的協程等待Signal()或Broadcast()的喚醒。測試

func (c *Cond) Wait() 
func (c *Cond) Signal()
func (c *Cond) Broadcast() 

官方推薦的典型用法以下。因爲喚醒協程並不意味着條件已就緒,所以在喚醒後須要檢測是否本協程的條件已經知足。

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

使用Signal()喚醒的方式以下,Signal()用於當次喚醒一個協程。若是註釋掉下例中的Signal(),那麼兩個協程會一直Wait(),並不會繼續執行。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }()
time.Sleep(time.Second
*1) fmt.Println("signal-1") condition1=true c.Signal() time.Sleep(time.Second*1) fmt.Println("signal-2") condition2=true c.Signal() time.Sleep(time.Second*10) } 結果: signal-1 condition1=true,run1 signal-2 condition2=true,run2

使用Signal()喚醒協程時須要注意,在多個協程等待時,該函數並無指定須要喚醒哪個協程。下面程序的輸出可能爲「condition1=true,run1」也可能爲「condition2=true,run2」。所以Signal通常適用於僅有一個協程等待的狀況,不然可能形成混亂。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Signal()
    time.Sleep(time.Second*10)
}

Broadcast()比較簡單,即喚醒全部等待的協程

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Broadcast()
    time.Sleep(time.Second*10)
}
 結果: condition1=true,run1 condition2=true,run2
    • sync.waitgroup,用於等待協程執行完成

sync.waitgroup有以下3個方法,Add(delta int)入參表示須要等待的協程的個數,如2表示須要等待2個協程完成;Done()表示該協程結束;Wait()用於阻塞主協程,等待全部協程結束後釋放。

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait() 

舉例以下,啓動10個協程,Wait()會阻塞,直到全部的協程執行Done()。

ps: Add(delta int)函數的入參很重要,入參大於實際須要等待的協程會致使主協程一致阻塞,小於須要等待的協程會致使某些協程提早退出

import (
    "fmt"
    "sync"
)

func main(){
    wg := sync.WaitGroup{}
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func(i int) {
            defer wg.Done()
            fmt.Print(i, " ")
        }(i)
    }

    wg.Wait()
}
 結果: 9 4 0 1 2 3 6 5 7 8 
    • 協程間使用chan進行同步

下例中使用chan實現主協程控制write,並使用write控制read。協程關閉使用close()函數

ps:使用chan進行協程同步通常將chan做爲入參傳入,或在函數內部實現協程間的同步。爲方便驗證,下面例子將全部chan做爲全局變量

package main

import (
    "fmt"
    "sync"
)
var speakCh = make(chan string)
var stopReadChan = make(chan struct{})
var stopWriteChan = make(chan struct{})

func readChan(stopCh <-chan struct{}){
    for {
        select {
        case words := <- speakCh:
            fmt.Println("received:",words)
        case <- stopCh:
            fmt.Println("stop read!")
            return
        }
    }
}

func writeChan(stopCh <-chan struct{}){
    for {
        select {
        case <- stopCh:
            fmt.Println("stop write!")
            close(stopReadChan)
            return
        default:
        }
        speakCh <- "hi"
        time.Sleep(time.Second*2)
    }
}

func main(){
    go readChan(stopReadChan)
    go writeChan(stopWriteChan)

    time.Sleep(time.Second*6)
    close(stopWriteChan)
    time.Sleep(time.Second*6)
}

結果: received: hi received: hi received: hi stop write! stop read!
    • 協程間使用context進行同步

context用於對協程進行管理,如主動退出協程,超時退出協程等,能夠看做是使用chan管理協程的擴展。在使用時首先建立一個context,使用cancel()能夠取消context,並使用Done()返回的chan管理協程。

官方推薦的用法以下:

func Stream(ctx context.Context, out chan<- Value) error {
    for {
        v, err := DoSomething(ctx)
        if err != nil {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case out <- v:
        }
    }
}

下例中使用context.WithCancel建立一個context,使用cancel()給這一組context發送信號,在協程中使用Done()處理退出事件。

package main

import (
    "fmt"
    "context"
)

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*3)
    cancel()

    time.Sleep(time.Second*5)
}

func testCtx(ctx context.Context, name string) error{
    for {
        select {
        case <-ctx.Done():
            fmt.Println("ctx.Done:",name)
            return ctx.err()
        default:
            fmt.Println("default:",name)
            time.Sleep(time.Second*2)
        }
    }
}

結果: default: ctx1 default: ctx3 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx1 ctx.Done: ctx3 ctx.Done: ctx2

建立context的方式以下,其他三個能夠看做是WithCancel的擴展

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)              //須要主動取消context
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)   //在deadline時間點後取消context
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //在超時後取消context
func WithValue(parent Context, key, val interface{}) Context

再看一個WithTimeout的例子,下面設置context的超時時間爲3s且沒有主動cancel(),3s超時後能夠看到該context對應的協程正常退出

func main(){
    ctx,_ := context.WithTimeout(context.Background(),time.Second*3)
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*5)
}

結果: default: ctx3 default: ctx1 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx3 ctx.Done: ctx2 ctx.Done: ctx1 

context能夠看做是一個樹,當cancel一個context時,會同時cancle它的子context。下面首先建立一個ctx,而後在此ctx下面建立一個subctx。當執行cancle() ctx時會同時cancel() 該的subctx。

context.Background()就是已經實現的首個context。

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    subctx,_ := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    canclel()

    time.Sleep(time.Second*10)
}

結果: default: subctx2 default: ctx1 default: subctx1 default: subctx2 default: ctx1 default: subctx1 timeout ctx.Done: ctx1 ctx.Done: subctx1 ctx.Done: subctx2

下例中僅cancel() subctx,能夠看到並無影響subctx的parent。

func main(){
    ctx, _:= context.WithCancel(context.Background())
    subctx,subcancel := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    subcancel()

    time.Sleep(time.Second*10)
}

結果: default: subctx1 default: subctx2 default: ctx1 default: ctx1 default: subctx1 default: subctx2 timeout ctx.Done: subctx2 default: ctx1 ctx.Done: subctx1 default: ctx1 default: ctx1 default: ctx1 default: ctx1
    • wait.Group(k8s.io/apimachinery/pkg/util/wait/wait.go)

client-go中的wait.Group創造性地將sync.WaitGroup與chan和ctx結合,實現了協程間同步和等待所有Group中的協程結束的功能。因爲StartWithChannel和StartWithContext的入參函數類型比較固定,所以使用上並不通用,但能夠做爲參考,在實際中擴展使用。下例中給出了簡單用法。

func (g *Group) Wait() 
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))
func main(){
    f1:= func(ctx context.Context) {
        for {
            select {
            case <- ctx.Done():
                return
            default:
                fmt.Println("hi11")
                time.Sleep(time.Second)
            }
        }
    }
    wg := wait.Group{}
    ctx, cancel := context.WithCancel(context.Background())
    wg.StartWithContext(ctx,f1)
    time.Sleep(time.Second*3)
    cancel()
    wg.Wait()
}

結果:
hi
hi
hi

 

  •  定時器
    • ticker定時器

首先看一下通常使用的定時器,client-go中比較複雜的定時器也是在此基礎上封裝的。下面例子中給出的是ticker定時器,它會按照必定的時間頻率往Ticker.C中發time.Time類型的數據,能夠在協程中經過判斷Ticker.C來執行定時任務。下例來自官方,實現每秒執行一次打印,

import (
    "fmt"
    "time"
)

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

結果: Current time: 2019-07-04 14:30:37.9088968 +0800 CST m=+5.328291301 Current time: 2019-07-04 14:30:38.9089349 +0800 CST m=+6.328328801 Current time: 2019-07-04 14:30:39.9101415 +0800 CST m=+7.329534901 Current time: 2019-07-04 14:30:40.9095174 +0800 CST m=+8.328910201 Current time: 2019-07-04 14:30:41.9092961 +0800 CST m=+9.328688301 Current time: 2019-07-04 14:30:42.9087682 +0800 CST m=+10.328159801 Current time: 2019-07-04 14:30:43.9088604 +0800 CST m=+11.328251401 Current time: 2019-07-04 14:30:44.909609 +0800 CST m=+12.328999501 Current time: 2019-07-04 14:30:45.9094782 +0800 CST m=+13.328868101 Current time: 2019-07-04 14:30:46.909006 +0800 CST m=+14.328395401 Done!

須要注意的是使用ticker並不能保證程序被精確性調度,若是程序的執行時間大於ticker的調度週期,那麼程序的觸發週期會發生誤差(可能因爲系統cpu佔用太高,網絡延遲等緣由)。以下面例子中,ticker觸發週期爲1s,但程序執行大於2s,此時會出現程序執行頻率不一致的狀況。適用於週期性觸發一個任務。

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            time.Sleep(time.Second*2)
            fmt.Println("Current time: ", t)
        }
    }
}

結果: Current time: 2019-07-04 14:56:52.5446526 +0800 CST m=+5.281916601 Current time: 2019-07-04 14:56:53.5452488 +0800 CST m=+6.282512201 //和上一條相差1s,但和下一條相差2s Current time: 2019-07-04 14:56:55.5443528 +0800 CST m=+8.281615101 Current time: 2019-07-04 14:56:57.5449183 +0800 CST m=+10.282179401 Current time: 2019-07-04 14:56:59.5448671 +0800 CST m=+12.282127101 Done!
    • timer定時器

timer的機制和ticker相同,在定時器超時後往一個chan中發送time.Time數據。不一樣的是ticker能夠週期性調度,timer只會執行一次,若是須要重複調度,須要使用Reset函數重置timer。利用該機制,能夠在同一個timer上以不一樣間隔調度程序。

func main(){
    timer := time.NewTimer(time.Second)
    defer timer.Stop()
    t := <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*2)
    t = <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*3)
    t = <-timer.C
    fmt.Println("Current time: ", t)
}

結果: Current time: 2019-07-04 15:47:01.7518201 +0800 CST m=+5.312710501 Current time: 2019-07-04 15:47:03.7766692 +0800 CST m=+7.337558501 Current time: 2019-07-04 15:47:06.7770913 +0800 CST m=+10.337978901

使用timer須要注意Reset函數只能在timer超時後使用,不然將無效。由於Timer.C的長度只有1,若是前面一個定時器結束前執行了Reset,那麼前面的定時器會被取消。具體能夠參見這裏

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    ...
}

下面例子中能夠看出,屢次執行Reset並不會屢次觸發定時任務,在前一個定時器超時前執行Reset,會取消前一個定時器並以Reset中的duration開始計時。

func main(){
    fmt.Println("now time: "time.Now())
    timer := time.NewTimer(time.Second*5)
    
    defer timer.Stop()
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)


    go func() {
        for ; ;  {
            select {
            case t:=<- timer.C:
                fmt.Println("Current time: ", t)
            }
        }
    }()
    
    time.Sleep(time.Second*10)
}

結果: now time: 2019-07-04 16:16:31.7246084 +0800 CST m=+4.281414201 Current time: 2019-07-04 16:16:33.7505395 +0800 CST m=+6.307344201

官方推薦的用法以下,因爲沒有加鎖,此方法不能在多個協程中同時使用。

if !t.Stop() {
    <-t.C
}
t.Reset(d)

更多timer的用法能夠參見官方文檔

  • wait實現(k8s.io/apimachinery/pkg/util/wait/wait.go)
    • wait中實現了不少與定時相關的函數,首先來看第一組:
func Forever(f func(), period time.Duration) 
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) 
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 

Until函數每period會調度f函數,若是stopCh中有中止信號,則退出。當程序運行時間超過period時,也不會退出調度循環,該特性和Ticker相同。底層使用Timer實現。

Until和NonSlidingUntil爲一對,UntilWithContext和NonSlidingUntilWithContext爲一對,區別只是定時器啓動時間點不一樣,能夠簡單用下圖表示。能夠看到帶「NonSliding」前綴的函數。

這兩種(帶「NonSliding」前綴的)函數在處理正常程序時沒有什麼區別,但在一些場景下會有不一樣的地方。下面例子中使用wait.NonSlidingUntil處理的程序中sleep了2s,這能夠表示程序由於某種緣由致使超出正常處理時間。此時能夠看到結果中的「num 1」和「num 2」是同時調用的

func main(){
    first := true
    num := 0
    stopCh:=make(chan struct{} )
    
    go func() {
        time.Sleep(time.Second*10)
        close(stopCh)
        fmt.Println("done")
    }()

    go wait.NonSlidingUntil(func(){
        if true == first{
            time.Sleep(time.Second*2)
            first=false
        }
num = num + 1 fmt.Println(
"num:",num,"time",time.Now()) },time.Second*1,stopCh) time.Sleep(time.Second*100) } 結果: num: 1 time 2019-07-04 21:05:59.5298524 +0800 CST m=+26.277103101 num: 2 time 2019-07-04 21:05:59.554999 +0800 CST m=+26.302249701 num: 3 time 2019-07-04 21:06:00.5559679 +0800 CST m=+27.303218601 num: 4 time 2019-07-04 21:06:01.5566608 +0800 CST m=+28.303911501

將上述程序的wait.NonSlidingUntil替換爲wait.Until,獲得以下結果,能夠看到首次(異常)和第二次(正常)的間隔正好是wait.Until中設置的調度週期,即1s。

ps:大部分場景下二者使用上並無什麼不一樣,畢竟正常狀況下程序運行時間必然小於程序調度週期。若是須要在程序處理延時的狀況下儘快進行下一次調度,則選擇帶」NonSliding「前綴的函數

結果:
num: 1 time 2019-07-04 21:09:14.9643889 +0800 CST m=+2.010865201 num: 2 time 2019-07-04 21:09:15.9935285 +0800 CST m=+3.040004801 num: 3 time 2019-07-04 21:09:16.9956846 +0800 CST m=+4.042160901
    • func Forever(f func(), period time.Duration)

該函數比較簡單,就是取消了用於控制Until中止的stopCh。以永遠不中止的方式週期性執行f函數

    •  func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error 

ExponentialBackoff能夠實如今函數執行錯誤後實現以指數退避方式的延時重試。ExponentialBackoff內部使用的是time.Sleep

ExponentialBackoff的首個入參Backoff以下:

  • Duration:表示初始的延時時間
  • Factor:指數退避的因子
  • Jitter:能夠看做是誤差因子,該值越大,每次重試的延時的可選區間越大
  • Steps:指數退避的步數,能夠看做程序的最大重試次數
  • Cap:用於在Factor非0時限制最大延時時間和最大重試次數,爲0表示不限制最大延時時間
type Backoff struct {
    // The initial duration.
    Duration time.Duration
    // Duration is multiplied by factor each iteration. Must be greater
    // than or equal to zero.
    Factor float64
    // The amount of jitter applied each iteration. Jitter is applied after
    // cap.
    Jitter float64
    // The number of steps before duration stops changing. If zero, initial
    // duration is always used. Used for exponential backoff in combination
    // with Factor.
    Steps int
    // The returned duration will never be greater than cap *before* jitter
    // is applied. The actual maximum cap is `cap * (1.0 + jitter)`.
    Cap time.Duration
}

第二個參數ConditionFunc表示運行的函數,返回的bool值表示該函數是否執行成功,若是執行成功則會退出指數退避

type ConditionFunc func() (done bool, err error)

下面作幾組測試:

=> 當Factor和Jitter都爲0時,能夠看到調度週期是相同的,即Duration的值(1s)。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){
    var DefaultRetry = wait.Backoff{
        Steps:    5,
        Duration: 1 * time.Second,
        Factor:   0,
        Jitter:   0,
    }

    fmt.Println(wait.ExponentialBackoff(DefaultRetry,func() (bool, error){
        fmt.Println(time.Now())
        return false,nil
    }))
}

結果: 2019-07-05 10:17:33.9610108 +0800 CST m=+0.079831101 2019-07-05 10:17:34.961132 +0800 CST m=+1.079952301 2019-07-05 10:17:35.961512 +0800 CST m=+2.080332301 2019-07-05 10:17:36.9625144 +0800 CST m=+3.081334701 2019-07-05 10:17:37.9636334 +0800 CST m=+4.082453701 timed out waiting for the condition

=> 先看Jitter對duration的影響,Jitter(duration, b.Jitter)的計算方式以下,若是入參的Factor爲0,而Jitter非0,則將Factor調整爲1。rand.Float64()爲[0.0,1.0)的僞隨機數。

將Jitter調整爲0.5,根據下面計算方式預期duration爲[1s,1.5s)。運行程序得出以下結果,觀察能夠發現,duration大概是1.4s

if maxFactor <= 0.0 {
    maxFactor = 1.0 } wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   0,
    Jitter:   0.5,
}

結果: 2019-07-05 10:21:49.5993445 +0800 CST m=+2.382669101 2019-07-05 10:21:50.9026701 +0800 CST m=+3.685994701 2019-07-05 10:21:52.3759019 +0800 CST m=+5.159226401 2019-07-05 10:21:53.7086265 +0800 CST m=+6.491951001 2019-07-05 10:21:54.9283913 +0800 CST m=+7.711715901 timed out waiting for the condition

=> Factor非0且Jitter爲0時,對duration的調整以下

if b.Factor != 0 {
    b.Duration = time.Duration(float64(b.Duration) * b.Factor)
    if b.Cap > 0 && b.Duration > b.Cap {
        b.Duration = b.Cap
        b.Steps = 0
    }
}

從公式中能夠得出,Factor對程序執行的延的影響以下,能夠看到Factor爲1時並無什麼做用

duration(1) = duration
duration(2) = Factor * duration(1) duration(3) = Factor * duration(2) ... duration(n) = Factor * duration(n-1)

Factor爲1時,能夠看到函數執行間隔均爲1s

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   1,
    Jitter:   0,
}

結果: 2019-07-05 10:28:50.8481017 +0800 CST m=+2.363983901 2019-07-05 10:28:51.8482274 +0800 CST m=+3.364109601 2019-07-05 10:28:52.8482359 +0800 CST m=+4.364118201 2019-07-05 10:28:53.848687 +0800 CST m=+5.364569301 2019-07-05 10:28:54.849409 +0800 CST m=+6.365291201 timed out waiting for the condition

調整Factor爲3,預期延時時間爲1s,3s,9s,27s,從測試結果看與預期相符

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0,
}
 結果: 2019-07-05 10:35:06.9030165 +0800 CST m=+0.077746101 2019-07-05 10:35:07.9038392 +0800 CST m=+1.078568701 2019-07-05 10:35:10.9038733 +0800 CST m=+4.078602901 2019-07-05 10:35:19.9042141 +0800 CST m=+13.078943601 2019-07-05 10:35:46.904647 +0800 CST m=+40.079376501 timed out waiting for the condition

=> 當Factor和Jitter非0時的延遲計算方式以下:

    save_duration(0) = duration
duration(1) =  Jitter(save_duration(0) , b.Jitter)
    save_duration(1) = Factor * save_duration(0) 

duration(2) = Jitter(save_duration(1), b.Jitter)
    save_duration(2) = Factor * save_duration(1)

duration(3) = Jitter(save_duration(2), b.Jitter)
    save_duration = Factor * save_duration(2)
...
duration(n) = Jitter(save_duration(n-1), b.Jitter)

設置Backoff參數以下,按照上述公式得出的指望延時爲[1,1.1),[3,3.3),  [9,9.9), [27,29.7)。實際運行以下,小數點一位後四捨五入得出實際延時爲1.1, 3.3, 9.6, 28.2,與預期相符。

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
}

結果: 2019-07-05 11:42:54.8779046 +0800 CST m=+0.135740401 2019-07-05 11:42:55.9399737 +0800 CST m=+1.197782901 2019-07-05 11:42:59.2240904 +0800 CST m=+4.481817401 2019-07-05 11:43:08.8232438 +0800 CST m=+14.080730501 2019-07-05 11:43:37.0058953 +0800 CST m=+42.262752301 timed out waiting for the condition

=> 最後看下Backoff.Cap的影響。設置Cap爲10s,預期會比上面不帶Cap的少執行2次(不帶Cap限制的在Step爲0時還會執行一次)。實際執行上也是如此

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
    Cap:      time.Second*10,
}
 結果: 2019-07-05 12:02:43.8678742 +0800 CST m=+0.120673901 2019-07-05 12:02:44.9294079 +0800 CST m=+1.182202101 2019-07-05 12:02:48.2125558 +0800 CST m=+4.465333301

ExponentialBackoff借鑑了TCP協議的指數退避算法,適用於可能會產生資源競爭的場景。指數退避能夠有效地在沒有緩存處理場景下減少服務端的壓力。

    •  wait庫的第二組
func Poll(interval, timeout time.Duration, condition ConditionFunc) error 
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error 
func PollInfinite(interval time.Duration, condition ConditionFunc) error 
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error 
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 

Poll表示以interval的週期執行condition函數,直到timeout超時或condition返回true/err非空。

wait.Poll和wait.Until使用上仍是有些相似的,區別在於一個使用timeout限制超時時間,一個使用chan提供主動中止調度。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){

    wait.Poll(time.Second, time.Second*5, func() (done bool, err error) {
        fmt.Println(time.Now())
        return false,nil
    })

結果: 2019-07-05 13:43:31.2622405 +0800 CST m=+1.069324901 2019-07-05 13:43:32.2619663 +0800 CST m=+2.069050701 2019-07-05 13:43:33.2626114 +0800 CST m=+3.069695801 2019-07-05 13:43:34.2626876 +0800 CST m=+4.069772001 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201

PollInfinite相比Poll取消了timeout的限制。

PollUntil相比Until來講,PollUntil在condition函數返回true或error的時候會退出調度。

Poll和PollImmediate爲一組,PollInfinite和PollImmediateInfinite爲一組,PollUntil和PollImmediateUntil爲一組,它們的細微差異在於前者在執行condition函數前會等待interval時間,後者則會首先運行condition函數,而後再檢查是否須要等待(condition返回true或err非空時不會再等待)。若是不關注這點差別,用哪一個均可以。

    •  heap 堆(k8s.io/client-go/tools/cache)

實現heap須要實現下面Interface接口,heap使用隊列實現了一個徹底二叉樹

// heap.Interface
type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

// sort.Interface
type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

heap對外提供的方法爲以下:

func Init(h Interface)
func Push(h Interface, x interface{})
func Pop(h Interface) interface{}
func Remove(h Interface, i int) interface{}
func Fix(h Interface, i int) // 當修改完隊列中的index=i的元素後,從新排序

例子以下:

import (
    "container/heap"
    "fmt"
)
    
func GetAllHeapItems(t Heap_t,name string){
    items := []interface{}{}
    for t.Len() != 0{
        items = append(items, heap.Pop(&t))
    }
    fmt.Println(name,":",items)
}

type Heap_t []int
func (h Heap_t)Len() int{return len(h)}
func (h Heap_t)Less(i,j int)bool {return h[i]<h[j]}
func (h Heap_t)Swap(i,j int){h[i], h[j] = h[j], h[i]}
func (h *Heap_t)Push(x interface{}){*h = append(*h,x.(int))}
func (h *Heap_t)Pop() interface{}{
    if h.Len() == 0{
        return nil
    }
    x := (*h)[len(*h)-1]
    *h = (*h)[0:(len(*h) - 1)]
    return x
}

func main(){
    h := &Heap_t{4,2,6,80,100,45} //[1 2 4 8 80 45 6 23 56 100]
    heap.Init(h)
    GetAllHeapItems(*h,"h")

    h1 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h1)
    h1.Push(3)
    GetAllHeapItems(*h1,"h1")

    h2 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h2)
    GetAllHeapItems(*h2,"h2")

    h3 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h3)
    (*h3)[2] = 200
    fmt.Println(1111,h3)
    heap.Fix(h3,2)
    fmt.Println(2222,h3)
    GetAllHeapItems(*h3,"h3")
}

結果: h : [2 4 6 45 80 100] h1 : [2 3 4 6 45 80 100] h2 : [2 4 6 45 80 100] 1111 &[2 4 200 80 100 45] 2222 &[2 4 45 80 100 200] h3 : [2 4 45 80 100 200]

heap的實現比較巧妙,使用隊列實現了徹底二叉樹,比較適用於查詢頻繁的場景,原理解析能夠參見這裏

更多使用和例子參見官方文檔

  • klog(k8s.io/klog) 實現執行日誌打印
  • 使用select{}實現主協程不退出
func main(){
    ...
    select{}
}
  •  可使用switch對地址進行判斷
package main

import (
    "fmt"
)

func main(){
    type emptyCtx int
    background := new(emptyCtx)
    todo       := new(emptyCtx)
    typeSwitch := func (i interface{}) {
        switch i {
        case background:
            fmt.Println("background")
        case todo:
            fmt.Println("todo")
        default:
            fmt.Println("default")
        }
    }

    typeSwitch(background)
}

結果: true

 

 參考:

https://www.flysnow.org/2017/05/12/go-in-action-go-context.html

相關文章
相關標籤/搜索