NIO-WindowsSelectorImpl源碼分析

前言

原本是想學習Netty的,可是Netty是一個NIO框架,所以在學習netty以前,仍是先梳理一下NIO的知識。經過剖析源碼理解NIO的設計原理。html

本系列文章針對的是JDK1.8.0.161的源碼。java

NIO-Selector源碼分析Selector的功能和建立過程進行了分析,本篇對Windows下的WindowsSelectorImpl源碼實現進行詳細講解。windows

初始化WindowsSelectorProvider

上一篇文章提到,若沒有進行配置時,默認經過sun.nio.ch.DefaultSelectorProvider.create()建立SelectorProvider。 Windows下的代碼路徑在jdk\src\windows\classes\sun\nio\ch\DefaultSelectorProvider.java。在其內部經過實際是建立了一個WindowsSelectorProvider)數組

建立WindowsSelectorImpl

WindowsSelectorProvider是用於建立WindowsSelectorImpl的。緩存

Selector.Open()->
SelectorProvider.provider()->
sun.nio.ch.DefaultSelectorProvider.create()->
new WindowsSelectorImpl(this)->
WindowsSelectorProvider.openSelector()
public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
複製代碼

WindowsSelectorImpl結構

在詳細講解WindowsSelectorImpl源碼以前,先了解WindowsSelectorImpl的大體代碼結構。微信

在其內部有幾個主要的數據結構和屬性。數據結構

名稱 做用
SelectionKeyImpl[] channelArray 存放註冊的SelectionKey
PollArrayWrapper pollWrapper 底層的本機輪詢數組包裝對象,用於存放Socket文件描述符和事件掩碼
List<SelectThread> threads 輔助線程,多個線程有助於提升高併發時的性能
Pipe wakeupPipe 用於喚醒輔助線程
FdMap fdMap 保存文件描述符和SelectionKey的映射關係
SubSelector subSelector 調用JNI的poll和處理就緒的SelectionKey
StartLock startLock 新增的輔助線程使用該鎖等待主線程的開始信號
FinishLock finishLock 主線程用該鎖等待全部輔助線程執行完畢

SelectionKeyImpl

用於存放Channel,Selector以及存放Channel註冊時的事件掩碼。多線程

  • 在註冊的時候會建立SelectionKeyImpl
  • SelectionKeyImpl加入到SelectionKeyImpl[] channelArray
  • 將文件句柄和SelectionKeyImpl的對應關係加入到FdMap fdMap
  • 將key的文件描述符保存到PollArrayWrapper pollWrapper中。

PollArrayWrapper

PollArrayWrapper用於存放文件描述符的文件描述符和事件掩碼的native數組。相關的文件描述符的結構以下圖:併發

其中每項的結構以下:app

名稱 大小 說明
SOCKET fd 4字節 存放Socket文件句柄
short events 2字節 等待的事件掩碼
short reevents 2字節 實際發生的事件掩碼,暫時美有用到

如上所示,每項爲8字節,即爲SIZE_POLLFD的值,目前NIO實際只用前兩個字段。

class PollArrayWrapper {
    private AllocatedNativeObject pollArray; // The fd array
    long pollArrayAddress; // pollArrayAddress
    static short SIZE_POLLFD = 8; // sizeof pollfd struct
    private int size; // Size of the pollArray
    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }
...
}
複製代碼

PollArrayWrapper 內部使用 AllocatedNativeObject對象建立的堆外(native)內存對象。 將數組的首地址保存到pollArrayAddress中,在調用Poll的時候須要傳遞該參數給JNI

PollArrayWrapper暴露了讀寫FD和Event的方法供WindowsSelectorImpl使用。

void putDescriptor(int i, int fd) {
    pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}

void putEventOps(int i, int event) {
    pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}

int getEventOps(int i) {
    return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}

int getDescriptor(int i) {
    return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}

複製代碼

SelectThread

因爲select最大一次性獲取1024個文件描述符。所以爲了提升poll的性能 WindowsSelectorImpl底層 經過引入多個輔助線程的方式實現多線程poll以提升高併發時的性能問題。 咱們先看一下注冊的邏輯

protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        //判斷是否須要擴容隊列以及添加輔助線程
        growIfNeeded();
        //保存到緩存中
        channelArray[totalChannels] = ski;
        //保存在數組中的位置
        ski.setIndex(totalChannels);
        //保存文件描述符和SelectionKeyImpl的映射關係到FDMap
        fdMap.put(ski);
        //保存到keys中
        keys.add(ski);
        //保存文件描述符和事件到native數組中
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}
複製代碼

在註冊以前會先會判斷當前註冊的Channel數量 是否達到須要啓動輔助線程的閾值。若是達到閾值則須要擴容pollWrapper數組,同時還要 wakeupSourceFd加入到擴容後的第一個位置 (具體做用下面會講解)。

private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
        //channel數組已滿,擴容兩倍
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        //文件描述符數組擴容
        pollWrapper.grow(newSize);
    }
    //達到最大文件描述符數量時添加輔助線程
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
        //將喚醒的文件描述符加入到擴容後的第一個位置。
        pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
        totalChannels++;
        //添加線程數
        threadsCount++;
    }
}
複製代碼

擴容PollArrayWrapper

pollWrapper.grow(newSize);
void grow(int newSize) {
    //建立新的數組
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
    //將原來的數組的內容存放到新的數組中
        replaceEntry(this, i, temp, i);
    //釋放原來的數組
    pollArray.free();
    //更新引用
    pollArray = temp.pollArray;
    //更新大小
    this.size = temp.size;
    //更新地址
    pollArrayAddress = pollArray.address();
}
複製代碼

擴容完成時,須要添加一個輔助線程以並行的處理全部文件描述符。主線程處理前1024個文件描述符,第二個輔助線程處理1025到2048的文件描述符,以此類推。 這樣使得主線程調用poll的時候,經過多線程並行執行一次性獲取到全部的已就緒的文件描述符,從而提升在高併發時的poll的性能。

每1024個PollFD的第一個句柄都要設置爲wakeupSourceFd,所以在擴容的時候也須要將新的位置的第一個設置爲wakeupSourceFd,該線程的目的是爲了喚醒輔助線程 。當多個線程阻塞在Poll,若此時主線程已經處理完成,則須要等待全部輔助線程完成,經過向wakeupSourceFd發送信號以激活Poll不在阻塞。

如今咱們知道了windows下poll多線程的使用方法,由於多線程poll還須要其餘的數據結構支持同步,具體的多線程執行邏輯咱們下面再討論。

FdMap

FDMap只是爲了保存文件描述符句柄和SelectionKey的關係,前面咱們提到了PollFD的數據結構包含了文件描述符句柄信息,所以咱們能夠經過文件描述符句柄從FdMap中獲取到對應的SelectionKey

private final static class FdMap extends HashMap<Integer, MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
        return get(new Integer(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
        return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
        Integer fd = new Integer(ski.channel.getFDVal());
        MapEntry x = get(fd);
        if ((x != null) && (x.ski.channel == ski.channel))
            return remove(fd);
        return null;
    }
}
複製代碼

SubSelector

SubSelector封裝了調用JNI poll的邏輯,以及獲取就緒SelectionKey的方法。

主線程和每個子線程都有一個SubSelector,其內存保存了poll獲取到的可讀文件描述符,可寫文件描述符以及異常的文件描述符。這樣每一個線程就有本身單獨的就緒文件描述符數組。

private final int pollArrayIndex;
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

複製代碼

pollArrayIndex記錄了當前SubSelector的序號,在調用poll的時候,須要將文件描述符數組的地址傳遞給JNI中,因爲咱們有多個線程一塊兒調用poll,且每一個線程處理1024個Channel。經過序號和數組的地址計算當前SubSelector所負責哪些通道。

private int poll() throws IOException{ // poll for the main thread
    return poll0(pollWrapper.pollArrayAddress,
                    Math.min(totalChannels, MAX_SELECTABLE_FDS),
                    readFds, writeFds, exceptFds, timeout);
}

