本身構建一個 k8s sample-controller.

更好的閱讀體驗建議點擊下方原文連接。
原文地址:http://maoqide.live/post/cloud/sample-controller/nginx

本身構建 sample-controller.
git

https://github.com/maoqide/sample-controller
https://github.com/kubernetes/sample-controllergithub

編寫 CRD 定義

sample-controller
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── update-codegen.sh
│   └── verify-codegen.sh
└── pkg
    └── apis
        └── samplecontroller
            ├── register.go
            └── v1alpha1
                ├── doc.go
                ├── register.go
                └── types.go

首先,項目初始如上結構:
hack目錄下的腳本能夠複用,主要是調用了 https://github.com/kubernetes/code-generator 項目中的 generate-groups.sh 腳本,code-gengrator 項目 cmd 目錄下的代碼,須要提早go install生成對應二進制文件。
pkg目錄下的文件,須要本身手動編寫,pkg/apis/samplecontroller是 CRD 所屬的 apiGroupv1alpha1apiVersion,v1alpha1目錄下的types.go文件,包含了 CRD 類型 Foo 的完整定義。
pkg/apis/samplecontroller/register.go中,定義了後面所需的全局變量。
pkg/apis/samplecontroller/v1alpha1/doc.go中,包含了 +<tag-name>[=value] 格式的註釋,這就是 Kubernetes 進行源碼生成用的 Annotation 風格的註釋,doc.go 中的註釋,起到的是全局範圍的做用,包下面的每一個 go 文件,一樣能夠定義本身的 Annotation 註釋。(關於代碼生成,能夠看這篇文章)
pkg/apis/samplecontroller/v1alpha1/types.go,包含了Foo類型的完整定義。Foo是Kubernetes對象的標準定義;FooSpec是咱們須要定義的Foo類型的具體結構;FooList包含一組 Foo 對象,apiserver 的 List 接口,返回的是 List 對象類型;FooStatus描述Foo類型實例狀態的結構體,可使用+genclient:noStatus 註釋,則不須要定義FooStatus
pkg/apis/samplecontroller/v1alpha1/register.go,主要做用是經過addKnownTypes()方法,將咱們定義的 CRD 類型 Foo 添加到 Scheme。golang

代碼生成

pkg下的上述文件完成,便可執行./hack/update-codegen.sh,便可生成管理新定義的 CRD 類型所需的 Kubernetes 代碼:docker

sample-controller
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── update-codegen.sh
│   └── verify-codegen.sh
└── pkg
    ├── apis
    │   └── samplecontroller
    │       ├── register.go
    │       └── v1alpha1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    └── generated
        ├── clientset
        │   └── versioned
        │       ├── clientset.go
        │       ├── doc.go
        │       ├── fake
        │       │   ├── clientset_generated.go
        │       │   ├── doc.go
        │       │   └── register.go
        │       ├── scheme
        │       │   ├── doc.go
        │       │   └── register.go
        │       └── typed
        │           └── samplecontroller
        │               └── v1alpha1
        │                   ├── doc.go
        │                   ├── fake
        │                   │   ├── doc.go
        │                   │   ├── fake_foo.go
        │                   │   └── fake_samplecontroller_client.go
        │                   ├── foo.go
        │                   ├── generated_expansion.go
        │                   └── samplecontroller_client.go
        ├── informers
        │   └── externalversions
        │       ├── factory.go
        │       ├── generic.go
        │       ├── internalinterfaces
        │       │   └── factory_interfaces.go
        │       └── samplecontroller
        │           ├── interface.go
        │           └── v1alpha1
        │               ├── foo.go
        │               └── interface.go
        └── listers
            └── samplecontroller
                └── v1alpha1
                    ├── expansion_generated.go
                    └── foo.go

自動生成了 clientsetinformerslisters 三個文件夾下的文件和apis下的zz_generated.deepcopy.go文件。
其中zz_generated.deepcopy.go中包含 pkg/apis/samplecontroller/v1alpha1/types.go 中定義的結構體的 DeepCopy() 方法。
另外三個文件夾clientsetinformerslisters下都是 Kubernetes 生成的客戶端庫,在 controller 中會用到。api

controller 代碼編寫

