高性能服務通訊框架Gaea的詳細實現--server請求處理流程

#<i class="icon-file">Gaea請求處理流程</i>java

Gaea支持tcp/http/telnet三種通訊信息,其中主要的通訊部分是由netty通訊框架完成,netty提供了一種高性能的非阻塞通訊工具。bootstrap

##<i class="icon-share">Gaea各服務啓動</i>服務器

啓動服務的配置,這裏就tcp的配置簡單介紹一下,telnet,http的基本相同。多線程

<property>
<name>gaea.servers</name> 
<value>gaea.server.tcp,gaea.server.http,gaea.server.telnet</value>
</property>
<property>
<name>gaea.server.tcp.enable</name>
<value>true</value>
</property>
<property>
<name>gaea.server.tcp.implement</name>
<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>
</property>
//其中還有一些配置IP和端口,字符緩衝區大小等,就很少送,看配置便可明白

配置以上的配置,便可標明TCP服務的啓動。也能夠本身實現某一個服務,繼承IServer接口,而後在配置文件中添加以上配置,便可在服務啓動的時候,啓動你的服務。如Gaea監控服務。app

IServer框架

public interface IServer {

	public void start() throws Exception;

	public void stop() throws Exception;
}

SocketServer是TCP服務的IServer的實現類。dom

//SocketServer的主要代碼
    initSocketServer() { //初始化TCPServer
        SocketHandler handler = new SocketHandler(); //TCP服務處理事件的Handler
        bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));
        bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
        bootstrap.setOption("child.receiveBufferSize", 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.receiveBufferSize"));
        bootstrap.setOption("child.sendBufferSize", 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.sendBufferSize"));

        try {
        	InetSocketAddress socketAddress = null;
        	socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),
        			Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));
            Channel channel = bootstrap.bind(socketAddress); //綁定地址
            allChannels.add(channel); //把生成的channel放入一個ChannelGroup中,提供後續使用
		} catch (Exception e) {
			logger.error("init socket server error", e);
			System.exit(1);
		}
    
    }

關於netty,網上的資料也不少,能夠多看看。異步

##<i class="icon-share">Gaea接收數據</i>socket

netty接收數據等事件處理,都在Handler中實現,所以在這裏說一下SocketHandlerasync

  1. channelOpen

在Channel被綁定的時候,觸發ChannelOpen事件;

SocketServer.allChannels.add(e.getChannel());
		/**
		 * 若是當前服務啓動權限認證,則增長當前鏈接對應的SecureContext
		 */
		if(Global.getSingleton().getGlobalSecureIsRights()){//是否經過鏈接認證
			Global.getSingleton().addChannelMap(e.getChannel(), new SecureContext());此鏈接對應一個SecureContext。
		}
  1. channelConnected

在客戶端鏈接的時候,觸發channelConnected事件;

for(IFilter filter : Global.getSingleton().getConnectionFilterList()) { //Global中取出啓動時,註冊的鏈接過濾器。
			filter.filter(new GaeaContext(new GaeaChannel(e.getChannel())));// 執行鏈接過濾器。
		}

主要執行鏈接過濾器,對鏈接進行控制

  1. messageReceived

接收數據

try {
			logger.debug("message receive");
			ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer(); //Netty接收到的數據在ChannelBuffer中,在此轉爲NIO的ByteBuffer
			byte[] reciveByte = buffer.array();
			logger.debug("reciveByte.length:" + reciveByte.length);
			
			byte[] headDelimiter = new byte[0];
			System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);
			
			byte[] requestBuffer = new byte[reciveByte.length];
			System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));
			
			GaeaContext gaeaContext = new GaeaContext(requestBuffer, //生成Gaea上下文GaeaContext; GaeaContext將貫穿於整個消息的處理流程。
					new GaeaChannel(e.getChannel()), 
					ServerType.TCP,
					this);
			
			SocketServer.invokerHandle.invoke(gaeaContext);//處理這個GaeaContext
		} catch(Throwable ex) {
			byte[] response = ExceptionHelper.createErrorProtocol(); //若是是異常,按照異常協議,返回給調用者
			e.getChannel().write(response);
			logger.error("SocketHandler invoke error", ex);
		}

在invoke中,Gaea提供同步和異步兩種處理方式,固然同步應該是初期產物,如今處理基本都是異步處理

##<i class="icon-share">同步處理流程</i>