private int poll(int index) throws IOException {
    // poll for helper threads
    return  poll0(pollWrapper.pollArrayAddress +
                (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                Math.min(MAX_SELECTABLE_FDS,
                        totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                readFds, writeFds, exceptFds, timeout);
}

private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

複製代碼

在主線程調用poll以後,會獲取到已就緒的文件描述符(包含可讀、可寫、異常)。經過調用processSelectedKeys將就緒的文件描述符對應的SelectorKey加入到selectedKeys中。這樣咱們外部就能夠調用到全部就緒的SelectorKey進行遍歷處理。

private int processSelectedKeys(long updateCount) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, readFds,
                                    Net.POLLIN,
                                    false);
    numKeysUpdated += processFDSet(updateCount, writeFds,
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    false);
    numKeysUpdated += processFDSet(updateCount, exceptFds,
                                    Net.POLLIN |
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    true);
    return numKeysUpdated;
}
複製代碼

可讀文件描述符,可寫文件描述符以及異常文件描述符的處理邏輯都是同樣的,調用processFDSet處理更新SelectorKey的就緒事件。這裏會傳入文件描述符的數組。須要注意的是文件描述符第一個元素是數組的長度。

private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds) {
    int numKeysUpdated = 0;
    //1. 遍歷文件描述符數組
    for (int i = 1; i <= fds[0]; i++) {
        //獲取文件描述符句柄值
        int desc = fds[i];
        //2. 判斷當前文件描述符是不是用於喚醒的文件描述
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        //3. 獲取文件描述符句柄對應的SelectionKey的映射值
        MapEntry me = fdMap.get(desc);
        // 4. 若爲空,則表示已經被取消。
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // 5. 丟棄OOD數據(緊急數據)
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }
        //6. 判斷key是否已經就緒,若已就緒,則將當前操做累加到原來的操做上,好比原來寫事件就緒,如今讀事件就緒,就須要更新該key讀寫就緒
        if (selectedKeys.contains(sk)) { // Key in selected set
        //clearedCount 和 updateCount用於避免同一個key的事件設置屢次,由於同一個文件描述符可能在可讀文件描述符數組也可能在異常文件描述符數組中。
            if (me.clearedCount != updateCount) {
                if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
                    (me.updateCount != updateCount)) {
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            } else { // The readyOps have been set; now add
                if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
                    (me.updateCount != updateCount)) {
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            }
            me.clearedCount = updateCount;
        } else { // Key is not in selected set yet
        //key原來未就緒,將key加入selectedKeys中
            if (me.clearedCount != updateCount) {
                sk.channel.translateAndSetReadyOps(rOps, sk);
                if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                    selectedKeys.add(sk);
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            } else { // The readyOps have been set; now add
                sk.channel.translateAndUpdateReadyOps(rOps, sk);
                if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                    selectedKeys.add(sk);
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            }
            me.clearedCount = updateCount;
        }
    }
    return numKeysUpdated;
}

複製代碼
  1. 首先忽略wakeupSourceFd,前面說了該文件描述符用於喚醒。
  2. 過濾fdMap不存在的文件描述符,這些文件描述符已經被取消了。
  3. 忽略OOB(緊急)數據,這些數據須要調用discardUrgentData讀取並忽略。
  4. 根據key是否在SelectorKeys中決定是設置事件掩碼仍是更新事件掩碼。

多線程Poll

如今大部分數據結構都已經介紹了,在談論Pipe、StartLock和FinishLock以前,是時候引入多線程Poll功能了,在談論多線程時,會對上述三個數據結構和功能進行詳細說明。

首先咱們先看一下建立WindowsSelectorImpl作了什麼

WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製代碼
  1. 首先建立了一個默認8個長度(8*8字節)的文件描述符數組PollArrayWrapper
  2. 建立一個Pipe,Pipe咱們以前討論過是一個單向通信管道。
  3. 獲取Pipe的源端和目標端的文件描述符句柄,該句柄用於激活線程。
  4. wakeupSourceFd存到PollArrayWapper每1024個元素的第一個位置。使得每一個線程都能被wakeupSourceFd喚醒。

因爲select最大支持1024個句柄,這裏第一個文件描述符是wakeupSourceFd,因此一個線程實際最多併發處理1023個socket文件描述符。

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
void addWakeupSocket(int fdVal, int index) {
    putDescriptor(index, fdVal);
    putEventOps(index, Net.POLLIN);
}
複製代碼

