Client 主要是用來執行請求服務和訂閱發佈事件,是對 broker、Transport 的一種封裝,方便使用。
Init
初始化客戶端的函數:
func (r *rpcClient) Init(opts ...Option) error { size := r.opts.PoolSize ttl := r.opts.PoolTTL for _, o := range opts { o(&r.opts) } // update pool configuration if the options changed if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { r.pool.Lock() r.pool.size = r.opts.PoolSize r.pool.ttl = int64(r.opts.PoolTTL.Seconds()) r.pool.Unlock() } return nil }
==Call==
Call 是 Client 接口中最主要的方法;其中用到的 Selector 在之前的《Go Micro Selector 源碼分析》中已經介紹過。
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // 複製出options callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // 調用next函數 獲取selector next, err := g.next(req, callOpts) if err != nil { return err } // 檢查context Deadline d, ok := ctx.Deadline() if !ok { // 沒有deadline 建立一個新的 ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // 獲取到deadline設置context opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // 複製call函數 在下面的goroutine中使用 gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // 調用call 正式調用服務端接口 err = gcall(ctx, node, req, rsp, callOpts) g.opts.Selector.Mark(req.Service(), node, err) return err } ch := make(chan error, callOpts.Retries+1) var gerr error // 重試 for i := 0; i <= callOpts.Retries; i++ { go func(i int) { // 調動call 返回channel ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = 
err } } return gerr }
Stream
Stream 跟 Call 的邏輯幾乎是同樣的,不過 Stream 調用的是 rpc_client.stream 函數,這邊就不過多分析了。
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { // make a copy of call opts callOpts := r.opts.CallOptions for _, opt := range opts { opt(&callOpts) } next, err := r.next(request, callOpts) if err != nil { return nil, err } // should we noop right here? select { case <-ctx.Done(): return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) default: } call := func(i int) (Stream, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) if err != nil { return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } node, err := next() if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } stream, err := r.stream(ctx, node, request, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return stream, err } type response struct { stream Stream err error } ch := make(chan response, callOpts.Retries+1) var grr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { s, err := call(i) ch <- response{s, err} }(i) select { case <-ctx.Done(): return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err())) case rsp := <-ch: // if the call succeeded lets bail early if rsp.err == nil { return rsp.stream, nil } retry, rerr := callOpts.Retry(ctx, request, i, rsp.err) if rerr != nil { return nil, rerr } if !retry { return nil, rsp.err } grr = rsp.err } } return nil, grr }
Publish
Client中的Publish主要是調用broker中的publish:r.opts.Broker.Publish
在 client 的 Publish 函數中,獲取了 topic、準備了 body,最後調用 broker 的 publish。
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { options := PublishOptions{ Context: context.Background(), } for _, o := range opts { o(&options) } md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } id := uuid.New().String() md["Content-Type"] = msg.ContentType() md["Micro-Topic"] = msg.Topic() md["Micro-Id"] = id // set the topic topic := msg.Topic() // get proxy if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { options.Exchange = prx } // get the exchange if len(options.Exchange) > 0 { topic = options.Exchange } // encode message body cf, err := r.newCodec(msg.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{ Target: topic, Type: codec.Event, Header: map[string]string{ "Micro-Id": id, "Micro-Topic": msg.Topic(), }, }, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() { r.opts.Broker.Connect() }) return r.opts.Broker.Publish(topic, &broker.Message{ Header: md, Body: b.Bytes(), }) }