更好的閱讀體驗建議點擊下方原文連接。
原文地址:http://maoqide.live/post/cloud/sample-controller/nginx
本身構建 sample-controller.
git
https://github.com/maoqide/sample-controller
https://github.com/kubernetes/sample-controllergithub
sample-controller ├── hack │ ├── boilerplate.go.txt │ ├── custom-boilerplate.go.txt │ ├── update-codegen.sh │ └── verify-codegen.sh └── pkg └── apis └── samplecontroller ├── register.go └── v1alpha1 ├── doc.go ├── register.go └── types.go
首先,項目初始如上結構:
hack
目錄下的腳本能夠複用,主要是調用了 https://github.com/kubernetes/code-generator 項目中的 generate-groups.sh
腳本,code-gengrator 項目 cmd
目錄下的代碼,須要提早go install
生成對應二進制文件。
pkg
目錄下的文件,須要本身手動編寫,pkg/apis/samplecontroller
是 CRD 所屬的 apiGroup
,v1alpha1
是 apiVersion
,v1alpha1目錄下的types.go
文件,包含了 CRD 類型 Foo
的完整定義。
pkg/apis/samplecontroller/register.go
中,定義了後面所需的全局變量。
pkg/apis/samplecontroller/v1alpha1/doc.go
中,包含了 +<tag-name>[=value]
格式的註釋,這就是 Kubernetes 進行源碼生成用的 Annotation 風格的註釋,doc.go 中的註釋,起到的是全局範圍的做用,包下面的每一個 go 文件,一樣能夠定義本身的 Annotation 註釋。(關於代碼生成,能夠看這篇文章)
pkg/apis/samplecontroller/v1alpha1/types.go
,包含了Foo
類型的完整定義。Foo
是Kubernetes對象的標準定義;FooSpec
是咱們須要定義的Foo
類型的具體結構;FooList
包含一組 Foo
對象,apiserver 的 List 接口,返回的是 List 對象類型;FooStatus
描述Foo
類型實例狀態的結構體,可使用+genclient:noStatus
註釋,則不須要定義FooStatus
。
pkg/apis/samplecontroller/v1alpha1/register.go
,主要做用是經過addKnownTypes()
方法,將咱們定義的 CRD 類型 Foo
添加到 Scheme。golang
pkg
下的上述文件完成,便可執行./hack/update-codegen.sh
,便可生成管理新定義的 CRD 類型所需的 Kubernetes 代碼:docker
sample-controller ├── hack │ ├── boilerplate.go.txt │ ├── custom-boilerplate.go.txt │ ├── update-codegen.sh │ └── verify-codegen.sh └── pkg ├── apis │ └── samplecontroller │ ├── register.go │ └── v1alpha1 │ ├── doc.go │ ├── register.go │ ├── types.go │ └── zz_generated.deepcopy.go └── generated ├── clientset │ └── versioned │ ├── clientset.go │ ├── doc.go │ ├── fake │ │ ├── clientset_generated.go │ │ ├── doc.go │ │ └── register.go │ ├── scheme │ │ ├── doc.go │ │ └── register.go │ └── typed │ └── samplecontroller │ └── v1alpha1 │ ├── doc.go │ ├── fake │ │ ├── doc.go │ │ ├── fake_foo.go │ │ └── fake_samplecontroller_client.go │ ├── foo.go │ ├── generated_expansion.go │ └── samplecontroller_client.go ├── informers │ └── externalversions │ ├── factory.go │ ├── generic.go │ ├── internalinterfaces │ │ └── factory_interfaces.go │ └── samplecontroller │ ├── interface.go │ └── v1alpha1 │ ├── foo.go │ └── interface.go └── listers └── samplecontroller └── v1alpha1 ├── expansion_generated.go └── foo.go
自動生成了 clientset
,informers
,listers
三個文件夾下的文件和apis
下的zz_generated.deepcopy.go
文件。
其中zz_generated.deepcopy.go
中包含 pkg/apis/samplecontroller/v1alpha1/types.go
中定義的結構體的 DeepCopy()
方法。
另外三個文件夾clientset
,informers
,listers
下都是 Kubernetes 生成的客戶端庫,在 controller 中會用到。api
接下來就是編寫具體 controller 的代碼,經過上述步驟生成的客戶端庫訪問 apiserver,監聽 CRD 資源的變化,並觸發對應的動做,如建立或刪除 Deployment
等。緩存
編寫自定義controller(Operator)時,可使用 Kubernetes 提供的 client-go
客戶端庫。下圖是 Kubernetes 提供的在使用client-go
開發 controller 的過程當中,client-go
和 controller 的交互流程:
安全
Reflector: 定義在 cache 包的 Reflector 類中,它監聽特定資源類型(Kind)的 Kubernetes API,在ListAndWatch
方法中執行。監聽的對象能夠是 Kubernetes 的內置資源類型或者是自定義資源類型。當 reflector 經過 watch API 發現新的資源實例被建立,它將經過對應的 list API 獲取到新建立的對象並在watchHandler
方法中將其加入到Delta Fifo
隊列中。app
Informer: 定義在 cache 包的 base controller 中,它從Delta Fifo
隊列中 pop 出對象,在processLoop
方法中執行。base controller 的工做是將對象保存一遍後續獲取,並調用 controller 將對象傳給 controller。ide
Indexer: 提供對象的 indexing 方法,定義在 cache 包的 Indexer中。一個典型的 indexing 的應用場景是基於對象的 label 建立索引。Indexer 基於幾個 indexing 方法維護索引,它使用線程安全的 data store 來存儲對象和他們的key。在 cache 包的 Store 類中定義了一個名爲MetaNamespaceKeyFunc
的默認方法,能夠爲對象生成一個<namespace>/<name>
形式的key。
client-go 中的 base controller 提供了NewIndexerInformer
來建立 Informer 和 Indexer。在你的代碼中,你能夠直接使用 此方法,或者使用 工廠方法 建立 informer。
work queue
隊列,等待進一步的處理(Proceess item)。work queue
。work queue
中的對象作對應處理,能夠有一個或多個其餘的方法實際作處理,這些方法通常會使用Indexer reference
,或者 list 方法來獲取 key 對應的對象。以 sample-controller 爲例,總體流程以下:
/* *** main.go */ // 建立 clientset kubeClient, err := kubernetes.NewForConfig(cfg) // k8s clientset, "k8s.io/client-go/kubernetes" exampleClient, err := clientset.NewForConfig(cfg) // sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned" // 建立 Informer kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) // k8s informer, "k8s.io/client-go/informers" exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30) // sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions" // 建立 controller,傳入 clientset 和 informer controller := NewController(kubeClient, exampleClient, kubeInformerFactory.Apps().V1().Deployments(), exampleInformerFactory.Samplecontroller().V1alpha1().Foos()) // 運行 Informer,Start 方法爲非阻塞,會運行在單獨的 goroutine 中 kubeInformerFactory.Start(stopCh) exampleInformerFactory.Start(stopCh) // 運行 controller controller.Run(2, stopCh) /* *** controller.go */ NewController() *Controller {} // 將 CRD 資源類型定義加入到 Kubernetes 的 Scheme 中,以便 Events 能夠記錄 CRD 的事件 utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme)) // 建立 Broadcaster eventBroadcaster := record.NewBroadcaster() // ... ... // 監聽 CRD 類型'Foo'並註冊 ResourceEventHandler 方法,當'Foo'的實例變化時進行處理 fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueFoo, UpdateFunc: func(old, new interface{}) { controller.enqueueFoo(new) }, }) // 監聽 Deployment 變化並註冊 ResourceEventHandler 方法, // 當它的 ownerReferences 爲 Foo 類型實例時,將該 Foo 資源加入 work queue deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, UpdateFunc: func(old, new interface{}) { newDepl := new.(*appsv1.Deployment) oldDepl := old.(*appsv1.Deployment) if newDepl.ResourceVersion == oldDepl.ResourceVersion { return } controller.handleObject(new) }, DeleteFunc: controller.handleObject, }) func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {} // 在啓動 worker 前等待緩存同步 if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } // 運行兩個 worker 來處理資源 for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } // 無限循環,不斷的調用 processNextWorkItem 處理下一個對象 func (c *Controller) runWorker() { for c.processNextWorkItem() { } } // 從workqueue中獲取下一個對象並進行處理,經過調用 syncHandler func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false } err := func(obj interface{}) error { // 調用 workqueue.Done(obj) 方法告訴 workqueue 當前項已經處理完畢, // 若是咱們不想讓當前項從新入隊,必定要調用 workqueue.Forget(obj)。 // 當咱們沒有調用Forget時,當前項會從新入隊 workqueue 並在一段時間後從新被獲取。 defer c.workqueue.Done(obj) var key string var ok bool // 咱們指望的是 key 'namespace/name' 格式的 string if key, ok = obj.(string); !ok { // 無效的項調用Forget方法,避免從新入隊。 c.workqueue.Forget(obj) utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } if err := c.syncHandler(key); err != nil { // 放回workqueue避免偶發的異常 c.workqueue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) } // 若是沒有異常,Forget當前項,同步成功 c.workqueue.Forget(obj) klog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { utilruntime.HandleError(err) return true } return true } // 對比真實的狀態和指望的狀態並嘗試合併,而後更新Foo類型實例的狀態信息 func (c *Controller) syncHandler(key string) error { // 經過 workqueue 中的 key 解析出 namespace 和 name namespace, name, err := cache.SplitMetaNamespaceKey(key) // 調用 lister 接口經過 namespace 和 name 獲取 Foo 實例 foo, err := c.foosLister.Foos(namespace).Get(name) deploymentName := foo.Spec.DeploymentName // 獲取 Foo 實例中定義的 deploymentname deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName) // 沒有發現對應的 deployment,新建一個 if errors.IsNotFound(err) { deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo)) } // OwnerReferences 不是 Foo 實例,warning並返回錯誤 if !metav1.IsControlledBy(deployment, foo) { msg := fmt.Sprintf(MessageResourceExists, deployment.Name) c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg) return fmt.Errorf(msg) } // deployment 中 的配置和 Foo 實例中 Spec 的配置不一致,即更新 deployment if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo)) } // 更新 Foo 實例狀態 err = c.updateFooStatus(foo, deployment) c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) }
接下來編寫對應的 CRD 和 對應 CRD 實例的 yaml 文件及 operator 的 Dockerfile:
sample-controller ├── artifacts │ └── examples │ ├── crd.yaml │ └── example-foo.yaml ├── controller.go ├── Dockerfile ├── hack │ ├── boilerplate.go.txt │ ├── custom-boilerplate.go.txt │ ├── update-codegen.sh │ └── verify-codegen.sh ├── main.go └── pkg ├── apis │ └── samplecontroller │ ├── register.go │ └── v1alpha1 │ ├── doc.go │ ├── register.go │ ├── types.go │ └── zz_generated.deepcopy.go ├── generated │ ├── clientset │ │ └── ... │ ├── informers │ │ └── ... │ └── listers │ └── ... └── signals └── signal.go
controller 鏡像 Dockerfile:
FROM golang RUN mkdir -p /go/src/k8s.io/sample-controller ADD . /go/src/k8s.io/sample-controller WORKDIR /go RUN go get -v ./... RUN go install -v ./... CMD ["/go/bin/sample-controller"]
controller RBAC 及 Deployment yaml:
apiVersion: apps/v1beta1 kind: Deployment metadata: name: sample-controller spec: replicas: 1 template: metadata: labels: app: sample spec: containers: - name: sample image: "maoqide/sample-controller" --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: operator-role rules: - apiGroups: - "" resources: - events verbs: - get - list - watch - create - update - patch - delete - apiGroups: - apps resources: - deployments - events verbs: - get - list - watch - create - update - patch - delete - apiGroups: - samplecontroller.k8s.io resources: - foos verbs: - get - list - watch - create - update - patch - delete --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: operator-rolebinding roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: operator-role subjects: - kind: ServiceAccount name: default namespace: default
將 operator 部署到 k8s 中並建立一個 CRD 對象,便可看到 operator 自動按照 CRD 對象 的配置建立出一個 nginx Deployment。