kubernetes version: v1.3.0
In a Kubernetes cluster, a kubelet service process is started on every Node. It handles the tasks the Master dispatches down to its node and manages the Pods, and the containers within them. Each kubelet process registers the node's own information with the APIServer, periodically reports the node's resource usage to the Master, and monitors container and node resources through cAdvisor.
type KubeletConfiguration struct {
    // Path to the kubelet's config file
    Config string `json:"config"`
    // The kubelet supports three pod sources:
    //   1. APIServer: the kubelet watches the etcd directory through the APIServer and syncs the pod manifest
    //   2. file: files under the directory given by the "--config" startup flag
    //   3. HTTP URL: set via the "--manifest-url" flag
    // Hence the three sync-frequency settings below.
    // How often to sync running containers and config.
    SyncFrequency unversioned.Duration `json:"syncFrequency"`
    // How often to check config files for new data
    FileCheckFrequency unversioned.Duration `json:"fileCheckFrequency"`
    // How often to check the HTTP source
    HTTPCheckFrequency unversioned.Duration `json:"httpCheckFrequency"`
    // Endpoint for the HTTP source
    ManifestURL string `json:"manifestURL"`
    ManifestURLHeader string `json:"manifestURLHeader"`
    // Whether to start the kubelet server, i.e. port 10250 below
    EnableServer bool `json:"enableServer"`
    // Address the kubelet serves on
    Address string `json:"address"`
    // Port the kubelet serves on, default 10250.
    // Other component ports for comparison:
    //   Scheduler: 10251
    //   ControllerManager: 10252
    Port uint `json:"port"`
    // Read-only port with no authentication whatsoever (0: disable), default 10255.
    // The read-only server is started whenever a port is configured
    ReadOnlyPort uint `json:"readOnlyPort"`
    // TLS certificates:
    TLSCertFile string `json:"tLSCertFile"`
    TLSPrivateKeyFile string `json:"tLSPrivateKeyFile"`
    CertDirectory string `json:"certDirectory"`
    // Hostname used to identify the kubelet instead of the actual hostname
    HostnameOverride string `json:"hostnameOverride"`
    // Base (infra) image used when creating Pods
    PodInfraContainerImage string `json:"podInfraContainerImage"`
    // Docker endpoint the kubelet talks to,
    // e.g. unix:///var/run/docker.sock, the default on Linux
    DockerEndpoint string `json:"dockerEndpoint"`
    // Directory for the kubelet's volumes, mounts and config,
    // default /var/lib/kubelet
    RootDirectory string `json:"rootDirectory"`
    SeccompProfileRoot string `json:"seccompProfileRoot"`
    // Whether privileged containers are allowed
    AllowPrivileged bool `json:"allowPrivileged"`
    // Pod sources allowed to use the host's network/PID/IPC namespaces.
    // All default to kubetypes.AllSource, i.e. "*"
    HostNetworkSources string `json:"hostNetworkSources"`
    HostPIDSources string `json:"hostPIDSources"`
    HostIPCSources string `json:"hostIPCSources"`
    // Limit on the image-pull rate from the registry, 0: unlimited; 5.0: default
    RegistryPullQPS float64 `json:"registryPullQPS"`
    // Allowed burst of registry pulls
    RegistryBurst int32 `json:"registryBurst"`
    // Maximum number of events generated per second
    EventRecordQPS float32 `json:"eventRecordQPS"`
    // Allowed burst of event creation
    EventBurst int32 `json:"eventBurst"`
    // Enable debugging handlers, for log collection and for running containers and commands locally
    EnableDebuggingHandlers bool `json:"enableDebuggingHandlers"`
    // Minimum age of a finished container before it may be garbage collected
    MinimumGCAge unversioned.Duration `json:"minimumGCAge"`
    // Maximum number of dead container instances retained per container, default 2
    MaxPerPodContainerCount int32 `json:"maxPerPodContainerCount"`
    // Maximum number of dead container instances retained on the node, default 240
    MaxContainerCount int32 `json:"maxContainerCount"`
    // cAdvisor port, default 4194
    CAdvisorPort uint `json:"cAdvisorPort"`
    // Healthz port, default 10248
    HealthzPort int32 `json:"healthzPort"`
    // Healthz bind address, default "127.0.0.1"
    HealthzBindAddress string `json:"healthzBindAddress"`
    // oom-score-adj of the kubelet process, range [-1000, 1000]
    OOMScoreAdj int32 `json:"oomScoreAdj"`
    // Whether to register the node with the APIServer automatically
    RegisterNode bool `json:"registerNode"`
    ClusterDomain string `json:"clusterDomain"`
    MasterServiceNamespace string `json:"masterServiceNamespace"`
    // Cluster DNS IP; the kubelet configures all containers to use it
    ClusterDNS string `json:"clusterDNS"`
    // Idle timeout of streaming connections
    StreamingConnectionIdleTimeout unversioned.Duration `json:"streamingConnectionIdleTimeout"`
    // How often the kubelet posts node status to the APIServer, default 10s.
    // Works together with nodeMonitorGracePeriod in the nodeController
    NodeStatusUpdateFrequency unversioned.Duration `json:"nodeStatusUpdateFrequency"`
    // Minimum age of an unused image before it may be garbage collected
    ImageMinimumGCAge unversioned.Duration `json:"imageMinimumGCAge"`
    // Image GC keeps running while disk usage is above this percentage
    ImageGCHighThresholdPercent int32 `json:"imageGCHighThresholdPercent"`
    // Image GC does not run while disk usage is below this percentage
    ImageGCLowThresholdPercent int32 `json:"imageGCLowThresholdPercent"`
    // Reserved disk space; once free space drops below it, no new Pods can be created
    LowDiskSpaceThresholdMB int32 `json:"lowDiskSpaceThresholdMB"`
    // How often to compute the disk usage of all Pods and cached capacity
    VolumeStatsAggPeriod unversioned.Duration `json:"volumeStatsAggPeriod"`
    // Network and volume plugin settings
    NetworkPluginName string `json:"networkPluginName"`
    NetworkPluginDir string `json:"networkPluginDir"`
    VolumePluginDir string `json:"volumePluginDir"`
    CloudProvider string `json:"cloudProvider,omitempty"`
    CloudConfigFile string `json:"cloudConfigFile,omitempty"`
    // Name of a cgroup used to isolate the kubelet
    // (why does it need isolation? does a single node support multiple kubelets??)
    KubeletCgroups string `json:"kubeletCgroups,omitempty"`
    // Cgroup used to isolate the container runtime (Docker, rkt)
    RuntimeCgroups string `json:"runtimeCgroups,omitempty"`
    SystemCgroups string `json:"systemContainer,omitempty"`
    CgroupRoot string `json:"cgroupRoot,omitempty"`
    // Which container runtime to use
    ContainerRuntime string `json:"containerRuntime"`
    // Timeout for all runtime requests (e.g. pull, logs, exec, attach) except long-running ones
    RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"`
    // Path to the rkt binary
    RktPath string `json:"rktPath,omitempty"`
    // rkt communication endpoint
    RktAPIEndpoint string `json:"rktAPIEndpoint,omitempty"`
    RktStage1Image string `json:"rktStage1Image,omitempty"`
    // kubelet lock file, used to synchronize with other kubelets
    LockFilePath string `json:"lockFilePath"`
    ExitOnLockContention bool `json:"exitOnLockContention"`
    // Configure the cbr0 bridge based on Node.Spec.PodCIDR
    ConfigureCBR0 bool `json:"configureCbr0"`
    // Hairpin mode: promiscuous-bridge, hairpin-veth or none
    HairpinMode string `json:"hairpinMode"`
    // Indicates the node already runs a program that babysits docker and the kubelet
    BabysitDaemons bool `json:"babysitDaemons"`
    // Maximum number of Pods this kubelet can run
    MaxPods int32 `json:"maxPods"`
    NvidiaGPUs int32 `json:"nvidiaGPUs"`
    // Handler for executing commands in containers, chosen by name,
    // either "native" or "nsenter", default: "native"
    DockerExecHandlerName string `json:"dockerExecHandlerName"`
    // CIDR used to assign Pod IPs, only effective in standalone mode
    PodCIDR string `json:"podCIDR"`
    // DNS resolver config for containers, default "/etc/resolv.conf"
    ResolverConfig string `json:"resolvConf"`
    // Enable CPU CFS quota enforcement for containers
    CPUCFSQuota bool `json:"cpuCFSQuota"`
    // Set to true if the kubelet itself runs in a container.
    // Running on the host differs from running in a container:
    // on the host, writing file data is unrestricted, a plain ioutil.WriteFile() call is enough;
    // in a container, for the kubelet to write into a container it created, it must use
    // nsenter to enter that container's namespace first, and then write the data
    Containerized bool `json:"containerized"`
    // Maximum number of files the kubelet process can open
    MaxOpenFiles uint64 `json:"maxOpenFiles"`
    // Reconcile the CIDR assigned by the APIServer
    ReconcileCIDR bool `json:"reconcileCIDR"`
    // Register the kubelet's node with the APIServer as schedulable
    RegisterSchedulable bool `json:"registerSchedulable"`
    // Content type of requests the kubelet sends to the APIServer, default: "application/vnd.kubernetes.protobuf"
    ContentType string `json:"contentType"`
    // QPS for kubelet-to-APIServer traffic
    KubeAPIQPS float32 `json:"kubeAPIQPS"`
    // Allowed burst for kubelet-to-APIServer traffic
    KubeAPIBurst int32 `json:"kubeAPIBurst"`
    // If true, tells the kubelet to pull images one at a time
    SerializeImagePulls bool `json:"serializeImagePulls"`
    // Start the kubelet on a Flannel overlay network, assuming Flannel is already running
    ExperimentalFlannelOverlay bool `json:"experimentalFlannelOverlay"`
    // A node may enter the out-of-disk state (insufficient disk space); the kubelet
    // has to poll the node state periodically, and this is the polling frequency
    OutOfDiskTransitionFrequency unversioned.Duration `json:"outOfDiskTransitionFrequency,omitempty"`
    // IP of the kubelet's node; if set, the kubelet writes it onto the node object
    NodeIP string `json:"nodeIP,omitempty"`
    // Labels of this Node
    NodeLabels map[string]string `json:"nodeLabels"`
    NonMasqueradeCIDR string `json:"nonMasqueradeCIDR"`
    EnableCustomMetrics bool `json:"enableCustomMetrics"`
    // The following all relate to the eviction policy; see the code for details.
    // Comma-separated expressions describing the conditions for reclaiming resources.
    // See: https://kubernetes.io/docs/admin/out-of-resource/
    EvictionHard string `json:"evictionHard,omitempty"`
    EvictionSoft string `json:"evictionSoft,omitempty"`
    EvictionSoftGracePeriod string `json:"evictionSoftGracePeriod,omitempty"`
    EvictionPressureTransitionPeriod unversioned.Duration `json:"evictionPressureTransitionPeriod,omitempty"`
    EvictionMaxPodGracePeriod int32 `json:"evictionMaxPodGracePeriod,omitempty"`
    // Maximum number of Pods per core
    PodsPerCore int32 `json:"podsPerCore"`
    // Whether to enable the kubelet attach/detach feature
    EnableControllerAttachDetach bool `json:"enableControllerAttachDetach"`
}
main entry point: cmd/kubelet/kubelet.go
The main source is as follows:
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)
    flag.InitFlags()
    util.InitLogs()
    defer util.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
If you have read the source before, you will notice that the entry functions of all Kubernetes executables follow much the same style.
options.NewKubeletServer(): creates a KubeletServer structure and initializes it with default values.
The function is as follows:
func NewKubeletServer() *KubeletServer {
    return &KubeletServer{
        ...
        KubeletConfiguration: componentconfig.KubeletConfiguration{
            Address:               "0.0.0.0",
            CAdvisorPort:          4194,
            VolumeStatsAggPeriod:  unversioned.Duration{Duration: time.Minute},
            CertDirectory:         "/var/run/kubernetes",
            CgroupRoot:            "",
            CloudProvider:         AutoDetectCloudProvider,
            ConfigureCBR0:         false,
            ContainerRuntime:      "docker",
            RuntimeRequestTimeout: unversioned.Duration{Duration: 2 * time.Minute},
            CPUCFSQuota:           true,
            ...
        },
    }
}
s.AddFlags(pflag.CommandLine): this function reads the kubelet's command-line flags.
The function is as follows:
func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
    fs.DurationVar(&s.SyncFrequency.Duration, "sync-frequency", s.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
    fs.DurationVar(&s.FileCheckFrequency.Duration, "file-check-frequency", s.FileCheckFrequency.Duration, "Duration between checking config files for new data")
    ...
}
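The pattern here is plain spf13/pflag binding: each KubeletServer field is bound to a flag, with the field's current (default) value as the flag default. A minimal self-contained sketch of the same pattern; the toyServer type and its fields are illustrative, not from the kubelet:

package main

import (
    "fmt"
    "time"

    "github.com/spf13/pflag"
)

// toyServer stands in for KubeletServer: fields carry defaults,
// addFlags binds each field to a flag using the field itself as the default.
type toyServer struct {
    Config        string
    SyncFrequency time.Duration
}

func (s *toyServer) addFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.Config, "config", s.Config, "Path to the config file")
    fs.DurationVar(&s.SyncFrequency, "sync-frequency", s.SyncFrequency, "Max sync period")
}

func main() {
    s := &toyServer{SyncFrequency: time.Minute} // defaults, like NewKubeletServer()
    s.addFlags(pflag.CommandLine)
    pflag.Parse() // e.g. --sync-frequency=30s overrides the default
    fmt.Println(s.Config, s.SyncFrequency)
}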
Once the command-line flags have been read, logging and the like are initialized.
verflag.PrintAndExitIfRequested(): checks whether the version flag was requested; if so, it prints the version information and exits.
Finally we reach the key function, app.Run(s, nil).
Run entry point: cmd/kubelet/app/server.go
This function is long, but it is mostly preparatory work. Let's first look at how the parameters are configured.
The code is as follows:
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
    ...
    // Note that app.Run() is entered with kcfg == nil
    if kcfg == nil {
        // UnsecuredKubeletConfig() returns a valid KubeletConfig
        cfg, err := UnsecuredKubeletConfig(s)
        if err != nil {
            return err
        }
        kcfg = cfg

        // Initialize a Config used to talk to the APIServer
        clientConfig, err := CreateAPIServerClientConfig(s)
        if err == nil {
            // Create the various clients: core, authentication, authorization...
            kcfg.KubeClient, err = clientset.NewForConfig(clientConfig)

            // Create a client for events
            // make a separate client for events
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = s.EventRecordQPS
            eventClientConfig.Burst = int(s.EventBurst)
            kcfg.EventClient, err = clientset.NewForConfig(&eventClientConfig)
        }
        ...
    }

    // Create a cAdvisor object used to collect all kinds of resource information
    // (some of its interfaces are not supported yet)
    if kcfg.CAdvisorInterface == nil {
        kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort, kcfg.ContainerRuntime)
        if err != nil {
            return err
        }
    }

    // The kubelet's container-management module
    if kcfg.ContainerManager == nil {
        if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" {
            return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
        }
        kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{
            RuntimeCgroupsName: kcfg.RuntimeCgroups,
            SystemCgroupsName:  kcfg.SystemCgroups,
            KubeletCgroupsName: kcfg.KubeletCgroups,
            ContainerRuntime:   kcfg.ContainerRuntime,
        })
        if err != nil {
            return err
        }
    }
    ...
    // Configure the system OOM parameters
    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kcfg.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        glog.Warning(err)
    }

    // Continue with the remaining kubelet startup steps
    if err := RunKubelet(kcfg); err != nil {
        return err
    }

    // The kubelet's health check
    if s.HealthzPort > 0 {
        healthz.DefaultHealthz()
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    <-done
    return nil
}
This function mainly prepares a KubeletConfig structure, created by calling UnsecuredKubeletConfig().
It also creates some of the objects inside that structure: the KubeClient, EventClient, CAdvisorInterface, ContainerManager, oomAdjuster, and so on.
Then it calls RunKubelet() to continue the startup flow.
Finally, it starts the health-check service.
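Because the healthz endpoint carries no TLS or authentication, it can be probed with a plain HTTP GET. A minimal sketch, assuming the default bind address 127.0.0.1 and port 10248:

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // Defaults assumed here: HealthzBindAddress "127.0.0.1", HealthzPort 10248.
    resp, err := http.Get("http://127.0.0.1:10248/healthz")
    if err != nil {
        fmt.Println("kubelet not reachable:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("healthz status:", resp.Status) // expect "200 OK"
}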
Below, the key functions are introduced one by one:
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
    ...
    // The kubelet may be deployed as a container, in which case the mounter
    // and standard-output writer need to be configured accordingly
    mounter := mount.New()
    var writer io.Writer = &io.StdWriter{}
    if s.Containerized {
        glog.V(2).Info("Running kubelet in containerized mode (experimental)")
        mounter = mount.NewNsenterMounter()
        writer = &io.NsenterWriter{}
    }

    // Configure the kubelet's TLS
    tlsOptions, err := InitializeTLS(s)
    if err != nil {
        return nil, err
    }

    // The kubelet can be deployed in two ways: directly on the physical host,
    // or inside a container. When deployed in a container, namespace isolation
    // prevents the kubelet from reaching a docker container's namespace and
    // running commands with docker exec.
    // So a check is made here: when running in a container, nsenter is needed
    // to help the kubelet run commands inside the target namespace.
    // nsenter reference: https://github.com/jpetazzo/nsenter
    var dockerExecHandler dockertools.ExecHandler
    switch s.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }

    // k8s image garbage-collection policy:
    // MinAge: minimum age of an image before it may be collected
    // HighThresholdPercent: GC keeps running while disk usage is above this value
    // LowThresholdPercent: GC does not run while disk usage is below this value
    imageGCPolicy := kubelet.ImageGCPolicy{
        MinAge:               s.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(s.ImageGCHighThresholdPercent),
        LowThresholdPercent:  int(s.ImageGCLowThresholdPercent),
    }

    // k8s disk-space policy:
    // DockerFreeDiskMB: once free disk drops below this value, no more pods can be
    // created on this node; effectively the amount of disk space to keep in reserve
    diskSpacePolicy := kubelet.DiskSpacePolicy{
        DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
        RootFreeDiskMB:   int(s.LowDiskSpaceThresholdMB),
    }
    ...
    // Introduced in k8s v1.3. Eviction lets the cluster sense node memory/disk
    // pressure ahead of time, to schedule resources accordingly.
    thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod)
    if err != nil {
        return nil, err
    }
    evictionConfig := eviction.Config{
        PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration,
        MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod),
        Thresholds:               thresholds,
    }

    // Initialize the KubeletConfig structure
    return &KubeletConfig{
        Address:         net.ParseIP(s.Address),
        AllowPrivileged: s.AllowPrivileged,
        Auth:            nil, // default does not enforce auth[nz]
        ...
    }, nil
}
A few points in this code are, in my view, worth understanding:
This function has to care about whether the kubelet runs on a physical machine or inside a container.
When it runs in a container, namespace permissions become a problem, and nsenter is needed to operate on docker containers.
The kubelet provides the "--docker-exec-handler" flag (i.e. DockerExecHandlerName) to choose whether to use nsenter.
nsenter itself is worth a closer look; the sketch below shows the idea.
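Conceptually, the nsenter handler wraps each command in an nsenter invocation that first joins the target container's namespaces. A rough illustrative sketch of that idea, not the kubelet's actual NsenterExecHandler; the target PID here is a placeholder:

package main

import (
    "fmt"
    "os/exec"
)

// runInContainer sketches what an nsenter-based exec handler does:
// join the namespaces of the container's init process (targetPid),
// then run the command inside them. Requires root on the host.
func runInContainer(targetPid int, command ...string) ([]byte, error) {
    args := []string{
        "--target", fmt.Sprint(targetPid), // PID of the container's init process
        "--mount", "--uts", "--ipc", "--net", "--pid", // namespaces to enter
        "--",
    }
    args = append(args, command...)
    return exec.Command("nsenter", args...).CombinedOutput()
}

func main() {
    // 12345 is a placeholder; the kubelet obtains the real PID from docker inspect.
    out, err := runInContainer(12345, "hostname")
    fmt.Println(string(out), err)
}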
There is also the kubelet eviction feature, newly introduced in k8s v1.3.0. Eviction stops Pods from being created on a node before the node becomes overloaded, targeting memory and disk in particular.
Earlier versions had no way to sense node load in advance: when memory ran low, k8s relied solely on the kernel's OOM killer, plus periodic garbage collection of images and containers for disk, which left Pod behavior unpredictable. Eviction solves this nicely: memory/disk thresholds can be specified at kubelet startup, keeping the node stable and letting the cluster sense node load ahead of time.
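The thresholds themselves are just comma-separated expressions handed to eviction.ParseThresholdConfig(), as seen in UnsecuredKubeletConfig() above. A sketch of wiring them up, with illustrative threshold strings in the documented syntax (https://kubernetes.io/docs/admin/out-of-resource/) rather than kubelet defaults:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/kubelet/eviction" // 1.3-era import path assumed
)

func main() {
    // Illustrative example values, not kubelet defaults:
    evictionHard := "memory.available<100Mi"            // evict as soon as free memory drops below 100Mi
    evictionSoft := "memory.available<300Mi"            // evict only after the grace period below
    evictionSoftGracePeriod := "memory.available=1m30s" // per-signal grace period

    thresholds, err := eviction.ParseThresholdConfig(evictionHard, evictionSoft, evictionSoftGracePeriod)
    if err != nil {
        panic(err)
    }
    fmt.Printf("parsed %d eviction thresholds\n", len(thresholds))
}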
Creating the clients takes two steps:
Call CreateAPIServerClientConfig() to initialize a Config.
Call clientset.NewForConfig() to create the various clients from that Config.
CreateAPIServerClientConfig() is as follows:
func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
    // Check that at least one APIServer is configured
    if len(s.APIServerList) < 1 {
        return nil, fmt.Errorf("no api servers specified")
    }
    // Check whether several APIServers are configured. Newer versions support an
    // HA APIServer setup, but for now only the first server is used.
    // TODO: adapt Kube client to support LB over several servers
    if len(s.APIServerList) > 1 {
        glog.Infof("Multiple api servers specified.  Picking first one")
    }

    clientConfig, err := createClientConfig(s)
    if err != nil {
        return nil, err
    }

    clientConfig.ContentType = s.ContentType
    // Override kubeconfig qps/burst settings from flags
    clientConfig.QPS = s.KubeAPIQPS
    clientConfig.Burst = int(s.KubeAPIBurst)

    addChaosToClientConfig(s, clientConfig)
    return clientConfig, nil
}

func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
    if s.KubeConfig.Provided() && s.AuthPath.Provided() {
        return nil, fmt.Errorf("cannot specify both --kubeconfig and --auth-path")
    }
    if s.KubeConfig.Provided() {
        return kubeconfigClientConfig(s)
    }
    if s.AuthPath.Provided() {
        return authPathClientConfig(s, false)
    }
    // Try the kubeconfig default first, falling back to the auth path default.
    clientConfig, err := kubeconfigClientConfig(s)
    if err != nil {
        glog.Warningf("Could not load kubeconfig file %s: %v.  Trying auth path instead.", s.KubeConfig, err)
        return authPathClientConfig(s, true)
    }
    return clientConfig, nil
}

// This is where the first APIServer is picked by default
func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
    return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
        &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
        &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.APIServerList[0]}}).ClientConfig()
}
Once the Config has been created, clientset.NewForConfig() is called to create the various clients:
func NewForConfig(c *restclient.Config) (*Clientset, error) {
    // Configure the client's rate limiting
    configShallowCopy := *c
    if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
        configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
    }
    var clientset Clientset
    var err error
    // Create the core client
    clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // Create the extensions client
    clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // Create the autoscaling client
    clientset.AutoscalingClient, err = unversionedautoscaling.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // Create the batch-operations client
    clientset.BatchClient, err = unversionedbatch.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // Create the RBAC client (RBAC: role-based access control),
    // related to k8s authentication and authorization,
    // see: https://kubernetes.io/docs/admin/authorization/
    clientset.RbacClient, err = unversionedrbac.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // Create the service-discovery client
    clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
    if err != nil {
        glog.Errorf("failed to create the DiscoveryClient: %v", err)
        return nil, err
    }
    return &clientset, nil
}
上面的各類客戶端實際就是api rest請求的客戶端。
With all of the above created and initialized, the next step is RunKubelet:
func RunKubelet(kcfg *KubeletConfig) error {
    ...
    // Create the k8s event objects the kubelet uses to send container-management
    // events to the APIServer. k8s events will be covered separately later,
    // so no details here.
    eventBroadcaster := record.NewBroadcaster()
    kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
    eventBroadcaster.StartLogging(glog.V(3).Infof)
    if kcfg.EventClient != nil {
        glog.V(4).Infof("Sending events to api server.")
        eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")})
    } else {
        glog.Warning("No api server defined - no events will be sent to API server.")
    }

    // Configure capabilities
    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: kcfg.HostNetworkSources,
        HostPIDSources:     kcfg.HostPIDSources,
        HostIPCSources:     kcfg.HostIPCSources,
    }
    capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)

    credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)

    // Call CreateAndInitKubelet() for the remaining initialization
    builder := kcfg.Builder
    if builder == nil {
        builder = CreateAndInitKubelet
    }
    if kcfg.OSInterface == nil {
        kcfg.OSInterface = kubecontainer.RealOS{}
    }
    k, podCfg, err := builder(kcfg)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    // Set the maximum number of file handles the kubelet process itself can open
    util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)

    // TODO(dawnchen): remove this once we deprecated old debian containervm images.
    // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
    // The current chosen number is consistent with most of other os dist.
    const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
    const minKeys uint64 = 1000000
    key, err := ioutil.ReadFile(maxkeysPath)
    if err != nil {
        glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
    } else {
        fields := strings.Fields(string(key))
        nkey, _ := strconv.ParseUint(fields[0], 10, 64)
        if nkey < minKeys {
            glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
            err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
            }
        }
    }
    const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
    const minBytes uint64 = 25000000
    bytes, err := ioutil.ReadFile(maxbytesPath)
    if err != nil {
        glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
    } else {
        fields := strings.Fields(string(bytes))
        nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
        if nbyte < minBytes {
            glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
            err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
            }
        }
    }

    // The kubelet can either run once, or run as a long-lived daemon:
    // run-once means handling the pod events once (RunOnce) and exiting,
    // running forever means startKubelet().
    // process pods and exit.
    if kcfg.Runonce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        // Enter the key function startKubelet()
        startKubelet(k, podCfg, kcfg)
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}
This function calls CreateAndInitKubelet() to continue initialization, which in turn calls kubelet.NewMainKubelet().
The kubelet can run just once, or keep running in the background; to keep running, startKubelet() is called.
Let's first look at what the initialization function does.
func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations
    // TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
    // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
    // a nil pointer to it when what we really want is a nil interface.
    var kubeClient clientset.Interface
    if kc.KubeClient != nil {
        kubeClient = kc.KubeClient
        // TODO: remove this when we've refactored kubelet to only use clientset.
    }

    // Initialize the container GC parameters
    gcPolicy := kubecontainer.ContainerGCPolicy{
        MinAge:             kc.MinimumGCAge,
        MaxPerPodContainer: kc.MaxPerPodContainerCount,
        MaxContainers:      kc.MaxContainerCount,
    }

    // Configure the kubelet server port, default: 10250
    daemonEndpoints := &api.NodeDaemonEndpoints{
        KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)},
    }

    // Create the PodConfig
    pc = kc.PodConfig
    if pc == nil {
        // The kubelet supports three pod sources: file, HTTP URL and the k8s APIServer.
        // The APIServer is the default. Caching is also involved here; the concrete
        // implementation is worth a closer look.
        pc = makePodSourceConfig(kc)
    }

    k, err = kubelet.NewMainKubelet(
        kc.Hostname,
        kc.NodeName,
        kc.DockerClient,
        kubeClient,
        ...
    )
    if err != nil {
        return nil, nil, err
    }

    k.BirthCry()
    k.StartGarbageCollection()

    return k, pc, nil
}
There is one more layer of calls inside the initialization: kubelet.NewMainKubelet(). In 1.3, this function takes a huge number of parameters and its body is extremely long, which makes it very unfriendly to read (newer versions have since rewritten it). Still, let's use this long, fat function to keep exploring:
func NewMainKubelet(
    hostname string,
    nodeName string,
    ...
) (*Kubelet, error) {
    ...
    // Create the service cache.NewStore, set up the service ListWatch functions,
    // the corresponding reflector (NewReflector), and then the serviceLister
    serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return kubeClient.Core().Services(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
    }
    serviceLister := &cache.StoreToServiceLister{Store: serviceStore}

    // Create the node cache.NewStore, set the fieldSelector, the ListWatch
    // functions and the reflector, then nodeLister, nodeInfo and nodeRef
    nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
    }
    nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
    nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

    // TODO: get the real node object of ourself,
    // and use the real node name and UID.
    // TODO: what is namespace for node?
    nodeRef := &api.ObjectReference{
        Kind:      "Node",
        Name:      nodeName,
        UID:       types.UID(nodeName),
        Namespace: "",
    }

    // Create the disk-space manager, which needs cAdvisor interfaces to collect
    // disk information; the last parameter is the disk-management policy
    diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
    }
    // Create an empty container reference manager
    containerRefManager := kubecontainer.NewRefManager()

    // Create the OOM watcher, which monitors memory through cAdvisor
    // and reports OOM events through the event recorder
    oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)

    // TODO: remove when internal cbr0 implementation gets removed in favor
    // of the kubenet network plugin
    if networkPluginName == "kubenet" {
        configureCBR0 = false
        flannelExperimentalOverlay = false
    }

    // Initialize the Kubelet
    klet := &Kubelet{
        hostname: hostname,
        nodeName: nodeName,
        ...
    }
    ...
    procFs := procfs.NewProcFS()
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()

    // Initialize the pod cache and pod manager
    klet.podCache = kubecontainer.NewCache()
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))

    // Initialize the container runtime
    switch containerRuntime {
    case "docker":
        // dockerClient (introduced later) is the client the kubelet uses to drive docker;
        // recorder is the event recorder created earlier;
        // plus machine information, image-pull QPS and many other parameters.
        // See the DockerManager structure for details.
        // Only supported one for now, continue.
        klet.containerRuntime = dockertools.NewDockerManager(
            dockerClient,
            kubecontainer.FilterEventRecorder(recorder),
            klet.livenessManager,
            containerRefManager,
            klet.podManager,
            machineInfo,
            podInfraContainerImage,
            pullQPS,
            pullBurst,
            containerLogsDir,
            osInterface,
            klet.networkPlugin,
            klet,
            klet.httpClient,
            dockerExecHandler,
            oomAdjuster,
            procFs,
            klet.cpuCFSQuota,
            imageBackOff,
            serializeImagePulls,
            enableCustomMetrics,
            klet.hairpinMode == componentconfig.HairpinVeth,
            seccompProfileRoot,
            containerRuntimeOptions...,
        )
    case "rkt":
        ...
    default:
        return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
    }
    ...
    // Set up container GC
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC

    // Set up the imageManager
    imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    klet.runner = klet.containerRuntime
    // Set up the statusManager
    klet.statusManager = status.NewManager(kubeClient, klet.podManager)

    // Set up the probeManager
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        recorder)

    klet.volumePluginMgr, err = NewInitializedVolumePluginMgr(klet, volumePlugins)
    if err != nil {
        return nil, err
    }

    // Set up the volumeManager
    klet.volumeManager, err = kubeletvolume.NewVolumeManager(
        enableControllerAttachDetach,
        hostname,
        klet.podManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime)

    // Create the runtime cache
    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)

    // Create the podWorkers object; this one matters and is introduced separately later
    klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    // Set up the eviction manager
    evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), recorder, nodeRef, klet.clock)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
    }
    klet.evictionManager = evictionManager
    klet.AddPodAdmitHandler(evictionAdmitHandler)

    // apply functional Option's
    for _, opt := range kubeOptions {
        opt(klet)
    }

    return klet, nil
}
This function creates the podWorkers object, which is fairly important and tied to the actual pod operations; it will be covered separately later, so it only gets a mention here.
Looking back over the whole flow, cmd/kubelet/app really only does some simple parameter handling; the concrete initialization all happens in pkg/kubelet.
With initialization covered, we move on to the function that actually runs things, startKubelet():
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
    // This is where the kubelet really starts
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // Start the kubelet server, so the kubelet can be driven through its API
    if kc.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
        }, 0, wait.NeverStop)
    }

    // Start the kubelet's read-only server, on port 10255
    if kc.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
        }, 0, wait.NeverStop)
    }
}
Going one level deeper, we reach k.Run(), which really starts the kubelet. Here k is an interface of type KubeletBootstrap (its approximate shape is sketched below); the concrete object is the Kubelet returned by CreateAndInitKubelet(), so the Run() implementation can be found on that object.
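From these call sites we can sketch the approximate shape of KubeletBootstrap; this is a reconstruction inferred from how the code above uses it, not a verbatim copy of the declaration:

// Approximate shape of KubeletBootstrap, reconstructed from how
// CreateAndInitKubelet, RunKubelet and startKubelet use it.
type KubeletBootstrap interface {
    Run(updates <-chan kubetypes.PodUpdate)                              // main sync loop (see below)
    RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error)  // --runonce mode
    ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions,
        auth server.AuthInterface, enableDebuggingHandlers bool)        // port 10250
    ListenAndServeReadOnly(address net.IP, port uint)                    // port 10255
    BirthCry()                                                           // emit a "starting" event
    StartGarbageCollection()                                             // container/image GC loops
}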
The implementation lives at pkg/kubelet/kubelet.go; the function is as follows:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // Start the log server
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }
    // Init modules such as the imageManager, containerManager, oomWatcher and resourceAnalyzer
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, kubecontainer.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(wait.NeverStop)

    // Start a goroutine that periodically posts the node status to the APIServer
    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    // Start a goroutine that periodically syncs the network status
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()
    // Start the evictionManager
    kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod)

    // Start the pod lifecycle event generator.
    kl.pleg.Start()

    // Enter the sync loop that handles the tasks pushed down by the APIServer;
    // updates is a channel
    kl.syncLoop(updates, kl)
}

func (kl *Kubelet) initializeModules() error {
    // Step 1: Prometheus metrics.
    metrics.Register(kl.runtimeCache)

    // Step 2: Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // Step 3: If the container logs directory does not exist, create it.
    if _, err := os.Stat(containerLogsDir); err != nil {
        if err := kl.os.MkdirAll(containerLogsDir, 0755); err != nil {
            glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
        }
    }

    // Step 4: Start the image manager.
    if err := kl.imageManager.Start(); err != nil {
        return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
    }

    // Step 5: Start container manager.
    if err := kl.containerManager.Start(); err != nil {
        return fmt.Errorf("Failed to start ContainerManager %v", err)
    }

    // Step 6: Start out of memory watcher.
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("Failed to start OOM watcher %v", err)
    }

    // Step 7: Start resource analyzer
    kl.resourceAnalyzer.Start()

    return nil
}
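Nearly every background task in Run() follows one pattern: start a goroutine with wait.Until, which re-runs a sync function at a fixed period until a stop channel closes. A self-contained sketch of that pattern, with a stand-in for kl.syncNodeStatus:

package main

import (
    "fmt"
    "time"

    "k8s.io/kubernetes/pkg/util/wait" // 1.3-era import path assumed
)

func main() {
    // Stand-in for kl.syncNodeStatus; the real one posts node status
    // to the APIServer every nodeStatusUpdateFrequency (default 10s).
    syncNodeStatus := func() {
        fmt.Println("reporting node status at", time.Now().Format(time.RFC3339))
    }

    // wait.Until re-invokes the function each period until the stop
    // channel closes; wait.NeverStop means "run forever".
    go wait.Until(syncNodeStatus, 10*time.Second, wait.NeverStop)

    select {} // block, much as the kubelet's syncLoop effectively does
}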
That about wraps it up. While studying the source you will find many points worth digging into, for example:
dockerclient
podWorkers
podManager
cAdvisor
containerGC
imageManager
diskSpaceManager
statusManager
volumeManager
containerRuntime
kubelet cache
events recorder
Eviction Manager
how the kubelet receives tasks from the APIServer, and the flow of creating a pod
and so on.
Follow-up posts will pick out some of these key points for analysis.