【go網絡編程】-RPC編程

RPC介紹

Remote Procedure Call,遠程過程調用golang

解決問題

而一旦踏入公司尤爲是大型互聯網公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不一樣的機器上,由不一樣的團隊負責。這時就會遇到兩個問題:
1)要搭建一個新服務,免不了須要依賴他人的服務,而如今他人的服務都在遠端,怎麼調用?
2)其它團隊要使用咱們的服務,咱們的服務該怎麼發佈以便他人調用?網絡

過程

圖片描述

  1. 服務消費方(client)調用以本地調用方式調用服務;
  2. client stub接收到調用後負責將方法、參數等組裝成可以進行網絡傳輸的消息體;
  3. client stub找到服務地址,並將消息發送到服務端;
  4. server stub收到消息後進行解碼;
  5. server stub根據解碼結果調用本地的服務;
  6. 本地服務執行並將結果返回給server stub;
  7. server stub將返回結果打包成消息併發送至消費方;
  8. client stub接收到消息,並進行解碼;
  9. 服務消費方獲得最終結果。

RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。併發

實現方式

RPC能夠經過HTTP來實現(grpc基於http2.0),也能夠經過Socket本身實現一套協議來實現.app

grpc框架

4種通訊方式

  • 簡單rpc:一個請求對象對應一個返回對象

rpc simpleHello(Person) returns (Result) {}框架

  • 服務端流式rpc :一個請求對象,服務端能夠傳回多個結果對象

rpc serverStreamHello(Person) returns (stream Result) {}tcp

  • 客戶端流式rpc:客戶端傳入多個請求對象,服務端返回一個響應結果

rpc clientStreamHello(stream Person) returns (Result) {}google

  • 雙向流式rpc:能夠傳入多個對象,返回多個響應對象

rpc biStreamHello(stream Person) returns (stream Result) {}spa

案例

simple.protocode

syntax = "proto3";
package testRPC;

service testRPC {

    rpc simple (emit) returns (on) {
    }
    rpc serverStream (emit) returns (stream on){}
    rpc clientStream (stream emit) returns (on){}
    rpc bothStream (stream emit) returns (stream on){}
}

message emit{
    string type = 1;
    string name = 2;
}

message on{
    string type = 1;
    int32 age = 2;
}

serviceserver

package main

import (
    "net"
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/test/proto"
    "google.golang.org/grpc/reflection"
    "fmt"
    "golang.org/x/net/context"
    "io"
)

const (
    port = ":50051"
)

type server struct {
}

func (s *server) Simple(ctx context.Context, in *pb.Emit) (*pb.On, error) {
    return &pb.On{Type: "Hello " + in.Type, Age: 10}, nil
}
func (s *server) ServerStream(in *pb.Emit, stream pb.TestRPC_ServerStreamServer) (error) {
    for i := 1; i < 10; i++ {
        if err := stream.Send(&pb.On{Type: "Hello " + in.Type, Age: int32(i)}); err != nil {
            return err
        }
    }
    return nil
}
func (s *server) ClientStream(stream pb.TestRPC_ClientStreamServer) (error) {
    var pointCount int32
    for {
        _, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.On{
                Type: "end",
                Age:  pointCount,
            })
        }
        pointCount++
    }
}

func (s *server) BothStream(stream pb.TestRPC_BothStreamServer) (error) {
    for {
        emit, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        fmt.Println(string(emit.Type))
        for i := 1; i < 3; i++ {
            if err := stream.Send(&pb.On{Type: emit.Type, Age: int32(i)}); err != nil {
                return err
            }
        }

    }
    return nil
}

func main() {
    lis, _ := net.Listen("tcp", port)
    s := grpc.NewServer()
    pb.RegisterTestRPCServer(s, &server{})
    reflection.Register(s)
    if err := s.Serve(lis); err != nil {
        fmt.Println("error")
    }
}

client

package main

import (
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/test/proto"
    "golang.org/x/net/context"
    "time"
    "log"
    "io"
)

const (
    address = "localhost:50051"
)

func main() {
    conn, _ := grpc.Dial(address, grpc.WithInsecure())
    defer conn.Close()

    rpcClient := pb.NewTestRPCClient(conn)

    testSimple(rpcClient)

    testServerStream(rpcClient)

    testClientStream(rpcClient)

    testBothStream(rpcClient)

}

func testSimple(client pb.TestRPCClient) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    // simple
    r, _ := client.Simple(ctx, &pb.Emit{Type: "11"})
    log.Printf("Greeting: %s", r)
}

func testServerStream(client pb.TestRPCClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.ServerStream(ctx, &pb.Emit{Type: "Hello "})
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    for {
        feature, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
        }
        log.Println(feature)
    }
}

func testClientStream(client pb.TestRPCClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.ClientStream(ctx)
    if err != nil {
        log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
    }
    var points []pb.Emit
    for i := 0; i < 10; i++ {
        points = append(points, pb.Emit{Type: "t", Name: string(i)})
    }
    for _, point := range points {
        if err := stream.Send(&point); err != nil {
            log.Fatalf("%v.Send(%v) = %v", stream, point, err)
        }
    }
    reply, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
    }
    log.Printf("Route summary: %v", reply)
}

func testBothStream(client pb.TestRPCClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    stream, err := client.BothStream(ctx)
    if err != nil {
        log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
    }
    var points []pb.Emit
    for i := 0; i < 10; i++ {
        points = append(points, pb.Emit{Type: "xiaoming"+string(i), Name: "xiaohong"})
    }
    waitc := make(chan struct{})
    go func() {
        for {
            on, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive a note : %v", err)
            }
            log.Printf("Got message %s = %d)", on.Type, on.Age)
        }
    }()
    for _, point := range points {
        if err := stream.Send(&point); err != nil {
            log.Fatalf("Failed to send a note: %v", err)
        }
    }
    stream.CloseSend()
    <-waitc
}
相關文章
相關標籤/搜索