grpc和consul結合實現分佈式rpc調用

GRPC

主要介紹了grpc在使用示例和原理,以及如何與consul結合

gRPC 是什麼?

gRPC 也是基於如下理念:定義一個服務,指定其可以被遠程調用的方法(包含參數和返回類型)。在服務端實現這個接口,並運行一個 gRPC 服務器來處理客戶端調用。在客戶端擁有一個存根可以像服務端同樣的方法。

在 gRPC 裏客戶端應用能夠像調用本地對象同樣直接調用另外一臺不一樣的機器上服務端應用的方法,使得咱們可以更容易地建立分佈式應用和服務。html

參考文檔:gRPC Python Quickstartpython

開始前確保已經安裝grpcio-tools和grpcio這兩個包git

定義一個GRPC有以下三個步驟:windows

  1. 定義一個消息類型
  2. 編譯該proto文件
  3. 編寫服務端代碼
  4. 編寫客戶端代碼

咱們以實現一個echo的grpc爲例。安全

定義一個消息類型

首先定義通訊雙方(即客戶端和服務端)交互的消息格式(protobuf消息的格式),而後定義該echo服務
以下:bash

syntax = "proto3";  // 聲明使用 proto3 語法

//  定義客戶端請求的protobuf格式,以下所示,包含一個字符串字段q
message Req {
    string q = 1;
}

//  定義服務端相應的protobuf格式,以下所示,包含一個字符串字段a
message Resp {
    string a = 1;
}

//  定義echo服務,以下所示,該服務包含一個名稱爲"echo"的rpc
service Echoer{
    rpc echo (Req) returns (Resp) {}
}

使用如下命令編譯:服務器

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./Echoer.proto

生成兩個py文件tcp

  • Echoer_pb2.py 此文件包含生成的 request(Req) 和 response(Resp) 類。
  • Echoer_pb2_grpc.py 此文件包含生成的 客戶端(EchoerStub)和服務端(EchoerServicer)的類

建立服務端代碼

建立和運行 Echoer 服務能夠分爲兩個部分:分佈式

  • 實現咱們服務定義的生成的服務接口:作咱們的服務的實際的「工做」的函數。
  • 運行一個 gRPC 服務器,監聽來自客戶端的請求並傳輸服務的響應。

在當前目錄,建立文件 Echoer_server.py,實現一個新的函數:ide

from concurrent import futures
import time

import grpc

import Echoer_pb2
import Echoer_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class Echoer(Echoer_pb2_grpc.EchoerServicer):
    # 工做函數
    def SayHello(self, request, context):
        return Echoer_pb2.Resp(a="echo")


def serve():
    # gRPC 服務器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()  # start() 不會阻塞,若是運行時你的代碼沒有其它的事情可作,你可能須要循環等待。
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()

建立客戶端代碼

在當前目錄,打開文件 Echoer_client.py,實現一個新的函數:

from __future__ import print_function

import grpc

import Echoer_pb2
import Echoer_pb2_grpc


def run():
    channel = grpc.insecure_channel('localhost:50051') # 建立信道
    stub = Echoer_pb2_grpc.EchoerStub(channel) # 經過信道獲取憑據,即Stub
    response = stub.echo(Echoer_pb2.Req(q='echo')) # 調用rpc,獲取響應
    print("Echoer client received: " + response.a)

if __name__ == '__main__':
    run()

運行代碼

首先運行服務端代碼

python Echoer_server.py

複製代碼
而後運行客戶端代碼

python Echoer_client.py
# output
Echoer client received: echo

進階

點擊查看參考博客

爲了通訊安全起見,GRPC提供了TSlSSL的支持。

首先利用openssl建立一個自簽名證書

$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus (2 primes)
............................................................+++++
................................................................................................................................+++++
e is 65537 (0x010001)

$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:
State or Province Name (full name) [Some-State]:
Locality Name (eg, city) []:
Organization Name (eg, company) [Internet Widgits Pty Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:Echoer
Email Address []:

生成了server.key和server.crt兩個文件,服務端兩個文件都須要,客戶端只須要crt文件

修改服務端代碼

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
# 讀取 key and certificate
with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f:
    private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f:
    certificate_chain = f.read().encode()
# 建立 server credentials
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
# 調用add_secure_port方法,而不是add_insesure_port方法
server.add_secure_port('localhost:50051', server_creds)

修改客戶端代碼

# 讀取證書
with open('server.crt') as f:
    trusted_certs = f.read().encode()
# 建立 credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
# 調用secure_channel方法,而不是insecure_channel方法
channel = grpc.secure_channel('localhost:50051', credentials)

啓動服務端後,啓動客戶端,會出現如下錯誤:

grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Connect Failed"
        debug_error_string = "{"created":"@1547552759.642000000","description":"Failed to create subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2721,"referenced_errors":[{"created":"@1547552759.642000000","description":"Pick Cancelled","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":241,"referenced_errors":[{"created":"@1547552759.642000000","description":"Connect Failed","file":"src/core/ext/filters/client_channel/subchannel.cc","file_line":689,"grpc_status":14,"referenced_errors":[{"created":"@1547552759.642000000","description":"Peer name localhost is not in peer certificate","file":"src/core/lib/security/security_connector/security_connector.cc","file_line":880}]}]}]}"
