Please credit the source when republishing. This article was published on luozhiyun's blog: https://www.luozhiyun.com
The source code analyzed here is version 1.19.
Horizontal pod autoscaling is implemented by the Horizontal Pod Autoscaler (HPA). It automatically scales the number of Pods in a ReplicationController, Deployment, or ReplicaSet based on CPU utilization or other metrics.
The autoscaler runs on a period set by the --horizontal-pod-autoscaler-sync-period flag (15 seconds by default). In each period, the controller manager queries resource utilization against the metrics specified in each HorizontalPodAutoscaler definition.
The controller computes the scaling ratio from the current and desired metric values using the formula:
desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]
Here currentReplicas is the current number of replicas, currentMetricValue is the current metric value, desiredMetricValue is the desired metric value, and desiredReplicas is the desired number of replicas. For example, if the current metric value is 200m and the target is 100m, the replica count is doubled because 200.0 / 100.0 == 2.0; if the current metric value is 50m, the replica count is halved because 50.0 / 100.0 == 0.5.
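The arithmetic can be reproduced in a few lines of Go (a standalone illustration; desiredReplicas here is a local helper, not the controller's code):

package main

import (
    "fmt"
    "math"
)

// desiredReplicas implements the HPA formula:
// ceil(currentReplicas * currentMetricValue / desiredMetricValue).
func desiredReplicas(currentReplicas int32, currentMetricValue, desiredMetricValue float64) int32 {
    return int32(math.Ceil(float64(currentReplicas) * currentMetricValue / desiredMetricValue))
}

func main() {
    // 200m current vs 100m target: ratio 2.0, so the replica count doubles.
    fmt.Println(desiredReplicas(2, 200, 100)) // 4
    // 50m current vs 100m target: ratio 0.5, so the replica count is halved.
    fmt.Println(desiredReplicas(4, 50, 100)) // 2
}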
We can manage HPAs with kubectl: kubectl create creates an HPA object, kubectl get hpa lists all HPA objects, kubectl describe hpa shows the details of an HPA object, and kubectl delete hpa deletes one.
An HPA object can also be created with kubectl autoscale. For example, kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 creates an HPA for the ReplicaSet named foo, with a target CPU utilization of 80% and a replica count between 2 and 5.
If the metrics fluctuate too frequently, the --horizontal-pod-autoscaler-downscale-stabilization flag can be used to set a downscale stabilization window, i.e. how long the controller must wait after the last scale-down before it may scale down again. The default is 5 minutes.
A Deployment for testing:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hpatest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hpatest
  template:
    metadata:
      labels:
        app: hpatest
    spec:
      containers:
      - name: hpatest
        image: nginx
        imagePullPolicy: IfNotPresent
        command: ["/bin/sh"]
        args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"]
        ports:
        - containerPort: 80
        resources:
          requests:
            cpu: 1m
            memory: 100Mi
          limits:
            cpu: 3m
            memory: 400Mi
---
apiVersion: v1
kind: Service
metadata:
  name: hpatest-svc
spec:
  selector:
    app: hpatest
  ports:
  - port: 80
    targetPort: 80
    protocol: TCP
An HPA for horizontal scaling, which starts scaling out when CPU utilization reaches 50%:
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: haptest-nginx
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hpatest
  minReplicas: 2
  maxReplicas: 6
  targetCPUUtilizationPercentage: 50
A simple load-test script:
[root@localhost HPA]# vim hpatest.sh
while true
do
    wget -q -O- http://10.68.50.65
done
Watch the HPA's TARGETS column:
[root@localhost ~]# kubectl get hpa -w
NAME      REFERENCE            TARGETS    MINPODS   MAXPODS   REPLICAS   AGE
hpatest   Deployment/hpatest   0%/50%     1         5         1          5m47s
hpatest   Deployment/hpatest   400%/50%   1         5         1          5m49s
hpatest   Deployment/hpatest   400%/50%   1         5         4          6m4s
hpatest   Deployment/hpatest   400%/50%   1         5         5          6m19s
hpatest   Deployment/hpatest   500%/50%   1         5         5          6m49s
Watch the Pods to confirm that they scale out automatically:
[root@localhost ~]# kubectl get pods -o wide -w
NAME                      READY   STATUS              RESTARTS   AGE   IP            NODE             NOMINATED NODE   READINESS GATES
hpatest-bbb44c476-jv8zr   0/1     ContainerCreating   0          0s    <none>        192.168.13.130   <none>           <none>
hpatest-bbb44c476-sk6qb   0/1     ContainerCreating   0          0s    <none>        192.168.13.130   <none>           <none>
hpatest-bbb44c476-7s5qn   0/1     ContainerCreating   0          0s    <none>        192.168.13.130   <none>           <none>
hpatest-bbb44c476-7s5qn   1/1     Running             0          6s    172.20.0.23   192.168.13.130   <none>           <none>
hpatest-bbb44c476-sk6qb   1/1     Running             0          6s    172.20.0.22   192.168.13.130   <none>           <none>
hpatest-bbb44c476-jv8zr   1/1     Running             0          6s    172.20.0.21   192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   0/1     Pending             0          0s    <none>        <none>           <none>           <none>
hpatest-bbb44c476-dstnf   0/1     Pending             0          0s    <none>        192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   0/1     ContainerCreating   0          0s    <none>        192.168.13.130   <none>           <none>
hpatest-bbb44c476-dstnf   1/1     Running             0          6s    172.20.0.24   192.168.13.130   <none>           <none>
After the load test is stopped, the HPA starts scaling back down:
[root@localhost HPA]# kubectl get pod -w
hpatest-bbb44c476-dstnf   0/1     Terminating   0          9m52s
hpatest-bbb44c476-jv8zr   0/1     Terminating   0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating   0          10m
hpatest-bbb44c476-sk6qb   0/1     Terminating   0          10m
hpatest-bbb44c476-sk6qb   0/1     Terminating   0          10m
hpatest-bbb44c476-dstnf   0/1     Terminating   0          10m
hpatest-bbb44c476-dstnf   0/1     Terminating   0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating   0          10m
hpatest-bbb44c476-7s5qn   0/1     Terminating   0          10m
hpatest-bbb44c476-jv8zr   0/1     Terminating   0          10m
hpatest-bbb44c476-jv8zr   0/1     Terminating   0          10m
File: cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["horizontalpodautoscaling"] = startHPAController
    ...
}
Like the other controllers, the HPA controller is registered in the NewControllerInitializers method and then started through startHPAController.
File: cmd/kube-controller-manager/app/autoscaling.go
func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
    ...
    return startHPAControllerWithLegacyClient(ctx)
}

func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    metricsClient := metrics.NewHeapsterMetricsClient(
        hpaClient,
        metrics.DefaultHeapsterNamespace,
        metrics.DefaultHeapsterScheme,
        metrics.DefaultHeapsterService,
        metrics.DefaultHeapsterPort,
    )
    return startHPAControllerWithMetricsClient(ctx, metricsClient)
}

func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

    scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
    scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    if err != nil {
        return nil, false, err
    }
    // Initialize the controller and run it in a goroutine
    go podautoscaler.NewHorizontalController(
        hpaClient.CoreV1(),
        scaleClient,
        hpaClient.AutoscalingV1(),
        ctx.RESTMapper,
        metricsClient,
        ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
    ).Run(ctx.Stop)
    return nil, true, nil
}
Eventually startHPAControllerWithMetricsClient is called; it starts a goroutine that calls NewHorizontalController to initialize the HPA controller and then executes its Run method.
File: pkg/controller/podautoscaler/horizontal.go
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer a.queue.ShutDown()

    klog.Infof("Starting HPA controller")
    defer klog.Infof("Shutting down HPA controller")

    if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
        return
    }

    // Start an asynchronous worker goroutine that runs once per second
    go wait.Until(a.worker, time.Second, stopCh)

    <-stopCh
}
Here the worker is invoked to carry out the actual scaling logic.
Following the worker's call chain eventually leads to the reconcileAutoscaler method, which is the core of the HPA. Let's focus on that part next.
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
    ...
    // If the target's replica count is zero, autoscaling is disabled
    if scale.Spec.Replicas == 0 && minReplicas != 0 {
        // Autoscaling is disabled for this resource
        desiredReplicas = 0
        rescale = false
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
        // If the current replica count exceeds the configured maximum, clamp the desired count to the maximum
    } else if currentReplicas > hpa.Spec.MaxReplicas {
        rescaleReason = "Current number of replicas above Spec.MaxReplicas"
        desiredReplicas = hpa.Spec.MaxReplicas
        // Likewise, clamp to the minimum when below it
    } else if currentReplicas < minReplicas {
        rescaleReason = "Current number of replicas below Spec.MinReplicas"
        desiredReplicas = minReplicas
    } else {
        var metricTimestamp time.Time
        // Compute the proposed replica count
        metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
        if err != nil {
            ...
        }

        klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

        rescaleMetric := ""
        if metricDesiredReplicas > desiredReplicas {
            desiredReplicas = metricDesiredReplicas
            rescaleMetric = metricName
        }
        if desiredReplicas > currentReplicas {
            rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
        }
        if desiredReplicas < currentReplicas {
            rescaleReason = "All metrics below target"
        }
        // The behavior field is supported since 1.18.
        // It lets you specify a stabilization window to prevent the replica count of the scaling target from flapping.
        // doc: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior
        if hpa.Spec.Behavior == nil {
            desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        } else {
            desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        }
        rescale = desiredReplicas != currentReplicas
    }
    ...
}
This is the core of reconcileAutoscaler. It decides the desired replica count from the current Scale object and the parameters configured on the HPA. The two out-of-range cases, above maxReplicas or below minReplicas, are simply corrected to the corresponding bound and the Scale object is updated; if the Scale object's replica count is zero, the HPA takes no action at all.
When the current replica count lies between minReplicas and maxReplicas, the controller has to work out whether scaling is needed, which is done by calling computeReplicasForMetrics.
Finally, if behavior is configured, normalizeDesiredReplicasWithBehaviors is called to adjust the result; for details on behavior see the documentation: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior.
Let's walk through it step by step.
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale, metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
    ...
    // The metric target can be a list, so iterate over it and take the largest proposed replica count
    for i, metricSpec := range metricSpecs {
        // Compute the proposed replica count according to the metric type
        replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

        if err != nil {
            if invalidMetricsCount <= 0 {
                invalidMetricCondition = condition
                invalidMetricError = err
            }
            invalidMetricsCount++
        }
        // Keep the largest proposed replica count
        if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
            timestamp = timestampProposal
            replicas = replicaCountProposal
            metric = metricNameProposal
        }
    }
    ...
    return replicas, metric, statuses, timestamp, nil
}
This is because the metrics field is actually a list, for example:
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 50
  - type: Pods
    pods:
      metric:
        name: packets-per-second
      target:
        type: AverageValue
        averageValue: 1k
  - type: Object
    object:
      metric:
        name: requests-per-second
      describedObject:
        apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        name: main-route
      target:
        type: Value
        value: 10k
In this official example three metrics are configured, so the loop above iterates over all of them and picks the proposal with the largest replica count, as the short sketch below illustrates. The main calculation happens in computeReplicasForMetric, which we look at next.
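As a tiny standalone illustration of that selection step (illustrative types and values, not the controller's code):

package main

import "fmt"

// proposal is a stand-in for the per-metric result of computeReplicasForMetric.
type proposal struct {
    metric   string
    replicas int32
}

// pickLargest mirrors the selection loop in computeReplicasForMetrics:
// whichever metric proposes the most replicas wins.
func pickLargest(proposals []proposal) (int32, string) {
    var replicas int32
    var metric string
    for _, p := range proposals {
        if replicas == 0 || p.replicas > replicas {
            replicas = p.replicas
            metric = p.metric
        }
    }
    return replicas, metric
}

func main() {
    // e.g. CPU proposes 3 replicas, packets-per-second proposes 5, the Ingress metric proposes 2.
    fmt.Println(pickLargest([]proposal{
        {"cpu", 3},
        {"packets-per-second", 5},
        {"requests-per-second", 2},
    })) // 5 packets-per-second
}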
func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec, specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string, timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // Measure according to the metric source type
    switch spec.Type {
    // Object: the metric describes a Kubernetes object, e.g. an Ingress
    case autoscalingv2.ObjectMetricSourceType:
        ...
    // Pods: a per-pod metric type
    case autoscalingv2.PodsMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
        // Only the AverageValue target type is supported; compute the proposed replica count
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
    // Resource: a resource metric such as CPU or memory
    case autoscalingv2.ResourceMetricSourceType:
        ...
    case autoscalingv2.ExternalMetricSourceType:
        ...
    default:
        errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
        err = fmt.Errorf(errMsg)
        condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
        return 0, "", time.Time{}, condition, err
    }
    return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
The calculation depends on the metric source type. There are currently four types: Pods, Object, Resource, and External, explained below:
const (
    // ObjectMetricSourceType is a metric describing a kubernetes object
    // (for example, hits-per-second on an Ingress object).
    // This type is specifically for built-in Kubernetes objects
    ObjectMetricSourceType MetricSourceType = "Object"
    // PodsMetricSourceType is a metric describing each pod in the current scale
    // target (for example, transactions-processed-per-second). The values
    // will be averaged together before being compared to the target value.
    // This type describes the desired average value across the pods currently being measured
    PodsMetricSourceType MetricSourceType = "Pods"
    // ResourceMetricSourceType is a resource metric known to Kubernetes, as
    // specified in requests and limits, describing each pod in the current
    // scale target (e.g. CPU or memory). Such metrics are built in to
    // Kubernetes, and have special scaling options on top of those available
    // to normal per-pod metrics (the "pods" source).
    // Resource describes a per-pod resource such as CPU or memory
    ResourceMetricSourceType MetricSourceType = "Resource"
    // ExternalMetricSourceType is a global metric that is not associated
    // with any Kubernetes object. It allows autoscaling based on information
    // coming from components running outside of cluster
    // (for example length of queue in cloud messaging service, or
    // QPS from loadbalancer running outside of cluster).
    // External is a global metric unrelated to any Kubernetes object, relying mainly on information from outside the cluster
    ExternalMetricSourceType MetricSourceType = "External"
)
We won't cover all of them here; let's take the Pods metric type as an example. That branch calls computeStatusForPodsMetric to calculate the proposed replica count.
File: pkg/controller/podautoscaler/replica_calculator.go
func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // Compute the proposed replica count
    replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
    if err != nil {
        condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
        return 0, timestampProposal, "", condition, err
    }
    ...
    return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
    // Fetch the metric values for the pods
    metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
    if err != nil {
        return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
    }
    // Combine the metric data to compute the proposed replica count
    replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))
    return replicaCount, utilization, timestamp, err
}
GetRawMetric is called to fetch the metric values for the pods, and calcPlainMetricReplicas then combines those values with the target to compute the proposed replica count.
calcPlainMetricReplicas contains quite a bit of logic, so let's go through it in pieces.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {

    podList, err := c.podLister.Pods(namespace).List(selector)
    ...
    // Group the pods into three buckets: the count of ready pods, the ignored pod set, and the missing pod set
    readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
    // Remove the ignored pods from the metric data
    removeMetricsForPods(metrics, ignoredPods)
    // Sum the resource requests configured on the pods' containers
    requests, err := calculatePodRequests(podList, resource)
    ...
}
groupPods classifies the pod list: the ignoredPods set contains pods that are in the PodPending state, and the missingPods set contains pods whose names cannot be found in the metric data.
Since the missingPods are by definition already absent from the metrics, only the metric values of the ignoredPods set need to be removed.
Next, calculatePodRequests sums the resource requests configured on the pods' containers.
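As a rough illustration of what calculatePodRequests computes (with simplified stand-in types instead of the real corev1.Pod; in the real function a container with no request for the metered resource produces an error, which the zero value stands in for here):

package main

import "fmt"

// Simplified stand-ins for the corev1 types walked by calculatePodRequests.
type container struct {
    cpuRequestMilli int64 // requested CPU in milli-cores; 0 stands for "no request set"
}

type pod struct {
    name       string
    containers []container
}

// podRequests mirrors the idea of calculatePodRequests: a map from pod name to
// the total requested CPU across that pod's containers, failing when a
// container has no request for the metered resource.
func podRequests(pods []pod) (map[string]int64, error) {
    requests := map[string]int64{}
    for _, p := range pods {
        var total int64
        for _, c := range p.containers {
            if c.cpuRequestMilli == 0 {
                return nil, fmt.Errorf("missing request for pod %s", p.name)
            }
            total += c.cpuRequestMilli
        }
        requests[p.name] = total
    }
    return requests, nil
}

func main() {
    pods := []pod{
        {name: "hpatest-a", containers: []container{{cpuRequestMilli: 1}}},
        {name: "hpatest-b", containers: []container{{cpuRequestMilli: 1}}},
    }
    fmt.Println(podRequests(pods)) // map[hpatest-a:1 hpatest-b:1] <nil>
}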
Let's continue:
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
    ...
    // Compute the usage ratio
    usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
    ...
}
Here GetMetricUtilizationRatio is called to compute the usage ratio. The method is straightforward:
usageRatio = currentUtilization / targetUtilization
currentUtilization = metricsTotal (the sum of all metric values) / len(metrics)
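A standalone sketch of this calculation (using a plain map in place of the real PodMetricsInfo type; the function name utilizationRatio is only for illustration):

package main

import "fmt"

// utilizationRatio is a simplified sketch of GetMetricUtilizationRatio:
// average the per-pod metric values and divide by the target.
func utilizationRatio(metrics map[string]int64, targetUtilization int64) (usageRatio float64, currentUtilization int64) {
    var total int64
    for _, v := range metrics {
        total += v
    }
    currentUtilization = total / int64(len(metrics))
    return float64(currentUtilization) / float64(targetUtilization), currentUtilization
}

func main() {
    metrics := map[string]int64{"pod-a": 200, "pod-b": 100} // milli-values per pod
    ratio, current := utilizationRatio(metrics, 100)
    fmt.Println(ratio, current) // 1.5 150
}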
Moving on:
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
    ...
    rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0

    if !rebalanceIgnored && len(missingPods) == 0 {
        if math.Abs(1.0-usageRatio) <= c.tolerance {
            // return the current replicas if the change would be too small
            return currentReplicas, utilization, nil
        }
        // If there are no unready or missing pods, scale by usageRatio * readyPodCount
        return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil
    }

    if len(missingPods) > 0 {
        if usageRatio < 1.0 {
            // When scaling down, treat the missing pods as if they were at the target utilization
            for podName := range missingPods {
                metrics[podName] = metricsclient.PodMetric{Value: targetUtilization}
            }
        } else {
            // When scaling up, treat the missing pods as if their usage were 0
            for podName := range missingPods {
                metrics[podName] = metricsclient.PodMetric{Value: 0}
            }
        }
    }

    if rebalanceIgnored {
        // Treat the unready pods as if their usage were 0
        for podName := range ignoredPods {
            metrics[podName] = metricsclient.PodMetric{Value: 0}
        }
    }
    ...
}
The logic here is fairly clear. First, if both missingPods and ignoredPods are empty, the controller checks whether the change falls within the tolerance (0.1 by default); if so, it returns immediately without scaling, otherwise it returns usageRatio * readyPodCount as the proposed replica count.
If missingPods is not empty, the controller has to decide whether this is a scale-up or a scale-down and adjust the values in metrics accordingly.
Finally, if this is a scale-up, the pods in the ignoredPods set also have their values in the metrics map set to 0.
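A minimal standalone sketch of the fast path just described (function and parameter names are illustrative, not the controller's):

package main

import (
    "fmt"
    "math"
)

// fastPathReplicas sketches the fast path of calcPlainMetricReplicas: with no
// missing or unready pods, stay at the current replica count if the usage
// ratio is within the tolerance (0.1 by default), otherwise scale by
// usageRatio * readyPodCount.
func fastPathReplicas(currentReplicas int32, usageRatio float64, readyPodCount int, tolerance float64) int32 {
    if math.Abs(1.0-usageRatio) <= tolerance {
        return currentReplicas
    }
    return int32(math.Ceil(usageRatio * float64(readyPodCount)))
}

func main() {
    fmt.Println(fastPathReplicas(3, 1.05, 3, 0.1)) // 3: within tolerance, no change
    fmt.Println(fastPathReplicas(3, 2.0, 3, 0.1))  // 6: usage doubled, replicas double
}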
Now the last part:
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
    ...
    // Recompute the usage ratio
    newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)

    if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
        return currentReplicas, utilization, nil
    }

    return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil
}
Because the metric values for the missingPods and ignoredPods sets were rewritten above, the usage ratio has to be recomputed here.
If the change is within the tolerance, or usageRatio and newUsageRatio sit on opposite sides of 1.0 (meaning the two point in opposite scaling directions), the current replica count is returned unchanged; otherwise newUsageRatio * len(metrics) is returned as the proposed replica count.
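A small standalone sketch of this final guard (again with illustrative names, not the controller's code):

package main

import (
    "fmt"
    "math"
)

// finalReplicas sketches the last step of calcPlainMetricReplicas: after
// padding missing/unready pods, recompute the ratio; if the change is within
// tolerance or the old and new ratios point in opposite directions, keep the
// current replica count, otherwise scale by newUsageRatio * len(metrics).
func finalReplicas(currentReplicas int32, usageRatio, newUsageRatio float64, metricCount int, tolerance float64) int32 {
    if math.Abs(1.0-newUsageRatio) <= tolerance ||
        (usageRatio < 1.0 && newUsageRatio > 1.0) ||
        (usageRatio > 1.0 && newUsageRatio < 1.0) {
        return currentReplicas
    }
    return int32(math.Ceil(newUsageRatio * float64(metricCount)))
}

func main() {
    // Padding unready pods with 0 flipped the direction: do nothing.
    fmt.Println(finalReplicas(4, 1.6, 0.8, 5, 0.1)) // 4
    // Direction unchanged and outside tolerance: scale out.
    fmt.Println(finalReplicas(4, 1.6, 1.4, 5, 0.1)) // 7
}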
With that covered, here is a flowchart of the overall logic:
Having finished with computeReplicasForMetrics, let's return to reconcileAutoscaler and continue.
The next step checks whether behavior is configured. If not, the simpler normalizeDesiredReplicas method is used, so let's look directly at what normalizeDesiredReplicasWithBehaviors does and how it is implemented.
For concrete behavior examples, see: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#default-behavior.
func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
    // If StabilizationWindowSeconds is not set, give it a default value (300s)
    a.maybeInitScaleDownStabilizationWindow(hpa)
    normalizationArg := NormalizationArg{
        Key:               key,
        ScaleUpBehavior:   hpa.Spec.Behavior.ScaleUp,
        ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
        MinReplicas:       minReplicas,
        MaxReplicas:       hpa.Spec.MaxReplicas,
        CurrentReplicas:   currentReplicas,
        DesiredReplicas:   prenormalizedDesiredReplicas}
    // Get the recommended replica count based on these arguments
    stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
    normalizationArg.DesiredReplicas = stabilizedRecommendation
    ...
    // Apply the limits specified by scaleDown or scaleUp
    desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
    ...
    return desiredReplicas
}
This method has two main parts: stabilizeRecommendationWithBehaviors, which derives a recommended replica count from the recommendations within a time window, and convertDesiredReplicasWithBehaviorRate, which constrains the result according to the parameters specified under scaleDown or scaleUp.
stabilizeRecommendationWithBehaviors
func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
    recommendation := args.DesiredReplicas
    foundOldSample := false
    oldSampleIndex := 0
    var scaleDelaySeconds int32
    var reason, message string

    var betterRecommendation func(int32, int32) int32

    // If the desired replica count is greater than or equal to the current one,
    // the delay is the scaleUp behavior's stabilization window
    if args.DesiredReplicas >= args.CurrentReplicas {
        scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds
        betterRecommendation = min
        reason = "ScaleUpStabilized"
        message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
    } else {
        // The desired replica count is less than the current one
        scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds
        betterRecommendation = max
        reason = "ScaleDownStabilized"
        message = "recent recommendations were higher than current one, applying the highest recent recommendation"
    }
    // Take the larger of the two stabilization windows
    maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds)
    obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds))

    cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds))
    for i, rec := range a.recommendations[args.Key] {
        if rec.timestamp.After(cutoff) {
            // The recommendation is still inside the window, so let the comparison
            // function chosen above decide the final recommended replica count
            recommendation = betterRecommendation(rec.recommendation, recommendation)
        }
        // If the recommendation being visited is older than obsoleteCutoff, mark it for replacement
        if rec.timestamp.Before(obsoleteCutoff) {
            foundOldSample = true
            oldSampleIndex = i
        }
    }
    // Overwrite an obsolete recommendation if one was found, otherwise append a new one
    if foundOldSample {
        a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
    } else {
        a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
    }
    return recommendation, reason, message
}
The method first checks whether this is a scale-up or a scale-down. For a scale-up, scaleDelaySeconds is set to the ScaleUpBehavior stabilization window and betterRecommendation is set to min; for a scale-down it is the opposite (the scale-down window and max).
It then iterates over the stored recommendations; for each one whose timestamp is after the window cutoff, betterRecommendation is applied to pick the recommendation value, and the final result is returned.
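To make the window logic concrete, here is a simplified standalone sketch (the stabilize helper and its parameters are illustrative; the real method also prunes and records recommendations):

package main

import (
    "fmt"
    "time"
)

// timestampedRecommendation mirrors the controller's per-HPA history entries.
type timestampedRecommendation struct {
    recommendation int32
    timestamp      time.Time
}

// stabilize sketches stabilizeRecommendationWithBehaviors: for a scale-up the
// lowest recent recommendation inside the window wins, for a scale-down the
// highest one, which keeps the replica count from flapping.
func stabilize(desired int32, scalingUp bool, windowSeconds int32, recs []timestampedRecommendation) int32 {
    recommendation := desired
    cutoff := time.Now().Add(-time.Duration(windowSeconds) * time.Second)
    for _, rec := range recs {
        if !rec.timestamp.After(cutoff) {
            continue // outside the stabilization window
        }
        if scalingUp && rec.recommendation < recommendation {
            recommendation = rec.recommendation // lowest recent recommendation
        }
        if !scalingUp && rec.recommendation > recommendation {
            recommendation = rec.recommendation // highest recent recommendation
        }
    }
    return recommendation
}

func main() {
    recs := []timestampedRecommendation{
        {recommendation: 8, timestamp: time.Now().Add(-60 * time.Second)},
        {recommendation: 5, timestamp: time.Now().Add(-10 * time.Second)},
    }
    // A scale-down to 2 with a 300s window: the highest recent recommendation (8) is kept.
    fmt.Println(stabilize(2, false, 300, recs)) // 8
}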
convertDesiredReplicasWithBehaviorRate
func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
    var possibleLimitingReason, possibleLimitingMessage string

    // If the desired replica count is greater than the current one
    if args.DesiredReplicas > args.CurrentReplicas {
        // Work out how many pods the scale-up is allowed to reach
        scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior)
        if scaleUpLimit < args.CurrentReplicas {
            // We shouldn't scale up further until the scaleUpEvents will be cleaned up
            scaleUpLimit = args.CurrentReplicas
        }
        maximumAllowedReplicas := args.MaxReplicas
        if maximumAllowedReplicas > scaleUpLimit {
            maximumAllowedReplicas = scaleUpLimit
            possibleLimitingReason = "ScaleUpLimit"
            possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
        } else {
            possibleLimitingReason = "TooManyReplicas"
            possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
        }
        if args.DesiredReplicas > maximumAllowedReplicas {
            return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
        }
    } else if args.DesiredReplicas < args.CurrentReplicas {
        // Work out how many pods the scale-down is allowed to reach
        scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
        if scaleDownLimit > args.CurrentReplicas {
            // We shouldn't scale down further until the scaleDownEvents will be cleaned up
            scaleDownLimit = args.CurrentReplicas
        }
        minimumAllowedReplicas := args.MinReplicas
        if minimumAllowedReplicas < scaleDownLimit {
            minimumAllowedReplicas = scaleDownLimit
            possibleLimitingReason = "ScaleDownLimit"
            possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
        } else {
            possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
            possibleLimitingReason = "TooFewReplicas"
        }
        if args.DesiredReplicas < minimumAllowedReplicas {
            return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
        }
    }
    return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
}
This method is similar to the previous one, except that it applies constraints based on the concrete behavior settings. For a scale-up, calculateScaleUpLimitWithScalingRules is called to determine how many pods the scale-up may reach; inside, the calculation depends on the selectPolicy and the scaleUp policy types configured in behavior, as follows:
func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
    var result int32
    var proposed int32
    var selectPolicyFn func(int32, int32) int32
    if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
        return currentReplicas // Scaling is disabled
    } else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
        selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value
    } else {
        selectPolicyFn = max // Use the default policy otherwise to produce a highest possible change
    }
    for _, policy := range scalingRules.Policies {
        // Get the number of replicas changed within the policy's period
        replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
        periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod
        // Compute the proposed limit depending on the policy type
        if policy.Type == autoscalingv2.PodsScalingPolicy {
            proposed = int32(periodStartReplicas + policy.Value)
        } else if policy.Type == autoscalingv2.PercentScalingPolicy {
            proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
        }
        result = selectPolicyFn(result, proposed)
    }
    return result
}

func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
    period := time.Second * time.Duration(periodSeconds)
    cutoff := time.Now().Add(-period)
    var replicas int32
    // Iterate over the recent scale events
    for _, rec := range scaleEvents {
        if rec.timestamp.After(cutoff) {
            // Accumulate the replica changes; they can be positive or negative,
            // so replicas ends up being the net change within the period
            replicas += rec.replicaChange
        }
    }
    return replicas
}
If selectPolicy is not set, selectPolicyFn defaults to max. Then, while iterating over the policies, a Pods-type policy adds an absolute number of pods and a Percent-type policy adds a percentage.
Back in convertDesiredReplicasWithBehaviorRate, if the current replica count is already greater than scaleUpLimit, scaleUpLimit is raised to the current replica count; if the desired replica count exceeds the maximum allowed replicas, that maximum is returned, otherwise the desired replica count itself is returned.
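A simplified standalone sketch of the policy calculation (it ignores the scale-event and period bookkeeping, assumes the default Max select policy, and uses illustrative types):

package main

import (
    "fmt"
    "math"
)

type policy struct {
    typ   string // "Pods" or "Percent"
    value int32
}

// scaleUpLimit sketches calculateScaleUpLimitWithScalingRules: each policy
// proposes a limit starting from the current replica count, and the default
// Max select policy keeps the largest proposal.
func scaleUpLimit(currentReplicas int32, policies []policy) int32 {
    var result int32
    for _, p := range policies {
        var proposed int32
        switch p.typ {
        case "Pods":
            proposed = currentReplicas + p.value
        case "Percent":
            proposed = int32(math.Ceil(float64(currentReplicas) * (1 + float64(p.value)/100)))
        }
        if proposed > result { // the default Max select policy
            result = proposed
        }
    }
    return result
}

func main() {
    policies := []policy{
        {typ: "Pods", value: 4},      // at most 4 extra pods per period
        {typ: "Percent", value: 100}, // or at most a 100% increase per period
    }
    fmt.Println(scaleUpLimit(10, policies)) // 20: the Percent policy allows more
}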
Here is a diagram to tie the logic together:
The overall horizontal scaling logic can be summarized by the following two diagrams. If you are interested, many of the finer details are best understood by reading the documentation and the code side by side:
https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/