Kubernetes does not natively support in-place upgrades: workloads such as Deployment simply recreate Pods during an upgrade.
Another prerequisite for implementing in-place container upgrades is that the kubelet manages container versions through a container hash.
When a Pod is created, the kubelet computes a hash for each container and writes it into the ContainerStatus, which is defined as follows:
```go
// Status represents the status of a container.
type Status struct {
	// ID of the container.
	ID ContainerID
	// Name of the container.
	Name string
	// Status of the container.
	State State
	// Creation time of the container.
	CreatedAt time.Time
	// Start time of the container.
	StartedAt time.Time
	// Finish time of the container.
	FinishedAt time.Time
	// Exit code of the container.
	ExitCode int
	// Name of the image, this also includes the tag of the image,
	// the expected form is "NAME:TAG".
	Image string
	// ID of the image.
	ImageID string
	// Hash of the container, used for comparison.
	Hash uint64
	// Number of times that the container has been restarted.
	RestartCount int
	// A string explains why container is in such a status.
	Reason string
	// Message written by the container before exiting (stored in
	// TerminationMessagePath).
	Message string
}
```
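To make the bookkeeping concrete, here is a small, self-contained sketch (not kubelet code) of "compute the hash at creation time and stash it in the status". It approximates HashContainer by FNV-hashing the JSON-marshaled spec and uses a trimmed-down status struct; the k8s.io/api module is assumed to be available, and the exact hash values will differ from the kubelet's own (which routes the JSON through hashutil.DeepHashObject).

```go
package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"time"

	v1 "k8s.io/api/core/v1"
)

// recordedStatus is a trimmed-down stand-in for the kubelet's Status struct
// above; only the fields relevant to this sketch are kept.
type recordedStatus struct {
	Name      string
	Image     string
	Hash      uint64
	CreatedAt time.Time
}

// hashContainer approximates kubelet's HashContainer: it hashes the
// JSON-marshaled container spec with FNV-32a. The numeric values differ from
// the real implementation, but the property the kubelet relies on is the
// same: any change to the spec changes the hash.
func hashContainer(c *v1.Container) uint64 {
	h := fnv.New32a()
	spec, _ := json.Marshal(c)
	h.Write(spec)
	return uint64(h.Sum32())
}

// startContainer mimics the bookkeeping done when the kubelet starts a
// container: the hash of the desired spec is recorded alongside the status.
func startContainer(c *v1.Container) recordedStatus {
	return recordedStatus{
		Name:      c.Name,
		Image:     c.Image,
		Hash:      hashContainer(c),
		CreatedAt: time.Now(),
	}
}

func main() {
	desired := &v1.Container{Name: "nginx", Image: "nginx:1.19"}
	status := startContainer(desired)
	fmt.Printf("recorded hash for %s: %d\n", status.Name, status.Hash)
}
```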
The Hash field here is exactly the hash we are talking about today.
When spec.containers[i].image changes, the kubelet recomputes the hash and sees that the container version has changed, so it performs the upgrade: it pulls the new image and restarts the container. Once the restart succeeds, the new hash is written back into the ContainerStatus described above.
The kubelet source provides a helper function for computing a container's hash:
```go
// HashContainer returns the hash of the container. It is used to compare
// the running container with its desired spec.
// Note: remember to update hashValues in container_hash_test.go as well.
func HashContainer(container *v1.Container) uint64 {
	hash := fnv.New32a()
	// Omit nil or empty field when calculating hash value
	// Please see https://github.com/kubernetes/kubernetes/issues/53644
	containerJSON, _ := json.Marshal(container)
	hashutil.DeepHashObject(hash, containerJSON)
	return uint64(hash.Sum32())
}
```
This is the function that computes the container hash.
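Reusing the hashContainer approximation and recordedStatus type from the sketch above, the comparison the kubelet effectively performs after spec.containers[i].image is edited can be expressed as a tiny helper; containerNeedsUpgrade here is an illustrative name, not a kubelet function:

```go
// containerNeedsUpgrade mirrors the hash comparison the kubelet performs:
// the hash recorded when the container was started is compared with the hash
// of the current desired spec.
func containerNeedsUpgrade(desired *v1.Container, recorded recordedStatus) bool {
	return hashContainer(desired) != recorded.Hash
}
```

For example, after bumping the image tag, `containerNeedsUpgrade(&v1.Container{Name: "nginx", Image: "nginx:1.20"}, status)` returns true even though nothing else in the spec changed, which is what flags the container for an in-place upgrade.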
Using this hash, the kubelet compares the actually running container with the container desired in the spec to decide whether it has changed:
```go
func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {
	expectedHash := kubecontainer.HashContainer(container)
	return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
}
```
If it has changed, a restart is performed:
```go
restart := shouldRestartOnFailure(pod)
if _, _, changed := containerChanged(&container, containerStatus); changed {
	message = fmt.Sprintf("Container %s definition changed", container.Name)
	// Restart regardless of the restart policy because the container
	// spec changed.
	restart = true
}
```
With that, the in-place upgrade is complete.
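Pulling the pieces together, here is a condensed, hypothetical version of the decision loop, again reusing the helpers from the earlier sketches rather than the kubelet's own computePodActions: containers whose hash changed are marked to be killed and started again at the same index, which is the kind of list that SyncPod's Step 7 below walks through. The sandbox (and therefore the Pod object) is left untouched, which is what makes the upgrade "in place".

```go
// containerAction says what the sync loop should do with one container; it is
// a simplified stand-in for the kubelet's internal podActions bookkeeping.
type containerAction struct {
	Index   int    // index into pod.Spec.Containers
	Restart bool   // kill the running container and start a fresh one in place
	Reason  string
}

// computeContainerActions compares every desired container against the status
// recorded when it was started and marks only the changed ones for restart.
func computeContainerActions(containers []v1.Container, running map[string]recordedStatus) []containerAction {
	var actions []containerAction
	for idx := range containers {
		c := &containers[idx]
		recorded, ok := running[c.Name]
		if !ok {
			actions = append(actions, containerAction{Index: idx, Restart: true, Reason: "container not running"})
			continue
		}
		if containerNeedsUpgrade(c, recorded) {
			actions = append(actions, containerAction{
				Index:   idx,
				Restart: true,
				Reason:  fmt.Sprintf("Container %s definition changed", c.Name),
			})
		}
	}
	return actions
}
```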
In the kubelet source, this logic lives in the SyncPod method of kuberuntime_manager.go, the file that implements the kubelet's kubeGenericRuntimeManager.
```go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		if podContainerChanges.CreateSandbox {
			klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
		} else {
			klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
		}

		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}

		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	// Keep terminated init containers fairly aggressively controlled
	// This is an optimization because container removals are typically handled
	// by container garbage collector.
	m.pruneInitContainersBeforeStart(pod, podStatus)

	// We pass the value of the PRIMARY podIP and list of podIPs down to
	// generatePodSandboxConfig and generateContainerConfig, which in turn
	// passes it to various other functions, in order to facilitate functionality
	// that requires this value (hosts file and downward API) and avoid races determining
	// the pod IP in cases where a container requires restart but the
	// podIP isn't in the status manager yet. The list of podIPs is used to
	// generate the hosts file.
	//
	// We default to the IPs in the passed-in pod status, and overwrite them if the
	// sandbox needs to be (re)started.
	var podIPs []string
	if podStatus != nil {
		podIPs = podStatus.IPs
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
			return
		}
		klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
		if err != nil {
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
			klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
			result.Fail(err)
			return
		}

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
			podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
			klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
		}
	}

	// the start containers routines depend on pod ip(as in primary pod ip)
	// instead of trying to figure out if we have 0 < len(podIPs)
	// everytime, we short circuit it here
	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	// Helper containing boilerplate common to starting all types of containers.
	// typeName is a label used to describe this type of container in log messages,
	// currently: "container", "init container" or "ephemeral container"
	start := func(typeName string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
			return err
		}

		klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
		// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

	// Step 5: start ephemeral containers
	// These are started "prior" to init containers to allow running ephemeral containers even when there
	// are errors starting an init container. In practice init containers will start first since ephemeral
	// containers cannot be specified on pod creation.
	if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
		for _, idx := range podContainerChanges.EphemeralContainersToStart {
			start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
		}
	}

	// Step 6: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		if err := start("init container", containerStartSpec(container)); err != nil {
			return
		}

		// Successfully started the container; clear the entry in the failure
		klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}

	// Step 7: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start("container", containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}
```