Besides the warden direct-connect approach from last time, kratos has another service-discovery SDK: discovery.
discovery can, to a first approximation, be understood as an HTTP service.
Its simplest discovery flow looks like this:
1. The service registers its appid with discovery.
2. The client queries discovery for the service's addresses by appid.
Of course it is far from this simple; there are many more features inside, such as service self-discovery and load-balancing algorithms.
In this section we only look at the simplest service-discovery demo.
First, let's walk through discovery's HTTP API.
// innerRouter init local router api path.
func innerRouter(e *bm.Engine) {
	group := e.Group("/discovery")
	{
		group.POST("/register", register)
		group.POST("/renew", renew)
		group.POST("/cancel", cancel)
		group.GET("/fetch/all", initProtect, fetchAll)
		group.GET("/fetch", initProtect, fetch)
		group.GET("/fetchs", initProtect, fetchs)
		group.GET("/poll", initProtect, poll)
		group.GET("/polls", initProtect, polls)
		// manager
		group.POST("/set", set)
		group.GET("/nodes", initProtect, nodes)
	}
}
The bm engine inside discovery registers these routes. I then tested them with Postman.
POST http://HOST/discovery/renew
curl 'http://127.0.0.1:7171/discovery/renew' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"
Success:
{
    "code": 0,
    "message": ""
}
Failure:
{
    "code": -400,
    "message": "-400"
}
POST http://HOST/discovery/cancel
curl 'http://127.0.0.1:7171/discovery/cancel' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"
Success:
{
    "code": 0,
    "message": ""
}
Failure:
{
    "code": -400,
    "message": "-400"
}
Official application-discovery implementation logic
1. Pick an available node and add the application's appid to the poll's appid list.
2. If the polls request returns an error, switch to another node; the switching logic is the same as when self-discovery fails.
3. If polls returns -304, the appid has not changed; re-issue the poll to keep listening for changes.
4. The polls interface returns the appid's instance list, completing service discovery; pick whatever load-balancing algorithm you need to schedule across the nodes.
Create a new demo service and register it with discovery
Add registration code like the following to the service-registration part of the main function.
ip := "127.0.0.1"
port := "9000"
hn, _ := os.Hostname()
dis := discovery.New(nil)
ins := &naming.Instance{
	Zone:     env.Zone,
	Env:      env.DeployEnv,
	AppID:    "demo.service",
	Hostname: hn,
	Addrs: []string{
		"grpc://" + ip + ":" + port,
	},
}
cancel, err := dis.Register(context.Background(), ins)
if err != nil {
	panic(err)
}
defer cancel()
At first this panics because no nodes are found; the discovery node address can be supplied through an environment variable.
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>kratos run
INFO 01/04-19:32:28.198 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
panic: invalid discovery config nodes:[] region:region01 zone:zone01 deployEnv:dev host:DESKTOP-NUEKD5O
After configuring the discovery node, registration succeeds.
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>set DISCOVERY_NODES=127.0.0.1:7171
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>kratos run
INFO 01/04-19:40:25.426 I:/VSProject/kratos-note/kratos-note/warden/discovery/server/cmd/main.go:23 abc start
2020/01/04 19:40:25 start watch filepath: I:\VSProject\kratos-note\kratos-note\warden\discovery\server\configs
INFO 01/04-19:40:25.497 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 0.0.0.0:8000
[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.
INFO 01/04-19:40:25.500 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
INFO 01/04-19:40:25.501 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:248 disocvery: AddWatch(infra.discovery) already watch(false)
INFO 01/04-19:40:25.514 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:631 discovery: successfully polls(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-NUEKD5O&latest_timestamp=0) instances ({"infra.discovery":{"instances":{"sh001":[{"region":"sh","zone":"sh001","env":"dev","appid":"infra.discovery","hostname":"test1","addrs":["http://127.0.0.1:7171"],"version":"","latest_timestamp":1578122538945305700,"metadata":null,"status":1}]},"latest_timestamp":1578122538945305700,"scheduler":null}})
INFO 01/04-19:40:25.527 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:414 discovery: register client.Get(http://127.0.0.1:7171/discovery/register) env(dev) appid(demo.service) addrs([grpc://127.0.0.1:9000])
success
Now let's follow the logs.
As the flow suggests, the registration logic should be register -> renew -> cancel: register, then keep sending heartbeats, and finally cancel the registration.
Here is an excerpt of one local service-registration log.
The steps were roughly:
1. Start the discovery service.
2. Start demo.server, which registers the demo.server appid.
3. Wait a moment for heartbeats, then shut down demo.server.
From the log, the overall sequence is roughly:
1. Line 0: the discovery service starts.
2. Lines 2/3/4: service initialization.
3. Line 5: polls long-polling, infra.discovery self-discovery.
4. Lines 6/7: a new connection & service registration; this is the demo.server we started registering itself.
5. Line 9: polls long-polling, infra.discovery self-discovery.
6. Line 10: a renew heartbeat.
7. Line 12: finally I killed the registered service, which produced the cancel request.
Reading the logs, the logic matches the understanding above with no major deviation; next, let's look at service discovery.
0:discovery -conf discovery-example.toml -log.v=0
1:
2:INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/discovery/syncup.go:160 discovery
3:changed nodes:[127.0.0.1:7171] zones:map[]
4:INFO 01/10-10:31:19.575 C:/server/src/go/pkg/mod/github.com/bilibili/kratos@v0.1.0/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 127.0.0.1:7171 INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(test1) new connection(1)
5:INFO 01/10-10:31:31.796 http-access-log ts=0 method=GET ip=127.0.0.1 traceid= user=no_user params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=0 msg=0 stack=<nil> err= timeout_quota=39.98 path=/discovery/polls ret=0
6:INFO 01/10-10:31:31.798 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)
7:INFO 01/10-10:31:31.799 http-access-log method=POST user=no_user path=/discovery/register err= ts=0 params=addrs=grpc%3A%2F%2F127.0.0.1%3A9000&appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&metadata=&region=region01&status=1&version=&zone=zone01 stack=<nil> ret=0 timeout_quota=39.98 ip=127.0.0.1 msg=0 traceid=
8:INFO 01/10-10:32:01.799 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(DESKTOP-9NFHKD0) delete(1)
9:ERROR 01/10-10:32:01.799 http-access-log method=GET ip=127.0.0.1 err=-304 timeout_quota=39.98 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578623479566211700 ret=-304 msg=-304 stack=-304 ts=30.0011342 traceid=
10:INFO 01/10-10:32:01.799 http-access-log msg=0 err= timeout_quota=39.98 method=POST ip=127.0.0.1 user=no_user ret=0 path=/discovery/renew traceid= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01 stack=<nil> ts=0
11:INFO 01/10-10:32:01.800 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)
12:INFO 01/10-10:32:08.499 http-access-log timeout_quota=39.98 path=/discovery/cancel ret=0 stack=<nil> ip=127.0.0.1 msg=0 traceid= ts=0 method=POST user=no_user err= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01
Likewise, first configure the discovery node: set DISCOVERY_NODES=127.0.0.1:7171
Change NewClient() to the following:
package dao

