mina2中的session

簡介

session類圖java

Mina每創建一個鏈接同時會建立一個session對象,用於保存此次讀寫須要用到的全部信息。從抽象類AbstractIoSession中能夠看出session具備以下功能:
一、從attributes成員能夠看出session能夠存放用戶關心的鍵值對
二、注意到WriteRequestQueue,這是一個寫請求隊列,processor中調用flush或者flushNow方法時會將用戶寫入的數據包裝成一個writeRequest對象,並加入這個隊列中。
三、提供了大量的統計功能,好比接收到了多少消息、最後讀取時間等
在代碼中通常是這樣使用session的
// 建立服務器監聽  
IoAcceptor acceptor = new NioSocketAcceptor();  
// 設置buffer的長度  
acceptor.getSessionConfig().setReadBufferSize(2048);  
// 設置鏈接超時時間  
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  

session作爲一個鏈接的具體對象,緩存當前鏈接用戶的一些信息。

linux

建立好acceptor或者connector以後,經過IoSessionConfig對session對行配置。
用得最多的是經過session寫入數據,這是調用了IoSession的write方法
WriteFuture write(Object message);  
WriteFuture write(Object message, SocketAddress destination);  

下面着重分析建立過程以及session的狀態緩存

建立與初始化

每創建一個鏈接,就會建立一個session,IoAcceptor的accept方法的返回值正是一個session。見 NioSocketAcceptor .accept( IoProcessor < NioSession > processor, ServerSocketChannel handle)方法:
 protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }
由以上代碼可知, session包含了對衆多對象的引用,好比processor,socketChannel,SelectionKey,IoFilter等。
session在建立好後,緊接着就會對其進行初始化。 AbstractIoService .initSession( IoSession session, IoFuture future, IoSessionInitializer sessionInitializer)方法以下:
    protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
        // Update lastIoTime if needed.
        if (stats.getLastReadTime() == 0) {
            stats.setLastReadTime(getActivationTime());
        }

        if (stats.getLastWriteTime() == 0) {
            stats.setLastWriteTime(getActivationTime());
        }

        // Every property but attributeMap should be set now.
        // Now initialize the attributeMap.  The reason why we initialize
        // the attributeMap at last is to make sure all session properties
        // such as remoteAddress are provided to IoSessionDataStructureFactory.
        try {
            ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                    .getAttributeMap(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
        }

        try {
            ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
        }

        if ((future != null) && (future instanceof ConnectFuture)) {
            // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
        }

        if (sessionInitializer != null) {
            sessionInitializer.initializeSession(session, future);
        }

        finishSessionInitialization0(session, future);
    }
設置上次讀寫時間,初始化屬性map和寫請求隊列等。
session被初始化好以後會加入到processor中,processor中有一個隊列專門存放session:
例如:AbstractPollingIoProcessor.java中有:
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

    /** The processor thread : it handles the incoming messages */
    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
加入隊列以後,processor就會從隊列中取出session,如下是processor的run方法關鍵代碼:
  1. private class Processor implements Runnable {  
         public void run() {  
             for (;;) {  
                 long t0 = System.currentTimeMillis();  
                 int selected = select(SELECT_TIMEOUT);  
                 long t1 = System.currentTimeMillis();  
                 long delta = (t1 - t0);  
      
                 if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
                     if (isBrokenConnection()) {  
                         wakeupCalled.getAndSet(false);  
                         continue;  
                     } else {  
                         registerNewSelector();  
                     }  
                     wakeupCalled.getAndSet(false);  
                     continue;  
                 }  
      
                 nSessions += handleNewSessions();  
      
                 updateTrafficMask();  
      
                 if (selected > 0) {  
                     process();  
                 }  
               
                 nSessions -= removeSessions();  
                              
             }  
         }  
     }  
一、不斷地調用select方法來檢查是否有session準備就緒,若是沒有或者間隔時間小於100ms則檢查selector是否可用,若是不可用從新建一個selector(這裏linux下的epoll的問題可能致使selector不可用。)
二、從newSessions隊列中取出這些session,並將其負責的通道註冊到selector上
三、處理準備就緒的session
AbstractPollingIoProcessor.java
private void process(S session) {  
    // Process Reads  
    if (isReadable(session) && !session.isReadSuspended()) {  
        read(session);  
    }  
  
    // Process writes  
    if (isWritable(session) && !session.isWriteSuspended()) {  
        // add the session to the queue, if it's not already there  
        if (session.setScheduledForFlush(true)) {  
            flushingSessions.add(session);  
        }  
    }  
}
總結一下建立與初始化過程:鏈接到來建立一個session,初始化好以後加入到processor負責的一個隊列中。processor線程會把隊列中的session對應的通道都註冊到它本身的selector上,而後這個selector輪詢這些通道是否準備就緒,一旦準備就緒就調用對應方法進行處理(read or flushNow)。

狀態

Mina中的session具備狀態,且狀態之間是能夠相互轉化的
Connected:session被建立時處於這種狀態
Idle:沒有請求能夠處理(可配置)
Closing:正處於關閉狀態(可能正在作一些清理工做)
Closed:關閉狀態
下圖是這幾種狀態之間的轉化圖:
IoFilter與IoHandler就是在這些狀態上面加以干預,下面重點看一下IDLE狀態,它分三種:
Idle for read:在規定時間內沒有數據可讀
Idle for write:在規定時間內沒有數據可寫
Idle for both:在規定時間內沒有數據可讀和可寫
這三種狀態分別對應IdleStatus類的三個常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE
前面session的用法中有以下設置:
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 
acceptor的run方法中調用了notifyIdleSessions
private void notifyIdleSessions( long currentTime )  
    {  
        // process idle sessions  
        if ( currentTime - lastIdleCheckTime >= 1000 )  
        {  
            lastIdleCheckTime = currentTime;  
            AbstractIoSession.notifyIdleness( getListeners().getManagedSessions().values().iterator(), currentTime );  
        }  
 } 
每隔一秒一檢查是否到達了設置的空閒時間
  1. private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,  
            long lastIoTime) {  
        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {  
            session.getFilterChain().fireSessionIdle(status);  
        }  
    }  
若是當前時間減去上一次IDLE事件觸發的時間大於用戶設置的idleTime,則觸發一次sessionIdle事件。
public void fireSessionIdle(IdleStatus status) {  
    session.increaseIdleCount(status, System.currentTimeMillis());  
    Entry head = this.head;  
    callNextSessionIdle(head, session, status);  
}  
increaseIdleCount這個方法會更新lastToTime的值爲當前時間,緊接着穿透過濾器鏈(固然在過濾器的sessionIdle中可能會作一些操做)到達IoHandler的sessionIdle方法,若是須要在session空閒的時候作一些操做,就能夠在這個方法裏面作。
相關文章
相關標籤/搜索