如今咱們看一下doSelect邏輯

protected int doSelect(long timeout) throws IOException {
    if (channelArray == null)
            throw new ClosedSelectorException();
    this.timeout = timeout; // set selector timeout
    //1. 刪除取消的key
    processDeregisterQueue();
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    //2. 調整線程數 ,等待運行
    adjustThreadsCount();
    //3. 設置輔助線程數
    finishLock.reset(); 
    //4. 開始運行新增的輔助線程
    startLock.startThreads();
    
    try {
        begin();
        try {
            //5. 獲取就緒文件描述符
            subSelector.poll();
        } catch (IOException e) {
            finishLock.setException(e); // Save this exception
        }
        //6. 等待全部輔助線程完成
        if (threads.size() > 0)
            finishLock.waitForHelperThreads();
        } finally {
            end();
        }
    // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
    finishLock.checkForException();
    //7. 再次檢查刪除取消的key
    processDeregisterQueue();
    //8. 將就緒的key加入到selectedKeys中
    int updated = updateSelectedKeys();
    // 完成,重置喚醒標記下次在運行。
    resetWakeupSocket();
    return updated;
}
複製代碼
  1. 刪除取消key,當channel關閉時,對應的Key會被取消,被取消的key會加入到cancelledKeys中。
protected final void implCloseChannel() throws IOException {
    implCloseSelectableChannel();
    synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
        for (int i = 0; i < count; i++) {
            SelectionKey k = keys[i];
            if (k != null)
                k.cancel();
        }
    }
}
public final void cancel() {
    ...
    ((AbstractSelector)selector()).cancel(this);
    ...
}
void cancel(SelectionKey k) {                       // package-private
    synchronized (cancelledKeys) {
        cancelledKeys.add(k);
    }
}
複製代碼

調用processDeregisterQueue進行註銷。

processDeregisterQueue();
//遍歷全部已取消的key,取消他們
void processDeregisterQueue() throws IOException {
    // Precondition: Synchronized on this, keys, and selectedKeys
    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            //遍歷全部key
            Iterator<SelectionKey> i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                try {
                    //註銷key
                    implDereg(ski);
                } catch (SocketException se) {
                    throw new IOException("Error deregistering key", se);
                } finally {
                    i.remove();
                }
            }
        }
    }
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
    int i = ski.getIndex();
    assert (i >= 0);
    synchronized (closeLock) {
        if (i != totalChannels - 1) {
            // 把最後一個通道複製到取消key所在的位置。
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
            channelArray[i] = endChannel;
            endChannel.setIndex(i);
            pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
                                                            pollWrapper, i);
        }
        ski.setIndex(-1);
    }
    //將最後一個通道清空。
    channelArray[totalChannels - 1] = null;
    totalChannels--;
    //判斷是否須要減小一個輔助線程。
    if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
        totalChannels--;
        threadsCount--; // The last thread has become redundant.
    }
    //清除對應的緩存。
    fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
    keys.remove(ski);
    selectedKeys.remove(ski);
    //設置key無效
    deregister(ski);
    SelectableChannel selch = ski.channel();
    if (!selch.isOpen() && !selch.isRegistered())
    //關閉文件描述符
        ((SelChImpl)selch).kill();
}
//將全部key都設置爲無效
protected final void deregister(AbstractSelectionKey key) {
        ((AbstractSelectableChannel)key.channel()).removeKey(key);
    }
    void removeKey(SelectionKey k) {                    // package-private
        synchronized (keyLock) {
            for (int i = 0; i < keys.length; i++)
                if (keys[i] == k) {
                    keys[i] = null;
                    keyCount--;
                }
            //將key設置爲無效
            ((AbstractSelectionKey)k).invalidate();
        }
    }

複製代碼
  • 取消時首先會將該Key的文件描述符的PollFD項從pollWrapper中移除。
  • 將key從channelArray中刪除。
  • 若總的註冊通道數達到了減少線程的閾值,則減少一個線程。
  • 清理fdMapkeysselectedKeys數據緩存。
  • 設置key無效
  • 關閉文件描述符

((SelChImpl)selch).kill();是在各個Channel中實現的,以SocketChannel爲例,最終會調用nd.close(fd);關閉對應的文件描述符

  1. 調整輔助線程數
