以太坊源碼分析(51)rpc源碼分析

send back to the client. Otherwise the returned integer is send back to the client.

當返回的error不等於nil的時候,返回的整形值被忽略,error被髮送回客戶端。 不然整形的會返回被髮送回客戶端。

Optional arguments are supported by accepting pointer values as arguments. E.g.
if we want to do the addition in an optional finite field we can accept a mod
argument as pointer value.
經過提供指針類型的參數可使得方法支持可選參數。後面有點看不懂了。

     func (s *CalService) Add(a, b int, mod *int) (int, error)

This RPC method can be called with 2 integers and a null value as third argument.
In that case the mod argument will be nil. Or it can be called with 3 integers,
in that case mod will be pointing to the given third argument. Since the optional
argument is the last argument the RPC package will also accept 2 integers as
arguments. It will pass the mod argument as nil to the RPC method.

RPC方法能夠經過傳兩個integer和一個null值做爲第三個參數來調用。在這種狀況下mod參數會被設置爲nil。或者能夠傳遞三個integer,這樣mod會被設置爲指向第三個參數。儘管可選的參數是最後的參數,RPC包任然接收傳遞兩個integer,這樣mod參數會被設置爲nil。

The server offers the ServeCodec method which accepts a ServerCodec instance. It will
read requests from the codec, process the request and sends the response back to the
client using the codec. The server can execute requests concurrently. Responses
can be sent back to the client out of order.

server提供了ServerCodec方法,這個方法接收ServerCodec實例做爲參數。 服務器會使用codec讀取請求,處理請求,而後經過codec發送迴應給客戶端。server能夠併發的執行請求。response的順序可能和request的順序不一致。

    //An example server which uses the JSON codec:
     type CalculatorService struct {}
    
     func (s *CalculatorService) Add(a, b int) int {
        return a + b
     }
    
     func (s *CalculatorService Div(a, b int) (int, error) {
        if b == 0 {
            return 0, errors.New("divide by zero")
        }
        return a/b, nil
     }
    calculator := new(CalculatorService)
     server := NewServer()
     server.RegisterName("calculator", calculator")
    
     l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"})
     for {
        c, _ := l.AcceptUnix()
        codec := v2.NewJSONCodec(c)
        go server.ServeCodec(codec)
     }


The package also supports the publish subscribe pattern through the use of subscriptions.
A method that is considered eligible for notifications must satisfy the following criteria:

- object must be exported
- method must be exported
- first method argument type must be context.Context
- method argument(s) must be exported or builtin types
- method must return the tuple Subscription, error


該軟件包還經過使用訂閱來支持發佈訂閱模式。
被認爲符合通知條件的方法必須知足如下條件:

- 對象必須導出
- 方法必須導出
- 第一個方法參數類型必須是context.Context
- 方法參數必須導出或內置類型
- 方法必須返回元組訂閱,錯誤

An example method:

     func (s *BlockChainService) NewBlocks(ctx context.Context) (Subscription, error) {
        ...
     }

Subscriptions are deleted when:

- the user sends an unsubscribe request
- the connection which was used to create the subscription is closed. This can be initiated
by the client and server. The server will close the connection on an write error or when
the queue of buffered notifications gets too big.

訂閱在下面幾種狀況下會被刪除

- 用戶發送了一個取消訂閱的請求
- 建立訂閱的鏈接被關閉。這種狀況可能由客戶端或者服務器觸發。 服務器在寫入出錯或者是通知隊列長度太大的時候會選擇關閉鏈接。

## RPC包的大體結構
網絡協議 channels和Json格式的請求和迴應的編碼和解碼都是同時與服務端和客戶端打交道的類。網絡協議channels主要提供鏈接和數據傳輸的功能。 json格式的編碼和解碼主要提供請求和迴應的序列化和反序列化功能(Json -> Go的對象)。

![ image]( picture/rpc_1.png)


## 源碼解析

### server.go
server.go主要實現了RPC服務端的核心邏輯。 包括RPC方法的註冊, 讀取請求,處理請求,發送迴應等邏輯。
server的核心數據結構是Server結構體。 services字段是一個map,記錄了全部註冊的方法和類。 run參數是用來控制Server的運行和中止的。 codecs是一個set。 用來存儲全部的編碼解碼器,其實就是全部的鏈接。 codecsMu是用來保護多線程訪問codecs的鎖。

services字段的value類型是service類型。 service表明了一個註冊到Server的實例,是一個對象和方法的組合。 service字段的name表明了service的namespace, typ實例的類型, callbacks是實例的回調方法, subscriptions是實例的訂閱方法。


    type serviceRegistry map[string]*service // collection of services
    type callbacks map[string]*callback // collection of RPC callbacks
    type subscriptions map[string]*callback
    type Server struct {
        services serviceRegistry
    
        run int32
        codecsMu sync.Mutex
        codecs *set.Set
    }
    
    // callback is a method callback which was registered in the server
    type callback struct {
        rcvr reflect.Value // receiver of method
        method reflect.Method // callback
        argTypes []reflect.Type // input argument types
        hasCtx bool // method's first argument is a context (not included in argTypes)
        errPos int // err return idx, of -1 when method cannot return error
        isSubscribe bool // indication if the callback is a subscription
    }
    
    // service represents a registered object
    type service struct {
        name string // name for service
        typ reflect.Type // receiver type
        callbacks callbacks // registered handlers
        subscriptions subscriptions // available subscriptions/notifications
    }


Server的建立,Server建立的時候經過調用server.RegisterName把本身的實例註冊上來,提供一些RPC服務的元信息。
        
    const MetadataApi = "rpc"
    // NewServer will create a new server instance with no registered handlers.
    func NewServer() *Server {
        server := &Server{
            services: make(serviceRegistry),
            codecs: set.New(),
            run: 1,
        }
    
        // register a default service which will provide meta information about the RPC service such as the services and
        // methods it offers.
        rpcService := &RPCService{server}
        server.RegisterName(MetadataApi, rpcService)
    
        return server
    }

服務註冊server.RegisterName,RegisterName方法會經過傳入的參數來建立一個service對象,如過傳入的rcvr實例沒有找到任何合適的方法,那麼會返回錯誤。 若是沒有錯誤,就把建立的service實例加入serviceRegistry。


    // RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr
    // match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is
    // created and added to the service collection this server instance serves.
    func (s *Server) RegisterName(name string, rcvr interface{}) error {
        if s.services == nil {
            s.services = make(serviceRegistry)
        }
    
        svc := new(service)
        svc.typ = reflect.TypeOf(rcvr)
        rcvrVal := reflect.ValueOf(rcvr)
    
        if name == "" {
            return fmt.Errorf("no service name for type %s", svc.typ.String())
        }
        //若是實例的類名不是導出的(類名的首字母大寫),就返回錯誤。
        if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
            return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
        }
        //經過反射信息找到合適的callbacks 和subscriptions方法
        methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
        //若是這個名字當前已經被註冊過了,那麼若是有同名的方法就用新的替代,否者直接插入。
        // already a previous service register under given sname, merge methods/subscriptions
        if regsvc, present := s.services[name]; present {
            if len(methods) == 0 && len(subscriptions) == 0 {
                return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
            }
            for _, m := range methods {
                regsvc.callbacks[formatName(m.method.Name)] = m
            }
            for _, s := range subscriptions {
                regsvc.subscriptions[formatName(s.method.Name)] = s
            }
            return nil
        }
    
        svc.name = name
        svc.callbacks, svc.subscriptions = methods, subscriptions
    
        if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
            return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
        }
    
        s.services[svc.name] = svc
        return nil
    }

