BIO到NIO源碼的一些事兒之NIO 下 之 Selector

前言

此係列文章會詳細解讀NIO的功能逐步豐滿的路程,爲Reactor-Netty 庫的講解鋪平道路。java

關於Java編程方法論-Reactor與Webflux的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:linux

Rxjava源碼解讀與分享:www.bilibili.com/video/av345…git

Reactor源碼解讀與分享:www.bilibili.com/video/av353…github

本系列源碼解讀基於JDK11 api細節可能與其餘版本有所差異,請自行解決jdk版本問題。編程

本系列前幾篇:windows

BIO到NIO源碼的一些事兒之BIOapi

BIO到NIO源碼的一些事兒之NIO 上數組

BIO到NIO源碼的一些事兒之NIO 中安全

SelectionKey的引入

如咱們在前面內容所講,在學生肯定以後,咱們就要對其狀態進行設定,而後再交由Selector進行管理,其狀態的設定咱們就經過SelectionKey來進行。數據結構

那這裏咱們先經過以前在Channel中並未仔細講解的SelectableChannel下的register方法。咱們前面有提到過, SelectableChannelchannel打形成能夠經過Selector來進行多路複用。做爲管理者,channel想要實現複用,就必須在管理者這裏進行註冊登記。因此,SelectableChannel下的register方法也就是咱們值得二次關注的核心了,也是對接咱們接下來內容的切入點,對於register方法的解讀,請看咱們以前的文章BIO到NIO源碼的一些事兒之NIO 上賦予Channel可被多路複用的能力這一節的內容。

這裏要記住的是SelectableChannel是對接channel特徵(即SelectionKey)的關鍵所在,這有點相似於表設計,本來能夠將特徵什麼的設定在一張表內,但爲了操做更加具備針對性,即爲了讓代碼功能更易於管理,就進行抽取並設計了第二張表,這個就有點像人體器官,總體上你們共同協做完成一件事,但器官內部本身專一於本身的主要特定功能,偶爾也具有其餘器官的一些小功能。

由此,咱們也就能夠知道,SelectionKey表示一個SelectableChannelSelector關聯的標記,能夠簡單理解爲一個token。就比如是咱們作權限管理系統用戶登陸後前臺會從後臺拿到的一個token同樣,用戶能夠憑藉此token來訪問操做相應的資源信息。

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {       ...
    synchronized (regLock) {
       ...
        synchronized (keyLock) {
           ...
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}

複製代碼

結合上下兩段源碼,在每次Selector使用register方法註冊channel時,都會建立並返回一個SelectionKey

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
複製代碼

咱們在BIO到NIO源碼的一些事兒之NIO 上賦予Channel可被多路複用的能力這一節的內容知道,一旦註冊到Selector上,Channel將一直保持註冊直到其被解除註冊。在解除註冊的時候會解除Selector分配給Channel的全部資源。 也就是SelectionKey在其調用SelectionKey#channel方法,或這個key所表明的channel 關閉,抑或此key所關聯的Selector關閉以前,都是有效。咱們在前面的文章分析中也知道,取消一個SelectionKey,不會馬上從Selector移除,它將被添加到SelectorcancelledKeys這個Set集合中,以便在下一次選擇操做期間刪除,咱們能夠經過java.nio.channels.SelectionKey#isValid判斷一個SelectionKey是否有效。

SelectionKey包含四個操做集,每一個操做集用一個Int來表示,int值中的低四位的bit 用於表示channel支持的可選操做種類。

/** * Operation-set bit for read operations. */
   public static final int OP_READ = 1 << 0;

   /** * Operation-set bit for write operations. */
   public static final int OP_WRITE = 1 << 2;

   /** * Operation-set bit for socket-connect operations. */
   public static final int OP_CONNECT = 1 << 3;

   /** * Operation-set bit for socket-accept operations. */
   public static final int OP_ACCEPT = 1 << 4;
複製代碼

interestOps

經過interestOps來肯定了selector在下一個選擇操做的過程當中將測試哪些操做類別的準備狀況,操做事件是不是channel關注的。interestOpsSelectionKey建立時,初始化爲註冊Selector時的ops值,這個值可經過sun.nio.ch.SelectionKeyImpl#interestOps(int)來改變,這點咱們在SelectorImpl#register能夠清楚的看到。

//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl extends AbstractSelectionKey {
   private static final VarHandle INTERESTOPS =
           ConstantBootstraps.fieldVarHandle(
                   MethodHandles.lookup(),
                   "interestOps",
                   VarHandle.class,
                   SelectionKeyImpl.class, int.class);

   private final SelChImpl channel;
   private final SelectorImpl selector;

   private volatile int interestOps;
   private volatile int readyOps;

   // registered events in kernel, used by some Selector implementations
   private int registeredEvents;

   // index of key in pollfd array, used by some Selector implementations
   private int index;

   SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
       channel = ch;
       selector = sel;
   }
  ...
}
複製代碼

