go微服務框架go-micro深度學習(五) stream 調用過程詳解

    上一篇寫了一下rpc調用過程的實現方式,簡單來講就是服務端把實現了接口的結構體對象進行反射,抽取方法,簽名,保存,客戶端調用的時候go-micro封請求數據,服務端接收到請求時,找到須要調用調用的對象和對應的方法,利用反射進行調用,返回數據。 可是沒有說stream的實現方式,感受單獨寫一篇帖子來講這個更好一些。上一篇帖子是基礎,理解了上一篇,stream實現原理一點即破。先說一下使用方式,再說原理。 當前go-micro對 rpc 調用的方式大概以下: 普通的rpc調用 是這樣:html

1.鏈接服務器或者從緩存池獲得鏈接
2.客戶端 ->發送數據 -> 服務端接收
3.服務端 ->返回數據 -> 客戶端處理數據
4.關閉鏈接或者把鏈接返回到緩存池
複製代碼

當前 rps stream的實現方式 是這樣子:git

1. 鏈接服務器
2. 客戶端屢次發送請求-> 服務端接收
3. 服務端屢次返回數據-> 客戶端處理數據
4. 關閉鏈接
複製代碼

    當數據量比較大的時候咱們能夠用stream方式分批次傳輸數據。對於客戶端仍是服務端沒有限制,咱們能夠根據本身的須要使用stream方式,使用方式也很是的簡單,在定義接口的時候在參數或者返回值前面加上stream而後就能夠屢次進行傳輸了,使用的代碼仍是以前寫的例子,代碼都在github上:     好比個人例子中定義了兩個使用stream的接口,一個只在返回值使用stream,另外一個是在參數和返回值前都加上了stream,最終的使用方式沒有區別github

rpc Stream(model.SRequest) returns (stream model.SResponse) {}
    rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}
複製代碼

看一下go-micro爲咱們生成的代碼rpcapi.micro.go裏,不要被嚇到,生成了不少代碼,可是沒啥理解不了的 Server端api

// Server API for Say service
type SayHandler interface {
	// .... others	
	Stream(context.Context, *model.SRequest, Say_StreamStream) error
	BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SResponse) error
	Recv() (*model.SRequest, error)
}
// .... others 
複製代碼

Client端緩存

// Client API for Say service
type SayService interface {	
    //... others
	Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
	BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SRequest) error
	Recv() (*model.SResponse, error)
}
複製代碼

    你會發現參數前面加了 Stream後,生成的代碼會把你的參數變成一個接口,這個接口主要要的方法是bash

SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
複製代碼

剩下的兩個接口方法是根據你是發送仍是接收生成的,若是有發送就會有Send(你的參數),若是有接收會生成Rev() (你的參數, error),但這兩個方法只是爲了讓你使用時方便,裏面調用的仍是SendMsg(interface)和RecvMsg(interface)方法,可是他們是怎麼工做的,如何屢次發送和接收傳輸的數據,是否是感受很神奇。服務器

我就以TsBidirectionalStream 方法爲例開始分析,上一篇和再早以前的帖子已經說了服務端啓動的時候都作了哪些操做,這裏就再也不贅述, 服務端的實現,很簡單,不斷的獲取客戶端發過來的數據,再給客戶端一次一次的返回一些數據。dom

/*
 模擬數據
 */
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		for i := int64(0); i < req.Count; i++ {
			if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
				return err
			}
		}
	}
	return nil
}
複製代碼

啓動服務,服務開始監聽客戶端傳過來的數據..... 客戶端調用服務端方法:ui

// 調用 
func TsBidirectionalStream(client rpcapi.SayService) {
	rspStream, err := client.BidirectionalStream(context.Background())
	if err != nil {
		panic(err)
	}
	// send
	go func() {
		rspStream.Send(&model.SRequest{Count: 2})
		rspStream.Send(&model.SRequest{Count: 5})
        // close the stream
		if err := rspStream.Close(); err != nil {
			fmt.Println("stream close err:", err)
		}
	}()
     // recv
	idx := 1
	for  {
		rsp, err := rspStream.Recv()

		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}

		fmt.Printf("test stream get idx %d data %v\n", idx, rsp)
		idx++
	}
	fmt.Println("Read Value End")
}
複製代碼

當客戶端在調用rpc的stream方法是要很獲得streamspa

rspStream, err := client.BidirectionalStream(context.Background())
// 
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
	req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
	stream, err := c.c.Stream(ctx, req, opts...)
	if err != nil {
		return nil, err
	}
	return &sayServiceBidirectionalStream{stream}, nil
}
複製代碼

這個調用c.c.Stream(ctx, req, opts...)是關鍵,他的內部實現就是和服務器進行鏈接,而後返回一個stream,進行操做。

客戶端:和服務端創建鏈接,返回Stream,進行接收和發送數據
服務端:接收客戶端鏈接請求,利用反射找到相應的方法,組織Strem,傳給方法,進行數據的發送和接收
複製代碼

創建鏈接的時候就是一次rpc調用,服務端接受鏈接,而後客戶端發送一次調用,可是傳輸的是空數據,服務端利用反射找到具體的方法,組織stream,調用具體方法,利用這個鏈接,客戶端和服務端進行屢次通訊。

相關文章
相關標籤/搜索