gRPC的簡單使用

前言

八月初的時候,在公司內部作了一個主題爲《gRPC的簡單使用》的分享,其實就是和小夥伴們扯扯淡,如今抽空回憶一下,也算是一個小小的總結吧。python

如今市面上耳熟能詳的RPC框架也不少,下面列舉幾個遇到比較多的。git

  1. 谷歌的gRPC
  2. 推特的Thrift
  3. 阿里的Dubbo
  4. 。。。。

它們都是支持多語言的,相對來講,這三個之中,Dubbo支持的語言略微少一點。如今在一個公司內都能見到多種語言的技術棧都已是十分常見的事了,比如我司,都有JAVA,C#,Python三種語言了,因此在多語言支持這方面,在技術選型的時候,確定是要有所考慮的。github

下面進入正式的主題,gRPC。docker

gRPC的簡單介紹

gRPC是一個現代的開源高性能RPC框架,能夠在任何環境中運行。它能夠高效地將數據中心內和跨數據中心的服務鏈接起來,並支持可插拔的負載平衡、跟蹤、健康檢查和身份驗證。同時,它還把設備,移動應用程序和瀏覽器鏈接到後端服務的分佈式計算變得很容易。c#

gRPC有什麼優勢呢?後端

  1. 簡單的服務定義 (使用Protocol Buffers定義服務,這是一個功能強大的二進制序列化工具集和語言)
  2. 跨語言和平臺工做 (在微服務式架構中有效地鏈接多語言服務(10+種語言支持)並能自動爲各類語言和平臺的服務生成慣用的客戶端和服務器存根)
  3. 快速啓動並擴展 (使用單行安裝運行時和開發環境,並使用框架每秒擴展到數百萬個RPC)
  4. 雙向流媒體和集成的身份驗證 (雙向流媒體和集成的身份驗證 基於http/2的傳輸的雙向流和徹底集成的可插拔身份驗證)

gRPC在使用的時候有4種模式供咱們選擇api

  1. 一元RPC(Unary RPCs ):這是最簡單的定義,客戶端發送一個請求,服務端返回一個結果
  2. 服務器流RPC(Server streaming RPCs):客戶端發送一個請求,服務端返回一個流給客戶端,客戶從流中讀取一系列消息,直到讀取全部消息
  3. 客戶端流RPC(Client streaming RPCs ):客戶端經過流向服務端發送一系列消息,而後等待服務端讀取完數據並返回處理結果
  4. 雙向流RPC(Bidirectional streaming RPCs):客戶端和服務端均可以獨立向對方發送或接受一系列的消息。客戶端和服務端讀寫的順序是任意。

咱們要根據具體的場景來決定選擇那一種。瀏覽器

這裏只介紹一元RPC。正常來講,一元RPC應該能夠知足咱們平常60~70%的需求了吧。緩存

基本用法

gRPC的基本用法能夠簡單的分爲三個點:

  • 服務的定義,即proto文件的編寫
  • 服務端代碼編寫
  • 客戶端代碼編寫

下面咱們依次來看一下

服務的定義

既然要定義一個服務,確定是知道了這個服務要完成什麼事以後。

在定義以前,要對proto3和proto2有所瞭解。不過proto3是推薦的格式。因此咱們基本上只要用proto3就能夠了。

下面先來看一個後面要用到的proto文件。

syntax = "proto3";

option csharp_namespace = "XXXService";

package UserInfo;

service UserInfoService {
  rpc GetList(GetUserListRequest) returns (GetUserListReply){}
  rpc GetById(GetUserByIdRequest) returns (GetUserByIdRelpy){}
  rpc Save(SaveUserRequest) returns (SaveUserReply){}
}


message GetUserByIdRequest {
    int32 id = 1;
}

message GetUserByIdRelpy{
    int32 id = 1;
    string name = 2;
    int32 age = 3;
    int64 create_time = 4;
}

message GetUserListRequest {
    int32 id = 1;
    string name = 2;    
}

