NIO 源碼分析(05) Channel 源碼分析

NIO 源碼分析(05) Channel 源碼分析java

Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)linux

1、Channel 類圖

Channel 類圖

功能說明:編程

  • InterruptibleChannel、AbstractInterruptibleChannel 提供了 Channel 響應 thread.interrupt(),支持中斷操做,最重要的兩個方法是 begin 和 end。
  • SelectableChannel、AbstractSelectableChannel 提供了 Channel 註冊到 Selector 的各類方法。
  • ReadableByteChannel、ScatteringByteChannel Channel 讀數據。
  • WritableByteChannel、GatheringByteChannel Channel 寫數據。
  • NetworkChannel Channel 進行端口綁定、參數設置等網絡相關的操做。
  • SocketChannel Socket 門面,由 SelectorProvider.openSocketChannel 提供具體的實現類。
  • ServerSocketChannel ServerSocket 門面,由 SelectorProvider.openServerSocketChannel 提供具體的實現類。
public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}

在 SelectorProviderImpl 中默認的 ServerSocketChannel 實現類是 ServerSocketChannelImpl。windows

2、begin 和 close 是什麼

begin() 和 end() 老是配對使用的,Channel 和 Selector 均有本身的實現,所完成的功能也是有所區別的。一般這兩個方法的使用以下:安全

2.1 AbstractInterruptibleChannel 中的 begin 和 close

boolean completed = false;
try {
   begin();
   completed = ...;    // Perform blocking I/O operation
   return ...;         // Return result
} finally {
   end(completed);
}

AbstractInterruptibleChannel 中最重要的方法是 begin 和 end,它們的功能是什麼呢?網絡

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (!open)
                            return;
                        open = false;
                        interrupted = target;
                        try {
                            // Channel 中的 begin 就作了一件事,關閉 channel
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);     // t.blockedOn(b)
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

protected final void end(boolean completed) throws AsynchronousCloseException {
    blockedOn(null);
    Thread interrupted = this.interrupted;
    if (interrupted != null && interrupted == Thread.currentThread()) {
        interrupted = null;
        throw new ClosedByInterruptException();
    }
    if (!completed && !open)
        throw new AsynchronousCloseException();
}

總結: Channel 中的 begin 就幹了一件事,關閉 channel。咱們先試想這樣一個場景,Channel 要讀寫數據時所在線程被中斷了,會發生什麼事?線程既然被中斷了,Channel 總要關閉吧。事實上 Channel 的 begin 和 end 就是作這個事情的。不過要理解 begin 和 end,彷佛咱們先得弄明白 AbstractInterruptibleChannel.blockedOn 究竟在幹什麼:多線程

static void blockedOn(Interruptible intr) {         // package-private
    sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}

其中 JavaLangAccess 接口在 java.lang.System 中被實例化,它是這樣寫的:app

private static void setJavaLangAccess() {
    // Allow privileged classes outside of java.lang
    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
        public void blockedOn(Thread t, Interruptible b) {
            t.blockedOn(b);
        }
    }
}

如今咱們發現,JavaLangAccess 的 blockedOn 實現,竟然只有這麼一句 t.blockedOn(b)。繼續跟蹤到 java.lang.Thread 中 blockedOn 的實現了:socket

private volatile Interruptible blocker;
void blockedOn(Interruptible b) {
    synchronized (blockerLock) {
        blocker = b;
    }
}

實際上就是 Thread 線程上註冊了一個鉤子方法,當線程中斷時,即調用 thread.interrupt() 時會回調這個 b.interrupt(this) 方法,進而關閉線程對應的 Channel。

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

2.2 Selector 中的 begin 和 end

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread ignore) {
                    // 線程中斷時喚醒 selector
                    AbstractSelector.this.wakeup();
                }};
    }
    AbstractInterruptibleChannel.blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

protected final void end() {
    AbstractInterruptibleChannel.blockedOn(null);
}

