Go Rpc

簡介

go 提供了自帶的序列化協議gob(go binary),能夠進行原生go類型的序列化和反序列化,其中一個應用就是go語言自帶的rpc功能,主要在net/rpc包下。java

go 自帶的rpc提供了簡單的rpc相關的api,用戶只須要依照約定實現function而後進行服務註冊,就能夠在客戶端進行調用了。git

首先先列舉一下go rpc中的對於服務端提供的方法的相關約束:github

  • the method's type is exported. 方法所屬的類型必須是外部可見的golang

  • the method is exported. 方法必須是外部可見的json

  • the method has two arguments, both exported (or builtin) types. 方法參數只能有兩個,並且必須是外部可見的類型或者是基本類型。api

  • the method's second argument is a pointer. 方法的第二個參數類型必須是指針緩存

  • the method has return type error.方法的返回值必須是error類型bash

這裏結合我的的理解簡單解釋一下,方法和方法所屬的類型必須是exported,也就是必須是外部可見的,相似於java中接口的方法只能是public同樣,否則外部也不能調用。參數只能有兩個,並且第二個必須是指針,這是由於go rpc約定第一個參數是方法所需的入參,而第二個參數表明方法的實際返回值,因此第二個參數必須是指針類型,由於須要在方法內部修改它的值。返回值必須是error類型,表示方法執行中出現的異常或者rpc過程當中的異常。session

從這幾點約束能夠看出來,go自帶的rpc對相關的條件約束的很緊,這也符合go的「一件問題只有一個解決方式」的理念,經過明確的規定,讓開發過程變得更簡單。異步

快速上手

根據上面的約束,咱們先實際實現一個echo方法,它將客戶端傳遞來的參數原樣的返回:

1.首先服務端提供的方法必須歸屬於一個類型,並且是外部可見的類型,這裏咱們就定義一個EchoService的空結構好了:

type EchoService struct {}
複製代碼

2.服務端提供的方法必須也是外部可見的,因此定義一個方法叫作Echo:

func (service EchoService) Echo(arg string, result *string) error {
	*result = arg //在這裏直接將第二個參數(也就是實際的返回值)賦值爲arg
	return nil //error返回nil,也就是沒有發生異常
}
複製代碼

3.接下來咱們將Echo方法對外進行暴露:

func RegisterAndServe() {
	err := rpc.Register(&EchoService{})//註冊並非註冊方法,而是註冊EchoService的一個實例
	if err != nil {
		log.Fatal("error registering", err)
		return
	}
	rpc.HandleHTTP() //rpc通訊協議設置爲http協議
	err = http.ListenAndServe(":1234", nil) //端口設置爲9999
	if err != nil {
		log.Fatal("error listening", err)
	}
}
複製代碼

4.而後咱們定義一個客戶端:

func CallEcho(arg string) (result string, err error) {
	var client *rpc.Client
	client, err = rpc.DialHTTP("tcp", ":9999") //經過rpc.DialHTTP建立一個client
	if err != nil {
		return "", err
	}
	err = client.Call("EchoService.Echo", arg, &result) //經過類型加方法名指定要調用的方法
	if err != nil {
		return "", err
	}
	return result, err
}
複製代碼

5.最後分別啓動服務端和客戶端進行調用:

func main() {
	done := make(chan int)
	go server.RegisterAndServe() //先啓動服務端
	time.Sleep(1e9) //sleep 1s,由於服務端啓動是異步的,因此等一等
	go func() { //啓動客戶端
		result, err := client.CallEcho("hello world")
		if err != nil {
			log.Fatal("error calling", err)
		} else {
			fmt.Println("call echo:", result)
		}
		done <- 1
	}()
	<- done //阻塞等待客戶端結束
}
複製代碼

此外go自帶的rpc還提供rpc over tcp的選項,只須要在listen和dial時使用tcp鏈接就能夠了,rpc over tcp和這裏的例子惟一的區別就是創建鏈接時的區別,實際的rpc over http也並無使用http協議,只是用http server創建鏈接而已。

go還提供基於json的rpc,只須要在服務端和客戶端把rpc.ServeConn和rpc.Dial替換成jsonrpc.ServeConn和jsonrpc.Dial便可。

源碼解析

Server端

PRC over HTTP