message GetUserListReply {
  message MsgItem {
    int32 id = 1;
    string name = 2;
    int32 age = 3;
    int64 create_time = 4;
   }
   int32 code = 1;
   string msg = 2;
   repeated MsgItem data = 3;
}

message SaveUserRequest {
    string name = 1;
    int32 age = 2;  
}

message SaveUserReply {
   int32 code = 1;
   string msg = 2;
}

它有下面的幾個部分

  1. syntax , 指定要用那個版本的語法
  2. service , 指定rpc服務的接口,簡單理解成咱們平時定義的接口
  3. message , 指定要傳輸的消息體,簡單理解成咱們日常用的 DTO
  4. package , 指定包名
  5. option , 可選參數的定義,不一樣語言有不一樣的選項

其實看上去仍是比較容易懂的。至少一眼看過去能知道是些什麼意思。

若是對proto3尚未了解的,能夠參考這個文檔Language Guide (proto3),裏面很清楚的介紹了一些數據類型和不一樣語言數據類型的對應關係。

這裏有一個要注意的是,時間類型,在proto3中,沒有datetime類型,過去很長一段時間,咱們是隻能用時間戳來表示時間,也就是定義一個長整型,如今是能夠用timestamp表處理了。

在寫服務端和客戶端代碼以前,咱們須要根據proto文件生成對應的代碼。

一個命令便可搞定。

protoc --proto_path=IMPORT_PATH \
           --cpp_out=DST_DIR \
           --java_out=DST_DIR \
           --python_out=DST_DIR \
           --go_out=DST_DIR \
           --objc_out=DST_DIR \
           --csharp_out=DST_DIR \
           path/to/file.proto

如今時代進步的這麼快,很多語言已經有工具作了集成,能夠在build項目的時候就生成對應的文件了,不須要咱們再單獨去執行一次上面的那個命令。

比如咱們的.NET項目,能夠在ItemGroup中直接指定Protobuf,而後告訴它,proto文件是那個,是要生成服務端代碼仍是客戶端代碼。

能夠看看下面這個具體的例子。

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>netcoreapp2.1</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <Protobuf Include="Protos\userinfo.proto" GrpcServices="Server" />
  </ItemGroup>
  
  <ItemGroup>
    <PackageReference Include="Microsoft.AspNetCore.App" />
    <PackageReference Include="Microsoft.AspNetCore.Razor.Design" Version="2.1.2" PrivateAssets="All" />
    <PackageReference Include="Google.Protobuf" Version="3.8.0" />
    <PackageReference Include="Grpc.Core" Version="1.22.0" />
    <PackageReference Include="Grpc.Tools" Version="1.22.0">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
  </ItemGroup>

</Project>

再往下,就是寫代碼了。

服務端代碼編寫

服務端代碼分兩部分,一部分是服務具體的實現,一部分是服務怎麼起來。

先來看看服務的具體實現。

namespace MyBasedServiceA
{
    using Grpc.Core;
    using System.Linq;
    using System.Threading.Tasks;

    public class UserInfoServiceImpl : UserInfoService.UserInfoServiceBase
    {
        public override Task<GetUserByIdRelpy> GetById(GetUserByIdRequest request, ServerCallContext context)
        {
            var result = new GetUserByIdRelpy();

            var user = FakeUserInfoDb.GetById(request.Id);

            result.Id = user.Id;
            result.Name = user.Name;
            result.Age = user.Age;
            result.CreateTime = user.CreateTime;

            return Task.FromResult(result);
        }

        public override Task<GetUserListReply> GetList(GetUserListRequest request, ServerCallContext context)
        {
            var result = new GetUserListReply();

            var userList = FakeUserInfoDb.GetList(request.Id, request.Name);

            result.Code = 0;
            result.Msg = "成功";
            result.Data.AddRange(userList.Select(x => new GetUserListReply.Types.MsgItem
            {
                Id = x.Id,
                Age = x.Age,
                CreateTime = x.CreateTime,
                Name = x.Name
            }));

            return Task.FromResult(result);
        }

        public override Task<SaveUserReply> Save(SaveUserRequest request, ServerCallContext context)
        {
            var result = new SaveUserReply();

            var flag = FakeUserInfoDb.Save(request.Name, request.Age);

            result.Code = 0;
            result.Msg = "成功";
            
            return Task.FromResult(result);
        }
    }
}

能夠看到上面的代碼,咱們只要繼承由proto文件生成的一個基類,而後去重寫它的實現,就能夠認爲是實現了一個服務。這個其實就是寫咱們具體的業務邏輯,大boss有什麼需求,堆上去就行了。

而後來看第二部分,服務怎麼起來。

在這裏我選擇的方案是使用通用主機來跑。固然也能夠直接在Startup的Configure方法中去啓動服務。只要能起來就行 :-D

namespace MyBasedServiceA
{
    using Grpc.Core;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class MyBasedServiceAHostedService : BackgroundService
    {
        private readonly ILogger _logger;

        private Server _server;

        public MyBasedServiceAHostedService(ILoggerFactory loggerFactory)
        {
            this._logger = loggerFactory.CreateLogger<MyBasedServiceAHostedService>();

            _server = new Server
            {
                Services = { UserInfoService.BindService(new UserInfoServiceImpl()) },
                // ServerCredentials.Insecure仍是沒有用https,只用於演示
                // 生產環境,建議仍是弄個證書
                Ports = { new ServerPort("0.0.0.0", 9999, ServerCredentials.Insecure) }                
            };
        }
      
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _server.Start();
            return Task.CompletedTask;
        }
    }
}

而後是Program中的代碼。

namespace MyBasedServiceA
{
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class Program
    {
        public static void Main(string[] args)
        {
            var host = new HostBuilder()
                .ConfigureLogging((hostContext, configLogging) =>
                {
                    configLogging.AddConsole();
                    configLogging.AddDebug();
                })
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<MyBasedServiceAHostedService>();
                })
                .Build();

            host.Run();
        }
    }
}

到這裏,服務端已經能夠了。

下面就是客戶端了。

客戶端代碼編寫

在這裏客戶端,咱們寫兩個,一個基於C#(.net core), 一個基於python。恰好也驗證一下gRPC的多語言。

正常來講,咱們所說的客戶端可能很大一部分是對外的WEB API了,就是說api的內部實現,是rpc的調用,而對外的是常見的返回JSON的rest api。

咱們先經過控制檯來體驗一下它的客戶端調用。

C#(.net core)客戶端

class Program
{
    static void Main(string[] args)
    {
        var channel = new Channel("localhost:9999", ChannelCredentials.Insecure);
        var client = new UserInfoService.UserInfoServiceClient(channel);

        var saveResponse = client.Save(new SaveUserRequest { Age = 99, Name = "c#name" });

        Console.WriteLine($"Save received: code = {saveResponse.Code} ,  msg = {saveResponse.Msg}");

        var getListResponse = client.GetList(new GetUserListRequest { });

        Console.WriteLine($"GetList received: code =  {getListResponse.Code} ,  msg = {getListResponse.Msg}");

        foreach (var item in getListResponse.Data)
        {
            Console.WriteLine(item.Name);
        }

        Console.ReadKey();
    }
}

其實這種方式咱們很容易聯想到WCF,都是生成代碼,能夠直接點出來的方法,強類型的使用體驗。不過我是基本沒有用過WCF的,貌似暴露了年齡了,逃~~

python客戶端

python要想運行gRPC相關的,要先安裝 grpcio-tools,而後再用命令生成相應的文件。

# 安裝
pip install grpcio-tools

# 生成
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./userinfo.proto

具體實現

import grpc
import userinfo_pb2, userinfo_pb2_grpc

_HOST = 'localhost'
_PORT = '9999'

