In Kubernetes, kube-scheduler and kube-controller-manager are usually deployed with multiple replicas for high availability, but only one instance is actually doing the work at any given time. This relies on the leader election mechanism in the leaderelection package: it guarantees that one leader is actively working, and when the leader goes down a new leader is elected from the other instances so the component keeps running.
These two components are not the only users; the package shows up in other projects as well, for example cluster-autoscaler. This article looks at how to use the package and how it is implemented internally.
Below is a simple usage example. After building it, start several processes at the same time; only one of them does any work. When the leader process is killed, a new leader is elected and takes over, i.e. it executes the `run` function:
```go
/* Example taken from the example package in client-go */
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)

	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
	// The lock resource object: a Lease is used here; configmap, endpoints,
	// or a multilock (a combination of two types) are also supported.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second, // lease duration
		RenewDeadline:   15 * time.Second, // deadline for renewing the lease
		RetryPeriod:     5 * time.Second,  // retry period for non-leader candidates
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// business code to run once we become the leader
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
				// the process exits here
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// called when a new leader is elected
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}
```
Key command-line flags:
- kubeconfig: path to the kubeconfig file
- lease-lock-name: name of the lock resource
- lease-lock-namespace: namespace the lock is stored in
- id: an identifier used in this example to tell the instances apart
- logtostderr: klog flag, send logs to the console
- v: log verbosity level
Start two processes at the same time.
Start process 1:
```
go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0215 19:56:37.049658   48045 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:56:37.080368   48045 leaderelection.go:252] successfully acquired lease default/example
I0215 19:56:37.080437   48045 main.go:87] Controller loop...
```
Start process 2:
```
➜  leaderelection git:(master) ✗ go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0215 19:57:35.870051   48791 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:57:35.894735   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:35.894769   48791 leaderelection.go:247] failed to acquire lease default/example
I0215 19:57:35.894790   48791 main.go:151] new leader elected: 1
I0215 19:57:44.532991   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:44.533028   48791 leaderelection.go:247] failed to acquire lease default/example
```
You can see that the process with id=1 holds the lock and runs the workload, while the process with id=2 cannot acquire the lock and keeps retrying.
Now kill the id=1 process. After waiting for the lock to be released (up to the LeaseDuration), the id=2 process becomes the leader and starts doing the work:
```
I0215 20:01:41.489300   48791 leaderelection.go:252] successfully acquired lease default/example
I0215 20:01:41.489577   48791 main.go:87] Controller loop...
```
The basic idea is to implement a distributed lock on top of a Kubernetes `configmap`, `endpoints`, or `lease` resource. The node that grabs (acquires) the lock becomes the leader and renews it periodically. The other processes keep trying to grab the lock; when they fail, they wait for the next cycle and try again. When the leader dies and its lease expires, one of the other nodes becomes the new leader.
Election is started via `leaderelection.RunOrDie`:
```go
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}
```
It takes a `LeaderElectionConfig` as its argument:
```go
type LeaderElectionConfig struct {
	// Lock is the type of resource lock to use
	Lock rl.Interface
	// LeaseDuration is how long the lock is held
	LeaseDuration time.Duration
	// RenewDeadline is the deadline for renewing the lease
	RenewDeadline time.Duration
	// RetryPeriod is the interval between attempts to acquire the lock
	RetryPeriod time.Duration
	// Callbacks are invoked on state changes; three are supported:
	// 1. OnStartedLeading: business code run when we become the leader
	// 2. OnStoppedLeading: run when we stop being the leader
	// 3. OnNewLeader: run when a new leader is elected
	Callbacks LeaderCallbacks

	// WatchDog is the associated health checker
	// WatchDog may be null if its not needed/configured.
	WatchDog *HealthzAdaptor

	// ReleaseOnCancel controls whether the leader releases the lock when it steps down
	ReleaseOnCancel bool

	// Name is the name of the resource lock for debugging
	Name string
}
```
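One constraint worth keeping in mind: the library requires LeaseDuration to be longer than RenewDeadline, and RenewDeadline to be longer than RetryPeriod scaled by the package's jitter factor, otherwise `NewLeaderElector` returns an error. Below is a minimal sketch of that relationship; the names `validateTimings` and `jitterFactor` are made up purely for illustration and are not part of client-go:

```go
// A minimal sketch (not the library's exact code) of the relationship the
// three durations must satisfy; NewLeaderElector rejects configs that violate it.
package main

import (
	"fmt"
	"time"
)

const jitterFactor = 1.2 // the jitter factor used by the leaderelection package

func validateTimings(leaseDuration, renewDeadline, retryPeriod time.Duration) error {
	if leaseDuration <= renewDeadline {
		return fmt.Errorf("leaseDuration must be greater than renewDeadline")
	}
	if renewDeadline <= time.Duration(jitterFactor*float64(retryPeriod)) {
		return fmt.Errorf("renewDeadline must be greater than retryPeriod*jitterFactor")
	}
	return nil
}

func main() {
	// The values used in the example above (60s / 15s / 5s) pass the checks.
	fmt.Println(validateTimings(60*time.Second, 15*time.Second, 5*time.Second)) // <nil>
}
```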
`LeaderElectionConfig.Lock` supports storing the lock in three kinds of resources:

- configmap
- endpoint
- lease

The package also provides a `multilock`, which combines two of them: if writing to the first fails, it falls back to the second. This can be seen in interface.go:
```go
switch lockType {
case EndpointsResourceLock: // stored in endpoints
	return endpointsLock, nil
case ConfigMapsResourceLock: // stored in configmaps
	return configmapLock, nil
case LeasesResourceLock: // stored in leases
	return leaseLock, nil
case EndpointsLeasesResourceLock: // try endpoints first, fall back to lease on failure
	return &MultiLock{
		Primary:   endpointsLock,
		Secondary: leaseLock,
	}, nil
case ConfigMapsLeasesResourceLock: // try configmaps first, fall back to lease on failure
	return &MultiLock{
		Primary:   configmapLock,
		Secondary: leaseLock,
	}, nil
default:
	return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}
```
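The example program built a `resourcelock.LeaseLock` literal by hand; the switch above lives behind `resourcelock.New`, which picks the implementation from a lock-type string. A sketch of using it is below; the helper name `newLock` and package name `lockdemo` are made up for illustration, and the set of available lock types depends on your client-go version (newer releases have dropped the configmap and endpoints locks):

```go
package lockdemo // hypothetical package name, for illustration only

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// newLock builds a resource lock from a lock-type constant instead of
// constructing a LeaseLock literal directly.
func newLock(client *kubernetes.Clientset, namespace, name, id string) (resourcelock.Interface, error) {
	return resourcelock.New(
		resourcelock.LeasesResourceLock, // or EndpointsLeasesResourceLock / ConfigMapsLeasesResourceLock for a MultiLock
		namespace,
		name,
		client.CoreV1(),         // used by the configmaps/endpoints implementations
		client.CoordinationV1(), // used by the lease implementation
		resourcelock.ResourceLockConfig{Identity: id},
	)
}
```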
Taking the lease resource as an example, you can inspect what is stored:
```
➜  ~ kubectl get lease example -n default -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-02-15T11:56:37Z"
  name: example
  namespace: default
  resourceVersion: "210675"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: a3470a06-6fc3-42dc-8242-9d6cebdf5315
spec:
  acquireTime: "2020-02-15T12:01:41.476971Z"  # when the lock was acquired
  holderIdentity: "2"                         # identity of the process holding the lock
  leaseDurationSeconds: 60                    # lease duration
  leaseTransitions: 1                         # how many times the leader has changed
  renewTime: "2020-02-15T12:05:37.134655Z"    # when the lease was last renewed
```
Note the fields under spec (annotated above); they correspond to the following struct:
```go
type LeaderElectionRecord struct {
	HolderIdentity       string      `json:"holderIdentity"`       // identity of the lock holder, usually the hostname
	LeaseDurationSeconds int         `json:"leaseDurationSeconds"` // duration of the lock's lease
	AcquireTime          metav1.Time `json:"acquireTime"`          // when the lock was acquired
	RenewTime            metav1.Time `json:"renewTime"`            // when the lease was last renewed
	LeaderTransitions    int         `json:"leaderTransitions"`    // how many times the leader has changed
}
```
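The LeaseLock implementation simply translates between this record and the Lease's spec fields. Here is a rough sketch of that mapping, written as my own helper for illustration (client-go has equivalent conversion helpers in its lease lock implementation; `lockdemo` and `recordFromLeaseSpec` are made-up names):

```go
package lockdemo // hypothetical package, for illustration only

import (
	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// recordFromLeaseSpec shows how the Lease spec fields map onto a LeaderElectionRecord.
func recordFromLeaseSpec(spec coordinationv1.LeaseSpec) resourcelock.LeaderElectionRecord {
	var r resourcelock.LeaderElectionRecord
	if spec.HolderIdentity != nil {
		r.HolderIdentity = *spec.HolderIdentity
	}
	if spec.LeaseDurationSeconds != nil {
		r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
	}
	if spec.LeaseTransitions != nil {
		r.LeaderTransitions = int(*spec.LeaseTransitions)
	}
	if spec.AcquireTime != nil {
		r.AcquireTime = metav1.NewTime(spec.AcquireTime.Time)
	}
	if spec.RenewTime != nil {
		r.RenewTime = metav1.NewTime(spec.RenewTime.Time)
	}
	return r
}
```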
The Run method contains the entry points for acquiring and renewing the lock:
```go
// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		// executed on exit
		runtime.HandleCrash()
		// invoke the OnStoppedLeading callback when we stop
		le.config.Callbacks.OnStoppedLeading()
	}()
	// keep trying to acquire the lock; only continue once it is acquired,
	// otherwise keep retrying (or return when the context is done)
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// the lock was acquired and this process is now the leader:
	// run the business code from the callback
	go le.config.Callbacks.OnStartedLeading(ctx)
	// keep renewing the lease in a loop so that the lock stays held by this process
	le.renew(ctx)
}
```
Both `le.acquire` and `le.renew` call `le.tryAcquireOrRenew` internally; they only differ in how they handle the result. `le.acquire` returns as soon as `le.tryAcquireOrRenew` succeeds and keeps retrying on failure, while `le.renew` does the opposite: it keeps looping while renewal succeeds and exits once it fails.
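To see how the two loops differ, here is a rough, simplified sketch of what acquire and renew look like inside the package (paraphrased, not the exact source: event recording, metrics, logging, and the ReleaseOnCancel handling are left out; `wait` refers to k8s.io/apimachinery/pkg/util/wait):

```go
// Rough sketch of acquire/renew around tryAcquireOrRenew (paraphrased).
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	// try every RetryPeriod (with jitter) until the lock is acquired or ctx is done
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		if !succeeded {
			return // not ours yet, keep retrying
		}
		cancel() // acquired: stop this retry loop
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// each renewal must succeed within RenewDeadline,
		// retrying every RetryPeriod inside that window
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(), nil
		}, timeoutCtx.Done())
		if err != nil {
			cancel() // renewal failed within the deadline: leadership is lost, Run returns
		}
	}, le.config.RetryPeriod, ctx.Done())
}
```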
Now let's look at the `tryAcquireOrRenew` method:
```go
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	// the content of the lock resource object
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(), // unique identity
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	// Step 1: get the existing lock from the Kubernetes resource
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// the resource object does not exist yet, so create the lock resource
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	// Step 2: compare the lock stored in Kubernetes with the one observed last time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedRawRecord = oldLeaderElectionRawRecord
		le.observedTime = le.clock.Now()
	}
	// check whether the held lock has expired and whether we hold it ourselves
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	// Step 3: we are now the leader, but there are two cases:
	// we were already the leader, or we just became the leader
	if le.IsLeader() {
		// already the leader: no need to update AcquireTime and LeaderTransitions
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// we just became the leader: bump the leader transition count
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock resource; if it changed between the Get and this Update, the update fails
	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}
```
What happens if concurrent operations hit this step? The key point is that it relies on the atomicity of Kubernetes API operations:
`le.config.Lock.Get()` returns the lock object, which carries a `resourceVersion` field identifying the internal version of the resource object; every update changes its value. If an update request carries a `resourceVersion`, the apiserver checks that it matches the resource's current value, ensuring that no other update has happened during this update cycle, which guarantees the atomicity of the update.
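The same optimistic-concurrency mechanism is available to any client. Below is a small standalone sketch of what a conflicting update looks like from the client side, reusing the `default/example` Lease from the demo; it assumes a recent client-go where the typed clients take a context (the client-go version used in the example above omits the ctx argument), and the "new-holder" identity is just a placeholder:

```go
package main

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	ctx := context.Background()

	// Read the Lease: the returned object carries its current resourceVersion.
	lease, err := client.CoordinationV1().Leases("default").Get(ctx, "example", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// Modify and write it back. The update carries the resourceVersion we read;
	// if someone else updated the Lease in between, the apiserver rejects the
	// write with a Conflict error instead of silently overwriting it.
	identity := "new-holder"
	lease.Spec.HolderIdentity = &identity
	if _, err := client.CoordinationV1().Leases("default").Update(ctx, lease, metav1.UpdateOptions{}); err != nil {
		if apierrors.IsConflict(err) {
			fmt.Println("lost the race: someone updated the Lease first; re-Get and retry")
			return
		}
		panic(err)
	}
	fmt.Println("update accepted")
}
```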
leaderelection builds a distributed lock on the atomicity of Kubernetes API operations and elects a leader through continuous competition; only the process elected as leader runs the actual business code. This pattern is very common in Kubernetes, and the package makes it easy to write components that are highly available: for example, deploy a component as a multi-replica Deployment; when the leader pod exits it is restarted, but by then the lock may already have been acquired by another pod, which simply carries on with the work.
Full code: https://github.com/go-demo/le...