gRPC

1.簡介

在gRPC中,客戶端應用程序能夠直接調用不一樣計算機上的服務器應用程序上的方法,就像它是本地對象同樣,使您能夠更輕鬆地建立分佈式應用程序和服務。與許多RPC系統同樣,gRPC基於定義服務的思想,指定可使用其參數和返回類型遠程調用的方法。在服務器端,服務器實現此接口並運行gRPC服務器來處理客戶端調用。在客戶端,客戶端有一個存根(在某些語言中稱爲客戶端),它提供與服務器相同的方法。gRPC可使用protocol buffers做爲其接口定義語言(IDL)和其基礎消息交換格式,來序列化結構化數據,關於詳細的Proto語法介紹,能夠看一下另外一篇文章www.jianshu.com/p/434ac0fbc…git

圖片來自gRPC doc.png

2.基本概念

2.1.服務定義

與許多RPC系統同樣,gRPC基於定義服務的思想,指定可使用其參數和返回類型遠程調用的方法。 默認狀況下,gRPC使用protocol buffers做爲接口定義語言(IDL)來描述服務接口和有效負載消息的結構。 若是須要,可使用其餘替代方案。github

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}
複製代碼

gRPC容許您定義四種服務方法:golang

  • Unary RPCs:客戶端發送一個請求到服務端,並從服務端獲得一個響應,如同常規的函數調用。 rpc SayHello(HelloRequest) returns (HelloResponse){ }
  • Server streaming RPCs:客戶機向服務器發送一個請求,並獲取一個流來讀取返回的消息序列。客戶端從返回的流中讀取,直到沒有更多消息。 gRPC保證單個RPC調用中的消息排序。 rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){ }
  • Client streaming RPCs:客戶端再次使用提供的流寫入一系列消息並將它們發送到服務器。 一旦客戶端寫完消息,它就等待服務器讀取它們並返回它的響應。 gRPC再次保證在單個RPC調用中的消息排序。 rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) { }
  • Bidirectional streaming RPCs:雙方使用讀寫流發送一系列消息。 這兩個流獨立運行,所以客戶端和服務器能夠按照本身喜歡的順序進行讀寫:例如,服務器能夠在寫入響應以前等待接收全部客戶端消息,或者它能夠交替地讀取消息而後寫入消息, 或者其餘一些讀寫組合。 保留每一個流中的消息順序。 rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){ }

2.2 API使用

從.proto文件中的服務定義開始,gRPC提供協議緩衝區編譯器插件,用於生成客戶機和服務器端代碼。gRPC用戶一般在客戶端調用這些API,並在服務器端實現相應的API。express

  • 在服務器端,服務器實現服務聲明的方法,並運行gRPC服務器來處理客戶端調用。 gRPC基礎結構解碼傳入請求,執行服務方法並對服務響應進行編碼。
  • 在客戶端,客戶端有一個稱爲存根的本地對象(對於某些語言,首選術語是客戶端),它實現與服務相同的方法。 而後,客戶端能夠在本地對象上調用這些方法,將調用的參數包裝在適當的協議緩衝區消息類型中 - gRPC在將請求發送到服務器並返回服務器的protocol buffers的響應。

2.3同步和異步模式

同步RPC調用會阻塞直到服務端的響應到達,是最接近RPC所指望的過程調用的抽象。另外一方面,網絡本質上是異步的,在許多狀況下,可以在不阻塞當前線程的狀況下啓動rpc是頗有用的。apache

3.RPC的生命週期

如今讓咱們仔細看看當gRPC客戶端調用gRPC服務器方法時會發生什麼。api

3.1Unary RPC

首先讓咱們看一下最簡單的RPC類型,客戶端發送單個請求,獲得單個響應。bash

  • 一但客戶端調用了stub/clientd對象上的方法,服務端就會獲得通知,RPC被調用了,攜帶客戶端關於本次調用的元數據,方法名,和指定的截止時間(若是提供了的話)。
  • 而後,服務器能夠當即發送回本身的初始元數據(必須在任何響應以前發送),或者等待客戶端的請求消息 - 首先發生的是特定於應用程序的消息。
  • 一旦服務器具備客戶端的請求消息,它就會執行建立和填充其響應所需的任何工做。 而後將響應與狀態詳細信息(狀態代碼和可選狀態消息)以及可選的尾隨元數據一塊兒返回(若是成功)到客戶端。
  • 若是status 是OK,客戶端獲得響應,就結束了整個調用。

3.2 Server streaming RPC