def run():
    conn = grpc.insecure_channel(_HOST + ':' + _PORT)

    client = userinfo_pb2_grpc.UserInfoServiceStub(channel=conn)

    saveResponse = client.Save(userinfo_pb2.SaveUserRequest(name="pyname", age=39))
    print("Save received: code = " + str(saveResponse.code) + ", msg = "+ saveResponse.msg)

    getListResponse = client.GetList(userinfo_pb2.GetUserListRequest())
    print("GetList received: code = " + str(getListResponse.code) + ", msg = "+ getListResponse.msg)

    for d in getListResponse.data:
        print(d.name)

if __name__ == '__main__':
    run()

一樣也是很簡潔。

運行效果

在服務端起來的狀況下,先運行.net core的客戶端,而後再運行python的客戶端,結果大體以下。

注: 在調用的時候,有幾個概念要知道!!

  1. gRPC中沒有采用傳統的timeout方式去處理,而是採用了Deadline機制,以爲這個機制和咱們的CancellationToken很類似
  2. 不管是客戶端仍是服務端,均可以隨時取消RPC

能夠看到咱們如今的地址都是硬編碼的,由於只有一個節點,而後在線上環境,都會是多節點的,因此咱們須要有服務註冊和服務發現,下面咱們就結合consul來完成服務註冊與發現。

服務治理(註冊與發現)

固然如今可選的工具仍是有不少的,consul,etcd,eureka等,固然最好的仍是直接上K8S,不過咱們公司還有很長的一段路才能上,因此咱們就怎麼簡單怎麼來了。

下面咱們調整一下服務端的代碼,讓gRPC的服務能夠註冊到consul上面。

public class MyBasedServiceAHostedService : BackgroundService
{
    private readonly Microsoft.Extensions.Logging.ILogger _logger;
    private readonly IConfiguration _configuration;
    private readonly IConsulClient _consulClient;

    private Server _server;
    private AgentServiceRegistration registration;

    public MyBasedServiceAHostedService(ILoggerFactory loggerFactory, IConfiguration configuration, IConsulClient consulClient, IHostingEnvironment environment)
    {
        this._logger = loggerFactory.CreateLogger<MyBasedServiceAHostedService>();
        this._configuration = configuration;
        this._consulClient = consulClient;

        var port = _configuration.GetValue<int>("AppSettings:Port");

        _logger.LogInformation($"{environment.EnvironmentName} Current Port is : {port}");

        // global logger for grpc
        GrpcEnvironment.SetLogger(new GrpcAdapterLogger(loggerFactory));

        var address = GetLocalIP();

        _logger.LogInformation($"{environment.EnvironmentName} Current IP is : {address}");

        registration = new AgentServiceRegistration()
        {
            ID = $"MyBasedServiceA-{Guid.NewGuid().ToString("N")}",
            Name = "MyBasedServiceA",
            Address = address,
            Port = port,
            Check = new AgentServiceCheck
            {
                TCP = $"{address}:{port}",
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),
                Interval = TimeSpan.FromSeconds(10),
                Timeout = TimeSpan.FromSeconds(5)
            } 
        };

        _server = new Server
        {
            Ports = { new ServerPort("0.0.0.0", port, ServerCredentials.Insecure) }                
        };

        // not production record some things
        if (!environment.IsProduction())
        {
            _server.Services.Add(UserInfoService.BindService(new UserInfoServiceImpl()).Intercept(new AccessLogInterceptor(loggerFactory)));
        }
        else
        {
            _server.Services.Add(UserInfoService.BindService(new UserInfoServiceImpl()));
        }
    }
  
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {            
        await _consulClient.Agent.ServiceDeregister(registration.ID);
        await _consulClient.Agent.ServiceRegister(registration);
        _logger.LogInformation($"Registering with Consul {registration.ID} OK");  
        _server.Start();
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Unregistering from Consul");

        await _consulClient.Agent.ServiceDeregister(registration.ID);
        await _server.KillAsync();
        await base.StopAsync(cancellationToken);
    }

    private string GetLocalIP()
    {
        try
        {
            string hostName = Dns.GetHostName(); 
            IPHostEntry ipEntry = Dns.GetHostEntry(hostName);
            for (int i = 0; i < ipEntry.AddressList.Length; i++)
            {
                if (ipEntry.AddressList[i].AddressFamily == AddressFamily.InterNetwork)
                {
                    return ipEntry.AddressList[i].ToString();
                }
            }
            return "127.0.0.1";
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Get Local Ip error");
            return "127.0.0.1";
        }
    }
}

