Log in to a machine that can run kubectl commands and create student.yaml:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  # metadata.name is made up of "plural name.group name"; below, students is the plural name and bolingcavalry.k8s.io is the group name
  name: students.bolingcavalry.k8s.io
spec:
  # group name, also used in the REST API, in the format /apis/<group>/<CRD version>
  group: bolingcavalry.k8s.io
  # list of versions supported by this CustomResourceDefinition
  versions:
  - name: v1
    # whether this version is enabled
    served: true
    # only one version can be marked as storage
    storage: true
  # the resource is namespace-scoped
  scope: Namespaced
  names:
    # plural name
    plural: students
    # singular name
    singular: student
    # kind name
    kind: Student
    # short name, just like svc is the short name of service
    shortNames:
    - stu
In the directory containing student.yaml, run kubectl apply -f student.yaml to create the Student definition in the k8s environment. From then on, whenever a request involves an object of type Student, the k8s api server can recognize that object type.
The previous step made k8s aware of the Student type; next, create a Student object.
Create the file object-student.yaml:
apiVersion: bolingcavalry.k8s.io/v1
kind: Student
metadata:
  name: object-student
spec:
  name: "張三"
  school: "深圳中學"
In the directory containing object-student.yaml, run kubectl apply -f object-student.yaml; you will see a message saying the object was created successfully.
Run kubectl get stu to see the successfully created Student object.
At this point the custom API object (that is, the CRD) has been created successfully. So far we have only made k8s recognize the identity of the Student object; creating a Student object does not yet trigger any business logic (compare this with creating a Pod object, which triggers kubelet to create a docker container on a node).
Merely storing the Student object in etcd is not very meaningful. Imagine creating a pod through a deployment: if the pod object were only created in etcd and no container were created on any node, that pod object would just be a record with no practical effect; the same goes for other objects such as service and pv.
The role of a controller is to watch for changes to a given kind of object (additions, deletions, modifications, and so on) and to respond to those changes accordingly (for example, the response to a new pod is to create a docker container).
As shown in the figure above, changes to API objects are pushed into a queue (WorkQueue) via the Informer; the Controller consumes the data from the queue and reacts to it, and the code that implements this reaction is the real business logic we need to write.
As the figure shows, the overall logic is fairly complex. To simplify custom controller development, the k8s maintainers rely on automatic code generation tools to take care of everything outside the controller, so we only need to focus on developing the controller itself.
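Before touching the code generator, here is a minimal generic sketch of the Informer → WorkQueue → worker flow described above, written with plain client-go against the built-in Pod type; the kubeconfig path and the trivial "handling" print are assumptions made purely for illustration and are not part of this article's project.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// assumed kubeconfig location; adjust for your environment
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	// the Informer watches the API server and pushes object keys into the WorkQueue
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced)

	// the controller side consumes keys from the queue and reacts to them;
	// a real controller would run its business logic here instead of printing
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		fmt.Println("handling", key)
		queue.Done(key)
	}
}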
The next step is to write the declaration and definition code for the Student API object, and then feed that code to the code generation tool to automatically generate the Client, Informer, and WorkQueue related code;
Create a folder named k8s_customize_controller under the $GOPATH/src directory;
Enter the folder and run the following command to create a three-level directory structure:
mkdir -p pkg/apis/bolingcavalry
In the newly created bolingcavalry directory, create the file register.go:
package bolingcavalry

const (
	GroupName = "bolingcavalry.k8s.io"
	Version   = "v1"
)
Under the bolingcavalry directory, create a folder named v1;
In the v1 folder, create the file doc.go:
// +k8s:deepcopy-gen=package
// +groupName=bolingcavalry.k8s.io

package v1
The two comment lines in the code above are both used by the code generation tool: one declares that DeepCopy methods should be generated for all type definitions in the v1 package, and the other declares the API group name of this package, which matches the group name in the CRD;
In the v1 folder, create the file types.go, which defines the concrete content of the Student object:
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type Student struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec StudentSpec `json:"spec"`
}

type StudentSpec struct {
	Name   string `json:"name"`
	School string `json:"school"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// StudentList is a list of Student resources
type StudentList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []Student `json:"items"`
}
As the source above shows, the content of the Student object is now fixed: it mainly has the two fields name and school, representing the student's name and school, so the content used when creating a Student object must match these definitions;
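As a tiny standalone illustration of that mapping (assuming nothing beyond the standard library, and copying StudentSpec only for self-containment), the snippet below unmarshals the same spec fields from JSON into the struct, showing how the json tags tie the YAML keys to the Go fields.

package main

import (
	"encoding/json"
	"fmt"
)

// copy of StudentSpec, duplicated here only so this snippet runs on its own
type StudentSpec struct {
	Name   string `json:"name"`
	School string `json:"school"`
}

func main() {
	// the same spec content as object-student.yaml, expressed as JSON
	raw := []byte(`{"name":"張三","school":"深圳中學"}`)
	var spec StudentSpec
	if err := json.Unmarshal(raw, &spec); err != nil {
		panic(err)
	}
	fmt.Printf("name=%s school=%s\n", spec.Name, spec.School)
}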
In the v1 directory, create the file register.go; its purpose is to make the client aware of the Student API object type through the addKnownTypes method:
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"k8s_customize_controller/pkg/apis/bolingcavalry"
)

var SchemeGroupVersion = schema.GroupVersion{
	Group:   bolingcavalry.GroupName,
	Version: bolingcavalry.Version,
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(
		SchemeGroupVersion,
		&Student{},
		&StudentList{},
	)

	// register the type in the scheme
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
At this point, the preparation work for automatic code generation is complete.
Run the following commands, which first download the dependency packages, then download the code generation tool, and finally perform the code generation:
cd $GOPATH/src \
&& go get -u -v k8s.io/apimachinery/pkg/apis/meta/v1 \
&& go get -u -v k8s.io/code-generator/... \
&& cd $GOPATH/src/k8s.io/code-generator \
&& ./generate-groups.sh all \
k8s_customize_controller/pkg/client \
k8s_customize_controller/pkg/apis \
bolingcavalry:v1

# If installing code-generator fails (e.g. for network reasons), clone the code manually and then run the command above:
git clone https://github.com/kubernetes/code-generator
./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"
If the code is correct, you will see the following output:
Generating deepcopy funcs
Generating clientset for bolingcavalry:v1 at k8s_customize_controller/pkg/client/clientset
Generating listers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/listers
Generating informers for bolingcavalry:v1 at k8s_customize_controller/pkg/client/informers
Now go to the $GOPATH/src/k8s_customize_controller directory and run the tree command; you will see that a lot of content has been generated:
[root@master k8s_customize_controller]# tree
.
└── pkg
    ├── apis
    │   └── bolingcavalry
    │       ├── register.go
    │       └── v1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    └── client
        ├── clientset
        │   └── versioned
        │       ├── clientset.go
        │       ├── doc.go
        │       ├── fake
        │       │   ├── clientset_generated.go
        │       │   ├── doc.go
        │       │   └── register.go
        │       ├── scheme
        │       │   ├── doc.go
        │       │   └── register.go
        │       └── typed
        │           └── bolingcavalry
        │               └── v1
        │                   ├── bolingcavalry_client.go
        │                   ├── doc.go
        │                   ├── fake
        │                   │   ├── doc.go
        │                   │   ├── fake_bolingcavalry_client.go
        │                   │   └── fake_student.go
        │                   ├── generated_expansion.go
        │                   └── student.go
        ├── informers
        │   └── externalversions
        │       ├── bolingcavalry
        │       │   ├── interface.go
        │       │   └── v1
        │       │       ├── interface.go
        │       │       └── student.go
        │       ├── factory.go
        │       ├── generic.go
        │       └── internalinterfaces
        │           └── factory_interfaces.go
        └── listers
            └── bolingcavalry
                └── v1
                    ├── expansion_generated.go
                    └── student.go

21 directories, 27 files
As shown above, zz_generated.deepcopy.go is the generated DeepCopy code file, and everything under the client directory is client-related code that will be used when developing the controller;
The roles of the clientset, informers, and listers under the client directory can be understood in combination with the earlier figure.
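To make those roles concrete, here is a minimal standalone sketch (not part of the generated output) that uses the generated typed clientset to list Student objects; the kubeconfig path and the "default" namespace are assumptions, and the List signature follows the pre-context-aware client-go style used elsewhere in this article.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	clientset "k8s_customize_controller/pkg/client/clientset/versioned"
)

func main() {
	// assumed kubeconfig location; adjust for your environment
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	studentClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// list all Student objects in the default namespace via the generated typed client
	students, err := studentClient.BolingcavalryV1().Students("default").List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, s := range students.Items {
		fmt.Printf("%s: name=%s school=%s\n", s.Name, s.Spec.Name, s.Spec.School)
	}
}

The listers and informers generated alongside the clientset are consumed by the controller in the next section, where events flow into the WorkQueue exactly as in the figure.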
We can now watch for add, update, and delete events on Student objects; the next step is to act on these events to meet our own business requirements.
The first Go file to write is controller.go; create it in the k8s_customize_controller directory:
package main

import (
	"fmt"
	"time"

	"github.com/golang/glog"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"

	bolingcavalryv1 "k8s_customize_controller/pkg/apis/bolingcavalry/v1"
	clientset "k8s_customize_controller/pkg/client/clientset/versioned"
	studentscheme "k8s_customize_controller/pkg/client/clientset/versioned/scheme"
	informers "k8s_customize_controller/pkg/client/informers/externalversions/bolingcavalry/v1"
	listers "k8s_customize_controller/pkg/client/listers/bolingcavalry/v1"
)

const controllerAgentName = "student-controller"

const (
	SuccessSynced         = "Synced"
	MessageResourceSynced = "Student synced successfully"
)

// Controller is the controller implementation for Student resources
type Controller struct {
	// kubeclientset is a standard kubernetes clientset
	kubeclientset kubernetes.Interface
	// studentclientset is a clientset for our own API group
	studentclientset clientset.Interface

	studentsLister listers.StudentLister
	studentsSynced cache.InformerSynced

	workqueue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}

// NewController returns a new student controller
func NewController(
	kubeclientset kubernetes.Interface,
	studentclientset clientset.Interface,
	studentInformer informers.StudentInformer) *Controller {

	runtime.Must(studentscheme.AddToScheme(scheme.Scheme))
	glog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &Controller{
		kubeclientset:    kubeclientset,
		studentclientset: studentclientset,
		studentsLister:   studentInformer.Lister(),
		studentsSynced:   studentInformer.Informer().HasSynced,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
		recorder:         recorder,
	}

	glog.Info("Setting up event handlers")
	// Set up an event handler for when Student resources change
	studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueStudent,
		UpdateFunc: func(old, new interface{}) {
			oldStudent := old.(*bolingcavalryv1.Student)
			newStudent := new.(*bolingcavalryv1.Student)
			if oldStudent.ResourceVersion == newStudent.ResourceVersion {
				// identical resource versions mean no real update, so return immediately
				return
			}
			controller.enqueueStudent(new)
		},
		DeleteFunc: controller.enqueueStudentForDelete,
	})

	return controller
}

// Run is where the controller's business starts
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	glog.Info("Starting controller, waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	glog.Info("Starting workers")
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	glog.Info("Workers started")
	<-stopCh
	glog.Info("Workers stopped")

	return nil
}

func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem takes one item off the workqueue and processes it
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool

		if key, ok = obj.(string); !ok {
			c.workqueue.Forget(obj)
			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// the actual business handling happens in syncHandler
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}

		c.workqueue.Forget(obj)
		glog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		runtime.HandleError(err)
		return true
	}

	return true
}

// syncHandler processes a single key
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// fetch the Student object from the local cache
	student, err := c.studentsLister.Students(namespace).Get(name)
	if err != nil {
		// if the Student object has been deleted we end up here, so deletion logic should be added here
		if errors.IsNotFound(err) {
			glog.Infof("Student object deleted, perform the actual deletion business here: %s/%s ...", namespace, name)
			return nil
		}

		runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name))
		return err
	}

	glog.Infof("This is the desired state of the student object: %#v ...", student)
	glog.Infof("The actual state comes from the business layer; fetch it here, compare it with the desired state, and respond to any difference (create or delete)")
	c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}

// enqueueStudent computes the object's key and puts it on the workqueue
func (c *Controller) enqueueStudent(obj interface{}) {
	var key string
	var err error
	// compute the namespace/name key for the object
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	// put the key on the workqueue
	c.workqueue.AddRateLimited(key)
}

// enqueueStudentForDelete handles delete events
func (c *Controller) enqueueStudentForDelete(obj interface{}) {
	var key string
	var err error
	// compute the key for the deleted object (handling tombstones as well)
	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	// put the key on the workqueue
	c.workqueue.AddRateLimited(key)
}
The code above has the following key points:
a. The NewController method that creates the controller defines the concrete handling for add, update, and delete events on Student objects: besides keeping the local cache in sync, it puts the object's key onto the workqueue;
b. The method that actually processes the messages is syncHandler; this is where you add your real business code to respond to Student additions, updates, and deletions, as sketched below;
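As a rough, hypothetical illustration of that point, the snippet below shows one shape such business code could take if added to controller.go (it reuses controller.go's imports); the reconcile name and the in-memory actualState map are made up here to stand in for whatever external system a real controller would manage.

// hypothetical helper that syncHandler could call after fetching the Student from the lister
var actualState = map[string]string{} // key: "namespace/name", value: school currently applied

func (c *Controller) reconcile(key string, student *bolingcavalryv1.Student) {
	desired := student.Spec.School
	if current, ok := actualState[key]; !ok || current != desired {
		// a real controller would create or update external resources here;
		// this sketch just records the new value and logs the change
		actualState[key] = desired
		glog.Infof("reconciled %s: school is now %q", key, desired)
	}
}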
We could write main.go next, but before that let's write the helper that handles system signals (for example exiting on ctrl+c), which main.go will use. Create a new directory named signals under $GOPATH/src/k8s_customize_controller/pkg;
In the signals directory, create the file signal_posix.go:
// +build !windows

package signals

import (
	"os"
	"syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
In the signals directory, create the file signal_windows.go:
package signals

import (
	"os"
)

var shutdownSignals = []os.Signal{os.Interrupt}
In the signals directory, create the file signal.go:
package signals

import (
	"os"
	"os/signal"
)

var onlyOneSignalHandler = make(chan struct{})

func SetupSignalHandler() (stopCh <-chan struct{}) {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}
Now we can write main.go. Create the file main.go in the k8s_customize_controller directory with the following content; the key places are already commented, so no further explanation is needed:
package main

import (
	"flag"
	"time"

	"github.com/golang/glog"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
	// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

	clientset "k8s_customize_controller/pkg/client/clientset/versioned"
	informers "k8s_customize_controller/pkg/client/informers/externalversions"
	"k8s_customize_controller/pkg/signals"
)

var (
	masterURL  string
	kubeconfig string
)

func main() {
	flag.Parse()

	// set up signal handling
	stopCh := signals.SetupSignalHandler()

	// build the client config from the command-line parameters
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		glog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	studentClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		glog.Fatalf("Error building example clientset: %s", err.Error())
	}

	studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)

	// create the controller
	controller := NewController(kubeClient, studentClient,
		studentInformerFactory.Bolingcavalry().V1().Students())

	// start the informers
	go studentInformerFactory.Start(stopCh)

	// let the controller start processing messages
	if err = controller.Run(2, stopCh); err != nil {
		glog.Fatalf("Error running controller: %s", err.Error())
	}
}

func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
At this point all the code has been written; next comes compiling and building.
In the $GOPATH/src/k8s_customize_controller directory, run the following commands:
go get k8s.io/client-go/kubernetes/scheme \
&& go get github.com/golang/glog \
&& go get k8s.io/kube-openapi/pkg/util/proto \
&& go get k8s.io/utils/buffer \
&& go get k8s.io/utils/integer \
&& go get k8s.io/utils/trace
The script above fetches the libraries needed during compilation via go get, which is a crude approach; a better way is to use a package dependency tool. For details, refer to the official k8s demo, which handles the dependency problem above with both godep and vendor, at: https
Once the dependency issue is resolved, run go build in the $GOPATH/src/k8s_customize_controller directory to produce the k8s_customize_controller binary in the current directory;
Copy the k8s_customize_controller file to the k8s environment, and remember to make it executable with chmod a+x;
Run ./k8s_customize_controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true and the controller starts immediately.
Let's now summarize the whole process of developing a custom controller:
Create the CRD (Custom Resource Definition) so that k8s understands our custom API object;
Write the code that describes the CRD, then use the automatic code generation tool to generate the relatively fixed code outside the controller, such as the informer and client;
Write the controller, which checks whether the actual state matches what the API object declares; if not, it performs the real business processing, which is the common pattern for controllers.
The actual coding work is not complicated; the files written by hand are as follows:
├── controller.go
├── main.go
└── pkg
    ├── apis
    │   └── bolingcavalry
    │       ├── register.go
    │       └── v1
    │           ├── doc.go
    │           ├── register.go
    │           └── types.go
    └── signals
        ├── signal.go
        ├── signal_posix.go
        └── signal_windows.go
Original article: https://blog.csdn.net/boling_cavalry/article/details/88924194