Kubectl exec 的工做原理解讀

對於常常和 Kubernetes 打交道的 YAML 工程師來講,最經常使用的命令就是 kubectl exec 了,經過它能夠直接在容器內執行命令來調試應用程序。若是你不知足於只是用用而已,想了解 kubectl exec 的工做原理,那麼本文值得你仔細讀一讀。本文將經過參考 kubectlAPI ServerKubelet 和容器運行時接口(CRI)Docker API 中的相關代碼來了解該命令是如何工做的。node

kubectl exec 的工做原理用一張圖就能夠表示:linux

kubectl exec

先來看一個例子:nginx

🐳 → kubectl version --short 
Client Version: v1.15.0 
Server Version: v1.15.3

🐳 → kubectl run nginx --image=nginx --port=80 --generator=run-pod/v1
pod/nginx created

🐳 → kubectl get po     
NAME    READY   STATUS    RESTARTS   AGE 
nginx   1/1     Running   0          6s  

🐳 → kubectl exec nginx -- date
Sat Jan 25 18:47:52 UTC 2020

🐳 → kubectl exec -it nginx -- /bin/bash 
root@nginx:/#

第一個 kubectl exec 在容器內執行了 date 命令,第二個 kubectl exec 使用 -i-t 參數進入了容器的交互式 shell。git

重複第二個 kubectl exec 命令,打印更詳細的日誌:github