readyOps

readyOps表示經過Selector檢測到channel已經準備就緒的操做事件。在SelectionKey建立時(即上面源碼所示),readyOps值爲0,在Selectorselect操做中可能會更新,可是須要注意的是咱們不能直接調用來更新。

SelectionKeyreadyOps表示一個channel已經爲某些操做準備就緒,但不能保證在針對這個就緒事件類型的操做過程當中不會發生阻塞,即該操做所在線程有可能會發生阻塞。在完成select操做後,大部分狀況下會當即對readyOps更新,此時readyOps值最準確,若是外部的事件或在該channel有IO操做,readyOps可能不許確。因此,咱們有看到其是volatile類型。

SelectionKey定義了全部的操做事件,可是具體channel支持的操做事件依賴於具體的channel,即具體問題具體分析。 全部可選擇的channel(即SelectableChannel的子類)均可以經過SelectableChannel#validOps方法,判斷一個操做事件是否被channel所支持,即每一個子類都會有對validOps的實現,返回一個數字,僅標識channel支持的哪些操做。嘗試設置或測試一個不被channel所支持的操做設定,將會拋出相關的運行時異常。 不一樣應用場景下,其所支持的Ops是不一樣的,摘取部分以下所示:

//java.nio.channels.SocketChannel#validOps
public final int validOps() {
    //即1|4|8 1101
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
    // 16
    return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
    // 1|4
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}
複製代碼

若是須要常常關聯一些咱們程序中指定數據到SelectionKey,好比一個咱們使用一個object表示上層的一種高級協議的狀態,object用於通知實現協議處理器。因此,SelectionKey支持經過attach方法將一個對象附加到SelectionKeyattachment上。attachment能夠經過java.nio.channels.SelectionKey#attachment方法進行訪問。若是要取消該對象,則能夠經過該種方式:selectionKey.attach(null)

須要注意的是若是附加的對象再也不使用,必定要人爲清除,若是沒有,假如此SelectionKey一直存在,因爲此處屬於強引用,那麼垃圾回收器不會回收該對象,若不清除的話會成內存泄漏。

SelectionKey在由多線程併發使用時,是線程安全的。咱們只須要知道,Selectorselect操做會一直使用在調用該操做開始時當前的interestOps所設定的值。

Selector探究

到如今爲止,咱們已經多多少少接觸了Selector,其是一個什麼樣的角色,想必都很清楚了,那咱們就在咱們已經接觸到的來進一步深刻探究Selector的設計運行機制。

Selector的open方法

從命名上就能夠知道 SelectableChannel對象是依靠Selector來實現多路複用的。 咱們能夠經過調用java.nio.channels.Selector#open來建立一個selector對象:

//java.nio.channels.Selector#open
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
複製代碼

關於這個SelectorProvider.provider(),其使用了根據所在系統的默認實現,我這裏是windows系統,那麼其默認實現爲sun.nio.ch.WindowsSelectorProvider,這樣,就能夠調用基於相應系統的具體實現了。

//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

/** * Prevent instantiation. */
private DefaultSelectorProvider() { }

/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
    return new sun.nio.ch.WindowsSelectorProvider();
}

}
複製代碼

基於windows來說,selector這裏最終會使用sun.nio.ch.WindowsSelectorImpl來作一些核心的邏輯。

public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
複製代碼

這裏,咱們須要來看一下WindowsSelectorImpl的構造函數:

//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製代碼

咱們由Pipe.open()就可知道selector會保持打開的狀態,直到其調用它的close方法:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
    assert !isOpen();
    assert Thread.holdsLock(this);

    // prevent further wakeup
    synchronized (interruptLock) {
        interruptTriggered = true;
    }

    wakeupPipe.sink().close();
    wakeupPipe.source().close();
    pollWrapper.free();

    // Make all remaining helper threads exit
    for (SelectThread t: threads)
            t.makeZombie();
    startLock.startThreads();
}

複製代碼

能夠看到,前面的wakeupPipe在close方法中關閉掉了。這裏的close方法中又涉及了wakeupPipe.sink()wakeupPipe.source()的關閉與pollWrapper.free()的釋放,此處也是咱們本篇的難點所在,這裏,咱們來看看它們究竟是什麼樣的存在。 首先,咱們對WindowsSelectorImpl(SelectorProvider sp)這個構造函數作下梳理:

  • 建立一個PollArrayWrapper對象(pollWrapper);
  • Pipe.open()打開一個管道;
  • 拿到wakeupSourceFdwakeupSinkFd兩個文件描述符;
  • 把pipe內Source端的文件描述符(wakeupSourceFd)放到pollWrapper裏;

