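version: 1.9.6
Outline:
1. Find the entry point
2. Build the command-line framework
3. Create the server chain: CreateServerChain
4. Start the server: s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
1. Command-line entry point
cmd/kube-apiserver/apiserver.go
The command-line parsing framework is cobra, which is worth getting to know first: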
https://o-my-chenjian.com/2017/09/20/Using-Cobra-With-Golang/
Entry function: cmd/kube-apiserver/apiserver.go
The server is started through the command's Execute method.
func main() {
	...
	command := app.NewAPIServerCommand()
	...
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
2. Building the command line
The cmd struct
cmd := &cobra.Command{
	Use: "kube-apiserver",
	Long: `The Kubernetes API server validates and configures data for the api objects which include pods, services, replicationcontrollers, and others. The API Server services REST operations and provides the frontend to the cluster's shared state through which all other components interact.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		verflag.PrintAndExitIfRequested()
		utilflag.PrintFlags(cmd.Flags())

		// set default options
		completedOptions, err := Complete(s)
		if err != nil {
			return err
		}

		// validate options
		if errs := completedOptions.Validate(); len(errs) != 0 {
			return utilerrors.NewAggregate(errs)
		}

		return Run(completedOptions, stopCh)
	},
}
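Every Kubernetes component uses cobra to manage and parse its command-line arguments. Under the main package there is an app package, and the app package is where the logic that builds the cobra command actually lives. The main package is therefore very simple: it mostly just calls the execute function.
app.NewAPIServerCommand(server.SetupSignalHandler()) returns a *cobra.Command, and calling command.Execute() ultimately invokes the run function defined on the Command struct.
In the code above, RunE means "run and return an error".
We can see that RunE returns Run(completedOptions, stopCh).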
3. Creating the server chain
4. Creating the server
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
Here a server is created. Passing it through PrepareRun() yields a preparedGenericAPIServer, whose Run() method is finally called.
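The reason for the extra preparedGenericAPIServer type is easier to see in a stripped-down form. The sketch below uses hypothetical names (server, preparedServer) and only illustrates the pattern: Run exists on the wrapper alone, so code outside the package has to go through PrepareRun first.

```go
package main

import "fmt"

// server is a hypothetical stand-in for GenericAPIServer.
type server struct {
	name  string
	ready bool
}

// preparedServer is an unexported wrapper. Code outside this package can only
// obtain one from PrepareRun, so it cannot run an unprepared server.
type preparedServer struct {
	*server
}

// PrepareRun performs one-time setup and returns the runnable wrapper.
func (s *server) PrepareRun() preparedServer {
	s.ready = true
	return preparedServer{s}
}

// Run is defined only on the prepared wrapper, not on server itself.
func (p preparedServer) Run() string {
	return fmt.Sprintf("%s running (ready=%t)", p.name, p.ready)
}

func main() {
	s := &server{name: "demo"}
	fmt.Println(s.PrepareRun().Run())
}
```

The call chain s.PrepareRun().Run() reads the same way as the one described above, and the compiler rejects s.Run() because server has no such method.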
The GenericAPIServer struct:
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
	...

	// admissionControl is used to build the RESTStorage that backs an API Group.
	admissionControl admission.Interface

	// "Outputs"
	// Handler holds the handlers being used by this API server
	Handler *APIServerHandler

	...

	// DiscoveryGroupManager serves /apis
	DiscoveryGroupManager discovery.GroupManager

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// PostStartHooks are each called after the server has started listening, in a separate go func for each
	// with no guarantee of ordering between them. The map key is a name used for error reporting.
	// It may kill the process with a panic if it wishes to by returning an error.
	postStartHookLock      sync.Mutex
	postStartHooks         map[string]postStartHookEntry
	postStartHooksCalled   bool
	disabledPostStartHooks sets.String

	preShutdownHookLock    sync.Mutex
	preShutdownHooks       map[string]preShutdownHookEntry
	preShutdownHooksCalled bool

	...

	// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
	HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	err := s.NonBlockingRun(stopCh)
	if err != nil {
		return err
	}

	<-stopCh

	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}

	// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
	s.HandlerChainWaitGroup.Wait()

	return nil
}
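We can see that it in turn calls s.NonBlockingRun(). As the name suggests, this runs without blocking: it starts new goroutines that eventually run the HTTP server, which exposes the HTTP API that the other Kubernetes components call and is the core control mechanism of a Kubernetes cluster. Execution then blocks at <-stopCh. When this channel is closed, the receive stops blocking, the shutdown logic runs, and the function returns. s.NonBlockingRun() is passed stopCh as well, for a similar reason: to let the program shut down gracefully.
stopCh is originally created in NewAPIServerCommand():
stopCh := server.SetupSignalHandler()
It is easy to see that this channel is tied to operating system signals: when Ctrl+C (SIGINT) or kill (SIGTERM by default) tells the program to stop, the channel is closed, every place that waits on <-stopCh stops blocking, and the cleanup needed for a graceful shutdown is performed.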
var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}
Now let's look at the implementation of NonBlockingRun():
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	...

	// Use an internal stop channel to allow cleanup of the listeners on error.
	internalStopCh := make(chan struct{})

	if s.SecureServingInfo != nil && s.Handler != nil {
		if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil {
			close(internalStopCh)
			return err
		}
	}

	...

	return nil
}
We can see that it in turn calls s.SecureServingInfo.Serve() to start the HTTP server. Let's keep digging:
// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig: &tls.Config{
			NameToCertificate: s.SNICerts,
			// Can't use SSLv3 because of POODLE and BEAST
			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
			// Can't use TLSv1.1 because of RC4 cipher usage
			MinVersion: tls.VersionTLS12,
			// enable HTTP2 for go's 1.7 HTTP Server
			NextProtos: []string{"h2", "http/1.1"},
		},
	}

	...

	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
This step creates the http.Server and calls RunServer:
// RunServer listens on the given port if listener is not given,
// then spawns a go-routine continuously serving until the stopCh is closed.
// It returns a stoppedCh that is closed when all non-hijacked active requests
// have been processed.
// This function does not block
// TODO: make private when insecure serving is gone from the kube-apiserver
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	if ln == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()

		var listener net.Listener
		listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return stoppedCh, nil
}
Finally, in the second of the two new goroutines, server.Serve(listener) is called to start the HTTP server. When startup succeeds, that goroutine stays blocked there.
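The two-goroutine structure of RunServer can be tried out with the standard library alone. The sketch below is a simplified illustration, not the upstream function: it serves plain HTTP without TLS or the keep-alive listener, and runServer, demo and the /healthz handler are hypothetical names chosen for the example.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// runServer serves on ln in a background goroutine and returns immediately.
// Closing stopCh triggers a graceful shutdown; the returned channel is closed
// once Shutdown has returned.
func runServer(srv *http.Server, ln net.Listener, timeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		_ = srv.Shutdown(ctx)
	}()
	go func() {
		// Serve blocks until the server is shut down.
		if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
			fmt.Println("serve error:", err)
		}
	}()
	return stoppedCh
}

// demo starts a server on a random local port, issues one request, shuts the
// server down again and returns the response body.
func demo() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	stopCh := make(chan struct{})
	stoppedCh := runServer(&http.Server{Handler: mux}, ln, 5*time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String() + "/healthz")
	if err != nil {
		close(stopCh)
		<-stoppedCh
		return "", err
	}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()

	close(stopCh) // ask the server to stop
	<-stoppedCh   // wait until the shutdown has finished
	return string(body), err
}

func main() {
	body, err := demo()
	fmt.Println(body, err)
}
```

The caller never blocks inside runServer. It decides when to stop by closing stopCh and learns that shutdown has completed from stoppedCh, which is the same contract the comment on RunServer describes.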
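At this point we have a first outline of the main path through the kube-apiserver source. There are many more details to dig into later. To keep the thread clear, it helps to hide the details at first and follow only the logic we want to understand.
They say learning starts with imitation. Thanks to the original author: https://cloud.tencent.com/developer/article/1326541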