總結: Selector 中的 begin 和 end 就是在線程中斷時喚醒對應的 selector,對應的使用以下:

protected int doSelect(long timeout) throws IOException {
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    // ...
}

3、Channel 註冊

3.1 AbstractSelectableChannel 與 Channel 註冊相關屬性

private SelectionKey[] keys = null;
private int keyCount = 0;
// Lock for key set and count
private final Object keyLock = new Object();

// Blocking mode, protected by regLock
boolean blocking = true;
// Lock for registration and configureBlocking operations
private final Object regLock = new Object();
  • keys、keyCount Channel 註冊後的 SelectionKey 集合和數量,也就是說一個 Channel 能夠註冊到多個 Selector 上。keyLock 保證多線程下 keys、keyCount 操做時的數據安全。

  • blocking Channel 是不是阻塞的。regLock 保證多線程下 blocking 操做的線程安全,同時註冊時也須要加鎖。

3.2 register 方法

Channel 註冊

public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException {
    synchronized (regLock) {
        if (!isOpen())
            throw new ClosedChannelException();
        // 註冊的感興趣事件不合法
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (blocking)
            throw new IllegalBlockingModeException();
        // 1. 判斷 Channel 是不是在這個 Selector 上是否已經註冊
        SelectionKey k = findKey(sel);
        // 2. 若是已經註冊,更新感興趣事件
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        // 3. 若是沒有註冊,委託 seletor.register 完成註冊
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

總結: AbstractSelectableChannel 能夠註冊到多個 Selector 上,keys 屬性管理了全部已經註冊的 SelectionKey。對於已經註冊的 SelectionKey 和未註冊的處理邏輯並不一樣。具體步驟以下:

  1. 判斷 Channel 是不是在這個 Selector 上是否已經註冊。
  2. 若是 Channel 已經註冊了,更新註冊的感興趣事件。
  3. 判斷 Channel 未註冊,則委託給 sel 完成註冊。

3.3 SelectionKey.interestOps 事件註冊

SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
    channel = ch;
    selector = sel;
}

SelectionKeyImpl 是對 Channel 和 Seletor 的封裝,具體的事件註冊仍是委託給了 channel 完成。

public SelectionKey interestOps(int ops) {
    ensureValid();
    return nioInterestOps(ops);
}

public SelectionKey nioInterestOps(int ops) {
    if ((ops & ~channel().validOps()) != 0)
        throw new IllegalArgumentException();
    channel.translateAndSetInterestOps(ops, this);
    interestOps = ops;
    return this;
}

總結: SelectionKey.interestOps 事件註冊繞了一圈,最後發現又委託給了 Channel 來完成。

// ServerSocketChannel 事件註冊
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;

    // Translate ops
    if ((ops & SelectionKey.OP_ACCEPT) != 0)
        newOps |= Net.POLLIN;
    // Place ops into pollfd array
    sk.selector.putEventOps(sk, newOps);
}

總結: 事件註冊須要注意的是 OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT 只是 JDK 對底層 IO 事件的封裝,在註冊前須要將 JDK IO 事件轉換成 Linux NIO 事件。最終事件的註冊都是委託給了 Selector 完成,因此才說 Selector 纔是 NIO 的核心。

// Net
public static final short POLLIN;   // 讀事件
public static final short POLLOUT;  // 寫事件
public static final short POLLERR;
public static final short POLLHUP;
public static final short POLLNVAL;
public static final short POLLCONN;

4、Channel.accept