@Override
	public void invoke(GaeaContext context) throws Exception {
		try {
			for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {//請求過濾器
				if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
					f.filter(context); //執行請求過濾器的filter方法
				}
			}
			
			if(context.isDoInvoke()) {
				doInvoke(context);//執行請求方法
			}
			
			logger.debug("begin response filter");
			// response filter
			for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {//返回過濾器
				if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
					f.filter(context);//執行返回過濾器的filter方法
				}
			}
			context.getServerHandler().writeResponse(context);//寫回給調用者
		} catch(Exception ex) {
			context.setError(ex);
			context.getServerHandler().writeResponse(context); //若是出現異常,將異常寫回給調用者
			logger.error("in async messageReceived", ex);
		}
	}

從以上代碼中能夠看出,Gaea先通過了一系列請求過濾器,而後才執行真正的方法,最終再執行返回過濾器,最後寫回給客戶端。

##<i class="icon-share">異步處理流程</i>

調用invoke以後,Gaea的此次請求將會被放入AsyncInvoker的異步執行器中執行,並快速返回,接收下次請求;

asyncInvoker.run(taskTimeOut, new IAsyncHandler(){...};

IAsynHandler中,具體有三個方法,run是執行任務,messageReceived是執行完以後,返回執行結果,exceptionCaught對執行過程當中的全部異常進行處理。IAsyncHandler定義以下:

public interface IAsyncHandler {
	public Object run() throws Throwable;
	public void messageReceived(Object obj);
	public void exceptionCaught(Throwable e);
}

在執行asyncInvoker.run的時候,異步執行器,只是把任務扔給了64個隊列,此處默認是64,也能夠進行配置,配置項gaea.async.worker.count

AsyncTask task = new AsyncTask(timeOut, handler); //handler放入異步任務中,timeOut爲此任務的超時時間,
		if(rr > 10000) {
			rr = 0;
		}
		int idx = rr % workers.length; //輪詢放入多個隊列
		workers[idx].addTask(task);
		++rr;

其中異步任務中有一個超時時間,若是在隊列中的時間大於這個值,Gaea將拋棄此任務,保護總體服務的正常運行,這個也就是Gaea的服務器負載保護策略,防止服務端壓力過大宕機,丟棄部分任務,以保護大多數任務的有效執行。

在初始化異步執行器的時候,啓動了64個工做線程和一個線程池

private AsyncInvoker(int workerCount, boolean timeoutEffect) {
		workers = new AsyncWorker[workerCount];
		ExecutorService executor = Executors.newCachedThreadPool(new ThreadRenameFactory("async task thread"));
		
		for(int i=0; i<workers.length; i++) {
			workers[i] = new AsyncWorker(executor, timeoutEffect);
			workers[i].setDaemon(true);
			workers[i].setName("async task worker[" + i + "]");
			workers[i].start();
		}
	}

在此,提供64個線程和一個線程池的做用是Gaea提供的兩種處理任務的方式,一種是任務分離,64個隊列各自處理各自的任務,一種是線程池,處理單個隊列,並設置了任務的多執行時間。

  1. 64個隊列處理各自任務 優勢:資源隔離,互不影響 肯定:若有一個任務執行慢,將影響此隊列中剩餘任務,致使整個隊列拋棄。 解決方案:處理現場不止處理本身隊列的任務,在線程空閒時,可竊取其它隊列的任務。
private void execNoTimeLimitTask() {
		AsyncTask task = null;
		try {
			task = taskQueue.take();
			if(task != null){
				if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { //超時
					task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
					return;
				} else {
					Object obj = task.getHandler().run(); //執行
					task.getHandler().messageReceived(obj); //返回
				}
			}else{
				logger.error("execNoTimeLimitTask take task is null");
			}
		} catch(InterruptedException ie) {
			
		} catch(Throwable ex) {
			if(task != null) {
				task.getHandler().exceptionCaught(ex); //處理異常
			}
		}
	}
  1. 線程池處理單個隊列任務 缺點:隊列異常,可致使全部任務受到影響。newCachedThreadPool的線程池,可致使建立過多線程 優勢:若有個別任務較慢,也不影響其它任務執行。
try {
			final AsyncTask task = taskQueue.take();
			if(task != null) {
				if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) {
					task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
					return;
				}else{
					final CountDownLatch cdl = new CountDownLatch(1); 
					executor.execute(new Runnable(){
							@Override
							public void run() {
								try {
									Object obj = task.getHandler().run(); //執行
									task.getHandler().messageReceived(obj); //返回
								} catch(Throwable ex) {
									task.getHandler().exceptionCaught(ex);
								} finally {
									cdl.countDown();
								}
							}
						}
					);
					cdl.await(getTimeout(task.getTimeout(), taskQueue.size()), TimeUnit.MILLISECONDS); //等待cdl.countDown通知,不然超時,任務拋棄。
					if(cdl.getCount() > 0) {
						task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); //異常
					}
				}
			}else{
				logger.error("execTimeoutTask take task is null");
			}
		} catch(InterruptedException ie) {
			logger.error("");
		} catch (Throwable e) {
			logger.error("get task from poll error", e);
		}
  1. run執行過程