經過反射信息找出合適的方法,suitableCallbacks,這個方法在utils.go裏面。 這個方法會遍歷這個類型的全部方法,找到適配RPC callback或者subscription callback類型標準的方法並返回。關於RPC的標準,請參考文檔開頭的RPC標準。

    // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
    // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
    // documentation for a summary of these criteria.
    func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
        callbacks := make(callbacks)
        subscriptions := make(subscriptions)
    
    METHODS:
        for m := 0; m < typ.NumMethod(); m++ {
            method := typ.Method(m)
            mtype := method.Type
            mname := formatName(method.Name)
            if method.PkgPath != "" { // method must be exported
                continue
            }
    
            var h callback
            h.isSubscribe = isPubSub(mtype)
            h.rcvr = rcvr
            h.method = method
            h.errPos = -1
    
            firstArg := 1
            numIn := mtype.NumIn()
            if numIn >= 2 && mtype.In(1) == contextType {
                h.hasCtx = true
                firstArg = 2
            }
    
            if h.isSubscribe {
                h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
                for i := firstArg; i < numIn; i++ {
                    argType := mtype.In(i)
                    if isExportedOrBuiltinType(argType) {
                        h.argTypes[i-firstArg] = argType
                    } else {
                        continue METHODS
                    }
                }
    
                subscriptions[mname] = &h
                continue METHODS
            }
    
            // determine method arguments, ignore first arg since it's the receiver type
            // Arguments must be exported or builtin types
            h.argTypes = make([]reflect.Type, numIn-firstArg)
            for i := firstArg; i < numIn; i++ {
                argType := mtype.In(i)
                if !isExportedOrBuiltinType(argType) {
                    continue METHODS
                }
                h.argTypes[i-firstArg] = argType
            }
    
            // check that all returned values are exported or builtin types
            for i := 0; i < mtype.NumOut(); i++ {
                if !isExportedOrBuiltinType(mtype.Out(i)) {
                    continue METHODS
                }
            }
    
            // when a method returns an error it must be the last returned value
            h.errPos = -1
            for i := 0; i < mtype.NumOut(); i++ {
                if isErrorType(mtype.Out(i)) {
                    h.errPos = i
                    break
                }
            }
    
            if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
                continue METHODS
            }
    
            switch mtype.NumOut() {
            case 0, 1, 2:
                if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
                    continue METHODS
                }
                callbacks[mname] = &h
            }
        }
    
        return callbacks, subscriptions
    }


server啓動和服務, server的啓動和服務這裏參考ipc.go中的一部分代碼。能夠看到每Accept()一個連接,就啓動一個goroutine調用srv.ServeCodec來進行服務,這裏也能夠看出JsonCodec的功能,Codec相似於裝飾器模式,在鏈接外面包了一層。Codec會放在後續來介紹,這裏先簡單瞭解一下。

    func (srv *Server) ServeListener(l net.Listener) error {
        for {
            conn, err := l.Accept()
            if err != nil {
                return err
            }
            log.Trace(fmt.Sprint("accepted conn", conn.RemoteAddr()))
            go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
        }
    }

