ingress-controller Source Code Analysis

Main logic

nginx controller entry function

// file: k8s.io/ingress-nginx/cmd/nginx/main.go
func main() {
    // step 1: initialize the logging component
    klog.InitFlags(nil)

      ......

    // step 2: create the required directories
    err = file.CreateRequiredDirectories()
      ......

    // step 3: initialize the apiserver client
    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
      ......

    // step 4: validate the default-backend and publish-service settings
    if len(conf.DefaultService) > 0 {
        err := checkService(conf.DefaultService, kubeClient)
          ......

        klog.Infof("Validated %v as the default backend.", conf.DefaultService)
    }

    if len(conf.PublishService) > 0 {
        err := checkService(conf.PublishService, kubeClient)
          ......
    }

    // step 5: verify that the watched namespace exists
    if conf.Namespace != "" {
        _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
        if err != nil {
            klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
        }
    }

    // step 6: create the default (fake) SSL certificate
    conf.FakeCertificate = ssl.GetFakeSSLCert()
    klog.Infof("SSL fake certificate created %v", conf.FakeCertificate.PemFileName)

    // step 7: check whether the networking v1beta1 API is available and whether the Kubernetes version is >= 1.18.0
    k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient)
    if !k8s.IsNetworkingIngressAvailable {
        klog.Warningf("Using deprecated \"k8s.io/api/extensions/v1beta1\" package because Kubernetes version is < v1.14.0")
    }

    if k8s.IsIngressV1Ready {
          ......
    }

    conf.Client = kubeClient

    // step 8: register Prometheus collectors
    reg := prometheus.NewRegistry()

    reg.MustRegister(prometheus.NewGoCollector())
    reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
        PidFn:        func() (int, error) { return os.Getpid(), nil },
        ReportErrors: true,
    }))
    ......

    // step 9: start the profiler
    if conf.EnableProfiling {
        go registerProfiler()
    }

    // step 10: instantiate the NGINX controller (*)
    ngx := controller.NewNGINXController(conf, mc)

    // step 11: start the health-check and metrics API
    mux := http.NewServeMux()
    registerHealthz(nginx.HealthPath, ngx, mux)
    registerMetrics(reg, mux)

    go startHTTPServer(conf.ListenPorts.Health, mux)

    // step 12: start the NGINX master process
    go ngx.Start()
  ......
}
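
registerHealthz and startHTTPServer wire up the health endpoint with plain net/http. A rough sketch of what that wiring amounts to (simplified: a fixed handler instead of the real healthz checker, and assuming the default health port 10254):

package main

import "net/http"

func main() {
    // stand-in for registerHealthz + startHTTPServer: expose /healthz on
    // the health port (10254 is the ingress-nginx default)
    mux := http.NewServeMux()
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("ok"))
    })
    http.ListenAndServe(":10254", mux)
}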

nginx controller initialization

// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
    // initialize the event broadcaster
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
        Interface: config.Client.CoreV1().Events(config.Namespace),
    })

    // read the nameserver list from /etc/resolv.conf
    h, err := dns.GetSystemNameServers()
    if err != nil {
        klog.Warningf("Error reading system nameservers: %v", err)
    }

    // instantiate the NGINXController
    n := &NGINXController{
        isIPV6Enabled: ing_net.IsIPv6Enabled(),

        resolver:        h,
        cfg:             config,
        syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
            Component: "nginx-ingress-controller",
        }),

        stopCh:   make(chan struct{}),
        updateCh: channels.NewRingChannel(1024),

        ngxErrCh: make(chan error),

        stopLock: &sync.Mutex{},

        runningConfig: new(ingress.Configuration),

        Proxy: &TCPProxy{},

        metricCollector: mc,

        command: NewNginxCommand(),
    }

    // set up the validation webhook server (if configured)
    if n.cfg.ValidationWebhook != "" {
        n.validationWebhookServer = &http.Server{
            Addr:      config.ValidationWebhook,
            Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
            TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
        }
    }

    // get the pod's runtime information
    pod, err := k8s.GetPodDetails(config.Client)
    if err != nil {
        klog.Fatalf("unexpected error obtaining pod information: %v", err)
    }
    n.podInfo = pod

    // instantiate the store (local cache)
    n.store = store.New(
        config.Namespace,
        config.ConfigMapName,
        config.TCPConfigMapName,
        config.UDPConfigMapName,
        config.DefaultSSLCertificate,
        config.ResyncPeriod,
        config.Client,
        n.updateCh,
        pod,
        config.DisableCatchAll)

    // create the sync queue
    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    ... ...

    // callback invoked whenever the configuration template changes
    onTemplateChange := func() {
        template, err := ngx_template.NewTemplate(nginx.TemplatePath)
        if err != nil {
            // this error is different from the rest because it must be clear why nginx is not working
            klog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
            return
        }

        // if the template parses correctly, store it on the controller and enqueue a template-change task on the sync queue
        n.t = template
        klog.Info("New NGINX configuration template loaded.")
        n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
    }

    // load the configuration template on first start
    ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
    ......

    n.t = ngxTpl

    // watch the template file for changes:
    // when /etc/nginx/template/nginx.tmpl changes, onTemplateChange is invoked
    _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
 ... ...

    // watch for changes to the files under /etc/nginx/geoip/
    filesToWatch := []string{}
    err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
        ......
        filesToWatch = append(filesToWatch, path)
        ......
    })

    ......

    for _, f := range filesToWatch {
        _, err = watch.NewFileWatcher(f, func() {
            klog.Infof("File %v changed. Reloading NGINX", f)
            // when a file changes, enqueue a file-change task on the sync queue
            n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
        })
        ......
    }

    return n
}
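
The template and geoip watchers above are built on filesystem notifications (the internal watch package wraps fsnotify). A minimal sketch of the same pattern using fsnotify directly (the path and the reload action are illustrative, not the controller's actual code):

package main

import (
    "log"

    "github.com/fsnotify/fsnotify"
)

func main() {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    // watch the directory rather than the file: configmap mounts update
    // files via symlink swaps, which arrive as Create/Remove events
    if err := watcher.Add("/etc/nginx/template"); err != nil {
        log.Fatal(err)
    }

    for {
        select {
        case ev, ok := <-watcher.Events:
            if !ok {
                return
            }
            if ev.Name == "/etc/nginx/template/nginx.tmpl" &&
                ev.Op&(fsnotify.Write|fsnotify.Create) != 0 {
                // here the real controller re-parses the template and
                // enqueues a template-change task
                log.Println("template changed")
            }
        case err, ok := <-watcher.Errors:
            if !ok {
                return
            }
            log.Println("watch error:", err)
        }
    }
}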

The NGINXController struct

type NGINXController struct {
    // pod runtime information
    podInfo *k8s.PodInfo
    // controller configuration
    cfg *Configuration
    // event recorder
    recorder record.EventRecorder
    // sync queue
    syncQueue *task.Queue
    // status syncer
    syncStatus status.Syncer
    // sync rate limiter
    syncRateLimiter flowcontrol.RateLimiter

    stopLock *sync.Mutex

    stopCh   chan struct{}
    // ring channel for update events
    updateCh *channels.RingChannel

    // channel receiving NGINX errors
    ngxErrCh chan error

    // currently running configuration
    runningConfig *ingress.Configuration
    // NGINX configuration template writer
    t ngx_template.TemplateWriter
    // nameserver list
    resolver []net.IP
    // whether IPv6 is enabled
    isIPV6Enabled bool
    // whether the controller is shutting down
    isShuttingDown bool
    // TCP proxy (for SSL passthrough)
    Proxy *TCPProxy
    // local cache
    store store.Storer
    // metrics collector
    metricCollector metric.Collector
    // validation webhook server
    validationWebhookServer *http.Server
    // interface for executing and testing the NGINX binary
    command NginxExecTester
}

ngx.Start()

ngx.Start() does three main things:
1. start the store goroutine
2. start the syncQueue goroutine
3. watch updateCh

Whenever a change event arrives on updateCh, it enqueues a task on syncQueue.

// file: internal/ingress/controller/nginx.go
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
    klog.Info("Starting NGINX Ingress controller")
    // start the informers and the initial synchronization of secrets
    n.store.Run(n.stopCh)

    // we need to use the defined ingress class to allow multiple leaders
    // in order to update information about ingress status
    // build the leader-election ID (the ingress class distinguishes different controller deployments)
    electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
    if class.IngressClass != "" {
        electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
    }

    // leader election
    setupLeaderElection(&leaderElectionConfig{
        ......
    })

    cmd := n.command.ExecCommand()

    ......

    if n.cfg.EnableSSLPassthrough {
        n.setupSSLProxy()
    }

    // start NGINX
    klog.Info("Starting NGINX process")
    n.start(cmd)

    // start the sync queue
    go n.syncQueue.Run(time.Second, n.stopCh)

    // force initial sync
    // enqueue an initial-sync task
    n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

    // In case of error the temporal configuration file will
    // be available up to five minutes after the error
    // clean up temporary configuration files every 5 minutes
    go func() {
        for {
            time.Sleep(5 * time.Minute)
            err := cleanTempNginxCfg()
            ......
        }
    }()

    ......

    for {
        select {
        case err := <-n.ngxErrCh:
            if n.isShuttingDown {
                return
            }

            // if the nginx master process dies, the workers continue to process requests
            // until the failure of the configured livenessProbe and restart of the pod.
            // when the master process dies, the worker processes keep serving requests until the configured livenessProbe fails and the pod is restarted
            if process.IsRespawnIfRequired(err) {
                return
            }

        // consume events from updateCh in a loop
        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }

            if evt, ok := event.(store.Event); ok {
                klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    // TODO: is this necessary? Consider removing this special case
                    n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                    continue
                }
                // enqueue as a skippable task
                n.syncQueue.EnqueueSkippableTask(evt.Obj)
            } else {
                klog.Warningf("Unexpected event type received %T", event)
            }
        case <-n.stopCh:
            return
        }
    }
}

Event types

const (
    // CreateEvent event associated with new objects in an informer
    CreateEvent EventType = "CREATE"
    // UpdateEvent event associated with an object update in an informer
    UpdateEvent EventType = "UPDATE"
    // DeleteEvent event associated when an object is removed from an informer
    DeleteEvent EventType = "DELETE"
    // ConfigurationEvent event associated when a controller configuration object is created or updated
    ConfigurationEvent EventType = "CONFIGURATION"
)
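
Events of these types flow through updateCh, a RingChannel from github.com/eapache/channels: writes to In() never block, and once the 1024-slot buffer fills up the oldest unread event is dropped, so a slow consumer can never back-pressure the informers. A standalone sketch of those semantics (hypothetical program, not controller code):

package main

import (
    "fmt"

    "github.com/eapache/channels"
)

func main() {
    // capacity 2: when the buffer is full, the oldest value is dropped
    ring := channels.NewRingChannel(2)

    for i := 0; i < 5; i++ {
        ring.In() <- i // never blocks, even though nobody is reading yet
    }
    ring.Close()

    for v := range ring.Out() {
        fmt.Println(v) // prints 3 and 4; 0, 1 and 2 were silently dropped
    }
}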

The sync queue
The Queue struct

// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
    // queue is the work queue the worker polls
    queue workqueue.RateLimitingInterface
    // sync is called for each item in the queue
    sync func(interface{}) error
    // workerDone is closed when the worker exits
    workerDone chan bool
    // fn makes a key for an API object
    fn func(obj interface{}) (interface{}, error)
    // lastSync is the Unix epoch time of the last execution of 'sync'
    lastSync int64
}

Task types:
(1) skippable tasks: EnqueueSkippableTask
(2) non-skippable tasks: EnqueueTask

// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
    t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
    t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
    if t.IsShuttingDown() {
        klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
        return
    }

    ts := time.Now().UnixNano()
    if !skippable {
        // make sure the timestamp is bigger than lastSync: stamping a
        // non-skippable task 24h in the future guarantees the worker's
        // "lastSync > item.Timestamp" check can never discard it
        ts = time.Now().Add(24 * time.Hour).UnixNano()
    }
    klog.V(3).Infof("queuing item %v", obj)
    key, err := t.fn(obj)
    if err != nil {
        klog.Errorf("%v", err)
        return
    }
    t.queue.Add(Element{
        Key:       key,
        Timestamp: ts,
    })
}

The store goroutine

// file: k8s.io/ingress-nginx/internal/ingress/controller/store/store.go
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s *k8sStore) Run(stopCh chan struct{}) {
    // start informers
    s.informers.Run(stopCh)
}

Run() calls informers.Run(), which starts one goroutine per informer to watch for changes to ingress, secret, endpoint, service, configmap, and pod objects.

// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
    // start the secret, endpoint, service, configmap and pod informers
    go i.Secret.Run(stopCh)
    go i.Endpoint.Run(stopCh)
    go i.Service.Run(stopCh)
    go i.ConfigMap.Run(stopCh)
    go i.Pod.Run(stopCh)

        ......

    // give the other informers a head start: ingress objects reference
    // secrets, endpoints, services and configmaps
    time.Sleep(1 * time.Second)

    go i.Ingress.Run(stopCh)
    ......
}

Taking the Ingress watch as an example, here is the concrete implementation:

// New creates a new object store to be used in the ingress controller
func New(
    namespace, configmap, tcp, udp, defaultSSLCertificate string,
    resyncPeriod time.Duration,
    client clientset.Interface,
    updateCh *channels.RingChannel,
    pod *k8s.PodInfo,
    disableCatchAll bool) Storer {

    store := &k8sStore{
        informers:             &Informer{},
        listers:               &Lister{},
        sslStore:              NewSSLCertTracker(),
        updateCh:              updateCh,
        backendConfig:         ngx_config.NewDefault(),
        syncSecretMu:          &sync.Mutex{},
        backendConfigMu:       &sync.RWMutex{},
        secretIngressMap:      NewObjectRefMap(),
        defaultSSLCertificate: defaultSSLCertificate,
        pod:                   pod,
    }

    ......

    // k8sStore fulfills resolver.Resolver interface
    // build the annotation extractor
    store.annotations = annotations.NewAnnotationExtractor(store)

    store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

    ......

    // create informers factory, enable and assign required informers
    infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
        informers.WithNamespace(namespace),
        informers.WithTweakListOptions(tweakListOptionsFunc))

    if k8s.IsNetworkingIngressAvailable {
        store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()
    } else {
        store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
    }

    store.listers.Ingress.Store = store.informers.Ingress.GetStore()

    store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
    store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

    store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
    store.listers.Secret.Store = store.informers.Secret.GetStore()

    store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
    store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

    store.informers.Service = infFactory.Core().V1().Services().Informer()
    store.listers.Service.Store = store.informers.Service.GetStore()

    labelSelector := labels.SelectorFromSet(store.pod.Labels)

    // list & watch only the pods that share this controller's labels
    store.informers.Pod = cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
                options.LabelSelector = labelSelector.String()
                return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                options.LabelSelector = labelSelector.String()
                return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        cache.Indexers{},
    )
    store.listers.Pod.Store = store.informers.Pod.GetStore()

    ingDeleteHandler := func(obj interface{}) {
        ing, ok := toIngress(obj)
        if !ok {
            // If we reached here it means the ingress was deleted but its final state is unrecorded.
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                klog.Errorf("couldn't get object from tombstone %#v", obj)
                return
            }
            ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress)
            if !ok {
                klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
                return
            }
        }

        if !class.IsValid(ing) {
            klog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
            return
        }
        if isCatchAllIngress(ing.Spec) && disableCatchAll {
            klog.Infof("ignoring delete for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
            return
        }
        recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

        store.listers.IngressWithAnnotation.Delete(ing)

        key := k8s.MetaNamespaceKey(ing)
        store.secretIngressMap.Delete(key)

        updateCh.In() <- Event{
            Type: DeleteEvent,
            Obj:  obj,
        }
    }

    ingEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ing, _ := toIngress(obj)
            if !class.IsValid(ing) {
                a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                klog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
                return
            }
            if isCatchAllIngress(ing.Spec) && disableCatchAll {
                klog.Infof("ignoring add for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            store.syncIngress(ing)
            store.updateSecretIngressMap(ing)
            store.syncSecrets(ing)

            updateCh.In() <- Event{
                Type: CreateEvent,
                Obj:  obj,
            }
        },
        DeleteFunc: ingDeleteHandler,
        UpdateFunc: func(old, cur interface{}) {
            oldIng, _ := toIngress(old)
            curIng, _ := toIngress(cur)

            validOld := class.IsValid(oldIng)
            validCur := class.IsValid(curIng)
            if !validOld && validCur {
                if isCatchAllIngress(curIng.Spec) && disableCatchAll {
                    klog.Infof("ignoring update for catch-all ingress %v/%v because of --disable-catch-all", curIng.Namespace, curIng.Name)
                    return
                }

                klog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validOld && !validCur {
                klog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                ingDeleteHandler(old)
                return
            } else if validCur && !reflect.DeepEqual(old, cur) {
                if isCatchAllIngress(curIng.Spec) && disableCatchAll {
                    klog.Infof("ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all", curIng.Namespace, curIng.Name)
                    ingDeleteHandler(old)
                    return
                }

                recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else {
                klog.V(3).Infof("No changes on ingress %v/%v. Skipping update", curIng.Namespace, curIng.Name)
                return
            }

            store.syncIngress(curIng)
            store.updateSecretIngressMap(curIng)
            store.syncSecrets(curIng)

            updateCh.In() <- Event{
                Type: UpdateEvent,
                Obj:  cur,
            }
        },
    }

    secrEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
    }

    epEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
    }

    ......

    cmEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
    }

    podEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
    }

    serviceHandler := cache.ResourceEventHandlerFuncs{
        UpdateFunc: func(old, cur interface{}) {...},
    }

    // register the event handlers for each resource type
    store.informers.Ingress.AddEventHandler(ingEventHandler)
    store.informers.Endpoint.AddEventHandler(epEventHandler)
    store.informers.Secret.AddEventHandler(secrEventHandler)
    store.informers.ConfigMap.AddEventHandler(cmEventHandler)
    store.informers.Service.AddEventHandler(serviceHandler)
    store.informers.Pod.AddEventHandler(podEventHandler)

    // do not wait for informers to read the configmap configuration
    ns, name, _ := k8s.ParseNameNS(configmap)
    cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        klog.Warningf("Unexpected error reading configuration configmap: %v", err)
    }

    store.setConfig(cm)
    return store
}

As you can see, each informer registers the same set of callbacks:
AddFunc: func(obj interface{}) {...}
UpdateFunc: func(old, cur interface{}) {...}
DeleteFunc: func(obj interface{}) {...}

Each callback writes an event of the corresponding type (CreateEvent, UpdateEvent, DeleteEvent) into updateCh.
This is how the pieces cooperate: the informers watch resources via list & watch and write an event into updateCh whenever something changes, while the loop in Start() reads from updateCh and enqueues a task on syncQueue for every event it receives.
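
The same wiring can be reproduced with plain client-go. Below is a minimal, self-contained sketch (hypothetical code, not from the ingress-nginx source) that watches Pods and funnels events into a channel the way the store feeds updateCh:

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    updateCh := make(chan string, 1024) // stand-in for the controller's RingChannel

    factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
    factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            updateCh <- "CREATE " + pod.Namespace + "/" + pod.Name
        },
        UpdateFunc: func(old, cur interface{}) {
            pod := cur.(*corev1.Pod)
            updateCh <- "UPDATE " + pod.Namespace + "/" + pod.Name
        },
        DeleteFunc: func(obj interface{}) {
            // obj can be a cache.DeletedFinalStateUnknown tombstone here,
            // which is why ingDeleteHandler above checks for that case
            if pod, ok := obj.(*corev1.Pod); ok {
                updateCh <- "DELETE " + pod.Namespace + "/" + pod.Name
            }
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh) // each informer runs list & watch in its own goroutine

    for ev := range updateCh {
        fmt.Println(ev) // the real controller enqueues a task on syncQueue here
    }
}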

Consuming the queue

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
// initialize the Queue
n.syncQueue = task.NewTaskQueue(n.syncIngress)

// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
    return NewCustomTaskQueue(syncFn, nil)
}

// NewCustomTaskQueue creates a new task queue with the given sync and key functions.
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
    // syncFn (i.e. syncIngress) is stored in Queue.sync
    q := &Queue{
        queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        sync:       syncFn,
        workerDone: make(chan bool),
        fn:         fn,
    }

    if fn == nil {
        q.fn = q.defaultKeyFunc
    }

    return q
}

The worker loop
Core flow: t.queue.Get() -> t.sync()

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {
    ......
    go n.syncQueue.Run(time.Second, n.stopCh)
    ......
}

// file: k8s.io/ingress-nginx/internal/task/queue.go
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
   wait.Until(t.worker, period, stopCh)
}
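
wait.Until comes from k8s.io/apimachinery/pkg/util/wait: it invokes the given function, then re-invokes it after each period until the stop channel closes, so a worker that ever returns is restarted a second later. A minimal usage sketch:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    go func() {
        time.Sleep(3 * time.Second)
        close(stopCh) // closing the stop channel ends the loop
    }()

    // runs immediately, then again one second after each return,
    // until stopCh is closed
    wait.Until(func() { fmt.Println("worker tick") }, time.Second, stopCh)
}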

// worker processes work in the queue through sync.
func (t *Queue) worker() {
   for {
      key, quit := t.queue.Get()
      ......
      ts := time.Now().UnixNano()

      item := key.(Element)
      // compare the item's timestamp with the last successful sync; if the item is older than lastSync, the change is skipped
      if t.lastSync > item.Timestamp {
         klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
         t.queue.Forget(key)
         t.queue.Done(key)
         continue
      }

      klog.V(3).Infof("syncing %v", item.Key)
      // invoke the sync function (syncIngress)
      if err := t.sync(key); err != nil {
         klog.Warningf("requeuing %v, err %v", item.Key, err)
         t.queue.AddRateLimited(Element{
            Key:       item.Key,
            Timestamp: time.Now().UnixNano(),
         })
      } else {
         t.queue.Forget(key)
         t.lastSync = ts
      }

      t.queue.Done(key)
   }
}

How syncIngress works

syncIngress compares the configuration currently running with the newly generated one and decides whether the change can be applied dynamically (endpoints only), avoiding the performance cost of frequent NGINX reloads.
pcfg: the newly generated configuration
n.runningConfig: the configuration currently live

Comparing pcfg against n.runningConfig yields two cases:
(1) The change can be applied dynamically: call n.configureDynamically(pcfg),
which POSTs the backend list as JSON to the /configuration/backends Lua handler to update the endpoint lists.

(2) The change cannot be applied dynamically: call n.OnUpdate(*pcfg), which
generates a temporary configuration file,
checks the temporary file's syntax,
diffs the temporary file against the live configuration file,
removes the temporary file,
writes the new configuration to the live configuration file,
and runs nginx -s reload to reload the configuration.

// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
    ......
   ings := n.store.ListIngresses(nil)
   // build the new configuration
   hosts, servers, pcfg := n.getConfiguration(ings)
   ......

   // check whether the configuration changed
   if n.runningConfig.Equal(pcfg) {
      klog.V(3).Infof("No configuration change detected, skipping backend reload.")
      return nil
   }

   ......

   // the configuration changed; check whether an NGINX reload is required
   if !n.IsDynamicConfigurationEnough(pcfg) {
      klog.Infof("Configuration changes detected, backend reload required.")

      // compute the configuration checksum
      hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
         TagName: "json",
      })

      pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

      // call OnUpdate
      err := n.OnUpdate(*pcfg)
      ......

      klog.Infof("Backend successfully reloaded.")
      ......
   }

   // first sync? (runningConfig is still an empty ingress.Configuration)
   isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
   if isFirstSync {
      // For the initial sync it always takes some time for NGINX to start listening
      // For large configurations it might take a while so we loop and back off
      // the initial sync takes NGINX some time, so sleep for 1 second
      klog.Info("Initial sync, sleeping for 1 second.")
      time.Sleep(1 * time.Second)
   }

   // retry dynamic configuration with backoff
   retry := wait.Backoff{
      Steps:    15,
      Duration: 1 * time.Second,
      Factor:   0.8,
      Jitter:   0.1,
   }

   err := wait.ExponentialBackoff(retry, func() (bool, error) {
      // apply the configuration dynamically
      err := n.configureDynamically(pcfg)
      if err == nil {
         klog.V(2).Infof("Dynamic reconfiguration succeeded.")
         return true, nil
      }

      klog.Warningf("Dynamic reconfiguration failed: %v", err)
      return false, err
   })
   ......

   n.runningConfig = pcfg

   return nil
}

Deciding whether a dynamic update is enough

Scenarios that do not require a reload:

  1. endpoint changes

Scenarios that require a reload:

  1. a new Ingress is created
  2. a new certificate is configured
  3. a path is added to or removed from an Ingress
  4. an Ingress, Service, or Secret is deleted
  5. a Secret is updated
  6. certain annotation changes that trigger any of the above

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
   copyOfRunningConfig := *n.runningConfig
   copyOfPcfg := *pcfg

   copyOfRunningConfig.Backends = []*ingress.Backend{}
   copyOfPcfg.Backends = []*ingress.Backend{}

   clearL4serviceEndpoints(&copyOfRunningConfig)
   clearL4serviceEndpoints(&copyOfPcfg)

   copyOfRunningConfig.ControllerPodsCount = 0
   copyOfPcfg.ControllerPodsCount = 0

   clearCertificates(&copyOfRunningConfig)
   clearCertificates(&copyOfPcfg)

   return copyOfRunningConfig.Equal(&copyOfPcfg)
}
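
The structure of this check is worth spelling out: copy both configurations, blank out every section that the Lua layer can hot-apply (backends, L4 endpoints, certificates, pod count), and deep-compare what remains; any leftover difference forces a reload. A toy illustration of the same idea with hypothetical types:

package main

import (
    "fmt"
    "reflect"
)

// toy stand-in for ingress.Configuration
type config struct {
    Backends []string // hot-reloadable through the Lua API
    Servers  []string // changing these requires an NGINX reload
}

// dynamicEnough mirrors the structure of IsDynamicConfigurationEnough:
// blank out everything that can be applied dynamically, then deep-compare
// what is left; any remaining difference forces a reload.
func dynamicEnough(running, next config) bool {
    running.Backends = nil
    next.Backends = nil
    return reflect.DeepEqual(running, next)
}

func main() {
    running := config{Backends: []string{"10.0.0.1:8080"}, Servers: []string{"a.example.com"}}
    next := config{Backends: []string{"10.0.0.2:8080"}, Servers: []string{"a.example.com"}}
    fmt.Println(dynamicEnough(running, next)) // true: only endpoints changed

    next.Servers = append(next.Servers, "b.example.com")
    fmt.Println(dynamicEnough(running, next)) // false: servers changed, reload needed
}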

When a dynamic update is not enough, OnUpdate reloads NGINX

// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
   cfg := n.store.GetBackendConfiguration()
   cfg.Resolver = n.resolver

   // render the new configuration content from the template
   content, err := n.generateTemplate(cfg, ingressCfg)
   ......

   // validate the generated configuration
   err = n.testTemplate(content)
   ......

   if klog.V(2) {
      src, _ := ioutil.ReadFile(cfgPath)
      if !bytes.Equal(src, content) {
         tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
         if err != nil {
            return err
         }
         defer tmpfile.Close()
         // write the new content to a temporary file
         err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
         ......
         // diff the temporary file against the live configuration
         diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
         ......

         klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))

         // remove the temporary file
         os.Remove(tmpfile.Name())
      }
   }
   // write the new configuration to cfgPath
   err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
   ......

   // reload nginx
   o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
   ......

   return nil
}
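
generateTemplate and testTemplate are internal helpers; conceptually, testTemplate writes the rendered content to a temporary file and asks the nginx binary to validate it. A simplified sketch of that check (assuming nginx is on PATH; the flags mirror nginx -c <file> -t):

package main

import (
    "fmt"
    "os"
    "os/exec"
)

// testNginxConfig writes cfg to a temporary file and asks the nginx binary
// to validate it, mirroring what testTemplate does internally.
func testNginxConfig(cfg []byte) error {
    tmp, err := os.CreateTemp("", "nginx-cfg")
    if err != nil {
        return err
    }
    defer os.Remove(tmp.Name())
    if _, err := tmp.Write(cfg); err != nil {
        return err
    }
    tmp.Close()

    // nginx -c <file> -t parses the configuration without starting the server
    out, err := exec.Command("nginx", "-c", tmp.Name(), "-t").CombinedOutput()
    if err != nil {
        return fmt.Errorf("invalid nginx configuration: %v\n%s", err, out)
    }
    return nil
}

func main() {
    fmt.Println(testNginxConfig([]byte("events {}\n")))
}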

Dynamic update

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
   backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
   if backendsChanged {
      // update the endpoint lists
      err := configureBackends(pcfg.Backends)
      ......
   }

   // compare the TCP/UDP endpoint lists
   streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
   if streamConfigurationChanged {
      err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
      ......
   }

   if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
      // POST the controller pod count
      statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
         ControllerPodsCount: pcfg.ControllerPodsCount,
      })
      ......
   }
   // compare the server (certificate) lists
   serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
   if serversChanged {
      err := configureCertificates(pcfg.Servers)
      ......
   }

   return nil
}

POSTing the backend list as JSON to the Lua handler /configuration/backends

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
func configureBackends(rawBackends []*ingress.Backend) error {
   backends := make([]*ingress.Backend, len(rawBackends))

   for i, backend := range rawBackends {
      var service *apiv1.Service
      if backend.Service != nil {
         service = &apiv1.Service{Spec: backend.Service.Spec}
      }
      luaBackend := &ingress.Backend{
         Name:                 backend.Name,
         Port:                 backend.Port,
         SSLPassthrough:       backend.SSLPassthrough,
         SessionAffinity:      backend.SessionAffinity,
         UpstreamHashBy:       backend.UpstreamHashBy,
         LoadBalancing:        backend.LoadBalancing,
         Service:              service,
         NoServer:             backend.NoServer,
         TrafficShapingPolicy: backend.TrafficShapingPolicy,
         AlternativeBackends:  backend.AlternativeBackends,
      }

      var endpoints []ingress.Endpoint
      for _, endpoint := range backend.Endpoints {
         endpoints = append(endpoints, ingress.Endpoint{
            Address: endpoint.Address,
            Port:    endpoint.Port,
         })
      }

      luaBackend.Endpoints = endpoints
      backends[i] = luaBackend
   }

   // POST the updated endpoint list to the Lua handler
   statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
   if err != nil {
      return err
   }

   if statusCode != http.StatusCreated {
      return fmt.Errorf("unexpected error code: %d", statusCode)
   }

   return nil
}
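
NewPostStatusRequest is a thin wrapper over net/http that targets the local status server (127.0.0.1 on the status port; 10246 is assumed here and older releases used a different port, so check nginx.StatusPort for your version). A minimal equivalent sketch:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// postToLua mimics nginx.NewPostStatusRequest: JSON-encode a payload and
// POST it to a configuration endpoint on the local status server.
func postToLua(path string, payload interface{}) (int, error) {
    body, err := json.Marshal(payload)
    if err != nil {
        return 0, err
    }
    // 10246 is an assumption; see nginx.StatusPort in your version
    resp, err := http.Post("http://127.0.0.1:10246"+path, "application/json", bytes.NewReader(body))
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()
    return resp.StatusCode, nil
}

func main() {
    backends := []map[string]interface{}{
        {
            "name": "default-echo-8080",
            "endpoints": []map[string]string{
                {"address": "10.244.0.12", "port": "8080"},
            },
        },
    }
    code, err := postToLua("/configuration/backends", backends)
    fmt.Println(code, err) // the Lua handler answers 201 Created on success
}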