micro.newService() 中的 newOptions
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt }
初始化了一堆基礎設置,來看看其中的 Client: client.DefaultClient,
這裏不要直接去看 client/client.go 中的 newRpcClient(),
因爲在 micro/defaults.go 中已經初始化了 client,默認是 grpc 的實現:client.DefaultClient = gcli.NewClient()
golang
func newClient(opts ...client.Option) client.Client { options := client.NewOptions() // default content type for grpc options.ContentType = "application/grpc+proto" for _, o := range opts { o(&options) } rc := &grpcClient{ opts: options, } rc.once.Store(false) rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) // wrap in reverse for i := len(options.Wrappers); i > 0; i-- { c = options.Wrappers[i-1](c) } return c } func NewClient(opts ...client.Option) client.Client { return newClient(opts...) }
這裏做了如下事情:
// grpcClient is the concrete client value created by newClient.
type grpcClient struct {
	// opts holds the client options the instance was built with.
	opts client.Options
	// pool is created by newPool from the pool-related options.
	// NOTE(review): presumably a connection pool — confirm against pool's definition.
	pool *pool
	// once is set to false by newClient at construction time.
	once atomic.Value
}

// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
	Init(...Option) error
	Options() Options
	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
	NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
	String() string
}
newClient() 中的 c := client.Client(rc)
說明一下:
grpcClient 只有 3 個屬性;client.Client(rc) 並不是函數調用,而是一次類型轉換,把 rc 轉成 Client 接口類型的值。在 grpc.go 中也可以看到 grpcClient 實現了 Client 定義的所有方法。
結合實例看 example/client/main.go 中的 call(),
它發起請求:
func call(i int, c client.Client) { // Create new request to service go.micro.srv.example, method Example.Call req := c.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ Name: "John", }) // create context with metadata ctx := metadata.NewContext(context.Background(), map[string]string{ "X-User-Id": "john", "X-From-Id": "script", }) rsp := &example.Response{} // Call service if err := c.Call(ctx, req, rsp); err != nil { fmt.Println("call err: ", err, rsp) return } fmt.Println("Call:", i, "rsp:", rsp.Msg) } func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...) } func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { var opts client.RequestOptions for _, o := range reqOpts { o(&opts) } // set the content-type specified if len(opts.ContentType) > 0 { contentType = opts.ContentType } return &grpcRequest{ service: service, method: method, request: request, contentType: contentType, opts: opts, } }
NewRequest()
最終得到一個 grpcRequest{} 實例,然後調用 grpcClient.Call()。
ctx 可以附加信息(放在請求的 header 中),在整個請求週期中都可以獲取,trace、auth 等組件都可以使用。
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { if req == nil { return errors.InternalServerError("go.micro.client", "req is nil") } else if rsp == nil { return errors.InternalServerError("go.micro.client", "rsp is nil") } // make a copy of call opts callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) defer cancel() } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // lookup the route to send the reques to route, err := g.lookupRoute(req, callOpts) if err != nil { return err } // pass a node to enable backwards compatability as changing the // call func would be a breaking change. 
// todo v3: change the call func to accept a route node := ®istry.Node{Address: route.Address} // make the call err = gcall(ctx, node, req, rsp, callOpts) // record the result of the call to inform future routing decisions g.opts.Selector.Record(*route, err) // try and transform the error to a go-micro error if verr, ok := err.(*errors.Error); ok { return verr } return err } ch := make(chan error, callOpts.Retries+1) var gerr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr }
這裏做了如下事情:
聲明 call() 函數,
在其中調用 g.lookupRoute(),
返回 router.Route{}
opts.Router.Lookup(query...) ->query()(router/default.go)
r.table = newTable(r.fetchRoutes)
設置了table.fetchRoutes。定義registry.Node,調用gcall(),即grpcClient.call()
定義重試次數(默認 1),在重試循環中調用上面聲明的 call()
這就是一次 call 請求的流程。這裏沒有深入底層 grpc 是如何發起請求的,感興趣的同學可以查閱一下代碼。
若有錯漏,請留言告知,Thanks♪(・ω・)ノ
go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana