go micro client

micro.newService() 中的 newOptions():

// newOptions builds the service Options. Every pluggable component starts
// at its package-level default, the context starts as context.Background(),
// and Signal starts as true; the supplied opts are then applied in the
// order given, each one mutating the struct in place.
func newOptions(opts ...Option) Options {
    // Seed the struct with the package defaults.
    options := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    // Let each caller-supplied option adjust the defaults.
    for idx := range opts {
        opts[idx](&options)
    }

    return options
}

初始化了一堆基礎設置,這裏來看看其中的Client,即client.DefaultClient。
這裏不要直接去看client/client.go中的newRpcClient(),因爲在micro/defaults.go中已經重新初始化了client,默認是grpc:
client.DefaultClient = gcli.NewClient()

// newClient assembles the gRPC client: it resolves the client options
// (defaulting the content type to "application/grpc+proto"), builds the
// grpcClient with its pool, and finally layers the configured client
// wrappers around it.
func newClient(opts ...client.Option) client.Client {
    // Start from the stock options; the gRPC content type is set before
    // the caller's options run, so an option may still replace it.
    cfg := client.NewOptions()
    cfg.ContentType = "application/grpc+proto"
    for idx := range opts {
        opts[idx](&cfg)
    }

    gc := &grpcClient{opts: cfg}
    gc.once.Store(false)
    gc.pool = newPool(cfg.PoolSize, cfg.PoolTTL, gc.poolMaxIdle(), gc.poolMaxStreams())

    // Apply the wrappers last-to-first so that the first configured
    // wrapper ends up as the outermost layer.
    var wrapped client.Client = gc
    for idx := len(cfg.Wrappers) - 1; idx >= 0; idx-- {
        wrapped = cfg.Wrappers[idx](wrapped)
    }

    return wrapped
}

// NewClient returns a client.Client configured with opts. It is the
// exported constructor and hands all of the work to newClient.
func NewClient(opts ...client.Option) client.Client {
    c := newClient(opts...)
    return c
}

這裏做了以下事情:

  1. 初始化並設置options,設置ContentType爲"application/grpc+proto"
  2. 實例化grpcClient{},並通過newPool()創建pool
  3. 依次調用client Wrapper中間件,注意是倒着調用的(逆序包裝後,第一個Wrapper位於最外層)

// grpcClient is the concrete type that newClient builds and then exposes
// through the client.Client interface.
type grpcClient struct {
    // opts holds the client options resolved in newClient.
    opts client.Options
    // pool is created by newPool in newClient from the pool size/TTL options.
    pool *pool
    // once is stored as false by newClient.
    // NOTE(review): presumably a one-time-initialisation flag; its readers
    // are not visible here — confirm.
    once atomic.Value
}

// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
    // Init takes a list of options and reports an error.
    // NOTE(review): presumably (re)applies the options to the client — confirm.
    Init(...Option) error
    // Options returns the client's Options value.
    Options() Options
    // NewMessage builds a Message for the given topic and payload.
    NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
    // NewRequest builds a Request addressed to the given service and endpoint.
    NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
    // Call performs a request/response exchange: req is sent and rsp is the
    // caller-supplied value that receives the reply.
    Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
    // Stream opens a Stream for req.
    Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
    // Publish publishes msg.
    Publish(ctx context.Context, msg Message, opts ...PublishOption) error
    // String returns a string describing the client.
    // NOTE(review): presumably the implementation name — confirm.
    String() string
}

這裏說明一下newClient()中的c := client.Client(rc):

grpcClient只有3個屬性;client.Client(rc)是一次類型轉換,把rc(*grpcClient)轉爲client.Client接口類型的值。在grpc.go中也可以看到grpcClient實現了Client定義的所有方法

結合實例看example/client/main.go中的call(),發起請求:

// call issues a single Example.Call request to go.micro.srv.example through
// c and prints either the error or the response message. i is only used to
// label the printed output.
func call(i int, c client.Client) {
    // Build the request for service go.micro.srv.example, method Example.Call.
    payload := &example.Request{Name: "John"}
    req := c.NewRequest("go.micro.srv.example", "Example.Call", payload)

    // Attach metadata to a fresh background context.
    md := map[string]string{
        "X-User-Id": "john",
        "X-From-Id": "script",
    }
    ctx := metadata.NewContext(context.Background(), md)

    reply := &example.Response{}

    // Perform the call; on failure report the error together with the
    // response value and stop.
    err := c.Call(ctx, req, reply)
    if err != nil {
        fmt.Println("call err: ", err, reply)
        return
    }

    fmt.Println("Call:", i, "rsp:", reply.Msg)
}

// NewRequest builds a client.Request for the given service and method,
// using the client's configured content type as the default.
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
    defaultContentType := g.opts.ContentType
    return newGRPCRequest(service, method, req, defaultContentType, reqOpts...)
}

// newGRPCRequest builds a grpcRequest from the service, method and payload.
// contentType is the default; a non-empty ContentType set by one of reqOpts
// takes precedence over it.
func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {
    // Resolve the per-request options from their zero value.
    var resolved client.RequestOptions
    for idx := range reqOpts {
        reqOpts[idx](&resolved)
    }

    // A content type given through the options overrides the default.
    if resolved.ContentType != "" {
        contentType = resolved.ContentType
    }

    built := &grpcRequest{
        service:     service,
        method:      method,
        request:     request,
        contentType: contentType,
        opts:        resolved,
    }
    return built
}