private void adjustThreadsCount() {
    //當線程大於實際線程,建立更多線程
    if (threadsCount > threads.size()) {
        // More threads needed. Start more threads.
        for (int i = threads.size(); i < threadsCount; i++) {
            SelectThread newThread = new SelectThread(i);
            threads.add(newThread);
            //設置爲守護線程
            newThread.setDaemon(true);
            newThread.start();
        }
    } else if (threadsCount < threads.size()) {
        // 當線程小於實際線程,移除線程。
        for (int i = threads.size() - 1 ; i >= threadsCount; i--)
            threads.remove(i).makeZombie();
    }
}

複製代碼

在建立新的線程時,會記錄上一次運行的數量保存到lastRun變量中

private SelectThread(int i) {
        this.index = i;
        this.subSelector = new SubSelector(i);
        //make sure we wait for next round of poll
        this.lastRun = startLock.runsCounter;
    }
複製代碼

當線程啓動時會等待主線程激活

public void run() {
    while (true) { // poll loop
        //等待主線程信號激活
        if (startLock.waitForStart(this))
            return;
        // call poll()
        try {
            subSelector.poll(index);
        } catch (IOException e) {
            // Save this exception and let other threads finish.
            finishLock.setException(e);
        }
        // 通知主線程完成.
        finishLock.threadFinished();
    }
}
複製代碼

經過startLock等待主線程的開始信號。若當前線程是新啓動的線程,則runsCounter == thread.lastRun爲真,此時新的線程須要等待主線程調用啓動。

startLock.waitForStart(this)
private synchronized boolean waitForStart(SelectThread thread) {
        while (true) {
            while (runsCounter == thread.lastRun) {
                try {
                    startLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (thread.isZombie()) { // redundant thread
                return true; // will cause run() to exit.
            } else {
                thread.lastRun = runsCounter; // update lastRun
                return false; // will cause run() to poll.
            }
        }
    }
}
複製代碼
  1. 設置輔助線程數量

記錄當前輔助線程數量,下次新增的輔助線程須要等待主線程通知啓動。

finishLock.reset(); 
private void reset() {
    threadsToFinish = threads.size(); // helper threads
}
複製代碼
  1. 開始運行新增的輔助線程
startLock.startThreads();
private synchronized void startThreads() {
    runsCounter++; // next run
    notifyAll(); // 通知全部輔助線程繼續執行,
}
複製代碼
  1. 獲取已就緒的文件描述符
subSelector.poll();
//主線程調用
private int poll() throws IOException{ 
    return poll0(pollWrapper.pollArrayAddress,
                    Math.min(totalChannels, MAX_SELECTABLE_FDS),
                    readFds, writeFds, exceptFds, timeout);
}
//輔助線程調用
private int poll(int index) throws IOException {

    return  poll0(pollWrapper.pollArrayAddress +
                (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                Math.min(MAX_SELECTABLE_FDS,
                        totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                readFds, writeFds, exceptFds, timeout);
}
複製代碼

輔助線程和主線程調用的區別就是存放PollFD的位置變化,每一個線程會有1024個PollFD(8B)的位置存放PollFD。這樣使得多個線程的數據內存分離互不影響。 下面看一下JNI的poll0作了什麼處理。下面羅略了主要的邏輯

typedef struct {
    jint fd;
    jshort events;
} pollfd;

Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
                                   jlong pollAddress, jint numfds,
                                   jintArray returnReadFds, jintArray returnWriteFds,
                                   jintArray returnExceptFds, jlong timeout)
{
    DWORD result = 0;
    pollfd *fds = (pollfd *) pollAddress;
    int i;
    FD_SET readfds, writefds, exceptfds;
    struct timeval timevalue, *tv;
    static struct timeval zerotime = {0, 0};
    ...
    /* Call select */
    if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
                                                             == SOCKET_ERROR) {
        //當出現錯誤時,變量每一個socket獲取它的就緒狀態
        FD_SET errreadfds, errwritefds, errexceptfds;
        ...
        for (i = 0; i < numfds; i++) {
            errreadfds.fd_count = 0;
            errwritefds.fd_count = 0;
            if (fds[i].events & POLLIN) {
               errreadfds.fd_array[0] = fds[i].fd;
               errreadfds.fd_count = 1;
            }
            if (fds[i].events & (POLLOUT | POLLCONN))
            {
                errwritefds.fd_array[0] = fds[i].fd;
                errwritefds.fd_count = 1;
            }
            errexceptfds.fd_array[0] = fds[i].fd;
            errexceptfds.fd_count = 1;
            //遍歷每一個socket,探測它的狀態
            /* call select on the i-th socket */
            if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
                                                             == SOCKET_ERROR) {
                /* This socket causes an error. Add it to exceptfds set */
                exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                exceptfds.fd_count++;
            } else {
                
                ...
            }
        }
        }
    }

    /* Return selected sockets. */
    /* Each Java array consists of sockets count followed by sockets list */
