// file:k8s.io/ingress-nginx/nginx/main.go

// main is the controller entry point (excerpt; "......" marks code elided by
// the article). It validates the environment, builds the NGINX controller and
// starts the health/metrics server and the controller loop in goroutines.
func main() {
	// Step 1: initialize the logging flags.
	klog.InitFlags(nil)
	......
	// Step 2: create the directories the controller requires.
	err = file.CreateRequiredDirectories()
	......
	// Step 3: build the API server client from the configured host, root CA
	// file and kubeconfig file.
	kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
	......
	// Step 4: validate the configured services (default backend and publish
	// service), but only when they are set.
	if len(conf.DefaultService) > 0 {
		err := checkService(conf.DefaultService, kubeClient)
		......
		klog.Infof("Validated %v as the default backend.", conf.DefaultService)
	}
	if len(conf.PublishService) > 0 {
		err := checkService(conf.PublishService, kubeClient)
		......
	}
	// Step 5: when a namespace is configured, make sure it exists; a lookup
	// failure is fatal.
	if conf.Namespace != "" {
		_, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
		if err != nil {
			klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
		}
	}
	// Step 6: create the default (fake) SSL certificate.
	conf.FakeCertificate = ssl.GetFakeSSLCert()
	klog.Infof("SSL fake certificate created %v", conf.FakeCertificate.PemFileName)
	// Step 7: detect which Ingress API the cluster serves. The article notes
	// this also checks whether the Kubernetes version is above 1.18.0
	// (IsIngressV1Ready).
	k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient)
	if !k8s.IsNetworkingIngressAvailable {
		klog.Warningf("Using deprecated \"k8s.io/api/extensions/v1beta1\" package because Kubernetes version is < v1.14.0")
	}
	if k8s.IsIngressV1Ready {
		......
	}
	conf.Client = kubeClient
	// Step 8: create a Prometheus registry and register the Go runtime and
	// process collectors.
	reg := prometheus.NewRegistry()
	reg.MustRegister(prometheus.NewGoCollector())
	reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
		PidFn:        func() (int, error) { return os.Getpid(), nil },
		ReportErrors: true,
	}))
	......
	// Step 9: start the profiler endpoint when profiling is enabled.
	if conf.EnableProfiling {
		go registerProfiler()
	}
	// Step 10: instantiate the NGINX controller (the key step of this walkthrough).
	ngx := controller.NewNGINXController(conf, mc)
	// Step 11: serve the health check and metrics endpoints on the health port.
	mux := http.NewServeMux()
	registerHealthz(nginx.HealthPath, ngx, mux)
	registerMetrics(reg, mux)
	go startHTTPServer(conf.ListenPorts.Health, mux)
	// Step 12: start the controller, which launches the NGINX master process.
	go ngx.Start()
	......
}
// NewNGINXController creates a new NGINX Ingress controller.
//
// Excerpt: "......" and "... ..." mark code elided by the article. The
// function wires up the event recorder, the local object store, the sync
// queue and file watchers for the template and the geoip directory, and
// returns the controller without starting it.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
	// Initialize the event broadcaster: events are logged through klog and
	// recorded to the Events API of the configured namespace.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: config.Client.CoreV1().Events(config.Namespace),
	})
	// Read the nameserver list (from /etc/resolv.conf, per the article). A
	// failure only produces a warning; h is then used as returned.
	h, err := dns.GetSystemNameServers()
	if err != nil {
		klog.Warningf("Error reading system nameservers: %v", err)
	}
	// Instantiate the controller.
	n := &NGINXController{
		isIPV6Enabled:   ing_net.IsIPv6Enabled(),
		resolver:        h,
		cfg:             config,
		syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
			Component: "nginx-ingress-controller",
		}),
		stopCh:          make(chan struct{}),
		updateCh:        channels.NewRingChannel(1024),
		ngxErrCh:        make(chan error),
		stopLock:        &sync.Mutex{},
		runningConfig:   new(ingress.Configuration),
		Proxy:           &TCPProxy{},
		metricCollector: mc,
		command:         NewNginxCommand(),
	}
	// Build the validation webhook server when an address is configured. The
	// server is only constructed here; nothing in this excerpt starts it.
	if n.cfg.ValidationWebhook != "" {
		n.validationWebhookServer = &http.Server{
			Addr:      config.ValidationWebhook,
			Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
			TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
		}
	}
	// Fetch runtime details of the pod this controller runs in; failure is fatal.
	pod, err := k8s.GetPodDetails(config.Client)
	if err != nil {
		klog.Fatalf("unexpected error obtaining pod information: %v", err)
	}
	n.podInfo = pod
	// Instantiate the store (local cache). It receives n.updateCh so that it
	// can publish change events to the controller.
	n.store = store.New(
		config.Namespace,
		config.ConfigMapName,
		config.TCPConfigMapName,
		config.UDPConfigMapName,
		config.DefaultSSLCertificate,
		config.ResyncPeriod,
		config.Client,
		n.updateCh,
		pod,
		config.DisableCatchAll)
	// Create the sync queue; every dequeued item is handled by n.syncIngress.
	n.syncQueue = task.NewTaskQueue(n.syncIngress)
	... ...
	// Callback invoked when the template file changes: re-parse the template.
	onTemplateChange := func() {
		template, err := ngx_template.NewTemplate(nginx.TemplatePath)
		if err != nil {
			// this error is different from the rest because it must be clear why nginx is not working
			klog.Errorf(` ------------------------------------------------------------------------------- Error loading new template: %v ------------------------------------------------------------------------------- `, err)
			return
		}
		// The template parsed correctly: store it on the controller and enqueue
		// a "template-change" task on the sync queue.
		n.t = template
		klog.Info("New NGINX configuration template loaded.")
		n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
	}
	// Load the configuration template for the first time at startup.
	ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
	......
	n.t = ngxTpl
	// Watch the template file (the article gives the path as
	// /etc/nginx/template/nginx.tmpl) and call onTemplateChange on change.
	_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
	... ...
	// Collect the files under /etc/nginx/geoip/ so that each can be watched.
	filesToWatch := []string{}
	err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
		......
		filesToWatch = append(filesToWatch, path)
		......
	})
	......
	for _, f := range filesToWatch {
		// NOTE(review): the closure captures the loop variable f; before Go 1.22
		// every callback would log the last file name — confirm the toolchain.
		_, err = watch.NewFileWatcher(f, func() {
			klog.Infof("File %v changed. Reloading NGINX", f)
			// A watched file changed: enqueue a "file-change" task on the sync queue.
			n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
		})
		......
	}
	return n
}
NGINXController 結構體(ingress controller 的核心對象)
// NGINXController holds the state of the NGINX ingress controller: its
// configuration, the local object store, the sync queue and the channels
// used to coordinate with the NGINX process.
type NGINXController struct {
	// podInfo is runtime information about the controller pod.
	podInfo *k8s.PodInfo
	// cfg is the controller configuration.
	cfg *Configuration
	// recorder emits Kubernetes events.
	recorder record.EventRecorder
	// syncQueue is the queue of pending synchronization tasks.
	syncQueue *task.Queue
	// syncStatus is the status syncer.
	syncStatus status.Syncer
	// syncRateLimiter rate-limits synchronization.
	syncRateLimiter flowcontrol.RateLimiter
	stopLock        *sync.Mutex
	stopCh          chan struct{}
	// updateCh is the ring channel carrying update events.
	updateCh *channels.RingChannel
	// ngxErrCh receives errors from the NGINX process.
	ngxErrCh chan error
	// runningConfig is the configuration currently applied.
	runningConfig *ingress.Configuration
	// t is the NGINX configuration template.
	t ngx_template.TemplateWriter
	// resolver is the list of nameservers.
	resolver []net.IP
	// isIPV6Enabled reports whether IPv6 is enabled.
	isIPV6Enabled bool
	// isShuttingDown reports whether the controller is shutting down.
	isShuttingDown bool
	// Proxy is the TCP proxy.
	Proxy *TCPProxy
	// store is the local cache of cluster objects.
	store store.Storer
	// metricCollector collects metrics.
	metricCollector metric.Collector
	// validationWebhookServer is the admission webhook HTTP server.
	validationWebhookServer *http.Server
	// command wraps the nginx binary.
	command NginxExecTester
}
ngx.Start() 主要作3個事情
啓動store 協程
啓動syncQueue協程
監聽 updateCh
當從 updateCh 收到變化事件時,向 syncQueue 發送一個 task
// file:internal/ingress/controller/nginx.go