而後客戶端要定義一個從consul中讀取實例的方法。

從consul中讀取對應service的健康實例,正常會用定時輪訓的方式去讀取,或者寫入短期的緩存。

public class FindService : IFindService 
{
    private readonly ILogger _logger;
    private readonly IConsulClient _consulClient;
    private readonly ConcurrentDictionary<string, (List<string> List, DateTimeOffset Expiration)> _dict;

    public FindService(ILoggerFactory loggerFactory, IConsulClient consulClient)
    {
        _logger = loggerFactory.CreateLogger<FindService>();
        _consulClient = consulClient;
        _dict = new ConcurrentDictionary<string, (List<string> List, DateTimeOffset Expiration)>();
    }

    public async Task<string> FindServiceAsync(string serviceName)
    {
        var key = $"SD:{serviceName}";

        if (_dict.TryGetValue(key, out var item) && item.Expiration > DateTimeOffset.UtcNow)
        {
            _logger.LogInformation($"Read from cache");
            return item.List[new Random().Next(0, item.List.Count)];              
        }
        else
        {
            var queryResult = await _consulClient.Health.Service(serviceName, string.Empty, true);

            var result = new List<string>();
            foreach (var serviceEntry in queryResult.Response)
            {
                result.Add(serviceEntry.Service.Address + ":" + serviceEntry.Service.Port);
            }

            _logger.LogInformation($"Read from consul : {string.Join(",", result)}");

            if (result != null && result.Any())
            {
                // for demonstration, we make expiration a little big
                var val = (result, DateTimeOffset.UtcNow.AddSeconds(600));

                _dict.AddOrUpdate(key, val, (x, y) => val);

                var count = result.Count;
                return result[new Random().Next(0, count)];
            }

            return "";
        }
    }
}

調用的時候。

private async Task<(UserInfoService.UserInfoServiceClient Client, string Msg)> GetClientAsync(string name)
{
    var target = await _findService.FindServiceAsync(name);
    _logger.LogInformation($"Current target = {target}");

    if (string.IsNullOrWhiteSpace(target))
    {
        return (null, "can not find a service");
    }
    else
    {
        var channel = new Channel(target, ChannelCredentials.Insecure);

        var client = new UserInfoService.UserInfoServiceClient(channel);
        return (client, string.Empty);
    }
}

而後咱們編寫docker-compose.yml, 讓它在docker中跑

version: '3.4'

services:
  xxxserver1:
    image: ${DOCKER_REGISTRY-}xxxserver
    build:
      context: .
      dockerfile: MyBasedServiceA/Dockerfile
    ports:
      - "9999:9999"  # 綁定容器的9999端口到主機的9999端口
    depends_on:
      - consuldev      
    networks:  
      backend:

  xxxserver2:
    image: ${DOCKER_REGISTRY-}xxxserver
    build:
      context: .
      dockerfile: MyBasedServiceA/Dockerfile
    ports:
      - "9995:9999"   # 綁定容器的9999端口到主機的9995端口
    depends_on:
      - consuldev      
    networks:  
      backend:      
      
  xxxclient:
    image: ${DOCKER_REGISTRY-}xxxclient
    build:
      context: .
      dockerfile: XXXService/Dockerfile
    ports:
      - "9000:80"
    depends_on:
      - consuldev    
      - xxxserver1
      - xxxserver2
    networks:  
      backend:

  consuldev:
    image: consul:latest    
    ports:
      - "8300:8300"
      - "8400:8400"
      - "8500:8500"    
    networks:  
      backend:

