Gaea是一個服務通訊框架 java
圖片來源於「58同城的跨平臺高性能,高可用的中間層服務架構設計分享」 安全
根據上圖,咱們詳細的來講明一下Gaea的客戶端,請求處理的過程。 服務器
Gaea1.0版本中並無實現異步化調用,那咱們就圍繞這同步調用來講明一下。處理過程以下: session
爲每個方法的調用建立代理proxy 架構
final String url = "tcp://demo/NewsService"; //demo服務名 NewsService 接口實現類 INewsService newsService = ProxyFactory.create(INewsService.class, url);
InvocationHandler handler = new ProxyStandard(type, serviceName, lookup); return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{type}, handler);
在這裏放了一個全局的cache,存儲了每個代理, 框架
private static Map cache = new ConcurrentHashMap(); //key:服務名 value: proxy
建立代理,爲每個方法制定統一的調用入口,每個方法在真正執行的時候,都將跳轉到這裏往下執行。也就是說,此方法將代理你所調用的方法,去執行後面的操做。 異步
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
在這裏咱們暫時不討論服務器的選擇等策略,繼續說明這個請求處理模型。在這個途中提到了一個頗有用的東西,即等待窗口,那到底這個等待窗口到底在Gaea中充當一個什麼樣的角色呢?在代碼中咱們能夠簡單的看到,等待窗口,便是一個Map而已 socket
private ConcurrentHashMap<Integer, WindowData> WaitWindows = new ConcurrentHashMap<Integer, WindowData>();在這個 等待窗口WaitWindows中Gaea放入了兩個個很重要的東西key:session 和 value:WindowData
session是一個隨着每一次調用自增加的一個int,它將被寫入到Gaea的協議中,用於找到返回數據的歸屬 tcp
private int sessionId; private int createSessionId() { synchronized (this) { //同步,確保線程安全 if (sessionId > GaeaConst.MAX_SESSIONID) { //當大於最大值後,從1繼續累加 sessionId = 1; } return sessionId++; } }WindowData是一個對象類型,用於時間通知和存放返回值。
public class WindowData { AutoResetEvent _event; //用於事件通知 byte[] _data; //接受返回值 private byte flag; //Gaea1.0中沒有用到 private Exception exception; ........... }在send以前,Gaea在 等待窗口waitWindows中註冊了這個windowData
socket.registerRec(p.getSessionID());
public void registerRec(int sessionId) { AutoResetEvent event = new AutoResetEvent(); WindowData wd = new WindowData(event); WaitWindows.put(sessionId, wd); }
在發送以前Gaea往等待窗口waitWindows中,利用sessionId註冊了windowData,在windowData中Gaea又new了一個AutoResetEvent,用於通知接收程序。 ide
在發送線程中,直接利用send函數發送了數據,也沒有用到等待窗口waitWindows和WindowData用於擁塞控制,而是直接發送
socket.registerRec(p.getSessionID()); socket.send(data);
在send中,直接添加了協議的頭和尾就直接發送了,所以目前尚未實現設計中的擁塞控制
返回數據的接受,是在建立server,socket時,就啓動了一個死循環在接收數據,新版的Gaea中,此處已改成select方式獲取數據。
@Override public void run() { while (true) { try { for (final CSocket socket : sockets) { try { pool.execute(Handle.getInstance(socket)); } catch (Throwable ex) { logger.error(ex); } } Thread.sleep(1); } catch (Throwable ex) { logger.error(ex); } } }真正接收數據實在CSocket.java中
protected void frameHandle() throws IOException, InterruptedException在frameHandle()中,Gaea從receive中獲取到sessionId,由於sessionId是被寫入到協議中來回傳輸的,
經過這個sessionId,很容易的就從等待窗口中取出了WindowData
WindowData wd = WaitWindows.get(pSessionId); if (wd != null) { wd.setData(pak); wd.getEvent().set(); }其中wd.setData(pak);就是向windowData中寫入返回數據,wd.getEvent().set() 則是通知接收函數,這個sessionId的請求回來了,你能夠返回給調用者了。
由於講的是同步,其中send,和 receive在同一個request()的方法中,send以後,receive一直在等待着,這次調用的返回
AutoResetEvent event = wd.getEvent(); int timeout = getReadTimeout(socketConfig.getReceiveTimeout(), queueLen); if (!event.waitOne(timeout)) { throw new TimeoutException("Receive data timeout or error!timeout:" + timeout + "ms,queue length:" + queueLen); }在這一段代碼中,receive經過發送時的sessionId找到windowData,而後獲取AutoResetEvent
if(!event.waitOne(timeout)){} 在這裏程序將等待這個event的set()函數通知本身,數據已經被返回了,不然等待timeout之後,將再也不等待,執行if中的語句,返回超時。超時後,在異常處理中,刪除這個等待窗口中的sessionId,則若是再返回,就會由於找不到這個sessionId,而拋棄返回值。
if (socket != null) { socket.unregisterRec(p.getSessionID()); }至此整個Gaea客戶端的請求處理模型就說完了,能夠說,整個模型都是圍繞這 等待窗口WaitWindows、sessionId、windowData來處理的。