近期比較火的開源項目go-zero是一個集成了各類工程實踐的包含了Web和RPC協議的功能完善的微服務框架,今天咱們就一塊兒來分析一下其中的RPC部分zRPC。node
zRPC底層依賴gRPC,內置了服務註冊、負載均衡、攔截器等模塊,其中還包括自適應降載,自適應熔斷,限流等微服務治理方案,是一個簡單易用的可直接用於生產的企業級RPC框架。nginx
zRPC支持直連和基於etcd服務發現兩種方式,咱們以基於etcd作服務發現爲例演示zRPC的基本使用:git
建立hello.yaml配置文件,配置以下:github
Name: hello.rpc // 服務名 ListenOn: 127.0.0.1:9090 // 服務監聽地址 Etcd: Hosts: - 127.0.0.1:2379 // etcd服務地址 Key: hello.rpc // 服務註冊key
建立hello.proto文件,並生成對應的go代碼golang
syntax = "proto3"; package pb; service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1; } message HelloReply { string message = 1; }
生成go代碼算法
protoc --go_out=plugins=grpc:. hello.proto
package main import ( "context" "flag" "log" "example/zrpc/pb" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/zrpc" "google.golang.org/grpc" ) type Config struct { zrpc.RpcServerConf } var cfgFile = flag.String("f", "./hello.yaml", "cfg file") func main() { flag.Parse() var cfg Config conf.MustLoad(*cfgFile, &cfg) srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) { pb.RegisterGreeterServer(s, &Hello{}) }) if err != nil { log.Fatal(err) } srv.Start() } type Hello struct{} func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { return &pb.HelloReply{Message: "hello " + in.Name}, nil }
package main import ( "context" "log" "example/zrpc/pb" "github.com/tal-tech/go-zero/core/discov" "github.com/tal-tech/go-zero/zrpc" ) func main() { client := zrpc.MustNewClient(zrpc.RpcClientConf{ Etcd: discov.EtcdConf{ Hosts: []string{"127.0.0.1:2379"}, Key: "hello.rpc", }, }) conn := client.Conn() hello := pb.NewGreeterClient(conn) reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"}) if err != nil { log.Fatal(err) } log.Println(reply.Message) }
啓動服務,查看服務是否註冊:設計模式
ETCDCTL_API=3 etcdctl get hello.rpc --prefix
顯示服務已經註冊:架構
hello.rpc/7587849401504590084 127.0.0.1:9090
運行客戶端便可看到輸出:app
hello go-zero
這個例子演示了zRPC的基本使用,能夠看到經過zRPC構建RPC服務很是簡單,只須要不多的幾行代碼,接下來咱們繼續進行探索負載均衡
下圖展現zRPC的架構圖和主要組成部分
zRPC主要有如下幾個模塊組成:
這裏介紹了zRPC的主要組成模塊和每一個模塊的主要功能,其中resolver和balancer模塊實現了gRPC開放的接口,實現了自定義的resolver和balancer,攔截器模塊是整個zRPC的功能重點,自適應降載、自適應熔斷、prometheus服務指標收集等功能都在這裏實現
gRPC提供了攔截器功能,主要是對請求先後進行額外處理的攔截操做,其中攔截器包含客戶端攔截器和服務端攔截器,又分爲一元(Unary)攔截器和流(Stream)攔截器,這裏咱們主要講解一元攔截器,流攔截器同理。
客戶端攔截器定義以下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
其中method爲方法名,req,reply分別爲請求和響應參數,cc爲客戶端鏈接對象,invoker參數是真正執行rpc方法的handler其實在攔截器中被調用執行
服務端攔截器定義以下:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中req爲請求參數,info中包含了請求方法屬性,handler爲對server端方法的包裝,也是在攔截器中被調用執行
zRPC中內置了豐富的攔截器,其中包括自適應降載、自適應熔斷、權限驗證、prometheus指標收集等等,因爲攔截器較多,篇幅有限無法全部的攔截器給你們一一解析,這裏咱們主要分析兩個,自適應熔斷和prometheus服務監控指標收集:
當客戶端向服務端發起請求,客戶端會記錄服務端返回的錯誤,當錯誤達到必定的比例,客戶端會自行的進行熔斷處理,丟棄掉必定比例的請求以保護下游依賴,且能夠自動恢復。zRPC中自適應熔斷遵循《Google SRE》中過載保護策略,算法以下:
requests: 總請求數量
accepts: 正常請求數量
K: 倍值 (Google SRE推薦值爲2)
能夠經過修改K的值來修改熔斷髮生的激進程度,下降K的值會使得自適應熔斷算法更加激進,增長K的值則自適應熔斷算法變得再也不那麼激進
熔斷攔截器定義以下:
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // target + 方法名 breakerName := path.Join(cc.Target(), method) return breaker.DoWithAcceptable(breakerName, func() error { // 真正執行調用 return invoker(ctx, method, req, reply, cc, opts...) }, codes.Acceptable) }
accept方法實現了Google SRE過載保護算法,判斷否進行熔斷
func (b *googleBreaker) accept() error { // accepts爲正常請求數,total爲總請求數 accepts, total := b.history() weightedAccepts := b.k * float64(accepts) // 算法實現 dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) if dropRatio <= 0 { return nil } // 是否超過比例 if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } return nil }
doReq方法首先判斷是否熔斷,知足條件直接返回error(circuit breaker is open),不知足條件則對請求數進行累加
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { if err := b.accept(); err != nil { if fallback != nil { return fallback(err) } else { return err } } defer func() { if e := recover(); e != nil { b.markFailure() panic(e) } }() // 此處執行RPC請求 err := req() // 正常請求total和accepts都會加1 if acceptable(err) { b.markSuccess() } else { // 請求失敗只有total會加1 b.markFailure() } return err }
服務監控是瞭解服務當前運行狀態以及變化趨勢的重要手段,監控依賴於服務指標的收集,經過prometheus進行監控指標的收集是業界主流方案,zRPC中也採用了prometheus來進行指標的收集
prometheus攔截器定義以下:
這個攔截器主要是對服務的監控指標進行收集,這裏主要是對RPC方法的耗時和調用錯誤進行收集,這裏主要使用了Prometheus的Histogram和Counter數據類型
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) ( interface{}, error) { // 執行前記錄一個時間 startTime := timex.Now() resp, err := handler(ctx, req) // 執行後經過Since算出執行該調用的耗時 metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod) // 方法對應的錯誤碼 metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err)))) return resp, err } }
除了內置了豐富的攔截器以外,zRPC同時支持添加自定義攔截器
Client端經過AddInterceptor方法添加一元攔截器:
func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) { rc.client.AddInterceptor(interceptor) }
Server端經過AddUnaryInterceptors方法添加一元攔截器:
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) { rs.server.AddUnaryInterceptors(interceptors...) }
zRPC服務註冊架構圖:
zRPC中自定義了resolver模塊,用來實現服務的註冊功能。zRPC底層依賴gRPC,在gRPC中要想自定義resolver須要實現resolver.Builder接口:
type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string }
其中Build方法返回Resolver,Resolver定義以下:
type Resolver interface { ResolveNow(ResolveNowOptions) Close() }
在zRPC中定義了兩種resolver,direct和discov,這裏咱們主要分析基於etcd作服務發現的discov,自定義的resolver須要經過gRPC提供了Register方法進行註冊代碼以下:
func RegisterResolver() { resolver.Register(&dirBuilder) resolver.Register(&disBuilder) }
當咱們啓動咱們的zRPC Server的時候,調用Start方法,會像etcd中註冊對應的服務地址:
func (ags keepAliveServer) Start(fn RegisterFn) error { // 註冊服務地址 if err := ags.registerEtcd(); err != nil { return err } // 啓動服務 return ags.Server.Start(fn) }
當咱們啓動zRPC客戶端的時候,在gRPC內部會調用咱們自定義resolver的Build方法,zRPC經過在Build方法內調用執行了resolver.ClientConn的UpdateState方法,該方法會把服務地址註冊到gRPC客戶端內部:
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( resolver.Resolver, error) { hosts := strings.FieldsFunc(target.Authority, func(r rune) bool { return r == EndpointSepChar }) // 服務發現 sub, err := discov.NewSubscriber(hosts, target.Endpoint) if err != nil { return nil, err } update := func() { var addrs []resolver.Address for _, val := range subset(sub.Values(), subsetSize) { addrs = append(addrs, resolver.Address{ Addr: val, }) } // 向gRPC註冊服務地址 cc.UpdateState(resolver.State{ Addresses: addrs, }) } // 監聽 sub.AddListener(update) update() // 返回自定義的resolver.Resolver return &nopResolver{cc: cc}, nil }
在discov中,經過調用load方法從etcd中獲取指定服務的全部地址:
func (c *cluster) load(cli EtcdClient, key string) { var resp *clientv3.GetResponse for { var err error ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout) // 從etcd中獲取指定服務的全部地址 resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) cancel() if err == nil { break } logx.Error(err) time.Sleep(coolDownInterval) } var kvs []KV c.lock.Lock() for _, ev := range resp.Kvs { kvs = append(kvs, KV{ Key: string(ev.Key), Val: string(ev.Value), }) } c.lock.Unlock() c.handleChanges(key, kvs) }
並經過watch監聽服務地址的變化:
func (c *cluster) watch(cli EtcdClient, key string) { rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) for { select { case wresp, ok := <-rch: if !ok { logx.Error("etcd monitor chan has been closed") return } if wresp.Canceled { logx.Error("etcd monitor chan has been canceled") return } if wresp.Err() != nil { logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) return } // 監聽變化通知更新 c.handleWatchEvents(key, wresp.Events) case <-c.done: return } } }
這部分主要介紹了zRPC中是如何自定義的resolver,以及基於etcd的服務發現原理,經過這部分的介紹你們能夠了解到zRPC內部服務註冊發現的原理,源代碼比較多隻是粗略的從整個流程上進行了分析,若是你們對zRPC的源碼比較感興趣能夠自行進行學習
負載均衡原理圖:
避免過載是負載均衡策略的一個重要指標,好的負載均衡算法能很好的平衡服務端資源。經常使用的負載均衡算法有輪訓、隨機、Hash、加權輪訓等。但爲了應對各類複雜的場景,簡單的負載均衡算法每每表現的不夠好,好比輪訓算法當服務響應時間變長就很容易致使負載再也不平衡, 所以zRPC中自定義了默認負載均衡算法P2C(Power of Two Choices),和resolver相似,要想自定義balancer也須要實現gRPC定義的balancer.Builder接口,因爲和resolver相似這裏再也不帶你們一塊兒分析如何自定義balancer,感興趣的朋友能夠查看gRPC相關的文檔來進行學習
注意,zRPC是在客戶端進行負載均衡,常見的還有經過nginx中間代理的方式
zRPC框架中默認的負載均衡算法爲P2C,該算法的主要思想是:
僞代碼以下:
![]()主要算法邏輯在Pick方法中實現:
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) ( conn balancer.SubConn, done func(balancer.DoneInfo), err error) { p.lock.Lock() defer p.lock.Unlock() var chosen *subConn switch len(p.conns) { case 0: return nil, nil, balancer.ErrNoSubConnAvailable case 1: chosen = p.choose(p.conns[0], nil) case 2: chosen = p.choose(p.conns[0], p.conns[1]) default: var node1, node2 *subConn for i := 0; i < pickTimes; i++ { // 隨機數 a := p.r.Intn(len(p.conns)) b := p.r.Intn(len(p.conns) - 1) if b >= a { b++ } // 隨機獲取全部節點中的兩個節點 node1 = p.conns[a] node2 = p.conns[b] // 效驗節點是否健康 if node1.healthy() && node2.healthy() { break } } // 選擇其中一個節點 chosen = p.choose(node1, node2) } atomic.AddInt64(&chosen.inflight, 1) atomic.AddInt64(&chosen.requests, 1) return chosen.conn, p.buildDoneFunc(chosen), nil }
choose方法對隨機選擇出來的節點進行負載比較從而最終肯定選擇哪一個節點
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { start := int64(timex.Now()) if c2 == nil { atomic.StoreInt64(&c1.pick, start) return c1 } if c1.load() > c2.load() { c1, c2 = c2, c1 } pick := atomic.LoadInt64(&c2.pick) if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) { return c2 } else { atomic.StoreInt64(&c1.pick, start) return c1 } }
上面主要介紹了zRPC默認負載均衡算法的設計思想和代碼實現,那自定義的balancer是如何註冊到gRPC的呢,resolver提供了Register方法來進行註冊,一樣balancer也提供了Register方法來進行註冊:
func init() { balancer.Register(newBuilder()) } func newBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, new(p2cPickerBuilder)) }
註冊balancer以後gRPC怎麼知道使用哪一個balancer呢?這裏咱們須要使用配置項進行配置,在NewClient的時候經過grpc.WithBalancerName方法進行配置:
func NewClient(target string, opts ...ClientOption) (*client, error) { var cli client opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name))) if err := cli.dial(target, opts...); err != nil { return nil, err } return &cli, nil }
這部分主要介紹了zRPC中內中的負載均衡算法的實現原理以及具體的實現方式,以後介紹了zRPC是如何註冊自定義的balancer以及如何選擇自定義的balancer,經過這部分你們應該對負載均衡有了更進一步的認識
首先,介紹了zRPC的基本使用方法,能夠看到zRPC使用很是簡單,只須要少數幾行代碼就能夠構建高性能和自帶服務治理能力的RPC服務,固然這裏沒有面面俱到的介紹zRPC的基本使用,你們能夠查看相關文檔進行學習
接着,介紹了zRPC的幾個重要組成模塊以及其實現原理,並分析了部分源碼。攔截器模塊是整個zRPC的重點,其中內置了豐富的功能,像熔斷、監控、降載等等也是構建高可用微服務必不可少的。resolver和balancer模塊自定義了gRPC的resolver和balancer,經過該部分能夠了解到整個服務註冊與發現的原理以及如何構建本身的服務發現系統,同時自定義負載均衡算法也變得再也不神祕
最後,zRPC是一個經歷過各類工程實踐的RPC框架,不管是想要用於生產仍是學習其中的設計模式都是一個不可多得的開源項目。但願經過這篇文章的介紹你們可以進一步瞭解zRPC
https://github.com/tal-tech/go-zero
好將來技術