Pipe.open()的解惑

這裏咱們會有疑惑,爲何要建立一個管道,它是用來作什麼的。

咱們來看Pipe.open()源碼實現:

//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
    return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
    return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        throw (IOException)x.getCause();
    }
}
private class Initializer implements PrivilegedExceptionAction<Void> {

private final SelectorProvider sp;

private IOException ioe = null;

private Initializer(SelectorProvider sp) {
    this.sp = sp;
}

@Override
public Void run() throws IOException {
    LoopbackConnector connector = new LoopbackConnector();
    connector.run();
    if (ioe instanceof ClosedByInterruptException) {
        ioe = null;
        Thread connThread = new Thread(connector) {
            @Override
            public void interrupt() {}
        };
        connThread.start();
        for (;;) {
            try {
                connThread.join();
                break;
            } catch (InterruptedException ex) {}
        }
        Thread.currentThread().interrupt();
    }

    if (ioe != null)
        throw new IOException("Unable to establish loopback connection", ioe);

    return null;
}
複製代碼

從上述源碼咱們能夠知道,建立了一個PipeImpl對象, 在PipeImpl的構造函數裏會執行AccessController.doPrivileged,在它調用後緊接着會執行Initializerrun方法:

//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

    @Override
    public void run() {
        ServerSocketChannel ssc = null;
        SocketChannel sc1 = null;
        SocketChannel sc2 = null;

        try {
            // Create secret with a backing array.
            ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
            ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

            // Loopback address
            InetAddress lb = InetAddress.getLoopbackAddress();
            assert(lb.isLoopbackAddress());
            InetSocketAddress sa = null;
            for(;;) {
                // Bind ServerSocketChannel to a port on the loopback
                // address
                if (ssc == null || !ssc.isOpen()) {
                    ssc = ServerSocketChannel.open();
                    ssc.socket().bind(new InetSocketAddress(lb, 0));
                    sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                }

                // Establish connection (assume connections are eagerly
                // accepted)
                sc1 = SocketChannel.open(sa);
                RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                do {
                    sc1.write(secret);
                } while (secret.hasRemaining());
                secret.rewind();

                // Get a connection and verify it is legitimate
                sc2 = ssc.accept();
                do {
                    sc2.read(bb);
                } while (bb.hasRemaining());
                bb.rewind();

                if (bb.equals(secret))
                    break;

                sc2.close();
                sc1.close();
            }

            // Create source and sink channels
            source = new SourceChannelImpl(sp, sc1);
            sink = new SinkChannelImpl(sp, sc2);
        } catch (IOException e) {
            try {
                if (sc1 != null)
                    sc1.close();
                if (sc2 != null)
                    sc2.close();
            } catch (IOException e2) {}
            ioe = e;
        } finally {
            try {
                if (ssc != null)
                    ssc.close();
            } catch (IOException e2) {}
        }
    }
}
}
複製代碼

這裏即爲建立pipe的過程,windows下的實現是建立兩個本地的socketChannel,而後鏈接(鏈接的過程經過寫一個隨機數據作兩個socket的鏈接校驗),兩個socketChannel分別實現了管道pipesourcesink端。 而咱們依然不清楚這個pipe到底幹什麼用的, 假如你們熟悉系統調用的C/C++的話,就能夠知道,一個阻塞在select上的線程有如下三種方式能夠被喚醒:

  1. 有數據可讀/寫,或出現異常。
  2. 阻塞時間到,即time out
  3. 收到一個non-block的信號。可由killpthread_kill發出。

因此,Selector.wakeup()要喚醒阻塞的select,那麼也只能經過這三種方法,其中:

  • 第二種方法能夠排除,由於select一旦阻塞,沒法修改其time out時間。
  • 而第三種看來只能在Linux上實現,Windows上沒有這種信號通知的機制。

看來只有第一種方法了。假如咱們屢次調用Selector.open(),那麼在Windows上會每調用一次,就會創建一對本身和本身的loopbackTCP鏈接;在Linux上的話,每調用一次,會開一對pipe(pipe在Linux下通常都成對打開),到這裏,估計咱們可以猜得出來——那就是若是想要喚醒select,只須要朝着本身的這個loopback鏈接發點數據過去,因而,就能夠喚醒阻塞在select上的線程了。

咱們對上面所述作下總結:在Windows下,Java虛擬機在Selector.open()時會本身和本身創建loopbackTCP鏈接;在Linux下,Selector會建立pipe。這主要是爲了Selector.wakeup()能夠方便喚醒阻塞在select()系統調用上的線程(經過向本身所創建的TCP連接和管道上隨便寫點什麼就能夠喚醒阻塞線程)。

PollArrayWrapper解讀

WindowsSelectorImpl構造器最後,咱們看到這一句代碼:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);,即把pipe內Source端的文件描述符(wakeupSourceFd)放到pollWrapper裏。pollWrapper做爲PollArrayWrapper的實例,它究竟是什麼,這一節,咱們就來對其探索一番。

class PollArrayWrapper {

    private AllocatedNativeObject pollArray; // The fd array

    long pollArrayAddress; // pollArrayAddress

    @Native private static final short FD_OFFSET     = 0; // fd offset in pollfd
    @Native private static final short EVENT_OFFSET  = 4; // events offset in pollfd

    static short SIZE_POLLFD = 8; // sizeof pollfd struct

    private int size; // Size of the pollArray

    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }

    ...

    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }
    ...
   // Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, Net.POLLIN);
    }
}
複製代碼

這裏將wakeupSourceFdPOLLIN事件標識爲pollArrayEventOps的對應的值,這裏使用的是unsafe直接操做的內存,也就是相對於這個pollArray所在內存地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET這個位置上寫入Net.POLLIN所表明的值,即參考下面本地方法相關源碼所展現的值。putDescriptor一樣是這種相似操做。當sink端有數據寫入時,source對應的文件描述符wakeupSourceFd就會處於就緒狀態。

//java.base/windows/native/libnio/ch/nio_util.h
    /* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined   */
    /* in Windows Vista / Windows Server 2008 and later. If we are on an      */
    /* older release we just use the Solaris constants as this was previously */
    /* done in PollArrayWrapper.java.                                         */
    #define POLLIN       0x0001
    #define POLLOUT      0x0004
    #define POLLERR      0x0008
    #define POLLHUP      0x0010
    #define POLLNVAL     0x0020
    #define POLLCONN     0x0002
複製代碼

AllocatedNativeObject這個類的父類有大量的unsafe類的操做,這些都是直接基於內存級別的操做。從其父類的構造器中,咱們能也清楚的看到pollArray是經過unsafe.allocateMemory(size + ps)分配的一塊系統內存。

class AllocatedNativeObject // package-private extends NativeObject {
    /** * Allocates a memory area of at least {@code size} bytes outside of the * Java heap and creates a native object for that area. */
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    /** * Frees the native memory area associated with this object. */
    synchronized void free() {
        if (allocationAddress != 0) {
            unsafe.freeMemory(allocationAddress);
            allocationAddress = 0;
        }
    }

}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
        if (!pageAligned) {
            this.allocationAddress = unsafe.allocateMemory(size);
            this.address = this.allocationAddress;
        } else {
            int ps = pageSize();
            long a = unsafe.allocateMemory(size + ps);
            this.allocationAddress = a;
            this.address = a + ps - (a & (ps - 1));
        }
    }
複製代碼

至此,咱們算是完成了對Selector.open()的解讀,其主要任務就是完成創建Pipe,並把pipe source端的wakeupSourceFd放入pollArray中,這個pollArraySelector完成其角色任務的樞紐。本篇主要圍繞Windows的實現來進行分析,即在windows下經過兩個鏈接的socketChannel實現了Pipelinux下則直接使用系統的pipe便可。

SelectionKey在selector中的管理

SelectionKey在selector中註冊

所謂的註冊,其實就是將一個對象放到註冊地對象內的一個容器字段上,這個字段能夠是數組,隊列,也能夠是一個set集合,也能夠是一個list。這裏,一樣是這樣,只不過,其須要有個返回值,那麼把這個要放入集合的對象返回便可。

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        newKeys.addLast(ski);
    }
}
複製代碼

這段代碼咱們以前已經有看過,這裏咱們再次溫習下。 首先會新建一個SelectionKeyImpl對象,這個對象就是對Channel的包裝,不只如此,還順帶把當前這個Selector對象給收了進去,這樣,咱們也能夠經過SelectionKey的對象來拿到其對應的Selector對象。

接着,基於windows平臺實現的implRegister,先經過ensureOpen()來確保該Selector是打開的。接着將這個SelectionKeyImpl加入到WindowsSelectorImpl內針對於新註冊SelectionKey進行管理的newKeys之中,newKeys是一個ArrayDeque對象。對於ArrayDeque有不懂的,能夠參考Java 容器源碼分析之 Deque 與 ArrayDeque這篇文章。

而後再將此這個SelectionKeyImpl加入到sun.nio.ch.SelectorImpl#keys中去,這個Set<SelectionKey>集合表明那些已經註冊到當前這個Selector對象上的SelectionKey集合。咱們來看sun.nio.ch.SelectorImpl的構造函數:

//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
複製代碼

也就是說,這裏的publicKeys就來源於keys,只是publicKeys屬於只讀的,咱們想要知道當前Selector對象上所註冊的keys,就能夠調用sun.nio.ch.SelectorImpl#keys來獲得:

//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
    ensureOpen();
    return publicKeys;
}
複製代碼

再回到這個構造函數中,selectedKeys,顧名思義,其屬於已選擇Keys,即前一次操做期間,已經準備就緒的Channel所對應的SelectionKey。此集合爲keys的子集。經過selector.selectedKeys()獲取。

//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}
複製代碼

咱們看到其返回的是publicSelectedKeys,針對這個字段裏的元素操做能夠作刪除,但不能作增長。 在前面的內容中,咱們有涉及到SelectionKey的取消,因此,咱們在java.nio.channels.spi.AbstractSelector方法內,是有定義cancelledKeys的,也是一個HashSet對象。其表明已經被取消但還沒有取消註冊(deregister)的SelectionKey。此Set集合沒法直接訪問,一樣,它也是keys()的子集。

對於新的Selector實例,上面幾個集合均爲空。由上面展現的源碼可知,經過channel.registerSelectionKey添加keys中,此爲key的來源。 若是某個selectionKey.cancel()被調用,那麼此key將會被添加到cancelledKeys這個集合中,而後在下一次調用selector select方法期間,此時canceldKeys不爲空,將會觸發此SelectionKeyderegister操做(釋放資源,並從keys中移除)。不管經過channel.close()仍是經過selectionKey.cancel(),都會致使SelectionKey被加入到cannceldKey中.

每次選擇操做(select)期間,均可以將key添加到selectedKeys中或者將從cancelledKeys中移除。

Selector的select方法的解讀

瞭解了上面的這些,咱們來進入到select方法中,觀察下它的細節。由Selector的api可知,select操做有兩種形式,一種爲 select(),selectNow(),select(long timeout);另外一種爲select(Consumer<SelectionKey> action, long timeout)select(Consumer<SelectionKey> action)selectNow(Consumer<SelectionKey> action)。後者爲JDK11新加入的api,主要針對那些準備好進行I/O操做的channels在select過程當中對相應的key進行的一個自定義操做。

須要注意的是,有Consumer<SelectionKey> action參數的select操做是阻塞的,只有在選擇了至少一個Channel的狀況下,纔會調用此Selector實例的wakeup方法來喚醒,一樣,其所在線程被打斷也能夠。

//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout) throws IOException {
    Objects.requireNonNull(action);
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }

複製代碼

咱們能夠觀察,不管哪一種,它們最後都落在了lockAndDoSelect這個方法上,最終會執行特定系統上的doSelect(action, timeout)實現。 這裏咱們以sun.nio.ch.WindowsSelectorImpl#doSelect爲例來說述其操做執行的步驟:

// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();  // <1>
        processDeregisterQueue(); // <2>
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();  // <3>
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        finishLock.checkForException();
        processDeregisterQueue();  // <4>
        int updated = updateSelectedKeys(action); // <5>
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        resetWakeupSocket(); // <6>
        return updated;
    }
複製代碼

