服務通訊框架Gaea---client的請求處理模型

Gaea的請求處理模型圖

Gaea是一個服務通訊框架 java

 圖片來源於「58同城的跨平臺高性能,高可用的中間層服務架構設計分享」  安全

根據上圖,咱們詳細的來講明一下Gaea的客戶端,請求處理的過程。 服務器

Gaea1.0版本中並無實現異步化調用,那咱們就圍繞這同步調用來講明一下。處理過程以下: session

一、建立代理

爲每個方法的調用建立代理proxy 架構

final String url = "tcp://demo/NewsService"; //demo服務名 NewsService 接口實現類
  INewsService newsService = ProxyFactory.create(INewsService.class, url);
InvocationHandler handler = new ProxyStandard(type, serviceName, lookup);
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{type},
                handler);

在這裏放了一個全局的cache,存儲了每個代理, 框架

private static Map cache = new ConcurrentHashMap(); //key:服務名 value: proxy

建立代理,爲每個方法制定統一的調用入口,每個方法在真正執行的時候,都將跳轉到這裏往下執行。也就是說,此方法將代理你所調用的方法,去執行後面的操做。 異步

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable

二、等待窗口

在這裏咱們暫時不討論服務器的選擇等策略,繼續說明這個請求處理模型。在這個途中提到了一個頗有用的東西,即等待窗口,那到底這個等待窗口到底在Gaea中充當一個什麼樣的角色呢?在代碼中咱們能夠簡單的看到,等待窗口,便是一個Map而已 socket

private ConcurrentHashMap<Integer, WindowData> WaitWindows = new ConcurrentHashMap<Integer, WindowData>();
在這個 等待窗口WaitWindows中Gaea放入了兩個個很重要的東西key:session 和 value:WindowData

session是一個隨着每一次調用自增加的一個int,它將被寫入到Gaea的協議中,用於找到返回數據的歸屬 tcp

private int sessionId; 
private int createSessionId() {
        synchronized (this) { //同步,確保線程安全
            if (sessionId > GaeaConst.MAX_SESSIONID) { //當大於最大值後,從1繼續累加
                sessionId = 1;
            }
            return sessionId++;
        }
    }
WindowData是一個對象類型,用於時間通知和存放返回值。
public class WindowData {

	AutoResetEvent _event; //用於事件通知
	byte[] _data; //接受返回值
	private byte flag; //Gaea1.0中沒有用到
	private Exception exception; 
...........
}
在send以前,Gaea在 等待窗口waitWindows中註冊了這個windowData
socket.registerRec(p.getSessionID());
public void registerRec(int sessionId) {
        AutoResetEvent event = new AutoResetEvent(); 
        WindowData wd = new WindowData(event);
        WaitWindows.put(sessionId, wd);
    }

在發送以前Gaea往等待窗口waitWindows中,利用sessionId註冊了windowData,在windowData中Gaea又new了一個AutoResetEvent,用於通知接收程序。 ide

三、數據發送

在發送線程中,直接利用send函數發送了數據,也沒有用到等待窗口waitWindows和WindowData用於擁塞控制,而是直接發送

socket.registerRec(p.getSessionID());
  socket.send(data);

在send中,直接添加了協議的頭和尾就直接發送了,所以目前尚未實現設計中的擁塞控制

四、數據接收

返回數據的接受,是在建立server,socket時,就啓動了一個死循環在接收數據,新版的Gaea中,此處已改成select方式獲取數據。

@Override
    public void run() {
        while (true) {
            try {
                for (final CSocket socket : sockets) {
                    try {
                        pool.execute(Handle.getInstance(socket));
                    } catch (Throwable ex) {
                        logger.error(ex);
                    }
                }
                Thread.sleep(1);
            } catch (Throwable ex) {
                logger.error(ex);
            }
        }
    }
真正接收數據實在CSocket.java中
protected void frameHandle() throws IOException, InterruptedException
在frameHandle()中,Gaea從receive中獲取到sessionId,由於sessionId是被寫入到協議中來回傳輸的,

經過這個sessionId,很容易的就從等待窗口中取出了WindowData

WindowData wd = WaitWindows.get(pSessionId);
  if (wd != null) {
      wd.setData(pak);
      wd.getEvent().set();
 }
其中wd.setData(pak);就是向windowData中寫入返回數據,wd.getEvent().set() 則是通知接收函數,這個sessionId的請求回來了,你能夠返回給調用者了。

五、數據返回

由於講的是同步,其中send,和 receive在同一個request()的方法中,send以後,receive一直在等待着,這次調用的返回

AutoResetEvent event = wd.getEvent();
        int timeout = getReadTimeout(socketConfig.getReceiveTimeout(), queueLen);
        if (!event.waitOne(timeout)) {
            throw new TimeoutException("Receive data timeout or error!timeout:" + timeout + "ms,queue length:" + queueLen);
        }
在這一段代碼中,receive經過發送時的sessionId找到windowData,而後獲取AutoResetEvent

if(!event.waitOne(timeout)){} 在這裏程序將等待這個event的set()函數通知本身,數據已經被返回了,不然等待timeout之後,將再也不等待,執行if中的語句,返回超時。超時後,在異常處理中,刪除這個等待窗口中的sessionId,則若是再返回,就會由於找不到這個sessionId,而拋棄返回值。

if (socket != null) {
      socket.unregisterRec(p.getSessionID());
 }
至此整個Gaea客戶端的請求處理模型就說完了,能夠說,整個模型都是圍繞這 等待窗口WaitWindows、sessionId、windowData來處理的。
相關文章
相關標籤/搜索