Go Micro Client 源碼分析


Client 主要是用來執行請求服務和訂閱發佈事件。是對於broker,Transort的一種封裝方便使用。node



  1. 初始化鏈接池數量和鏈接池TTL
  2. 調用注入的opts函數列表
  3. 最後初始化鏈接池
func (r *rpcClient) Init(opts ...Option) error {
    size := r.opts.PoolSize
    ttl := r.opts.PoolTTL

    for _, o := range opts {

    // update pool configuration if the options changed
    if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
        r.pool.size = r.opts.PoolSize
        r.pool.ttl = int64(r.opts.PoolTTL.Seconds())

    return nil


Call是Client接口中最主要的方法,在以前Go Micro Selector 源碼分析app

  1. Client調用Call方法
  2. Call方法調用selector組件的Select方法,獲取next函數
  3. call匿名函數中調用next函數(默認爲CacheSelector 隨機獲取服務列表中的節點, Go Micro Selector 源碼分析) 返回node
  4. 以grpcClient爲例,調用grpcClient.call
  5. call函數中獲取conn,而後Invoke調用服務端函數
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // 複製出options
    callOpts := g.opts.CallOptions
    for _, opt := range opts {
    // 調用next函數 獲取selector
    next, err := g.next(req, callOpts)
    if err != nil {
        return err

    // 檢查context Deadline
    d, ok := ctx.Deadline()
    if !ok {
        // 沒有deadline 建立一個新的
        ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
    } else {
        // 獲取到deadline設置context 
        opt := client.WithRequestTimeout(time.Until(d))

    // should we noop right here?
    select {
    case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)

    // 複製call函數 在下面的goroutine中使用
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {
        gcall = callOpts.CallWrappers[i-1](gcall)

    // return errors.New("go.micro.client", "request timeout", 408)
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())

        // only sleep if greater than 0
        if t.Seconds() > 0 {

        // select next node
        node, err := next()
        if err != nil && err == selector.ErrNotFound {
            return errors.NotFound("go.micro.client", err.Error())
        } else if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())

        // 調用call 正式調用服務端接口
        err = gcall(ctx, node, req, rsp, callOpts)
        g.opts.Selector.Mark(req.Service(), node, err)
        return err

    ch := make(chan error, callOpts.Retries+1)
    var gerr error
    // 重試 
    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            // 調動call 返回channel 
            ch <- call(i)

        select {
        case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil

            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {
                return rerr

            if !retry {
                return err

            gerr = err

    return gerr



func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
    // make a copy of call opts
    callOpts := r.opts.CallOptions
    for _, opt := range opts {

    next, err := r.next(request, callOpts)
    if err != nil {
        return nil, err

    // should we noop right here?
    select {
    case <-ctx.Done():
        return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))

    call := func(i int) (Stream, error) {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, request, i)
        if err != nil {
            return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())

        // only sleep if greater than 0
        if t.Seconds() > 0 {

        node, err := next()
        if err != nil && err == selector.ErrNotFound {
            return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
        } else if err != nil {
            return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())

        stream, err := r.stream(ctx, node, request, callOpts)
        r.opts.Selector.Mark(request.Service(), node, err)
        return stream, err

    type response struct {
        stream Stream
        err    error

    ch := make(chan response, callOpts.Retries+1)
    var grr error

    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            s, err := call(i)
            ch <- response{s, err}

        select {
        case <-ctx.Done():
            return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
        case rsp := <-ch:
            // if the call succeeded lets bail early
            if rsp.err == nil {
                return rsp.stream, nil

            retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
            if rerr != nil {
                return nil, rerr

            if !retry {
                return nil, rsp.err

            grr = rsp.err

    return nil, grr


然而在client的publish函數中,獲取了topic準備了body 最後調用broker的publishoop

func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
    options := PublishOptions{
        Context: context.Background(),
    for _, o := range opts {

    md, ok := metadata.FromContext(ctx)
    if !ok {
        md = make(map[string]string)

    id := uuid.New().String()
    md["Content-Type"] = msg.ContentType()
    md["Micro-Topic"] = msg.Topic()
    md["Micro-Id"] = id

    // set the topic
    topic := msg.Topic()

    // get proxy
    if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
        options.Exchange = prx

    // get the exchange
    if len(options.Exchange) > 0 {
        topic = options.Exchange

    // encode message body
    cf, err := r.newCodec(msg.ContentType())
    if err != nil {
        return errors.InternalServerError("go.micro.client", err.Error())
    b := &buffer{bytes.NewBuffer(nil)}
    if err := cf(b).Write(&codec.Message{
        Target: topic,
        Type:   codec.Event,
        Header: map[string]string{
            "Micro-Id":    id,
            "Micro-Topic": msg.Topic(),
    }, msg.Payload()); err != nil {
        return errors.InternalServerError("go.micro.client", err.Error())
    r.once.Do(func() {

    return r.opts.Broker.Publish(topic, &broker.Message{
        Header: md,
        Body:   b.Bytes(),