k8s 中有許多優秀的包均可以在平時的開發中借鑑與使用,好比,任務的定時輪詢、高可用的實現、日誌處理、緩存使用等都是獨立的包,能夠直接引用。本篇文章會介紹 k8s 中定時任務的實現,k8s 中定時任務都是經過 wait 包實現的,wait 包在 k8s 的多個組件中都有用到,如下是 wait 包在 kubelet 中的幾處使用:java
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) { ... // kubelet 每5分鐘一次從 apiserver 獲取證書 closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) if err != nil { return err } closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) if err != nil { return err } ... } ... func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) { // 持續監聽 pod 的變化 go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop) ... }
golang 中能夠經過 time.Ticker 實現定時任務的執行,但在 k8s 中用了更原生的方式,使用 time.Timer 實現的。time.Ticker 和 time.Timer 的使用區別以下:golang
func (t *Timer) Reset(d Duration) bool
一個示例:編程
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup timer1 := time.NewTimer(2 * time.Second) ticker1 := time.NewTicker(2 * time.Second) wg.Add(1) go func(t *time.Ticker) { defer wg.Done() for { <-t.C fmt.Println("exec ticker", time.Now().Format("2006-01-02 15:04:05")) } }(ticker1) wg.Add(1) go func(t *time.Timer) { defer wg.Done() for { <-t.C fmt.Println("exec timer", time.Now().Format("2006-01-02 15:04:05")) t.Reset(2 * time.Second) } }(timer1) wg.Wait() }
核心代碼(k8s.io/apimachinery/pkg/util/wait/wait.go):api
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { var t *time.Timer var sawTimeout bool for { select { case <-stopCh: return default: } jitteredPeriod := period if jitterFactor > 0.0 { jitteredPeriod = Jitter(period, jitterFactor) } if !sliding { t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) } func() { defer runtime.HandleCrash() f() }() if sliding { t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) } select { case <-stopCh: return case <-t.C: sawTimeout = true } } } ... func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer { if t == nil { return time.NewTimer(d) } if !t.Stop() && !sawTimeout { <-t.C } t.Reset(d) return t }
幾個關鍵點的說明:緩存
k8s 中 wait 包實際上是對 time.Timer 作了一層封裝實現。架構
func Forever(f func(), period time.Duration)分佈式
二、在須要的時候中止循環,那麼可使用下面的方法,增長一個用於中止的 chan 便可,方法定義以下:函數
func Until(f func(), period time.Duration, stopCh <-chan struct{})工具
上面的第三個參數 stopCh 就是用於退出無限循環的標誌,中止的時候咱們 close 掉這個 chan 就能夠了。源碼分析
三、有時候,咱們還會須要在運行前去檢查先決條件,在條件知足的時候纔去運行某一任務,這時候可使用 Poll 方法:
func Poll(interval, timeout time.Duration, condition ConditionFunc)
這個函數會以 interval 爲間隔,不斷去檢查 condition 條件是否爲真,若是爲真則能夠繼續後續處理;若是指定了 timeout 參數,則該函數也能夠只常識指定的時間。
PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
此外還有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。
本篇文章主要講了 k8s 中定時任務的實現與對應包(wait)中方法的使用。經過閱讀 k8s 的源代碼,能夠發現 k8s 中許多功能的實現也都是咱們須要在平時工做中用的,其大部分包的性能都是通過大規模考驗的,經過使用其相關的工具包不只能學到大量的編程技巧也能避免本身造輪子。
歡迎學Java和大數據的朋友們加入java架構交流: 855835163 加羣連接:https://jq.qq.com/?_wv=1027&k=5dPqXGI 羣內提供免費的架構資料還有:Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的免費直播講解 能夠進來一塊兒學習交流哦 直播課堂地址:https://ke.qq.com/course/260263?flowToken=1007014