networks:  
  backend:      
    driver: bridge

運行結果以下:

當用 docker 把這幾個服務都跑起來以後, 能夠看到相似下面的輸出

也能夠用docker ps命令來看一下那幾個服務是否是真的在運行。

同時,咱們打開consul的UI界面,能夠看到咱們服務端的兩個實例已經註冊上來了。

當咱們用客戶端去訪問的時候,能夠發現,第一次它是從consul中取下來了兩個ip,而後隨機選了一個進行訪問。

咱們把其中一個服務端(0.4)stop,用來模擬某個節點出現異常,被剔除的狀況 ,能夠發現consul上面已經看不到了,只剩下0.3這個節點了。

若是咱們的調度策略沒有及時將"死掉"的節點剔除,就會出現下面的這種狀況。

最後,把stop的服務端啓動,模擬恢復正常,這個時候能夠發現不管調度到那個節點均可以正常訪問了。

.NET Core 2.x 和 .NET Core 3.0的細微區別

在.NET Core 2.x中,咱們的Server,是須要手動控制的,在.NET Core 3.0中,能夠認爲它已經和Kestrel融爲一體了,再也不須要咱們再手動去Start了。

一樣的,服務的實現,也和Endpoint Routing緊密的結合在一塊兒了,再也不和以前同樣了。

能夠看看下面的例子,可能會發現,這是一種熟悉的不能再熟悉的感受。

public class Startup
{        
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc(x=> 
        {
            x.EnableDetailedErrors = true;
            x.Interceptors.Add<AccessLogInterceptor>();
        });
    }
  
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapGrpcService<GreeterService>();

            endpoints.MapGrpcService<UserService>();

            endpoints.MapGet("/", async context =>
            {
                await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
            });
        });
    }
}

客戶端的用法也和之前不同了,直接看一個例子就很清晰了。

[HttpGet]
public async Task<string> GetAsync(CancellationToken cancellationToken)
{
    // fixed https error
    var httpclientHandler = new HttpClientHandler
    {
        ServerCertificateCustomValidationCallback = (message, cert, chain, error) => true
    };

    var httpClient = new HttpClient(httpclientHandler)
    {
        // The port number(5001) must match the port of the gRPC server.
        BaseAddress = new Uri("https://localhost:5001")
    };

    try
    {
        var client = GrpcClient.Create<UserInfoRpcService.UserInfoRpcServiceClient>(httpClient);

        var callOptions = new Grpc.Core.CallOptions()
            // StatusCode=Cancelled
            .WithCancellationToken(cancellationToken)
            // StatusCode=DeadlineExceeded
            .WithDeadline(DateTime.UtcNow.AddMilliseconds(2000));

        var reply = await client.GetByIdAsync(new GetUserByIdRequest { Id = 1 }, callOptions);

        return reply.Name;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "some exception occure");

        return "error";
    }            
}

從它的用法上,咱們也有熟悉的面孔 HttpClient

雖然gRPC是基於HTTP/2的,可是能夠看到咱們上面小節的例子中,仍是可以指定不使用的。然而到.NET Core 3.0以後,咱們就必需要使用https了,否則客戶端就是調不通的。一樣的,咱們也能夠在grpc-dotnet的倉庫上面看到,若是想不使用HTTP/2,就讓咱們用回以前的老庫,不要用新庫,James Newton-King就是這麼直接。

https://github.com/grpc/grpc-dotnet/issues/277

https://github.com/grpc/grpc-dotnet/issues/405

https://github.com/grpc/grpc-dotnet/issues/431

擴展閱讀

文中出現的示例代碼均可以在下面這個倉庫找到

catcherwong-archive/2019

相關文章
相關標籤/搜索