ServeCodec, 這個方法很簡單,提供了codec.Close的關閉功能。 serveRequest的第二個參數singleShot是控制長鏈接仍是短鏈接的參數,若是singleShot爲真,那麼處理完一個請求以後會退出。 不過我們的serveRequest方法是一個死循環,不遇到異常,或者客戶端主動關閉,服務端是不會關閉的。 因此rpc提供的是長鏈接的功能。

    // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
    // response back using the given codec. It will block until the codec is closed or the server is
    // stopped. In either case the codec is closed.
    func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
        defer codec.Close()
        s.serveRequest(codec, false, options)
    }

咱們的重磅方法終於出場,serveRequest 這個方法就是Server的主要處理流程。從codec讀取請求,找到對應的方法並調用,而後把迴應寫入codec。

部分標準庫的代碼能夠參考網上的使用教程, sync.WaitGroup 實現了一個信號量的功能。 Context實現上下文管理。

    
    // serveRequest will reads requests from the codec, calls the RPC callback and
    // writes the response to the given codec.
    //
    // If singleShot is true it will process a single request, otherwise it will handle
    // requests until the codec returns an error when reading a request (in most cases
    // an EOF). It executes requests in parallel when singleShot is false.
    func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
        var pend sync.WaitGroup
        defer func() {
            if err := recover(); err != nil {
                const size = 64 << 10
                buf := make([]byte, size)
                buf = buf[:runtime.Stack(buf, false)]
                log.Error(string(buf))
            }
            s.codecsMu.Lock()
            s.codecs.Remove(codec)
            s.codecsMu.Unlock()
        }()
    
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
    
        // if the codec supports notification include a notifier that callbacks can use
        // to send notification to clients. It is thight to the codec/connection. If the
        // connection is closed the notifier will stop and cancels all active subscriptions.
        if options&OptionSubscriptions == OptionSubscriptions {
            ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
        }
        s.codecsMu.Lock()
        if atomic.LoadInt32(&s.run) != 1 { // server stopped
            s.codecsMu.Unlock()
            return &shutdownError{}
        }
        s.codecs.Add(codec)
        s.codecsMu.Unlock()
    
        // test if the server is ordered to stop
        for atomic.LoadInt32(&s.run) == 1 {
            reqs, batch, err := s.readRequest(codec)
            if err != nil {
                // If a parsing error occurred, send an error
                if err.Error() != "EOF" {
                    log.Debug(fmt.Sprintf("read error %v\n", err))
                    codec.Write(codec.CreateErrorResponse(nil, err))
                }
                // Error or end of stream, wait for requests and tear down
                //這裏主要是考慮多線程處理的時候等待全部的request處理完畢,
                //每啓動一個go線程會調用pend.Add(1)。
                //處理完成後調用pend.Done()會減去1。當爲0的時候,Wait()方法就會返回。
                pend.Wait()
                return nil
            }
    
            // check if server is ordered to shutdown and return an error
            // telling the client that his request failed.
            if atomic.LoadInt32(&s.run) != 1 {
                err = &shutdownError{}
                if batch {
                    resps := make([]interface{}, len(reqs))
                    for i, r := range reqs {
                        resps[i] = codec.CreateErrorResponse(&r.id, err)
                    }
                    codec.Write(resps)
                } else {
                    codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
                }
                return nil
            }
            // If a single shot request is executing, run and return immediately
            //若是隻執行一次,那麼執行完成後返回。
            if singleShot {
                if batch {
                    s.execBatch(ctx, codec, reqs)
                } else {
                    s.exec(ctx, codec, reqs[0])
                }
                return nil
            }
            // For multi-shot connections, start a goroutine to serve and loop back
            pend.Add(1)
            //啓動線程對請求進行服務。
            go func(reqs []*serverRequest, batch bool) {
                defer pend.Done()
                if batch {
                    s.execBatch(ctx, codec, reqs)
                } else {
                    s.exec(ctx, codec, reqs[0])
                }
            }(reqs, batch)
        }
        return nil
    }


readRequest方法,從codec讀取請求,而後根據請求查找對應的方法組裝成requests對象。
rpcRequest是codec返回的請求類型。j   

    type rpcRequest struct {
        service string
        method string
        id interface{}
        isPubSub bool
        params interface{}
        err Error // invalid batch element
    }

serverRequest進行處理以後返回的request

    // serverRequest is an incoming request
    type serverRequest struct {
        id interface{}
        svcname string
        callb *callback
        args []reflect.Value
        isUnsubscribe bool
        err Error
    }

