etcd Learning (3): Using etcd for gRPC Service Discovery

gRPC service discovery through etcd

Preface

Our project uses etcd to implement gRPC service registration and service discovery. This post walks through how both are implemented.

Let's start with the demo used in this post; the code lives in the discovery directory of the repository linked in the references at the end.

Service Registration

package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"strconv"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// Register for grpc server
type Register struct {
	EtcdAddrs   []string
	DialTimeout int

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *zap.Logger
}

// NewRegister creates a register based on etcd
func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Register a service
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		return nil, err
	}

	r.closeCh = make(chan struct{})

	go r.keepAlive()

	return r.closeCh, nil
}

// Stop stops the register
func (r *Register) Stop() {
	r.closeCh <- struct{}{}
}

// register registers the node in etcd
func (r *Register) register() error {
	leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {
		return err
	}

	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}
	_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
	return err
}

// unregister removes the node from etcd
func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))
	return err
}

// keepAlive
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed", zap.Error(err))
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed", zap.Error(err))
			}
			return
		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		}
	}
}

// UpdateHandler return http handler
func (r *Register) UpdateHandler() http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		wi := req.URL.Query().Get("weight")
		weight, err := strconv.Atoi(wi)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte(err.Error()))
			return
		}

		var update = func() error {
			r.srvInfo.Weight = int64(weight)
			data, err := json.Marshal(r.srvInfo)
			if err != nil {
				return err
			}
			_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
			return err
		}

		if err := update(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			return
		}
		w.Write([]byte("update server weight success"))
	})
}

func (r *Register) GetServerInfo() (Server, error) {
	resp, err := r.cli.Get(context.Background(), BuildRegPath(r.srvInfo))
	if err != nil {
		return r.srvInfo, err
	}
	info := Server{}
	if resp.Count >= 1 {
		if err := json.Unmarshal(resp.Kvs[0].Value, &info); err != nil {
			return info, err
		}
	}
	return info, nil
}
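
Both the register above and the resolver shown later depend on a few helpers from the same package that are not reproduced in this post: the Server struct, BuildPrefix, BuildRegPath, ParseValue and SplitPath (plus Exist and Remove for the address list, omitted here). Below is a minimal sketch of what they might look like, assuming keys of the form /&lt;name&gt;/&lt;version&gt;/&lt;addr&gt;; check the linked demo repository for the actual definitions.

package discovery

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// Server describes one registered gRPC instance.
// The exact field set is an assumption based on how the demo uses it.
type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Version string `json:"version"`
	Weight  int64  `json:"weight"`
}

// BuildPrefix returns the etcd key prefix the resolver watches, e.g. "/hello/1.0.0/".
func BuildPrefix(info Server) string {
	if info.Version == "" {
		return fmt.Sprintf("/%s/", info.Name)
	}
	return fmt.Sprintf("/%s/%s/", info.Name, info.Version)
}

// BuildRegPath returns the full key for one instance, e.g. "/hello/1.0.0/127.0.0.1:1001".
func BuildRegPath(info Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(info), info.Addr)
}

// ParseValue decodes the JSON value stored under a registration key.
func ParseValue(value []byte) (Server, error) {
	info := Server{}
	if err := json.Unmarshal(value, &info); err != nil {
		return info, err
	}
	return info, nil
}

// SplitPath recovers the address from a deleted key (the value is already gone on DELETE events).
func SplitPath(path string) (Server, error) {
	info := Server{}
	strs := strings.Split(path, "/")
	if len(strs) == 0 {
		return info, errors.New("invalid path")
	}
	info.Addr = strs[len(strs)-1]
	return info, nil
}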

Now let's analyze the implementation above.

When a gRPC server starts, we register it in etcd:

etcdRegister := discovery.NewRegister(config.Etcd.Addrs, log.Logger)
node := discovery.Server{
	Name: app,
	Addr: utils.InternalIP() + config.Port.GRPC,
}

if _, err := etcdRegister.Register(node, 10); err != nil {
	panic(fmt.Sprintf("server register failed: %v", err))
}
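
On shutdown the registration should be removed so clients stop being routed to a dead instance; Register exposes Stop() for that, and UpdateHandler() can be mounted to adjust the instance weight at runtime. Here is a sketch of wiring both up (the HTTP port, route and signal handling are assumptions, not part of the demo; imports of net/http, os, os/signal and syscall are assumed):

// expose the weight-update endpoint, e.g. GET /weight?weight=5
go func() {
	mux := http.NewServeMux()
	mux.HandleFunc("/weight", etcdRegister.UpdateHandler())
	log.Println(http.ListenAndServe(":8081", mux)) // port is an assumption
}()

// on SIGINT/SIGTERM, Stop() signals keepAlive(), which deletes the
// registration key and revokes the lease before the process exits
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
etcdRegister.Stop()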

When the service registers, a lease is granted first:

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}

Then the lease is kept alive through KeepAlive:

// KeepAlive attempts to keep the given lease alive forever
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)
	// sync.Once ensures this only runs on the first call
	l.firstKeepAliveOnce.Do(func() {
		// continuously send keep-alive requests, once every 500ms
		go l.recvKeepAliveLoop()
		// drop leases that have waited too long without a response
		go l.deadlineLoop()
	})

	return ch, nil
}

// deadlineLoop reaps any keep-alive channels that have not received a response within the lease TTL
func (l *lessor) deadlineLoop() {
	for {
		select {
		case <-time.After(time.Second):
		// donec is closed and loopErr is set when recvKeepAliveLoop stops
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for a response; the lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		// resetRecv opens a new lease stream and starts sending keep-alive requests
		stream, err := l.resetRecv()
		if err != nil {
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				// receive responses from the lease stream
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}

					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}
				// update the lease based on the LeaseKeepAliveResponse;
				// if the lease has expired, close all of its alive channels
				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
			continue
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep-alive requests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	// establish the lease stream between client and server
	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// sendKeepAliveLoop sends keep-alive requests for the lifetime of the given stream
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// TODO do something with this error?
				return
			}
		}

		select {
		// run once every 500 milliseconds
		case <-time.After(500 * time.Millisecond):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

// Revoke revokes the given lease; all keys attached to the lease will expire and be deleted
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, toErr(ctx, err)
}

Summary:

1. Each service registration is granted its own lease;

2. KeepAlive maintains the lease through streamed keep-alive requests from client to server and streamed keep-alive responses from server to client;

3. KeepAlive sends a request on the lease stream every 500 milliseconds;

4. When a keep-alive response is received, the lease is refreshed and the service stays active;

5. If no keep-alive response arrives within the lease TTL, the lease is dropped;

6. Revoke revokes a lease; all keys attached to it expire and are deleted (a standalone sketch of this end-to-end lease flow follows the list).
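
Putting the pieces together, the lease pattern the register relies on can be reproduced directly against clientv3. This is a minimal, standalone sketch; the endpoint, key and value are placeholders:

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 1. Grant a lease with a 10s TTL.
	lease, err := cli.Grant(context.Background(), 10)
	if err != nil {
		log.Fatal(err)
	}

	// 2. Attach the registration key to the lease; if keep-alives stop,
	//    etcd deletes the key automatically when the TTL expires.
	_, err = cli.Put(context.Background(), "/hello/1.0.0/127.0.0.1:1001",
		`{"addr":"127.0.0.1:1001"}`, clientv3.WithLease(lease.ID))
	if err != nil {
		log.Fatal(err)
	}

	// 3. Keep the lease alive; the client sends keep-alive requests on a
	//    stream roughly every 500ms and delivers responses on this channel.
	ch, err := cli.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		for range ch {
			// each response confirms the lease TTL was refreshed
		}
		// channel closed: the lease can no longer be kept alive, so a real
		// register (like the demo's keepAlive()) would re-register here
	}()

	time.Sleep(30 * time.Second)

	// 4. Revoke the lease on shutdown; all keys attached to it are deleted.
	if _, err := cli.Revoke(context.Background(), lease.ID); err != nil {
		log.Fatal(err)
	}
}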

Service Discovery

We only need to implement the Builder and Resolver interfaces that gRPC exposes in its resolver package to get client-side service discovery and load balancing.

// Builder creates a resolver that watches for name resolution updates
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	Scheme() string
}
  • Build: creates a new resolver for the given target; it is invoked when grpc.Dial() is called;

  • Scheme: returns the scheme this resolver supports; see the Scheme definition in gRPC for details.

// Resolver watches the specified target for updates, including address updates and service config updates
type Resolver interface {
	ResolveNow(ResolveNowOption)
	Close()
}
  • ResolveNow: called by gRPC to try resolving the target name again; it is only a hint and can be a no-op;

  • Close: closes the resolver.
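
One detail worth spelling out before the code: the gRPC target URI is what selects which keys the resolver watches. With the (older) resolver.Target layout this demo is written against, the target string is split into Scheme, Authority and Endpoint, and Build() below uses Endpoint as the service name and Authority as the version. A small illustration (the version value is made up for the example):

// "etcd://1.0.0/hello" is parsed by grpc-go into:
//   target.Scheme    == "etcd"   // must match Resolver.Scheme()
//   target.Authority == "1.0.0"  // treated as the service version
//   target.Endpoint  == "hello"  // treated as the service name
//
// "etcd:///hello" (as in the test further down) leaves Authority empty, so the
// watch prefix would cover every version of "hello" (assuming a BuildPrefix
// like the sketch shown earlier).
conn, err := grpc.Dial("etcd://1.0.0/hello", grpc.WithInsecure())
if err != nil {
	log.Fatalf("dial failed: %v", err)
}
defer conn.Close()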

Next, let's look at the concrete implementation.

package discovery

import (
	"context"
	"time"

	"go.uber.org/zap"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

const (
	schema = "etcd"
)

// Resolver for grpc client
type Resolver struct {
	schema      string
	EtcdAddrs   []string
	DialTimeout int

	closeCh      chan struct{}
	watchCh      clientv3.WatchChan
	cli          *clientv3.Client
	keyPrifix    string
	srvAddrsList []resolver.Address

	cc     resolver.ClientConn
	logger *zap.Logger
}

// NewResolver creates a new resolver.Builder based on etcd
func NewResolver(etcdAddrs []string, logger *zap.Logger) *Resolver {
	return &Resolver{
		schema:      schema,
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Scheme returns the scheme supported by this resolver.
func (r *Resolver) Scheme() string {
	return r.schema
}

// Build creates a new resolver.Resolver for the given target
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r.cc = cc

	r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority})
	if _, err := r.start(); err != nil {
		return nil, err
	}
	return r, nil
}

// ResolveNow resolver.Resolver interface
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}

// Close resolver.Resolver interface
func (r *Resolver) Close() {
	r.closeCh <- struct{}{}
}

// start
func (r *Resolver) start() (chan<- struct{}, error) {
	var err error
	r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	})
	if err != nil {
		return nil, err
	}
	resolver.Register(r)

	r.closeCh = make(chan struct{})

	if err = r.sync(); err != nil {
		return nil, err
	}

	go r.watch()

	return r.closeCh, nil
}

// watch update events
func (r *Resolver) watch() {
	ticker := time.NewTicker(time.Minute)
	r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())

	for {
		select {
		case <-r.closeCh:
			return
		case res, ok := <-r.watchCh:
			if ok {
				r.update(res.Events)
			}
		case <-ticker.C:
			if err := r.sync(); err != nil {
				r.logger.Error("sync failed", zap.Error(err))
			}
		}
	}
}

// update
func (r *Resolver) update(events []*clientv3.Event) {
	for _, ev := range events {
		var info Server
		var err error

		switch ev.Type {
		case mvccpb.PUT:
			info, err = ParseValue(ev.Kv.Value)
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
			if !Exist(r.srvAddrsList, addr) {
				r.srvAddrsList = append(r.srvAddrsList, addr)
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		case mvccpb.DELETE:
			info, err = SplitPath(string(ev.Kv.Key))
			if err != nil {
				continue
			}
			addr := resolver.Address{Addr: info.Addr}
			if s, ok := Remove(r.srvAddrsList, addr); ok {
				r.srvAddrsList = s
				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
			}
		}
	}
}

// sync fetches all address information from etcd
func (r *Resolver) sync() error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	r.srvAddrsList = []resolver.Address{}

	for _, v := range res.Kvs {
		info, err := ParseValue(v.Value)
		if err != nil {
			continue
		}
		addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
		r.srvAddrsList = append(r.srvAddrsList, addr)
	}
	r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
	return nil
}

Summary:

1. watch listens for changes under the key prefix and updates srvAddrsList as soon as change notifications arrive;

2. sync periodically re-syncs the available service addresses from etcd into srvAddrsList;

3. UpdateState pushes the Addresses to the ClientConn;

4. The gRPC client can then send requests to the gRPC servers according to the configured balancing policy.

Here we use gRPC's built-in round_robin load balancing policy, which calls the resolved addresses in turn, to test service discovery and simple load balancing.

package discovery

import (
	"context"
	"fmt"
	"log"
	"net"
	"testing"
	"time"

	"go.uber.org/zap"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/resolver"

	"etcd-learning/discovery/helloworld"

	"google.golang.org/grpc"
)

var etcdAddrs = []string{"127.0.0.1:2379"}

func TestResolver(t *testing.T) {
	r := NewResolver(etcdAddrs, zap.NewNop())
	resolver.Register(r)

	// register 5 services in etcd
	go newServer(t, ":1001", "1.0.0", 1)
	go newServer(t, ":1002", "1.0.0", 1)
	go newServer(t, ":1003", "1.0.0", 1)
	go newServer(t, ":1004", "1.0.0", 1)
	go newServer(t, ":1006", "1.0.0", 10)

	conn, err := grpc.Dial("etcd:///hello", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial %v", err)
	}
	defer conn.Close()

	c := helloworld.NewGreeterClient(conn)

	// make ten requests
	for i := 0; i < 10; i++ {
		resp, err := c.SayHello(context.Background(), &helloworld.HelloRequest{Name: "abc"})
		if err != nil {
			t.Fatalf("say hello failed %v", err)
		}
		log.Println(resp.Message)
		time.Sleep(100 * time.Millisecond)
	}

	time.Sleep(10 * time.Second)
}

type server struct {
	Port string
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
	return &helloworld.HelloReply{Message: fmt.Sprintf("Hello From %s", s.Port)}, nil
}

func newServer(t *testing.T, port string, version string, weight int64) {
	register := NewRegister(etcdAddrs, zap.NewNop())
	defer register.Stop()

	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen %v", err)
	}

	s := grpc.NewServer()
	helloworld.RegisterGreeterServer(s, &server{Port: port})

	info := Server{
		Name:    "hello",
		Addr:    fmt.Sprintf("127.0.0.1%s", port),
		Version: version,
		Weight:  weight,
	}

	register.Register(info, 10)

	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to server %v", err)
	}
}

Five services are registered here (ports 1001-1004 and 1006), and the client then makes 10 calls in a loop:

=== RUN   TestResolver
2021/07/24 22:44:52 Hello From :1001
2021/07/24 22:44:52 Hello From :1006
2021/07/24 22:44:53 Hello From :1001
2021/07/24 22:44:53 Hello From :1002
2021/07/24 22:44:53 Hello From :1003
2021/07/24 22:44:53 Hello From :1004
2021/07/24 22:44:53 Hello From :1006
2021/07/24 22:44:53 Hello From :1001
2021/07/24 22:44:53 Hello From :1002
2021/07/24 22:44:53 Hello From :1003

We can see that the requests are spread across different servers.

Load Balancing

Centralized LB (Proxy Model)

A dedicated LB sits between service consumers and service providers, usually specialized hardware such as F5 or software such as LVS or HAProxy. The LB holds the address mapping table for all services, typically configured and registered by the operations team. When a consumer calls a target service, it sends the request to the LB, which applies some policy, such as round-robin, and forwards the request to the target service. The LB usually has health-check capabilities and can automatically remove unhealthy service instances.

The main problems with this approach:

1. Single point of failure: all call traffic passes through the LB, so as the number of services and the call volume grow, the LB easily becomes a bottleneck, and an LB failure affects the whole system;

2. An extra hop is added between consumer and provider, which incurs some performance overhead.

In-process LB (Balancing-aware Client)

To address the shortcomings of the first approach, this approach integrates the LB functionality into the service consumer's process, which is also called soft load balancing or client-side load balancing. When a service provider starts, it first registers its address in a service registry and then reports heartbeats periodically to indicate liveness, which amounts to a health check. When a consumer wants to call a service, it queries the service registry through a built-in LB component, caches and periodically refreshes the list of target addresses, picks one target address according to some load-balancing policy, and finally sends the request to that target. LB and service discovery are distributed into each consumer's process, the consumer calls the provider directly without an extra hop, and performance is good.

The main problems with this approach:

1. Development cost: since the LB is integrated into the caller's process, if multiple language stacks are in use, a matching client library must be developed for each of them, which carries development and maintenance cost;

2. In production, upgrading the client library later forces callers to change code and redeploy, which makes upgrades complicated.

Independent LB Process (External Load Balancing Service)

This approach is a compromise proposed to address the shortcomings of the second one; the principle is largely the same.

The difference is that LB and service discovery are moved out of the consumer's process into an independent process on the same host. When one or more services on a host need to call a target service, they all go through this standalone LB process for service discovery and load balancing. This is still a distributed approach with no single point of failure: if an LB process crashes, only the callers on that host are affected. Calls between the caller and the LB stay on the same host, so performance is good. It also simplifies callers, since no client library has to be developed per language, and upgrading the LB does not require callers to change code.

The main problem with this approach: deployment is more complex, there are more moving parts, and debugging and troubleshooting are less convenient.

The etcd-based service discovery above uses the second approach: in-process LB (Balancing-aware Client).

References

[Load Balancing in gRPC] https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
[Code samples from this post] https://github.com/boilingfrog/etcd-learning/tree/main/discovery