// Start starts a new NGINX master process running in the foreground.
//
// Excerpt: "......" marks code elided by the article. After starting the
// store, leader election, NGINX and the sync queue, Start blocks in a loop
// that turns store events into sync-queue tasks until stopCh fires or NGINX
// reports an error that ends the loop.
func (n *NGINXController) Start() {
	klog.Info("Starting NGINX Ingress controller")
	// Start the store: this kicks off the informers (and, per the article, the
	// initial secret synchronization).
	n.store.Run(n.stopCh)
	// we need to use the defined ingress class to allow multiple leaders
	// in order to update information about ingress status
	// The election ID is suffixed with the ingress class (the default class
	// when none is set), so controllers of different classes elect separate
	// leaders.
	electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
	if class.IngressClass != "" {
		electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
	}
	// Set up leader election.
	setupLeaderElection(&leaderElectionConfig{
		......
	})
	cmd := n.command.ExecCommand()
	......
	if n.cfg.EnableSSLPassthrough {
		n.setupSSLProxy()
	}
	// Start NGINX.
	klog.Info("Starting NGINX process")
	n.start(cmd)
	// Start the sync queue worker.
	go n.syncQueue.Run(time.Second, n.stopCh)
	// force initial sync
	// Enqueue an "initial-sync" task.
	n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
	// In case of error the temporal configuration file will
	// be available up to five minutes after the error
	// Clean up temporary configuration files every five minutes.
	go func() {
		for {
			time.Sleep(5 * time.Minute)
			err := cleanTempNginxCfg()
			......
		}
	}()
	......
	for {
		select {
		case err := <-n.ngxErrCh:
			if n.isShuttingDown {
				return
			}
			// if the nginx master process dies, the workers continue to process requests
			// until the failure of the configured livenessProbe and restart of the pod.
			if process.IsRespawnIfRequired(err) {
				return
			}
		// Consume events published on updateCh.
		case event := <-n.updateCh.Out():
			if n.isShuttingDown {
				break
			}
			if evt, ok := event.(store.Event); ok {
				klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
				if evt.Type == store.ConfigurationEvent {
					// TODO: is this necessary? Consider removing this special case
					n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
					continue
				}
				// All other events are enqueued as skippable tasks.
				n.syncQueue.EnqueueSkippableTask(evt.Obj)
			} else {
				klog.Warningf("Unexpected event type received %T", event)
			}
		case <-n.stopCh:
			return
		}
	}
}
事件類型
// Event types published by the store on the update channel.
const (
	// CreateEvent event associated with new objects in an informer
	CreateEvent EventType = "CREATE"
	// UpdateEvent event associated with an object update in an informer
	UpdateEvent EventType = "UPDATE"
	// DeleteEvent event associated when an object is removed from an informer
	DeleteEvent EventType = "DELETE"
	// ConfigurationEvent event associated when a controller configuration object is created or updated
	ConfigurationEvent EventType = "CONFIGURATION"
)
同步隊列
結構體
// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
	// queue is the work queue the worker polls
	queue workqueue.RateLimitingInterface
	// sync is called for each item in the queue
	sync func(interface{}) error
	// workerDone is closed when the worker exits
	workerDone chan bool
	// fn makes a key for an API object
	fn func(obj interface{}) (interface{}, error)
	// lastSync is the Unix epoch time of the last execution of 'sync'
	// (stored in nanoseconds: the worker assigns time.Now().UnixNano()).
	lastSync int64
}
隊列類型
(1) 可忽略隊列 EnqueueSkippableTask
(2) 不可忽略隊列 EnqueueTask
// EnqueueTask enqueues ns/name of the given api object in the task queue. func (t *Queue) EnqueueTask(obj interface{}) { t.enqueue(obj, false) } // EnqueueSkippableTask enqueues ns/name of the given api object in // the task queue that can be skipped func (t *Queue) EnqueueSkippableTask(obj interface{}) { t.enqueue(obj, true) } // 入隊列 // enqueue enqueues ns/name of the given api object in the task queue. func (t *Queue) enqueue(obj interface{}, skippable bool) { if t.IsShuttingDown() { klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj) return } ts := time.Now().UnixNano() if !skippable { // make sure the timestamp is bigger than lastSync ts = time.Now().Add(24 * time.Hour).UnixNano() } klog.V(3).Infof("queuing item %v", obj) key, err := t.fn(obj) if err != nil { klog.Errorf("%v", err) return } t.queue.Add(Element{ Key: key, Timestamp: ts, }) }
store 協程
// file : k8s.io/ingress-nginx/internal/controller/store/store.go

// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
//
// In this excerpt it only delegates to s.informers.Run with the same stop
// channel.
func (s *k8sStore) Run(stopCh chan struct{}) {
	// start informers
	s.informers.Run(stopCh)
}
調用了informers.Run()方法
起多個協程去監聽 ingress、secret、endpoint、service、configmap、pod 的變化
// Run initiates the synchronization of the informers against the API server.
//
// Excerpt: "......" marks code elided by the article. Each informer runs in
// its own goroutine and stops when stopCh is closed.
func (i *Informer) Run(stopCh chan struct{}) {
	// Start the secret, endpoint, service, configmap and pod informers.
	go i.Secret.Run(stopCh)
	go i.Endpoint.Run(stopCh)
	go i.Service.Run(stopCh)
	go i.ConfigMap.Run(stopCh)
	go i.Pod.Run(stopCh)
	......
	// The ingress informer is started one second after the others.
	// NOTE(review): presumably so the resources ingresses reference are cached
	// first — the reason is not visible in this excerpt.
	time.Sleep(1 * time.Second)
	go i.Ingress.Run(stopCh)
	......
}
這裏以監聽 ingress 變化爲例,接着分析具體實現函數
// New creates a new object store to be used in the ingress controller
//
// Excerpt: "......" marks code elided by the article and "{...}" marks
// handler bodies it omitted. New builds the informers and listers for
// ingresses, endpoints, secrets, configmaps, services and pods, registers
// their event handlers (the ingress handlers publish events on updateCh) and
// loads the controller ConfigMap once before returning.
func New(
	namespace, configmap, tcp, udp, defaultSSLCertificate string,
	resyncPeriod time.Duration,
	client clientset.Interface,
	updateCh *channels.RingChannel,
	pod *k8s.PodInfo,
	disableCatchAll bool) Storer {
	store := &k8sStore{
		informers:             &Informer{},
		listers:               &Lister{},
		sslStore:              NewSSLCertTracker(),
		updateCh:              updateCh,
		backendConfig:         ngx_config.NewDefault(),
		syncSecretMu:          &sync.Mutex{},
		backendConfigMu:       &sync.RWMutex{},
		secretIngressMap:      NewObjectRefMap(),
		defaultSSLCertificate: defaultSSLCertificate,
		pod:                   pod,
	}
	......
	// k8sStore fulfills resolver.Resolver interface
	// Build the annotation extractor, which uses the store as its resolver.
	store.annotations = annotations.NewAnnotationExtractor(store)
	store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	......
	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(tweakListOptionsFunc))
	// Pick the Ingress API group according to what the cluster serves.
	if k8s.IsNetworkingIngressAvailable {
		store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()
	} else {
		store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
	}
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()
	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
	store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
	store.listers.Secret.Store = store.informers.Secret.GetStore()
	store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
	store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()
	labelSelector := labels.SelectorFromSet(store.pod.Labels)
	// List-and-watch for pods: a dedicated informer limited to the controller
	// pod's namespace and to pods carrying the controller pod's labels.
	store.informers.Pod = cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
				options.LabelSelector = labelSelector.String()
				return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labelSelector.String()
				return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		cache.Indexers{},
	)
	store.listers.Pod.Store = store.informers.Pod.GetStore()
	// ingDeleteHandler handles ingress deletion. It is also called by the
	// update handler when an ingress stops being valid for this controller.
	ingDeleteHandler := func(obj interface{}) {
		ing, ok := toIngress(obj)
		if !ok {
			// If we reached here it means the ingress was deleted but its final state is unrecorded.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.Errorf("couldn't get object from tombstone %#v", obj)
				return
			}
			ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress)
			if !ok {
				klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
				return
			}
		}
		if !class.IsValid(ing) {
			klog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
			return
		}
		if isCatchAllIngress(ing.Spec) && disableCatchAll {
			klog.Infof("ignoring delete for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
			return
		}
		recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
		store.listers.IngressWithAnnotation.Delete(ing)
		key := k8s.MetaNamespaceKey(ing)
		store.secretIngressMap.Delete(key)
		// Publish the deletion to the controller.
		updateCh.In() <- Event{
			Type: DeleteEvent,
			Obj:  obj,
		}
	}
	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ing, _ := toIngress(obj)
			if !class.IsValid(ing) {
				a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
				klog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
				return
			}
			if isCatchAllIngress(ing.Spec) && disableCatchAll {
				klog.Infof("ignoring add for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
				return
			}
			recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
			store.syncIngress(ing)
			store.updateSecretIngressMap(ing)
			store.syncSecrets(ing)
			// Publish the creation to the controller.
			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: ingDeleteHandler,
		UpdateFunc: func(old, cur interface{}) {
			oldIng, _ := toIngress(old)
			curIng, _ := toIngress(cur)
			validOld := class.IsValid(oldIng)
			validCur := class.IsValid(curIng)
			// Four cases: became valid (treated as a create), became invalid
			// (treated as a delete), still valid but changed (an update), or
			// nothing relevant changed (skipped).
			if !validOld && validCur {
				if isCatchAllIngress(curIng.Spec) && disableCatchAll {
					klog.Infof("ignoring update for catch-all ingress %v/%v because of --disable-catch-all", curIng.Namespace, curIng.Name)
					return
				}
				klog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
				recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
			} else if validOld && !validCur {
				klog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
				ingDeleteHandler(old)
				return
			} else if validCur && !reflect.DeepEqual(old, cur) {
				if isCatchAllIngress(curIng.Spec) && disableCatchAll {
					klog.Infof("ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all", curIng.Namespace, curIng.Name)
					ingDeleteHandler(old)
					return
				}
				recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
			} else {
				klog.V(3).Infof("No changes on ingress %v/%v. Skipping update", curIng.Namespace, curIng.Name)
				return
			}
			store.syncIngress(curIng)
			store.updateSecretIngressMap(curIng)
			store.syncSecrets(curIng)
			// Publish the update to the controller.
			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}
	secrEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {...},
		UpdateFunc: func(old, cur interface{}) {...},
		DeleteFunc: func(obj interface{}) {...},
	}
	epEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {...},
		DeleteFunc: func(obj interface{}) {...},
		UpdateFunc: func(old, cur interface{}) {...},
	}
	......
	cmEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {...},
		UpdateFunc: func(old, cur interface{}) {...},
	}
	podEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {...},
		UpdateFunc: func(old, cur interface{}) {...},
		DeleteFunc: func(obj interface{}) {...},
	}
	serviceHandler := cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old, cur interface{}) {...},
	}
	// Register the event handler of each resource type on its informer.
	store.informers.Ingress.AddEventHandler(ingEventHandler)
	store.informers.Endpoint.AddEventHandler(epEventHandler)
	store.informers.Secret.AddEventHandler(secrEventHandler)
	store.informers.ConfigMap.AddEventHandler(cmEventHandler)
	store.informers.Service.AddEventHandler(serviceHandler)
	store.informers.Pod.AddEventHandler(podEventHandler)
	// do not wait for informers to read the configmap configuration
	ns, name, _ := k8s.ParseNameNS(configmap)
	cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		klog.Warningf("Unexpected error reading configuration configmap: %v", err)
	}
	// NOTE(review): setConfig is still called with cm after a failed Get —
	// confirm setConfig tolerates that value.
	store.setConfig(cm)
	return store
}
能夠看到,每種類型的informer 基本都有相關的回調方法,包括:
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
每一個方法裏面都會往updateCh 寫入不一樣類型的事件(CreateEvent、DeleteEvent、UpdateEvent)
這一步跟 ngx.Start() 的主循環協同工做:informer 經過 list&watch 機制監聽資源變化,一旦資源有變化則向 updateCh 裏面寫入事件;Start() 中的 select 循環監聽 updateCh,一旦收到事件則往 syncQueue 寫入一個 task
// file : k8s.io/ingress-controller/internal/ingress/controller/nginx.go // 初始化Queue n.syncQueue = task.NewTaskQueue(n.syncIngress) // NewTaskQueue creates a new task queue with the given sync function. // The sync function is called for every element inserted into the queue. // 對於每一個插入進來的項目都會調用sync function func NewTaskQueue(syncFn func(interface{}) error) *Queue { return NewCustomTaskQueue(syncFn, nil) } // NewCustomTaskQueue func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue { // syncFn(也就是syncIngress)被賦值到Queue.sync q := &Queue{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan bool), fn: fn, } if fn == nil { q.fn = q.defaultKeyFunc } return q }
消費Queue隊列
核心方法:
t.queue.Get() -> t.sync()
// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
// Excerpt of Start showing where the queue worker is launched.
func (n *NGINXController) Start() {
	......
	go n.syncQueue.Run(time.Second, n.stopCh)
	......
}