🐳 → kubectl -v=7 exec -it nginx -- /bin/bash                                                         
I0125 10:51:55.434043   28053 loader.go:359] Config loaded from file:  /home/isim/.kube/kind-config-linkerd
I0125 10:51:55.438595   28053 round_trippers.go:416] GET https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx
I0125 10:51:55.438607   28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.438611   28053 round_trippers.go:426]     Accept: application/json, */*
I0125 10:51:55.438615   28053 round_trippers.go:426]     User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.445942   28053 round_trippers.go:441] Response Status: 200 OK in 7 milliseconds
I0125 10:51:55.451050   28053 round_trippers.go:416] POST https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx/exec?command=%2Fbin%2Fbash&container=nginx&stdin=true&stdout=true&tty=true
I0125 10:51:55.451063   28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.451067   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v4.channel.k8s.io
I0125 10:51:55.451090   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v3.channel.k8s.io
I0125 10:51:55.451096   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v2.channel.k8s.io
I0125 10:51:55.451100   28053 round_trippers.go:426]     X-Stream-Protocol-Version: channel.k8s.ioI0125 10:51:55.451121   28053 round_trippers.go:426]     User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.465690   28053 round_trippers.go:441] Response Status: 101 Switching Protocols in 14 milliseconds
root@nginx:/#

這裏有兩個重要的 HTTP 請求:docker

  • GET 請求用來獲取 Pod 信息
  • POST 請求調用 Pod 的子資源 exec 在容器內執行命令。

子資源(subresource)隸屬於某個 K8S 資源,表示爲父資源下方的子路徑,例如 /logs/status/scale/exec 等。其中每一個子資源支持的操做根據對象的不一樣而改變。shell

最後 API Server 返回了 101 Ugrade 響應,向客戶端表示已切換到 SPDY 協議。json

SPDY 容許在單個 TCP 鏈接上覆用獨立的 stdin/stdout/stderr/spdy-error 流。api

1. API Server 源碼分析

請求首先會到底 API Server,先來看看 API Server 是如何註冊 rest.ExecRest 處理器來處理子資源請求 /exec 的。這個處理器用來肯定 exec 要進入的節點。緩存

API Server 啓動過程當中作的第一件事就是指揮內嵌的 GenericAPIServer 加載早期的遺留 API(legacy API):

if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
	// ...
	if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
		return nil, err
	}
}

在 API 加載過程當中,會將類型 LegacyRESTStorage 實例化,建立一個 storage.PodStorage 實例:

podStorage, err := podstore.NewStorage(
	restOptionsGetter,
	nodeStorage.KubeletConnectionInfo,
	c.ProxyTransport,
	podDisruptionClient,
)
if err != nil {
	return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}

隨後 storeage.PodStorage 實例會被添加到 map restStorageMap 中。注意,該 map 將路徑 pods/exec 映射到了 podStoragerest.ExecRest 處理器。

restStorageMap := map[string]rest.Storage{
	"pods":             podStorage.Pod,
	"pods/attach":      podStorage.Attach,
	"pods/status":      podStorage.Status,
	"pods/log":         podStorage.Log,
	"pods/exec":        podStorage.Exec,
	"pods/portforward": podStorage.PortForward,
	"pods/proxy":       podStorage.Proxy,
	"pods/binding":     podStorage.Binding,
	"bindings":         podStorage.LegacyBinding,

podstorage 爲 pod 和子資源提供了 CURD 邏輯和策略的抽象。更多詳細信息請查看內嵌的 genericregistry.Store

map restStorageMap 會成爲實例 apiGroupInfo 的一部分,添加到 GenericAPIServer 中:

if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
	return err
}

// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())

其中 GoRestfulContainer.ServeMux 會將傳入的請求 URL 映射到不一樣的處理器。

接下來重點觀察處理器 therest.ExecRest 的工做原理,它的 Connect() 方法會調用函數 pod.ExecLocation() 來肯定 pod 中容器的 exec 子資源的 URL

// Connect returns a handler for the pod exec proxy
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	execOpts, ok := opts.(*api.PodExecOptions)
	if !ok {
		return nil, fmt.Errorf("invalid options object: %#v", opts)
	}
	location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
	if err != nil {
		return nil, err
	}
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

函數 pod.ExecLocation() 返回的 URL 被 API Server 用來決定鏈接到哪一個節點。

下面接着分析節點上的 Kubelet 源碼。

2. Kubelet 源碼分析

到了 Kubelet 這邊,咱們須要關心兩點:

  • Kubelet 是如何註冊 exec 處理器的?
  • Kubelet 與 Docker API 如何交互?

Kubelet 的初始化過程很是複雜,主要涉及到兩個函數:

註冊處理器

當 Kubelet 啓動時,它的 RunKubelet() 函數會調用私有函數 startKubelet()啓動 kubelet.Kubelet 實例ListenAndServe() 方法,而後該方法會調用函數 ListenAndServeKubeletServer() ,使用構造函數 NewServer() 來安裝 『debugging』處理器:

// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(
	// ...
	criHandler http.Handler) Server {
	// ...
	if enableDebuggingHandlers {
		server.InstallDebuggingHandlers(criHandler)
		if enableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
	} else {
		server.InstallDebuggingDisabledHandlers()
	}
	return server
}

InstallDebuggingHandlers() 函數使用 getExec() 處理器來註冊 HTTP 請求模式:

// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  // ...
  ws = new(restful.WebService)
	ws.
		Path("/exec")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	s.restfulCont.Add(ws)

其中 getExec() 處理器又會調用 s.host 實例中的 GetExec() 方法:

// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
  	// ...
	podFullName := kubecontainer.GetPodFullName(pod)
	url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
	if err != nil {
		streaming.WriteError(err, response.ResponseWriter)
		return
	}
	// ...
}

s.host 被實例化爲 kubelet.Kubelet 類型的一個實例,它嵌套引用了 StreamingRuntime 接口,該接口又被實例化kubeGenericRuntimeManager 的實例,即運行時管理器。該運行時管理器是 Kubelet 與 Docker API 交互的關鍵組件,GetExec() 方法就是由它實現的:

// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	// ...
	resp, err := m.runtimeService.Exec(req)
	if err != nil {
		return nil, err
	}

	return url.Parse(resp.Url)
}

GetExec() 又會調用 runtimeService.Exec() 方法,進一步挖掘你會發現 runtimeService 是 CRI 包中定義的接口kuberuntime.kubeGenericRuntimeManagerruntimeService 被實例化爲 kuberuntime.instrumentedRuntimeService 類型,由它來實現 runtimeService.Exec() 方法:

func (in instrumentedRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	const operation = "exec"
	defer recordOperation(operation, time.Now())

	resp, err := in.service.Exec(req)
	recordError(operation, err)
	return resp, err
}

instrumentedRuntimeService 實例的嵌套服務對象被實例化theremote.RemoteRuntimeService 類型的實例。該類型實現了 Exec() 方法:

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	ctx, cancel := getContextWithTimeout(r.timeout)
	defer cancel()

	resp, err := r.runtimeClient.Exec(ctx, req)
	if err != nil {
		klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
		return nil, err
	}

	if resp.Url == "" {
		errorMessage := "URL is not set"
		klog.Errorf("Exec failed: %s", errorMessage)
		return nil, errors.New(errorMessage)
	}

	return resp, nil
}

Exec() 方法會向 /runtime.v1alpha2.RuntimeService/Exec 發起一個 gRPC 調用來讓運行時端準備一個流式通訊的端點,該端點用於在容器中執行命令(關於如何將 Docker shim 設置爲 gRPC 服務端的更多信息請參考下一小節)。

gRPC 服務端經過調用 RuntimeServiceServer.Exec() 方法來處理請求,該方法由 dockershim.dockerService 結構體實現:

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	if ds.streamingServer == nil {
		return nil, streaming.NewErrorStreamingDisabled("exec")
	}
	_, err := checkContainerStatus(ds.client, req.ContainerId)
	if err != nil {
		return nil, err
	}
	return ds.streamingServer.GetExec(req)
}

第 10 行的 ThestreamingServer 是一個 streaming.Server 接口,它在構造函數 dockershim.NewDockerService() 中被實例化:

// create streaming server if configured.
if streamingConfig != nil {
	var err error
	ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
	if err != nil {
		return nil, err
	}
}

來看一下 GetExec() 方法的實現方式:

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	if err := validateExecRequest(req); err != nil {
		return nil, err
	}
	token, err := s.cache.Insert(req)
	if err != nil {
		return nil, err
	}
	return &runtimeapi.ExecResponse{
		Url: s.buildURL("exec", token),
	}, nil
}

能夠看到這裏只是向客戶端返回一個簡單的 token 組合成的 URL, 之因此生成一個 token 是由於用戶的命令中可能包含各類各樣的字符,各類長度的字符,須要格式化爲一個簡單的 token。 該 token 會緩存在本地,後面真正的 exec 請求會攜帶這個 token,經過該 token 找到以前的具體請求。其中 restful.WebService 實例會將 pod exec 請求路由到這個端點:

// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  // ...
  ws = new(restful.WebService)
	ws.
		Path("/exec")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	s.restfulCont.Add(ws)

建立 Docker shim

PreInitRuntimeService() 函數做爲 gRPC 服務端,負責建立並啓動 Docker shim。在將dockershim.dockerService 類型實例化時,讓其嵌套的 streamingRuntime 實例引用 dockershim.NativeExecHandler 的實例(該實例實現了 dockershim.ExecHandler 接口)。

ds := &dockerService{
	// ...
	streamingRuntime: &streamingRuntime{
		client:      client,
		execHandler: &NativeExecHandler{},
	},
	// ...
}

使用 Docker 的 exec API 在容器中執行命令的核心實現就是 NativeExecHandler.ExecInContainer() 方法:

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// ...
	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	streamOpts := libdocker.StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
		ExecStarted:  execStarted,
	}
	err = client.StartExec(execObj.ID, startOpts, streamOpts)
	if err != nil {
		return err
	}
	// ...

這裏就是最終 Kubelet 調用 Docker exec API 的地方。

最後須要搞清楚的是 streamingServer 處理器如何處理 exec 請求。首先須要找到它的 exec 處理器,咱們直接從構造函數 streaming.NewServer() 開始往下找,由於這是將 /exec/{token} 路徑綁定到 serveExec 處理器的地方:

ws := &restful.WebService{}
endpoints := []struct {
	path    string
	handler restful.RouteFunction
}{
	{"/exec/{token}", s.serveExec},
	{"/attach/{token}", s.serveAttach},
	{"/portforward/{token}", s.servePortForward},
}

全部發送到 dockershim.dockerService 實例的請求最終都會在 streamingServer 處理器上完成,由於 dockerService.ServeHTTP() 方法會調用 streamingServer 實例的 ServeHTTP() 方法。

serveExec 處理器會調用 remoteCommand.ServeExec() 函數,這個函數又是幹嗎的呢?它會調用前面提到的 Executor.ExecInContainer() 方法,而 ExecInContainer() 方法是知道如何與 Docker exec API 通訊的:

// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
	// ...
	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
	if err != nil {
	// ...
	} else {
	// ...	
	}
}

3. 總結

本文經過解讀 kubectlAPI ServerCRI 的源碼,幫助你們理解 kubectl exec 命令的工做原理,固然,這裏並無涉及到 Docker exec API 的細節,也沒有涉及到 docker exec 的工做原理。

首先,kubectl 向 API Server 發出了 GETPOST 請求,API Server 返回了 101 Ugrade 響應,向客戶端表示已切換到 SPDY 協議。

隨後 API Server 使用 storage.PodStoragerest.ExecRest 來提供處理器的映射和執行邏輯,其中 rest.ExecRest 處理器決定 exec 要進入的節點。

最後 Kubelet 向 Docker shim 請求一個流式端點 URL,並將 exec 請求轉發到 Docker exec API。kubelet 再將這個 URL 以 Redirect 的方式返回給 API Server,請求就會重定向到到對應 Streaming Server 上發起的 exec 請求,並維護長鏈。

雖然本文只關注了 kubectl exec 命令,但其餘的子命令(例如 attachport-forwardlog 等等)也遵循了相似的實現模式:

kubectl


Kubernetes 1.18.2 1.17.5 1.16.9 1.15.12離線安裝包發佈地址http://store.lameleg.com ,歡迎體驗。 使用了最新的sealos v3.3.6版本。 做了主機名解析配置優化,lvscare 掛載/lib/module解決開機啓動ipvs加載問題, 修復lvscare社區netlink與3.10內核不兼容問題,sealos生成百年證書等特性。更多特性 https://github.com/fanux/sealos 。歡迎掃描下方的二維碼加入釘釘羣 ,釘釘羣已經集成sealos的機器人實時能夠看到sealos的動態。

相關文章
相關標籤/搜索