public SocketChannel accept() throws IOException {
    synchronized (lock) {
        // 1. 鏈接校驗
        if (!isOpen())
            throw new ClosedChannelException();
        if (!isBound())
            throw new NotYetBoundException();
        SocketChannel sc = null;

        // 2. newfd、isaa 若是有新的 socket 鏈接,則會經過 accept 賦值
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        // 3. begin、end 配套使用則能夠響應線程中斷,關閉 channel
        try {
            begin();
            if (!isOpen())
                return null;
            thread = NativeThread.current();
            for (;;) {
                // 4. 接收新的鏈接請求
                n = accept(this.fd, newfd, isaa);
                if ((n == IOStatus.INTERRUPTED) && isOpen())
                    continue;
                break;
            }
        } finally {
            thread = 0;
            end(n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;
        // 4. socket 默認是阻塞的,因此若是是 NIO 編程須要手動設置成 false
        IOUtil.configureBlocking(newfd, true);
        InetSocketAddress isa = isaa[0];
        sc = new SocketChannelImpl(provider(), newfd, isa);
        // 5. 返回新的 socket 請求
        return sc;
    }
}

private int accept(FileDescriptor ssfd, FileDescriptor newfd,
                   InetSocketAddress[] isaa) throws IOException {
    return accept0(ssfd, newfd, isaa);
}

總結: ServerSocketChannel 接收新的鏈接請求步驟和 BIO 相似,沒有什麼區別,大體有如下步驟:

  1. channel 鏈接校驗
  2. begin、end 配套使用則能夠響應線程中斷,關閉 channel
  3. 調用 accept 接收新的鏈接請求
  4. socket 默認是阻塞的,因此若是是 NIO 編程須要手動設置成 false

5、Channel 關閉

close() 操做限於通道,並且仍是實現了 InterruptibleChannel 接口的通道,例如 FileChannel 就沒有 close 操做。

在分析 close() 具體實現以前,咱們先得理解爲何要有 close() 這個操做:一個可選擇的通道,在建立之初會生成一個 FileDescriptor,linux 下即爲 fd,windows 下即爲句柄,這些都是系統資源,不能無限佔用,當在不使用的時候,就應該將其釋放,close 便是完成這個工做的。

Channel 關閉

5.1 Channel.close

抽象類 AbstractInterruptibleChannel 實現了 InterruptibleChannel 接口,而 SelectableChannel 繼承自 AbstractInterruptibleChannel,所以,可選擇的通道同時也是能夠 close 的。AbstractInterruptibleChannel 的 close 實現以下:

public final void close() throws IOException {
    synchronized (closeLock) {
        if (!open)
            return;
        open = false;
        implCloseChannel();
    }
}

來具體關閉邏輯就在 implCloseChannel() 中了,因而再看 AbstractSelectableChannel:

protected final void implCloseChannel() throws IOException {
    implCloseSelectableChannel();
    synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
        for (int i = 0; i < count; i++) {
            SelectionKey k = keys[i];
            if (k != null)
                k.cancel();
        }
    }
}

先看 synchronized 同步塊,它將當前通道保存的 SelectionKey 所有 cancel,意思就是說,當前通關閉了,與它相關的全部 SelectionKey 都沒有意義了,因此要所有取消掉,以前講解 cancel 過程已經說明了,cancel 操做只是將 SelectionKey 加入對 應選擇器的 cancelKeys 集合中,在下次正式選擇開始的時候再一一清除;

這麼看來,仍是應該追究一下 implCloseSelectableChannel() 的實現了,下面分別從 ServerSocketChannel 和 SocketChannel 實現出發:

先看 ServerSocketChannelImpl

protected void implCloseSelectableChannel() throws IOException {
    synchronized (stateLock) {
        if (state != ST_KILLED)
            nd.preClose(fd);
        long th = thread;
        if (th != 0)
            NativeThread.signal(th);
        if (!isRegistered())
            kill();
    }
}

出現了兩個很奇怪的東西,看來要徹底弄懂這段代碼,是得好好分析一下它們了,它們是:NativeDispatcher nd 和 NativeThread;

若是已經對 linux 信號機制很是熟悉,應該很容易猜想到 NativeThread.signal(th) 在作什麼,是的,它在喚醒阻塞的線程 th,下面咱們來看看它是如何作到的:

5.2 NativeThread

NativeThread 類很是簡單,幾乎全是 native 方法:

class NativeThread {
    static native long current();
    static native void signal(long nt);
    static native void init();
    static {
        Util.load();
        init();
    }
}

在看其本地實現:

//自定義中斷信號,kill –l
#define INTERRUPT_SIGNAL (__SIGRTMAX - 2)
//自定義的信號處理函數,當前函數什麼都不作
static void nullHandler(int sig) {
}
#endif
//NativeThread.init()的本地實現,能夠看到它用到了sigaction
//sigaction用來install一個信號
JNIEXPORT void JNICALL 
Java_sun_nio_ch_NativeThread_init(JNIEnv *env, jclass cl) {
    #ifdef __linux__
        sigset_t ss;
        // 如下這段代碼是常見的信號安裝過程
        // 講解這段代碼的目的只是爲了讓你們理解NativeThread.signal
        // 的工做原理,故不少細節就簡單帶過了
        struct sigaction sa, osa;
        // sa用於定製信號INTERRUPT_SIGNAL的處理方式的
        // 如sa_handler = nullHandler即用來指定信號處理函數的
        // 即線程收到信號時,爲執行這個函數,nullHandler是個空殼
        // 函數,因此它什麼都不作
        // 不用理解sa_flags各個標識表明什麼
        // sigemptyset顧名思義,它是初始化sigaction的sa_mask位
        // sigaction(INTERRUPT_SIGNAL, &sa, &osa)執行後
        // 若是成功,則表示INTERRUPT_SIGNAL這個信號安裝成功了
        // 爲何要有這個init呢,其實不用這不操做也許不會有問題
        // 但由於不能確保INTERRUPT_SIGNAL沒有被其餘線程install
        // 過,若是sa_handler對應函數不是空操做,則在使用這個信號
        // 時會對當前線程有影響
        sa.sa_handler = nullHandler;
        sa.sa_flags = 0;
        sigemptyset(&sa.sa_mask);
        if (sigaction(INTERRUPT_SIGNAL, &sa, &osa) < 0)
        JNU_ThrowIOExceptionWithLastError(env, "sigaction");
    #endif
}
 
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_NativeThread_current(JNIEnv *env, jclass cl) {
    #ifdef __linux__
    // pthread_self()便是獲取當前線程ID,它與getpid()是不一樣的
    // 具體細節沒有研究
        return (long)pthread_self();
    #else
        return -1;
    #endif
}
 
JNIEXPORT void JNICALL
Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread) {
    #ifdef __linux__
    //這個就是最關鍵的signal實現了,能夠看到,它調用了pthread庫的pthread_kill
    //像thread線程發送一個INTERRUPT_SIGNAL信號,這個信號就是在init中install
    //的,對應的處理函數是空函數,也就是說,往thread線程發送一個信號,若是該線程處於
    //阻塞狀態,則會由於受到信號而終止阻塞,而若是處於非阻塞,則無影響
        if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL))
        JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed");
    #endif
}

Java 的 NativeThread 作靜態初始化時已經執行了 init,也就是說 INTERRUPT_SIGNAL 信號已經被安裝,而 ServerSocketChannelImpl 在上述 accept 時可能賦值。

try {
    begin();
    if (!isOpen())
    return null;
    thread = NativeThread.current();
    for (;;) {
        n = accept0(this.fd, newfd, isaa);
        if ((n == IOStatus.INTERRUPTED) && isOpen())
        continue;
        break;
    }
} finally {
    thread = 0;
    end(n > 0);
    assert IOStatus.check(n);
}

try 的內部,for 循環以前,thread 被複製爲 NativeThread.current() 即爲當前線程 id;finally 時 thread 又被修改回 0,所以在 implCloseSelectableChannel 纔有這樣一段:

if (th != 0)
        NativeThread.signal(th);

NativeThread.signal(th) 經過像當前線程發送 INTERRUPT_SIGNAL 信號而確保 th 線程沒有被阻塞,即若是阻塞就中止阻塞。

5.3 NativeDispatcher

如今理解了 NativeThread 了,咱們再看 NativeDispatcher
首先咱們得知道在 ServerSocketChannelImpl 中,nd 被初始化爲 SocketDispatcher,見:

static {
    Util.load();
    initIDs();
    nd = new SocketDispatcher();
}

又由於 linux 下一切皆文件的思想(現實雖然不絕對),SocketDispatcher 其實就是用 FileDispatcher 實現的,最終 FileDispatcher 也只是封裝了一大堆 native 方法,一波三折,關於 FileDispatcher,這裏先不詳細講解了,先針對 nd.preClose(fd) 和 kill 將 implCloseSelectableChannel 的過程說明白吧:

首先,咱們要明白這樣一個道理:在多線程環境下,老是很難知道何時可安全的關閉或釋放資源(如fd),當一個線程 A 使用 fd 來讀寫,而另外一個線程 B 關閉或釋放了 fd,則 A 線程就會讀寫一個錯誤的文件或 socket;爲了防止這種狀況出現,因而 NIO 就採用了經典的 two-step 處理方案:

第一步:建立一個 socket pair,假設 FDs 爲 sp[2],先 close 掉 sp[1],這樣,該 socket pair 就成爲了一個半關閉的連接;複製 (dup2)sp[0] 到 fd(即爲咱們想關閉或釋放的fd),這個時候,其餘線程若是正在讀寫當即會得到 EOF 或者 Pipe Error,read 或 write 方法裏會檢測這些狀態作相應處理;

第二步:最後一個會使用到 fd 的線程負責釋放
nd.preClose(fd) 即爲兩步曲中的第一步,咱們先來看其實現,最終定位到 FileDispatcher.c,相關代碼以下:

static int preCloseFD = -1; 
 
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_init(JNIEnv *env, jclass cl)
{
    int sp[2];
    if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) {
    JNU_ThrowIOExceptionWithLastError(env, "socketpair failed");
        return;
    }
    preCloseFD = sp[0];
    close(sp[1]);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_preClose0(JNIEnv *env, jclass clazz, jobject fdo)
{
    jint fd = fdval(env, fdo);
    if (preCloseFD >= 0) {
    if (dup2(preCloseFD, fd) < 0)
        JNU_ThrowIOExceptionWithLastError(env, "dup2 failed");
    }
}

從上面兩個函數實現,咱們能夠看到,在 init 函數中,建立了一個半關閉的 socket pair,preCloseFD 即爲未關閉的一端,init 在靜態初始化時就會被執行;再來看關鍵的 preClose0,它的確是採用 dup2 來複制 preCloseFD,這樣一來,fd 就被替換成了 preCloseFD,這正是 socket pair 中未被關閉的一端。

既然 nd.preClose(fd) 只是預關閉,則真正執行關閉的邏輯確定在這個 kill 中了,從代碼邏輯上仍是比較好懂的,if (!isRegistered()) 即表示該通道沒有被註冊,表示全部 Selector 都沒有意願關心這個通道了,則天然能夠放心的關閉 fd。
果斷猜想 kill 中有 nd.close(fd) 這樣的代碼,不信請看:

public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLED)
        return;
        if (state == ST_UNINITIALIZED) {
                state = ST_KILLED;
        return;
            }
        assert !isOpen() && !isRegistered();
        nd.close(fd);
        state = ST_KILLED;
    }
}

果真如此,這樣一來,關閉二步曲就可以較安全的釋放咱們的fd資源了,至於 nd.close(fd)的本地實現,這裏就不講了,確定是採用了 close(fd) 的系統調用。總的來講,通道的 close 就是爲了斷開它與內核 fd 的那點聯繫。


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索