import (
	"context"

	"github.com/bilibili/kratos/pkg/naming/discovery"
	"github.com/bilibili/kratos/pkg/net/rpc/warden"
	"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"

	"google.golang.org/grpc"
)

// AppID your appid, ensure unique.
const AppID = "demo.service" // NOTE: example

func init() {
	// NOTE: this line means discovery is used for service discovery.
	// NOTE: also note that resolver.Register is global, so it is
	// recommended to run this code during process initialization.
	// NOTE: never Register multiple different middlewares in one process!!!
	// NOTE: when starting the app, discovery nodes can be specified via a
	// flag (-discovery.nodes) or an environment variable (DISCOVERY_NODES).
	resolver.Register(discovery.Builder())
}

// NewClient new member grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
	client := warden.NewClient(cfg, opts...)
	conn, err := client.Dial(context.Background(), "discovery://default/"+AppID)
	if err != nil {
		return nil, err
	}
	// NOTE: replace this call:
	// NewDemoClient is generated under the "api" directory and corresponds
	// to the service name defined in the proto file; use the correct name.
	return NewDemoClient(conn), nil
}
Also embed it into the dao struct and, as with the warden direct approach last time, test the call through the SayHello interface.
// dao dao.
type dao struct {
	db         *sql.DB
	redis      *redis.Redis
	mc         *memcache.Memcache
	demoClient demoapi.DemoClient
	cache      *fanout.Fanout
	demoExpire int32
}

