kube-controller-manager 與 kube-scheduler 啓動流程源碼分析

kube-controller-manager

基本流程:
一、構造
二、配置
三、初始化
四、執行

image.png

入口函數:/cmd/kube-controller-manager/controller-manager.go

// main is the kube-controller-manager entry point. It seeds the global random
// source, builds the cobra command, initialises logging and executes the
// command. If execution fails, the error is written to stderr and the process
// exits with status 1.
func main() {

    rand.Seed(time.Now().UnixNano())

    // Construct, configure and initialise the command.
    command := app.NewControllerManagerCommand()

    logs.InitLogs()
    defer logs.FlushLogs()

    // Execute the command.
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        // os.Exit terminates immediately without running deferred calls, so
        // the deferred FlushLogs above would be skipped; flush explicitly.
        logs.FlushLogs()
        os.Exit(1)
    }

}

構造執行器: /cmd/kube-controller-manager/app/controllermanager.go

func NewControllerManagerCommand() *cobra.Command {

    //初始化Controller-manager的配置選項結構
    s, err := options.NewKubeControllerManagerOptions()

    ...

    //建立執行命令結構
    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds...'
        //獲取全部控制器
        c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
        ...
    }


    //返回執行器
    return cmd;
}

進入執行:/cmd/kube-controller-manager/app/controllermanager.go

// Run starts the controller manager from a completed config. This is an
// excerpt: "..." marks code omitted by the article, so several blocks opened
// below are closed inside the omitted parts.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Set up the controller manager's HTTP serving.
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
    ...
    // Define the run closure that does the actual work.
    run := func(stop <-chan struct{}) {
    ...
    // Leader election disabled: call run directly.
    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        // run ends in a blocking select{} and is not expected to return.
        panic("unreachable")
    }
    // Leader election enabled: take part in the election and call run only
    // once this instance has become the leader.
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
    ...
    // Callback invoked when this instance starts leading.
    OnStartedLeading: run,
    ...
}

run的執行體:/cmd/kube-controller-manager/app/controllermanager.go >> run()

// run builds the controller context, starts every controller, starts the
// shared informers and then blocks forever. Any failure while building the
// context or starting controllers is fatal.
run := func(stop <-chan struct{}) {
    // Build the controller context.
    controllerCtx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
    if err != nil {
        glog.Fatalf("error building controller context: %v", err)
    }

    // The service-account token controller uses the root client builder.
    tokenStarter := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}
    saTokenInit := tokenStarter.startServiceAccountTokenController

    // Start all controllers.
    initializers := NewControllerInitializers(controllerCtx.LoopMode)
    err = StartControllers(controllerCtx, saTokenInit, initializers)
    if err != nil {
        glog.Fatalf("error starting controllers: %v", err)
    }

    // Start the informers that watch resource events, then signal that they
    // have been started.
    controllerCtx.InformerFactory.Start(controllerCtx.Stop)
    close(controllerCtx.InformersStarted)

    // Block forever.
    select {}
}

選主流程:/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go

// New returns the resource lock implementation selected by lockType: an
// EndpointsLock for EndpointsResourceLock or a ConfigMapLock for
// ConfigMapsResourceLock, each identified by namespace ns and name. Any other
// lockType yields an error.
//
// Article note: leader election is handled by the client-go helpers. A
// configmap or endpoints object is used as the lock resource; whichever
// instance creates it successfully holds the lock, and the lock information
// is stored in that object.
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    // Both lock kinds are addressed by the same namespace/name pair.
    meta := metav1.ObjectMeta{
        Namespace: ns,
        Name:      name,
    }

    if lockType == EndpointsResourceLock {
        lock := &EndpointsLock{
            EndpointsMeta: meta,
            Client:        client,
            LockConfig:    rlc,
        }
        return lock, nil
    }

    if lockType == ConfigMapsResourceLock {
        lock := &ConfigMapLock{
            ConfigMapMeta: meta,
            Client:        client,
            LockConfig:    rlc,
        }
        return lock, nil
    }

    return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}

初始化全部控制器:/cmd/kube-controller-manager/app/controllermanager.go

// StartControllers walks the controllers map and calls each enabled
// controller's init function with ctx. This is an excerpt: "···" marks code
// omitted by the article.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ···
    // Iterate over every registered controller.
    for controllerName, initFn := range controllers {
        // Skip controllers that are not enabled.
        if !ctx.IsControllerEnabled(controllerName) {
        glog.Warningf("%q is disabled", controllerName)
        continue
    }
    // Sleep for a jittered ControllerStartInterval before starting this controller.
    time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
    glog.V(1).Infof("Starting %q", controllerName)
    // Run the controller's init function.
    started, err := initFn(ctx)
    ···
    }

    return nil
}

建立控制器上下文:/cmd/kube-controller-manager/app/controllermanager.go

// CreateControllerContext builds the ControllerContext handed to the
// controllers. It returns an error if waiting for the API server fails. This
// is an excerpt: "..." marks fields omitted by the article.
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    // Obtain a client for API server resources and build the shared informer factory on it.
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

    // Wait (with a 10-second limit) for the API server to be healthy before continuing.
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
    return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }
    // Assemble and return the controller context.
    ctx := ControllerContext{
        ClientBuilder:      clientBuilder,
        InformerFactory:    sharedInformers,
        ...
    }

    
    return ctx,nil
}