第一個示例中,咱們調用了rpc.HandleHTTP(),它的做用是將rpc server綁定到http端口上,執行了這個方法以後咱們仍然須要主動調用http.ListenAndServe。rpc.HandleHTTP的具體實現以下:

const (
	// Defaults used by HandleHTTP
	DefaultRPCPath   = "/_goRPC_"
	DefaultDebugPath = "/debug/rpc"
)
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
複製代碼

能夠看見,HandleHTTP方法調用了DefaultServer的HandleHTTP方法,DefaultServer是rpc包內定義的一個Server類型變量,Server定義了不少方法:

  • func (server *Server) Accept(lis net.Listener)
  • func (server *Server) HandleHTTP(rpcPath, debugPath string)
  • func (server *Server) ServeCodec(codec ServerCodec)
  • func (server *Server) ServeConn(conn io.ReadWriteCloser)
  • func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
  • func (server *Server) ServeRequest(codec ServerCodec) error

DefaultServer就是一個Server實例:

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}
複製代碼

其中HandleHTTP的具體實現以下:

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}
複製代碼

實際上HandleHTTP就是使用http包的功能,將server自身註冊到http的url映射上了;從上面列舉的Server類型的部分方法能夠看出,Server自身實現了ServeHTTP方法,因此能夠處理http請求:

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		io.WriteString(w, "405 must CONNECT\n")
		return
	}
	conn, _, err := w.(http.Hijacker).Hijack()
	if err != nil {
		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
		return
	}
	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
	server.ServeConn(conn)
}
複製代碼

能夠看到,rpc server收到http鏈接以後就會調用hijack方法接管這個鏈接,而後調用ServeConn方法進行處理,而ServeConn方法就和rpc over tcp沒區別了。

總的來講,rpc over http就是利用http包接收來自客戶端的鏈接,後續的流程和rpc over TCP同樣。

RPC over TCP

根據上面的第二個例子咱們能夠看到,在使用rpc over tcp時,用戶須要本身建立一個Listener並調用Accpet,而後調用Server的ServeConn方法。而咱們以前使用的rpc.ServeConn實際上調用了DefaultServer.ServeConn。而ServeConn的具體實現以下:

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	buf := bufio.NewWriter(conn)
	srv := &gobServerCodec{
		rwc:    conn,
		dec:    gob.NewDecoder(conn),
		enc:    gob.NewEncoder(buf),
		encBuf: buf,
	}
	server.ServeCodec(srv) //構造了一個私有的gobServerCodec而後調用servCodec方法,表示默認使用gob序列化協議
}
複製代碼

能夠看到,ServeConn其實是構造了一個codec而後調用serveCodec方法,默認的邏輯是採用gobServerCodec,由此能夠看出,若是咱們想使用自定義的序列化協議,只須要實現一個本身的ServerCodec就能夠了,serverCodec接口定義以下:

// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	// Close can be called multiple times and must be idempotent.
	Close() error
}
複製代碼

ServerCodec的方法定義裏只出現了Request和Response,並無鏈接相關的定義,說明在鏈接相關的變量須要設置成ServerCodec的成員變量,每次調用都須要構造新的ServerCodec對象。

回到serveCodec方法,能夠看到serveCodec的流程基本上就是:read request - invoke - close

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			if debugLog && err != io.EOF {
				log.Println("rpc:", err)
			}
			if !keepReading {
				break
			}
			// send a response if we actually managed to read a header.
			if req != nil {
				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
				server.freeRequest(req)
			}
			continue
		}
		wg.Add(1)
    //每一個請求的處理都在新的goroutine裏執行
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	// We've seen that there are no more requests.
	// Wait for responses to be sent before closing codec.
	wg.Wait()
	codec.Close()
}
複製代碼

這裏看到,serveCodec會一直調用ReadRequestHeader和ReadRequestBody方法讀取請求,直到客戶端鏈接再也不發送請求,在serveConn方法的註釋裏也提到了,對於serveConn方法一般建議使用goroutine來執行。

