Remote Procedure Call,遠程過程調用golang
而一旦踏入公司尤爲是大型互聯網公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不一樣的機器上,由不一樣的團隊負責。這時就會遇到兩個問題:
1)要搭建一個新服務,免不了須要依賴他人的服務,而如今他人的服務都在遠端,怎麼調用?
2)其它團隊要使用咱們的服務,咱們的服務該怎麼發佈以便他人調用?網絡
RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。併發
RPC能夠經過HTTP來實現(grpc基於http2.0),也能夠經過Socket本身實現一套協議來實現.app
rpc simpleHello(Person) returns (Result) {}框架
rpc serverStreamHello(Person) returns (stream Result) {}tcp
rpc clientStreamHello(stream Person) returns (Result) {}google
rpc biStreamHello(stream Person) returns (stream Result) {}spa
simple.protocode
syntax = "proto3"; package testRPC; service testRPC { rpc simple (emit) returns (on) { } rpc serverStream (emit) returns (stream on){} rpc clientStream (stream emit) returns (on){} rpc bothStream (stream emit) returns (stream on){} } message emit{ string type = 1; string name = 2; } message on{ string type = 1; int32 age = 2; }
serviceserver
package main import ( "net" "google.golang.org/grpc" pb "google.golang.org/grpc/examples/test/proto" "google.golang.org/grpc/reflection" "fmt" "golang.org/x/net/context" "io" ) const ( port = ":50051" ) type server struct { } func (s *server) Simple(ctx context.Context, in *pb.Emit) (*pb.On, error) { return &pb.On{Type: "Hello " + in.Type, Age: 10}, nil } func (s *server) ServerStream(in *pb.Emit, stream pb.TestRPC_ServerStreamServer) (error) { for i := 1; i < 10; i++ { if err := stream.Send(&pb.On{Type: "Hello " + in.Type, Age: int32(i)}); err != nil { return err } } return nil } func (s *server) ClientStream(stream pb.TestRPC_ClientStreamServer) (error) { var pointCount int32 for { _, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.On{ Type: "end", Age: pointCount, }) } pointCount++ } } func (s *server) BothStream(stream pb.TestRPC_BothStreamServer) (error) { for { emit, err := stream.Recv() if err == io.EOF { return nil } fmt.Println(string(emit.Type)) for i := 1; i < 3; i++ { if err := stream.Send(&pb.On{Type: emit.Type, Age: int32(i)}); err != nil { return err } } } return nil } func main() { lis, _ := net.Listen("tcp", port) s := grpc.NewServer() pb.RegisterTestRPCServer(s, &server{}) reflection.Register(s) if err := s.Serve(lis); err != nil { fmt.Println("error") } }
client
package main import ( "google.golang.org/grpc" pb "google.golang.org/grpc/examples/test/proto" "golang.org/x/net/context" "time" "log" "io" ) const ( address = "localhost:50051" ) func main() { conn, _ := grpc.Dial(address, grpc.WithInsecure()) defer conn.Close() rpcClient := pb.NewTestRPCClient(conn) testSimple(rpcClient) testServerStream(rpcClient) testClientStream(rpcClient) testBothStream(rpcClient) } func testSimple(client pb.TestRPCClient) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // simple r, _ := client.Simple(ctx, &pb.Emit{Type: "11"}) log.Printf("Greeting: %s", r) } func testServerStream(client pb.TestRPCClient) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.ServerStream(ctx, &pb.Emit{Type: "Hello "}) if err != nil { log.Fatalf("%v.ListFeatures(_) = _, %v", client, err) } for { feature, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("%v.ListFeatures(_) = _, %v", client, err) } log.Println(feature) } } func testClientStream(client pb.TestRPCClient) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.ClientStream(ctx) if err != nil { log.Fatalf("%v.RecordRoute(_) = _, %v", client, err) } var points []pb.Emit for i := 0; i < 10; i++ { points = append(points, pb.Emit{Type: "t", Name: string(i)}) } for _, point := range points { if err := stream.Send(&point); err != nil { log.Fatalf("%v.Send(%v) = %v", stream, point, err) } } reply, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } log.Printf("Route summary: %v", reply) } func testBothStream(client pb.TestRPCClient) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.BothStream(ctx) if err != nil { log.Fatalf("%v.RecordRoute(_) = _, %v", client, err) } var points []pb.Emit for i := 0; i < 10; i++ { points = append(points, pb.Emit{Type: "xiaoming"+string(i), Name: "xiaohong"}) } waitc := make(chan struct{}) go func() { for { on, err := stream.Recv() if err == io.EOF { // read done. close(waitc) return } if err != nil { log.Fatalf("Failed to receive a note : %v", err) } log.Printf("Got message %s = %d)", on.Type, on.Age) } }() for _, point := range points { if err := stream.Send(&point); err != nil { log.Fatalf("Failed to send a note: %v", err) } } stream.CloseSend() <-waitc }