kube-scheduler

基本流程
一、初始化配置
二、構造
三、從隊列中獲取pod
四、進行綁定

clipboard.png

入口函數:/cmd/kube-scheduler/scheduler.go

// main is the kube-scheduler entry point. It seeds the global random source,
// builds the cobra command, installs the flag-name normaliser, initialises
// logging and executes the command. If execution fails, the error is written
// to stderr and the process exits with status 1.
func main() {
    rand.Seed(time.Now().UnixNano())
    // Construct the command.
    command := app.NewSchedulerCommand()
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    // Execute the command.
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        // os.Exit terminates immediately without running deferred calls, so
        // the deferred FlushLogs above would be skipped; flush explicitly.
        logs.FlushLogs()
        os.Exit(1)
    }
}

註冊調度策略:/pkg/scheduler/algorithmprovider/defaults/defaults.go

// registerAlgorithmProvider registers the scheduler's algorithm providers
// with the factory, using the given predicate and priority key sets.
func registerAlgorithmProvider(predSet, priSet sets.String) {
    // DefaultProvider is used unless the user selects another provider via a flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)

    // Cluster-autoscaler-friendly provider: same predicates, with
    // LeastRequestedPriority swapped for MostRequestedPriority.
    autoscalerPriorities := copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority)
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet, autoscalerPriorities)
}

從組件入口:/cmd/kube-scheduler/app/server.go

// NewSchedulerCommand builds and returns the cobra command for
// kube-scheduler. This is an excerpt: "..." marks code omitted by the article.
func NewSchedulerCommand() *cobra.Command {
    // Initialise the options with their defaults.
    opts, err := options.NewOptions()
    
    // Build the command object.
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes ······`,
        Run: func(cmd *cobra.Command, args []string) {
            ...
        },
    }
    // Attach the options' flags to the command's flag set.
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

啓動:/cmd/kube-scheduler/app/server.go

// Run starts the scheduler from a completed config. This is an excerpt:
// "..." marks code omitted by the article, so the if-blocks opened below are
// closed inside the omitted parts.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Apply feature gates to the scheduling algorithm providers.
    algorithmprovider.ApplyFeatureGates()
    ...
    // Build the scheduler config.
    schedulerConfig, err := NewSchedulerConfig(c)

    // Create the Scheduler object.
    sched := scheduler.NewFromConfig(schedulerConfig)
    
    // Insecure serving branch; the article describes the elided body as the health-check setup.
    if c.InsecureServing != nil {    
    ...
    // Leader-election branch, taken when a leader-election config is present.
    if c.LeaderElection != nil {
    ...
    // Run the scheduling work.
    run(stopCh)

}

執行:/pkg/scheduler/scheduler.go

// Run starts the scheduling work. It returns immediately if the cache sync
// reports failure. Otherwise it launches the scheduling loop in a goroutine,
// plus the volume binder when the VolumeScheduling feature gate is on.
func (sched *Scheduler) Run() {
    cacheSynced := sched.config.WaitForCacheSync()
    if !cacheSynced {
        return
    }

    volumeSchedulingOn := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling)
    if volumeSchedulingOn {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }

    // scheduleOne is driven by a single wait.Until loop with period 0, so
    // pods are scheduled one after another.
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

調度pod邏輯:/pkg/scheduler/scheduler.go

// scheduleOne handles a single pod: it takes the next pod, asks sched.schedule
// for a host, and binds the pod in a goroutine. This is an excerpt: "..."
// marks code omitted by the article.
func (sched *Scheduler) scheduleOne() {
    // Take the next pod from the queue.
    pod := sched.config.NextPod()
    ...
    // Pick a suitable host for the pod.
     suggestedHost, err := sched.schedule(pod)
    ...
    // Work on a copy of the pod. Per the article, the elided code pre-binds
    // the host in the cache to hide the latency of the API server call.
     assumedPod := pod.DeepCopy()
    ...
    // Bind asynchronously through sched.bind (API server client).
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}

尋找合適的節點:/pkg/scheduler/core/generic_scheduler.go

// scheduleOne handles a single pod: it takes the next pod, asks sched.schedule
// for a host, and binds the pod in a goroutine. This is an excerpt: "..."
// marks code omitted by the article.
//
// NOTE(review): this snippet is a verbatim repeat of the scheduleOne excerpt
// shown earlier, yet its heading refers to generic_scheduler.go. The
// node-selection code was presumably meant to appear here; confirm against
// the upstream source.
func (sched *Scheduler) scheduleOne() {
    // Take the next pod from the queue.
    pod := sched.config.NextPod()
    ...
    // Pick a suitable host for the pod.
     suggestedHost, err := sched.schedule(pod)
    ...
    // Work on a copy of the pod. Per the article, the elided code pre-binds
    // the host in the cache to hide the latency of the API server call.
     assumedPod := pod.DeepCopy()
    ...
    // Bind asynchronously through sched.bind (API server client).
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}
本站公眾號
   歡迎關注本站公眾號,獲取更多信息