Knative 駕馭篇:帶你 '縱橫馳騁' Knative 自動擴縮容實現

Knative 中提供了自動擴縮容靈活的實現機制,本文從 三橫兩縱 的維度帶你深刻了解 KPA 自動擴縮容的實現機制。讓你輕鬆駕馭 Knative 自動擴縮容。web

注:本文基於最新 Knative v0.11.0 版本代碼解讀websocket

KPA 實現流程圖

在 Knative 中,建立一個 Revision 會相應的建立 PodAutoScaler 資源。在KPA中經過操做 PodAutoScaler 資源,對當前的 Revision 中的 POD 進行擴縮容。app

針對上面的流程實現,咱們從三橫兩縱的維度進行剖析其實現機制。less

三橫

  • KPA 控制器
  • 根據指標定時計算 POD 數
  • 指標採集

KPA 控制器

經過Revision 建立PodAutoScaler, 在 KPA 控制器中主要包括兩個資源(Decider 和 Metric)和一個操做(Scale)。主要代碼以下socket

func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {
    ......
    decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)
    if err != nil {
        return fmt.Errorf("error reconciling Decider: %w", err)
    }

    if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {
        return fmt.Errorf("error reconciling Metric: %w", err)
    }

    // Metrics services are no longer needed as we use the private services now.
    if err := c.DeleteMetricsServices(ctx, pa); err != nil {
        return err
    }

    // Get the appropriate current scale from the metric, and right size
    // the scaleTargetRef based on it.
    want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)
    if err != nil {
        return fmt.Errorf("error scaling target: %w", err)
    }
......
}

這裏先介紹一下兩個資源:ide

  • Decider : 擴縮容決策的資源,經過Decider獲取擴縮容POD數: DesiredScale。
  • Metric:採集指標的資源,經過Metric會採集當前Revision下的POD指標。

再看一下Scale操做,在Scale方法中,根據擴縮容POD數、最小實例數和最大實例數肯定最終須要擴容的POD實例數,而後修改deployment的Replicas值,最終實現POD的擴縮容, 代碼實現以下:函數

// Scale attempts to scale the given PA's target reference to the desired scale.
func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) {
......
    min, max := pa.ScaleBounds()
    if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {
        logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)
        desiredScale = newScale
    }

    desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)
    if !shouldApplyScale {
        return desiredScale, nil
    }

    ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)
    if err != nil {
        return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)
    }

    currentScale := int32(1)
    if ps.Spec.Replicas != nil {
        currentScale = *ps.Spec.Replicas
    }
    if desiredScale == currentScale {
        return desiredScale, nil
    }

    logger.Infof("Scaling from %d to %d", currentScale, desiredScale)
    return ks.applyScale(ctx, pa, desiredScale, ps)
}

根據指標定時計算 POD 數

這是一個關於Decider的故事。Decider建立以後會同時建立出來一個定時器,該定時器默認每隔 2 秒(能夠經過TickInterval 參數配置)會調用Scale方法,該Scale方法實現以下:ui

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
    ......
    metricName := spec.ScalingMetric
    var observedStableValue, observedPanicValue float64
    switch spec.ScalingMetric {
    case autoscaling.RPS:
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
        a.reporter.ReportStableRPS(observedStableValue)
        a.reporter.ReportPanicRPS(observedPanicValue)
        a.reporter.ReportTargetRPS(spec.TargetValue)
    default:
        metricName = autoscaling.Concurrency // concurrency is used by default
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
        a.reporter.ReportStableRequestConcurrency(observedStableValue)
        a.reporter.ReportPanicRequestConcurrency(observedPanicValue)
        a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)
    }

    // Put the scaling metric to logs.
    logger = logger.With(zap.String("metric", metricName))

    if err != nil {
        if err == ErrNoData {
            logger.Debug("No data to scale on yet")
        } else {
            logger.Errorw("Failed to obtain metrics", zap.Error(err))
        }
        return 0, 0, false
    }

    // Make sure we don't get stuck with the same number of pods, if the scale up rate
    // is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single
    // pod if we need to scale up.
    // E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need
    // 3 pods. See the unit test for this scenario in action.
    maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)
    // Same logic, opposite math applies here.
    maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)

    dspc := math.Ceil(observedStableValue / spec.TargetValue)
    dppc := math.Ceil(observedPanicValue / spec.TargetValue)
    logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
        dspc, dppc, maxScaleUp, maxScaleDown)

    // We want to keep desired pod count in the  [maxScaleDown, maxScaleUp] range.
    desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
    desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