接下來就是編寫具體 controller 的代碼,經過上述步驟生成的客戶端庫訪問 apiserver,監聽 CRD 資源的變化,並觸發對應的動做,如建立或刪除 Deployment 等。緩存

編寫自定義controller(Operator)時,可使用 Kubernetes 提供的 client-go 客戶端庫。下圖是 Kubernetes 提供的在使用client-go開發 controller 的過程當中,client-go 和 controller 的交互流程:
安全

client-go 組件

  • Reflector: 定義在 cache 包的 Reflector 類中,它監聽特定資源類型(Kind)的 Kubernetes API,在ListAndWatch方法中執行。監聽的對象能夠是 Kubernetes 的內置資源類型或者是自定義資源類型。當 reflector 經過 watch API 發現新的資源實例被建立,它將經過對應的 list API 獲取到新建立的對象並在watchHandler方法中將其加入到Delta Fifo隊列中。app

  • Informer: 定義在 cache 包的 base controller 中,它從Delta Fifo隊列中 pop 出對象,在processLoop方法中執行。base controller 的工做是將對象保存一遍後續獲取,並調用 controller 將對象傳給 controller。ide

  • Indexer: 提供對象的 indexing 方法,定義在 cache 包的 Indexer中。一個典型的 indexing 的應用場景是基於對象的 label 建立索引。Indexer 基於幾個 indexing 方法維護索引,它使用線程安全的 data store 來存儲對象和他們的key。在 cache 包的 Store 類中定義了一個名爲MetaNamespaceKeyFunc的默認方法,能夠爲對象生成一個<namespace>/<name>形式的key。

自定義 controller 組件

  • Informer reference: 它是對 Informer 實例的引用,知道如何使用自定義資源對象。你編寫的自定義 controller 須要建立正確的 Informer。
  • Indexer reference: 它是對 Indexer 實例的引用,你編寫的自定義 controller 代碼中須要建立它,在獲取對象供後續使用時你會用到這個引用。

client-go 中的 base controller 提供了NewIndexerInformer來建立 Informer 和 Indexer。在你的代碼中,你能夠直接使用 此方法,或者使用 工廠方法 建立 informer。

  • Resource Event Handlers: 一些回調方法,當 Informer 想要發送一個對象給 controller 時,會調用這些方法。典型的編寫回調方法的模式,是獲取資源對象的 key 並放入一個 work queue隊列,等待進一步的處理(Proceess item)。
  • Work queue: 在 controller 代碼中建立的隊列,用來解耦對象的傳遞和對應的處理。Resource Event Handlers 的方法就是用來接收對象並將其加入 work queue
  • Process Item: 在 controller 代碼中建立的方法,用來對work queue中的對象作對應處理,能夠有一個或多個其餘的方法實際作處理,這些方法通常會使用Indexer reference,或者 list 方法來獲取 key 對應的對象。

編寫自定義 controller

以 sample-controller 爲例,總體流程以下:

/*
*** main.go
*/
// 建立 clientset
kubeClient, err := kubernetes.NewForConfig(cfg)     // k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg)   // sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"

// 建立 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)       // k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)     // sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"

// 建立 controller,傳入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// 運行 Informer,Start 方法爲非阻塞,會運行在單獨的 goroutine 中
kubeInformerFactory.Start(stopCh)   
exampleInformerFactory.Start(stopCh)

// 運行 controller
controller.Run(2, stopCh)