public Object run() throws Throwable {
				logger.debug("begin request filter");
				// request filter
				for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {
					if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
						f.filter(context);
					}
				}
				
				if(context.isDoInvoke()) {
					if(context.getServerType() == ServerType.HTTP){ //對http服務進行處理
						httpThreadLocal.set(context.getHttpContext());
					}
					doInvoke(context);
				}
				
				logger.debug("begin response filter");
				// response filter
				for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {
					if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
						f.filter(context);
					}
				}
				return context;
			}

從以上代碼能夠看出,處理流程上,跟同步基本相同,只是加了對http服務的處理,所以能夠看出,同步是不支持HTTP服務的。返回部分和異常也不是在run中執行,而是分散開,在messageReceived和exceptionCaught中進行處理。具體祥看代碼。

##<i class="icon-share">請求過濾器</i>

請求過濾器,顧名思義,是在執行任務以前,Gaea對請求作的一些處理。Gaea默認的框架的請求過濾器都有哪些?

<property>
<name>gaea.filter.global.request</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolParseFilter,com.bj58.spat.gaea.server.filter.HandclaspFilter,com.bj58.spat.gaea.server.filter.ExecuteMethodFilter</value>
</property>
  1. ProtocolParseFilter 解析協議過濾器

在Gaea剛接收到數據的時候,Gaea將請求的二進制流放入了上下文根GaeaContext,在此Gaea對二進制流進行了解析,還原,用於執行任務

public void filter(GaeaContext context) throws Exception {
		if(context.getServerType() == ServerType.TCP) {
			byte[] desKeyByte = null;
			String desKeyStr = null;
			boolean bool = false;
			
			Global global = Global.getSingleton();
			if(global != null){
				//判斷當前服務啓用權限認證
				if(global.getGlobalSecureIsRights()){
					SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
					bool = securecontext.isRights();
					if(bool){
						desKeyStr = securecontext.getDesKey();
					}
				}
			}
			
			if(desKeyStr != null){
				desKeyByte = desKeyStr.getBytes("utf-8");
			}
			Protocol protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); //根據deskey進行協議解析
			context.getGaeaRequest().setProtocol(protocol);
			/**
			 * 服務重啓直接返回
			 */	
			if(Global.getSingleton().getServerState() == ServerStateType.Reboot && protocol.getPlatformType() == PlatformType.Java){
				GaeaResponse response = new GaeaResponse();
				ResetProtocol rp = new ResetProtocol();
				rp.setMsg("This server is reboot!");
				protocol.setSdpEntity(rp);
				response.setResponseBuffer(protocol.toBytes(global.getGlobalSecureIsRights(),desKeyByte));
				context.setGaeaResponse(response);
				context.setExecFilter(ExecFilterType.None);
				context.setDoInvoke(false);
			}
		}
	}

整個流程如以上代碼,先判斷是否啓用權限認證,再根據權限認證,對二進制流進行協議解析。關於Gaea協議,是一個自定義的二進制協議,具體另起文章詳解。解析後若是是服務重啓任務,則寫入GaeaContext,供後續操做。

  1. HandclaspFilter 鏈接過濾器

客戶端第一次請求,會跟服務端進行DES加解密的交互,肯定客戶端是否有權限調用此服務,具體過程見代碼

/**
	 * 權限認證filter
	 */
	@Override
	public void filter(GaeaContext context) throws Exception {
		
		Protocol protocol = context.getGaeaRequest().getProtocol();		
		if(protocol.getPlatformType() == PlatformType.Java && context.getServerType() == ServerType.TCP){//java 客戶端支持權限認證
			GaeaResponse response = new GaeaResponse();
			Global global = Global.getSingleton();
			//是否啓用權限認證
			if(Global.getSingleton().getGlobalSecureIsRights()){
				SecureContext sc = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
				//判斷當前channel是否經過認證
				if(!sc.isRights()){
					//沒有經過認證
					if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
						SecureKey sk = new SecureKey();
						HandclaspProtocol handclaspProtocol = (HandclaspProtocol)protocol.getSdpEntity();
						/**
						 * 接收 客戶端公鑰
						 */
						if("1".equals(handclaspProtocol.getType())){
							sk.initRSAkey();
							//客戶端發送公鑰數據
							String clientPublicKey = handclaspProtocol.getData();
							if(null == clientPublicKey || "".equals(clientPublicKey)){
								logger.warn("get client publicKey warn!");
							}
							//java 客戶端
							if(protocol.getPlatformType() == PlatformType.Java){
								//服務器生成公/私鑰,公鑰傳送給客戶端
								sc.setServerPublicKey(sk.getStringPublicKey());
								sc.setServerPrivateKey(sk.getStringPrivateKey());
								sc.setClientPublicKey(clientPublicKey);
								handclaspProtocol.setData(sk.getStringPublicKey());//服務器端公鑰
							}
							
							protocol.setSdpEntity(handclaspProtocol);
							response.setResponseBuffer(protocol.toBytes());
							context.setGaeaResponse(response);
							this.setInvokeAndFilter(context);
							logger.info("send server publieKey sucess!");
						} 
						/**
						 * 接收權限文件
						 */
						else if("2".equals(handclaspProtocol.getType())){
							//客戶端加密受權文件
							String clientSecureInfo = handclaspProtocol.getData();
							if(null == clientSecureInfo || "".equals(clientSecureInfo)){
								logger.warn("get client secureKey warn!");
							}
							//受權文件客戶端原文(服務器私鑰解密)
							String sourceInfo = sk.decryptByPrivateKey(clientSecureInfo, sc.getServerPrivateKey());
							//校驗受權文件是否相同
							//判斷是否合法,若是合法服務器端生成DES密鑰,經過客戶端提供的公鑰進行加密傳送給客戶端
							if(global.containsSecureMap(sourceInfo)){
								logger.info("secureKey is ok!");
								String desKey = StringUtils.getRandomNumAndStr(8);
								//設置當前channel屬性
								sc.setDesKey(desKey);
								sc.setRights(true);
								handclaspProtocol.setData(sk.encryptByPublicKey(desKey, sc.getClientPublicKey()));
								protocol.setSdpEntity(handclaspProtocol);
								response.setResponseBuffer(protocol.toBytes());
								context.setGaeaResponse(response);
							}else{
								logger.error("It's bad secureKey!");
								this.ContextException(context, protocol, response, "受權文件錯誤!");
							}
							this.setInvokeAndFilter(context);
						}else{
							//權限認證 異常狀況
							logger.error("權限認證異常!");
							this.ContextException(context, protocol, response, "權限認證 異常!");
						}
						response = null;
						sk = null;
						handclaspProtocol = null;
					}else{
						//客戶端沒有啓動權限認證
						logger.error("客戶端沒有啓用權限認證!");
						this.ContextException(context, protocol, response, "客戶端沒有啓用權限認證!");
					}
				}
			}else{
				if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
					//異常--當前服務器沒有啓動權限認證
					logger.error("當前服務沒有啓用權限認證!");
					this.ContextException(context, protocol, response, "當前服務沒有啓用權限認證!");
				}
			}
		}
	}
  1. ExecuteMethodFilter 執行方法過濾器

服務端受權文件,對須要執行的方法,進行受權配置,當調用者調用的時候,此方法是否通過受權。在gaea中沒有看到關於此受權配置的文件。這裏就很少說,內部系統對於受權訪問的控制並不嚴格。

##<i class="icon-share">執行任務</i>

執行任務是doInvoke(gaeaContext)

這一步比較簡單,根據協議解析過濾器解析出來的請求數據,找到請求的類名,再根據類名,從IProxyFactory中取出與之對應的代理類,而後代理去執行真正的方法,關於IProxyFactory類,請看Gaea的啓動過程當中如何生成的。

IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup()); //獲取代理類
    
    GaeaResponse gaeaResponse = localProxy.invoke(context);//執行真正的方法

在此過程當中,還對各類異常作了處理,全部的處理結果都放到了GaeaContext中。StopWatch主要記錄調用信息,並在返回過濾器中,記錄執行時間。

##<i class="icon-share">返回過濾器</i>

框架的返回過濾器

<!-- global response filter -->
<property>
<name>gaea.filter.global.response</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolCreateFilter,com.bj58.spat.gaea.server.filter.ExecuteTimeFilter</value>
</property>
  1. ProtocolCreateFilter 建立協議過濾器
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));

將最終執行結果轉換爲二進制流放入GaeaContext。

  1. ExecuteTimeFilter 執行時間監視過濾器

對方法的執行時間進行監控,並將結果發到一個UDP日誌服務器。

public void filter(GaeaContext context) throws Exception {
		StopWatch sw = context.getStopWatch();
		Collection<PerformanceCounter> pcList = sw.getMapCounter().values();
		for(PerformanceCounter pc : pcList) {
			if(pc.getExecuteTime() > minRecordTime) {
				StringBuilder sbMsg = new StringBuilder();
				sbMsg.append(serviceName);
				sbMsg.append("--");
				sbMsg.append(pc.getKey());
				sbMsg.append("--time: ");
				sbMsg.append(pc.getExecuteTime());
				
				sbMsg.append(" [fromIP: ");
				sbMsg.append(sw.getFromIP());
				sbMsg.append(";localIP: ");
				sbMsg.append(sw.getLocalIP()+"]");
				
				udpClient.send(sbMsg.toString());
			}
		}

##<i class="icon-share">結果返回</i>

在以上過濾器,執行等過程,都是將所獲得的結果封裝到了上下文GaeaContext中,在這一步將其返回

public void messageReceived(Object obj) {
				if(context.getServerType() == ServerType.HTTP){
					httpThreadLocal.remove();
				}
				if(obj != null) {
					GaeaContext ctx = (GaeaContext)obj;
					ctx.getServerHandler().writeResponse(ctx);
				} else {
					logger.error("context is null!");
				}
			}
		if(context != null && context.getGaeaResponse() != null){
			context.getChannel().write(context.getGaeaResponse().getResponseBuffer()); //getResponseBuffer 結果的二進制流
		} else {
			context.getChannel().write(new byte[]{0}); 
			logger.error("context is null or response is null in writeResponse");
		}

##<i class="icon-share">異常處理</i>

Gaea服務的執行過程當中,全部的異常都會拋出,被exceptionCaught(exception)接收,並通過Gaea封裝,序列化,返回給客戶端,告訴調用者,是什麼緣由致使調用失敗。

public void exceptionCaught(Throwable e) {
				if(context.getServerType() == ServerType.HTTP){
					httpThreadLocal.remove();
				}
				
				if(context.getGaeaResponse() == null){
					GaeaResponse respone = new GaeaResponse();
					context.setGaeaResponse(respone);
				}
				
				try {
					byte[] desKeyByte = null;
					String desKeyStr = null;
					boolean bool = false;
					
					Global global = Global.getSingleton();
					if(global != null){
						//判斷當前服務啓用權限認證
						if(global.getGlobalSecureIsRights()){
							SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
							bool = securecontext.isRights();
							if(bool){
								desKeyStr = securecontext.getDesKey();
							}
						}
					}
					
					if(desKeyStr != null){
						desKeyByte = desKeyStr.getBytes("utf-8");
					}
					
					Protocol protocol = context.getGaeaRequest().getProtocol();
					if(protocol == null){
						protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte);
						context.getGaeaRequest().setProtocol(protocol);
					}
					protocol.setSdpEntity(ExceptionHelper.createError(e));
					context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
				} catch (Exception ex) {
					context.getGaeaResponse().setResponseBuffer(new byte[]{0});
					logger.error("AsyncInvokerHandle invoke-exceptionCaught error", ex);
				}
				
				context.getServerHandler().writeResponse(context);
				logger.error("AsyncInvokerHandle invoke error", e);
			}
		});

##<i class="icon-share">總結</i>

至此,一個客戶端的請求處理完畢,在Gaea的整個設計中能夠看出,不少東西都留出了接口,可以很好的在框架自己,作一些適合本身業務的處理,整個設計,則決定了服務通訊框架的性能。

###le284

相關文章
相關標籤/搜索