......
    return desiredPodCount, excessBC, true
}

該方法主要是從 MetricCollector 中獲取指標信息,根據指標信息計算出須要擴縮的POD數。而後設置在 Decider 中。另外當 Decider 中 POD 指望值發生變化時會觸發 PodAutoscaler 從新調和的操做,關鍵代碼以下:this

......
if runner.updateLatestScale(desiredScale, excessBC) {
        m.Inform(metricKey)
    }
......

在KPA controller中設置調和Watch操做:阿里雲

......
    // Have the Deciders enqueue the PAs whose decisions have changed.
    deciders.Watch(impl.EnqueueKey)
......

指標採集

經過兩種方式收集POD指標:

  • PUSH 收集指標:經過暴露指標接口,外部服務(如Activitor)能夠調用該接口推送 metric 信息
  • PULL 收集指標:經過調用 Queue Proxy 服務接口收集指標。

PUSH 收集指標實現比較簡單,在main.go中 暴露服務,將接收到的 metric 推送到 MetricCollector 中:

// Set up a statserver.
    statsServer := statserver.New(statsServerAddr, statsCh, logger)
....
go func() {
        for sm := range statsCh {
            collector.Record(sm.Key, sm.Stat)
            multiScaler.Poke(sm.Key, sm.Stat)
        }
    }()

PULL 收集指標是如何收集的呢? 還記得上面提到的Metric資源吧,這裏接收到Metric資源又會建立出一個定時器,這個定時器每隔 1 秒會訪問 queue-proxy 9090 端口採集指標信息。關鍵代碼以下:

// newCollection creates a new collection, which uses the given scraper to
// collect stats every scrapeTickInterval.
func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {
    c := &collection{
        metric:             metric,
        concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),
        rpsBuckets:         aggregation.NewTimedFloat64Buckets(BucketSize),
        scraper:            scraper,

        stopCh: make(chan struct{}),
    }

    logger = logger.Named("collector").With(
        zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))

    c.grp.Add(1)
    go func() {
        defer c.grp.Done()

        scrapeTicker := time.NewTicker(scrapeTickInterval)
        for {
            select {
            case <-c.stopCh:
                scrapeTicker.Stop()
                return
            case <-scrapeTicker.C:
                stat, err := c.getScraper().Scrape()
                if err != nil {
                    copy := metric.DeepCopy()
                    switch {
                    case err == ErrFailedGetEndpoints:
                        copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error())
                    case err == ErrDidNotReceiveStat:
                        copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error())
                    default:
                        copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.")
                    }
                    logger.Errorw("Failed to scrape metrics", zap.Error(err))
                    c.updateMetric(copy)
                }
                if stat != emptyStat {
                    c.record(stat)
                }
            }
        }
    }()
    return c
}

兩縱

  • 0-1 擴容
  • 1-N 擴縮容

上面從KPA實現的 3個橫向角度進行了分析,KPA 實現了0-1擴容以及1-N 擴縮容,下面咱們從這兩個縱向的角度進一步分析。

咱們知道,在 Knative 中,流量經過兩種模式到達POD: Serve 模式和 Proxy 模式。

Proxy 模式: POD數爲 0 時(另外針對突發流量的場景也會切換到 Proxy 模式,這裏先不作詳細解讀),切換到 Proxy 模式。

Serve 模式:POD數不爲 0 時,切換成 Serve 模式。

