gRPC 在 Go 語言中大放異彩,愈來愈多的小夥伴在使用,最近也在公司安利了一波,但願這一篇文章能帶你一覽 gRPC 的巧妙之處,本文篇幅比較長,請作好閱讀準備。本文目錄以下:html
gRPC 是一個高性能、開源和通用的 RPC 框架,面向移動和 HTTP/2 設計。目前提供 C、Java 和 Go 語言版本,分別是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。java
gRPC 基於 HTTP/2 標準設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 鏈接上的多複用請求等特性。這些特性使得其在移動設備上表現更好,更省電和節省空間佔用。git
一、客戶端(gRPC Stub)調用 A 方法,發起 RPC 調用。github
二、對請求信息使用 Protobuf 進行對象序列化壓縮(IDL)。golang
三、服務端(gRPC Server)接收到請求後,解碼請求體,進行業務邏輯處理並返回。web
四、對響應結果使用 Protobuf 進行對象序列化壓縮(IDL)。算法
五、客戶端接受到服務端響應,解碼請求體。回調被調用的 A 方法,喚醒正在等待響應(阻塞)的客戶端調用並返回響應結果。api
type SearchService struct{} func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) { return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil } const PORT = "9001" func main() { server := grpc.NewServer() pb.RegisterSearchServiceServer(server, &SearchService{}) lis, err := net.Listen("tcp", ":"+PORT) ... server.Serve(lis) }
func main() { conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure()) ... defer conn.Close() client := pb.NewSearchServiceClient(conn) resp, err := client.Search(context.Background(), &pb.SearchRequest{ Request: "gRPC", }) ... }
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error { for n := 0; n <= 6; n++ { stream.Send(&pb.StreamResponse{ Pt: &pb.StreamPoint{ ... }, }) } return nil }
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.List(context.Background(), r) ... for { resp, err := stream.Recv() if err == io.EOF { break } ... } return nil }
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error { for { r, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}}) } ... } return nil }
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Record(context.Background()) ... for n := 0; n < 6; n++ { stream.Send(r) } resp, err := stream.CloseAndRecv() ... return nil }
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error { for { stream.Send(&pb.StreamResponse{...}) r, err := stream.Recv() if err == io.EOF { return nil } ... } return nil }
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Route(context.Background()) ... for n := 0; n <= 6; n++ { stream.Send(r) resp, err := stream.Recv() if err == io.EOF { break } ... } stream.CloseSend() return nil }
在開始分析以前,咱們要先 gRPC 的調用有一個初始印象。那麼最簡單的就是對 Client 端調用 Server 端進行抓包去剖析,看看整個過程當中它都作了些什麼事。以下圖:服務器
咱們略加整理髮現共有十二個行爲,是比較重要的。在開始分析以前,建議你本身先想一下,它們的做用都是什麼?大膽猜想一下,帶着疑問去學習效果更佳。網絡
Magic 幀的主要做用是創建 HTTP/2 請求的前言。在 HTTP/2 中,要求兩端都要發送一個鏈接前言,做爲對所使用協議的最終確認,並肯定 HTTP/2 鏈接的初始設置,客戶端和服務端各自發送不一樣的鏈接前言。
而上圖中的 Magic 幀是客戶端的前言之一,內容爲 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n
,以肯定啓用 HTTP/2 鏈接。
SETTINGS 幀的主要做用是設置這一個鏈接的參數,做用域是整個鏈接而並不是單一的流。
而上圖的 SETTINGS 幀都是空 SETTINGS 幀,圖一是客戶端鏈接的前言(Magic 和 SETTINGS 幀分別組成鏈接前言)。圖二是服務端的。另外咱們從圖中能夠看到多個 SETTINGS 幀,這是爲何呢?是由於發送完鏈接前言後,客戶端和服務端還須要有一步互動確認的動做。對應的就是帶有 ACK 標識 SETTINGS 幀。
HEADERS 幀的主要做用是存儲和傳播 HTTP 的標頭信息。咱們關注到 HEADERS 裏有一些眼熟的信息,分別以下:
你會發現這些東西很是眼熟,其實都是 gRPC 的基礎屬性,實際上遠遠不止這些,只是設置了多少展現多少。例如像平時常見的 grpc-timeout
、grpc-encoding
也是在這裏設置的。
DATA 幀的主要做用是裝填主體信息,是數據幀。而在上圖中,能夠很明顯看到咱們的請求參數 gRPC 存儲在裏面。只須要了解到這一點就能夠了。
在上圖中 HEADERS 幀比較簡單,就是告訴咱們 HTTP 響應狀態和響應的內容格式。
在上圖中 DATA 幀主要承載了響應結果的數據集,圖中的 gRPC Server 就是咱們 RPC 方法的響應結果。
在上圖中 HEADERS 幀主要承載了 gRPC 狀態 和 gRPC 狀態消息,圖中的 grpc-status
和 grpc-message
就是咱們的 gRPC 調用狀態的結果。
主要做用是管理和流的窗口控制。一般狀況下打開一個鏈接後,服務器和客戶端會當即交換 SETTINGS 幀來肯定流控制窗口的大小。默認狀況下,該大小設置爲約 65 KB,但可經過發出一個 WINDOW_UPDATE 幀爲流控制設置不一樣的大小。
主要做用是判斷當前鏈接是否仍然可用,也經常使用於計算往返時間。其實也就是 PING/PONG,你們對此應該很熟。
這塊 gRPC 的基礎使用,你能夠看看我另外的 《gRPC 入門系列》,相信對你必定有幫助。
爲何四行代碼,就可以起一個 gRPC Server,內部作了什麼邏輯。你有想過嗎?接下來咱們一步步剖析,看看裏面究竟是何方神聖。
// grpc.NewServer() func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o(&opts) } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[io.Closer]bool), m: make(map[string]*service), quit: make(chan struct{}), done: make(chan struct{}), czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) ... return s }
這塊比較簡單,主要是實例 grpc.Server 並進行初始化動做。涉及以下:
pb.RegisterSearchServiceServer(server, &SearchService{})
// search.pb.go type SearchServiceServer interface { Search(context.Context, *SearchRequest) (*SearchResponse, error) } func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) { s.RegisterService(&_SearchService_serviceDesc, srv) }
還記得咱們平時編寫的 Protobuf 嗎?在生成出來的 .pb.go
文件中,會定義出 Service APIs interface 的具體實現約束。而咱們在 gRPC Server 進行註冊時,會傳入應用 Service 的功能接口實現,此時生成的 RegisterServer
方法就會保證二者之間的一致性。
你想亂傳糊弄一下?不可能的,請乖乖定義與 Protobuf 一致的接口方法。可是那個 &_SearchService_serviceDesc
又有什麼做用呢?代碼以下:
// search.pb.go var _SearchService_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.SearchService", HandlerType: (*SearchServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Search", Handler: _SearchService_Search_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "search.proto", }
這看上去像服務的描述代碼,用來向內部表述 「我」 都有什麼。涉及以下:
Handler
方法,其對應最終的 RPC 處理方法,在執行 RPC 方法的階段會使用。SearchServiceServer
服務func (s *Server) register(sd *ServiceDesc, ss interface{}) { ... srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { ... } s.m[sd.ServiceName] = srv }
在最後一步中,咱們會將先前的服務接口信息、服務描述信息給註冊到內部 service
去,以便於後續實際調用的使用。涉及以下:
在這一章節中,主要介紹的是 gRPC Server 在啓動前的整理和註冊行爲,看上去很簡單,但其實一切都是爲了後續的實際運行的預先準備。所以咱們整理一下思路,將其串聯起來看看,以下:
接下來到了整個流程中,最重要也是你們最關注的監聽/處理階段,核心代碼以下:
func (s *Server) Serve(lis net.Listener) error { ... var tempDelay time.Duration for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } ... timer := time.NewTimer(tempDelay) select { case <-timer.C: case <-s.quit: timer.Stop() return nil } continue } ... return err } tempDelay = 0 s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() } }
Serve 會根據外部傳入的 Listener 不一樣而調用不一樣的監聽模式,這也是 net.Listener
的魅力,靈活性和擴展性會比較高。而在 gRPC Server 中最經常使用的就是 TCPConn
,基於 TCP Listener 去作。接下來咱們一塊兒看看具體的處理邏輯,以下:
lis.Accept
取出鏈接,若是隊列中沒有需處理的鏈接時,會造成阻塞等待。lis.Accept
失敗,則觸發休眠機制,若爲第一次失敗那麼休眠 5ms,不然翻倍,再次失敗則不斷翻倍直至上限休眠時間 1s,而休眠完畢後就會嘗試去取下一個 「它」。lis.Accept
成功,則重置休眠的時間計數和啓動一個新的 goroutine 調用 handleRawConn
方法去執行/處理新的請求,也就是你們很喜歡說的 「每個請求都是不一樣的 goroutine 在處理」。// grpc.Dial(":"+PORT, grpc.WithInsecure()) func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } ... chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) ... }
grpc.Dial
方法其實是對於 grpc.DialContext
的封裝,區別在於 ctx
是直接傳入 context.Background
。其主要功能是建立與給定目標的客戶端鏈接,其承擔瞭如下職責:
以前聽到有的人說調用 grpc.Dial
後客戶端就已經與服務端創建起了鏈接,但這對不對呢?咱們先鳥瞰全貌,看看正在跑的 goroutine。以下:
咱們能夠有幾個核心方法一直在等待/處理信號,經過分析底層源碼可得知。涉及以下:
func (ac *addrConn) connect() func (ac *addrConn) resetTransport() func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) func (ac *addrConn) getReadyTransport()
在這裏主要分析 goroutine 提示的 resetTransport
方法,看看都作了啥。核心代碼以下:
func (ac *addrConn) resetTransport() { for i := 0; ; i++ { if ac.state == connectivity.Shutdown { return } ... connectDeadline := time.Now().Add(dialDuration) ac.updateConnectivityState(connectivity.Connecting) newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) if err != nil { if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) timer := time.NewTimer(backoffFor) select { case <-timer.C: ... } continue } if ac.state == connectivity.Shutdown { newTr.Close() return } ... if !healthcheckManagingState { ac.updateConnectivityState(connectivity.Ready) } ... if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) } }
在該方法中會不斷地去嘗試建立鏈接,若成功則結束。不然不斷地根據 Backoff
算法的重試機制去嘗試建立鏈接,直到成功爲止。從結論上來說,單純調用 DialContext
是異步創建鏈接的,也就是並非立刻生效,處於 Connecting
狀態,而正式下要到達 Ready
狀態纔可用。
在抓包工具上提示一個包都沒有,那麼這算真正鏈接了嗎?我認爲這是一個表述問題,咱們應該儘量的嚴謹。若是你真的想經過 DialContext
方法就打通與服務端的鏈接,則須要調用 WithBlock
方法,雖然會致使阻塞等待,但最終鏈接會到達 Ready
狀態(握手成功)。以下圖:
type SearchServiceClient interface { Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) } type searchServiceClient struct { cc *grpc.ClientConn } func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient { return &searchServiceClient{cc} }
這塊就是實例 Service API interface,比較簡單。
// search.pb.go func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) { out := new(SearchResponse) err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...) if err != nil { return nil, err } return out, nil }
proto 生成的 RPC 方法更像是一個包裝盒,把須要的東西放進去,而實際上調用的仍是 grpc.invoke
方法。以下:
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
經過概覽,能夠關注到三塊調用。以下:
// clientconn.go func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ FullMethodName: method, }) if err != nil { return nil, nil, toRPCErr(err) } return t, done, nil }
在 newClientStream
方法中,咱們經過 getTransport
方法獲取了 Transport 層中抽象出來的 ClientTransport 和 ServerTransport,實際上就是獲取一個鏈接給後續 RPC 調用傳輸使用。
// conn.Close() func (cc *ClientConn) Close() error { defer cc.cancel() ... cc.csMgr.updateState(connectivity.Shutdown) ... cc.blockingpicker.close() if rWrapper != nil { rWrapper.close() } if bWrapper != nil { bWrapper.close() } for ac := range conns { ac.tearDown(ErrClientConnClosing) } if channelz.IsOn() { ... channelz.AddTraceEvent(cc.channelzID, ted) channelz.RemoveEntry(cc.channelzID) } return nil }
該方法會取消 ClientConn 上下文,同時關閉全部底層傳輸。涉及以下:
會,可是是異步鏈接的,鏈接狀態爲正在鏈接。但若是你設置了 grpc.WithBlock
選項,就會阻塞等待(等待握手成功)。另外你須要注意,當未設置 grpc.WithBlock
時,ctx 超時控制對其無任何效果。
會,除非你的客戶端不是常駐進程,那麼在應用結束時會被動地回收資源。但若是是常駐進程,你又真的忘記執行 Close
語句,會形成的泄露。以下圖:
3.1. 客戶端
3.2. 服務端
3.3. TCP
短期內不會出現問題,可是會不斷積蓄泄露,積蓄到最後固然就是服務沒法提供響應了。以下圖:
func chainUnaryClientInterceptors(cc *ClientConn) { interceptors := cc.dopts.chainUnaryInts if cc.dopts.unaryInt != nil { interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...) } var chainedInt UnaryClientInterceptor if len(interceptors) == 0 { chainedInt = nil } else if len(interceptors) == 1 { chainedInt = interceptors[0] } else { chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error { return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...) } } cc.dopts.unaryInt = chainedInt }
當存在多個攔截器時,取的就是第一個攔截器。所以結論是容許傳多個,但並無用。
可使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor
和 grpc.StreamInterceptor
鏈式方法,方便快捷省心。
單單會用還不行,咱們再深剖一下,看看它是怎麼實現的。核心代碼以下:
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { n := len(interceptors) if n > 1 { lastI := n - 1 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { var ( chainHandler grpc.UnaryInvoker curI int ) chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error { if curI == lastI { return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...) } curI++ err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...) curI-- return err } return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...) } } ... }
當攔截器數量大於 1 時,從 interceptors[1]
開始遞歸,每個遞歸的攔截器 interceptors[i]
會不斷地執行,最後才真正的去執行 handler
方法。同時也常常有人會問攔截器的執行順序是什麼,經過這段代碼你得出結論了嗎?
這個問題咱們能夠反向驗證一下,假設不公用 ClientConn 看看會怎麼樣?以下:
func BenchmarkSearch(b *testing.B) { for i := 0; i < b.N; i++ { conn, err := GetClientConn() if err != nil { b.Errorf("GetClientConn err: %v", err) } _, err = Search(context.Background(), conn) if err != nil { b.Errorf("Search err: %v", err) } } }
輸出結果:
... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files" ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files" ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files" ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files" FAIL exit status 1
當你的應用場景是存在高頻次同時生成/調用 ClientConn 時,可能會致使系統的文件句柄佔用過多。這種狀況下你能夠變動應用程序生成/調用 ClientConn 的模式,又或是池化它,這塊能夠參考 grpc-go-pool 項目。
會不斷地進行重試,直到上下文取消。而重試時間方面採用 backoff 算法做爲的重連機制,默認的最大重試時間間隔是 120s。
許多客戶端要經過 HTTP 代理來訪問網絡,gRPC 所有用 HTTP/2 實現,等到代理開始支持 HTTP/2 就能透明轉發 gRPC 的數據。不光如此,負責負載均衡、訪問控制等等的反向代理都能無縫兼容 gRPC,比起本身設計 wire protocol 的 Thrift,這樣作科學很多。@ctiller @滕亦飛
gRPC 的 RPC 協議是基於 HTTP/2 標準實現的,HTTP/2 的一大特性就是不須要像 HTTP/1.1 同樣,每次發出請求都要從新創建一個新鏈接,而是會複用原有的鏈接。
因此這將致使 kube-proxy 只有在鏈接創建時纔會作負載均衡,而在這以後的每一次 RPC 請求都會利用本來的鏈接,那麼實際上後續的每一次的 RPC 請求都跑到了同一個地方。
注:使用 k8s service 作負載均衡的狀況下