下面介紹 jupiter-0.2.7 版本中 grpc 經過 etcd 實現服務發現與註冊。node
服務註冊的流程圖:golang
etcd的服務註冊代碼模塊在 jupiter/pkg/registry/etcdv3
中。mvc
下面讓咱們來看看實際的代碼app
// Registry register/unregister service // registry impl should control rpc timeout type Registry interface { RegisterService(context.Context, *server.ServiceInfo) error UnregisterService(context.Context, *server.ServiceInfo) error ListServices(context.Context, string, string) ([]*server.ServiceInfo, error) WatchServices(context.Context, string, string) (chan Endpoints, error) io.Closer }
在 pkg/registry/registry.go
中定義了註冊服務對象的接口。不一樣的服務只要實現了這些接口,jupiter 就能使用。負載均衡
首先咱們來看看註冊方法框架
// RegisterService register service to registry func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error { err := reg.registerBiz(ctx, info) ... } // 業務信息註冊 func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error { ... // 提交信息到 etcd _, err := reg.client.Put(readCtx, key, val, opOptions...) ... }
這裏主要的部分是 reg.client.Put()
將服務信息提交到 etcd 中。其中的租約機制我會在以後單獨寫一篇文章介紹。這裏主要仍是關注如何註冊。
源碼中還有個 registerMetric()
方法,這個方法的目的是將服務信息在提交到etcd的 prometheus 前綴目錄下,用於服務監控,用的也是 client.Put() 方法。這裏具體就不展現代碼了,感興趣的同窗能夠去源碼庫中查看。ide
服務退出ui
// 刪除服務 func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error { ... // 刪除服務信息 _, err := reg.client.Delete(ctx, key) ... }
這裏經過 client.Delete()
方法將服務信息從 etcd 中刪除掉。google
獲取服務列表spa
// ListServices list service registered in registry with name `name` func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) { // 服務信息key的前綴 target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme) // 獲取相關前綴的全部信息 getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix()) ... }
經過 client.Get()
方法獲取到相同前綴的服務信息。
服務信息變更監控
// WatchServices watch service change event, then return address list func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) { prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name) // 經過etcd客戶端建立一個監控通道 watch, err := reg.client.WatchPrefix(context.Background(), prefix) if err != nil { return nil, err } ... xgo.Go(func() { // 不斷接收etcd發送過來的變更事件 for event := range watch.C() { switch event.Type { case mvccpb.PUT: updateAddrList(al, prefix, scheme, event.Kv) case mvccpb.DELETE: deleteAddrList(al, prefix, scheme, event.Kv) } out := al.DeepCopy() fmt.Printf("al => %p\n", al.Nodes) fmt.Printf("snapshot => %p\n", out.Nodes) select { // 將更新後的服務信息發送出去,接收方是 resolver case addresses <- *out: default: xlog.Warnf("invalid") } } }) // 返回一個地址通道,用於傳遞 return addresses, nil }
WatchServices()
方法主要是監控信息的變更事件,並將變更後的服務信息從新返回給 resolver。具體思路是經過 etcdClient.Watch()
方法建立一個監控通道,而後放入一個 goroutine來不斷接收 etcd 推送過來的事件,維護本地的服務信息,並經過 resolver 最終返回到 grpclb 負載均衡器進行服務地址信息的更新。
服務發現流程圖:
grpc 的 resolver 模塊定義了兩個接口
// Builder creates a resolver that will be used to watch name resolution updates. type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string } // Resolver watches for the updates on the specified target. // Updates include address updates and service config updates. type Resolver interface { ResolveNow(ResolveNowOptions) Close() }
首先咱們來看看 Builder 接口的具體實現
type baseBuilder struct { name string reg registry.Registry } // Build ... func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc") if err != nil { return nil, err } var stop = make(chan struct{}) xgo.Go(func() { for { select { case endpoint := <-endpoints: var state = resolver.State{ Addresses: make([]resolver.Address, 0), ... } for _, node := range endpoint.Nodes { ... state.Addresses = append(state.Addresses, address) } cc.UpdateState(state) case <-stop: return } } }) return &baseResolver{ stop: stop, }, nil }
這裏Build 方法主要是經過 Registry 模塊得到監控服務通道,而後將更新的服務信息再更新到 grpcClient 中去,保證 grpcClient 的負載均衡器的服務地址永遠都是最新的。
如何將Builder的具體實現註冊到 grpc 中
import "google.golang.org/grpc/resolver" // Register ... func Register(name string, reg registry.Registry) { resolver.Register(&baseBuilder{ name: name, reg: reg, }) }
將 Registry模塊注入到 Builder 對象中,而後注入到 grpc 的 resolver 模塊中去。這樣 grpcClient 在實際運行中就會調用 etcd 的服務發現功能了。
這裏在介紹一下jupiter框架在實際項目中如何使用服務發現與註冊。
func (app *Application) startServers() error { var eg errgroup.Group // start multi servers for _, s := range app.servers { s := s eg.Go(func() (err error) { _ = app.registerer.RegisterService(context.TODO(), s.Info()) defer app.registerer.UnregisterService(context.TODO(), s.Info()) ... }) } return eg.Wait() } eng := engine.NewEngine() eng.SetRegistry(compound_registry.New( etcdv3_registry.StdConfig("default").Build(), ))
在框架的 Application 模塊中已經實現了服務的自動註冊與刪除。通常使用框架時不須要再調用。項目使用中只須要在建立 Application 對象時,將註冊中心信息注入便可。
// 服務發現須要初始化,拿到etcd中服務的信息 func (eng *Engine) initResolver() error { resolver.Register("etcd", etcdv3.StdConfig("default").Build()) return nil }
服務發現也是類型的將註冊中心信息注入便可。**