// file: k8s.io/ingress-nginx/internal/task/queue.go

// Run starts processing elements in the queue
// It runs t.worker through wait.Until with the given period and stop channel.
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
	wait.Until(t.worker, period, stopCh)
}

// worker processes work in the queue through sync.
//
// Excerpt: "......" marks code elided by the article (the handling of quit
// is not shown).
func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		......
		// ts is taken before the sync runs; on success it becomes lastSync.
		ts := time.Now().UnixNano()
		item := key.(Element)
		// Skip items stamped earlier than the last successful sync.
		if t.lastSync > item.Timestamp {
			klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}
		klog.V(3).Infof("syncing %v", item.Key)
		// Invoke the sync function (syncIngress for the controller queue).
		if err := t.sync(key); err != nil {
			// On failure the item is re-added rate-limited with a fresh timestamp.
			klog.Warningf("requeuing %v, err %v", item.Key, err)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: time.Now().UnixNano(),
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}
		t.queue.Done(key)
	}
}
比對線上在跑的配置跟新生成的配置是否相同,並判斷是否可以動態重載配置(僅更新endpoint),減小nginx頻繁reload帶來性能損耗.
pcfg :當前格式化出來的配置
n.runningConfig : 當前線上環境運行的配置
比對pcfg 和 n.runningConfig 配置,判斷是否能夠動態更新配置(僅endpoint列表變化)
(1)支持動態更新配置:調用n.configureDynamically(pcfg)
將backend 列表以json格式post 到/configuration/backends 這個LUA Handler,動態更新endpoint 列表
(2)不支持動態更新配置,調用 n.OnUpdate(*pcfg)
生成臨時配置文件
檢測臨時配置文件語法
diff 臨時配置文件與當前線上配置文件
刪除臨時配置文件
將新生成的配置寫入線上配置文件
執行nginx -s reload 重載配置
// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
//
// Excerpt: "......" marks code elided by the article. The argument is
// ignored; every run rebuilds the configuration from all ingresses in the
// store. After any reload, the dynamic parts are pushed with
// configureDynamically and pcfg becomes the running configuration.
func (n *NGINXController) syncIngress(interface{}) error {
	......
	ings := n.store.ListIngresses(nil)
	// Build the new configuration from the ingresses.
	hosts, servers, pcfg := n.getConfiguration(ings)
	......
	// Nothing to do when the new configuration equals the running one.
	if n.runningConfig.Equal(pcfg) {
		klog.V(3).Infof("No configuration change detected, skipping backend reload.")
		return nil
	}
	......
	// The configuration changed: decide whether NGINX needs a reload.
	if !n.IsDynamicConfigurationEnough(pcfg) {
		klog.Infof("Configuration changes detected, backend reload required.")
		// Compute a checksum of the configuration and store it on pcfg. The
		// hashing error is discarded.
		hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
			TagName: "json",
		})
		pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
		// Render the configuration and reload NGINX.
		err := n.OnUpdate(*pcfg)
		......
		klog.Infof("Backend successfully reloaded.")
		......
	}
	// First sync: the running configuration is still the empty
	// ingress.Configuration.
	isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
	if isFirstSync {
		// For the initial sync it always takes some time for NGINX to start listening
		// For large configurations it might take a while so we loop and back off
		klog.Info("Initial sync, sleeping for 1 second.")
		time.Sleep(1 * time.Second)
	}
	// Backoff settings for the dynamic reconfiguration below.
	retry := wait.Backoff{
		Steps:    15,
		Duration: 1 * time.Second,
		Factor:   0.8,
		Jitter:   0.1,
	}
	err := wait.ExponentialBackoff(retry, func() (bool, error) {
		// Push the dynamic parts of the configuration to NGINX.
		err := n.configureDynamically(pcfg)
		if err == nil {
			klog.V(2).Infof("Dynamic reconfiguration succeeded.")
			return true, nil
		}
		klog.Warningf("Dynamic reconfiguration failed: %v", err)
		// NOTE(review): a non-nil error returned from the condition appears to
		// end wait.ExponentialBackoff immediately, so the first failure may not
		// be retried — confirm against the apimachinery wait documentation.
		return false, err
	})
	......
	n.runningConfig = pcfg
	return nil
}
判斷是否能夠動態更新配置
不須要reload的場景
須要reload的場景
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go // IsDynamicConfigurationEnough returns whether a Configuration can be // dynamically applied, without reloading the backend. // 判斷是否nginx 能夠動態重載,不須要執行reload func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool { copyOfRunningConfig := *n.runningConfig copyOfPcfg := *pcfg copyOfRunningConfig.Backends = []*ingress.Backend{} copyOfPcfg.Backends = []*ingress.Backend{} clearL4serviceEndpoints(©OfRunningConfig) clearL4serviceEndpoints(©OfPcfg) copyOfRunningConfig.ControllerPodsCount = 0 copyOfPcfg.ControllerPodsCount = 0 clearCertificates(©OfRunningConfig) clearCertificates(©OfPcfg) return copyOfRunningConfig.Equal(©OfPcfg) }
不能動態更新,調用nginx reload 重載配置
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
//
// Excerpt: "......" marks code elided by the article.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
	cfg := n.store.GetBackendConfiguration()
	cfg.Resolver = n.resolver
	// Render the configuration content from the template.
	content, err := n.generateTemplate(cfg, ingressCfg)
	......
	// Check that the rendered configuration is valid.
	err = n.testTemplate(content)
	......
	// At verbosity 2 and above, log a diff between the current configuration
	// file and the new content.
	if klog.V(2) {
		src, _ := ioutil.ReadFile(cfgPath)
		if !bytes.Equal(src, content) {
			tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
			if err != nil {
				return err
			}
			defer tmpfile.Close()
			// Write the new content to the temporary file.
			err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
			......
			// Diff the temporary file against the configuration currently in effect.
			// NOTE(review): exec.Command does not go through a shell, so the single
			// quotes in "'# Configuration.*'" reach diff literally — confirm the
			// -I pattern matches as intended.
			diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
			......
			klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))
			// Remove the temporary file.
			os.Remove(tmpfile.Name())
		}
	}
	// Write the new configuration to cfgPath.
	err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
	......
	// Reload NGINX.
	o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
	......
	return nil
}
動態更新
// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go

// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
//
// Excerpt: "......" marks code elided by the article. Each part (backends,
// TCP/UDP stream endpoints, controller pod count, server certificates) is
// pushed only when it differs from the running configuration.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
	backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
	if backendsChanged {
		// Push the new backend (endpoint) list.
		err := configureBackends(pcfg.Backends)
		......
	}
	// Compare the TCP/UDP endpoint lists.
	streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
	if streamConfigurationChanged {
		err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
		......
	}
	if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
		// POST the number of controller pods.
		statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
			ControllerPodsCount: pcfg.ControllerPodsCount,
		})
		......
	}
	// Compare the servers; when they changed, update the certificates.
	serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
	if serversChanged {
		err := configureCertificates(pcfg.Servers)
		......
	}
	return nil
}
以JSON 格式 POST 調用LUA Handler /configuration/backends
// file: k8s.io/ingress-nginx/internal/controller/nginx.go func configureBackends(rawBackends []*ingress.Backend) error { backends := make([]*ingress.Backend, len(rawBackends)) for i, backend := range rawBackends { var service *apiv1.Service if backend.Service != nil { service = &apiv1.Service{Spec: backend.Service.Spec} } luaBackend := &ingress.Backend{ Name: backend.Name, Port: backend.Port, SSLPassthrough: backend.SSLPassthrough, SessionAffinity: backend.SessionAffinity, UpstreamHashBy: backend.UpstreamHashBy, LoadBalancing: backend.LoadBalancing, Service: service, NoServer: backend.NoServer, TrafficShapingPolicy: backend.TrafficShapingPolicy, AlternativeBackends: backend.AlternativeBackends, } var endpoints []ingress.Endpoint for _, endpoint := range backend.Endpoints { endpoints = append(endpoints, ingress.Endpoint{ Address: endpoint.Address, Port: endpoint.Port, }) } luaBackend.Endpoints = endpoints backends[i] = luaBackend } // 更新endpoint 列表 statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends) if err != nil { return err } if statusCode != http.StatusCreated { return fmt.Errorf("unexpected error code: %d", statusCode) } return nil }