Our project uses etcd to implement gRPC service registration and service discovery. Let's look at how both are implemented.
First, the demo. The code below lives in the discovery package.
```go
package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"strconv"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// Register for grpc server
type Register struct {
	EtcdAddrs   []string
	DialTimeout int

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *zap.Logger
}

// NewRegister create a register base on etcd
func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Register a service
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		return nil, err
	}

	r.closeCh = make(chan struct{})

	go r.keepAlive()

	return r.closeCh, nil
}

// Stop stop register
func (r *Register) Stop() {
	r.closeCh <- struct{}{}
}

// register registers the node
func (r *Register) register() error {
	leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {
		return err
	}

	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}
	_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
	return err
}

// unregister deletes the node
func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))
	return err
}

// keepAlive
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed", zap.Error(err))
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed", zap.Error(err))
			}
			return
		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		}
	}
}

// UpdateHandler return http handler
func (r *Register) UpdateHandler() http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		wi := req.URL.Query().Get("weight")
		weight, err := strconv.Atoi(wi)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte(err.Error()))
			return
		}

		var update = func() error {
			r.srvInfo.Weight = int64(weight)
			data, err := json.Marshal(r.srvInfo)
			if err != nil {
				return err
			}
			_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
			return err
		}

		if err := update(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		}
		w.Write([]byte("update server weight success"))
	})
}

func (r *Register) GetServerInfo() (Server, error) {
	resp, err := r.cli.Get(context.Background(), BuildRegPath(r.srvInfo))
	if err != nil {
		return r.srvInfo, err
	}
	info := Server{}
	if resp.Count >= 1 {
		if err := json.Unmarshal(resp.Kvs[0].Value, &info); err != nil {
			return info, err
		}
	}
	return info, nil
}
```
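The Server struct and the key helpers BuildRegPath/BuildPrefix are used above but not shown in this excerpt (the actual definitions live in the linked repo). A minimal sketch of what they might look like, assuming keys of the form /{name}/{version}/{addr}:

```go
package discovery

import "fmt"

// Server describes one registered instance. This is an assumed shape,
// inferred from how the struct is used in the code above.
type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Version string `json:"version"`
	Weight  int64  `json:"weight"`
}

// BuildPrefix builds the etcd key prefix for a service, e.g. "/hello/1.0.0/".
func BuildPrefix(info Server) string {
	if info.Version == "" {
		return fmt.Sprintf("/%s/", info.Name)
	}
	return fmt.Sprintf("/%s/%s/", info.Name, info.Version)
}

// BuildRegPath builds the full registration key for one instance,
// e.g. "/hello/1.0.0/127.0.0.1:1001".
func BuildRegPath(info Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(info), info.Addr)
}
```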
Let's analyze the implementation above.

When a gRPC server starts, we register it with etcd:
```go
etcdRegister := discovery.NewRegister(config.Etcd.Addrs, log.Logger)
node := discovery.Server{
	Name: app,
	Addr: utils.InternalIP() + config.Port.GRPC,
}

if _, err := etcdRegister.Register(node, 10); err != nil {
	panic(fmt.Sprintf("server register failed: %v", err))
}
```
When the service registers, it first grants a lease:
```go
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}
```
Then the lease is kept alive via KeepAlive:
```go
// KeepAlive attempts to keep the given lease alive forever.
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)
	// with sync.Once, only started on the first call
	l.firstKeepAliveOnce.Do(func() {
		// continuously sends keep-alive requests, once every 500 milliseconds
		go l.recvKeepAliveLoop()
		// removes leases that have waited too long without a response
		go l.deadlineLoop()
	})

	return ch, nil
}

// deadlineLoop reaps any keep-alive channels that have not received a
// response within the lease TTL
func (l *lessor) deadlineLoop() {
	for {
		select {
		case <-time.After(time.Second):
		// donec is closed, and loopErr is set, when recvKeepAliveLoop stops
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for a response; the lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		// resetRecv opens a new lease stream and starts sending keep-alive requests.
		stream, err := l.resetRecv()
		if err != nil {
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				// receive responses from the lease stream
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}
					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}
				// update the lease based on the LeaseKeepAliveResponse;
				// if the lease expired, close all of its alive channels
				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
			continue
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep-alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	// establish the lease stream between server and client
	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// sendKeepAliveLoop sends keep-alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// TODO do something with this error?
				return
			}
		}

		select {
		// runs once every 500 milliseconds
		case <-time.After(500 * time.Millisecond):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

// Revoke revokes the given lease; all keys attached to the lease
// expire and are deleted
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, toErr(ctx, err)
}
```
To summarize the registration flow:

1. Each service registration is assigned its own lease.
2. KeepAlive maintains the lease through streamed keep-alive requests from client to server, and streamed keep-alive responses from server back to client.
3. KeepAlive sends a request on the lease stream once every 500 milliseconds.
4. When a keep-alive response is received, the lease is refreshed and the service is considered alive.
5. If no keep-alive response arrives within the lease TTL, the lease is dropped.
6. Revoke revokes a lease; all keys attached to it expire and are deleted.

A minimal end-to-end sketch of this lease lifecycle is shown below.
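This is only a sketch of the Grant → Put(WithLease) → KeepAlive → Revoke sequence using the clientv3 API directly, assuming a local etcd reachable at 127.0.0.1:2379; the key and value are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// connect to a local etcd (assumed at 127.0.0.1:2379)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 1. grant a 10-second lease
	lease, err := cli.Grant(context.Background(), 10)
	if err != nil {
		log.Fatal(err)
	}

	// 2. attach a key to the lease; it disappears when the lease dies
	if _, err := cli.Put(context.Background(), "/hello/1.0.0/127.0.0.1:1001",
		`{"addr":"127.0.0.1:1001"}`, clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}

	// 3. keep the lease alive; under the hood the client streams keep-alive
	// requests every 500ms and delivers responses on this channel
	ch, err := cli.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 3; i++ {
		if resp := <-ch; resp != nil {
			fmt.Println("lease renewed, TTL:", resp.TTL)
		}
	}

	// 4. revoke the lease; the key above is deleted immediately
	if _, err := cli.Revoke(context.Background(), lease.ID); err != nil {
		log.Fatal(err)
	}
}
```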
On the client side, we only need to implement the Builder and Resolver interfaces that gRPC provides in its resolver package to get client-side service discovery and load balancing.
```go
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	Scheme() string
}
```
Build: creates a new resolver for the given target; it is invoked when grpc.Dial() is called.
Scheme: returns the scheme this resolver supports; see gRPC's naming scheme definition.
```go
// Resolver watches for updates on the specified target,
// including address updates and service config updates.
type Resolver interface {
	ResolveNow(ResolveNowOption)
	Close()
}
```
ResolveNow: called by gRPC to try resolving the target name again. It is only a hint, and the method may be a no-op.
Close: closes the resolver.
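To make the contract concrete before diving into the etcd-backed version, here is a toy Builder/Resolver that reports a fixed address list. The type and addresses are hypothetical; it only illustrates how Build pushes state into the ClientConn, using the same BuildOptions/ResolveNowOptions signatures that the implementation below targets:

```go
package staticdemo

import "google.golang.org/grpc/resolver"

// staticResolver is a toy Builder/Resolver that always reports a fixed
// address list (a hypothetical example, not part of the project code).
type staticResolver struct {
	addrs []resolver.Address
	cc    resolver.ClientConn
}

func (s *staticResolver) Scheme() string { return "static" }

// Build is called by grpc.Dial; it pushes the initial state and returns itself.
func (s *staticResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	s.cc = cc
	s.cc.UpdateState(resolver.State{Addresses: s.addrs})
	return s, nil
}

// ResolveNow is only a hint; a static list has nothing to refresh.
func (s *staticResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (s *staticResolver) Close() {}

func init() {
	// after registration, targets like "static:///anything" become dialable
	resolver.Register(&staticResolver{
		addrs: []resolver.Address{{Addr: "127.0.0.1:1001"}, {Addr: "127.0.0.1:1002"}},
	})
}
```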
Now let's look at the concrete implementation:
```go
package discovery

import (
	"context"
	"time"

	"go.uber.org/zap"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

const (
	schema = "etcd"
)

// Resolver for grpc client
type Resolver struct {
	schema      string
	EtcdAddrs   []string
	DialTimeout int

	closeCh      chan struct{}
	watchCh      clientv3.WatchChan
	cli          *clientv3.Client
	keyPrifix    string
	srvAddrsList []resolver.Address

	cc     resolver.ClientConn
	logger *zap.Logger
}

// NewResolver create a new resolver.Builder base on etcd
func NewResolver(etcdAddrs []string, logger *zap.Logger) *Resolver {
	return &Resolver{
		schema:      schema,
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Scheme returns the scheme supported by this resolver.
func (r *Resolver) Scheme() string {
	return r.schema
}

// Build creates a new resolver.Resolver for the given target
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r.cc = cc

	r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority})
	if _, err := r.start(); err != nil {
		return nil, err
	}
	return r, nil
}

// ResolveNow resolver.Resolver interface
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}

// Close resolver.Resolver interface
func (r *Resolver) Close() {
	r.closeCh <- struct{}{}
}

// start
func (r *Resolver) start() (chan<- struct{}, error) {
	var err error
	r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	})
	if err != nil {
		return nil, err
	}

	resolver.Register(r)

	r.closeCh = make(chan struct{})

	if err = r.sync(); err != nil {
		return nil, err
	}

	go r.watch()

	return r.closeCh, nil
}

// watch update events
func (r *Resolver) watch() {
	ticker := time.NewTicker(time.Minute)
	r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())

	for {
		select {
		case <-r.closeCh:
			return
		case res, ok := <-r.watchCh:
			if ok {
				r.update(res.Events)
			}
		case <-ticker.C:
			if err := r.sync(); err != nil {
				r.logger.Error("sync failed", zap.Error(err))
			}
		}
	}
}

// update
func (r *Resolver) update(events []*clientv3.Event) {
	for _, ev := range events {
		var info Server
		var err error

		switch ev.Type {
		case mvccpb.PUT:
			info, err = ParseValue(ev.Kv.Value)
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
			if !Exist(r.srvAddrsList, addr) {
				r.srvAddrsList = append(r.srvAddrsList, addr)
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		case mvccpb.DELETE:
			info, err = SplitPath(string(ev.Kv.Key))
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr}
			if s, ok := Remove(r.srvAddrsList, addr); ok {
				r.srvAddrsList = s
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		}
	}
}

// sync fetches the full set of addresses
func (r *Resolver) sync() error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	r.srvAddrsList = []resolver.Address{}

	for _, v := range res.Kvs {
		info, err := ParseValue(v.Value)
		if err != nil {
			continue
		}
		addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
		r.srvAddrsList = append(r.srvAddrsList, addr)
	}

	r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
	return nil
}
```
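The resolver relies on a few more helpers (ParseValue, SplitPath, Exist, Remove) that are also not shown in this excerpt. A plausible sketch, reusing the Server struct and BuildPrefix sketched earlier; the actual definitions are in the linked repo:

```go
package discovery

import (
	"encoding/json"
	"fmt"
	"strings"

	"google.golang.org/grpc/resolver"
)

// ParseValue decodes the JSON value stored under a registration key.
func ParseValue(value []byte) (Server, error) {
	info := Server{}
	err := json.Unmarshal(value, &info)
	return info, err
}

// SplitPath recovers the address from a registration key; needed on DELETE
// events, where the value is no longer available.
func SplitPath(path string) (Server, error) {
	info := Server{}
	strs := strings.Split(path, "/")
	info.Addr = strs[len(strs)-1]
	if info.Addr == "" {
		return info, fmt.Errorf("invalid path %q", path)
	}
	return info, nil
}

// Exist reports whether addr is already present in the list.
func Exist(l []resolver.Address, addr resolver.Address) bool {
	for i := range l {
		if l[i].Addr == addr.Addr {
			return true
		}
	}
	return false
}

// Remove deletes addr from the list, reporting whether it was found.
func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr.Addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}
```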
To summarize the discovery flow:

1. watch listens for changes under the key prefix and updates srvAddrsList as soon as a change notification arrives.
2. sync periodically pulls all available service addresses from etcd into srvAddrsList.
3. UpdateState pushes the address list into the ClientConn.
4. The gRPC client then sends requests to the gRPC servers according to the configured balancing policy.
Here we use gRPC's built-in round_robin load-balancing policy, which walks through the resolved addresses in turn, to test service discovery and simple load balancing:
```go
package discovery

import (
	"context"
	"fmt"
	"log"
	"net"
	"testing"
	"time"

	"go.uber.org/zap"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/resolver"

	"etcd-learning/discovery/helloworld"

	"google.golang.org/grpc"
)

var etcdAddrs = []string{"127.0.0.1:2379"}

func TestResolver(t *testing.T) {
	r := NewResolver(etcdAddrs, zap.NewNop())
	resolver.Register(r)

	// register 5 services in etcd
	go newServer(t, ":1001", "1.0.0", 1)
	go newServer(t, ":1002", "1.0.0", 1)
	go newServer(t, ":1003", "1.0.0", 1)
	go newServer(t, ":1004", "1.0.0", 1)
	go newServer(t, ":1006", "1.0.0", 10)

	conn, err := grpc.Dial("etcd:///hello", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial %v", err)
	}
	defer conn.Close()

	c := helloworld.NewGreeterClient(conn)

	// make ten requests
	for i := 0; i < 10; i++ {
		resp, err := c.SayHello(context.Background(), &helloworld.HelloRequest{Name: "abc"})
		if err != nil {
			t.Fatalf("say hello failed %v", err)
		}
		log.Println(resp.Message)
		time.Sleep(100 * time.Millisecond)
	}

	time.Sleep(10 * time.Second)
}

type server struct {
	Port string
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
	return &helloworld.HelloReply{Message: fmt.Sprintf("Hello From %s", s.Port)}, nil
}

func newServer(t *testing.T, port string, version string, weight int64) {
	register := NewRegister(etcdAddrs, zap.NewNop())
	defer register.Stop()

	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen %v", err)
	}

	s := grpc.NewServer()
	helloworld.RegisterGreeterServer(s, &server{Port: port})

	info := Server{
		Name:    "hello",
		Addr:    fmt.Sprintf("127.0.0.1%s", port),
		Version: version,
		Weight:  weight,
	}

	register.Register(info, 10)

	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to server %v", err)
	}
}
```
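One caveat: grpc.WithBalancerName, used in the test above, has since been deprecated in grpc-go. On newer releases the same round_robin policy is usually selected through the default service config. A sketch, where dialRoundRobin is a hypothetical helper name:

```go
package discovery

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialRoundRobin shows the non-deprecated way to pick round_robin on newer
// grpc-go releases: the policy is chosen via the default service config
// instead of grpc.WithBalancerName.
func dialRoundRobin(target string) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
	)
}
```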
Five servers are registered here, on ports 1001 through 1004 plus 1006, and the client makes ten calls in a loop:
```
=== RUN   TestResolver
2021/07/24 22:44:52 Hello From :1001
2021/07/24 22:44:52 Hello From :1006
2021/07/24 22:44:53 Hello From :1001
2021/07/24 22:44:53 Hello From :1002
2021/07/24 22:44:53 Hello From :1003
2021/07/24 22:44:53 Hello From :1004
2021/07/24 22:44:53 Hello From :1006
2021/07/24 22:44:53 Hello From :1001
2021/07/24 22:44:53 Hello From :1002
2021/07/24 22:44:53 Hello From :1003
```
As you can see, successive requests are dispatched to different servers in turn.
To put this client-side approach in context, it helps to compare the common load-balancing schemes. The first is the centralized (proxy) scheme: an independent LB sits between service consumers and providers, usually dedicated hardware such as F5, or software such as LVS or HAProxy. The LB holds an address mapping table for every service, typically registered by operations staff. When a consumer calls a target service, it sends the request to the LB, which applies some policy, for example round-robin, and forwards the request to the target service. The LB usually performs health checks and can automatically remove unhealthy instances.
The main problems with this scheme:

1. Single point of failure: all call traffic passes through the LB. As the number of services and the call volume grow, the LB easily becomes a bottleneck, and an LB failure affects the entire system.
2. An extra hop between consumer and provider adds some performance overhead.
The second scheme addresses these shortcomings by building the LB function into the consumer process itself; it is also known as soft load balancing or client-side load balancing. When a provider starts, it registers its address in the service registry and then periodically reports heartbeats to signal liveness, which amounts to a health check. When a consumer wants to call a service, its built-in LB component queries the registry, caches and periodically refreshes the target address list, selects an address using some balancing policy, and sends the request directly to the target service. LB and service-discovery responsibilities are distributed across the consumer processes, and consumers call providers directly with no extra hop, so performance is good.
The main problems with this scheme:

1. Development cost: because the LB logic lives inside the client process, a separate client library must be developed and maintained for every language stack in use.
2. In production, any later upgrade of the client library forces every consumer to change code and redeploy, which makes upgrades complicated.
The third scheme is a compromise addressing the shortcomings of the second, and its principle is broadly similar. The difference is that the LB and service-discovery functions move out of the consumer process into a separate process on the same host. One or more services on a host that need to call a target service all go through this shared local LB process for service discovery and load balancing. This scheme is also distributed and has no single point: if an LB process dies, only the consumers on that host are affected. Calls between consumer and LB stay on the host, so performance is good, and the scheme also simplifies consumers: there is no per-language client library, and LB upgrades do not require consumers to change code.

The main problem with this scheme: deployment is more complex, with more moving parts, which makes debugging and troubleshooting inconvenient.
The etcd-based service discovery above uses the second of these schemes: in-process LB (balancing-aware client).
【Load Balancing in gRPC】https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
【文中的代碼示例】https://github.com/boilingfrog/etcd-learning/tree/main/discovery