Mina源碼閱讀筆記(五)—Mina對鏈接的操做IoSession

接上一篇《Mina的鏈接IoConnector》 java

IoSessionMina管理兩端的一個重要部分,也是Mina的核心,Session具備了生命週期的概念,它的生命週期和鏈接時緊密相關的,這點在後面的介紹中會涉及。另外,好像hibernate中也有session也有生命週期(真的是很久沒有用了,連hibernate有裏session是幹嗎的都想不起來了)。 web

在讀源碼以前,咱們仍是先了解下IoSession的做用和一些基本概念。IoSession的主要做用有以下一些: apache

l  管理鏈接。注意,這裏的管理鏈接並非直接去控制咱們上次講的最底層的鏈接acceptorconnector。若是acceptorconnector創建的一條管道,那session就是在管道內的管理者,他是沒有辦法將管道對半拆分開的,他只能從內部阻斷兩邊的通訊。管理鏈接還有部分就是能夠配置緩衝區的大小,閒置時間等等。 網絡

l  存儲信息。和web裏的session同樣,這裏的session也有存儲attribute的功能,不過通常來講,這裏存儲的都是和鏈接有關的東西,並不會像web開發同樣存一些業務上的東西。 session

l  驅動讀寫操做。我不知道用驅動這個詞是否合適,那個例子來講,session.write 數據結構

l  統計功能。Session還記錄了鏈接中的bytemessage等數量。 框架

Session在使用中是經過ConnectionFuture得到的: less

ConnectFuture future = connector.connect(new InetSocketAddress(
					HOST, PORT));// 建立鏈接
			future.awaitUninterruptibly();// 等待鏈接建立完成
			session = future.getSession();// 得到session

說完做用,就是session的狀態了,咱們讀源碼的時候,也會跟着session的狀態來讀: ide

  • Connected : the session has been created and is available
  • Idle : the session hasn't processed any request for at least a period of time (this period is configurable)
    • Idle for read : no read has actually been made for a period of time
    • Idle for write : no write has actually been made for a period of time
    • Idle for both : no read nor write for a period of time
  • Closing : the session is being closed (the remaining messages are being flushed, cleaning up is not terminated)
  • Closed : The session is now closed, nothing else can be done to revive it.

注意,這裏的最開始的connect,最後的closed這些都說的是acceptorconnector之間的操做,是他們的狀態來影響session的狀態。和hibernate不同,那個的操做也是session本身的(session.close等),這裏的session是沒辦法控制通道的。 工具

瞭解完了基礎的,咱們來看看代碼層面的實現,在org,apache.mina.core.session包中主要實現了IoSession的相關功能。咱們從IoSession這個接口開始看起:

內容比較多,比以前的那幾篇都難多了。咱們仍是按照上圖畫出來的方框對這些方法進行分分類,看看是否是跟咱們以前分析的同樣:

紅色:獲得一系列配置等。咱們說了sessionmina的核心。

藍色:驅動讀寫操做。

綠色:管理鏈接。

黑色:存儲功能,是否是和JSP裏的session很像。

橘色:統計數據。

有些沒有框上的並非說不屬於這裏面,而是有些我確實不瞭解,有些是比較難劃分,具體的含義能夠看源碼中,都有說明。

瞭解完IoSession的最基本功能以後,咱們要看看它的具體實現類AbstractIoSession。看具體實現以前,咱們先看看這個類關聯了哪些比較重要的引用:

private final IoHandler handler;

    protected IoSessionConfig config;

      private final IoService service;

    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
            "readyReadFutures");
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER;

  private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());

    private IoSessionAttributeMap attributes;

    private WriteRequestQueue writeRequestQueue;

private WriteRequest currentWriteRequest;

private final CloseFuture closeFuture = new DefaultCloseFuture(this);

在上面的代碼中,咱們有熟悉的handler,這裏咱們也能夠稍微明確一下handlersession之間的關係了,每次咱們在建立服務端和客戶端的時候,都必須設置一個handler,若是不設置則報異常,handler裏主要處理seesion各類狀態時的業務。Service用來管理sessionCloseFutrue用來設置通道的關閉,在上面咱們已經說過,session的關閉是經過closefuture來操做的。

這些成員變量中,除了咱們見過的handler和future,凡是帶有write的都來自org.apache.mina.core.write包,這個包做爲一個內部的工具類,在session的寫操做中起到輔助做用。其餘類均來自org.apache.mina.core.session中,這些類組成都比較簡單,但都是要了解AbstractIoSession以前,咱們要對這裏提到的這些對象有所瞭解。

首先是ioSessionConfig,和它的具體實現AbstractIoSessionCnfig:

從上面的圖咱們很容易就能看到mina的一些默認傳輸配置,固然這些數字都不是隨便寫的,爲何最小要64,最大是65535,我相信計算機網絡相關課程裏應該都會有涉及。

接下來是IoSessionAttributeMap接口,這個接口主要做用就是規定了get、set、remove Attribute的方法。注意存儲在session中的變量是一種map關係的變量(key-value),因此咱們也很容易明白這個接口命名時爲何後面要多個map出來。至於這個類的實現,它隱藏的很好,放在了一個內部類中。具體能夠看IoSessionDataStructureFactory這個接口,這個接口主要是爲了這個map形勢提供數據結構和規定基本操做。至於這個session中map的底層實現則用了ConcurrentHashMap來作容器,這部分具體能夠看內部類DefaultIoSessionAttributeMap。

還有一個就是AttributeKey,這個類主要重寫equals方法和hashCode方法,爲了將session中的key和對應的session聯繫起來。由於一個項目中可能有多個session,而不一樣session中的key可能會相同,因此在構造key和hash的時候會將session也考慮進去。

public AttributeKey(Class<?> source, String name) {
        this.name = source.getName() + '.' + name + '@' + Integer.toHexString(this.hashCode());
    }

如今咱們能夠看AbstractIoSession了。主要看讀寫操做,其餘操做都是統計和配置稍稍看過便可:

public final ReadFuture read() {
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }

        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = readyReadFutures.poll();
            if (future != null) {
                if (future.isClosed()) {
                    // Let other readers get notified.
                    readyReadFutures.offer(future);
                }
            } else {
                future = new DefaultReadFuture(this);
                getWaitingReadFutures().offer(future);
            }
        }

        return future;
    }

採用隊列進行讀取,這裏只用了Queue,沒有用concurrent中的那些同步隊列,而是用了synchronized關鍵字來處理同步,主要是咱們要明白這裏要同步的不是隊列裏的內容,而是讀的這個過程,session都是獨立的,因此一個session內一個隊列不管怎麼搶仍是能排除順序的。因此對於讀操做來講,主要是要保證在讀一條的時候,不能有其餘線程再讀。

下面是寫操做:

public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }

        // We can't send a message to a connected session if we don't have
        // the remote address
        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
            throw new UnsupportedOperationException();
        }

        // If the session has been closed or is closing, we can't either
        // send a message to the remote side. We generate a future
        // containing an exception.
        if (isClosing() || !isConnected()) {
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }

        FileChannel openedFileChannel = null;

        // TODO: remove this code as soon as we use InputStream
        // instead of Object for the message.
        try {
            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
                // Nothing to write : probably an error in the user code
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            } else if (message instanceof File) {
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();
                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }

        // Now, we can write the message. First, create a future
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

        // Then, get the chain and inject the WriteRequest into it
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);

        // TODO : This is not our business ! The caller has created a
        // FileChannel,
        // he has to close it !
        if (openedFileChannel != null) {
            // If we opened a FileChannel, it needs to be closed when the write
            // has completed
            final FileChannel finalChannel = openedFileChannel;
            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }

        // Return the WriteFuture.
        return writeFuture;
    }

這裏面有個instanceof FileChannelFile是否是感到有點兒奇怪,mina不是寫出去的是IoBuffer麼,怎麼如今又能夠寫文件了。Mina做爲一個封裝好的框架,天然能夠直接作文件的傳輸,這裏面會有相應的handler來處理這些業務。

Session部分最主要的就是了解他的生命週期以及相關聯的那些引用,這裏咱們能夠看到與讀寫最密切的就是Future了,因此這部分,就是我下篇會寫的主題。

相關文章
相關標籤/搜索