// New new a dao and return.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, err error) {
	var cfg struct {
		DemoExpire xtime.Duration
	}
	if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
		return
	}
	grpccfg := &warden.ClientConfig{
		Dial:              xtime.Duration(time.Second * 10),
		Timeout:           xtime.Duration(time.Millisecond * 250),
		Subset:            50,
		KeepAliveInterval: xtime.Duration(time.Second * 60),
		KeepAliveTimeout:  xtime.Duration(time.Second * 20),
	}
	//paladin.Get("grpc.toml").UnmarshalTOML(grpccfg)
	var grpcClient demoapi.DemoClient
	grpcClient, err = NewClient(grpccfg)
	d = &dao{
		db:         db,
		redis:      r,
		mc:         mc,
		demoClient: grpcClient,
		cache:      fanout.New("cache"),
		demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
	}
	return
}
Steps:
1. Start the discovery service.
2. Start demo.server, which registers itself as the demo.server service.
3. Start demo.client.
4. Finally, call from demo.client's SayHello HTTP endpoint through to demo.server's gRPC SayHello endpoint.
I found that occasionally, when doing service discovery, the client fails to start with context deadline exceeded.
Because I put the NewClient call inside the dao, when it times out demo.client simply panics.
From the client log:
warden client: dial discovery://default/demo.service?subset=50 error context deadline exceeded!panic: context deadline exceeded
client : host:127.0.0.1:7171, url:http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578902420717217500
The call to discovery's polls timed out. My configured gRPC dial deadline is 10s, while the official discovery docs state that during service-node self-discovery, if the server instances have not changed, the interface blocks for up to 30s and then returns -304 (poll/polls are long-polling interfaces).
I won't dig into self-discovery here; this section only concerns the application-discovery logic. Have a look at the discovery repo if you're interested.
INFO 01/10-15:22:34.436 http-access-log method=GET path=/discovery/polls user=no_user params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=0 stack=<nil> err= timeout_quota=39.98 ts=0 msg=0 traceid= ip=127.0.0.1 ret=0
INFO 01/10-15:22:34.438 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(CLII) new connection(1)
INFO 01/10-15:22:34.440 C:/server/src/go/src/discovery/registry/registry.go:228 Polls from(CLII) reuse connection(2)
INFO 01/10-15:22:44.219 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(DESKTOP-9NFHKD0) delete(1)
ERROR 01/10-15:22:44.219 http-access-log path=/discovery/polls ret=-304 msg=-304 timeout_quota=39.98 ip=127.0.0.1 params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578637331623587200 user=no_user ts=39.9808023 err=-304 traceid= method=GET stack=-304
INFO 01/10-15:22:44.221 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(DESKTOP-9NFHKD0) new connection(1)
INFO 01/10-15:22:44.525 http-access-log ts=0 method=POST ip=127.0.0.1 user=no_user stack=<nil> path=/discovery/renew err= traceid= ret=0 msg=0 timeout_quota=39.98 params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01
INFO 01/10-15:23:04.438 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(CLII) count decr(2)
ERROR 01/10-15:23:04.438 http-access-log msg=-304 ts=30.0002154 method=GET err=-304 stack=-304 timeout_quota=39.98 ip=127.0.0.1 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=1578637331623587200 ret=-304 traceid=
INFO 01/10-15:23:04.440 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(CLII) delete(1)
ERROR 01/10-15:23:04.440 http-access-log ts=30.0013758 traceid= user=no_user path=/discovery/polls ret=-304 err=-304 method=GET ip=127.0.0.1 params=appid=infra.discovery&appid=demo.service&env=dev&hostname=CLII&latest_timestamp=1578637331623587200&latest_timestamp=0 msg=-304 stack=-304 timeout_quota=39.98
Combined with the discovery log:
At 15:22:34 the client dials.
Around 15:22:45 the client panics.
Only at 15:23:04 does discovery reply with -304 (no instance change).
This is because client.Dial() wraps gRPC's official name-resolution machinery; ultimately it goes through the gRPC resolver logic implemented inside kratos warden.
Below is a quick look at this layer. It is quite convoluted and I haven't fully understood it either, so this is only a skim; I'll flesh it out when I get another chance.
// NewClient new grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) {
	client := warden.NewClient(cfg, opts...)
	cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", AppID))
	if err != nil {
		return nil, err
	}
	return demoapi.NewDemoClient(cc), nil
}
Inside client.Dial() the flow is roughly:
client.Dial() -> gRPC's DialContext() -> parse the target's scheme (here, discovery) and fetch the corresponding Builder.
if cc.dopts.resolverBuilder == nil {
	// Only try to parse target when resolver builder is not already set.
	cc.parsedTarget = parseTarget(cc.target)
	grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
	cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
	if cc.dopts.resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
	}
} else {
	cc.parsedTarget = resolver.Target{Endpoint: target}
}
On success, DialContext() yields the ClientConn struct -> ClientConn.resolverWrapper is initialized -> Build() is called:
defer ccr.resolverMu.Unlock()
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
The user's Builder implementation calls UpdateState -> ClientConn's updateResolverState -> Address initialization and the rest of the official gRPC logic.
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// UpdateState updates the state of the ClientConn appropriately.
	UpdateState(State)
	// ReportError notifies the ClientConn that the Resolver encountered an
	// error. The ClientConn will notify the load balancer and begin calling
	// ResolveNow on the Resolver with exponential backoff.
	ReportError(error)
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	//
	// Deprecated: Use UpdateState instead.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	//
	// Deprecated: Use UpdateState instead.
	NewServiceConfig(serviceConfig string)
	// ParseServiceConfig parses the provided service config and returns an
	// object that provides the parsed config.
	ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
