RPC(遠過程調用)在分佈式系統中是很經常使用的基礎通信手段,核心思想是將不一樣進程之間的通信抽象爲函數調用,基本的過程是調用端經過將參數序列化到流中併發送給服務端,服務端從流中反序列化出參數並完成實際的處理,而後將結果序列化後返回給調用端。一般的RPC由接口形式來定義,接口定義服務的名字,接口方法定義每一個請求的輸入參數和返回結果。RPC內部的序列化、網絡通信等實現細節則由框架來完成,對用戶來講是徹底透明的。以前我使用.net開發過一套輕量級的分佈式框架(PPT看這裏,視頻看這裏),這套框架通過2年多的持續開發和改進已經運用到數款產品中(包括網絡遊戲和分佈式應用),取得了不錯的效果,等將來框架成熟後會考慮開源,本文討論的RPC基於這套框架展開。html
一般咱們的函數調用都是同步的,也就是調用方在發起請求後就能獲得結果(成功返回結果失敗則拋出異常),中間不能去幹其餘事情,與這種模式對應的RPC稱之爲請求-響應式模式。請求-響應式的優勢在於時序清晰,邏輯簡單,和普通的函數調用徹底等價。好比咱們能夠這樣定義RPC接口:服務器
1 [Protocol(ID=1)] 2 public interface ICalculate 3 { 4 [DispId(1)] 5 int Add(int p1, int p2); 6 }
客戶端就能夠像這樣使用接口:網絡
1 var calculate = new ICalculateProxy();//ICalculateProxy爲框架生成的接口代理類 2 calculate.Connect(url); 3 var result = calculate.Add(1, 2);
可是在分佈式中這種模式的缺點也是很是的明顯,第一個問題是網絡通信的延遲會嚴重的制約請求-響應式RPC的響應速度,使得請求吞吐量沒法知足性能須要,大量的CPU時間會阻塞在等待請求的響應上;第二個問題是請求-響應式只有由客戶端向服務端發起請求,服務端不能主動向客戶端發送事件通知,也就是缺少一種callback機制。session
針對請求-響應式的缺點咱們能夠用雙向通信機制來改進,首先去掉請求的返回值,全部的方法請求都定義爲無返回結果,這樣調用方在發出請求以後就能夠繼續幹後面的事情了,而不須要再等待服務返回結果。同時針對服務接口定義一個Callback接口用於服務端向客戶端發送請求結果和事件通知,這樣服務器就能夠主動向客戶端發送消息了。這種RPC模式能夠稱之爲雙向會話式,接口能夠這樣定義:閉包
1 [Protocol(ID=1), Callback(typeof(ICalculateCallback))] 2 public interface ICalculate 3 { 4 [DispId(1)] 5 void Add(int p1, int p2); 6 } 7 8 public interface ICalculateCallback : IServiceCallback 9 { 10 [DispId(1)] 11 void OnAdd(int result); 12 }
服務端能夠這樣實現服務接口:併發
1 public class CaculateService : ICaculateImpl //這裏ICaculateImpl爲框架生成的服務實現接口 2 { 3 ICaculateImpl.Add(Session session, int p1, int p2) 4 { 5 var result = p1 + p2; 6 session.Cllback.OnAdd(result); 7 } 8 }
雙向會話式解決了請求的異步處理以及服務器的雙向通信問題,可是也給調用者帶來了一些不便,例如上例中若是調用方發起多個Add請求,在收到OnAdd消息後如何將結果與請求關聯起來呢?一種解決方案是在Add請求中多加一個request id參數,服務器在處理完Add以後將request id放到OnAdd方法中和結果一塊兒傳給客戶端,客戶端根據request id來關聯請求與結果。這種手工處理的方式代碼寫起來很麻煩,那麼有沒有一種更好的RPC模式來解決這個問題呢?這就是下面給你們介紹的支持異步調用的RPC設計。框架
異步調用的主要設計思想是在雙向會話式的基礎上讓調用方經過一個回調函數來得到請求的結果,而再也不經過Callback接口來得到結果。採用回調函數的好處在於咱們可使用閉包來隱式的關聯請求和響應之間的上下文,這樣就不須要顯式的傳遞request id來手工關聯上下文了。而且服務器仍然能夠經過Callback接口向客戶端主動發送消息,保留了原來雙向通信的優勢。可是須要注意的是因爲請求在服務器上多是異步執行的,因此服務器不保證請求的響應是按順序返回的,這可能形成一些隱含的亂序問題,須要客戶端在調用時特別注意。若是響應須要嚴格的按照請求順序返回客戶端,那麼服務端須要同步處理請求,或者引入隊列機制對異步的響應進行排隊有序返回響應。異步
以前的ICalculate就能夠這樣定義:分佈式
[Protocol(ID=1), Callback(typeof(ICalculateCallback))] public interface ICalculate { [DispId(1), Async] void Add(int p1, int p2, Action<int> OnResult, Action<int,string> OnError = null); }
用Async這個標籤表示這個請求爲異步請求,調用者用OnResult回調函數來接收請求的結果,OnError則爲返回錯誤的回調函數,若是調用者不關心錯誤返回,那麼能夠不傳遞OnError,而在IServiceCallback的OnError方法中接收錯誤信息。
調用者能夠很方便的使用閉包來處理結果,同時隱藏異步的實現細節,像這樣:ide
1 void TestAdd(ICalculateProxy calculate, int p1, int p2) 2 { 3 calculate.Add(p1, p2, result => MessageBox.Show(string.Format("{0} + {1} = {2}", p1, p2, result), (errCode, errMsg) => MessageBox.Show("Add failed:" + errMsg)); 4 }
服務器端的實現是這樣的:
1 public class CaculateService : ICaculateImpl 2 { 3 ICaculateImpl.Add(Session session, int p1, int p2, ICaculate_AddAsyncReply reply) 4 { 5 try 6 { 7 var result = p1 + p2; 8 reply.OnResult(result); 9 } 10 catch(OverflowException e) 11 { 12 reply.OnError(-1, e.Message); 13 } 14 } 15 }
ICaculate_AddAsyncReply爲框架生成的返回異步結果的對象,有一個OnResult和一個OnError方法。有了這個reply對象以後,服務器的請求處理也能夠實現異步處理,客戶端請求不須要在請求函數裏一次完成,而是能夠放到其餘線程或者異步方法中處理,稍後在經過reply向客戶端返回結果。
下面咱們來看看框架在背後爲咱們作的一些實現細節,首先是客戶端的Proxy:
1 //在Proxy中使用一個RequestContext結構保存請求的上下文信息,上下文中記錄某個請求的惟一id,在調用時一塊兒發送到服務器: 2 struct RequestContext 3 { 4 public int reqId; 5 public Delegate OnResult; 6 public Action<int, string> OnError; 7 8 public RequestContext(int id, Delegate onResult, Action<int, string> onError) 9 { 10 reqId = id; 11 OnResult = onResult; 12 OnError = onError; 13 } 14 } 15 16 //服務器返回響應以後proxy就找出reqId對應的請求上下文,而後調用對應的回調函數傳遞結果
17 void OnAddReply(BinaryStreamReader __reader) 18 { 19 int reqId; 20 int ret; 21 __reader.Read(out reqId); 22 __reader.Read(out ret); 23 if(ret == 0) 24 { 25 int p0; 26 __reader.Read(out p0); 27 RequestContext ctx = PopAsyncRequest(reqId); 28 var __onResult = ctx.OnResult as Action<int>; 29 __onResult(p0); 30 } 31 else 32 { 33 RequestContext ctx = PopAsyncRequest(reqId); 34 string msg; 35 __reader.Read(out msg); 36 if(ctx.OnError != null) 37 ctx.OnError(ret, msg); 38 else 39 _handler.OnError(ret, msg); 40 } 41 }
服務端的一些實現細節:
1 //框架生成請求對應的異步響應類 2 public class ICaculate_AddAsyncReply : AsyncReply 3 { 4 public ICaculate_AddAsyncReply(int reqId, Connection conn) 5 { 6 _reqId = reqId; 7 _connection = conn; 8 } 9 10 public void OnError(int error, string msg) 11 { 12 var stream = new BinaryStreamWriter(); 13 stream.Write(1); 14 stream.Write(_reqId); 15 stream.Write(error); 16 stream.Write(msg); 17 _connection.Write(stream.BuildSendBuffer()); 18 } 19 public void OnResult(int result) 20 { 21 var stream = new BinaryStreamWriter(); 22 stream.Write(1); 23 stream.Write(_reqId); 24 stream.Write(0); 25 stream.Write(result); 26 _connection.Write(stream.BuildSendBuffer()); 27 } 28 }
框架生成的Stub類將收到的請求數據進行解析而後調用具體服務類來處理請求:
1 void AddInvoke(ICaculateImpl __service, Session __client, BinaryStreamReader __reader) 2 { 3 int p1; 4 int p2; 5 int __reqId; 6 __reader.Read(out __reqId); 7 __reader.Read(out p1); 8 __reader.Read(out p2); 9 var reply = new ICaculate_AddAsyncReply(__reqId, __client.Connection); 10 try 11 { 12 __service.Add(__client, p1, p2, reply); 13 } 14 catch(ServiceException e) 15 { 16 reply.OnError(e.ErrCode, e.Message); 17 Log.Info("Service Invoke Failed. clientId:{0} error message:{1}", __client.ID, e.Message); 18 } 19 catch(Exception e) 20 { 21 reply.OnError((int)ServiceErrorCode.Generic, "generic service error."); 22 Log.Error("Generic Service Invoke Failed, clientId:{0} error message:{1}\nCall Stack: {2}", __client.ID, e.Message, e.StackTrace); 23 } 24 }
因爲完整的框架代碼比較龐大,因此上面只貼了關鍵部分的實現細節。從實現細節咱們能夠看到,框架實際上也是經過request id來關聯請求和響應函數之間的上下文的,可是經過代碼生成機制隱藏了實現的細節,給使用者提供了一種優雅的抽象。
總結:在雙向會話式的RPC基礎上,引入了一種新的異步請求調用模式,讓調用者能夠經過閉包來方便的異步處理請求的響應結果,同時服務器端的請求處理也能夠實現異步處理。