>

!!! 警告:

這是由於TSLSSL模式下,客戶端是經過服務名稱:port來獲取服務的憑據,而不是ip:port, 因此對客戶端作以下修改:

# 修改前
channel = grpc.secure_channel('localhost:50051', credentials)
# 修改後
channel = grpc.secure_channel('Echoer:50051', credentials)

!!! 警告:

其次,在TSLSSL模式下,客戶端對服務名稱:port解析時候須要dns支持,目前不知道如何解決,只可以採起如下措施解決,經過修改windows的host文件,利用host將服務名稱解析爲IP地址,
打開windows的host文件,地址:C:\Windows\System32\drivers\etc\hosts備份後修改以下,添加:

# 服務的IP地址 服務名稱
127.0.0.1 Echoer

保存便可

修改後,再次運行,便可運行成功

注意事項:CA證書和私鑰key都是配套的,不配套的CA證書和key是沒法校驗成功的

結合consul

注意事項:確保consul已經正確啓動,查看http://ip:port:8500/, 可查看consul的狀態,確保已經安裝python-consul這個庫,不然沒法操做consul

首先想象咱們以上的grpc示例程序之因此成功的有限制條件,

  • 咱們知道服務端已經正常啓動
  • 咱們知道了服務端的ip和端口

但在實際過程當中,通常是不可能確切知道服務的ip和端口的,因此consul就起了箇中間橋樑的做用,具體以下:

服務註冊

服務註冊,顧名思義,服務在啓動以前,必須如今consul中註冊。

服務端:當服務端啓動以後,consul會利用服務註冊時得到的ip和port同服務創建聯繫,其中最重要的就是health check即心跳檢測。consul經過心跳檢測來斷定該服務是否正常。

客戶端:客戶端經過consul來查詢所需服務的ip和port,若對應服務已經註冊且心跳檢測正常,則會返回給客戶端對應的ip和port信息,而後客戶端就能夠利用這個來鏈接服務端了

服務註冊示例代碼以下:

def register(self, server_name, ip, port, consul_host=CONSUL_HOST):
    """
    server_name: 服務名稱
    ip: 服務IP地址
    port: 服務監聽的端口
    consul_host: 所鏈接的consul服務器的IP地址
    """
    c = consul.Consul(host=consul_host) # 獲取與consul的鏈接
    print(f"開始註冊服務{server_name}")
    check = consul.Check.tcp(ip, port, "10s") # 設置心跳檢測的超時時間和對應的ip和port端口
    c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check) # 註冊

既然有服務註冊,固然會有服務註銷,示例代碼以下:

def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST):
    c = consul.Consul(host=consul_host)
    print(f"開始退出服務{server_name}")
    c.agent.service.deregister(f"{server_name}-{ip}-{port}")

服務查詢

客戶端則須要在consul中查詢對應服務的IP和port,但因爲在TSL/SSL模式下,所需的只是服務名稱和port,故而只須要查詢port端口便可。

客戶端服務查詢採用的是DNS的查詢方式,必須確保安裝dnspython庫,用於建立DNS查詢

服務查詢示例代碼以下:

# 建立一個consul dns查詢的 resolver
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = [consul_host]

def get_host_port(self, server_name):
    try:
        dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", "SRV") # 查詢對應服務的port,
    except DNSException as e:
        return None, None
    return server_name, dns_answer_srv[0].port # 返回服務名和端口

grpc流模式

grpc總共提供了四種數據交互模式:

  • simpe 簡單模式 RPC:即上述的全部的grpc
  • server-side streaming 服務端流式 RPC
  • client-side streaming 客戶端流式 RPC
  • Bidirectional streaming 雙向數據流模式的 gRPC

因爲grpc對於消息有大小限制,diff數據過大會致使沒法接收數據,咱們在使用過程當中,使用了流模式來解決了此類問題,
在此模式下,客戶端傳入的參數由具體的protobuf變爲了protobuf的迭代器,客戶端接收的響應也變爲了迭代器,獲取完整的響應則須要迭代獲取。
服務端響應也變爲了一個迭代器。

修改服務定義文件:

# 修改前
service Echoer{
    rpc echo (Req) returns (Resp) {}
}
# 修改後
service Echoer{
    rpc echo (stream Req) returns (stream Resp) {}
}

從新編譯

修改服務端

將工做函數修改成以下所示, 即工做函數變成了一個迭代器:

def echo(self, request_iterator, context):
    for i in range(10):
        yield Echoer_pb2.Resp(a="echo")

修改客戶端

將echo的傳入參數修改成迭代器:

def qq():
    for i in range(10):
        yield Echoer_pb2.Req(q="echo")
response = stub.echo(qq())
for resp in response:
    print("Echoer client received: " + response.a)

從新運行,接收結果以下:

$ python Echoer_client.py
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
相關文章
相關標籤/搜索