kratos discovery
warden wraps gRPC's entire service-discovery implementation; the code lives in pkg/naming/naming.go and warden/resolver/resolver.go.
naming.go
defines the Instance struct describing a business instance, the Registry interface for service registration, and the Resolver interface for service discovery.
// Resolver resolve naming service
type Resolver interface {
	Fetch(context.Context) (*InstancesInfo, bool)
	Watch() <-chan struct{}
	Close() error
}

// Registry Register an instance and renew automatically.
type Registry interface {
	Register(ctx context.Context, ins *Instance) (cancel context.CancelFunc, err error)
	Close() error
}

// InstancesInfo instance info.
type InstancesInfo struct {
	Instances map[string][]*Instance `json:"instances"`
	LastTs    int64                  `json:"latest_timestamp"`
	Scheduler *Scheduler             `json:"scheduler"`
}
resolver.go
implements gRPC's official resolver.Builder and resolver.Resolver interfaces, and also exposes the naming.Builder and naming.Resolver interfaces from naming.go.
// Resolver resolve naming service
type Resolver interface {
	Fetch(context.Context) (*InstancesInfo, bool)
	Watch() <-chan struct{}
	Close() error
}

// Builder resolver builder.
type Builder interface {
	Build(id string) Resolver
	Scheme() string
}
kratos wraps gRPC's Build so that you only need to pass the service's appid: after gRPC calls into warden/resolver/resolver.go, it looks up the naming.Builder implementation by its Scheme method and calls Build, passing the id in. An implementation of naming.Resolver can then query instance information from the corresponding service-discovery middleware (here, the discovery service) by appid (the Fetch interface). Besides the simple Fetch operation there is also a Watch method, used to listen for node changes in the middleware so that instance information can be updated in real time.
naming/discovery implements service registration and discovery with discovery as the middleware; there you can roughly see the polls requests made against the discovery middleware.
// Build disovery resovler builder.
func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
	r := &Resolve{
		id:    appid,
		d:     d,
		event: make(chan struct{}, 1),
		opt:   new(naming.BuildOptions),
	}
	for _, opt := range opts {
		opt.Apply(r.opt)
	}
	d.mutex.Lock()
	app, ok := d.apps[appid]
	if !ok {
		app = &appInfo{
			resolver: make(map[*Resolve]struct{}),
		}
		d.apps[appid] = app
		cancel := d.cancelPolls
		if cancel != nil {
			cancel()
		}
	}
	app.resolver[r] = struct{}{}
	d.mutex.Unlock()
	if ok {
		select {
		case r.event <- struct{}{}:
		default:
		}
	}
	log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
	d.once.Do(func() {
		go d.serverproc()
	})
	return r
}

func (d *Discovery) serverproc() {
	var (
		retry  int
		ctx    context.Context
		cancel context.CancelFunc
	)
	ticker := time.NewTicker(time.Minute * 30)
	defer ticker.Stop()
	for {
		if ctx == nil {
			ctx, cancel = context.WithCancel(d.ctx)
			d.mutex.Lock()
			d.cancelPolls = cancel
			d.mutex.Unlock()
		}
		select {
		case <-d.ctx.Done():
			return
		case <-ticker.C:
			d.switchNode()
		default:
		}
		apps, err := d.polls(ctx)
		if err != nil {
			d.switchNode()
			if ctx.Err() == context.Canceled {
				ctx = nil
				continue
			}
			time.Sleep(time.Second)
			retry++
			continue
		}
		retry = 0
		d.broadcast(apps)
	}
}