原本是想學習Netty的,可是Netty是一個NIO框架,所以在學習netty以前,仍是先梳理一下NIO的知識。經過剖析源碼理解NIO的設計原理。html
本系列文章針對的是JDK1.8.0.161的源碼。java
NIO-Selector源碼分析對Selector
的功能和建立過程進行了分析,本篇對Windows下的WindowsSelectorImpl
源碼實現進行詳細講解。windows
上一篇文章提到,若沒有進行配置時,默認經過sun.nio.ch.DefaultSelectorProvider.create()
建立SelectorProvider
。 Windows下的代碼路徑在jdk\src\windows\classes\sun\nio\ch\DefaultSelectorProvider.java
。在其內部經過實際是建立了一個WindowsSelectorProvider)
。數組
WindowsSelectorProvider
是用於建立WindowsSelectorImpl
的。緩存
Selector.Open()->
SelectorProvider.provider()->
sun.nio.ch.DefaultSelectorProvider.create()->
new WindowsSelectorImpl(this)->
WindowsSelectorProvider.openSelector()
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
複製代碼
在詳細講解WindowsSelectorImpl
源碼以前,先了解WindowsSelectorImpl
的大體代碼結構。微信
在其內部有幾個主要的數據結構和屬性。數據結構
名稱 | 做用 |
---|---|
SelectionKeyImpl[] channelArray | 存放註冊的SelectionKey |
PollArrayWrapper pollWrapper | 底層的本機輪詢數組包裝對象,用於存放Socket文件描述符和事件掩碼 |
List<SelectThread> threads | 輔助線程,多個線程有助於提升高併發時的性能 |
Pipe wakeupPipe | 用於喚醒輔助線程 |
FdMap fdMap | 保存文件描述符和SelectionKey的映射關係 |
SubSelector subSelector | 調用JNI的poll和處理就緒的SelectionKey |
StartLock startLock | 新增的輔助線程使用該鎖等待主線程的開始信號 |
FinishLock finishLock | 主線程用該鎖等待全部輔助線程執行完畢 |
用於存放Channel
,Selector
以及存放Channel註冊時的事件掩碼。多線程
SelectionKeyImpl
SelectionKeyImpl
加入到SelectionKeyImpl[] channelArray
SelectionKeyImpl
的對應關係加入到FdMap fdMap
PollArrayWrapper pollWrapper
中。PollArrayWrapper
用於存放文件描述符的文件描述符和事件掩碼的native數組。相關的文件描述符的結構以下圖:併發
其中每項的結構以下:app
名稱 | 大小 | 說明 |
---|---|---|
SOCKET fd | 4字節 | 存放Socket文件句柄 |
short events | 2字節 | 等待的事件掩碼 |
short reevents | 2字節 | 實際發生的事件掩碼,暫時美有用到 |
如上所示,每項爲8字節,即爲SIZE_POLLFD
的值,目前NIO實際只用前兩個字段。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
}
複製代碼
在 PollArrayWrapper
內部使用 AllocatedNativeObject
對象建立的堆外(native)內存對象。 將數組的首地址保存到pollArrayAddress
中,在調用Poll
的時候須要傳遞該參數給JNI。
PollArrayWrapper
暴露了讀寫FD和Event的方法供WindowsSelectorImpl
使用。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
int getEventOps(int i) {
return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}
int getDescriptor(int i) {
return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
複製代碼
因爲select最大一次性獲取1024個文件描述符。所以爲了提升poll的性能 WindowsSelectorImpl
底層 經過引入多個輔助線程的方式實現多線程poll以提升高併發時的性能問題。 咱們先看一下注冊的邏輯
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
//判斷是否須要擴容隊列以及添加輔助線程
growIfNeeded();
//保存到緩存中
channelArray[totalChannels] = ski;
//保存在數組中的位置
ski.setIndex(totalChannels);
//保存文件描述符和SelectionKeyImpl的映射關係到FDMap
fdMap.put(ski);
//保存到keys中
keys.add(ski);
//保存文件描述符和事件到native數組中
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
複製代碼
在註冊以前會先會判斷當前註冊的Channel
數量 是否達到須要啓動輔助線程的閾值。若是達到閾值則須要擴容pollWrapper
數組,同時還要 將wakeupSourceFd
加入到擴容後的第一個位置 (具體做用下面會講解)。
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
//channel數組已滿,擴容兩倍
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
//文件描述符數組擴容
pollWrapper.grow(newSize);
}
//達到最大文件描述符數量時添加輔助線程
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
//將喚醒的文件描述符加入到擴容後的第一個位置。
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
//添加線程數
threadsCount++;
}
}
複製代碼
擴容PollArrayWrapper
pollWrapper.grow(newSize);
void grow(int newSize) {
//建立新的數組
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
//將原來的數組的內容存放到新的數組中
replaceEntry(this, i, temp, i);
//釋放原來的數組
pollArray.free();
//更新引用
pollArray = temp.pollArray;
//更新大小
this.size = temp.size;
//更新地址
pollArrayAddress = pollArray.address();
}
複製代碼
擴容完成時,須要添加一個輔助線程以並行的處理全部文件描述符。主線程處理前1024個文件描述符,第二個輔助線程處理1025到2048的文件描述符,以此類推。 這樣使得主線程調用poll的時候,經過多線程並行執行一次性獲取到全部的已就緒的文件描述符,從而提升在高併發時的poll的性能。
每1024個PollFD的第一個句柄都要設置爲wakeupSourceFd
,所以在擴容的時候也須要將新的位置的第一個設置爲wakeupSourceFd
,該線程的目的是爲了喚醒輔助線程 。當多個線程阻塞在Poll
,若此時主線程已經處理完成,則須要等待全部輔助線程完成,經過向wakeupSourceFd
發送信號以激活Poll
不在阻塞。
如今咱們知道了windows下poll多線程的使用方法,由於多線程poll還須要其餘的數據結構支持同步,具體的多線程執行邏輯咱們下面再討論。
FDMap只是爲了保存文件描述符句柄和SelectionKey
的關係,前面咱們提到了PollFD的數據結構包含了文件描述符句柄信息,所以咱們能夠經過文件描述符句柄從FdMap中獲取到對應的SelectionKey
。
private final static class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(new Integer(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = new Integer(ski.channel.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel))
return remove(fd);
return null;
}
}
複製代碼
SubSelector
封裝了調用JNI poll的邏輯,以及獲取就緒SelectionKey
的方法。
主線程和每個子線程都有一個SubSelector
,其內存保存了poll獲取到的可讀文件描述符,可寫文件描述符以及異常的文件描述符。這樣每一個線程就有本身單獨的就緒文件描述符數組。
private final int pollArrayIndex;
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
複製代碼
pollArrayIndex
記錄了當前SubSelector
的序號,在調用poll的時候,須要將文件描述符數組的地址傳遞給JNI中,因爲咱們有多個線程一塊兒調用poll,且每一個線程處理1024個Channel
。經過序號和數組的地址計算當前SubSelector
所負責哪些通道。
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
複製代碼
在主線程調用poll以後,會獲取到已就緒的文件描述符(包含可讀、可寫、異常)。經過調用processSelectedKeys
將就緒的文件描述符對應的SelectorKey
加入到selectedKeys
中。這樣咱們外部就能夠調用到全部就緒的SelectorKey
進行遍歷處理。
private int processSelectedKeys(long updateCount) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
複製代碼
可讀文件描述符,可寫文件描述符以及異常文件描述符的處理邏輯都是同樣的,調用processFDSet
處理更新SelectorKey
的就緒事件。這裏會傳入文件描述符的數組。須要注意的是文件描述符第一個元素是數組的長度。
private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds) {
int numKeysUpdated = 0;
//1. 遍歷文件描述符數組
for (int i = 1; i <= fds[0]; i++) {
//獲取文件描述符句柄值
int desc = fds[i];
//2. 判斷當前文件描述符是不是用於喚醒的文件描述
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
//3. 獲取文件描述符句柄對應的SelectionKey的映射值
MapEntry me = fdMap.get(desc);
// 4. 若爲空,則表示已經被取消。
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// 5. 丟棄OOD數據(緊急數據)
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//6. 判斷key是否已經就緒,若已就緒,則將當前操做累加到原來的操做上,好比原來寫事件就緒,如今讀事件就緒,就須要更新該key讀寫就緒
if (selectedKeys.contains(sk)) { // Key in selected set
//clearedCount 和 updateCount用於避免同一個key的事件設置屢次,由於同一個文件描述符可能在可讀文件描述符數組也可能在異常文件描述符數組中。
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
//key原來未就緒,將key加入selectedKeys中
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
複製代碼
wakeupSourceFd
,前面說了該文件描述符用於喚醒。discardUrgentData
讀取並忽略。如今大部分數據結構都已經介紹了,在談論Pipe、StartLock和FinishLock以前,是時候引入多線程Poll功能了,在談論多線程時,會對上述三個數據結構和功能進行詳細說明。
首先咱們先看一下建立WindowsSelectorImpl
作了什麼
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製代碼
PollArrayWrapper
wakeupSourceFd
存到PollArrayWapper
每1024個元素的第一個位置。使得每一個線程都能被wakeupSourceFd
喚醒。因爲select最大支持1024個句柄,這裏第一個文件描述符是wakeupSourceFd
,因此一個線程實際最多併發處理1023個socket文件描述符。
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
複製代碼
如今咱們看一下doSelect邏輯
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
//1. 刪除取消的key
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
//2. 調整線程數 ,等待運行
adjustThreadsCount();
//3. 設置輔助線程數
finishLock.reset();
//4. 開始運行新增的輔助線程
startLock.startThreads();
try {
begin();
try {
//5. 獲取就緒文件描述符
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
//6. 等待全部輔助線程完成
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
//7. 再次檢查刪除取消的key
processDeregisterQueue();
//8. 將就緒的key加入到selectedKeys中
int updated = updateSelectedKeys();
// 完成,重置喚醒標記下次在運行。
resetWakeupSocket();
return updated;
}
複製代碼
cancelledKeys
中。protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
public final void cancel() {
...
((AbstractSelector)selector()).cancel(this);
...
}
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
複製代碼
調用processDeregisterQueue
進行註銷。
processDeregisterQueue();
//遍歷全部已取消的key,取消他們
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
//遍歷全部key
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//註銷key
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
synchronized (closeLock) {
if (i != totalChannels - 1) {
// 把最後一個通道複製到取消key所在的位置。
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
}
ski.setIndex(-1);
}
//將最後一個通道清空。
channelArray[totalChannels - 1] = null;
totalChannels--;
//判斷是否須要減小一個輔助線程。
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
//清除對應的緩存。
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
//設置key無效
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
//關閉文件描述符
((SelChImpl)selch).kill();
}
//將全部key都設置爲無效
protected final void deregister(AbstractSelectionKey key) {
((AbstractSelectableChannel)key.channel()).removeKey(key);
}
void removeKey(SelectionKey k) { // package-private
synchronized (keyLock) {
for (int i = 0; i < keys.length; i++)
if (keys[i] == k) {
keys[i] = null;
keyCount--;
}
//將key設置爲無效
((AbstractSelectionKey)k).invalidate();
}
}
複製代碼
channelArray
中刪除。fdMap
、keys
、selectedKeys
數據緩存。((SelChImpl)selch).kill();
是在各個Channel中實現的,以SocketChannel爲例,最終會調用nd.close(fd);
關閉對應的文件描述符
private void adjustThreadsCount() {
//當線程大於實際線程,建立更多線程
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
//設置爲守護線程
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// 當線程小於實際線程,移除線程。
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
複製代碼
在建立新的線程時,會記錄上一次運行的數量保存到lastRun
變量中
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
複製代碼
當線程啓動時會等待主線程激活
public void run() {
while (true) { // poll loop
//等待主線程信號激活
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// 通知主線程完成.
finishLock.threadFinished();
}
}
複製代碼
經過startLock
等待主線程的開始信號。若當前線程是新啓動的線程,則runsCounter == thread.lastRun
爲真,此時新的線程須要等待主線程調用啓動。
startLock.waitForStart(this)
private synchronized boolean waitForStart(SelectThread thread) {
while (true) {
while (runsCounter == thread.lastRun) {
try {
startLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (thread.isZombie()) { // redundant thread
return true; // will cause run() to exit.
} else {
thread.lastRun = runsCounter; // update lastRun
return false; // will cause run() to poll.
}
}
}
}
複製代碼
記錄當前輔助線程數量,下次新增的輔助線程須要等待主線程通知啓動。
finishLock.reset();
private void reset() {
threadsToFinish = threads.size(); // helper threads
}
複製代碼
startLock.startThreads();
private synchronized void startThreads() {
runsCounter++; // next run
notifyAll(); // 通知全部輔助線程繼續執行,
}
複製代碼
subSelector.poll();
//主線程調用
private int poll() throws IOException{
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
//輔助線程調用
private int poll(int index) throws IOException {
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
複製代碼
輔助線程和主線程調用的區別就是存放PollFD的位置變化,每一個線程會有1024個PollFD(8B)的位置存放PollFD。這樣使得多個線程的數據內存分離互不影響。 下面看一下JNI的poll0
作了什麼處理。下面羅略了主要的邏輯
typedef struct {
jint fd;
jshort events;
} pollfd;
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
jlong pollAddress, jint numfds,
jintArray returnReadFds, jintArray returnWriteFds,
jintArray returnExceptFds, jlong timeout)
{
DWORD result = 0;
pollfd *fds = (pollfd *) pollAddress;
int i;
FD_SET readfds, writefds, exceptfds;
struct timeval timevalue, *tv;
static struct timeval zerotime = {0, 0};
...
/* Call select */
if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
== SOCKET_ERROR) {
//當出現錯誤時,變量每一個socket獲取它的就緒狀態
FD_SET errreadfds, errwritefds, errexceptfds;
...
for (i = 0; i < numfds; i++) {
errreadfds.fd_count = 0;
errwritefds.fd_count = 0;
if (fds[i].events & POLLIN) {
errreadfds.fd_array[0] = fds[i].fd;
errreadfds.fd_count = 1;
}
if (fds[i].events & (POLLOUT | POLLCONN))
{
errwritefds.fd_array[0] = fds[i].fd;
errwritefds.fd_count = 1;
}
errexceptfds.fd_array[0] = fds[i].fd;
errexceptfds.fd_count = 1;
//遍歷每一個socket,探測它的狀態
/* call select on the i-th socket */
if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
== SOCKET_ERROR) {
/* This socket causes an error. Add it to exceptfds set */
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
} else {
...
}
}
}
}
/* Return selected sockets. */
/* Each Java array consists of sockets count followed by sockets list */
...
(*env)->SetIntArrayRegion(env, returnReadFds, 0,
readfds.fd_count + 1, (jint *)&readfds);
(*env)->SetIntArrayRegion(env, returnWriteFds, 0,
writefds.fd_count + 1, (jint *)&writefds);
(*env)->SetIntArrayRegion(env, returnExceptFds, 0,
exceptfds.fd_count + 1, (jint *)&exceptfds);
return 0;
}
複製代碼
pollfd *fds = (pollfd *) pollAddress;
將pollAddress的地址轉換爲polldf的數組結構。這裏會自動內存對齊,pollfd一共只有6個字節,第一個是int類型的文件描述符句柄,第二個是short類型的等待事件掩碼值。第二個short後會填充2B,所以每一個pollFD是8B。而實際後面2字節用於存放實際發生事件的事件掩碼。
discardUrgentData
進行清理。JNIEXPORT jboolean JNICALL Java_sun_nio_ch_WindowsSelectorImpl_discardUrgentData(JNIEnv* env, jobject this, jint s) {
char data[8];
jboolean discarded = JNI_FALSE;
int n;
do {
//讀取MSG_OOB數據
n = recv(s, (char*)&data, sizeof(data), MSG_OOB);
if (n > 0) {
//讀取到設置標記爲true
discarded = JNI_TRUE;
}
} while (n > 0);
return discarded;
}
複製代碼
若是timeval爲{0,0},則select()當即返回,這可用於探詢所選套接口的狀態。若是處於這種狀態,則select()調用可認爲是非阻塞的,且一切適用於非阻塞調用的假設都適用於它。
當獲取到全部的就緒的文件描述符時,須要保存到返回結果中,同時讀寫和異常的返回結果的數組第一個爲就緒的長度值。
等待全部輔助線程完成,當主線程完成時會當即調用wakeup
向wakeupSourceFd
發生數據以觸發輔助線程喚醒。輔助線程喚醒後也會調用wakeup
一次。當輔助線程都被喚醒後就會通知主線程。
if (threads.size() > 0)
finishLock.waitForHelperThreads();
private synchronized void waitForHelperThreads() {
if (threadsToFinish == threads.size()) {
// no helper threads finished yet. Wakeup them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
Thread.currentThread().interrupt();
}
}
}
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
複製代碼
若輔助線接收到數據,則它須要調用wakeup
來喚醒其餘輔助線程,這樣使得主線程火輔助線程至少能調用一次wakeup
激活其餘輔助線程。wakeup
內部會調用setWakeupSocket
向wakeupSourceFd
發生一個信號。
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//發生一個字節數據喚醒wakeupsocket
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
複製代碼
當主線被激活時,須要調用resetWakeupSocket
將wakeupSourceFd
的數據讀取出來。
private void resetWakeupSocket() {
synchronized (interruptLock) {
if (interruptTriggered == false)
return;
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
}
}
//讀取wakeupsocket的數據。
Java_sun_nio_ch_WindowsSelectorImpl_resetWakeupSocket0(JNIEnv *env, jclass this,
jint scinFd)
{
char bytes[WAKEUP_SOCKET_BUF_SIZE];
long bytesToRead;
/* 獲取數據大小 */
ioctlsocket (scinFd, FIONREAD, &bytesToRead);
if (bytesToRead == 0) {
return;
}
/* 從緩衝區讀取全部數據 */
if (bytesToRead > WAKEUP_SOCKET_BUF_SIZE) {
char* buf = (char*)malloc(bytesToRead);
recv(scinFd, buf, bytesToRead, 0);
free(buf);
} else {
recv(scinFd, bytes, WAKEUP_SOCKET_BUF_SIZE, 0);
}
}
複製代碼
ioctlsocket()是一個計算機函數,功能是控制套接口的模式。可用於任一狀態的任一套接口。它用於獲取與套接口相關的操做參數,而與具體協議或通信子系統無關。第二個參數時對socket的操做命令
int updated = updateSelectedKeys();
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
}
return numKeysUpdated;
}
複製代碼
若key首次被加入,則會調用translateAndSetReadyOps
,若key已經在selectKeys中,則會調用translateAndUpdateReadyOps
。這兩個方法都是調用translateReadyOps
,translateReadyOps
操做會將已就緒的操做保存。
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
}
複製代碼
關閉WindowsSelectorImpl時會將全部註冊的通道一同關閉
protected void implClose() throws IOException {
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
//關閉全部channel
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
//釋放數據
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
//釋放輔助線程
for (SelectThread t: threads)
t.makeZombie();
//喚醒輔助線程使其退出。
startLock.startThreads();
}
}
}
}
複製代碼
本文對WindowsSelectorImpl
的代碼實現進行詳細解析。下一篇將對Linux下的EpollSelectorImpl
的實現繼續講解。
- 微信掃一掃二維碼關注訂閱號傑哥技術分享
- 出處:www.cnblogs.com/Jack-Blog/p…
- 做者:傑哥很忙
- 本文使用「CC BY 4.0」創做共享協議。歡迎轉載,請在明顯位置給出出處及連接。