processUpdateQueue解讀

  1. 首先經過相應操做系統實現類(此處是WindowsSelectorImpl)的具體實現咱們能夠知道,經過<1> 處的 processUpdateQueue()得到關於每一個剩餘Channel(有些Channel取消了)的在此刻的interestOps,這裏包括新註冊的和updateKeys,並對其進行pollWrapper的管理操做。

    • 即對於新註冊的SelectionKeyImpl,咱們在相對於這個pollArray所在內存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSETSIZE_POLLFD * totalChannels + EVENT_OFFSET分別存入SelectionKeyImpl的文件描述符fd與其對應的EventOps(初始爲0)。

    • updateKeys,由於是其以前已經在pollArray的某個相對位置上存儲過,這裏咱們還須要對拿到的key的有效性進行判斷,若是有效,只須要將正在操做的這個SelectionKeyImpl對象的interestOps寫入到在pollWrapper中的存放它的EventOps位置上。

    注意: 在對newKeys進行key的有效性判斷以後,若是有效,會調用growIfNeeded()方法,這裏首先會判斷channelArray.length == totalChannels,此爲一個SelectionKeyImpl的數組,初始容量大小爲8。channelArray其實就是方便Selector管理在冊SelectionKeyImpl數量的一個數組而已,經過判斷它的數組長度大小,若是和totalChannels(初始值爲1)相等,不只僅是爲了channelArray擴容,更重要的是爲了輔助pollWrapper,讓pollWrapper擴容纔是目的所在。

    而當totalChannels % MAX_SELECTABLE_FDS == 0時,則多開一個線程處理selectorwindowsselect系統調用有最大文件描述符限制,一次只能輪詢1024個文件描述符,若是多於1024個,須要多線程進行輪詢。同時調用pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)在相對於這個pollArray所在內存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET這個位置上寫入wakeupSourceFd所表明的fdVal值。這樣在新起的線程就能夠經過MAX_SELECTABLE_FDS來肯定這個用來監控的wakeupSourceFd,方便喚醒selector。經過ski.setIndex(totalChannels)記錄下SelectionKeyImpl在數組中的索引位置,以待後續使用。

/** * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue * Process new registrations and changes to the interest ops. */
private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // new registrations
        while ((ski = newKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null;
            }
        }

        // changes to interest ops
        while ((ski = updateKeys.pollFirst()) != null) {
            int events = ski.translateInterestOps();
            int fd = ski.getFDVal();
            if (ski.isValid() && fdMap.containsKey(fd)) {
                int index = ski.getIndex();
                assert index >= 0 && index < totalChannels;
                pollWrapper.putEventOps(index, events);
            }
        }
    }
}

//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
        pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
        totalChannels++;
        threadsCount++;
    }
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;

//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
        replaceEntry(this, i, temp, i);
    pollArray.free();
    pollArray = temp.pollArray;
    this.size = temp.size;
    pollArrayAddress = pollArray.address();
}

// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
        return get(Integer.valueOf(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
        return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
        Integer fd = Integer.valueOf(ski.getFDVal());
        MapEntry x = get(fd);
        if ((x != null) && (x.ski.channel() == ski.channel()))
            return remove(fd);
        return null;
    }
}

// class for fdMap entries
private static final class MapEntry {
    final SelectionKeyImpl ski;
    long updateCount = 0;
    MapEntry(SelectionKeyImpl ski) {
        this.ski = ski;
    }
}
private final FdMap fdMap = new FdMap();
複製代碼
processDeregisterQueue解讀
  1. 接着經過上面WindowsSelectorImpl#doSelect展現源碼中<2> 處的 processDeregisterQueue()
    • cancelledKeys進行清除,遍歷cancelledKeys,並對每一個key進行deregister操做,而後從cancelledKeys集合中刪除,從keys集合與selectedKeys中刪除,以此來釋放引用,方便gc回收,
    • 其內調用implDereg方法,將會從channelArray中移除對應的Channel表明的SelectionKeyImpl,調整totalChannels和線程數,從mapkeys中移除SelectionKeyImpl,移除Channel上的SelectionKeyImpl並關閉Channel
    • 同時還發現該processDeregisterQueue()方法在調用poll方法先後都進行調用,這是確保可以正確處理在調用poll方法阻塞的這一段時間以內取消的鍵能被及時清理。
    • 最後,還會判斷這個cancelledKey所表明的channel是否打開和解除註冊,若是關閉並解除註冊,則應該將相應的文件描述符對應占用的資源給關閉掉。
/** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
protected final void processDeregisterQueue() throws IOException {
    assert Thread.holdsLock(this);
    assert Thread.holdsLock(publicSelectedKeys);

    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            Iterator<SelectionKey> i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                i.remove();

                // remove the key from the selector
                implDereg(ski);

                selectedKeys.remove(ski);
                keys.remove(ski);

                // remove from channel's key set
                deregister(ski);

                SelectableChannel ch = ski.channel();
                if (!ch.isOpen() && !ch.isRegistered())
                    ((SelChImpl)ch).kill();
            }
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
    assert !ski.isValid();
    assert Thread.holdsLock(this);

    if (fdMap.remove(ski) != null) {
        int i = ski.getIndex();
        assert (i >= 0);

        if (i != totalChannels - 1) {
            // Copy end one over it
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
            channelArray[i] = endChannel;
            endChannel.setIndex(i);
            pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
        }
        ski.setIndex(-1);

        channelArray[totalChannels - 1] = null;
        totalChannels--;
        if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
            totalChannels--;
            threadsCount--; // The last thread has become redundant.
        }
    }
}

//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLPENDING) {
            state = ST_KILLED;
            nd.close(fd);
        }
    }
}
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
    IOUtil.load();
    nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
    close0(fd);
}
複製代碼
adjustThreadsCount解讀
  1. 接着咱們來看到上面WindowsSelectorImpl#doSelect展現源碼中adjustThreadsCount()方法的調用。
    • 前面有提到若是totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。這裏就是根據分配的線程數量值來增長或減小線程,其實就是針對操做系統的最大select操做的文件描述符限制對線程個數進行調整。
    • 咱們來觀察所建線程作了什麼事情,即觀察SelectThreadrun方法實現。經過觀察其源碼能夠看到它首先是while (true),經過startLock.waitForStart(this)來控制該線程是否運行仍是等待,運行狀態的話,會進而調用subSelector.poll(index)(這個咱們後面內容詳細解讀),
    • 當此線程poll結束,並且相對於當前主線程假若有多條SelectThread子線程的話,當前這條SelectThread線程第一個結束poll的話,就調用finishLock.threadFinished()來通知主線程。在剛新建這個線程並調用其run方法的時候,此時lastRun = 0,在第一次啓動的時候sun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter一樣爲0,因此會調用startLock.wait()進而進入等待狀態。

注意:

  • sun.nio.ch.WindowsSelectorImpl.StartLock一樣會判斷當前其所檢測的線程是否廢棄,廢棄的話就返回true,這樣被檢測線程也就能跳出其內run方法的while循環從而結束線程運行。
  • 在調整線程的時候(調用adjustThreadsCount方法)與Selector調用close方法會間接調用到sun.nio.ch.WindowsSelectorImpl#implClose,這兩個方法都會涉及到Selector線程的釋放,即調用sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie
  • finishLock.threadFinished()會調用wakeup()方法來通知主線程,這裏,咱們能夠學到一個細節,若是線程正阻塞在select方法上,就能夠調用wakeup方法會使阻塞的選擇操做當即返回,經過Windows的相關實現,原理實際上是向pipesink端寫入了一個字節,source文件描述符就會處於就緒狀態,poll方法會返回,從而致使select方法返回。而在其餘solaris或者linux系統上其實採用系統調用pipe來完成管道的建立,至關於直接用了系統的管道。經過wakeup()相關實現還能夠看出,調用wakeup會設置interruptTriggered的標誌位,因此連續屢次調用wakeup的效果等同於一次調用,不會引發無所謂的bug出現。
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
    if (threadsCount > threads.size()) {
        // More threads needed. Start more threads.
        for (int i = threads.size(); i < threadsCount; i++) {
            SelectThread newThread = new SelectThread(i);
            threads.add(newThread);
            newThread.setDaemon(true);
            newThread.start();
        }
    } else if (threadsCount < threads.size()) {
        // Some threads become redundant. Remove them from the threads List.
        for (int i = threads.size() - 1 ; i >= threadsCount; i--)
            threads.remove(i).makeZombie();
    }
}

//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
    private final int index; // index of this thread
    final SubSelector subSelector;
    private long lastRun = 0; // last run number
    private volatile boolean zombie;
    // Creates a new thread
    private SelectThread(int i) {
        super(null, null, "SelectorHelper", 0, false);
        this.index = i;
        this.subSelector = new SubSelector(i);
        //make sure we wait for next round of poll
        this.lastRun = startLock.runsCounter;
    }
    void makeZombie() {
        zombie = true;
    }
    boolean isZombie() {
        return zombie;
    }
    public void run() {
        while (true) { // poll loop
            // wait for the start of poll. If this thread has become
            // redundant, then exit.
            if (startLock.waitForStart(this))
                return;
            // call poll()
            try {
                subSelector.poll(index);
            } catch (IOException e) {
                // Save this exception and let other threads finish.
                finishLock.setException(e);
            }
            // notify main thread, that this thread has finished, and
            // wakeup others, if this thread is the first to finish.
            finishLock.threadFinished();
        }
    }
}

// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
    if (threadsToFinish == threads.size()) { // finished poll() first
        // if finished first, wakeup others
        wakeup();
    }
    threadsToFinish--;
    if (threadsToFinish == 0) // all helper threads finished poll().
        notify();             // notify the main thread
}

//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
    setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) {
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}
複製代碼
subSelector的poll方法解讀
  1. subSelector.poll() 是select的核心,由native函數poll0實現,並把pollWrapper.pollArrayAddress做爲參數傳給poll0readFdswriteFdsexceptFds數組用來保存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其他位置存放發生事件的socket句柄fd。 咱們經過下面的代碼可知: 這個poll0()會監聽pollWrapper中的FD有沒有數據進出,這裏會形成IO阻塞,直到有數據讀寫事件發生。因爲pollWrapper中保存的也有ServerSocketChannelFD,因此只要ClientSocket發一份數據到ServerSocket,那麼poll0()就會返回;又因爲pollWrapper中保存的也有pipewrite端的FD,因此只要pipewrite端向FD發一份數據,也會形成poll0()返回;若是這兩種狀況都沒有發生,那麼poll0()就一直阻塞,也就是selector.select()會一直阻塞;若是有任何一種狀況發生,那麼selector.select()就會返回,全部在SelectThreadrun()裏要用while (true) {},這樣就能夠保證在selector接收到數據並處理完後繼續監聽poll();

能夠看出,NIO依然是阻塞式的IO,那麼它和BIO的區別究竟在哪呢。 其實它的區別在於阻塞的位置不一樣,BIO是阻塞在read方法(recvfrom),而NIO阻塞在select方法。那麼這樣作有什麼好處呢。若是單純的改變阻塞的位置,天然是沒有什麼變化的,但epoll等的實現的巧妙之處就在於,它利用回調機制,讓監聽可以只須要知曉哪些socket上的數據已經準備好了,只須要處理這些線程上面的數據就好了。採用BIO,假設有1000個鏈接,須要開1000個線程,而後有1000read的位置在阻塞(咱們在講解BIO部分已經經過Demo體現),採用NIO編程,只須要1個線程,它利用select的輪詢策略配合epoll的事件機制及紅黑樹數據結構,下降了其內部輪詢的開銷,同時極大的減少了線程上下文切換的開銷。

//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
        private final int pollArrayIndex; // starting index in pollArray to poll
        // These arrays will hold result of native select().
        // The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        // 保存發生read的FD
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
        // 保存發生write的FD
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
        //保存發生except的FD
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

        private SubSelector() {
            this.pollArrayIndex = 0; // main thread
        }

        private SubSelector(int threadIndex) { // helper threads
            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
        }

        private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }

        private int poll(int index) throws IOException {
            // poll for helper threads
            return  poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                     Math.min(MAX_SELECTABLE_FDS,
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
        }

        private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
             ...
}
複製代碼
updateSelectedKeys解讀
  1. 接下來將經過上面WindowsSelectorImpl#doSelect展現源碼中<5> 處的 updateSelectedKeys(action)來處理每一個channel準備就緒的信息。
  • 若是該通道的key還沒有在selectedKeys中存在,則將其添加到該集合中。
  • 若是該通道的key已經存在selectedKeys中,即這個channel存在所支持的ReadyOps就緒操做中必須包含一個這種操做(由(ski.nioReadyOps() & ski.nioInterestOps()) != 0來肯定),此時修改其ReadyOps爲當前所要進行的操做。而咱們以前看到的Consumer<SelectionKey>這個動做也是在此處進行。而由下面源碼可知,先前記錄在ReadyOps中的任何就緒信息在調用此action以前被丟棄掉,直接進行設定。
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
    updateCount++;
    int numKeysUpdated = 0;
    numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
    for (SelectThread t: threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
    }
    return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
    if (action != null) {
        ski.translateAndSetReadyOps(rOps);
        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
            action.accept(ski);
            ensureOpen();
            return 1;
        }
    } else {
        assert Thread.holdsLock(publicSelectedKeys);
        if (selectedKeys.contains(ski)) {
            if (ski.translateAndUpdateReadyOps(rOps)) {
                return 1;
            }
        } else {
            ski.translateAndSetReadyOps(rOps);
            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                selectedKeys.add(ski);
                return 1;
            }
        }
    }
    return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, action, readFds,
                                    Net.POLLIN,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                    Net.POLLIN |
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    true);
    return numKeysUpdated;
}

    /** * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet * updateCount is used to tell if a key has been counted as updated * in this select operation. * * me.updateCount <= updateCount */
private int processFDSet(long updateCount, Consumer<SelectionKey> action, int[] fds, int rOps, boolean isExceptFds) {
    int numKeysUpdated = 0;
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // OOB data queued to the socket. If there is OOB data then it
        // is discarded and the key is not added to the selected set.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }
        //咱們應該關注的
        int updated = processReadyEvents(rOps, sk, action);
        if (updated > 0 && me.updateCount != updateCount) {
            me.updateCount = updateCount;
            numKeysUpdated++;
        }
    }
    return numKeysUpdated;
}

複製代碼

至此,關於Selector的內容就暫時告一段落,在下一篇中,我會針對Java NIO Buffer進行相關解讀。

相關文章
相關標籤/搜索