Go學習筆記 - 使用jsonrpc進行遠程訪問

##JSON-RPC ---------- JSON-RPC是一個輕量級的遠程調用協議,簡單易用。 **請求數據體**: { "method": "getName", "params": ["1"], "id": 1 } `method`: 遠端的方法名 `params`: 遠程方法接收的參數列表 `id`: 本次請求的標識碼,遠程返回時數據的標識碼應與本次請求的標識碼相同 **返回數據體**: { "result": {"id": 1, "name": "name1"}, "error": null, "id": 1 } `result`: 遠程方法返回值 `error`: 錯誤信息 `id`: 調用時所傳來的id ---------- ##Go的rpc包 **`net/rpc`** `net/rpc`包實現了最基本的rpc調用,它默認經過`HTTP`協議傳輸`gob`數據來實現遠程調用。 服務端實現了一個HTTP server,接收客戶端的請求,在收到調用請求後,會反序列化客戶端傳來的gob數據,獲取要調用的方法名,並經過反射來調用咱們本身實現的處理方法,這個處理方法傳入固定的兩個參數,並返回一個error對象,參數分別爲客戶端的請求內容以及要返回給客戶端的數據體的指針。 **`net/rpc/jsonrpc`** `net/rpc/jsonrpc`包實現了JSON-RPC協議,即實現了`net/rpc`包的`ClientCodec`接口與`ServerCodec`,增長了對json數據的序列化與反序列化。 ---------- ##Go JSON-RPC遠程調用 客戶端與服務端雙方傳輸數據,其中數據結構必須得讓雙方都能處理。 首先定義rpc所傳輸的數據的結構,client端與server端都得用到。 // 須要傳輸的對象 type RpcObj struct { Id int `json:"id"` // struct標籤, 若是指定,jsonrpc包會在序列化json時,將該聚合字段命名爲指定的字符串 Name string `json:"name"` } // 須要傳輸的對象 type ReplyObj struct { Ok bool `json:"ok"` Id int `json:"id"` Msg string `json:"msg"` } `RpcObj` 爲傳輸的數據 `ReplyObj` 爲服務端返回的數據 這兩個結構體都可以在client和server端雙向傳遞 **服務端** 引入兩個包 "net/rpc" "net/rpc/jsonrpc" `net/rpc`實現了go的rpc框架,而`net/rpc/jsonrpc`則具體實現了JSON-RPC協議,具備json數據的序列化與反序列化功能。 實現處理器 // server端的rpc處理器 type ServerHandler struct {} // server端暴露的rpc方法 func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error { log.Println("server\t-", "recive GetName call, id:", id) returnObj.Id = id returnObj.Name = "名稱1" return nil } // server端暴露的rpc方法 func (serverHandler ServerHandler) SaveName(rpcObj RpcObj, returnObj *ReplyObj) error { log.Println("server\t-", "recive SaveName call, RpcObj:", rpcObj) returnObj.Ok = true returnObj.Id = rpcObj.Id returnObj.Msg = "存儲成功" return nil } `ServerHandler`結構能夠不須要什麼字段,只須要有符合`net/rpc`server端處理器約定的方法便可。 符合約定的方法必須具有兩個參數和一個`error`類型的返回值 *第一個參數* 爲client端調用rpc時交給服務器的數據,能夠是指針也能夠是實體。`net/rpc/jsonrpc`的json處理器會將客戶端傳遞的json數據解析爲正確的struct對象。 *第二個參數* 爲server端返回給client端的數據,必須爲指針類型。`net/rpc/jsonrpc`的json處理器會將這個對象正確序列化爲json字符串,最終返回給client端。 `ServerHandler`結構須要註冊給`net/rpc`的HTTP處理器,HTTP處理器綁定後,會經過反射獲得其暴露的方法,在處理請求時,根據JSON-RPC協議中的`method`字段動態的調用其指定的方法。 // 新建Server server := rpc.NewServer() // 開始監聽,使用端口 8888 listener, err := net.Listen("tcp", ":8888") if err != nil { log.Fatal("server\t-", "listen error:", err.Error()) } defer listener.Close() log.Println("server\t-", "start listion on port 8888") // 新建處理器 serverHandler := &ServerHandler{} // 註冊處理器 server.Register(serverHandler) // 等待並處理連接 for { conn, err := listener.Accept() if err != nil { log.Fatal(err.Error()) } // 在goroutine中處理請求 // 綁定rpc的編碼器,使用http connection新建一個jsonrpc編碼器,並將該編碼器綁定給http處理器 go server.ServeCodec(jsonrpc.NewServerCodec(conn)) } rpc server端大體的處理流程 ![golang_jsonrpc_server](http://images.cnblogs.com/cnblogs_com/hangxin1940/508415/o_golang_jsonrpc_server.png "golang_jsonrpc_server") **客戶端** 客戶端必須確保存在服務端在傳輸的數據中所使用的struct,在這裏,必須確保客戶端也能使用`RpcObj`與`ReplyObj` struct。 client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時時間 if err != nil { log.Fatal("client\t-", err.Error()) } defer client.Close() 首先,經過`net`包使用TCP協議鏈接至服務器,這裏設定了超時時間。 clientRpc := jsonrpc.NewClient(client) 而後使用`jsonrpc.NewClient`經過以前的TCP連接創建一個rpcClient實例。 對於`net/rpc`的客戶端,在遠程調用是有同步(Synchronous)和異步(Asynchronous)兩種方式。不論那種方式,在源碼中,請求老是在一個新的goroutine中執行,而且使用一個通道(chan)來存放服務器返回值。使用同步方式調用時,調用方法內部會等待chan的數據,並一直阻塞直到遠程服務器返回。而使用異步方式時,客戶端的調用方法會直接將chan返回,這樣就能夠適時的處理數據而不影響當前goroutine。 下面是`net/rpc/client`中調用遠程rpc的源碼 // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { call := new(Call) call.ServiceMethod = serviceMethod call.Args = args call.Reply = reply if done == nil { done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { log.Panic("rpc: done channel is unbuffered") } } call.Done = done client.send(call) return call } // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error } *同步調用的使用* // 遠程服務器返回的對象 var rpcObj RpcObj // 請求數據,rpcObj對象會被填充 clientRpc.Call("ServerHandler.GetName", 1, &rpcObj) // 遠程返回的對象 var reply ReplyObj // 傳給遠程服務器的對象參數 saveObj := RpcObj{2, "對象2"} // 請求數據 clientRpc.Call("ServerHandler.SaveName", saveObj, &reply) `Call`方法屬於同步方式的調用。第一個參數爲Server端JSON-RPC處理器的類名加方法名,第二個參數爲提交給遠端服務器的數據,第三個參數是服務器的返回數據,必須是指針。 *異步調用的使用* // 傳給遠程的對象 saveObj := RpcObj{i, "對象"} // 異步的請求數據 divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil) // 在一個新的goroutine中異步獲取遠程的返回數據,並不阻塞當前的goroutine go func() { reply := <-divCall.Done // 取出遠程返回的數據 }() `Call`方法屬於同步方式的調用。第一個參數爲Server端JSON-RPC處理器的類名加方法名,第二個參數爲提交給遠端服務器的數據,第三個參數是服務器的返回數據,必須是指針,第四個參數爲一個通道,能夠留空,留空的話它會幫忙建一個,並保存在divCall中。 `net/rpc/jsonrpc/client`會把方法名與參數自動序列化爲json格式,其結構如開頭所述的JSON-RPC結構同樣,並自動爲JSON-RPC中的id賦值。而服務端返回的對象也會被正確的反序列化。 rpc client端大體的處理流程 ![golang_jsonrpc_client](http://images.cnblogs.com/cnblogs_com/hangxin1940/508415/o_golang_jsonrpc_client.png "golang_jsonrpc_client") ---------- 完整的程序 package main import ( "net/rpc" "net/rpc/jsonrpc" "net" "log" ) // 須要傳輸的對象 type RpcObj struct { Id int `json:"id"` // struct標籤, 若是指定,jsonrpc包會在序列化json時,將該聚合字段命名爲指定的字符串 Name string `json:"name"` } // 須要傳輸的對象 type ReplyObj struct { Ok bool `json:"ok"` Id int `json:"id"` Msg string `json:"msg"` } // server端的rpc處理器 type ServerHandler struct {} // server端暴露的rpc方法 func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error { log.Println("server\t-", "recive GetName call, id:", id) returnObj.Id = id returnObj.Name = "名稱1" return nil } // server端暴露的rpc方法 func (serverHandler ServerHandler) SaveName(rpcObj *RpcObj, returnObj *ReplyObj) error { log.Println("server\t-", "recive SaveName call, RpcObj:", rpcObj) returnObj.Ok = true returnObj.Id = rpcObj.Id returnObj.Msg = "存儲成功" return nil } // 開啓rpc服務器 func startServer() { // 新建Server server := rpc.NewServer() // 開始監聽,使用端口 8888 listener, err := net.Listen("tcp", ":8888") if err != nil { log.Fatal("server\t-", "listen error:", err.Error()) } defer listener.Close() log.Println("server\t-", "start listion on port 8888") // 新建處理器 serverHandler := &ServerHandler{} // 註冊處理器 server.Register(serverHandler) // 等待並處理連接 for { conn, err := listener.Accept() if err != nil { log.Fatal(err.Error()) } // 在goroutine中處理請求 // 綁定rpc的編碼器,使用http connection新建一個jsonrpc編碼器,並將該編碼器綁定給http處理器 go server.ServeCodec(jsonrpc.NewServerCodec(conn)) } } // 客戶端以同步的方式向rpc服務器請求 func callRpcBySynchronous() { // 鏈接至服務器 client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時時間 if err != nil { log.Fatal("client\t-", err.Error()) } defer client.Close() // 創建rpc通道 clientRpc := jsonrpc.NewClient(client) // 遠程服務器返回的對象 var rpcObj RpcObj log.Println("client\t-", "call GetName method") // 請求數據,rpcObj對象會被填充 clientRpc.Call("ServerHandler.GetName", 1, &rpcObj) log.Println("client\t-", "recive remote return", rpcObj) // 遠程返回的對象 var reply ReplyObj // 傳給遠程服務器的對象參數 saveObj := RpcObj{2, "對象2"} log.Println("client\t-", "call SetName method") // 請求數據 clientRpc.Call("ServerHandler.SaveName", saveObj, &reply) log.Println("client\t-", "recive remote return", reply) } // 客戶端以異步的方式向rpc服務器請求 func callRpcByAsynchronous() { // 打開連接 client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超時時間 if err != nil { log.Fatal("client\t-", err.Error()) } defer client.Close() // 創建rpc通道 clientRpc := jsonrpc.NewClient(client) // 用於阻塞主goroutine endChan := make(chan int, 15) // 15次請求 for i := 1 ; i <= 15; i++ { // 傳給遠程的對象 saveObj := RpcObj{i, "對象"} log.Println("client\t-", "call SetName method") // 異步的請求數據 divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil) // 在一個新的goroutine中異步獲取遠程的返回數據 go func(num int) { reply := <-divCall.Done log.Println("client\t-", "recive remote return by Asynchronous", reply.Reply) endChan <- num }(i) } // 15個請求所有返回時此方法能夠退出了 for i := 1 ; i <= 15; i++ { _ = <-endChan } } func main() { go startServer() callRpcBySynchronous() callRpcByAsynchronous() } ---------- ##總結 在使用`net/rpc/jsonrpc`時遇到這樣一個問題: 有多個client與一個server進行rpc調用,而這些client又處於不一樣的內網,在server端須要獲取client端的公網IP。 按照`net/rpc`的實現,在服務端處理器的自定義方法中只能獲取被反序列化的數據,其餘請求相關信息如client的IP只能在主goroutine的`net.Listener.Accept`中的`Conn`對象取得。 按源碼中的示例,每接收一個TCP請求都會在一個新的goroutine中處理,可是處理器的自定義方法都運行在不一樣的goroutine中,這些回調的方法沒有暴露任何能獲取conn的字段、方法。 我是這樣解決的,在server端rpc處理器struct中放一個聚合字段,用於存儲ip地址的。 處理器被註冊與rpc server,全局只有一個,在每次接受到tcp請求後,開啓一個goroutine,而後在goroutine內部當即加上排斥鎖,而後再把請求的conn綁定給rpc server處理器,這樣,即能保證handler字段的線程安全,又能及時的相應client的請求。 .... .... .... mutex := &sync.Mutex{} // 等待連接 for { // 相應請求 conn, err := listener.Accept() if err != nil { log.Println(err.Error()) } // 開啓一個goroutine來處理請求,緊接着等待下一個請求。 go func() { // 應用排斥鎖 mutex.Lock() // 記錄ip地址 reciveHandler.Ip = strings.Split(conn.RemoteAddr().String(), ":")[0] // 處理JSON-RPC調用 server.ServeCodec(jsonrpc.NewServerCodec(conn)) // 解鎖 mutex.Unlock() }() } .... .... ....
相關文章
相關標籤/搜索