接下來仔細看一下readRequest的實現:

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	if err != nil {
		if !keepReading {
			return
		}
		// discard body
		codec.ReadRequestBody(nil)
		return
	}

	// Decode the argument value.
	argIsValue := false // if true, need to indirect before calling.
	if mtype.ArgType.Kind() == reflect.Ptr {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
		argIsValue = true
	}
	// argv guaranteed to be a pointer now.
	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
		return
	}
	if argIsValue {
		argv = argv.Elem()
	}

	replyv = reflect.New(mtype.ReplyType.Elem())

	switch mtype.ReplyType.Elem().Kind() {
	case reflect.Map:
		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
	case reflect.Slice:
		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
	}
	return
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	err = codec.ReadRequestHeader(req)
	if err != nil {
		req = nil
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return
		}
		err = errors.New("rpc: server cannot decode request: " + err.Error())
		return
	}

	// We read the header successfully. If we see an error now,
	// we can still recover and move on to the next request.
	keepReading = true

	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the request.
	svci, ok := server.serviceMap.Load(serviceName)
	if !ok {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	svc = svci.(*service)
	mtype = svc.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}
複製代碼

基本就是依次調用codec的readRequestHeader和readRequestBody,過程當中會使用go自帶的gob序列化協議,這裏先不深刻看,省得層次太深亂了。

接下來看invoke部分:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
  //wg由ServeConn方法持有,用於阻塞等待調用方斷開,這裏每次處理一個請求就count down一次
	if wg != nil {
		defer wg.Done()
	}
  //對方法加鎖,就爲了把調用次數加一
	mtype.Lock()
  //調用次數加一,暫時沒看到是幹啥的
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// Invoke the method, providing a new value for the reply.
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}
複製代碼

invoke部分就是經過反射調用對應實例的方法,而後將結果經過sendResponse返回給客戶端,sendResponse實際上也是調用了codec的WriteResponse方法:

func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
	resp := server.getResponse()
	// Encode the response header
	resp.ServiceMethod = req.ServiceMethod
	if errmsg != "" {
		resp.Error = errmsg
		reply = invalidRequest
	}
	resp.Seq = req.Seq
	sending.Lock()
	err := codec.WriteResponse(resp, reply)
	if debugLog && err != nil {
		log.Println("rpc: writing response:", err)
	}
	sending.Unlock()
	server.freeResponse(resp)
}
複製代碼

這裏能夠看到,服務端在發送數據過程當中是加了鎖的,也就是WriteResponse部分是串行的。

服務端流程大體就到此爲止了,總體思路仍是基本的RPC流程:經過Listener創建鏈接,調用codec進行編解碼,經過反射執行真正的方法。server會讀取到的request對象緩存在內存中,具體是一個鏈表的格式,直到server端邏輯執行完

這裏再簡單看一下rpc server相關的其餘部分:

rpc.Register:register方法經過反射會把參數對應的類型下全部符合規範的方法加載並緩存起來

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
	return server.register(rcvr, "", false)
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
	return server.register(rcvr, name, true)
}

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	if useName {
		sname = name
	}
	if sname == "" {
		s := "rpc.Register: no service name for type " + s.typ.String()
		log.Print(s)
		return errors.New(s)
	}
	if !isExported(sname) && !useName {
		s := "rpc.Register: type " + sname + " is not exported"
		log.Print(s)
		return errors.New(s)
	}
	s.name = sname

	// Install the methods
	s.method = suitableMethods(s.typ, true)

	if len(s.method) == 0 {
		str := ""

		// To help the user, see if a pointer receiver would work.
		method := suitableMethods(reflect.PtrTo(s.typ), false)
		if len(method) != 0 {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
		} else {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
		}
		log.Print(str)
		return errors.New(str)
	}

	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}
複製代碼

rpc包內定義的各個struct:service、methodType、Server、Request、Response

type service struct { //保存了服務提供者的各個信息,包括名稱、類型、方法等等
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}

type methodType struct {//保存了反射獲取到的方法的相關信息,此外還有一個計數器,用來統計調用次數,還有一個繼承的Mutext接口,用來作計數器的同步
	sync.Mutex // protects counters
	method     reflect.Method
	ArgType    reflect.Type
	ReplyType  reflect.Type
	numCalls   uint
}

// Server represents an RPC Server.
type Server struct { //server對象
	serviceMap sync.Map   // map[string]*service 保存服務提供者信息的map
	reqLock    sync.Mutex // protects freeReq 用於作freeReq的同步
	freeReq    *Request //rpc 請求
	respLock   sync.Mutex // protects freeResp 用於作freeResp的同步
	freeResp   *Response //rpc響應
}

// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct { //Request僅標識請求頭,只包含一些元數據
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {//Response僅標識請求頭,只包含一些元數據
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
複製代碼

咱們能夠留意到,Request和Response被定義成了鏈表同樣的結構,並且Server還在request和response上加了同步,這是由於在server中req和resp是複用的,而不是每次處理請求都建立新的對象,具體能夠從getRequest/getResponse/freeReqeust/freeResponse看出來:

func (server *Server) getRequest() *Request {
	server.reqLock.Lock()
	req := server.freeReq
	if req == nil {
		req = new(Request)
	} else {
		server.freeReq = req.next
		*req = Request{}
	}
	server.reqLock.Unlock()
	return req
}

func (server *Server) freeRequest(req *Request) {
	server.reqLock.Lock()
	req.next = server.freeReq
	server.freeReq = req
	server.reqLock.Unlock()
}

func (server *Server) getResponse() *Response {
	server.respLock.Lock()
	resp := server.freeResp
	if resp == nil {
		resp = new(Response)
	} else {
		server.freeResp = resp.next
		*resp = Response{}
	}
	server.respLock.Unlock()
	return resp
}

func (server *Server) freeResponse(resp *Response) {
	server.respLock.Lock()
	resp.next = server.freeResp
	server.freeResp = resp
	server.respLock.Unlock()
}
複製代碼

Client端

客戶端能夠經過如下幾種方式和服務端創建鏈接:

  • func Dial(network, address string) (*Client, error) //直接創建tcp鏈接

  • func DialHTTP(network, address string) (*Client, error) //經過http發送connect請求創建鏈接,使用默認的PATH

  • func DialHTTPPath(network, address, path string) (*Client, - error)//經過http發送connect請求創建鏈接,使用自定義的PATH

  • func NewClient(conn io.ReadWriteCloser) *Client //根據給定的鏈接創建rpc客戶端

  • func NewClientWithCodec(codec ClientCodec) *Client //根據給定的ClientCodec創建rpc客戶端

客戶端的調用方式有兩種:Call和Go,其中Call是同步調用,而Go則是異步調用。其中Call的返回值是error類型,而Go的返回值是Call類型。實際上Call也是調用Go方法實現的,只是在Call會阻塞等待Go方法返回結果而已。這裏還有一個問題,就是調用時只能經過channel控制超時,底層的邏輯不會有超時,若是server端一直不返回,客戶端緩存的請求就會一直不釋放,致使泄漏。

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}
複製代碼

這裏須要注意的是,Go方法接收一個channel類型的done做爲結束的標記,並且這個channel必須是有緩衝的,至於爲何,我猜想是防止往done裏寫入發生阻塞,具體還須要再確認下。

與server的實現相似,客戶端提供了一個ClientCodec的接口,用來作請求和響應的解析,其中的方法這裏就不列舉了。

下面看一下客戶端構造時的邏輯:

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	codec ClientCodec
	reqMutex sync.Mutex // protects following
	request  Request
	mutex    sync.Mutex // protects following
	seq      uint64
	pending  map[uint64]*Call
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}
複製代碼

Client對象中包含一個map類型的pending,用來緩存全部未完成的請求,同時對request和seq作了同步。

能夠看到,若是使用默認的NewClient方法,則會構造一個gobClientCodec,它使用gob做爲序列化協議;也能夠本身指定一個codec。

在構造時,會執行go client.input(),這個input方法就是client接收響應的邏輯了,這個方法會在循環中讀取響應,根據響應的seq找到對應的請求,而後經過請求的done發送信號。

func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			// We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to.
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
			// We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // Terminate pending calls. client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } } 複製代碼

codec

除了go自帶的gob序列化,用戶還可使用其餘的序列化方式,包括前面提到的json。go 提供了json格式的rpc,能夠支持跨語言的調用。

其餘

值得注意的是,net/rpc下的內容目前已經再也不更新(freeze)了,具體參考:github.com/golang/go/i…

網上有博客說go 自帶的rpc性能遠優於grpc,再也不更新的緣由可能只是開發團隊再也不願意花費過多精力而已。

相關文章
相關標籤/搜索