...
    (*env)->SetIntArrayRegion(env, returnReadFds, 0,
                              readfds.fd_count + 1, (jint *)&readfds);

    (*env)->SetIntArrayRegion(env, returnWriteFds, 0,
                              writefds.fd_count + 1, (jint *)&writefds);
    (*env)->SetIntArrayRegion(env, returnExceptFds, 0,
                              exceptfds.fd_count + 1, (jint *)&exceptfds);
    return 0;
}

複製代碼
  • 首先會經過pollfd *fds = (pollfd *) pollAddress;將pollAddress的地址轉換爲polldf的數組結構。

這裏會自動內存對齊,pollfd一共只有6個字節,第一個是int類型的文件描述符句柄,第二個是short類型的等待事件掩碼值。第二個short後會填充2B,所以每一個pollFD是8B。而實際後面2字節用於存放實際發生事件的事件掩碼。

  • 經過調用Win32API的select執行實際的操做獲取就緒的文件描述符。當socket收到OOB(緊急)數據時,會產生異常。此時須要遍歷全部文件描述符,以肯定是哪一個socket接收到OOB數據。從而正常處理。上面也提到過OOB數據會經過調用discardUrgentData進行清理。
JNIEXPORT jboolean JNICALL Java_sun_nio_ch_WindowsSelectorImpl_discardUrgentData(JNIEnv* env, jobject this, jint s) {
    char data[8];
    jboolean discarded = JNI_FALSE;
    int n;
    do {
        //讀取MSG_OOB數據
        n = recv(s, (char*)&data, sizeof(data), MSG_OOB);
        if (n > 0) {
            //讀取到設置標記爲true
            discarded = JNI_TRUE;
        }
    } while (n > 0);
    return discarded;
}
複製代碼

若是timeval爲{0,0},則select()當即返回,這可用於探詢所選套接口的狀態。若是處於這種狀態,則select()調用可認爲是非阻塞的,且一切適用於非阻塞調用的假設都適用於它。

  • 當獲取到全部的就緒的文件描述符時,須要保存到返回結果中,同時讀寫和異常的返回結果的數組第一個爲就緒的長度值。

  • 等待全部輔助線程完成,當主線程完成時會當即調用wakeupwakeupSourceFd發生數據以觸發輔助線程喚醒。輔助線程喚醒後也會調用wakeup一次。當輔助線程都被喚醒後就會通知主線程。

if (threads.size() > 0)
    finishLock.waitForHelperThreads();
private synchronized void waitForHelperThreads() {
        if (threadsToFinish == threads.size()) {
            // no helper threads finished yet. Wakeup them up.
            wakeup();
        }
        while (threadsToFinish != 0) {
            try {
                finishLock.wait();
            } catch (InterruptedException e) {
                // Interrupted - set interrupted state.
                Thread.currentThread().interrupt();
            }
        }
    }
private synchronized void threadFinished() {
        if (threadsToFinish == threads.size()) { // finished poll() first
            // if finished first, wakeup others
            wakeup();
        }
        threadsToFinish--;
        if (threadsToFinish == 0) // all helper threads finished poll().
            notify();             // notify the main thread
    }
    
複製代碼

若輔助線接收到數據,則它須要調用wakeup來喚醒其餘輔助線程,這樣使得主線程火輔助線程至少能調用一次wakeup激活其餘輔助線程。wakeup內部會調用setWakeupSocketwakeupSourceFd發生一個信號。