readRequest方法,從codec讀取請求,對請求進行處理生成serverRequest對象返回。

    // readRequest requests the next (batch) request from the codec. It will return the collection
    // of requests, an indication if the request was a batch, the invalid request identifier and an
    // error when the request could not be read/parsed.
    func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
        reqs, batch, err := codec.ReadRequestHeaders()
        if err != nil {
            return nil, batch, err
        }
        requests := make([]*serverRequest, len(reqs))
        // 根據reqs構建requests
        // verify requests
        for i, r := range reqs {
            var ok bool
            var svc *service
    
            if r.err != nil {
                requests[i] = &serverRequest{id: r.id, err: r.err}
                continue
            }
            //若是請求是發送/訂閱方面的請求,並且方法名稱有_unsubscribe後綴。
            if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
                requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
                argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
                if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
                continue
            }
            //若是沒有註冊這個服務。
            if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
                requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
                continue
            }
            //若是是發佈和訂閱模式。 調用訂閱方法。
            if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
                if callb, ok := svc.subscriptions[r.method]; ok {
                    requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                    if r.params != nil && len(callb.argTypes) > 0 {
                        argTypes := []reflect.Type{reflect.TypeOf("")}
                        argTypes = append(argTypes, callb.argTypes...)
                        if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                            requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
                        } else {
                            requests[i].err = &invalidParamsError{err.Error()}
                        }
                    }
                } else {
                    requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}}
                }
                continue
            }
    
            if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
                requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                if r.params != nil && len(callb.argTypes) > 0 {
                    if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                        requests[i].args = args
                    } else {
                        requests[i].err = &invalidParamsError{err.Error()}
                    }
                }
                continue
            }
    
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
        }
    
        return requests, batch, nil
    }

exec和execBatch方法,調用s.handle方法對request進行處理。

    // exec executes the given request and writes the result back using the codec.
    func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
        var response interface{}
        var callback func()
        if req.err != nil {
            response = codec.CreateErrorResponse(&req.id, req.err)
        } else {
            response, callback = s.handle(ctx, codec, req)
        }
    
        if err := codec.Write(response); err != nil {
            log.Error(fmt.Sprintf("%v\n", err))
            codec.Close()
        }
    
        // when request was a subscribe request this allows these subscriptions to be actived
        if callback != nil {
            callback()
        }
    }
    
    // execBatch executes the given requests and writes the result back using the codec.
    // It will only write the response back when the last request is processed.
    func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) {
        responses := make([]interface{}, len(requests))
        var callbacks []func()
        for i, req := range requests {
            if req.err != nil {
                responses[i] = codec.CreateErrorResponse(&req.id, req.err)
            } else {
                var callback func()
                if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
                    callbacks = append(callbacks, callback)
                }
            }
        }
    
        if err := codec.Write(responses); err != nil {
            log.Error(fmt.Sprintf("%v\n", err))
            codec.Close()
        }
    
        // when request holds one of more subscribe requests this allows these subscriptions to be activated
        for _, c := range callbacks {
            c()
        }
    }

handle方法,執行一個request,而後返回response
    
    // handle executes a request and returns the response from the callback.
    func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
        if req.err != nil {
            return codec.CreateErrorResponse(&req.id, req.err), nil
        }
        //若是是取消訂閱的消息。NotifierFromContext(ctx)獲取以前咱們存入ctx的notifier。
        if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
            if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
                notifier, supported := NotifierFromContext(ctx)
                if !supported { // interface doesn't support subscriptions (e.g. http)
                    return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
                }
    
                subid := ID(req.args[0].String())
                if err := notifier.unsubscribe(subid); err != nil {
                    return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
                }
    
                return codec.CreateResponse(req.id, true), nil
            }
            return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
        }
        //若是是訂閱消息。 那麼建立訂閱。並激活訂閱。
        if req.callb.isSubscribe {
            subid, err := s.createSubscription(ctx, codec, req)
            if err != nil {
                return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
            }
    
            // active the subscription after the sub id was successfully sent to the client
            activateSub := func() {
                notifier, _ := NotifierFromContext(ctx)
                notifier.activate(subid, req.svcname)
            }
    
            return codec.CreateResponse(req.id, subid), activateSub
        }
    
        // regular RPC call, prepare arguments
        if len(req.args) != len(req.callb.argTypes) {
            rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
                req.svcname, serviceMethodSeparator, req.callb.method.Name,
                len(req.callb.argTypes), len(req.args))}
            return codec.CreateErrorResponse(&req.id, rpcErr), nil
        }
    
        arguments := []reflect.Value{req.callb.rcvr}
        if req.callb.hasCtx {
            arguments = append(arguments, reflect.ValueOf(ctx))
        }
        if len(req.args) > 0 {
            arguments = append(arguments, req.args...)
        }
        //調用提供的rpc方法,並獲取reply
        // execute RPC method and return result
        reply := req.callb.method.Func.Call(arguments)
        if len(reply) == 0 {
            return codec.CreateResponse(req.id, nil), nil
        }
    
        if req.callb.errPos >= 0 { // test if method returned an error
            if !reply[req.callb.errPos].IsNil() {
                e := reply[req.callb.errPos].Interface().(error)
                res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
                return res, nil
            }
        }
        return codec.CreateResponse(req.id, reply[0].Interface()), nil
    }

### subscription.go 發佈訂閱模式。
在以前的server.go中就有出現了一些發佈訂閱模式的代碼, 在這裏集中闡述一下。

咱們在serveRequest的代碼中,就有這樣的代碼。

    若是codec支持, 能夠經過一個叫notifier的對象執行回調函數發送消息給客戶端。
    他和codec/connection關係很緊密。 若是鏈接被關閉,那麼notifier會關閉,並取消掉全部激活的訂閱。
    // if the codec supports notification include a notifier that callbacks can use
    // to send notification to clients. It is thight to the codec/connection. If the
    // connection is closed the notifier will stop and cancels all active subscriptions.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }

在服務一個客戶端鏈接時候,調用newNotifier方法建立了一個notifier對象存儲到ctx中。能夠觀察到Notifier對象保存了codec的實例,也就是說Notifier對象保存了網絡鏈接,用來在須要的時候發送數據。

    // newNotifier creates a new notifier that can be used to send subscription
    // notifications to the client.
    func newNotifier(codec ServerCodec) *Notifier {
        return &Notifier{
            codec: codec,
            active: make(map[ID]*Subscription),
            inactive: make(map[ID]*Subscription),
        }
    }

而後在handle方法中, 咱們處理一類特殊的方法,這種方法被標識爲isSubscribe. 調用createSubscription方法建立了了一個Subscription並調用notifier.activate方法存儲到notifier的激活隊列裏面。 代碼裏面有一個技巧。 這個方法調用完成後並無直接激活subscription,而是把激活部分的代碼做爲一個函數返回回去。而後在exec或者execBatch代碼裏面等待codec.CreateResponse(req.id, subid)這個response被髮送給客戶端以後被調用。避免客戶端尚未收到subscription ID的時候就收到了subscription信息。

    if req.callb.isSubscribe {
        subid, err := s.createSubscription(ctx, codec, req)
        if err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }

        // active the subscription after the sub id was successfully sent to the client
        activateSub := func() {
            notifier, _ := NotifierFromContext(ctx)
            notifier.activate(subid, req.svcname)
        }

        return codec.CreateResponse(req.id, subid), activateSub
    }

createSubscription方法會調用指定的註冊上來的方法,並獲得迴應。

    // createSubscription will call the subscription callback and returns the subscription id or error.
    func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
        // subscription have as first argument the context following optional arguments
        args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
        args = append(args, req.args...)
        reply := req.callb.method.Func.Call(args)
    
        if !reply[1].IsNil() { // subscription creation failed
            return "", reply[1].Interface().(error)
        }
    
        return reply[0].Interface().(*Subscription).ID, nil
    }

在來看看咱們的activate方法,這個方法激活了subscription。 subscription在subscription ID被髮送給客戶端以後被激活,避免客戶端尚未收到subscription ID的時候就收到了subscription信息。
    
    // activate enables a subscription. Until a subscription is enabled all
    // notifications are dropped. This method is called by the RPC server after
    // the subscription ID was sent to client. This prevents notifications being
    // send to the client before the subscription ID is send to the client.
    func (n *Notifier) activate(id ID, namespace string) {
        n.subMu.Lock()
        defer n.subMu.Unlock()
        if sub, found := n.inactive[id]; found {
            sub.namespace = namespace
            n.active[id] = sub
            delete(n.inactive, id)
        }
    }

咱們再來看一個取消訂閱的函數

    // unsubscribe a subscription.
    // If the subscription could not be found ErrSubscriptionNotFound is returned.
    func (n *Notifier) unsubscribe(id ID) error {
        n.subMu.Lock()
        defer n.subMu.Unlock()
        if s, found := n.active[id]; found {
            close(s.err)
            delete(n.active, id)
            return nil
        }
        return ErrSubscriptionNotFound
    }

最後是一個發送訂閱的函數,調用這個函數把數據發送到客戶端, 這個也比較簡單。

    // Notify sends a notification to the client with the given data as payload.
    // If an error occurs the RPC connection is closed and the error is returned.
    func (n *Notifier) Notify(id ID, data interface{}) error {
        n.subMu.RLock()
        defer n.subMu.RUnlock()
    
        sub, active := n.active[id]
        if active {
            notification := n.codec.CreateNotification(string(id), sub.namespace, data)
            if err := n.codec.Write(notification); err != nil {
                n.codec.Close()
                return err
            }
        }
        return nil
    }


如何使用建議經過subscription_test.go的TestNotifications來查看完整的流程。

### client.go RPC客戶端源碼分析。

客戶端的主要功能是把請求發送到服務端,而後接收回應,再把迴應傳遞給調用者。

客戶端的數據結構

    // Client represents a connection to an RPC server.
    type Client struct {
        idCounter uint32
        //生成鏈接的函數,客戶端會調用這個函數生成一個網絡鏈接對象。
        connectFunc func(ctx context.Context) (net.Conn, error)
        //HTTP協議和非HTTP協議有不一樣的處理流程, HTTP協議不支持長鏈接, 只支持一個請求對應一個迴應的這種模式,同時也不支持發佈/訂閱模式。
        isHTTP bool
    
        // writeConn is only safe to access outside dispatch, with the
        // write lock held. The write lock is taken by sending on
        // requestOp and released by sending on sendDone.
        //經過這裏的註釋能夠看到,writeConn是調用這用來寫入請求的網絡鏈接對象,
        //只有在dispatch方法外面調用纔是安全的,並且須要經過給requestOp隊列發送請求來獲取鎖,
        //獲取鎖以後就能夠把請求寫入網絡,寫入完成後發送請求給sendDone隊列來釋放鎖,供其它的請求使用。
        writeConn net.Conn
    
        // for dispatch
        //下面有不少的channel,channel通常來講是goroutine之間用來通訊的通道,後續會隨着代碼介紹channel是如何使用的。
        close chan struct{}
        didQuit chan struct{} // closed when client quits
        reconnected chan net.Conn // where write/reconnect sends the new connection
        readErr chan error // errors from read
        readResp chan []*jsonrpcMessage // valid messages from read
        requestOp chan *requestOp // for registering response IDs
        sendDone chan error // signals write completion, releases write lock
        respWait map[string]*requestOp // active requests
        subs map[string]*ClientSubscription // active subscriptions
    }


newClient, 新建一個客戶端。 經過調用connectFunc方法來獲取一個網絡鏈接,若是網絡鏈接是httpConn對象的化,那麼isHTTP設置爲true。而後是對象的初始化, 若是是HTTP鏈接的化,直接返回,否者就啓動一個goroutine調用dispatch方法。 dispatch方法是整個client的指揮中心,經過上面提到的channel來和其餘的goroutine來進行通訊,獲取信息,根據信息作出各類決策。後續會詳細介紹dispatch。 由於HTTP的調用方式很是簡單, 這裏先對HTTP的方式作一個簡單的闡述。

    
    func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
        conn, err := connectFunc(initctx)
        if err != nil {
            return nil, err
        }
        _, isHTTP := conn.(*httpConn)
    
        c := &Client{
            writeConn: conn,
            isHTTP: isHTTP,
            connectFunc: connectFunc,
            close: make(chan struct{}),
            didQuit: make(chan struct{}),
            reconnected: make(chan net.Conn),
            readErr: make(chan error),
            readResp: make(chan []*jsonrpcMessage),
            requestOp: make(chan *requestOp),
            sendDone: make(chan error, 1),
            respWait: make(map[string]*requestOp),
            subs: make(map[string]*ClientSubscription),
        }
        if !isHTTP {
            go c.dispatch(conn)
        }
        return c, nil
    }


請求調用經過調用client的 Call方法來進行RPC調用。

    // Call performs a JSON-RPC call with the given arguments and unmarshals into
    // result if no error occurred.
    //
    // The result must be a pointer so that package json can unmarshal into it. You
    // can also pass nil, in which case the result is ignored.
    返回值必須是一個指針,這樣才能把json值轉換成對象。 若是你不關心返回值,也能夠經過傳nil來忽略。
    func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
        ctx := context.Background()
        return c.CallContext(ctx, result, method, args...)
    }
    
    func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
        msg, err := c.newMessage(method, args...)
        if err != nil {
            return err
        }
        //構建了一個requestOp對象。 resp是讀取返回的隊列,隊列的長度是1。
        op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
        
        if c.isHTTP {
            err = c.sendHTTP(ctx, op, msg)
        } else {
            err = c.send(ctx, op, msg)
        }
        if err != nil {
            return err
        }
    
        // dispatch has accepted the request and will close the channel it when it quits.
        switch resp, err := op.wait(ctx); {
        case err != nil:
            return err
        case resp.Error != nil:
            return resp.Error
        case len(resp.Result) == 0:
            return ErrNoResult
        default:
            return json.Unmarshal(resp.Result, &result)
        }
    }

sendHTTP,這個方法直接調用doRequest方法進行請求拿到迴應。而後寫入到resp隊列就返回了。

    func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
        hc := c.writeConn.(*httpConn)
        respBody, err := hc.doRequest(ctx, msg)
        if err != nil {
            return err
        }
        defer respBody.Close()
        var respmsg jsonrpcMessage
        if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
            return err
        }
        op.resp <- &respmsg
        return nil
    }


在看看上面的另外一個方法 op.wait()方法,這個方法會查看兩個隊列的信息。若是是http那麼從resp隊列獲取到迴應就會直接返回。 這樣整個HTTP的請求過程就完成了。 中間沒有涉及到多線程問題,都在一個線程內部完成了。

    func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case resp := <-op.resp:
            return resp, op.err
        }
    }

若是不是HTTP請求呢。 那處理的流程就比較複雜了, 還記得若是不是HTTP請求。在newClient的時候是啓動了一個goroutine 調用了dispatch方法。 咱們先看非http的 send方法。

從註釋來看。 這個方法把op寫入到requestOp這個隊列,注意的是這個隊列是沒有緩衝區的,也就是說若是這個時候這個隊列沒有人處理的化,這個調用是會阻塞在這裏的。 這就至關於一把鎖,若是發送op到requestOp成功了就拿到了鎖,能夠繼續下一步,下一步是調用write方法把請求的所有內容發送到網絡上。而後發送消息給sendDone隊列。sendDone能夠當作是鎖的釋放,後續在dispatch方法裏面會詳細分析這個過程。 而後返回。返回以後方法會阻塞在op.wait方法裏面。直到從op.resp隊列收到一個迴應,或者是收到一個ctx.Done()消息(這個消息通常會在完成或者是強制退出的時候獲取到。)

    // send registers op with the dispatch loop, then sends msg on the connection.
    // if sending fails, op is deregistered.
    func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
        select {
        case c.requestOp <- op:
            log.Trace("", "msg", log.Lazy{Fn: func() string {
                return fmt.Sprint("sending ", msg)
            }})
            err := c.write(ctx, msg)
            c.sendDone <- err
            return err
        case <-ctx.Done():
            // This can happen if the client is overloaded or unable to keep up with
            // subscription notifications.
            return ctx.Err()
        case <-c.didQuit:
            //已經退出,可能被調用了Close
            return ErrClientQuit
        }
    }

