跟着gRPC源碼簡單學習RPC原理

RPC(Remote Procedure Call/遠程過程調用)是一種服務間交互的方式,達到像本地方法同樣調用遠程方法的目的。指針

看看源碼,簡單瞭解gRPC是怎麼實現RPC的,以gRPC官方代碼示例helloworld爲例。code

服務註冊

先看一下服務端怎麼註冊服務。server

helloworld.pb.go文件中,會有RegisterGreeterServer方法以及_Greeter_serviceDesc變量,
_Greeter_serviceDesc描述了服務的屬性。RegisterGreeterServer方法會向gRPC服務端s註冊服務srvrpc

//...
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}
//...
var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter", // ServiceName包括`.proto`文件中的package名稱
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

server.go中的RegisterService方法,會判斷ServiceServer是否實現sd中描述的HandlerType;若是實現了則調用s.register方法註冊。源碼

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) { // 判斷ServiceServer是否實現sd中描述的HandlerType
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

register根據Method建立對應的map,並將名稱做爲鍵,方法描述(指針)做爲值,添加到相應的map中。
最後將{服務名稱:服務}添加到服務端。string

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {// 服務端是否啓動
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {//服務是否已經註冊
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc), // map of Methods
        sd:     make(map[string]*StreamDesc), // map of Stream
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv // 添加服務到服務端
}

服務調用

服務端註冊了服務以後,接下來就是調用服務。it

客戶端調用服務

helloworld.pb.go中的SayHello方法中使用c.cc.Invoke遠程調用服務端:io

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    // 遠程調用服務端的方法
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Invoke的定義以下,"/helloworld.Greeter/SayHello"對應的是method參數。class

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error

method被傳入後,依次被invokenewClientStream方法調用,保存在cs.callHdr.Method;後續在newAttemptLocked方法中生成cs.attempt.t
cs.SendMsg最終經過csAttempt.sendMsg方法中調用a.t.Write方法寫出請求req變量

最後cs.RecvMsg輪詢獲取返回結果。

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

服務端執行調用

handleStream方法會處理請求中的Method字段,得到服務名與方法名。調用s.processUnaryRPC, 調用md.Handler方法(即_Greeter_SayHello_Handler方法)。
最終調用s.sendResponse方法返回結果。

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}
相關文章
相關標籤/搜索