Server streaming RPC,在獲得客戶端的請求信息後,期待服務端發送響應的流,發送完全部的響應以後,服務端狀態細節和可選的尾元數據也會被服務端發送來結束調用。一旦客戶端擁有全部服務器的響應,客戶端就會完成。服務器

3.3 Client streaming RPC

客戶端發送一個請求的流而不是單個請求,服務器發送回單個響應,一般但不必定在收到全部客戶端請求後,以及其狀態詳細信息和可選的尾隨元數據。網絡

3.4Bidirectional streaming RPC

在雙向流式RPC中,調用再次由調用方法的客戶端和接收客戶端元數據,方法名稱和截止時間的服務器啓動。 服務器再次能夠選擇發回其初始元數據或等待客戶端開始發送請求。接下來會發生什麼取決於應用程序,由於客戶端和服務器能夠按任何順序讀寫 - 流徹底獨立地運行。 所以,例如,服務器能夠等到它收到全部客戶端的消息以後再寫入其響應,或者服務器和客戶端能夠「乒乓」:服務器獲取請求,而後發回響應,而後客戶端發送 另外一個基於響應的請求,等等。app

3.5Deadlines/Timeouts

gRPC容許客戶端指定它願意等待多久待RPC調用完成,直到RPC被中斷,並帶有DEADLINE_EXCEEDED錯誤。服務端,能夠查詢一個特定的RPC是否已經超時,或者還有多久待調用完成。若是指定deadline或者timeout不一樣語言,方式可能不一樣。

3.6RPC termination

客戶端和服務器都對調用的成功作出獨立的和本地的決定,而且他們的結論可能不一樣,這就意味着,你可能在服務端收到(「我已經發送完全部的響應」),可是客戶端缺失敗了(「響應超時」),服務端也可能在客戶端發送完全部請求以前決定完成。

3.7Cancelling RPCs

客戶端和服務端在任什麼時候候均可以取消RPC調用,取消當即終止RPC,以便再也不進行進一步的工做。 它不是「撤消」:取消以前所作的更改將不會被回滾。

3.8 Metadata

元數據是以鍵值對列表形式的特定RPC調用(例如身份驗證詳細信息)的信息,其中鍵是字符串,值一般是字符串(但能夠是二進制數據)。 元數據對gRPC自己是不透明的 - 它容許客戶端提供與服務器調用相關的信息,反之亦然。

3.9Channels

gRPC通道提供與指定主機和端口上的gRPC服務器的鏈接,並在建立客戶端存根(或某些語言中的「客戶端」)時使用。 客戶端能夠指定通道參數來修改gRPC的默認行爲,例如打開和關閉消息壓縮。 通道具備狀態,包括已鏈接和空閒。

4.安裝

4.1. Install gRPC

go get -u google.golang.org/grpc

4.2. Install Protocol Buffers v3

安裝protoc編譯器,用於產生gRPC服務代碼,下載地址: github.com/google/prot…

  • 解壓下載的文件
  • 更新PATH環境變量,將protoc二進制可執行文件路徑加到環境變量中。

4.3 install protoc plugin for golang

go get -u github.com/golang/protobuf/protoc-gen-go

5.編譯示例

示例代碼在grpc項目下的examples目錄下

cd $GOPATH/src/google.golang.org/grpc/examples/helloworld
複製代碼

gRPC服務定義在.proto文件中,該文件被用於編譯產生相關的.pb.go文件,.pb.go文件是使用protocol編譯器protoc編譯.proto文件產生的。示例代碼中該文件已經產生,內容涵蓋一下兩點:

  • 產生客戶端和服務端代碼
  • 用於填充,序列化和檢索HelloRequest和HelloReply消息類型的代碼。 測試:
  • go run greeter_server/main.go啓動服務端運行
  • go run greeter_client/main.go在的終端裏,啓動客戶端運行

若是在運行上面命令的時候,出現依賴包問題,好比:

➜  helloworld git:(master) go run greeter_server/main.go
../../status/status.go:37:2: cannot find package "google.golang.org/genproto/googleapis/rpc/status" in any of:
	/usr/local/go/src/google.golang.org/genproto/googleapis/rpc/status (from $GOROOT)
	/Users/xxx/workspace/src/google.golang.org/genproto/googleapis/rpc/status (from $GOPATH)
複製代碼

安裝 google.golang.org/genproto:

$ wget https://github.com/google/go-genproto/archive/master.tar.gz -O ${GOPATH}/src/google.golang.org/genproto.tar.gz
$ cd ${GOPATH}/src/google.golang.org && tar zxvf genproto.tar.gz && mv go-genproto-master genproto
複製代碼

若是順利,將會看到客戶端標準輸出:

➜  helloworld git:(master) go run greeter_client/main.go
2019/07/12 17:21:47 Greeting: Hello world
複製代碼

6.更新服務

6.1定義新服務

上面已經成功運行了咱們的gRPC示例代碼,如今當咱們須要新增服務需求時,在.proto文件中定義相關服務,好比,下面咱們新增一個SayHelloAgain方法,方法的參數和返回值和以前的保持不變

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  // Sends another greeting
  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; } 複製代碼

6.2.編譯proto文件產生新的服務代碼

此時,須要使用protoc編譯器從新編譯一下咱們修改後的文件

$ protoc -I helloworld/ helloworld/helloworld.proto --go_out=plugins=grpc:helloworld
複製代碼

執行此命令後,新的helloworld.pb.go文件有了新的變化。

6.3更新咱們應用程序,從新運行

修改greeter_server/main.go文件:

/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

//go:generate protoc -I ../helloworld --go_out=plugins=grpc:../helloworld ../helloworld/helloworld.proto

// Package main implements a server for Greeter service.
package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.Name)
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func (s *server) SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello again " + in.Name}, nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
複製代碼

修改greeter_client/main.go文件:

/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package main implements a client for Greeter service.
package main

import (
	"context"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
	r, err = c.SayHelloAgain(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}

複製代碼

6.4 運行

  • go run greeter_server/main.go 運行服務端
  • go run greeter_client/main.go 運行客戶端 順利的話,從控制檯打印:
➜  helloworld git:(master) go run greeter_client/main.go
2019/07/12 17:40:44 Greeting: Hello world
2019/07/12 17:40:44 Greeting: Hello again world
複製代碼

7.總結

7.1 定義服務

定義一個服務,須要在.proto文件中指定一個service :

service RouteGuide {
   ...
}
複製代碼

而後在service中定義rpc方法,指定請求參數類型,和返回值類型。gRPC容許咱們定義四種類型的服務方法,全部這些類型的方法都在RouteGuide服務中。

  • 第一種:最簡單的,客服端經過stub發送一個請求到服務端,等待一個響應返回,就像普通的方法調用。 rpc GetFeature(Point) returns (Feature) {}
  • 第二種:服務端流RPC,客戶端發送一個請求到服務端,並獲取一個返回流用來讀取信息序列,客戶端讀取流知道無更多消息。就像在示例中看到的,將stream關鍵字放在返回值類型前面就能夠定義一個服務端流方法。 rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 客戶端流RPC,客戶端寫一個消息序列並將它們發送到服務端,客戶端完成寫消息後,等待服務端讀取它們並返回響應,你能夠在請求類型前面加stream關鍵字來定義一個客戶端流RPC. rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 雙向RPC,兩端都是用消息系列來進行讀寫,兩個流操做是獨立的,因此客戶端和服務端能夠以任何順序來進行讀寫,例如,服務端能夠等到客戶端全部的信息到達後再返回響應。或者讀一個寫一個,或者其餘方式。保留每一個流中的消息順序,你能夠在請求參數和響應參數前面加上stream來定義這類方法。 rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

7.2.定義方法的請求參數類型和響應參數類型

咱們.proto文件一樣包含protocol buffer 請求和響應的消息類型在方法定義中,以下:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
複製代碼

7.3.編譯產生客戶端和服務端代碼

接下來,經過咱們.proto文件中的服務定義,產生gRPC客戶端和服務端接口,使用protocol buffer的編譯器 protoc帶有gRPC的go語言插件。

protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
複製代碼

運行上面的命令,能夠產生咱們須要的.pb.go文件

7.4建立服務端

首先,咱們來看看若是建立一個RouteGuide服務端

  • 實現服務定義產生的服務接口,以此來作實際的工做。
  • 運行一個gRPC服務來監聽客戶端請求並分發他們到正確的服務上 實現RouteGuide:
type routeGuideServer struct {
        ...
}
...

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    for _, feature := range s.savedFeatures {
		if proto.Equal(feature.Location, point) {
			return feature, nil
		}
	}
	// No feature was found, return an unnamed feature
	return &pb.Feature{"", point}, nil
}
...

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        for _, feature := range s.savedFeatures {
		if inRange(feature.Location, rect) {
			if err := stream.Send(feature); err != nil {
				return err
			}
		}
	}
	return nil
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
       var pointCount, featureCount, distance int32
	var lastPoint *pb.Point
	startTime := time.Now()
	for {
		point, err := stream.Recv()
		if err == io.EOF {
			endTime := time.Now()
			return stream.SendAndClose(&pb.RouteSummary{
				PointCount:   pointCount,
				FeatureCount: featureCount,
				Distance:     distance,
				ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
			})
		}
		if err != nil {
			return err
		}
		pointCount++
		for _, feature := range s.savedFeatures {
			if proto.Equal(feature.Location, point) {
				featureCount++
			}
		}
		if lastPoint != nil {
			distance += calcDistance(lastPoint, point)
		}
		lastPoint = point
	}
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
   for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)
                ... // look for notes to be sent to client
		for _, note := range s.routeNotes[key] {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}