/*
*** controller.go 
*/
NewController() *Controller {}
    // 將 CRD 資源類型定義加入到 Kubernetes 的 Scheme 中,以便 Events 能夠記錄 CRD 的事件
    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))

    // 建立 Broadcaster
    eventBroadcaster := record.NewBroadcaster()
    // ... ...

    // 監聽 CRD 類型'Foo'並註冊 ResourceEventHandler 方法,當'Foo'的實例變化時進行處理
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })

    // 監聽 Deployment 變化並註冊 ResourceEventHandler 方法,
    // 當它的 ownerReferences 爲 Foo 類型實例時,將該 Foo 資源加入 work queue
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
    // 在啓動 worker 前等待緩存同步
    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }
    // 運行兩個 worker 來處理資源
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    // 無限循環,不斷的調用 processNextWorkItem 處理下一個對象
    func (c *Controller) runWorker() {
        for c.processNextWorkItem() {
        }
    }
    // 從workqueue中獲取下一個對象並進行處理,經過調用 syncHandler
    func (c *Controller) processNextWorkItem() bool {
        obj, shutdown := c.workqueue.Get()
        if shutdown {
            return false
        }
        err := func(obj interface{}) error {
            // 調用 workqueue.Done(obj) 方法告訴 workqueue 當前項已經處理完畢,
            // 若是咱們不想讓當前項從新入隊,必定要調用 workqueue.Forget(obj)。
            // 當咱們沒有調用Forget時,當前項會從新入隊 workqueue 並在一段時間後從新被獲取。
            defer c.workqueue.Done(obj)
            var key string
            var ok bool
            // 咱們指望的是 key 'namespace/name' 格式的 string
            if key, ok = obj.(string); !ok {
                // 無效的項調用Forget方法,避免從新入隊。
                c.workqueue.Forget(obj)
                utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
                return nil
            }
            if err := c.syncHandler(key); err != nil {
                // 放回workqueue避免偶發的異常
                c.workqueue.AddRateLimited(key)
                return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
            }
            // 若是沒有異常,Forget當前項,同步成功
            c.workqueue.Forget(obj)
            klog.Infof("Successfully synced '%s'", key)
            return nil
        }(obj)
        if err != nil {
            utilruntime.HandleError(err)
            return true
        }

        return true
    }
    // 對比真實的狀態和指望的狀態並嘗試合併,而後更新Foo類型實例的狀態信息
    func (c *Controller) syncHandler(key string) error {
        // 經過 workqueue 中的 key 解析出 namespace 和 name
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        // 調用 lister 接口經過 namespace 和 name 獲取 Foo 實例
        foo, err := c.foosLister.Foos(namespace).Get(name)
        deploymentName := foo.Spec.DeploymentName
        // 獲取 Foo 實例中定義的 deploymentname
        deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
        // 沒有發現對應的 deployment,新建一個
        if errors.IsNotFound(err) {
            deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
        }
        // OwnerReferences 不是 Foo 實例,warning並返回錯誤
        if !metav1.IsControlledBy(deployment, foo) {
            msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
            c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
            return fmt.Errorf(msg)
        }
        // deployment 中 的配置和 Foo 實例中 Spec 的配置不一致,即更新 deployment
        if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
            deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
        }
        // 更新 Foo 實例狀態
        err = c.updateFooStatus(foo, deployment)
        c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    }

接下來編寫對應的 CRD 和 對應 CRD 實例的 yaml 文件及 operator 的 Dockerfile:

sample-controller
├── artifacts
│   └── examples
│       ├── crd.yaml
│       └── example-foo.yaml
├── controller.go
├── Dockerfile
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── update-codegen.sh
│   └── verify-codegen.sh
├── main.go
└── pkg
    ├── apis
    │   └── samplecontroller
    │       ├── register.go
    │       └── v1alpha1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    ├── generated
    │   ├── clientset
    │   │   └── ...
    │   ├── informers
    │   │   └── ...
    │   └── listers
    │       └── ...
    └── signals
        └── signal.go

部署到 k8s

controller 鏡像 Dockerfile:

FROM golang
RUN mkdir -p /go/src/k8s.io/sample-controller
ADD . /go/src/k8s.io/sample-controller
WORKDIR /go
RUN go get -v ./...
RUN go install -v ./...
CMD ["/go/bin/sample-controller"]

controller RBAC 及 Deployment yaml:

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: sample-controller
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: sample
    spec:
      containers:
      - name: sample
        image: "maoqide/sample-controller"
---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: operator-role
rules:
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
- apiGroups:
  - apps
  resources:
  - deployments
  - events
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
- apiGroups:
  - samplecontroller.k8s.io
  resources:
  - foos
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: operator-rolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: operator-role
subjects:
- kind: ServiceAccount
  name: default
  namespace: default

將 operator 部署到 k8s 中並建立一個 CRD 對象,便可看到 operator 自動按照 CRD 對象 的配置建立出一個 nginx Deployment。

相關文章
相關標籤/搜索