#<i class="icon-file">Gaea請求處理流程</i>java
Gaea支持tcp/http/telnet三種通訊信息,其中主要的通訊部分是由netty通訊框架完成,netty提供了一種高性能的非阻塞通訊工具。bootstrap
##<i class="icon-share">Gaea各服務啓動</i>服務器
啓動服務的配置,這裏就tcp的配置簡單介紹一下,telnet,http的基本相同。多線程
<property> <name>gaea.servers</name> <value>gaea.server.tcp,gaea.server.http,gaea.server.telnet</value> </property> <property> <name>gaea.server.tcp.enable</name> <value>true</value> </property> <property> <name>gaea.server.tcp.implement</name> <value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value> </property> //其中還有一些配置IP和端口,字符緩衝區大小等,就很少送,看配置便可明白
配置以上的配置,便可標明TCP服務的啓動。也能夠本身實現某一個服務,繼承IServer接口,而後在配置文件中添加以上配置,便可在服務啓動的時候,啓動你的服務。如Gaea監控服務。app
IServer框架
public interface IServer { public void start() throws Exception; public void stop() throws Exception; }
SocketServer是TCP服務的IServer的實現類。dom
//SocketServer的主要代碼 initSocketServer() { //初始化TCPServer SocketHandler handler = new SocketHandler(); //TCP服務處理事件的Handler bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength"))); bootstrap.setOption("child.tcpNoDelay", tcpNoDelay); bootstrap.setOption("child.receiveBufferSize", Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.receiveBufferSize")); bootstrap.setOption("child.sendBufferSize", Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.sendBufferSize")); try { InetSocketAddress socketAddress = null; socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"), Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort")); Channel channel = bootstrap.bind(socketAddress); //綁定地址 allChannels.add(channel); //把生成的channel放入一個ChannelGroup中,提供後續使用 } catch (Exception e) { logger.error("init socket server error", e); System.exit(1); } }
關於netty,網上的資料也不少,能夠多看看。異步
##<i class="icon-share">Gaea接收數據</i>socket
netty接收數據等事件處理,都在Handler中實現,所以在這裏說一下SocketHandlerasync
在Channel被綁定的時候,觸發ChannelOpen事件;
SocketServer.allChannels.add(e.getChannel()); /** * 若是當前服務啓動權限認證,則增長當前鏈接對應的SecureContext */ if(Global.getSingleton().getGlobalSecureIsRights()){//是否經過鏈接認證 Global.getSingleton().addChannelMap(e.getChannel(), new SecureContext());此鏈接對應一個SecureContext。 }
在客戶端鏈接的時候,觸發channelConnected事件;
for(IFilter filter : Global.getSingleton().getConnectionFilterList()) { //Global中取出啓動時,註冊的鏈接過濾器。 filter.filter(new GaeaContext(new GaeaChannel(e.getChannel())));// 執行鏈接過濾器。 }
主要執行鏈接過濾器,對鏈接進行控制
接收數據
try { logger.debug("message receive"); ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer(); //Netty接收到的數據在ChannelBuffer中,在此轉爲NIO的ByteBuffer byte[] reciveByte = buffer.array(); logger.debug("reciveByte.length:" + reciveByte.length); byte[] headDelimiter = new byte[0]; System.arraycopy(reciveByte, 0, headDelimiter, 0, 0); byte[] requestBuffer = new byte[reciveByte.length]; System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length)); GaeaContext gaeaContext = new GaeaContext(requestBuffer, //生成Gaea上下文GaeaContext; GaeaContext將貫穿於整個消息的處理流程。 new GaeaChannel(e.getChannel()), ServerType.TCP, this); SocketServer.invokerHandle.invoke(gaeaContext);//處理這個GaeaContext } catch(Throwable ex) { byte[] response = ExceptionHelper.createErrorProtocol(); //若是是異常,按照異常協議,返回給調用者 e.getChannel().write(response); logger.error("SocketHandler invoke error", ex); }
在invoke中,Gaea提供同步和異步兩種處理方式,固然同步應該是初期產物,如今處理基本都是異步處理
##<i class="icon-share">同步處理流程</i>
@Override public void invoke(GaeaContext context) throws Exception { try { for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {//請求過濾器 if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) { f.filter(context); //執行請求過濾器的filter方法 } } if(context.isDoInvoke()) { doInvoke(context);//執行請求方法 } logger.debug("begin response filter"); // response filter for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {//返回過濾器 if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) { f.filter(context);//執行返回過濾器的filter方法 } } context.getServerHandler().writeResponse(context);//寫回給調用者 } catch(Exception ex) { context.setError(ex); context.getServerHandler().writeResponse(context); //若是出現異常,將異常寫回給調用者 logger.error("in async messageReceived", ex); } }
從以上代碼中能夠看出,Gaea先通過了一系列請求過濾器,而後才執行真正的方法,最終再執行返回過濾器,最後寫回給客戶端。
##<i class="icon-share">異步處理流程</i>
調用invoke以後,Gaea的此次請求將會被放入AsyncInvoker的異步執行器中執行,並快速返回,接收下次請求;
asyncInvoker.run(taskTimeOut, new IAsyncHandler(){...};
IAsynHandler中,具體有三個方法,run是執行任務,messageReceived是執行完以後,返回執行結果,exceptionCaught對執行過程當中的全部異常進行處理。IAsyncHandler定義以下:
public interface IAsyncHandler { public Object run() throws Throwable; public void messageReceived(Object obj); public void exceptionCaught(Throwable e); }
在執行asyncInvoker.run的時候,異步執行器,只是把任務扔給了64個隊列,此處默認是64,也能夠進行配置,配置項gaea.async.worker.count
AsyncTask task = new AsyncTask(timeOut, handler); //handler放入異步任務中,timeOut爲此任務的超時時間, if(rr > 10000) { rr = 0; } int idx = rr % workers.length; //輪詢放入多個隊列 workers[idx].addTask(task); ++rr;
其中異步任務中有一個超時時間,若是在隊列中的時間大於這個值,Gaea將拋棄此任務,保護總體服務的正常運行,這個也就是Gaea的服務器負載保護策略,防止服務端壓力過大宕機,丟棄部分任務,以保護大多數任務的有效執行。
在初始化異步執行器的時候,啓動了64個工做線程和一個線程池
private AsyncInvoker(int workerCount, boolean timeoutEffect) { workers = new AsyncWorker[workerCount]; ExecutorService executor = Executors.newCachedThreadPool(new ThreadRenameFactory("async task thread")); for(int i=0; i<workers.length; i++) { workers[i] = new AsyncWorker(executor, timeoutEffect); workers[i].setDaemon(true); workers[i].setName("async task worker[" + i + "]"); workers[i].start(); } }
在此,提供64個線程和一個線程池的做用是Gaea提供的兩種處理任務的方式,一種是任務分離,64個隊列各自處理各自的任務,一種是線程池,處理單個隊列,並設置了任務的多執行時間。
private void execNoTimeLimitTask() { AsyncTask task = null; try { task = taskQueue.take(); if(task != null){ if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { //超時 task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); return; } else { Object obj = task.getHandler().run(); //執行 task.getHandler().messageReceived(obj); //返回 } }else{ logger.error("execNoTimeLimitTask take task is null"); } } catch(InterruptedException ie) { } catch(Throwable ex) { if(task != null) { task.getHandler().exceptionCaught(ex); //處理異常 } } }
try { final AsyncTask task = taskQueue.take(); if(task != null) { if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); return; }else{ final CountDownLatch cdl = new CountDownLatch(1); executor.execute(new Runnable(){ @Override public void run() { try { Object obj = task.getHandler().run(); //執行 task.getHandler().messageReceived(obj); //返回 } catch(Throwable ex) { task.getHandler().exceptionCaught(ex); } finally { cdl.countDown(); } } } ); cdl.await(getTimeout(task.getTimeout(), taskQueue.size()), TimeUnit.MILLISECONDS); //等待cdl.countDown通知,不然超時,任務拋棄。 if(cdl.getCount() > 0) { task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); //異常 } } }else{ logger.error("execTimeoutTask take task is null"); } } catch(InterruptedException ie) { logger.error(""); } catch (Throwable e) { logger.error("get task from poll error", e); }
public Object run() throws Throwable { logger.debug("begin request filter"); // request filter for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) { if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) { f.filter(context); } } if(context.isDoInvoke()) { if(context.getServerType() == ServerType.HTTP){ //對http服務進行處理 httpThreadLocal.set(context.getHttpContext()); } doInvoke(context); } logger.debug("begin response filter"); // response filter for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) { if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) { f.filter(context); } } return context; }
從以上代碼能夠看出,處理流程上,跟同步基本相同,只是加了對http服務的處理,所以能夠看出,同步是不支持HTTP服務的。返回部分和異常也不是在run中執行,而是分散開,在messageReceived和exceptionCaught中進行處理。具體祥看代碼。
##<i class="icon-share">請求過濾器</i>
請求過濾器,顧名思義,是在執行任務以前,Gaea對請求作的一些處理。Gaea默認的框架的請求過濾器都有哪些?
<property> <name>gaea.filter.global.request</name> <value>com.bj58.spat.gaea.server.filter.ProtocolParseFilter,com.bj58.spat.gaea.server.filter.HandclaspFilter,com.bj58.spat.gaea.server.filter.ExecuteMethodFilter</value> </property>
在Gaea剛接收到數據的時候,Gaea將請求的二進制流放入了上下文根GaeaContext,在此Gaea對二進制流進行了解析,還原,用於執行任務
public void filter(GaeaContext context) throws Exception { if(context.getServerType() == ServerType.TCP) { byte[] desKeyByte = null; String desKeyStr = null; boolean bool = false; Global global = Global.getSingleton(); if(global != null){ //判斷當前服務啓用權限認證 if(global.getGlobalSecureIsRights()){ SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); bool = securecontext.isRights(); if(bool){ desKeyStr = securecontext.getDesKey(); } } } if(desKeyStr != null){ desKeyByte = desKeyStr.getBytes("utf-8"); } Protocol protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); //根據deskey進行協議解析 context.getGaeaRequest().setProtocol(protocol); /** * 服務重啓直接返回 */ if(Global.getSingleton().getServerState() == ServerStateType.Reboot && protocol.getPlatformType() == PlatformType.Java){ GaeaResponse response = new GaeaResponse(); ResetProtocol rp = new ResetProtocol(); rp.setMsg("This server is reboot!"); protocol.setSdpEntity(rp); response.setResponseBuffer(protocol.toBytes(global.getGlobalSecureIsRights(),desKeyByte)); context.setGaeaResponse(response); context.setExecFilter(ExecFilterType.None); context.setDoInvoke(false); } } }
整個流程如以上代碼,先判斷是否啓用權限認證,再根據權限認證,對二進制流進行協議解析。關於Gaea協議,是一個自定義的二進制協議,具體另起文章詳解。解析後若是是服務重啓任務,則寫入GaeaContext,供後續操做。
客戶端第一次請求,會跟服務端進行DES加解密的交互,肯定客戶端是否有權限調用此服務,具體過程見代碼
/** * 權限認證filter */ @Override public void filter(GaeaContext context) throws Exception { Protocol protocol = context.getGaeaRequest().getProtocol(); if(protocol.getPlatformType() == PlatformType.Java && context.getServerType() == ServerType.TCP){//java 客戶端支持權限認證 GaeaResponse response = new GaeaResponse(); Global global = Global.getSingleton(); //是否啓用權限認證 if(Global.getSingleton().getGlobalSecureIsRights()){ SecureContext sc = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); //判斷當前channel是否經過認證 if(!sc.isRights()){ //沒有經過認證 if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){ SecureKey sk = new SecureKey(); HandclaspProtocol handclaspProtocol = (HandclaspProtocol)protocol.getSdpEntity(); /** * 接收 客戶端公鑰 */ if("1".equals(handclaspProtocol.getType())){ sk.initRSAkey(); //客戶端發送公鑰數據 String clientPublicKey = handclaspProtocol.getData(); if(null == clientPublicKey || "".equals(clientPublicKey)){ logger.warn("get client publicKey warn!"); } //java 客戶端 if(protocol.getPlatformType() == PlatformType.Java){ //服務器生成公/私鑰,公鑰傳送給客戶端 sc.setServerPublicKey(sk.getStringPublicKey()); sc.setServerPrivateKey(sk.getStringPrivateKey()); sc.setClientPublicKey(clientPublicKey); handclaspProtocol.setData(sk.getStringPublicKey());//服務器端公鑰 } protocol.setSdpEntity(handclaspProtocol); response.setResponseBuffer(protocol.toBytes()); context.setGaeaResponse(response); this.setInvokeAndFilter(context); logger.info("send server publieKey sucess!"); } /** * 接收權限文件 */ else if("2".equals(handclaspProtocol.getType())){ //客戶端加密受權文件 String clientSecureInfo = handclaspProtocol.getData(); if(null == clientSecureInfo || "".equals(clientSecureInfo)){ logger.warn("get client secureKey warn!"); } //受權文件客戶端原文(服務器私鑰解密) String sourceInfo = sk.decryptByPrivateKey(clientSecureInfo, sc.getServerPrivateKey()); //校驗受權文件是否相同 //判斷是否合法,若是合法服務器端生成DES密鑰,經過客戶端提供的公鑰進行加密傳送給客戶端 if(global.containsSecureMap(sourceInfo)){ logger.info("secureKey is ok!"); String desKey = StringUtils.getRandomNumAndStr(8); //設置當前channel屬性 sc.setDesKey(desKey); sc.setRights(true); handclaspProtocol.setData(sk.encryptByPublicKey(desKey, sc.getClientPublicKey())); protocol.setSdpEntity(handclaspProtocol); response.setResponseBuffer(protocol.toBytes()); context.setGaeaResponse(response); }else{ logger.error("It's bad secureKey!"); this.ContextException(context, protocol, response, "受權文件錯誤!"); } this.setInvokeAndFilter(context); }else{ //權限認證 異常狀況 logger.error("權限認證異常!"); this.ContextException(context, protocol, response, "權限認證 異常!"); } response = null; sk = null; handclaspProtocol = null; }else{ //客戶端沒有啓動權限認證 logger.error("客戶端沒有啓用權限認證!"); this.ContextException(context, protocol, response, "客戶端沒有啓用權限認證!"); } } }else{ if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){ //異常--當前服務器沒有啓動權限認證 logger.error("當前服務沒有啓用權限認證!"); this.ContextException(context, protocol, response, "當前服務沒有啓用權限認證!"); } } } }
服務端受權文件,對須要執行的方法,進行受權配置,當調用者調用的時候,此方法是否通過受權。在gaea中沒有看到關於此受權配置的文件。這裏就很少說,內部系統對於受權訪問的控制並不嚴格。
##<i class="icon-share">執行任務</i>
執行任務是doInvoke(gaeaContext)
這一步比較簡單,根據協議解析過濾器解析出來的請求數據,找到請求的類名,再根據類名,從IProxyFactory中取出與之對應的代理類,而後代理去執行真正的方法,關於IProxyFactory類,請看Gaea的啓動過程當中如何生成的。
IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup()); //獲取代理類 GaeaResponse gaeaResponse = localProxy.invoke(context);//執行真正的方法
在此過程當中,還對各類異常作了處理,全部的處理結果都放到了GaeaContext中。StopWatch主要記錄調用信息,並在返回過濾器中,記錄執行時間。
##<i class="icon-share">返回過濾器</i>
框架的返回過濾器
<!-- global response filter --> <property> <name>gaea.filter.global.response</name> <value>com.bj58.spat.gaea.server.filter.ProtocolCreateFilter,com.bj58.spat.gaea.server.filter.ExecuteTimeFilter</value> </property>
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
將最終執行結果轉換爲二進制流放入GaeaContext。
對方法的執行時間進行監控,並將結果發到一個UDP日誌服務器。
public void filter(GaeaContext context) throws Exception { StopWatch sw = context.getStopWatch(); Collection<PerformanceCounter> pcList = sw.getMapCounter().values(); for(PerformanceCounter pc : pcList) { if(pc.getExecuteTime() > minRecordTime) { StringBuilder sbMsg = new StringBuilder(); sbMsg.append(serviceName); sbMsg.append("--"); sbMsg.append(pc.getKey()); sbMsg.append("--time: "); sbMsg.append(pc.getExecuteTime()); sbMsg.append(" [fromIP: "); sbMsg.append(sw.getFromIP()); sbMsg.append(";localIP: "); sbMsg.append(sw.getLocalIP()+"]"); udpClient.send(sbMsg.toString()); } }
##<i class="icon-share">結果返回</i>
在以上過濾器,執行等過程,都是將所獲得的結果封裝到了上下文GaeaContext中,在這一步將其返回
public void messageReceived(Object obj) { if(context.getServerType() == ServerType.HTTP){ httpThreadLocal.remove(); } if(obj != null) { GaeaContext ctx = (GaeaContext)obj; ctx.getServerHandler().writeResponse(ctx); } else { logger.error("context is null!"); } } if(context != null && context.getGaeaResponse() != null){ context.getChannel().write(context.getGaeaResponse().getResponseBuffer()); //getResponseBuffer 結果的二進制流 } else { context.getChannel().write(new byte[]{0}); logger.error("context is null or response is null in writeResponse"); }
##<i class="icon-share">異常處理</i>
Gaea服務的執行過程當中,全部的異常都會拋出,被exceptionCaught(exception)接收,並通過Gaea封裝,序列化,返回給客戶端,告訴調用者,是什麼緣由致使調用失敗。
public void exceptionCaught(Throwable e) { if(context.getServerType() == ServerType.HTTP){ httpThreadLocal.remove(); } if(context.getGaeaResponse() == null){ GaeaResponse respone = new GaeaResponse(); context.setGaeaResponse(respone); } try { byte[] desKeyByte = null; String desKeyStr = null; boolean bool = false; Global global = Global.getSingleton(); if(global != null){ //判斷當前服務啓用權限認證 if(global.getGlobalSecureIsRights()){ SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); bool = securecontext.isRights(); if(bool){ desKeyStr = securecontext.getDesKey(); } } } if(desKeyStr != null){ desKeyByte = desKeyStr.getBytes("utf-8"); } Protocol protocol = context.getGaeaRequest().getProtocol(); if(protocol == null){ protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); context.getGaeaRequest().setProtocol(protocol); } protocol.setSdpEntity(ExceptionHelper.createError(e)); context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte)); } catch (Exception ex) { context.getGaeaResponse().setResponseBuffer(new byte[]{0}); logger.error("AsyncInvokerHandle invoke-exceptionCaught error", ex); } context.getServerHandler().writeResponse(context); logger.error("AsyncInvokerHandle invoke error", e); } });
##<i class="icon-share">總結</i>
至此,一個客戶端的請求處理完畢,在Gaea的整個設計中能夠看出,不少東西都留出了接口,可以很好的在框架自己,作一些適合本身業務的處理,整個設計,則決定了服務通訊框架的性能。
###le284