dispatch方法
    

    // dispatch is the main loop of the client.
    // It sends read messages to waiting calls to Call and BatchCall
    // and subscription notifications to registered subscriptions.
    func (c *Client) dispatch(conn net.Conn) {
        // Spawn the initial read loop.
        go c.read(conn)
    
        var (
            lastOp *requestOp // tracks last send operation
            requestOpLock = c.requestOp // nil while the send lock is held
            reading = true // if true, a read loop is running
        )
        defer close(c.didQuit)
        defer func() {
            c.closeRequestOps(ErrClientQuit)
            conn.Close()
            if reading {
                // Empty read channels until read is dead.
                for {
                    select {
                    case <-c.readResp:
                    case <-c.readErr:
                        return
                    }
                }
            }
        }()
    
        for {
            select {
            case <-c.close:
                return
    
            // Read path.
            case batch := <-c.readResp:
                //讀取到一個迴應。調用相應的方法處理
                for _, msg := range batch {
                    switch {
                    case msg.isNotification():
                        log.Trace("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: notification ", msg)
                        }})
                        c.handleNotification(msg)
                    case msg.isResponse():
                        log.Trace("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: response ", msg)
                        }})
                        c.handleResponse(msg)
                    default:
                        log.Debug("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: dropping weird message", msg)
                        }})
                        // TODO: maybe close
                    }
                }
    
            case err := <-c.readErr:
                //接收到讀取失敗信息,這個是read線程傳遞過來的。
                log.Debug(fmt.Sprintf("<-readErr: %v", err))
                c.closeRequestOps(err)
                conn.Close()
                reading = false
    
            case newconn := <-c.reconnected:
                //接收到一個重鏈接信息
                log.Debug(fmt.Sprintf("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()))
                if reading {
                    //等待以前的鏈接讀取完成。
                    // Wait for the previous read loop to exit. This is a rare case.
                    conn.Close()
                    <-c.readErr
                }
                //開啓閱讀的goroutine
                go c.read(newconn)
                reading = true
                conn = newconn
    
            // Send path.
            case op := <-requestOpLock:
                // Stop listening for further send ops until the current one is done.
                //接收到一個requestOp消息,那麼設置requestOpLock爲空,
                //這個時候若是有其餘人也但願發送op到requestOp,會由於沒有人處理而阻塞。
                requestOpLock = nil
                lastOp = op
                //把這個op加入等待隊列。
                for _, id := range op.ids {
                    c.respWait[string(id)] = op
                }
    
            case err := <-c.sendDone:
                //當op的請求信息已經發送到網絡上。會發送信息到sendDone。若是發送過程出錯,那麼err !=nil。
                if err != nil {
                    // Remove response handlers for the last send. We remove those here
                    // because the error is already handled in Call or BatchCall. When the
                    // read loop goes down, it will signal all other current operations.
                    //把全部的id從等待隊列刪除。
                    for _, id := range lastOp.ids {
                        delete(c.respWait, string(id))
                    }
                }
                // Listen for send ops again.
                //從新開始處理requestOp的消息。
                requestOpLock = c.requestOp
                lastOp = nil
            }
        }
    }


下面經過下面這種圖來講明dispatch的主要流程。下面圖片中圓形是線程。 藍色矩形是channel。 箭頭表明了channel的數據流動方向。

![ image]( picture/rpc_2.png)

- 多線程串行發送請求到網絡上的流程 首先發送requestOp請求到dispatch獲取到鎖, 而後把請求信息寫入到網絡,而後發送sendDone信息到dispatch解除鎖。 經過requestOp和sendDone這兩個channel以及dispatch代碼的配合完成了串行的發送請求到網絡上的功能。
- 讀取返回信息而後返回給調用者的流程。 把請求信息發送到網絡上以後, 內部的goroutine read會持續不斷的從網絡上讀取信息。 read讀取到返回信息以後,經過readResp隊列發送給dispatch。 dispatch查找到對應的調用者,而後把返回信息寫入調用者的resp隊列中。完成返回信息的流程。
- 重鏈接流程。 重鏈接在外部調用者寫入失敗的狀況下被外部調用者主動調用。 調用完成後發送新的鏈接給dispatch。 dispatch收到新的鏈接以後,會終止以前的鏈接,而後啓動新的read goroutine來重新的鏈接上讀取信息。
- 關閉流程。 調用者調用Close方法,Close方法會寫入信息到close隊列。 dispatch接收到close信息以後。 關閉didQuit隊列,關閉鏈接,等待read goroutine中止。 全部等待在didQuit隊列上面的客戶端調用所有返回。


#### 客戶端 訂閱模式的特殊處理
上面提到的主要流程是方法調用的流程。 以太坊的RPC框架還支持發佈和訂閱的模式。

