這是接上一篇文章《RSF 分佈式服務框架設計》以後的續做,主要是 Hasor-RSF 的請求響應工做原理以及設計思路。服務器
很是首先很是感謝關注這一系列文章的兄弟們,因爲我我的時間不肯定因此 RSF 更新起來比較無規律。近段時間抽出了時間仔細設計了一下 RSF 發現高性能和靈活的可擴展性還真不是那麼輕易就能達到的。先說一下大致的工做流程。網絡
RSF 工做流程:架構
大致的工做思路是:一個完整的 Rsf 請求調用會在客戶端造成 Request接口,而後通過轉換加工變爲 Request消息,最後在經過Rsf協議傳輸到遠程機器。遠程機器在接收到 Request消息以後在將其從新轉換爲 Request接口,調用處理完畢以後在造成 Response 反饋給客戶端。併發
這整個請求響應過程當中一個方法調用會經歷兩次編碼解碼過程,一次在客戶端一次在服務端。所以序列化的性能十分關鍵。此外 Rsf 數據包的簡單與否也直接影響到 Encode,Decode的效率,這部分主要是網絡 IO開銷。框架
採用異步IO的優點在於每個請求響應的處理都是異步的、非阻塞的。請求響應的過程當中不影響各方自己業務系統的運行,同時能夠兼顧更多的併發調用。異步
RSF 單機狀況下保障高併發 & 高可用:分佈式
在高併發下,客戶端會不斷的向服務器發起請求,這相似於 DDOS 攻擊,因此要求服務器有能力抵禦這樣的高併發請求。當遇到這樣高併發請求的時,要先確保 RSF 服務器有能力處理這麼多請求。若是超出本身的處理能力應當丟棄超出承受能力的請求包。同時爲了保證高可用的目的,RSF 在丟棄請求以前應當作一件十分有意義的事情。通知客戶端服務器資源緊張,讓其能夠選擇其它服務器發起調用。這樣一來客戶端在發起 RSF 請求以後不至於由於服務器繁忙而傻等,也增長了調用成功概率。高併發
線程也是一種資源。在實現機制上應當選用異步IO的網絡模型,這樣能夠避免由於大量鏈接而產生過多的線程。畢竟維護這些線程也是須要消耗資源的。性能
考慮到業務線程執行時間不固定,同時網絡IO的時間也由於網絡情況而定。在架構設計上二者要徹底分開來處理,這樣才能兩方面都兼顧到。爲此 RSF 用到兩個線程池,一個專門用於處理網絡IO、另外一個專一於處理業務調用。測試
當網絡IO接收到完整的一個 RSF 消息時,將消息放入隊列等待調用線程處理。
下面就分別討論 網絡IO線程、調用線程的處理邏輯。
IO線程:
這部分功能由 Netty 的 Worker 線程擔當。對於IO線程主要的工做就是讀取網絡數據而且將其轉換爲 RSF 消息,而後放入隊列。不管執行結果如何 RSF 的IO線程都會將處理結果反饋給客戶端,下面這張圖列出了它的工做流程(綠色部分是要作的事情,粉色部分表明運行狀態)
完整的IO線程邏輯會在多個類中完成,它們主要分散在 RSFProtocolDecoder、InnerServerHandler 兩個類中。RSFProtocolDecoder 類主要負責將網絡數據包轉換成 RSF 消息並丟進 netty 的 pipeline 中。
/*RSFProtocolDecoder類*/ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) return null; //* byte[1] version RSF版本(0xC1) byte version = frame.getByte(0); short status = 0; //decode try { status = this.doDecode(version, ctx, frame);//協議解析 } catch (Throwable e) { status = ProtocolStatus.ProtocolError; } finally { if (status == ProtocolStatus.OK) return null; /*錯誤狀況*/ frame.skipBytes(1); fireProtocolError(ctx,version,frame.readLong(),ProtocolStatus.ProtocolError); } return null; }
上面的代碼來自於 Hasor-RSF 項目,rsf協議標記位於數據包的第一個字節。在上面代碼中能夠看到讀取了第零個字節用來判斷,主要判斷邏輯位於 doDecode 方法。該方法返回了一個狀態,狀態值表示了數據包的解析是否成功。若是返回狀態不是 「OK」或者執行過程當中發生意外,都會致使 fireProtocolError 方法的執行。而fireProtocolError 方法的做用就是向客戶端報告 ProtocolError 異常。
下面是 doDecode 方法的邏輯負責解析協議的,當協議正確解析以後經過 fireChannelRead 方法將讀取到的 RSF 消息丟進 pipeline 交給後面的 Handler 進行處理。
/**協議解析*/ private short doDecode(byte version,ChannelHandlerContext ctx,ByteBuf frame) throws IOException{ ProtocolType pType = ProtocolType.valueOf(version); if (pType == ProtocolType.Request) { //request Protocol<RequestSocketBlock> requestProtocol = ProtocolUtils.requestProtocol(version); if (requestProtocol != null) { RequestSocketBlock block = requestProtocol.decode(frame); RequestMsg reqMetaData = TransferUtils.requestToMessage(block); ctx.fireChannelRead(reqMetaData); return ProtocolStatus.OK;/*正常處理後返回*/ } } if (pType == ProtocolType.Response) { //response Protocol<ResponseSocketBlock> responseProtocol = ProtocolUtils.responseProtocol(version); if (responseProtocol != null) { ResponseSocketBlock block = responseProtocol.decode(frame); ResponseMsg resMetaData = TransferUtils.responseToMessage(block); ctx.fireChannelRead(resMetaData); return ProtocolStatus.OK;/*正常處理後返回*/ } } return ProtocolStatus.ProtocolError; }
接下來 InnerServerHandler 類接收到 Netty 解碼器發來的 RSF 消息並將其放入隊列,下面是處理邏輯。
if (msg instanceof RequestMsg == false) return; //建立request、response RequestMsg requestMsg = (RequestMsg) msg; requestMsg.setReceiveTime(System.currentTimeMillis()); //放入業務線程準備執行 try { Executor exe = this.rsfContext.getCallExecute(requestMsg.getServiceName()); exe.execute(new InnerRequestHandler(this.rsfContext, requestMsg, ctx.channel())); // ResponseMsg pack = TransferUtils.buildStatus(// requestMsg.getVersion(), //協議版本 requestMsg.getRequestID(),//請求ID ProtocolStatus.Accepted);//迴應ACK ctx.pipeline().write(pack); } catch (RejectedExecutionException e) { ResponseMsg pack = TransferUtils.buildStatus(// requestMsg.getVersion(), //協議版本 requestMsg.getRequestID(),//請求ID ProtocolStatus.ChooseOther);//服務器資源緊張 ctx.pipeline().write(pack); }
在前面提到 RSF 會在高併發訪問的狀況下,依照本身的實際能力來合理的丟棄過載的請求。以上代碼就是這個邏輯的重要保證。被服務器接受的 RSF 請求會響應 ACK,因爲系統過載要求客戶端選擇其它服務提供者時會收到 ChooseOther 包。
至於以後就沒有什麼了,簡短的兩段代碼其目的就是爲了快速的處理網絡傳來的 RSF 數據包。處理完畢以後會迅速處理下一個 RSF 數據包。
調用線程:
調用線程採用的是 ThreadPoolExecutor 類實現的。該類是一個固定數量的線程池,同時可使用有容量限制的隊列。正好符合 RSF 對調用線程的要求。
調用線程的入口程序位於 InnerRequestHandler 類中。調用線程的處理邏輯分爲三個步驟:
1、將 RSF 消息轉換爲 Request/Response 接口,這會引起反序列化操做。
2、檢查 timeout 。
3、執行調用,並向客戶端寫入返回值。
其中必需要特殊說明的是 timeout 檢查。在 RSF 中超時時間在客戶端和服務端都有配置。客戶端在發起 RSF 請求的時就會開始記錄時間,一旦調用超時就會丟棄全部遠程針對這次請求的 Response 響應。
同時做爲服務器,在接收到 RSF 數據包並向客戶端發送 ACK 時就會記錄請求到達服務端的那一刻時間。當正式開始執行調用的時候,調用線程會檢查時間差。是否達到了請求中要求的超時時間,若是超時後面的調用也不須要進行了。
這裏要注意的是,調用過程採用的超時判斷是依照客戶端傳來的時間以及服務端配置的超時時間共同決定的。假如客戶端配置的超時時間是3000毫秒,服務端配置的是 1000 毫秒。
那麼客戶端發起調用的時候會等待3000毫秒來接受返回值,可是在服務端因爲要求是1000毫秒所以當服務端收到調用請求迴應ACK以後,須要在1000毫秒以內正式執行 RSF 調用,不然調用線程會拋棄到這個調用迴應客戶端 Timeout。
因此要注意服務端配置的 timeout 不是業務方法最長可執行時間,而是 RSF 在迴應 ACK 以後最大的等待調用時間。下面是這段業務邏輯代碼
private RsfResponseImpl doRequest() { RsfRequestImpl request = null; RsfResponseImpl response = null; try { request = RuntimeUtils.recoverRequest(// requestMsg, new NetworkConnection(this.channel), this.rsfContext); response = request.buildResponse(); } catch (RsfException e) { Hasor.logError("recoverRequest fail, requestID:" + requestMsg.getRequestID() + " , " + e.getMessage()); // ResponseMsg pack = TransferUtils.buildStatus(// requestMsg.getVersion(), //協議版本 requestMsg.getRequestID(),//請求ID e.getStatus());//迴應狀態 this.channel.write(pack); return null; } //1.檢查timeout long lostTime = System.currentTimeMillis() - requestMsg.getReceiveTime(); int timeout=validateTimeout(requestMsg.getClientTimeout(),request.getMetaData()); if (lostTime > timeout) { response.sendStatus(ProtocolStatus.RequestTimeout, "request timeout. (client parameter)."); return response; } //2.執行調用 try { RsfFilter[] rsfFilters =this.rsfContext.getRsfFilters(request.getMetaData()); new InnerRsfFilterHandler(rsfFilters,InnerInvokeHandler.Default).doFilter(request,response); } catch (Throwable e) { //500 InternalServerError response.sendStatus(ProtocolStatus.InternalServerError, e.getMessage()); return response; } return response; }
擴展方式:
擴展 RSF 採用你們都熟悉的 「Web開發模式「,將 RSF 請求封裝爲 Request/Response。在此基礎上經過 RsfFilter 完成擴展。擴展 RSF 實現更復雜的邏輯只須要簡單的實現這個過濾器便可。
public class DemoRsfFilter implements RsfFilter { public void doFilter(RsfRequest request, RsfResponse response, RsfFilterChain chain) throws Throwable { try { //before chain.doFilter(request, response); } catch (Exception e) { //throws } finally { //after } } }
看到上面的擴展方式是否是很情切?開發者能夠根據發來的調用請求來決定是否真的調用業務邏輯,或者是選擇調用其它服務返回業務遠程服務執行的結果。
response.sendData(...);
又或者通知客戶端調用失敗,返回一個錯誤消息:
response.sendStatus(ProtocolStatus.Unauthorized, ...);
測試結果 & 性能指標:
通過測試這樣的架構性能表現確實很使人興奮,固然在個人實現中仍是有不少地方是能夠繼續優化的。下面是測試的服務類一個 sayHello 的服務:
public class TestServices { public String sayHello(String msg) { return msg; } }
服務器:選用個人開發機一臺筆記本電腦配置以下:
Intel Core(TM) i5-2520M 4核 CPU @2.5G 、普通 100M網卡、Windows 7 64位操做系統,8G內存。
服務端可處理的最大隊列設爲:4096,有 4 條線程負責處理調用、5 條線程負責處理網絡IO,1 條線程負責偵聽網絡端口。
客戶機:是一臺比較老的筆記本,配置爲: Intel Core 2 Duo 雙核 CPU,也是普通 100M 網卡。
在測試期間使用一個客戶端模擬 50 條線程發起請求,每條請求不斷的往服務器發送調用請求,任何服務端的返回都丟棄無論。
鏈接方式爲:兩臺電腦網線直連中間不通過路由器。
測試結果是:服務器峯值能夠處理約 1.7W 的請求,CPU 消耗維持在 80%~90% 左右、網卡使用率維持在 30%~40%。由於客戶端瘋狂的發送數據包因此會致使客戶端不少請求收到了 ChooseOther 迴應。
這個測試結果是目前階段的測試結果,各個系統參數還沒有進行調優。
出了併發測試還作過單鏈接下的壓力測試,和上面的參數同樣,單臺機器下壓測服務器能夠處理一個鏈接內7000+ 請求。
淘寶的HSF,在專業服務器上 16核 CPU,下 2000+鏈接,性能聽說能夠跑到 10W。 RSF 因爲開發僅僅完成了主要部分。再加上測試環境不同,性能還很差放在一塊兒比較。
總的來講 RSF 的初期表現仍是很讓人滿意的,也歡迎各位大神拍磚,討論。
---------------------------------
相關連接: