Mina源碼閱讀筆記(六)—Mina異步IO的實現IoFuture

接上篇《IoSession對鏈接的操做》 java

IoFuture是和IoSession緊密相連的一個類,在官網上並無對它的描述,由於它通常不會顯示的拿出來用,權當是一個工具類被session所使用。固然在做用上,這個系列可並不簡單,咱們先看源碼的註釋對它的描述: 面試

IoFuture represents the completion of an asynchronous I/O operation on an IoSession. apache

這個類是提供異步操做的一個工具,因此在讀源碼以前,必須對異步IO操做有所瞭解,而後咱們才能夠順着這條路往下走。關於異步IO的介紹能夠看:《同步、異步、阻塞、非阻塞》 安全

IoFuture經過IoFutureListener被IoSession綁定,它的實現都放在org.apache.mina.core.future下。在IoFuture的實現中,分別提供了讀、寫、鏈接、關閉的future,經過這四個future來實現異步操做。異步操做很重要的一部分就是對線程的控制,因此在IoFuture這個接口中,咱們能很清楚的理清這幾個方法:await、join。固然還有notify,可是notify沒有必要寫在接口中,它能夠在程序裏直接使用。 session

這個系列的類設計的很規整,從上圖的結構就能看出來,圖中省略了writeconnection的圖,它們分別和readclose一致。因爲這個類的操做沒有那麼複雜,繼承關係也沒有那麼多層,因此這裏面都沒有用到Abstract的類來作具體實現。 框架

下面咱們來看這裏面最核心的一個類DefaultIoFuture。這個類實現IoFuture接口,主要實現了對await和join的操做,以及處理死鎖的操做。咱們先看這個類關聯到的成員變量,都比較簡單: 異步

/** A number of seconds to wait between two deadlock controls ( 5 seconds ) */
    private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;

    /** The associated session */
    private final IoSession session;

    /** A lock used by the wait() method */
    private final Object lock;

    private IoFutureListener<?> firstListener;

    private List<IoFutureListener<?>> otherListeners;

    private Object result;

    private boolean ready;

    private int waiters;

在我看來,讀源碼的目的,一是爲了理清框架的設計邏輯,理清結構,學習這些關聯關係;二是爲了學習處理細節,好比死鎖的處理、線程安全的處理。在這個類中,咱們將看到mina做者是如何處理死鎖的問題的。 async

咱們先看await操做,await主要是爲了等待異步操做的完成,而後通知相關的listener。咱們先看一個簡單的await操做和驗證死鎖的操做: 工具

public IoFuture await() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {
                waiters++;
                try {
                    lock.wait(DEAD_LOCK_CHECK_INTERVAL);
                } finally {
                    waiters--;
                    if (!ready) {
                        checkDeadLock();
                    }
                }
            }
        }
        return this;
}

咱們應該要注意下在await方法中的wait操做,這裏講些題外話,面試中常問的waitsleep的區別。wait的操做其實很規範,必須寫在synchronized塊內,必須由其餘線程來notify,同時wait釋放鎖,不佔資源。而sleep佔着cup的資源區睡眠,時間沒到不能被喚醒,只能經過中斷來打斷。在這個await方法中,waitcheck dead lock的時間,而且設置了計數器waiters。這個waiterssetValue方法中被運用到,在setValue中: 學習

public void setValue(Object newValue) {
        synchronized (lock) {
            // Allow only once.
            if (ready) {
                return;
            }

            result = newValue;
            ready = true;
            if (waiters > 0) {
                lock.notifyAll();
            }
        }

        notifyListeners();
    }

異步操做是沒有一個固定的順序,誰先作好誰就返回,因此一旦有異步任務完成了操做,就會notify全部的等待,讓接下來先搶到的線程再執行。在DefaultIoFuture這個類中,我以爲最重要的到不是鏈接或者讀寫,而是上面提到的setValuegetValue,由於在後續的繼承關係中,會不斷的用到這兩個方法。不只在後續的繼承關係中,這兩個方法真正在傳遞值得操做是發生在IoService中,不要忘了雖然session很重要,但真正起鏈接做用的仍是service

而後咱們再看下上面提到的check dead lock的方法,在搶佔中只有讀、寫和鏈接會產生死鎖的狀況:

private void checkDeadLock() {
        if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
            return;
        }
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();

        // Simple and quick check.
        for (StackTraceElement s : stackTrace) {
            if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
                IllegalStateException e = new IllegalStateException("t");
                e.getStackTrace();
                throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
                        + ".await() was invoked from an I/O processor thread.  " + "Please use "
                        + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
            }
        }

        // And then more precisely.
        for (StackTraceElement s : stackTrace) {
            try {
                Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
                if (IoProcessor.class.isAssignableFrom(cls)) {
                    throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
                            + ".await() was invoked from an I/O processor thread.  " + "Please use "
                            + IoFutureListener.class.getSimpleName()
                            + " or configure a proper thread model alternatively.");
                }
            } catch (Exception cnfe) {
                // Ignore
            }
        }
    }

在追蹤堆棧信息時,這裏採用了兩種check方式,簡單和精確。在簡單檢測中,只是對比了類名,也就是對當前類有效,是一個一對一的比較。而在精確的檢測中,採用isAssignableFrom方法來分別和其父類和本類進行比較。若是有死鎖,就拋異常。另外join方法被廢棄,由awaitUninterruptibly代替,雖然叫join,其實仍是一種wait操做,等到必定時間將flag轉變一下。

下面咱們看ReadFuture接口,這個接口直接繼承IoFuture接口,並添加了相關的寫操做。接口由DefaultReadFuture實現。在使用中,ReadFuture在IoSession中read方法中被使用,也能夠說,在session層,直接讀寫的是future,咱們再看下AbstractIoSession中的read代碼:

public final ReadFuture read() {
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }

        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = readyReadFutures.poll();
            if (future != null) {
                if (future.isClosed()) {
                    // Let other readers get notified.
                    readyReadFutures.offer(future);
                }
            } else {
                future = new DefaultReadFuture(this);
                getWaitingReadFutures().offer(future);
            }
        }

        return future;
    }

每次都是從隊列中拿出一個future,同理,每次寫也是往隊列裏寫入一個future。在DefaultReadFuture中的方法都比較簡單,這裏就不貼出來了。另外WriteFutureDefaultWriteFutureread相似,也再也不贅述。

最後咱們看看ConnectFuture,咱們經常寫這麼一段話來拿到session:

ConnectFuture future = connector.connect(new InetSocketAddress(
					HOST, PORT));// 建立鏈接
			future.awaitUninterruptibly();// 等待鏈接建立完成
			session = future.getSession();// 得到session

提一點,在多態的使用上,ConnectFuture徹底能夠換成IoFuture,這對後面的代碼沒有一點兒影響,getSession自己就是繼承自IoFuture的。ConnectFuture接口由DefaultConnectFuture來具體實現,因爲繼承了DefaultIoFuture,因此這裏面用到最多的就是DefaultIoFuture中的setValuegetValue方法,上面咱們也特別強調了這兩個方法的重要性,經過對resultsetValue)的傳遞實現了對sessionexception等狀態的傳遞。

稍微總結一下future對異步的貢獻,官方對future的描述就是處理了異步操做,從源碼中咱們很明顯的能夠看到future是經過await和notify來控制操做的連續性,經過死鎖檢測來作wait時的保障,上層(session)經過隊列來緩衝各類任務,而後經過競爭,誰搶到了線程,誰就執行。Future不難,組織結構也很清楚,我以爲看這節的源代碼最主要的仍是要作好兩點,第一是搞懂什麼是異步,第二是要明白future爲異步貢獻了什麼。

下一篇就要講mina中最龐大的filter chain了,這是mina中代碼最多,也是最具特點的一部分。

相關文章
相關標籤/搜索