k8s 中定時任務的實現

k8s 中有許多優秀的包均可以在平時的開發中借鑑與使用,好比,任務的定時輪詢、高可用的實現、日誌處理、緩存使用等都是獨立的包,能夠直接引用。本篇文章會介紹 k8s 中定時任務的實現,k8s 中定時任務都是經過 wait 包實現的,wait 包在 k8s 的多個組件中都有用到,如下是 wait 包在 kubelet 中的幾處使用:golang

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
        ...
        // kubelet 每5分鐘一次從 apiserver 獲取證書
        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }

        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }
        ...
}
...
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration,   kubeDeps *kubelet.Dependencies, enableServer bool) {
    // 持續監聽 pod 的變化
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)
    ...
}

golang 中能夠經過 time.Ticker 實現定時任務的執行,但在 k8s 中用了更原生的方式,使用 time.Timer 實現的。time.Ticker 和 time.Timer 的使用區別以下:編程

  • ticker 只要定義完成,今後刻開始計時,不須要任何其餘的操做,每隔固定時間都會自動觸發。api

  • timer 定時器是到了固定時間後會執行一次,僅執行一次緩存

  • 若是 timer 定時器要每隔間隔的時間執行,實現 ticker 的效果,使用 func (t *Timer) Reset(d Duration) bool微信

一個示例:函數

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    timer1 := time.NewTimer(2 * time.Second)
    ticker1 := time.NewTicker(2 * time.Second)

    wg.Add(1)
    go func(t *time.Ticker) {
        defer wg.Done()
        for {
            <-t.C
            fmt.Println("exec ticker", time.Now().Format("2006-01-02 15:04:05"))
        }
    }(ticker1)

    wg.Add(1)
    go func(t *time.Timer) {
        defer wg.Done()
        for {
            <-t.C
            fmt.Println("exec timer", time.Now().Format("2006-01-02 15:04:05"))
            t.Reset(2 * time.Second)
        }
    }(timer1)

    wg.Wait()
}

1、wait 包中的核心代碼

核心代碼(k8s.io/apimachinery/pkg/util/
wait/wait.go):工具

func JitterUntil(f func()period time.DurationjitterFactor float64sliding boolstopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}

...

func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
    if t == nil {
        return time.NewTimer(d)
    }
    if !t.Stop() && !sawTimeout {
        <-t.C
    }
    t.Reset(d)
    return t
}

幾個關鍵點的說明:性能

  • 一、若是 sliding 爲 true,則在 f() 運行以後計算週期。若是爲 false,那麼 period 包含 f() 的執行時間。spa

  • 二、在 golang 中 select 沒有優先級選擇,爲了不額外執行 f(),在每次循環開始後會先判斷 stopCh chan。.net

k8s 中 wait 包實際上是對 time.Timer 作了一層封裝實現。

2、wait 包經常使用的方法

一、按期執行一個函數,永不中止,可使用 Forever 方法:

func Forever(f func(), period time.Duration)

二、在須要的時候中止循環,那麼可使用下面的方法,增長一個用於中止的 chan 便可,方法定義以下:

func Until(f func(), period time.Duration, stopCh <-chan struct{})

上面的第三個參數 stopCh 就是用於退出無限循環的標誌,中止的時候咱們 close 掉這個 chan 就能夠了。

三、有時候,咱們還會須要在運行前去檢查先決條件,在條件知足的時候纔去運行某一任務,這時候可使用 Poll 方法:

func Poll(interval, timeout time.Duration, condition ConditionFunc)

這個函數會以 interval 爲間隔,不斷去檢查 condition 條件是否爲真,若是爲真則能夠繼續後續處理;若是指定了 timeout 參數,則該函數也能夠只常識指定的時間。

四、PollUntil 方法和上面的相似,可是沒有 timeout 參數,多了一個 stopCh 參數,以下所示:

PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

此外還有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。

3、總結

本篇文章主要講了 k8s 中定時任務的實現與對應包(wait)中方法的使用。經過閱讀 k8s 的源代碼,能夠發現 k8s 中許多功能的實現也都是咱們須要在平時工做中用的,其大部分包的性能都是通過大規模考驗的,經過使用其相關的工具包不只能學到大量的編程技巧也能避免本身造輪子。



本文分享自微信公衆號 - 田飛雨(kubeConChina)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索