Micro中的Selector是客戶端級別的負載均衡的組件。當客戶端調用服務端方法時,會根據selector組件中定義的負載均衡策略來選擇Register中註冊的服務器列表中的一個。默認的有隨機策略使用的是隨機策略。使用的是cacheSelector組件,固然咱們能夠根據需求來替換這個組件只要實現Selector的接口就能夠隨時替換組件。下面能夠從新複習一下組件的接口。node
type Selector interface { Init(opts ...Option) error Options() Options // Select returns a function which should return the next node Select(service string, opts ...SelectOption) (Next, error) // Mark sets the success/error against a node Mark(service string, node *registry.Node, err error) // Reset returns state back to zero for a service Reset(service string) // Close renders the selector unusable Close() error // Name of the selector String() string }
// Client 調用call函數 func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // make a copy of call opts callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // `1. Client調用Call方法` next, err := g.next(req, callOpts) if err != nil { return err } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // 複製call函數 在下面的goroutine中使用 gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // 調用next方法,獲取服務器節點 node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // 調用grpccall方法 正式發送數據 err = gcall(ctx, node, req, rsp, callOpts) g.opts.Selector.Mark(req.Service(), node, err) return err } // 初始化channel 接受call返回的error 用於重試 ch := make(chan error, callOpts.Retries+1) var gerr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr } // grpcClient next方法 調用selector組件的Select方法,獲取next函數 func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { service := request.Service() // get proxy if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { service = prx } // get proxy address if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { opts.Address = []string{prx} } // return remote address if len(opts.Address) > 0 { return func() (*registry.Node, error) { return ®istry.Node{ Address: opts.Address[0], }, nil }, nil } // get next nodes from the selector next, err := g.opts.Selector.Select(service, opts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } return next, nil } // 隨機獲取Registry中服務列表中的一個 策略是從上層方法傳進來的 func Random(services []*registry.Service) Next { var nodes []*registry.Node for _, service := range services { nodes = append(nodes, service.Nodes...) } return func() (*registry.Node, error) { if len(nodes) == 0 { return nil, ErrNoneAvailable } i := rand.Int() % len(nodes) return nodes[i], nil } }
// 默認selector對象 type registrySelector struct { so Options rc cache.Cache } // Options設置 只有一個time to live 設置 type Options struct { // TTL is the cache TTL TTL time.Duration } // Cache 對象 type cache struct { // Registry對象若是過時則經過這個對象獲取 registry.Registry // Options 設置TTL opts Options // 鎖用來控制併發問題 sync.RWMutex // 利用map緩存服務器節點 用name做爲key cache map[string][]*registry.Service // map保存的服務器過時時間 ttls map[string]time.Time watched map[string]bool // exit chan bool } // CacheSelector獲取服務節點核心方法 // 利用若是找到則直接返回若是找不到則請求Registry獲取服務器節點列表 func (c *cache) get(service string) ([]*registry.Service, error) { // read lock c.RLock() // 先獲取緩存中的服務節點 services := c.cache[service] // 獲取服務節點的ttl ttl := c.ttls[service] if c.isValid(services, ttl) { // make a copy cp := registry.Copy(services) // unlock the read c.RUnlock() // return servics return cp, nil } // get does the actual request for a service and cache it get := func(service string) ([]*registry.Service, error) { // 若是緩存不存在則 services, err := c.Registry.GetService(service) if err != nil { return nil, err } // cache results c.Lock() // 設置緩存 並同時設置TTL c.set(service, registry.Copy(services)) c.Unlock() return services, nil } // watch service if not watched if _, ok := c.watched[service]; !ok { go c.run(service) } // unlock the read lock c.RUnlock() // get and return services return get(service) }
在調用cache.get函數執行完成以後 會建立一個goroutine來監控service節點。調用的是Register中的watch方法,獲取到result而後判斷是否要添加、修改或者刪除緩存中的服務器節點。緩存
// get方法調用run函數啓動監控 if _, ok := c.watched[service]; !ok { go c.run(service) } func (c *cache) run(service string) { // set watcher c.Lock() c.watched[service] = true c.Unlock() // delete watcher on exit defer func() { c.Lock() delete(c.watched, service) c.Unlock() }() var a, b int for { // exit early if already dead if c.quit() { return } // 設施隔多久檢測服務是否可用 j := rand.Int63n(100) time.Sleep(time.Duration(j) * time.Millisecond) // 建立一個watcher w, err := c.Registry.Watch( registry.WatchService(service), ) if err != nil { if c.quit() { return } d := backoff(a) if a > 3 { log.Log("rcache: ", err, " backing off ", d) a = 0 } time.Sleep(d) a++ continue } // reset a a = 0 // watch循環下一個事件並調用update // update函數根據 Register 返回的result 來修改本地緩存中的節點信息 if err := c.watch(w); err != nil { if c.quit() { return } d := backoff(b) if b > 3 { log.Log("rcache: ", err, " backing off ", d) b = 0 } time.Sleep(d) b++ continue } // reset b b = 0 } }