Golang gRPC學習(04): Deadlines超時限制

爲何要使用Deadlines

當咱們使用gRPC時,gRPC庫關係的是鏈接,序列化,反序列化和超時執行。Deadlines 容許gRPC客戶端設置本身等待多長時間來完成rpc操做,直到出現這個錯誤 DEADLINE_EXCEEDED。可是在正常狀況下,這個DEADLINE_EXCEEDED默認設置是一個很大的數值。
一些語言的API用deadline,一些用 timeout。git

在正常狀況下,你沒有設置deadline,那麼全部的請求可能在最大請求時間事後才超時。這樣你對於你的服務器資源,可能存在風險,好比內存,可能由於這個長期運行的服務而增加很快,從而耗盡資源。
爲了不這種狀況,須要給你的客戶端請求程序設置一個默認的超時時間,在這段時間內請求沒有返回,那麼就超時報錯。github

怎麼使用?Deadlines使用步驟

使用步驟:

1.設置deadlinesgolang

var deadlineMs = flag.Int("deadline_ms", 20*1000, "Default deadline in milliseconds.")

clientDeadline := time.Now().Add(time.Duration(*deadlineMs) * time.Millisecond)
ctx, cancel := context.WithDeadline(ctx, clientDeadline)

2.檢查deadlinesbash

if ctx.Err() == context.Canceled {
        return status.New(codes.Canceled, "Client cancelled, abandoning.")
}

具體使用:

1. 創建鏈接時超時控制:
客戶端創建鏈接時,使用的Dial()函數,它位於
google.golang.org/grpc/clientconn.go 中,咱們看看這個函數內容:服務器

func Dial(target string, opts ...DialOption) (*ClientConn, error) {    
   return DialContext(context.Background(), target, opts...)
}

它裏面調用的 DialContext() 函數,這個函數很是長,他們在同一個文件中,它是實際執行的函數,這裏面就有context的timeout和Done相關操做。你也能夠到google.golang.org/grpc/clientconn.go文件中去看看這個函數DialContext具體是幹嗎的。異步

使用的時候傳入設置timeout的context,以下:tcp

ctx, cancel := context.Timeout(context.Bakcground(), time.Second*5)
defer cancel()
conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithInsecure())
  • grpc.WithInsecure() ,這個參數啥意思?
    gRPC是創建在HTTP/2上的,因此對TLS提供了很好的支持。若是在客戶端創建鏈接過程當中設置 grpc.WithInsecure() 就能夠跳過對服務器證書的驗證。寫練習時能夠用這個參數,可是在真實的環境中,不要這樣作,由於有泄露信息的風險。
  • grpc.WithBlock()
    這個參數會阻塞等待握手成功。
    由於用Dial鏈接時是異步鏈接,鏈接狀態爲正在鏈接,若是設置了這個參數就是同步鏈接,會阻塞等待握手成功。
    這個還和超時設置有關,若是你沒有設置這個參數,那麼context超時控制將會失效。

2. 調用時超時:
函數的調用超時控制函數

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
result, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

實例

用grpc官方的例子來練習下ui

目錄結構:google

grpc-tutorial
    -- 04deadlines
        --client
            - main.go
        --server
           - main.go
       --proto/echo
           - echo.proto
           - echo.pb.go

1.首先是定義服務 echo.proto

syntax = "proto3";
package echo;message 

EchoRequest {   
  string message = 1;
}

message EchoResponse {   
  string message = 1;
}

service Echo {    
  rpc UnaryEcho(EchoRequest) returns (EchoRequest) {}    
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}   
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}    
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse){}
}

進入到proto/echo目錄,生成go文件,命令以下:

protoc -I . --go_out=plugins=grpc:. ./echo.proto

2.客戶端

client\main.go
有2個主要的函數,2端都是stream和都不是stream,先看都不是stream的函數

都不是stream的函數

// unaryCall 不是stream的請求
func unaryCall(c pb.EchoClient, requestID int, message string, want codes.Code) {   
    ctx, cancel := context.WithTimeout(context.Background(), time.Second) //超時設置   
    defer cancel()   

    req := &pb.EchoRequest{Message: message} //參數部分

    _, err := c.UnaryEcho(ctx, req) //調用函數發送請求給服務端
    got := status.Code(err)   //
    fmt.Printf("[%v] wanted = %v, got = %v\n", requestID, want, got)
}

上面的code設置在文件 grpc/codes/codes.go

type Code uint32