NewRequest()最終得到一個grpcRequest{}實例,然後調用grpcClient.Call()。
ctx可以附加信息(放在請求的header中),在整個請求週期中都可以獲取,trace、auth等組件都可以使用

// Call sends req and hands rsp to the underlying call so it can receive the
// reply.
//
// It rejects a nil req or rsp, merges opts over the client's default call
// options, makes sure ctx carries a deadline, and then makes up to
// callOpts.Retries+1 attempts. Each attempt runs the backoff, looks up a
// route, invokes the (wrapped) call function and records the outcome with
// the selector. A 408 error is returned when ctx is done; otherwise the
// result is nil on success, the Retry function's own error, the attempt's
// error when Retry declines, or the last attempt's error once the attempts
// are exhausted.
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    if req == nil {
        return errors.InternalServerError("go.micro.client", "req is nil")
    } else if rsp == nil {
        return errors.InternalServerError("go.micro.client", "rsp is nil")
    }
    // make a copy of call opts, then apply the per-call overrides to it
    callOpts := g.opts.CallOptions
    for _, opt := range opts {
        opt(&callOpts)
    }

    // check if we already have a deadline
    d, ok := ctx.Deadline()
    if !ok {
        // no deadline so we create a new one from the request timeout
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
        defer cancel()
    } else {
        // got a deadline so no need to setup context
        // but we need to set the timeout we pass along
        // to the time remaining until that deadline
        opt := client.WithRequestTimeout(time.Until(d))
        opt(&callOpts)
    }

    // bail out with a 408 before doing any work if ctx is already done
    select {
    case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    default:
    }

    // make copy of call method
    gcall := g.call

    // wrap the call in reverse, so the first call wrapper ends up outermost
    for i := len(callOpts.CallWrappers); i > 0; i-- {
        gcall = callOpts.CallWrappers[i-1](gcall)
    }

    // call performs one attempt; i is the zero-based attempt index passed
    // to the backoff function.
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // only sleep if greater than 0
        // NOTE(review): time.Sleep does not observe ctx, so this goroutine
        // can keep sleeping after Call has already returned on ctx.Done().
        if t.Seconds() > 0 {
            time.Sleep(t)
        }

        // lookup the route to send the request to
        route, err := g.lookupRoute(req, callOpts)
        if err != nil {
            return err
        }

        // pass a node to enable backwards compatibility as changing the
        // call func would be a breaking change.
        // todo v3: change the call func to accept a route
        node := &registry.Node{Address: route.Address}

        // make the call
        err = gcall(ctx, node, req, rsp, callOpts)

        // record the result of the call to inform future routing decisions
        g.opts.Selector.Record(*route, err)

        // try and transform the error to a go-micro error
        if verr, ok := err.(*errors.Error); ok {
            return verr
        }

        return err
    }

    // ch has room for one result per possible attempt, so an attempt's send
    // does not block even if Call has already returned.
    ch := make(chan error, callOpts.Retries+1)
    // gerr keeps the most recent attempt error for the final return.
    var gerr error

    for i := 0; i <= callOpts.Retries; i++ {
        // run the attempt in its own goroutine so ctx can be watched
        // while it is in flight
        go func(i int) {
            ch <- call(i)
        }(i)

        select {
        case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil
            }

            // ask the retry function whether this error is worth retrying
            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {
                return rerr
            }

            if !retry {
                return err
            }

            gerr = err
        }
    }

    return gerr
}

這裏做了以下事情:

  1. 設置g.opts.CallOptions
  2. 設置ctx超時,超時返回錯誤408
  3. 複製了grpcClient.call()給變量gcall
  4. 依次調用callOpts.CallWrappers 中間件,注意這裏,前面是client中間件,這裏是call中間件,具體在中間件文章講
  5. 聲明call()函數:

    1. 設置delay 機制
    2. 調用g.lookupRoute(),返回router.Route{}

      1. 初始化router.QueryOption
      2. opts.Router.Lookup(query...) ->query()(router/default.go)

        1. 從緩存查t.routes[opts.Service],查不到就調用t.fetchRoutes(opts.Service)獲取路由表後再取,這裏的fetchRoutes()並非router/table.go中的函數,而是router/default.go中router.fetchRoutes(),在router/default.go中的newRouter()裏r.table = newTable(r.fetchRoutes)設置了table.fetchRoutes。
      3. 設置opts.Selector,默認random
      4. opts.Selector.Select(routes, opts.SelectOptions...)選擇合適的route發起請求,只有一個就直接返回,多個就隨機返回一個;selector也可以有其他實現方式,具體看實際業務了
    3. 定義registry.Node,調用gcall(),即grpcClient.call()

      1. 定義header,加入timeout,x-content-type,放入ctx
      2. 設置grpc.DialOption{}等參數,而後調用g.pool.getConn(),使用鏈接池發起鏈接
      3. 開協程,設置grpc.CallOption{},發起遠程調用cc.Invoke()(google.golang.org/grpc/call.go),完成後發出通知給ch chan
      4. 監聽調用完成ch chan,ctx超時chan
    4. 記錄調用結果,用於優化選擇,然而random的Selector裏面啥也沒幹
  6. 定義重試次數(默認1),調用第5步的call()

    1. 接受ctx超時信號(返回408錯誤),
    2. call()中完成的信號,成功完成就返回nil,否則就調用Retry()判斷是否需要重試

這就是一次call請求的流程,這裏沒有深入底層grpc是如何發起請求的,感興趣的同學可以查閱一下代碼

若有錯漏,請留言告知,Thanks♪(・ω・)ノ

go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana

相關文章
相關標籤/搜索