K8s中的命令執行由apiserver、kubelet、cri、docker等組件共同完成, 其中最複雜的就是協議切換以及各類流拷貝相關,讓咱們一塊兒來看下關鍵實現,雖然代碼比較多,可是不會開發應該也能看懂,祝你好運node
K8s中的命令執行中有不少協議相關的處理, 咱們先一塊兒看下這些協議處理相關的基礎概念web
HTTP/1.1中容許在同一個連接上經過Header頭中的Connection配合Upgrade來實現協議的轉換,簡單來講就是容許在經過HTTP創建的連接之上使用其餘的協議來進行通訊,這也是k8s中命令中實現協議升級的關鍵docker
在HTTP協議中除了咱們常見的HTTP1.1,還支持websocket/spdy等協議,那服務端和客戶端如何在http之上完成不一樣協議的切換呢,首先第一個要素就是這裏的101(Switching Protocal)狀態碼, 即服務端告知客戶端咱們切換到Uprage定義的協議上來進行通訊(複用當前連接)json
SPDY協議是google開發的TCP會話層協議, SPDY協議中將Http的Request/Response稱爲Stream,並支持TCP的連接複用,同時多個stream之間經過Stream-id來進行標記,簡單來講就是支持在單個連接同時進行多個請求響應的處理,而且互不影響,k8s中的命令執行主要也就是經過stream來進行消息傳遞的後端
在Linux中進程執行一般都會包含三個FD:標準輸入、標準輸出、標準錯誤, k8s中的命令執行會將對應的FD進行重定向,從而獲取容器的命令的輸出,重定向到哪呢?固然是咱們上面提到過的stream了(由於對docker並不熟悉,因此這個地方並不保證Docker部分的準確性)api
在client與server之間經過101狀態碼、connection、upragde等完成基於當前連接的轉換以後, 當前連接上傳輸的數據就不在是以前的http1.1協議了,此時就要將對應的http連接轉成對應的協議進行轉換,在k8s命令執行的過程當中,會獲取將對應的request和response,都經過http的Hijacker接口獲取底層的tcp連接,從而繼續完成請求的轉發緩存
在經過Hijacker獲取到兩個底層的tcp的readerwriter以後,就能夠直接經過io.copy在兩個流上完成對應數據的拷貝,這樣就不須要在apiserver這個地方進行協議的轉換,而是直接經過tcp的流對拷就能夠實現請求和結果的轉發微信
基礎大概就介紹這些,接下來咱們一塊兒去看看其底層的具體實現,咱們從kubectl部分開始來逐層分析websocket
Kubectl執行命令主要分爲兩部分Pod合法性檢測和命令執行, Pod合法性檢測主要是獲取對應Pod的狀態,檢測是否在運行, 這裏咱們重點關注下命令執行部分restful
命令執行的核心分爲兩個步驟:1.經過SPDY協議創建連接 2)構建Stream創建連接
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stderr, Tty: tty, TerminalSizeQueue: terminalSizeQueue, }) }
咱們能夠看到這個地方拼接的Url /pods/{namespace}/{podName}/exec其實就是對應apiserver上面pod的subresource接口,而後咱們就能夠去看apiserver端的請求處理了
// 建立一個exec req := restClient.Post(). Resource("pods"). Name(pod.Name). Namespace(pod.Namespace). SubResource("exec") req.VersionedParams(&corev1.PodExecOptions{ Container: containerName, Command: p.Command, Stdin: p.Stdin, Stdout: p.Out != nil, Stderr: p.ErrOut != nil, TTY: t.Raw, }, scheme.ParameterCodec) return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
在exec.Stream主要是經過Headers傳遞要創建的Stream的類型,與server端進行協商
// set up stdin stream if p.Stdin != nil { headers.Set(v1.StreamType, v1.StreamTypeStdin) p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } } // set up stdout stream if p.Stdout != nil { headers.Set(v1.StreamType, v1.StreamTypeStdout) p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } } // set up stderr stream if p.Stderr != nil && !p.Tty { headers.Set(v1.StreamType, v1.StreamTypeStderr) p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } }
APIServer在命令執行的過程當中扮演了代理的角色,其負責將Kubectl和kubelet之間的請求來進行轉發,注意這個轉發主要是基於tcp的流對拷完成的,由於kubectl和kubelet之間的通訊,其實是spdy協議,讓咱們一塊兒看下關鍵實現吧
Exec的SPDY請求會首先發送到Connect接口, Connection接口負責跟後端的kubelet進行連接的創建,而且進行響應結果的返回,在Connection接口中,首先會經過Pod獲取到對應的Node信息,而且構建Location即後端的Kubelet的連接地址和transport
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { execOpts, ok := opts.(*api.PodExecOptions) if !ok { return nil, fmt.Errorf("invalid options object: %#v", opts) } // 返回對應的地址,以及創建連接 location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts) if err != nil { return nil, err } return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil }
在獲取地址主要是構建後端的location信息,這裏會經過kubelet上報來的信息獲取到對應的node的host和Port信息,而且拼裝出pod的最終指向路徑即這裏的Path字段/exec/{namespace}/{podName}/{containerName}
loc := &url.URL{ Scheme: nodeInfo.Scheme, Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), // node的端口 Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container), // 路徑 RawQuery: params.Encode(), }
協議提高主要是經過UpgradeAwareHandler控制器進行實現, 該handler接收到請求以後會首先嚐試進行協議提高,其主要是檢測http頭裏面的Connection的值是否是Upragde來實現, 從以前kubelet的分析中能夠知道這裏確定是true
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler { handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects) handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler } func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // 若是協議提高成功,則由該協議完成 if h.tryUpgrade(w, req) { return } // 省略N多代碼 }
協議提高處理的邏輯比較多,這裏分爲幾個小節來進行依次說明, 主要是先從HTTP連接中獲取請求,並進行轉發,而後同時持有兩個連接,而且在連接上進行TCP流的拷貝
協議提高的第一步就是與後端的kubelet創建連接了,這裏會將kubelet發過來的請求進行拷貝,而且發送給後端的kubelet, 而且這裏也會獲取到一個與kubelet創建的http的連接,後面進行流對拷的時候須要用到, 注意實際上這個http請求響應的狀態碼,是101,即kubelet上其實是構建了一個spdy協議的handler來進行通訊的
// 構建http請求 req, err := http.NewRequest(method, location.String(), body) if err != nil { return nil, nil, err } req.Header = header // 發送請求創建連接 intermediateConn, err = dialer.Dial(req) if err != nil { return nil, nil, err } // Peek at the backend response. rawResponse.Reset() respReader := bufio.NewReader(io.TeeReader( io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes. rawResponse)) // Save the raw response. // 讀取響應信息 resp, err := http.ReadResponse(respReader, nil)
這個請求其實是spdy協議的,在經過Hijack獲取到底層的連接以後,須要先將上面的請求轉發給kubelet從而觸發kubelet發送後面的Stream請求創建連接,就是這裏的Write將kubelet的結果轉發
requestHijackedConn, _, err := requestHijacker.Hijack() // Forward raw response bytes back to client. if len(rawResponse) > 0 { klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) if _, err = requestHijackedConn.Write(rawResponse); err != nil { utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) } }
通過上面的兩步操做,apiserver上就擁有來了兩個http連接,由於協議不是http的因此apiserver不能直接進行操做,而只能採用流對拷的方式來進行請求和響應的轉發
// 雙向拷貝連接 go func() { var writer io.WriteCloser if h.MaxBytesPerSec > 0 { writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec) } else { writer = backendConn } _, err := io.Copy(writer, requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from client to backend: %v", err) } close(writerComplete) }() go func() { var reader io.ReadCloser if h.MaxBytesPerSec > 0 { reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec) } else { reader = backendConn } _, err := io.Copy(requestHijackedConn, reader) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from backend to client: %v", err) } close(readerComplete) }()
Kubelet上的命令執行主要是依賴於CRI.RuntimeService來執行的,kubelet只負責對應請求的轉發,並最終構建一個轉發後續請求的Stream代理,就完成了他的使命
主流程主要是獲取要執行的命令,而後檢測對應的Pod新,並調用host.GetExec返回一個對應的URL,而後後續的請求就由proxyStream來完成, 咱們一步步開始深刻
func (s *Server) getExec(request *restful.Request, response *restful.Response) { // 獲取執行命令 params := getExecRequestParams(request) streamOpts, err := remotecommandserver.NewOptions(request.Request) // 獲取pod的信息 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) podFullName := kubecontainer.GetPodFullName(pod) url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) proxyStream(response.ResponseWriter, request.Request, url) }
host.GetExec最終會調用到runtimeService即cri.RuntimeService的Exec接口來進行請求的執行,該接口會返回一個地址即/exec/{token},此時並無執行真正的命令只是建立了一個命令執行請求而已
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { // 省略請求構造 // 執行命令 resp, err := m.runtimeService.Exec(req) return url.Parse(resp.Url) }
最終其實就是調用cri的的exec接口, 咱們先忽略該接口具體返回的啥,將kubelet剩餘的邏輯看完
func (c *runtimeServiceClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) { err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Exec", in, out, opts...) }
這裏咱們能夠發現,又是咱們以前見過的UpgradeAwareHandler,不過此次的url是後端exec執行返回的url了,而後剩下部分就跟apiserver裏面的差很少,在兩個http連接之間進行流對拷
咱們想一下這個地方Request和Response,實際上是對應的apiserver與kubelet創建的連接,這個連接上是spdy的頭,記住這個地方, 則此時又跟後端繼續創建連接,後端其實也是一個spdy協議的server, 至此咱們還差最後一個部分就是返回的那個連接究竟是啥,對應的控制器又是誰,進行下一節cri部分
// proxyStream proxies stream to url. func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) { // TODO(random-liu): Set MaxBytesPerSec to throttle the stream. handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{}) handler.ServeHTTP(w, r) }
CRI.RuntimeService負責最終的命令執行,也是命令執行真正執行的位置,其中也涉及到不少的協議處理相關的操做,讓咱們一塊兒來看下關鍵實現吧
在上面咱們調用了RuntimeService的Exec接口,在kubelet中最終發現以下代碼,建立了一個DockerServer並啓動
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds) if err := dockerServer.Start(); err != nil { return err }
其中在Start函數裏面,註冊了下面兩個RuntimeService,寫過grpc的朋友都知道,這個其實就是註冊對應rpc接口的實現,其實最終咱們調用的是DockerService的接口
runtimeapi.RegisterRuntimeServiceServer(s.server, s.service) runtimeapi.RegisterImageServiceServer(s.server, s.service)
Exec最終的實現能夠發現其實是調用streamingServer的GetExec接口,返回了一個/exec/{token}的接口
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { // 執行Exec請求 return ds.streamingServer.GetExec(req) }
咱們繼續追蹤streamingServer能夠看到GetExec接口實現以下, 最終build了一個url=/exec/{token},注意這裏實際上存儲了當前的Request請求在cache中
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { // 生成token token, err := s.cache.Insert(req) return &runtimeapi.ExecResponse{ Url: s.buildURL("exec", token), }, nil }
首先經過token來獲取以前緩存的Request,而後經過exec請求命令,構建StreamOpts,並最終調用ServeExec進行執行,接下來就是最不容易看懂的部分了,前方高能
func (s *server) serveExec(req *restful.Request, resp *restful.Response) { // 獲取token token := req.PathParameter("token") // 緩存請求 cachedRequest, ok := s.cache.Consume(token) // 構建exec參數s exec, ok := cachedRequest.(*runtimeapi.ExecRequest) streamOpts := &remotecommandserver.Options{ Stdin: exec.Stdin, Stdout: exec.Stdout, Stderr: exec.Stderr, TTY: exec.Tty, } // 構建ServerExec執行請求 remotecommandserver.ServeExec( resp.ResponseWriter, req.Request, s.runtime, "", // unused: podName "", // unusued: podUID exec.ContainerId, exec.Cmd, streamOpts, s.config.StreamIdleTimeout, s.config.StreamCreationTimeout, s.config.SupportedRemoteCommandProtocols) }
ServerExec關鍵步驟就兩個:1)建立stream 2)執行請求, 比較複雜的主要是集中在建立stream部分,咱們注意下ExecInContainer的參數部分,傳入了經過建立流獲取的ctx的相關文件描述符的Stream, createStreams裏面的實現有兩種協議websocket和https,這裏咱們主要分析https(咱們使用kubectl使用的就是https協議)
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { // 建立serveExec ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) defer ctx.conn.Close() // 獲取執行,這是一個阻塞的過程,err會獲取當前的執行是否成功, 這裏將ctx裏面的信息,都傳入進去,對應的其實就是各類流 err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) }
Stream的創建我將其歸納成下面幾個步驟:1)進行https的握手 2)協議升級爲spdy 3)等待stream的創建,咱們依次來看
1.完成https的握手
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
2.協議提高
// 流管道 streamCh := make(chan streamAndReply) upgrader := spdy.NewResponseUpgrader() // 構建spdy連接 conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error { // 當新請求創建以後,會追加到streamch streamCh <- streamAndReply{Stream: stream, replySent: replySent} return nil })
這裏有一個關鍵機制就是後面func回調函數的傳遞和streamch的傳遞,這裏創建一個連接以後會建立一個Server,而且傳入了一個控制器就是func回調函數,該函數每當創建一個連接以後,若是獲取到對應的stream就追加到StreamCh中,下面就是最複雜的網絡處理部分了,由於太複雜,因此仍是單獨開一節吧
整體流程上看起來簡單,主要是先根據請求來進行協議切換,而後返回101,而且基於當前的連接構建SPDY的請求處理,而後等待kubectl經過apiserver發送的須要創建的Stream,就完成了彼此通訊流stream的創建
首先第一步會先進行協議提高的響應,這裏咱們注意幾個關鍵部分,spdy協議,以及101狀態碼
// 協議 hijacker, ok := w.(http.Hijacker) if !ok { errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response") http.Error(w, errorMsg, http.StatusInternalServerError) return nil } w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) // sydy協議 w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31) w.WriteHeader(http.StatusSwitchingProtocols)
spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
最終會經過newConnection負責新連接的創建
func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { // 建立一個新的連接, 經過一個已經存在的網絡連接 spdyConn, err := spdystream.NewConnection(conn, true) return newConnection(spdyConn, newStreamHandler), nil }
這裏咱們能夠看到是啓動一個後臺的server來進行連接請求的處理
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { c := &connection{conn: conn, newStreamHandler: newStreamHandler} // 當創建連接後,進行syn請求建立流的時候,會調用newSpdyStream go conn.Serve(c.newSpdyStream) return c }
1.首先會啓動多個goroutine來負責請求的處理,這裏的worker數量是5個,隊列大小是20,
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS) for i := 0; i < FRAME_WORKERS; i++ { frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE) // Ensure frame queue is drained when connection is closed go func(frameQueue *PriorityFrameQueue) { <-s.closeChan frameQueue.Drain() }(frameQueues[i]) wg.Add(1) go func(frameQueue *PriorityFrameQueue) { // let the WaitGroup know this worker is done defer wg.Done() s.frameHandler(frameQueue, newHandler) }(frameQueues[i]) }
2.監聽synStreamFrame,分流frame,會按照frame的streamID來進行hash選擇對應的frameQueues隊列
case *spdy.SynStreamFrame: if s.checkStreamFrame(frame) { priority = frame.Priority partition = int(frame.StreamId % FRAME_WORKERS) debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId) // 添加到對應的StreamId對應的frame裏面 s.addStreamFrame(frame) } else { debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId) continue // 最終會講frame push到上面的優先級隊列裏面 frameQueues[partition].Push(readFrame, priority)
3.讀取frame進行並把讀取到的stream經過newHandler傳遞給上面的StreamCH
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) { for { popFrame := frameQueue.Pop() if popFrame == nil { return } var frameErr error switch frame := popFrame.(type) { case *spdy.SynStreamFrame: frameErr = s.handleStreamFrame(frame, newHandler) } }
消費的流到下一節
Stream的等待創建主要是經過Headers裏面的StreamType來實現,這裏面會講對應的stdinStream和對應的spdy裏面的stream綁定,其餘類型也是這樣
func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { ctx := &context{} receivedStreams := 0 replyChan := make(chan struct{}) stop := make(chan struct{}) defer close(stop) WaitForStreams: for { select { case stream := <-streams: streamType := stream.Headers().Get(api.StreamType) switch streamType { case api.StreamTypeError: ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdin: ctx.stdinStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdout: ctx.stdoutStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStderr: ctx.stderrStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeResize: ctx.resizeStream = stream go waitStreamReply(stream.replySent, replyChan, stop) default: runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType)) } case <-replyChan: receivedStreams++ if receivedStreams == expectedStreams { break WaitForStreams } case <-expired: // TODO find a way to return the error to the user. Maybe use a separate // stream to report errors? return nil, errors.New("timed out waiting for client to create streams") } } return ctx, nil }
跟蹤調用鏈最終能夠看到以下的調用,最終指向了execHandler.ExecInContainer接口用於在容器中執行命令
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // 執行command return a.Runtime.Exec(container, cmd, in, out, err, tty, resize) } func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { // 執行容器 return r.exec(containerID, cmd, in, out, err, tty, resize, 0) } // Internal version of Exec adds a timeout. func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // exechandler return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout) }
命令的指向的主流程主要分爲兩個部分:1)建立exec執行任務 2)啓動exec執行任務
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // 在容器中執行命令 done := make(chan struct{}) defer close(done) // 執行命令 createOpts := dockertypes.ExecConfig{ Cmd: cmd, AttachStdin: stdin != nil, AttachStdout: stdout != nil, AttachStderr: stderr != nil, Tty: tty, } // 建立執行命令任務 execObj, err := client.CreateExec(container.ID, createOpts) startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} // 這裏咱們能夠看到咱們將前面獲取到的stream的封裝,都做爲FD傳入到容器的執行命令裏面去了 streamOpts := libdocker.StreamOptions{ InputStream: stdin, OutputStream: stdout, ErrorStream: stderr, RawTerminal: tty, ExecStarted: execStarted, } // 執行命令 err = client.StartExec(execObj.ID, startOpts, streamOpts) ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 for { // 獲取執行結果 inspect, err2 := client.InspectExec(execObj.ID) if !inspect.Running { if inspect.ExitCode != 0 { err = &dockerExitError{inspect} } break } <-ticker.C } return err }
Docker的命令執行接口調用
func (cli *Client) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) { resp, err := cli.post(ctx, "/containers/"+container+"/exec", nil, config, nil) return response, err }
命令執行的核心實現主要是兩個步驟:1)首先發送exec執行請求 2)啓動對應的exec並獲取結果, 複雜的仍是SPDY相關的Stream的邏輯
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error { // 啓動執行命令, 獲取結果 resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{ Detach: opts.Detach, Tty: opts.Tty, }) // 將輸入流拷貝到輸出流, 這裏會講resp裏面的結果拷貝到outputSTream裏面 return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp) }
cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)
這裏的HiHijackConn功能跟以前介紹的相似,其核心也是經過創建http連接,而後進行協議提高,其conn就是底層的tcp連接,同時還給對應的連接設置了Keepliave當前是30s, 到此咱們就又有了一個基於spdy雙向通訊的連接
func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) { conn, err := cli.setupHijackConn(ctx, req, "tcp") return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err }
至此在kubelet上面咱們獲取到了與後端執行命令的Stream還有與apiserver創建的Stream, 此時就只須要將兩個流直接進行拷貝,就能夠實現數據的傳輸了
func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error { receiveStdout := make(chan error) if outputStream != nil || errorStream != nil { // 將響應結果拷貝到outputstream裏面 go func() { receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader) }() } stdinDone := make(chan struct{}) go func() { if inputStream != nil { io.Copy(resp.Conn, inputStream) } resp.CloseWrite() close(stdinDone) }() return nil }
在發生完成執行命令之後,會每隔2s鍾進行一次執行狀態的檢查,若是發現執行完成,則就直接退出
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 for { // 獲取執行結果 inspect, err2 := client.InspectExec(execObj.ID) if err2 != nil { return err2 } if !inspect.Running { if inspect.ExitCode != 0 { err = &dockerExitError{inspect} } break } <-ticker.C } return err } func (cli *Client) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) { resp, err := cli.get(ctx, "/exec/"+execID+"/json", nil, nil) return response, err }
整個命令執行的過程其實仍是蠻複雜的,主要是在於網絡協議切換那部分,咱們能夠看到其實在整個過程當中,都是基於SPDY協議來進行的,而且在CRI.RuntimeService那部分咱們也能夠看到Stream的請求處理其實也是多goroutine併發的,仰慕一下大牛的設計,有什麼寫的不對的地方,歡迎一塊兒討論,謝謝大佬們能看到這裏
kubernetes學習筆記地址: https://www.yuque.com/baxiaoshi/tyado3
微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com