...
複製代碼

一但咱們實現了全部的方法,咱們還須要開啓一個gRPC服務,客戶端才能實際使用咱們的服務,以下:

flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
        log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
複製代碼

步驟以下:

  • 指定客戶端請求的端口 err := net.Listen("tcp", fmt.Sprintf(":%d", *port)).
  • 建立一個gRPC服務實例 grpc.NewServer().
  • 將服務註冊到gRPC服務端上
  • 調用Serve()方法來阻塞監聽,直到進程被殺或者Stop()被調用。

7.5建立客戶端

建立一個client stub: 爲了調用服務方法,咱們須要建立一個gRPC管道來與服務端通訊,咱們經過傳入服務端地址和端口到grpc.Dial()方法來實現:

conn, err := grpc.Dial(*serverAddr)
if err != nil {
    ...
}
defer conn.Close()
複製代碼

在grpc.Dial方法中能夠經過DialOptions來設置權限驗證,咱們的例子中,目前不須要這樣作。 一旦gRPC的管道創建,咱們須要一個客戶端stub來進行RPC交互,咱們能夠經過pb包中的NewRouteGuideClient 方法來實現,

client := pb.NewRouteGuideClient(conn)
複製代碼

調用服務方法: 在gRPC-go中,RPC操做都是同步阻塞模式,這意味着,RPC調用要等待服務端響應。 簡單的RPC調用,就像調用本地的方法:

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
        ...
}
複製代碼

如你所見,咱們能夠在咱們創建的stub上進行方法調用,在方法調用的參數上,提供了一個請求的protocol buffer類型的值,並傳入了一個context.Context對象,該對象能夠在須要的時候改變RPC調用的行爲,例如,超時取消,若是調用未返回一個錯誤,咱們就能夠讀取返回值信息從第一個返回值中。

log.Println(feature)
複製代碼

服務端流RPC

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
    ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}
複製代碼

和簡單的RPC相似,咱們傳給方法一個context參數,一個請求protocol buffer參數,然而,在獲取相應的時候,咱們的獲得的是一個RouteGuide_ListFeaturesClient實例,客戶端可使用該stream來讀取服務端響應。 咱們使用RouteGuide_ListFeaturesClient的Recv()方法來重複讀取服務端響應到一個protocol buffer對象中(示例中爲Feature)直到沒有更多的信息。客戶端在每次調用Recv()方法後都須要檢查異常,若是err 是nil,表示該stream還正常,能夠繼續讀取,若是err == io.EOF表示消息已經讀取完了,不然就是一個RPC錯誤。

客戶端流RPC:

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
	points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
	log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
	if err := stream.Send(point); err != nil {
		if err == io.EOF {
			break
		}
		log.Fatalf("%v.Send(%v) = %v", stream, point, err)
	}
}
reply, err := stream.CloseAndRecv()
if err != nil {
	log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
複製代碼

RouteGuide_RecordRouteClient 有一個Send方法,咱們可使用它向服務端發送請求,一旦咱們結束寫入客戶端請求到stream中,咱們須要調用stream上的CloseAndRecv()方法來告知gRPC咱們已經完成寫入請求,等待服務端響應。咱們經過CloseAndRecv()方法的返回值err能夠獲得RPC的狀態,若是err 是nil 表示該方法的第一個返回值是一個合法的服務端響應。 雙端的streaming RPC:

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// read done.
			close(waitc)
			return
		}
		if err != nil {
			log.Fatalf("Failed to receive a note : %v", err)
		}
		log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
	}
}()
for _, note := range notes {
	if err := stream.Send(note); err != nil {
		log.Fatalf("Failed to send a note: %v", err)
	}
}
stream.CloseSend()
<-waitc
複製代碼

語法和客戶端stream方法相似,除了咱們在結束咱們的調用時,須要使用stream山的CloseSend()方法。因爲每一個端獲取雙發的消息的順序都是雙發寫入消息的順序,因此客戶端和服務端能夠任意順序的讀取和寫入消息,雙端的stream操做時獨立的。

7.6最後就能夠進行相應的測試了

相關文章
相關標籤/搜索