grpc: From Tutorial to Production by Alan Shreve at GopherCon 2017html
www.youtube.com/watch?v=7FZ…node
博文:about.sourcegraph.com/go/grpc-in-…git
答案就是:SOAP……好吧,開個玩笑,固然不多是 SOAP 了。github
如今流行的作法是 HTTP + JSON (REST API)golang
Alan 說「若是這輩子不再寫另外一個 REST 客戶端庫的話,那就能夠很幸福的死去了……😂」,由於這是最無聊的事情,一遍一遍的在作一樣的事情。swift
gPRC 是高性能、開源、通用的 RPC 框架。數組
與其講解定義,不如來實際作個東西更清楚。瀏覽器
使用 gRPC 這類東西,咱們並不是開始於寫 Go 代碼,咱們是從撰寫 gRPC 的 IDL 開始的。緩存
syntax = "proto3"
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
}
message StoreReq {
string key = 1;
bytes val = 2;
}
message StoreResp {
}
message GetReq {
string key = 1;
}
message GetResp {
bytes val = 1;
}
複製代碼
當寫了這個文件後,咱們馬上擁有了 9 種語言的客戶端的庫。bash
同時,咱們也擁有了 7 種語言的服務端的 API Stub:
func serverMain() {
if err := runServer(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to run cache server: %s\n", err)
os.Exit(1)
}
}
func runServer() error {
srv := grpc.NewServer()
rpc.RegisterCacheServer(srv, &CacheService{})
l, err := net.Listen("tcp", "localhost:5051")
if err != nil {
return err
}
// block
return srv.Serve(l)
}
複製代碼
暫時先不實現 CacheService,先放個空的,稍後再實現。
type CacheService struct {
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
return nil, fmt.Errorf("unimplemented")
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
return nil, fmt.Errorf("unimplemented")
}
複製代碼
func clientMain() {
if err != runClient(); err != nil {
fmt.Fprintf(os.Stderr, "failed: %v\n", err)
os.Exit(1)
}
}
func runClient() error {
// 創建鏈接
conn, err := grpc.Dial("localhost:5053", grpc.WithInsecure())
if err != nil {
return fmt.Errorf("failed to dial server: %v", err)
}
cache := rpc.NewCacheClient(conn)
// 調用 grpc 的 store() 方法存儲鍵值對 { "gopher": "con" }
_, err = cache.Store(context.Background(), &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
// 調用 grpc 的 get() 方法取回鍵爲 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
if err != nil {
return fmt.Errorf("failed to get: %v", err)
}
// 輸出
fmt.Printf("Got cached value %s\n", resp.Val)
return nil
}
複製代碼
或許有些人會認爲這和 WSDL 也太像了,這麼想沒有錯,由於 gRPC 在借鑑以前的 SOAP/WSDL 的錯誤基礎上,也吸收了他們優秀的地方。
server.go
type CacheService struct {
store map[string][]byte
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
val := s.store[req.Key]
return &rpc.GetResp{Val: val}, nil
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
s.store[req.Key] = req.Val
return &rpc.StoreResp{}, nil
}
複製代碼
注意這裏沒有鎖,你能夠想一想他們中有,由於未來他們會被併發的調用的。
固然,gRPC 支持錯誤處理。假設改寫上面的 Get(),對不存在的鍵進行報錯:
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
val, ok := s.store[req.Key]
if !ok {
return nil, status.Errorf(code.NotFound, "Key not found %s", req.Key)
}
return &rpc.GetResp{Val: val}, nil
}
複製代碼
若是這樣的代碼打算去部署的話,必定會被 SRE 攔截下來,由於全部通信必須加密傳輸。
在 gRPC 中添加 TLS 加密傳輸很容易。好比咱們修改 runServer() 添加 TLS 加密傳輸。
func runServer() error {
tlsCreds, err := credentials.NewServerTLSFromFile("tls.crt", "tls.key")
if err != nil {
return err
}
srv := grpc.NewServer(grpc.Creds(tlsCreds))
...
}
複製代碼
一樣,咱們也須要修改一下 runClient()。
func runClient() error {
tlsCreds := credentials.NewTLS(&tls.Config(InsecureSkipVerify: true))
conn, err := grpc.Dial("localhost:5051", grpc.WithTransportCredentials(tlsCreds))
...
}
複製代碼
如今有3個高性能的、事件驅動的實現
上線生產後,發現有一部分客戶產生了大量的鍵值,詢問得知,有的客戶但願對全部東西都緩存,這顯然不是對咱們這個緩存服務很好的事情。
咱們但願限制這種行爲,但對於當前系統而言,沒法知足這種需求,所以咱們須要修改實現,對每一個客戶發放客戶 token,那麼咱們就能夠約束特定客戶最多能夠創建多少鍵值,避免系統濫用。這就成爲了多租戶的緩存服務。
和以前同樣,咱們仍是從 IDL 開始,咱們須要修改接口,增長 account_token 項。
message StoreReq {
string key = 1;
bytes val = 2;
string account_token = 3;
}
複製代碼
一樣,咱們須要有獨立的服務針對帳戶服務,來獲取帳戶所容許的緩存鍵數:
service Accounts {
rpc GetByToken(GetByTokenReq) return (GetByTokenResp) {}
}
message GetByTokenReq {
string token = 1;
}
message GetByTokenResp {
Account account = 1;
}
message Account {
int64 max_cache_keys = 1;
}
複製代碼
這裏創建了一個新的 Accounts 服務,而且有一個 GetByToken() 方法,給入 token,返回一個 Account 類型的結果,而 Account 內有 max_cache_keys 鍵對應最大可緩存的鍵值數。
如今咱們進一步修改 client.go
func runClient() error {
...
cache := rpc.NewCacheClient(conn)
_, err = cache.Store(context.Background(), &rpc.StoreReq{
AccountToken: "inconshreveable",
Key: "gopher",
Val: []byte("con"),
})
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
...
}
複製代碼
服務端的改變要稍微大一些,但不過度。
type CacheService struct {
accounts rpc.AccountsClient
store map[string][]byte
keysByAccount map[string]int64
}
複製代碼
注意這裏的 accounts 是一個 grpc 的客戶端,由於咱們這個服務,同時也是另外一個 grpc 服務的客戶端。因此在接下來的 Store() 實現中,咱們須要先經過 accounts 調用另外一個服務取得帳戶信息。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
// 調用另外一個服務取得帳戶信息,包含其鍵值限制
resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
Token: req.AccountToken,
})
if err != nil {
return nil, err
}
// 檢查是否超量使用
if s.keysByAccount[req.AccountToken] >= resp.Account.MaxCacheKeys {
return nil, status.Errorf(codes.FailedPrecondition, "Account %s exceeds max key limit %d", req.AccountToken, resp.Account.MaxCacheKeys)
}
// 若是鍵不存在,須要新加鍵值,那麼咱們就對計數器加一
if _, ok := s.store[req.Key]; !ok {
s.keysByAccount[req.AccountToken] += 1
}
// 保存鍵值
s.store[req.Key] = req.Val
return &rpc.StoreResp{}, nil
}
複製代碼
上面的問題解決了,咱們服務又恢復了正常,不會有用戶創建過多的鍵值了。可是很快,咱們就又收到了其餘用戶發來的新的 issue,不少人反應說新系統變慢了,沒有達到 SLA 的要求。
但是咱們根本不知道到底發生了什麼,因而意識到了,咱們的程序沒有任何可觀察性(Observability),換句話說,咱們的程序沒有任何計量系統來統計性能相關的數據。
咱們先從最簡單的作起,添加日誌。
咱們先從 client.go 開始,增長一些測量和計數以及日誌輸出。
...
// 開始計時
start := time.Now()
_, err = cache.Store(context.Background(), &rpc.StoreReq{
AccountToken: "inconshreveable",
Key: "gopher",
Val: []byte("con"),
})
// 計算 cache.Store() 調用時間
log.Printf("cache.Store duration %s", time.Since(start))
if err != nil {
return fmt.Errorf("failed to store: %v", err)
}
// 再次開始計時
start = time.Now()
// 調用 grpc 的 get() 方法取回鍵爲 `gopher` 的值
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
// 計算 cache.Get() 調用時間
log.Printf("cache.Get duration %s", time.Since(start))
if err != nil {
return fmt.Errorf("failed to get: %v", err)
}
複製代碼
一樣,在服務端也這麼處理。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
// 開始計時
start := time.Now()
// 調用另外一個服務取得帳戶信息,包含其鍵值限制
resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
Token: req.AccountToken,
})
// 輸出 account.GetByToken() 的調用時間
log.Printf("accounts.GetByToken duration %s", time.Since(start))
...
}
複製代碼
通過這些修改後,咱們發現同樣的事情在反反覆覆的作,那麼有什麼辦法能夠改變這種無聊的作法麼?查閱 grpc 文檔後,看到有一個叫作 Client Interceptor 的東西。
這至關因而一箇中間件,可是是在客戶端。當客戶端進行 rpc 調用的時候,這個中間件先會被調用,所以這個中間件能夠對調用進行一層包裝,而後再進行調用。
爲了實現這個功能,咱們建立一個新的文件,叫作 interceptor.go:
func WithClientInterceptor() grpc.DialOption {
return grpc.WithUnaryInterceptor(clientInterceptor)
}
func clientInterceptor(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("invoke remote method=%s duration=%s error=%v", method, time.Since(start), err)
return err
}
複製代碼
咱們有了這個 WithClientInterceptor() 以後,能夠在 grpc.Dial() 的時候註冊進去。 client.go
func runClient() error {
...
conn, err := grpc.Dial("localhost:5051",
grpc.WithTransportCredentials(tlsCreds),
WithClientInterceptor())
...
}
複製代碼
註冊以後,全部的 grpc 調用都會通過咱們註冊的 clientInterceptor(),所以全部的時間就都有統計了,而不用每一個函數內部反反覆覆的添加時間、計量、輸出。
添加了客戶端的這個計量後,天然而然就聯想到服務端是否是也能夠作一樣的事情?通過查看文檔,能夠,有個叫作 Server Interceptor 的東西。
一樣的作法,咱們在服務端添加 interceptor.go,而且添加 ServerInterceptor() 函數。
func ServerInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(serverInterceptor)
}
func serverInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("invoke server method=%s duration=%s error=%v",
info.FullMethod,
time.Since(start),
err)
return resp, err
}
複製代碼
和客戶端同樣,須要在 runServer() 的時候註冊咱們定義的這個中間件。
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds), ServerInterceptor())
...
}
複製代碼
添加了日誌後,咱們終於在日誌中發現,/rpc.Accounts/GetByToken/ 花了好長的時間。咱們須要對這個操做設置超時。 server.go
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
accountsCtx, _ := context.WithTimeout(context.Background(), 2 * time.Second)
resp, err := s.accounts.GetByToken(accountsCtx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
...
}
複製代碼
這裏操做很簡單,直接使用標準庫中 context.WithTimeout() 就能夠了。
通過上面修改後,客戶依舊抱怨說沒有知足 SLA,仔細一想也對。就算這裏約束了 2 秒鐘,客戶端調用還須要時間,別的代碼在中間也有時間開銷。並且有的客戶說,咱們這裏須要1秒鐘,而不是2秒鐘。
好吧,讓咱們把這個時間設定推向調用方。
首先咱們要求在客戶端進行調用時間約束的設定: client.go
func runClient() error {
...
ctx, _ := context.WithTimeout(context.Background(), time.Second)
_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
...
ctx, _ = context.WithTimeout(context.Background(), 50*time.Millisecond)
resp, err := cache.Get(ctx, &rpc.GetReq{Key: "gopher"})
...
}
複製代碼
而後在服務端,咱們將上下文傳遞。直接取調用方的 ctx。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
...
}
複製代碼
上面的問題都解決了,終於能夠鬆一口氣了。但是客戶又提新的需求了……😅,說咱們能不能增長一個 Dry Run 的標誌,就是說我但願你作全部須要作的事情,除了真的修改鍵值庫。
GRPC metadata,也稱爲 GRPC 的 Header。就像 HTTP 頭同樣,能夠有一些 Metadata 信息傳遞過來。使用 metadata,可讓咱們的 Dry Run 的實現變得更簡潔,沒必要每一個 RPC 方法內都實現一遍檢查 Dry Run 標誌的邏輯,咱們能夠獨立出來。
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
if !dryRun(ctx) {
if _, ok := s.store[req.Key]; !ok {
s.keysByAccount[req.AccountToke] += 1
}
s.store[req.Key] = req.Val
}
return &rpc.StoreResp{}, nil
}
func dryRun(ctx context.Context) bool {
md, ok := metadata.FromContext(ctx)
if !ok {
return false
}
val, ok := md["dry-run"]
if !ok {
return false
}
if len(val) < 1 {
return false
}
return val[0] == "1"
}
複製代碼
固然,這麼作是有妥協的,由於通用化後就失去了類型檢查的能力。
在客戶端調用的時候,則須要根據狀況添加 dry-run 參數給 metadata。
func runClient() error {
...
ctx, _ := context.WithTimeout(context.Background(), time.Second)
ctx = metadata.NewContext(ctx, metadata.Pairs("dry-run", "1"))
_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
...
}
複製代碼
實現了 Dry Run 覺得能夠休息了,以前抱怨慢的客戶又來抱怨了,雖然有超時控制,知足 SLA,可是服務那邊仍是慢,總超時不成功。檢查了一下,發現是網絡上的事情,咱們沒有太多能夠作的事情。爲了解決客戶的問題,咱們來添加一個重試的機制。
咱們能夠對每個 gRPC 調用添加一個 Retry 機制,咱們也能夠像以前計時統計那樣,使用 Interceptor 吧?
func clientInterceptor(...) error {
var (
start = time.Now()
attempts = 0
err error
backoff retryBackOff
)
for {
attempts += 1
select {
case <-ctx.Done():
err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
case <-backoff.Next():
startAttempt := time.Now()
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
log.Printf(...)
continue
}
}
break
}
log.Printf(...)
return err
}
複製代碼
看起來還不錯,而後就打算髮布這個代碼了。結果提交審覈的時候被打回來了,說這個代碼不合理,由於若是是非冪等(non-idempotent) 的操做,這樣就會致使屢次執行,改變指望結果了。
看來咱們得針對冪等和非冪等操做區別對待了。
silo.FireZeMissiles(NotIdempotent(ctx), req)
複製代碼
嗯,固然,沒這個東西。因此咱們須要本身來創造一個標記,經過 context,來標明操做是否冪等。
func NotIdempotent(ctx context.Context) context.Context {
return context.WithValue(ctx, "idempotent", false)
}
func isIdempotent(ctx context.Context) bool {
val, ok := ctx.Value("idempotent").(bool)
if !ok {
return true
}
return val
}
複製代碼
而後在咱們的 clientInterceptor() 實現中加入 isIdempotent() 判斷:
func clientInterceptor(...) error {
var (
start = time.Now()
attempts = 0
err error
backoff retryBackOff
)
for {
attempts += 1
select {
case <-ctx.Done():
err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
case <-backoff.Next():
startAttempt := time.Now()
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
log.Printf(...)
continue
}
}
break
}
log.Printf(...)
return err
}
複製代碼
這樣當調用失敗後,客戶端檢查發現是冪等的狀況,才重試,不然不重試。避免了非冪等操做的反覆操做。
感受沒啥問題了,因而部署上線了。但是運行一段時間後,發現有些不對勁。全部成功的RPC調用,也就是說這個操做自己是正確的,都沒有問題,超時重試也正常。可是全部失敗的 RPC 調用都不對了,全部失敗的 RPC 調用,都返回超時,而不是錯誤自己。這裏說的失敗,不是說網絡問題致使超時啥的,而是說請求自己的失敗,好比以前提到的,Get() 不存在的鍵,應該返回錯誤;或者 Store() 超過了配額,應該返回錯誤,這類錯誤在日誌中都沒看到,反而都對應了超時。
通過分析發現,服務端該報錯都報錯,沒啥問題,可是客戶端不對,本應該返回錯誤給調用方的地方,客戶端代碼反而又開始重試這個操做了。看來以前重試的代碼還有問題。
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
log.Printf(...)
continue
}
複製代碼
若是仔細觀察這部分代碼,會發現,不管 err 是什麼,只要非 nil,咱們就重試。其實這是不對的,咱們只有針對某些錯誤重試,好比網絡問題之類的,而不該該對咱們但願返回給調用方的錯誤重試,那沒有意義。
那麼問題就變成了,咱們到底應該怎麼對 err 判斷來決定是否重試?
因此,咱們須要的是一個完整的結構化的錯誤信息,而不是簡單的一個 Error Code 和字符串。固然這條路很差走,可是咱們已經作了這麼多了,堅持一下仍是能夠克服的。
這裏咱們仍是從 IDL 開始:
message Error {
int64 code = 1;
string messsage = 2;
bool temporary = 3;
int64 userErrorCode = 4;
}
複製代碼
而後咱們實現這個 Error 類型。 rpc/error.go
func (e *Error) Error() string {
return e.Message
}
func Errorf(code codes.Code, temporary bool, msg string, args ..interface{}) error {
return &Error{
Code: int64(code),
Message: fmt.Sprintf(msg, args...),
Temporary: temporary,
}
}
複製代碼
有這兩個函數,咱們能夠顯示和構造這個 Error 類型的變量了,可是咱們該怎麼把錯誤消息傳回客戶端呢?而後問題就開始變的繁瑣起來了: rpc/error.go
func MarshalError (err error, ctx context.Context) error {
rerr, ok := err.(*Error)
if !ok {
return err
}
pberr, marshalerr := pb.Marshal(rerr)
if marshalerr == nil {
md := metadata.Pairs("rpc-error", base64.StdEncoding.EncodeToString(pberr))
_ = grpc.SetTrailer(ctx, md)
}
return status.Errorf(codes.Code(rerr.Code), rerr.Message)
}
func UnmarshalError(err error, md metadata.MD) *Error {
vals, ok := md["rpc-error"]
if !ok {
return nil
}
buf, err := base64.StdEncoding.DecodeString(vals[0])
if err != nil {
return nil
}
var rerr Error
if err := pb.Unmarshal(buf, &rerr); err != nil {
return nil
}
return &rerr
}
複製代碼
interceptor.go
func serverInterceptor (
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
err = rpc.MarshalError(err, ctx)
log.Print(...)
return resp, err
}
複製代碼
it’s ugly,but works.
這是在 gRPC 不支持高級 Error 的狀況下,怎麼去 work around 這個問題,而且湊合用起來。如今這麼作,錯誤就能夠跨主機邊界傳遞了。
又有客戶前來提需求了,有的客戶說咱們能夠存、也能夠取,可是如何才能把裏面全部的數據都獲取下來?因而有了需求,但願實現 Dump() 操做,能夠取回全部數據。
如今已經輕車熟路了,咱們先改 IDL,添加一個 Dump() 函數。
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (DumpResp) {}
}
message DumpReq{
}
message DumpResp {
repeated DumpItem items = 1;
}
message DumpItem {
string key = 1;
bytes val = 2;
}
複製代碼
這裏 DumpResp 裏面用的是 repeated,由於 protobuf 裏面不知道爲啥不叫 array。
新功能 Dump 上線了,結果發現你們都很喜歡 Dump,有不少人在 Dump,結果服務器的內存開始不夠了。因而咱們須要一些限制手段,能夠控制流量。
查閱了文檔後,發現咱們能夠控制同時最大有多少併發能夠訪問,以及能夠多頻繁的來訪問服務。 server.go
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds),
ServerInterceptor(),
grpc.MaxConcurrentStreams(64),
grpc.InTapHandle(NewTap().Handler))
rpc.RegisterCacheServer(srv, NewCacheService(accounts))
l, err := net.Listen("tcp", "localhost:5051")
if err != nil {
return err
}
l = netutil.LimitListener(l, 1024)
return srv.Serve(l)
}
複製代碼
這裏使用了 netutil.LimitListener(l, 1024) 控制了總共能夠有多少個鏈接,而後用 grpc.MaxConcurrentStreams(64) 指定了每一個 grpc 的鏈接能夠有多少個併發流(stream)。這兩個結合起來基本控制了併發的總數。
可是 gRPC 裏沒有地方限定能夠多頻繁的訪問。所以這裏用了 grpc.InTapHandle(NewTap().Handler)) 來進行定製,這是在更靠前的位置執行的。
tap.go
type Tap struct {
lim *rate.Limiter
}
func NewTap() *Tap {
return &Tap(rate.NewLimiter(150, 5))
}
func (t *Tap) Handler(ctx context.Context, info *tap.Info) (context.Context, error) {
if !t.lim.Allow() {
return nil, status.Errorf(codes.ResourceExhausted, "service is over rate limit")
}
return ctx, nil
}
複製代碼
以前的方案部署後,內存終於降下來了,可是還沒休息,就發現你們愈來愈喜歡用這個緩存服務,內存又不夠用了。這個時候咱們就開始思考,是否是能夠調整一下設計,不是每次 Dump 就當即在內存生成完整的返回數組,而是以流的形式,按需發回。 app.proto
syntax = "proto3";
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (stream DumpItem) {}
}
message DumpReq{
}
message DumpItem {
string key = 1;
bytes val = 2;
}
複製代碼
這裏再也不使用數組性質的 repeated,而是用 stream,客戶端請求 Dump() 後,將結果以流的形式發回去。 server.go
func (s *CacheService) Dump(req *rpc.DumpReq, stream rpc.Cache_DumpServer) error {
for k, v := range s.store {
stream.Send(&rpc.DumpItem{
Key: k,
Val: v,
})
}
return nil
}
複製代碼
咱們修改 Dump() 的實現,對於每一個記錄,利用 stream.Send() 發送到流。
注意這裏咱們沒有 context,只有個 stream。 client.go
func runClient() error {
...
stream, err := cache.Dump(context.Background(), &rpc.DumpReq{})
if err != nil {
return fmt.Errorf("failed to dump: %v", err)
}
for {
item, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to stream item: %v", err)
}
}
return nil
}
複製代碼
使用流後,服務器性能提升了不少,可是,咱們的服務太吸引人了,用戶愈來愈多,結果又內存不夠了。這時候咱們審查代碼,感受能作的事情都作了,或許是時候從單一服務器,擴展爲多個服務器,而後之間使用負載均衡。
gRPC 是長鏈接性質的通信,所以若是一個客戶端鏈接了一個 gRPC Endpoint,那麼他就會一直鏈接到一個固定的服務器,所以多服務器的負載均衡對同一個客戶端來講是沒有意義的,不會由於這個客戶端有大量的請求而致使分散請求到不一樣的服務器上去。
若是咱們但願客戶端能夠利用多服務器的機制,咱們就須要更智能的客戶端,讓客戶端意識到服務器存在多個副本,所以客戶端創建多條鏈接到不一樣的服務器,這樣就可讓單一客戶端利用負載均衡的橫向擴展能力。
在複雜的環境中,咱們 gRPC 的客戶端(甚至服務端)多是不一樣語言平臺的。這實際上是 gRPC 的優點,能夠比較容易的實現跨語言平臺的通信。
好比咱們能夠作一個 Python 客戶端:
import grpc
import rpc_pb2 as rpc
channel = grpc.insecure_channel('localhost:5051')
cache_svc = rpc.CacheStub(channel)
resp = cache_svc.Get(rpc.GetReq(
key="gopher",
))
print resp.val
複製代碼
一個不是很爽的地方是雖然 gRPC 的跨語言通信很方便,可是各個語言的實現都比較隨意,好比 Go 中叫作 CacheClient(),而 Python 中則叫作 CacheStub()。這裏沒有什麼特別的緣由非不同的名字,就是因爲不一樣的做者實現的時候按照本身的想法命名的。
本文轉載自: blog.lab99.org/post/golang…
我的微信公衆號:
我的github:
我的博客: