go rpc 源碼分析

1.概述

go 源碼中帶了rpc框架,以相對精簡的當時方式實現了rpc功能,目前源碼中的rpc官方已經宣佈再也不添加新功能,並推薦使用grpc.
做爲go標準庫中rpc框架,仍是有不少地方值得借鑑及學習,這裏將從源碼角度分析go原生rpc框架,以及分享一些在使用過程當中遇到的坑.git

2.server端

server端主要分爲兩個步驟,首先進行方法註冊,經過反射處理將方法取出,並存到map中.而後是網絡調用,主要是監聽端口,讀取數據包,解碼請求
調用反射處理後的方法,將返回值編碼,返回給客戶端.golang

2.1 方法註冊

圖片描述

2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
    return DefaultServer.RegisterName(name, rcvr)
}

如上,方法註冊的入口函數有兩個,分別爲Register以及RegisterName,這裏interface{}一般是帶方法的對象.若是想要自定義方法的接收對象,則可使用RegisterName.json

2.1.2 反射處理過程
type methodType struct {
    sync.Mutex // protects counters
    method     reflect.Method    //反射後的函數
    ArgType    reflect.Type      //請求參數的反射值
    ReplyType  reflect.Type      //返回參數的反射值
    numCalls   uint              //調用次數
}


type service struct {
    name   string                 // 服務名,這裏一般爲register時的對象名或自定義對象名
    rcvr   reflect.Value          // 服務的接收者的反射值
    typ    reflect.Type           // 接收者的類型
    method map[string]*methodType // 對象的全部方法的反射結果.
}

反射處理過程,其實就是將對象以及對象的方法,經過反射生成上面的結構,如註冊Arith.Multiply(xx,xx) error 這樣的對象時,生成的結構爲 map["Arith"]service, service 中ethod爲 map["Multiply"]methodType.網絡

幾個關鍵代碼以下:併發

生成service對象框架

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
    //生成service
    s := new(service)
    s.typ = reflect.TypeOf(rcvr)
    s.rcvr = reflect.ValueOf(rcvr)
    sname := reflect.Indirect(s.rcvr).Type().Name()
 
    ....
    s.name = sname

    // 經過suitableMethods將對象的方法轉換成map[string]*methodType結構
    s.method = suitableMethods(s.typ, true)
    
    ....

    //service存儲爲鍵值對
    if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
        return errors.New("rpc: service already defined: " + sname)
    }
    return nil
}

生成 map[string] *methodType異步

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
    methods := make(map[string]*methodType)

    //經過反射,遍歷全部的方法
    for m := 0; m < typ.NumMethod(); m++ {
        method := typ.Method(m)
        mtype := method.Type
        mname := method.Name
        // Method must be exported.
        if method.PkgPath != "" {
            continue
        }
        // Method needs three ins: receiver, *args, *reply.
        if mtype.NumIn() != 3 {
            if reportErr {
                log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
            }
            continue
        }
        //取出請求參數類型
        argType := mtype.In(1)
        ...

        // 取出響應參數類型,響應參數必須爲指針
        replyType := mtype.In(2)
        if replyType.Kind() != reflect.Ptr {
            if reportErr {
                log.Println("method", mname, "reply type not a pointer:", replyType)
            }
            continue
        }
        ...


        // 去除函數的返回值,函數的返回值必須爲error.
        if returnType := mtype.Out(0); returnType != typeOfError {
            if reportErr {
                log.Println("method", mname, "returns", returnType.String(), "not error")
            }
            continue
        }
        
        //將方法存儲成key-value
        methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
    }
    return methods
}

2.2 網絡調用

// Request 每次rpc調用的請求的頭部分
type Request struct {
    ServiceMethod string   // 格式爲: "Service.Method"
    Seq           uint64   // 客戶端生成的序列號
    next          *Request // server端保持的鏈表
}

// Response 每次rpc調用的響應的頭部分
type Response struct {
    ServiceMethod string    // 對應請求部分的 ServiceMethod
    Seq           uint64    // 對應請求部分的 Seq
    Error         string    // 錯誤
    next          *Response // server端保持的鏈表
}

如上,網絡調用主要用到上面的兩個結構體,分別是請求參數以及返回參數,經過編解碼器(gob/json)實現二進制到結構體的相互轉換.主要涉及到下面幾個步驟:tcp

圖片描述

關鍵代碼以下:
取出請求,並獲得相應函數的調用參數ide

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
    // Grab the request header.
    req = server.getRequest()
    //編碼器讀取生成請求
    err = codec.ReadRequestHeader(req)
    if err != nil {
        //錯誤處理
        ...
        return
    }

    keepReading = true

    //取出服務名以及方法名
    dot := strings.LastIndex(req.ServiceMethod, ".")
    if dot < 0 {
        err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
        return
    }
    serviceName := req.ServiceMethod[:dot]
    methodName := req.ServiceMethod[dot+1:]

    //從註冊時生成的map中查詢出相應的方法的結構
    svci, ok := server.serviceMap.Load(serviceName)
    if !ok {
        err = errors.New("rpc: can't find service " + req.ServiceMethod)
        return
    }
    svc = svci.(*service)

    //獲取出方法的類型
    mtype = svc.method[methodName]
    if mtype == nil {
        err = errors.New("rpc: can't find method " + req.ServiceMethod)
    }

循環處理,不斷讀取連接上的字節流,解密出請求,調用方法,編碼響應,回寫到客戶端.函數

func (server *Server) ServeCodec(codec ServerCodec) {
    sending := new(sync.Mutex)
    for {
        //讀取請求
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        if err != nil {
            ...
        }

        //調用
        go service.call(server, sending, mtype, req, argv, replyv, codec)
    }
    codec.Close()
}

經過參數進行函數調用

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
    mtype.Lock()
    mtype.numCalls++
    mtype.Unlock()
    function := mtype.method.Func
    // 經過反射進行函數調用
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
    // 返回值是不爲空時,則取出錯誤的string
    errInter := returnValues[0].Interface()
    errmsg := ""
    if errInter != nil {
        errmsg = errInter.(error).Error()
    }
    
    //發送相應,並釋放請求結構
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
    server.freeRequest(req)
}

3.client端

// 異步調用
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
}

// 同步調用
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
}
// Call represents an active RPC.
type Call struct {
    ServiceMethod string      // 服務名及方法名 格式:服務.方法
    Args          interface{} // 函數的請求參數 (*struct).
    Reply         interface{} // 函數的響應參數 (*struct).
    Error         error       // 方法完成後 error的狀態.
    Done          chan *Call  // 方法調用結束後的channel.
}

client端部分則相對要簡單不少,主要提供Call以及Go兩個方法,分別表示同步調用以及異步調用,但其實同步調用底層實現其實也是異步調用,調用時主要用到了Call結構,相關解釋如上.

3.1 主要流程

圖片描述

3.2 關鍵代碼

發送請求部分代碼,每次send一次請求,均生成一個call對象,並使用seq做爲key保存在map中,服務端返回時從map取出call,進行相應處理.

func (client *Client) send(call *Call) {
    //請求級別的鎖
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()

    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        call.Error = ErrShutdown
        client.mutex.Unlock()
        call.done()
        return
    }

    //生成seq,每次調用均生成惟一的seq,在服務端相應後會經過該值進行匹配
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()

    // 請求併發送請求
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        //發送請求錯誤時,將map中call對象刪除.
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

接收響應部分的代碼,這裏是一個for循環,不斷讀取tcp上的流,並解碼成Response對象以及方法的Reply對象.

func (client *Client) input() {
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }

        //經過response中的 Seq獲取call對象
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
        case response.Error != "":
            //服務端返回錯誤,直接將錯誤返回
            call.Error = ServerError(response.Error)
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
            call.done()
        default:
            //經過編碼器,將Resonse的body部分解碼成reply對象.
            err = client.codec.ReadResponseBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }
    }

    // 客戶端退出處理
    client.reqMutex.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.reqMutex.Unlock()
    if debugLog && err != io.EOF && !closing {
        log.Println("rpc: client protocol error:", err)
    }
}

4.一些坑

  • 同步調用沒法超時

因爲原生rpc只提供兩個方法,同步的Call以及異步的Go,同步的Call服務端不返回則會一直阻塞,這裏若是存在大量的不返回,會致使協程一直沒法釋放.

  • 異步調用超時後會內存泄漏

基於異步調用加channel實現超時功能也會存在泄漏問題,緣由是client的請求會存在map結構中,Go函數退出並不會清理map的內容,所以若是server端不返回的話,map中的請求會一直存儲,從而致使內存泄漏.

5. 總結

總的來講,go原生rpc算是個基礎版本的rpc,代碼精簡,可擴展性高,可是隻是實現了rpc最基本的網絡通信,像超時熔斷,連接管理(保活與重連),服務註冊發現,仍是欠缺的,所以仍是達不到生產環境開箱即用,不過git就有一個基於rpc的功能加強版本,叫rpcx,支持了大部分主流rpc的特性.

6. 參考

rpc https://golang.org/pkg/net/rpc/

相關文章
相關標籤/搜索