Pod 就地升級3--kubelet 經過hash管理容器版本

k8s原生並不支持就地升級。諸如deployment等工做負載在升級的過程當中,直接對Pod進行recreategit

實現容器的就地升級的另一個前提是kubelet經過容器hash 來管理容器版本。github

當建立一個Pod的時候,kubelet會計算每一個容器的hash,而且把該hash值寫到ContainerStatus中,該status的定義以下:json

// Status represents the status of a container.
type Status struct {
    // ID of the container.
    ID ContainerID
    // Name of the container.
    Name string
    // Status of the container.
    State State
    // Creation time of the container.
    CreatedAt time.Time
    // Start time of the container.
    StartedAt time.Time
    // Finish time of the container.
    FinishedAt time.Time
    // Exit code of the container.
    ExitCode int
    // Name of the image, this also includes the tag of the image,
    // the expected form is "NAME:TAG".
    Image string
    // ID of the image.
    ImageID string
    // Hash of the container, used for comparison.
    Hash uint64
    // Number of times that the container has been restarted.
    RestartCount int
    // A string explains why container is in such a status.
    Reason string
    // Message written by the container before exiting (stored in
    // TerminationMessagePath).
    Message string
}

其中的Hashfiled 正是咱們今天講到的hash。less

若是spec.containers[i].image 發生變化以後,kubelet 經過計算hash值,判斷出容器版本已經變化,須要執行升級操做,拉取新的image,重啓該容器,重啓成功以後,會把新的hash寫到上面講的ContainerStatus中。ui

kubelet 源碼中提供了一個計算容器hash的輔助方法:this

// HashContainer returns the hash of the container. It is used to compare
// the running container with its desired spec.
// Note: remember to update hashValues in container_hash_test.go as well.
func HashContainer(container *v1.Container) uint64 {
    hash := fnv.New32a()
    // Omit nil or empty field when calculating hash value
    // Please see https://github.com/kubernetes/kubernetes/issues/53644
    containerJSON, _ := json.Marshal(container)
    hashutil.DeepHashObject(hash, containerJSON)
    return uint64(hash.Sum32())
}

該方法,實現了容器hash的計算。spa

kubelet 經過hash,比較實際運行的容器和spec中指望的容器,判斷是否容器已經改變,具體代碼以下:rest

func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {
    expectedHash := kubecontainer.HashContainer(container)
    return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
}

若是已經變化,則會執行重啓操做。code

restart := shouldRestartOnFailure(pod)
        if _, _, changed := containerChanged(&container, containerStatus); changed {
            message = fmt.Sprintf("Container %s definition changed", container.Name)
            // Restart regardless of the restart policy because the container
            // spec changed.
            restart = true
        }

而後就完成了就地升級。orm

上述的邏輯位於 kuberuntime_manager.go 文件SyncPod方法中。該文件是kubelet的kuberuntimeManager 的實現。

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        } else {
            klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(pod, podStatus)

    // We pass the value of the PRIMARY podIP and list of podIPs down to
    // generatePodSandboxConfig and generateContainerConfig, which in turn
    // passes it to various other functions, in order to facilitate functionality
    // that requires this value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet. The list of podIPs is used to
    // generate the hosts file.
    //
    // We default to the IPs in the passed-in pod status, and overwrite them if the
    // sandbox needs to be (re)started.
    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
            klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a label used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    start := func(typeName string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            return err
        }

        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}

發佈於 4 小時前

相關文章
相關標籤/搜索