Go Micro Selector 源碼分析

概況

Micro中的Selector是客戶端級別的負載均衡的組件。當客戶端調用服務端方法時,會根據selector組件中定義的負載均衡策略來選擇Register中註冊的服務器列表中的一個。默認的有隨機策略使用的是隨機策略。使用的是cacheSelector組件,固然咱們能夠根據需求來替換這個組件只要實現Selector的接口就能夠隨時替換組件。下面能夠從新複習一下組件的接口。node

type Selector interface {
    Init(opts ...Option) error
    Options() Options
    // Select returns a function which should return the next node
    
    Select(service string, opts ...SelectOption) (Next, error)
    // Mark sets the success/error against a node
    
    Mark(service string, node *registry.Node, err error)
    // Reset returns state back to zero for a service
    
    Reset(service string)
    // Close renders the selector unusable
    
    Close() error
    // Name of the selector
    
    String() string
}

主要方法和流程

  1. Client調用Call方法
  2. Call方法調用selector組件的Select方法,獲取next函數
  3. call匿名函數中調用next函數(默認爲CacheSelector 隨機獲取服務列表中的節點) 返回node
  4. 以grpcClient爲例,調用grpcClient.call
  5. call函數中獲取conn,而後Invoke調用服務端函數

selector源碼

// Client 調用call函數
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // make a copy of call opts
    callOpts := g.opts.CallOptions
    for _, opt := range opts {
        opt(&callOpts)
    }
    // `1. Client調用Call方法`
    next, err := g.next(req, callOpts)
    if err != nil {
        return err
    }

    // check if we already have a deadline
    d, ok := ctx.Deadline()
    if !ok {
        // no deadline so we create a new one
        ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
    } else {
        // got a deadline so no need to setup context
        // but we need to set the timeout we pass along
        opt := client.WithRequestTimeout(time.Until(d))
        opt(&callOpts)
    }

    // should we noop right here?
    select {
    case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    default:
    }

    // 複製call函數 在下面的goroutine中使用
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {
        gcall = callOpts.CallWrappers[i-1](gcall)
    }

    // return errors.New("go.micro.client", "request timeout", 408)
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // only sleep if greater than 0
        if t.Seconds() > 0 {
            time.Sleep(t)
        }

        // 調用next方法,獲取服務器節點
        node, err := next()
        if err != nil && err == selector.ErrNotFound {
            return errors.NotFound("go.micro.client", err.Error())
        } else if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // 調用grpccall方法 正式發送數據
        err = gcall(ctx, node, req, rsp, callOpts)
        g.opts.Selector.Mark(req.Service(), node, err)
        return err
    }
    // 初始化channel 接受call返回的error 用於重試
    ch := make(chan error, callOpts.Retries+1)
    var gerr error

    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            ch <- call(i)
        }(i)

        select {
        case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil
            }

            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {
                return rerr
            }

            if !retry {
                return err
            }

            gerr = err
        }
    }

    return gerr
}

// grpcClient next方法 調用selector組件的Select方法,獲取next函數
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
    service := request.Service()

    // get proxy
    if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
        service = prx
    }

    // get proxy address
    if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
        opts.Address = []string{prx}
    }

    // return remote address
    if len(opts.Address) > 0 {
        return func() (*registry.Node, error) {
            return &registry.Node{
                Address: opts.Address[0],
            }, nil
        }, nil
    }

    // get next nodes from the selector
    next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
    if err != nil && err == selector.ErrNotFound {
        return nil, errors.NotFound("go.micro.client", err.Error())
    } else if err != nil {
        return nil, errors.InternalServerError("go.micro.client", err.Error())
    }

    return next, nil
}

// 隨機獲取Registry中服務列表中的一個 策略是從上層方法傳進來的
func Random(services []*registry.Service) Next {
    var nodes []*registry.Node

    for _, service := range services {
        nodes = append(nodes, service.Nodes...)
    }

    return func() (*registry.Node, error) {
        if len(nodes) == 0 {
            return nil, ErrNoneAvailable
        }

        i := rand.Int() % len(nodes)
        return nodes[i], nil
    }
}

默認registrySelector(CacheSelector)源碼分析

主體方法

// 默認selector對象
type registrySelector struct {
    so Options
    rc cache.Cache
}

// Options設置 只有一個time to live 設置
type Options struct {
    // TTL is the cache TTL
    TTL time.Duration
}

// Cache 對象
type cache struct {
    // Registry對象若是過時則經過這個對象獲取
    registry.Registry
    // Options 設置TTL
    opts Options
    // 鎖用來控制併發問題
    sync.RWMutex
    // 利用map緩存服務器節點 用name做爲key
    cache   map[string][]*registry.Service
    // map保存的服務器過時時間
    ttls    map[string]time.Time
    watched map[string]bool
    //
    exit chan bool
}

// CacheSelector獲取服務節點核心方法
// 利用若是找到則直接返回若是找不到則請求Registry獲取服務器節點列表
func (c *cache) get(service string) ([]*registry.Service, error) {
    // read lock
    c.RLock()

    // 先獲取緩存中的服務節點
    services := c.cache[service]
    // 獲取服務節點的ttl
    ttl := c.ttls[service]

    if c.isValid(services, ttl) {
        // make a copy
        cp := registry.Copy(services)
        // unlock the read
        c.RUnlock()
        // return servics
        return cp, nil
    }

    // get does the actual request for a service and cache it
    get := func(service string) ([]*registry.Service, error) {
        // 若是緩存不存在則
        services, err := c.Registry.GetService(service)
        if err != nil {
            return nil, err
        }

        // cache results
        c.Lock()
        // 設置緩存 並同時設置TTL
        c.set(service, registry.Copy(services))
        c.Unlock()

        return services, nil
    }

    // watch service if not watched
    if _, ok := c.watched[service]; !ok {
        go c.run(service)
    }

    // unlock the read lock
    c.RUnlock()

    // get and return services
    return get(service)
}

Watch

在調用cache.get函數執行完成以後 會建立一個goroutine來監控service節點。調用的是Register中的watch方法,獲取到result而後判斷是否要添加、修改或者刪除緩存中的服務器節點。緩存

// get方法調用run函數啓動監控
if _, ok := c.watched[service]; !ok {
    go c.run(service)
}

func (c *cache) run(service string) {
    // set watcher
    c.Lock()
    c.watched[service] = true
    c.Unlock()

    // delete watcher on exit
    defer func() {
        c.Lock()
        delete(c.watched, service)
        c.Unlock()
    }()

    var a, b int

    for {
        // exit early if already dead
        if c.quit() {
            return
        }

        // 設施隔多久檢測服務是否可用
        j := rand.Int63n(100)
        time.Sleep(time.Duration(j) * time.Millisecond)

        // 建立一個watcher
        w, err := c.Registry.Watch(
            registry.WatchService(service),
        )

        if err != nil {
            if c.quit() {
                return
            }

            d := backoff(a)

            if a > 3 {
                log.Log("rcache: ", err, " backing off ", d)
                a = 0
            }

            time.Sleep(d)
            a++

            continue
        }

        // reset a
        a = 0

        // watch循環下一個事件並調用update
        // update函數根據 Register 返回的result 來修改本地緩存中的節點信息
        if err := c.watch(w); err != nil {
            if c.quit() {
                return
            }

            d := backoff(b)

            if b > 3 {
                log.Log("rcache: ", err, " backing off ", d)
                b = 0
            }

            time.Sleep(d)
            b++

            continue
        }

        // reset b
        b = 0
    }
}
相關文章
相關標籤/搜索