const (
        OK Code = 0
        Canceled Code = 1
        Unknown Code = 2
        InvalidArgument Code = 3
        DeadlineExceeded Code = 4
   ... ...
)

2端都是stream的函數:

// streamingCall,2端都是stream
func streamingCall(c pb.EchoClient, requestID int, message string, want codes.Code) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)//超時設置
    defer cancel()

    stream, err := c.BidirectionalStreamingEcho(ctx)//雙向stream
    if err != nil {
        log.Printf("Send error : %v", err)
        return
    }

    err = stream.Send(&pb.EchoRequest{Message: message})//發送
    if err != nil {
        log.Printf("Send error : %v", err)
        return
    }

    _, err = stream.Recv() //接收
   
    got := status.Code(err)
    fmt.Printf("[%v] wanted = %v, got = %v\n", requestID, want, got)
}

main 執行函數

func main() {
    flag.Parse()

    conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        log.Fatalf("did not connect : %v ", err)
    }
    defer conn.Close()

    c :=pb.NewEchoClient(conn)

    // 成功請求
    unaryCall(c, 1, "word", codes.OK)
    // 超時 deadline
    unaryCall(c, 2, "delay", codes.DeadlineExceeded)
    // A successful request with propagated deadline
    unaryCall(c, 3, "[propagate me]world", codes.OK)
    // Exceeds propagated deadline
    unaryCall(c, 4, "[propagate me][propagate me]world", codes.DeadlineExceeded)
    // Receives a response from the stream successfully.
    streamingCall(c, 5, "[propagate me]world", codes.OK)
    // Exceeds propagated deadline before receiving a response
    streamingCall(c, 6, "[propagate me][propagate me]world", codes.DeadlineExceeded)
}

3.服務端

定義一個struct

type server struct {    
    pb.UnimplementedEchoServer    
    client pb.EchoClient    
    cc *grpc.ClientConn
}

2端不是stream的函數:

func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest)(*pb.EchoResponse, error) {
    message := req.Message
    if strings.HasPrefix(message, "[propagate me]") {//判斷接收的值
        time.Sleep(800 * time.Millisecond)
        message := strings.TrimPrefix(message, "[propagate me]")
        return s.client.UnaryEcho(ctx, &pb.EchoRequest{Message:message}) //<1>
    }

    if message == "delay" {
        time.Sleep(1500 * time.Millisecond) // message=delay 時睡眠1500毫秒,大於client的設置的1秒,這裏就超時了
    }

    return &pb.EchoResponse{Message:message}, nil
}

上面函數標註 <1> 這個地方比較有意思,當client端發送的字符串包含 [propagate me] 字符串時,先睡眠800毫秒,而後在從新執行客戶端請求服務端的函數 s.client.UnaryEcho() , 在次運行到服務端的 UnaryEcho(),客戶端已經超時了。
也就是說client/main.go 先請求了一次服務端,而後在server/main.go 的函數 func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) 又執行了一次請求服務端,因此會致使超時。

2端設置stream的函數:

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return status.Error(codes.InvalidArgument, "request message not received")
        }
        if err != nil {
            return err
        }

        message := req.Message
        if strings.HasPrefix(message, "[propagate me]") {
            time.Sleep(800 * time.Millisecond)
            message = strings.TrimPrefix(message, "[propagate me]")
            res, err := s.client.UnaryEcho(stream.Context(), &pb.EchoRequest{Message:message})//再次執行客戶端請求服務端函數,這裏可能會超時
            if err != nil {
                return err
            }
            stream.Send(res)
        }

        if message == "delay" {
            time.Sleep(1500 * time.Millisecond)
        }
        stream.Send(&pb.EchoResponse{Message:message})
    }
}

main函數

func main() {
    flag.Parse()

    address := fmt.Sprintf(":%v", *port)
    lis, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatalf("failed to listen: %v ", err)
    }

    echoServer := newEchoServer()
    defer echoServer.Close()

    grpcServer := grpc.NewServer()
    pb.RegisterEchoServer(grpcServer, echoServer)

    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v ", err)
    }
}

執行

先運行 /server/main.go , go run main.go

在運行 /client/main.go, go run main.go

執行結果:

go run main.go
[1] wanted = OK, got = OK
[2] wanted = DeadlineExceeded, got = DeadlineExceeded
[3] wanted = OK, got = Unavailable
[4] wanted = DeadlineExceeded, got = Unavailable
[5] wanted = OK, got = Unavailable
[6] wanted = DeadlineExceeded, got = Unavailable

gRPC 系列代碼地址:

參考:

相關文章
相關標籤/搜索