public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}
//發生一個字節數據喚醒wakeupsocket
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}
複製代碼

當主線被激活時,須要調用resetWakeupSocketwakeupSourceFd的數據讀取出來。

private void resetWakeupSocket() {
        synchronized (interruptLock) {
            if (interruptTriggered == false)
                return;
            resetWakeupSocket0(wakeupSourceFd);
            interruptTriggered = false;
        }
    }
    //讀取wakeupsocket的數據。
Java_sun_nio_ch_WindowsSelectorImpl_resetWakeupSocket0(JNIEnv *env, jclass this,
                                            jint scinFd)
{
    char bytes[WAKEUP_SOCKET_BUF_SIZE];
    long bytesToRead;

    /* 獲取數據大小 */
    ioctlsocket (scinFd, FIONREAD, &bytesToRead);
    if (bytesToRead == 0) {
        return;
    }
    /* 從緩衝區讀取全部數據 */
    if (bytesToRead > WAKEUP_SOCKET_BUF_SIZE) {
        char* buf = (char*)malloc(bytesToRead);
        recv(scinFd, buf, bytesToRead, 0);
        free(buf);
    } else {
        recv(scinFd, bytes, WAKEUP_SOCKET_BUF_SIZE, 0);
    }
}
複製代碼

ioctlsocket()是一個計算機函數,功能是控制套接口的模式。可用於任一狀態的任一套接口。它用於獲取與套接口相關的操做參數,而與具體協議或通信子系統無關。第二個參數時對socket的操做命令

  1. 再次調用刪除取消的key
  2. 將就緒的key加入到selectKeys中,有多個線程會將全部線程的就緒key加入到selectKeys中。
int updated = updateSelectedKeys();
private int updateSelectedKeys() {
    updateCount++;
    int numKeysUpdated = 0;
    numKeysUpdated += subSelector.processSelectedKeys(updateCount);
    for (SelectThread t: threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
    }
    return numKeysUpdated;
}
複製代碼

若key首次被加入,則會調用translateAndSetReadyOps,若key已經在selectKeys中,則會調用translateAndUpdateReadyOps。這兩個方法都是調用translateReadyOps,translateReadyOps操做會將已就緒的操做保存。

public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
    return translateReadyOps(ops, sk.nioReadyOps(), sk);
}

public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
    return translateReadyOps(ops, 0, sk);
}

複製代碼

關閉WindowsSelectorImpl

關閉WindowsSelectorImpl時會將全部註冊的通道一同關閉

protected void implClose() throws IOException {
    synchronized (closeLock) {
        if (channelArray != null) {
            if (pollWrapper != null) {
                // prevent further wakeup
                synchronized (interruptLock) {
                    interruptTriggered = true;
                }
                wakeupPipe.sink().close();
                wakeupPipe.source().close();
                //關閉全部channel
                for(int i = 1; i < totalChannels; i++) { // Deregister channels
                    if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
                        deregister(channelArray[i]);
                        SelectableChannel selch = channelArray[i].channel();
                        if (!selch.isOpen() && !selch.isRegistered())
                            ((SelChImpl)selch).kill();
                    }
                }
                //釋放數據
                pollWrapper.free();
                pollWrapper = null;
                selectedKeys = null;
                channelArray = null;
                //釋放輔助線程
                for (SelectThread t: threads)
                        t.makeZombie();
                //喚醒輔助線程使其退出。
                startLock.startThreads();
            }
        }
    }
}
複製代碼

總結

本文對WindowsSelectorImpl的代碼實現進行詳細解析。下一篇將對Linux下的EpollSelectorImpl的實現繼續講解。

相關文獻

  1. C語言-----結構體內存對齊
  2. windows下的select函數
  3. OOB(out-of-band data)
  4. JAVA NIO 選擇器

20191127212134.png

  • 微信掃一掃二維碼關注訂閱號傑哥技術分享
  • 出處:www.cnblogs.com/Jack-Blog/p…
  • 做者:傑哥很忙
  • 本文使用「CC BY 4.0」創做共享協議。歡迎轉載,請在明顯位置給出出處及連接。
相關文章
相關標籤/搜索