咱們先看看訂閱的方法,以太坊提供了幾種主要service的訂閱方式(EthSubscribe ShhSubscribe).同時也提供了自定義服務的訂閱方法(Subscribe),


    // EthSubscribe registers a subscripion under the "eth" namespace.
    func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
        return c.Subscribe(ctx, "eth", channel, args...)
    }
    
    // ShhSubscribe registers a subscripion under the "shh" namespace.
    func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
        return c.Subscribe(ctx, "shh", channel, args...)
    }
    
    // Subscribe calls the "<namespace>_subscribe" method with the given arguments,
    // registering a subscription. Server notifications for the subscription are
    // sent to the given channel. The element type of the channel must match the
    // expected type of content returned by the subscription.
    //
    // The context argument cancels the RPC request that sets up the subscription but has no
    // effect on the subscription after Subscribe has returned.
    //
    // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
    // before considering the subscriber dead. The subscription Err channel will receive
    // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
    // that the channel usually has at least one reader to prevent this issue.
    //Subscribe會使用傳入的參數調用"<namespace>_subscribe"方法來訂閱指定的消息。
    //服務器的通知會寫入channel參數指定的隊列。 channel參數必須和返回的類型相同。
    //ctx參數能夠用來取消RPC的請求,可是若是訂閱已經完成就不會有效果了。
    //處理速度太慢的訂閱者的消息會被刪除,每一個客戶端有8000個消息的緩存。
    func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
        // Check type of channel first.
        chanVal := reflect.ValueOf(channel)
        if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
            panic("first argument to Subscribe must be a writable channel")
        }
        if chanVal.IsNil() {
            panic("channel given to Subscribe must not be nil")
        }
        if c.isHTTP {
            return nil, ErrNotificationsUnsupported
        }
    
        msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
        if err != nil {
            return nil, err
        }
        //requestOp的參數和Call調用的不同。 多了一個參數sub.
        op := &requestOp{
            ids: []json.RawMessage{msg.ID},
            resp: make(chan *jsonrpcMessage),
            sub: newClientSubscription(c, namespace, chanVal),
        }
    
        // Send the subscription request.
        // The arrival and validity of the response is signaled on sub.quit.
        if err := c.send(ctx, op, msg); err != nil {
            return nil, err
        }
        if _, err := op.wait(ctx); err != nil {
            return nil, err
        }
        return op.sub, nil
    }

newClientSubscription方法,這個方法建立了一個新的對象ClientSubscription,這個對象把傳入的channel參數保存起來。 而後本身又建立了三個chan對象。後續會對詳細介紹這三個chan對象


    func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
        sub := &ClientSubscription{
            client: c,
            namespace: namespace,
            etype: channel.Type().Elem(),
            channel: channel,
            quit: make(chan struct{}),
            err: make(chan error, 1),
            in: make(chan json.RawMessage),
        }
        return sub
    }

從上面的代碼能夠看出。訂閱過程根Call過程差很少,構建一個訂閱請求。調用send發送到網絡上,而後等待返回。 咱們經過dispatch對返回結果的處理來看看訂閱和Call的不一樣。


    func (c *Client) handleResponse(msg *jsonrpcMessage) {
        op := c.respWait[string(msg.ID)]
        if op == nil {
            log.Debug(fmt.Sprintf("unsolicited response %v", msg))
            return
        }
        delete(c.respWait, string(msg.ID))
        // For normal responses, just forward the reply to Call/BatchCall.
        若是op.sub是nil,普通的RPC請求,這個字段的值是空白的,只有訂閱請求才有值。
        if op.sub == nil {
            op.resp <- msg
            return
        }
        // For subscription responses, start the subscription if the server
        // indicates success. EthSubscribe gets unblocked in either case through
        // the op.resp channel.
        defer close(op.resp)
        if msg.Error != nil {
            op.err = msg.Error
            return
        }
        if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
            //啓動一個新的goroutine 並把op.sub.subid記錄起來。
            go op.sub.start()
            c.subs[op.sub.subid] = op.sub
        }
    }


op.sub.start方法。 這個goroutine專門用來處理訂閱消息。主要的功能是從in隊列裏面獲取訂閱消息,而後把訂閱消息放到buffer裏面。 若是可以數據可以發送。就從buffer裏面發送一些數據給用戶傳入的那個channel。 若是buffer超過指定的大小,就丟棄。

    
    func (sub *ClientSubscription) start() {
        sub.quitWithError(sub.forward())
    }
    
    func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
        cases := []reflect.SelectCase{
            {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
            {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
            {Dir: reflect.SelectSend, Chan: sub.channel},
        }
        buffer := list.New()
        defer buffer.Init()
        for {
            var chosen int
            var recv reflect.Value
            if buffer.Len() == 0 {
                // Idle, omit send case.
                chosen, recv, _ = reflect.Select(cases[:2])
            } else {
                // Non-empty buffer, send the first queued item.
                cases[2].Send = reflect.ValueOf(buffer.Front().Value)
                chosen, recv, _ = reflect.Select(cases)
            }
    
            switch chosen {
            case 0: // <-sub.quit
                return nil, false
            case 1: // <-sub.in
                val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
                if err != nil {
                    return err, true
                }
                if buffer.Len() == maxClientSubscriptionBuffer {
                    return ErrSubscriptionQueueOverflow, true
                }
                buffer.PushBack(val)
            case 2: // sub.channel<-
                cases[2].Send = reflect.Value{} // Don't hold onto the value.
                buffer.Remove(buffer.Front())
            }
        }
    }


當接收到一條Notification消息的時候會調用handleNotification方法。會把消息傳送給in隊列。

    func (c *Client) handleNotification(msg *jsonrpcMessage) {
        if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
            log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
            return
        }
        var subResult struct {
            ID string `json:"subscription"`
            Result json.RawMessage `json:"result"`
        }
        if err := json.Unmarshal(msg.Params, &subResult); err != nil {
            log.Debug(fmt.Sprint("dropping invalid subscription message: ", msg))
            return
        }
        if c.subs[subResult.ID] != nil {
            c.subs[subResult.ID].deliver(subResult.Result)
        }
    }
    func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
        select {
        case sub.in <- result:
            return true
        case <-sub.quit:
            return false
        }
    }
相關文章
相關標籤/搜索