kube-controller-manager
基本流程:
一、構造
二、配置
三、初始化
四、執行
![image.png image.png](http://static.javashuo.com/static/loading.gif)
入口函數:/cmd/kube-controller-manager/controller-manager.go
// Entry point of kube-controller-manager: build the root command,
// set up logging, and execute it, exiting non-zero on failure.
func main() {
	rand.Seed(time.Now().UnixNano())

	// Construct, configure and initialize the command.
	command := app.NewControllerManagerCommand()

	logs.InitLogs()
	defer logs.FlushLogs()

	// Execute; on failure report the error on stderr and exit with status 1.
	err := command.Execute()
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
構造執行器: /cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
//初始化Controller-manager的配置選項結構
s, err := options.NewKubeControllerManagerOptions()
...
//建立執行命令結構
cmd := &cobra.Command{
Use: "kube-controller-manager",
Long: `The Kubernetes controller manager is a daemon that embeds...'
//獲取全部控制器
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
...
}
//返回執行器
return cmd;
}
進入執行:/cmd/kube-controller-manager/app/controllermanager.go
// Run starts the controller manager: it sets up the HTTP server, then either
// runs the controllers directly (no leader election) or only after this
// instance wins leader election.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	...
	// Initialize the controller-manager HTTP server.
	var unsecuredMux *mux.PathRecorderMux
	if c.SecureServing != nil {
	...
	// Build the run closure that starts all controllers.
	run := func(stop <-chan struct{}) {
	...
	// Single-node case (leader election disabled): run directly.
	if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
		run(wait.NeverStop)
		panic("unreachable")
	}
	// Multi-node case: perform leader election and invoke run only after
	// this instance has been elected leader.
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		...
		// Invoked once leadership is acquired.
		OnStartedLeading: run,
		...
}
run的執行體:/cmd/kube-controller-manager/app/controllermanager.go >> run()
// Body of the run closure: build the controller context, start every
// controller, then start the shared informers and block forever.
run := func(stop <-chan struct{}) {
// Create the controller context.
ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
// Initialize (start) all controllers.
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
glog.Fatalf("error starting controllers: %v", err)
}
// Start watching resource events via the shared informer factory.
ctx.InformerFactory.Start(ctx.Stop)
close(ctx.InformersStarted)
// Block forever; the process exits via the stop channel / fatal errors.
select {}
}
選主流程:/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
// New builds a resource lock of the requested type for leader election.
// Leader election is implemented by the client-go helpers: candidates race to
// create the chosen ConfigMap/Endpoints object, the one that succeeds holds
// the lock, and the lock record is stored in that object.
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
	switch lockType {
	case EndpointsResourceLock:
		// Lock record lives in the Endpoints object ns/name.
		return &EndpointsLock{
			EndpointsMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	case ConfigMapsResourceLock:
		// Lock record lives in the ConfigMap object ns/name.
		return &ConfigMapLock{
			ConfigMapMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	default:
		// Error strings are lowercase without punctuation (Go convention).
		return nil, fmt.Errorf("invalid lock-type %s", lockType)
	}
}
初始化全部控制器:/cmd/kube-controller-manager/app/controllermanager.go
// StartControllers runs the init function of every enabled controller in the
// map, skipping disabled ones and jittering the interval between starts.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
···
// Iterate over the full controller list.
for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
glog.Warningf("%q is disabled", controllerName)
continue
}
// Stagger controller startup with a jittered sleep to avoid a thundering herd.
time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
glog.V(1).Infof("Starting %q", controllerName)
// Invoke this controller's init function.
started, err := initFn(ctx)
···
}
return nil
}
建立控制器上下文:/cmd/kube-controller-manager/app/controllermanager.go
// CreateControllerContext builds the shared ControllerContext used by all
// controllers: an apiserver client, a shared informer factory, and config.
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
// Obtain a client handle for operating on apiserver resources.
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
// Confirm the apiserver is healthy (waiting at most 10s) before connecting.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}
// Create and return the controller context.
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
...
}
return ctx,nil
}
kube-scheduler
基本流程
一、初始化配置
二、構造
三、從隊列中獲取pod
四、進行綁定
![clipboard.png clipboard.png](http://static.javashuo.com/static/loading.gif)
入口函數:/cmd/kube-scheduler/scheduler.go
// Entry point of kube-scheduler: construct the command, initialize flag
// normalization and logging, then execute it.
func main() {
	rand.Seed(time.Now().UnixNano())

	// Build the scheduler command.
	command := app.NewSchedulerCommand()
	pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	// Execute; on failure print the error to stderr and exit non-zero.
	err := command.Execute()
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
註冊調度策略:pkg/scheduler/algorithmprovider/defaults/defaults.go
// registerAlgorithmProvider registers the scheduling algorithm providers.
// 'DefaultProvider' is used by default, but the user can select another via
// a flag; a cluster-autoscaler-friendly provider is registered as well.
func registerAlgorithmProvider(predSet, priSet sets.String) {
	factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)

	// Same predicates, but with LeastRequestedPriority replaced by
	// MostRequestedPriority for cluster-autoscaler-friendly scheduling.
	autoscalerPriorities := copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority)
	factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet, autoscalerPriorities)
}
從組件入口:/cmd/kube-scheduler/app/server.go
// NewSchedulerCommand builds the cobra command that runs kube-scheduler.
func NewSchedulerCommand() *cobra.Command {
// Initialize the default options.
opts, err := options.NewOptions()
// Build the command object.
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes ······`,
Run: func(cmd *cobra.Command, args []string) {
...
}
// Register the configuration flags.
opts.AddFlags(cmd.Flags())
cmd.MarkFlagFilename("config", "yaml", "yml", "json")
return cmd
}
啓動:/cmd/kube-scheduler/app/server.go
// Run starts the scheduler: it applies feature-gated algorithm settings,
// builds the Scheduler, serves health checks, and (after optional leader
// election) runs the scheduling loop.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Apply feature gates to the scheduling algorithm set.
algorithmprovider.ApplyFeatureGates()
...
// Initialize schedulerConfig.
schedulerConfig, err := NewSchedulerConfig(c)
// Create the Scheduler object.
sched := scheduler.NewFromConfig(schedulerConfig)
// Serve health checks.
if c.InsecureServing != nil {
...
// Is leader election required?
if c.LeaderElection != nil {
...
// Run the scheduling loop.
run(stopCh)
}
執行:/cmd/kube-scheduler/app/server.go
// Run begins scheduling: it waits for the informer caches to sync and then
// launches the scheduling loop.
func (sched *Scheduler) Run() {
	cfg := sched.config
	// Bail out if the caches never become ready.
	if !cfg.WaitForCacheSync() {
		return
	}
	// Start the volume binder when the VolumeScheduling gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		go cfg.VolumeBinder.Run(sched.bindVolumesWorker, cfg.StopEverything)
	}
	// Schedule pods one at a time (serially) until told to stop.
	go wait.Until(sched.scheduleOne, 0, cfg.StopEverything)
}
調度pod邏輯:/pkg/scheduler/scheduler.go >> scheduleOne()
// scheduleOne handles a single pod: pop it from the queue, pick a host,
// assume the placement in the cache, then bind asynchronously.
func (sched *Scheduler) scheduleOne() {
// Pop the next pod from the scheduling queue.
pod := sched.config.NextPod()
...
// Choose a suitable host for the pod.
suggestedHost, err := sched.schedule(pod)
...
// Assume the pod onto the host in the scheduler cache first
// (hides the apiserver binding latency from the scheduling loop).
assumedPod := pod.DeepCopy()
...
// Perform the actual binding through the apiserver client.
go func() {
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}
...
}
尋找合適的節點:/pkg/scheduler/core/generic_scheduler.go
// NOTE(review): this excerpt is byte-identical to scheduleOne above, yet the
// section heading points at /pkg/scheduler/core/generic_scheduler.go — the
// intended snippet was presumably genericScheduler.Schedule (node selection);
// verify against the upstream source.
func (sched *Scheduler) scheduleOne() {
// Pop the next pod from the scheduling queue.
pod := sched.config.NextPod()
...
// Choose a suitable host for the pod.
suggestedHost, err := sched.schedule(pod)
...
// Assume the pod onto the host in the scheduler cache first
// (hides the apiserver binding latency from the scheduling loop).
assumedPod := pod.DeepCopy()
...
// Perform the actual binding through the apiserver client.
go func() {
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}
...
}