3、源碼分析node
在介紹代碼以前,zouyee先帶各位看一看CPU manager的啓動圖(CPU manager屬於Container Manager模塊的子系統)
複製代碼
對於上圖的內容,zouyee總結流程以下:linux
一、在命令行啓動部分,Kubelet中調用NewContainerManager構建ContainerManager
二、NewContainerManager函數調用topologymanager.NewManager構建拓撲管理器
三、NewContainerManager函數調用cpumanager.NewManager構建CPU管理器
四、拓撲管理器使用AddHintPriovider方法將CPU管理器加入管理
五、回到命令行啓動部分,調用NewMainKubelet(),構建Kubelet結構體
六、構建Kubelet結構體時,將CPU管理器跟拓撲管理器封裝爲InternalContainerLifecycle接口,其實現Pod相關的生命週期資源管理操做,涉及CPU相關的是PreStart方法
七、構建Kubelet結構體時,調用AddPodmitHandler將GetAllocateResourcesPodAdmitHandler方法加入到Pod准入插件中,在Pod建立時,資源預分配檢查
八、構建Kubelet結構體後,調用ContainerManager的Start方法,ContainerManager在Start方法中調用CPU管理器的Start方法,其作一些處理工做並孵化一個goroutine,執行reconcileState()
下面依次進行講解。
STEP 1
Kubelet中調用NewContainerManager構建ContainerManager, 涉及代碼爲cmd/kubelet/app/server.go
在run函數中完成ContainerManager初始化工做
func run(ctx context.Context, 參數太長,不寫全了){
....
if kubeDeps.ContainerManager == nil {
...
kubeDeps.ContainerManager, err = cm.NewContainerManager(
...
)
...
}
}
STEP 2-4
NewContainerManager函數調用topologymanager.NewManager構建拓撲管理器,涉及代碼
複製代碼
pkg/kubelet/cm/container_manager_linux.gomarkdown
func NewContainerManager(參數太長,不寫全了) {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager){
// 判斷特性是否開啓,構建拓撲管理
cm.topologyManager, err = topologymanager.NewManager(
machineInfo.Topology,
nodeConfig.ExperimentalTopologyManagerPolicy,
nodeConfig.ExperimentalTopologyManagerScope,
)
}
// 判斷特性是否開啓,構建CPU管理
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.Errorf("failed to initialize cpu manager: %v", err)
return nil, err
}
// 拓撲管理器使用AddHintPriovider方法將CPU管理器加入管理
cm.topologyManager.AddHintProvider(cm.cpuManager)
}
}
其中關於CPU管理器的初始化:
type CPUTopology struct {
NumCPUs int
NumCores int
NumSockets int
CPUDetails CPUDetails
}
type CPUDetails map[int]CPUInfo
type CPUInfo struct {
NUMANodeID int
SocketID int
CoreID int
}
func NewManager(參數太多,省略了) (Manager, error) {
// 根據cpuPolicyName,決定初始化policy,當前支持none和static
switch policyName(cpuPolicyName) {
...
case PolicyStatic:
// 1. 根據cadvisor的數據,生產topology結構體
topo, err = topology.Discover(machineInfo)
// 2. 檢查reserved的CPU是否爲0,須要kube+system reserved的CPU > 0
// 3. 初始化policy
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
...
}
}
STEP 5-7
在run函數中完成ContainerManager初始化工做後,調用RunKubelet函數構建Kubelet結構體,其最終調用NewMainKubelet(),完成Kubelet結構體構建。涉及代碼pkg/kubelet/kubelet.go
func NewMainKubelet(參數太長,不寫全了)(*Kubelet, error) {
...
klet := &Kubelet{
...
containerManager: kubeDeps.ContainerManager,
...
}
...
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
...
// 構建Kubelet結構體時,將CPU管理器跟拓撲管理器封裝爲InternalContainerLifecycle接口
kubeDeps.ContainerManager.InternalContainerLifecycle(),
...
)
...
// 調用AddPodmitHandler將GetAllocateResourcesPodAdmitHandler方法加入到Pod准入插件中,在Pod建立時,資源預分配檢查
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
...
}
其中在InternalContainerLifecycle接口,涉及CPU部分在PreStartContainer方法,涉及代碼pkg/kubelet/cm/internal_container_lifecycle.go
func (i *internalContainerLifecycleImpl) PreStartContainer(參數太長,不寫全了) error {
if i.cpuManager != nil {
i.cpuManager.AddContainer(pod, container, containerID)
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.AddContainer(pod, containerID)
if err != nil {
return err
}
}
return nil
}
那麼什麼時候調用呢?
上面咱們提到了kuberuntime.NewKubeGenericRuntimeManager,該函數實例化KubeGenericRuntimeManager結構體(後續詳細介紹),而該結構體在startContainer方法中,進行調用,涉及代碼pkg/kubelet/kuberuntime/kuberuntime_container.go
// 用於啓動容器,該結構體實現了Runtime接口
func (m *kubeGenericRuntimeManager) startContainer(參數太多,不寫了) (string, error) {
...
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
...
}
另外GetAllocateResourcesPodAdmitHandler 須要實現返回的結構體須要實現Admit接口
複製代碼
涉及代碼pkg/kubelet/cm/container_manager_linux.goapp
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
pod := attrs.Pod
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
...
if m.cpuManager != nil {
err = m.cpuManager.Allocate(pod, &container)
...
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
實際調用邏輯爲m.cpuManager.Allocate->m.policy.Allocate->func (p *staticPolicy) Allocate (none策略無需操做),涉及代碼pkg/kubelet/cm/cpumanager/policy_static.go
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
// 1. 如介紹所說,檢查是否知足分配,即QOS爲Guaranteed,且分配CPU爲整型
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
// 2. 獲取是否分配過,分配過則更新便可
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
...
}
// 3. 獲取親和性拓撲
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
// 4. 根據numa親和性進行分配
cpuset, err := p.allocateCPUs(.. )
// 5. 設置分配結果
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
// 6. 設置reuse字段
p.updateCPUsToReuse(pod, container, cpuset)
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
STEP 8
構建完成Kubelet結構體後,在Kubelet方法initializeRuntimeDependentModules中調用ContainerManager的Start方法,涉及代碼pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeRuntimeDependentModules() {
...
// 這裏根據咱們前面說明的,須要cadvisor的數據,所以須要提早啓動
if err := kl.containerManager.Start(省略); err != nil {
...
}
...
}
複製代碼
ContainerManager在Start方法中調用CPU管理器的Start方法,具體步驟以下:ide
a. 構建Checkpoint,其中包含文件及內存的操做
b. 根據初始化的policy,運行Start, 實際只有static起到做用,主要是校驗工做
c. 孵化一個goroutine,執行reconcileState()
func (cm *containerManagerImpl) Start(參數太多,省略) error {
...
// 初始化CPU管理器
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
...
err = cm.cpuManager.Start(參數太多,省略)
...
}
...
}
// 涉及代碼pkg/kubelet/cm/cpumanager/cpu_manager.go
func (m *manager) Start(參數太多,省略) error {
...
// 該處爲Checkpoint處理,實際爲文件管理工做,即分配等狀況的數據保存
stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
...
// 孵化一個goroutine,執行reconcileState()
// 處理當前實際CPU分配的工做,相似actual與desired
go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}
其中reconcileState 主要完成如下工做
a. 處理當前活躍Pod,更新containerMap結構體
b. 經過CRI接口更新容器底層的CPU配置(即m.containerRuntime.UpdateContainerResources)
後續zouyee將帶各位看看ContainerManager各大組件:拓撲管理、設備管理、容器管理等。
複製代碼
點擊查看全文函數
後續相關內容,請查看公衆號:DCOSoop
4、參考資料源碼分析
一、cpusetspa
二、cpu topology插件
三、cpu manager policy