那麼在何時進行模式的切換呢?在KPA中的代碼實現以下:

mode := nv1alpha1.SKSOperationModeServe
    // We put activator in the serving path in the following cases:
    // 1\. The revision is scaled to 0:
    //   a. want == 0
    //   b. want == -1 && PA is inactive (Autoscaler has no previous knowledge of
    //            this revision, e.g. after a restart) but PA status is inactive (it was
    //            already scaled to 0).
    // 2\. The excess burst capacity is negative.
    if want == 0 || decider.Status.ExcessBurstCapacity < 0 || want == -1 && pa.Status.IsInactive() {
        logger.Infof("SKS should be in proxy mode: want = %d, ebc = %d, PA Inactive? = %v",
            want, decider.Status.ExcessBurstCapacity, pa.Status.IsInactive())
        mode = nv1alpha1.SKSOperationModeProxy
    }

0-1 擴容

第一步:指標採集

在POD數爲0時,流量請求模式爲Proxy 模式。這時候流量是經過 Activitor 接管的,在 Activitor 中,會根據請求數的指標信息,經過WebSockt調用 KPA中提供的指標接口,將指標信息發送給 KPA 中的 MetricCollector。

在 Activitor 中 main 函數中,訪問 KPA 服務 代碼實現以下

// Open a WebSocket connection to the autoscaler.
    autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s%s", "autoscaler", system.Namespace(), pkgnet.GetClusterDomainName(), autoscalerPort)
    logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
    statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
    go statReporter(statSink, ctx.Done(), statCh, logger)

經過 WebSockt 發送請求指標代碼實現:

func statReporter(statSink *websocket.ManagedConnection, stopCh <-chan struct{},
    statChan <-chan []autoscaler.StatMessage, logger *zap.SugaredLogger) {
    for {
        select {
        case sm := <-statChan:
            go func() {
                for _, msg := range sm {
                    if err := statSink.Send(msg); err != nil {
                        logger.Errorw("Error while sending stat", zap.Error(err))
                    }
                }
            }()
        case <-stopCh:
            // It's a sending connection, so no drainage required.
            statSink.Shutdown()
            return
        }
    }
}

第二步:根據指標計算 POD 數

在 Scale 方法中,根據 PUSH 獲取的指標信息,計算出指望的POD數。修改 Decider 指望 POD 值,觸發 PodAutoScaler 從新調和。

第三步:擴容

在KPA controller中,從新執行 reconcile 方法,執行 scaler 對當前Revision進行擴容操做。而後將流量模式切換成 Server 模式。最終實現 0-1 的擴容操做。

1-N 擴縮容

第一步:指標採集

在 POD 數不爲0時,流量請求模式爲 Server 模式。這時候會經過PULL 的方式訪問當前 revision 中全部 POD queue proxy 9090 端口,拉取業務指標信息, 訪問服務 URL 代碼實現以下:

...
func urlFromTarget(t, ns string) string {
    return fmt.Sprintf(
        "http://%s.%s:%d/metrics",
        t, ns, networking.AutoscalingQueueMetricsPort)
}

第二步:根據指標計算 POD 數

在 Scale 方法中,根據 PULL 獲取的指標信息,計算出指望的POD數。修改 Decider 指望 POD 值,觸發 PodAutoScaler 從新調和。

第三步: 擴縮容

在 KPA controller中,從新執行 reconcile 方法,執行 scaler 對當前Revision進行擴縮容操做。若是縮容爲 0 或者觸發突發流量場景,則將流量模式切換成 Proxy 模式。最終實現 1-N 擴縮容操做。

總結

相信經過上面的介紹,對Knative KPA的實現有了更深刻的理解,瞭解了其實現原理不只有助於咱們排查相關的問題,更在於咱們能夠基於這樣的擴縮容機制實現自定義的擴縮容組件,這也正是 Knative 自動擴縮容可擴展性靈魂所在。


本文做者:元毅

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索