此係列文章會詳細解讀NIO的功能逐步豐滿的路程,爲Reactor-Netty 庫的講解鋪平道路。java
關於Java編程方法論-Reactor與Webflux的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:linux
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…git
Reactor源碼解讀與分享:www.bilibili.com/video/av353…github
本系列源碼解讀基於JDK11 api細節可能與其餘版本有所差異,請自行解決jdk版本問題。編程
本系列前幾篇:windows
如咱們在前面內容所講,在學生肯定以後,咱們就要對其狀態進行設定,而後再交由Selector
進行管理,其狀態的設定咱們就經過SelectionKey
來進行。數據結構
那這裏咱們先經過以前在Channel
中並未仔細講解的SelectableChannel
下的register
方法。咱們前面有提到過, SelectableChannel
將channel
打形成能夠經過Selector
來進行多路複用。做爲管理者,channel
想要實現複用,就必須在管理者這裏進行註冊登記。因此,SelectableChannel
下的register
方法也就是咱們值得二次關注的核心了,也是對接咱們接下來內容的切入點,對於register
方法的解讀,請看咱們以前的文章BIO到NIO源碼的一些事兒之NIO 上 中賦予Channel可被多路複用的能力這一節的內容。
這裏要記住的是SelectableChannel
是對接channel
特徵(即SelectionKey
)的關鍵所在,這有點相似於表設計,本來能夠將特徵什麼的設定在一張表內,但爲了操做更加具備針對性,即爲了讓代碼功能更易於管理,就進行抽取並設計了第二張表,這個就有點像人體器官,總體上你們共同協做完成一件事,但器官內部本身專一於本身的主要特定功能,偶爾也具有其餘器官的一些小功能。
由此,咱們也就能夠知道,SelectionKey
表示一個SelectableChannel
與Selector
關聯的標記,能夠簡單理解爲一個token
。就比如是咱們作權限管理系統用戶登陸後前臺會從後臺拿到的一個token
同樣,用戶能夠憑藉此token
來訪問操做相應的資源信息。
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
複製代碼
結合上下兩段源碼,在每次Selector
使用register
方法註冊channel
時,都會建立並返回一個SelectionKey
。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
複製代碼
咱們在BIO到NIO源碼的一些事兒之NIO 上 中賦予Channel可被多路複用的能力這一節的內容知道,一旦註冊到Selector
上,Channel
將一直保持註冊直到其被解除註冊。在解除註冊的時候會解除Selector
分配給Channel
的全部資源。 也就是SelectionKey
在其調用SelectionKey#channel
方法,或這個key所表明的channel
關閉,抑或此key所關聯的Selector
關閉以前,都是有效。咱們在前面的文章分析中也知道,取消一個SelectionKey
,不會馬上從Selector
移除,它將被添加到Selector
的cancelledKeys
這個Set
集合中,以便在下一次選擇操做期間刪除,咱們能夠經過java.nio.channels.SelectionKey#isValid
判斷一個SelectionKey
是否有效。
SelectionKey包含四個操做集,每一個操做集用一個Int來表示,int值中的低四位的bit 用於表示channel
支持的可選操做種類。
/** * Operation-set bit for read operations. */
public static final int OP_READ = 1 << 0;
/** * Operation-set bit for write operations. */
public static final int OP_WRITE = 1 << 2;
/** * Operation-set bit for socket-connect operations. */
public static final int OP_CONNECT = 1 << 3;
/** * Operation-set bit for socket-accept operations. */
public static final int OP_ACCEPT = 1 << 4;
複製代碼
經過interestOps
來肯定了selector
在下一個選擇操做的過程當中將測試哪些操做類別的準備狀況,操做事件是不是channel
關注的。interestOps
在SelectionKey
建立時,初始化爲註冊Selector
時的ops值,這個值可經過sun.nio.ch.SelectionKeyImpl#interestOps(int)
來改變,這點咱們在SelectorImpl#register
能夠清楚的看到。
//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl extends AbstractSelectionKey {
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);
private final SelChImpl channel;
private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
// registered events in kernel, used by some Selector implementations
private int registeredEvents;
// index of key in pollfd array, used by some Selector implementations
private int index;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}
複製代碼
readyOps
表示經過Selector
檢測到channel
已經準備就緒的操做事件。在SelectionKey
建立時(即上面源碼所示),readyOps
值爲0,在Selector
的select
操做中可能會更新,可是須要注意的是咱們不能直接調用來更新。
SelectionKey
的readyOps
表示一個channel
已經爲某些操做準備就緒,但不能保證在針對這個就緒事件類型的操做過程當中不會發生阻塞,即該操做所在線程有可能會發生阻塞。在完成select
操做後,大部分狀況下會當即對readyOps
更新,此時readyOps
值最準確,若是外部的事件或在該channel
有IO操做,readyOps
可能不許確。因此,咱們有看到其是volatile
類型。
SelectionKey
定義了全部的操做事件,可是具體channel
支持的操做事件依賴於具體的channel
,即具體問題具體分析。 全部可選擇的channel
(即SelectableChannel
的子類)均可以經過SelectableChannel#validOps
方法,判斷一個操做事件是否被channel
所支持,即每一個子類都會有對validOps
的實現,返回一個數字,僅標識channel
支持的哪些操做。嘗試設置或測試一個不被channel
所支持的操做設定,將會拋出相關的運行時異常。 不一樣應用場景下,其所支持的Ops
是不一樣的,摘取部分以下所示:
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//即1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
複製代碼
若是須要常常關聯一些咱們程序中指定數據到SelectionKey
,好比一個咱們使用一個object表示上層的一種高級協議的狀態,object用於通知實現協議處理器。因此,SelectionKey支持經過attach
方法將一個對象附加到SelectionKey
的attachment
上。attachment
能夠經過java.nio.channels.SelectionKey#attachment
方法進行訪問。若是要取消該對象,則能夠經過該種方式:selectionKey.attach(null)
。
須要注意的是若是附加的對象再也不使用,必定要人爲清除,若是沒有,假如此SelectionKey
一直存在,因爲此處屬於強引用,那麼垃圾回收器不會回收該對象,若不清除的話會成內存泄漏。
SelectionKey在由多線程併發使用時,是線程安全的。咱們只須要知道,Selector
的select
操做會一直使用在調用該操做開始時當前的interestOps
所設定的值。
到如今爲止,咱們已經多多少少接觸了Selector
,其是一個什麼樣的角色,想必都很清楚了,那咱們就在咱們已經接觸到的來進一步深刻探究Selector
的設計運行機制。
從命名上就能夠知道 SelectableChannel
對象是依靠Selector
來實現多路複用的。 咱們能夠經過調用java.nio.channels.Selector#open
來建立一個selector
對象:
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
複製代碼
關於這個SelectorProvider.provider()
,其使用了根據所在系統的默認實現,我這裏是windows系統,那麼其默認實現爲sun.nio.ch.WindowsSelectorProvider
,這樣,就能夠調用基於相應系統的具體實現了。
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/** * Prevent instantiation. */
private DefaultSelectorProvider() { }
/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}
複製代碼
基於windows來說,selector這裏最終會使用sun.nio.ch.WindowsSelectorImpl
來作一些核心的邏輯。
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
複製代碼
這裏,咱們須要來看一下WindowsSelectorImpl
的構造函數:
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製代碼
咱們由Pipe.open()
就可知道selector
會保持打開的狀態,直到其調用它的close
方法:
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
複製代碼
能夠看到,前面的wakeupPipe
在close方法中關閉掉了。這裏的close方法中又涉及了wakeupPipe.sink()
與wakeupPipe.source()
的關閉與pollWrapper.free()
的釋放,此處也是咱們本篇的難點所在,這裏,咱們來看看它們究竟是什麼樣的存在。 首先,咱們對WindowsSelectorImpl(SelectorProvider sp)
這個構造函數作下梳理:
PollArrayWrapper
對象(pollWrapper
);Pipe.open()
打開一個管道;wakeupSourceFd
和wakeupSinkFd
兩個文件描述符;wakeupSourceFd
)放到pollWrapper
裏;這裏咱們會有疑惑,爲何要建立一個管道,它是用來作什麼的。
咱們來看Pipe.open()
源碼實現:
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer implements PrivilegedExceptionAction<Void> {
private final SelectorProvider sp;
private IOException ioe = null;
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}
if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);
return null;
}
複製代碼
從上述源碼咱們能夠知道,建立了一個PipeImpl
對象, 在PipeImpl
的構造函數裏會執行AccessController.doPrivileged
,在它調用後緊接着會執行Initializer
的run
方法:
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();
if (bb.equals(secret))
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}
複製代碼
這裏即爲建立pipe
的過程,windows
下的實現是建立兩個本地的socketChannel
,而後鏈接(鏈接的過程經過寫一個隨機數據作兩個socket的鏈接校驗),兩個socketChannel
分別實現了管道pipe
的source
與sink
端。 而咱們依然不清楚這個pipe
到底幹什麼用的, 假如你們熟悉系統調用的C/C++
的話,就能夠知道,一個阻塞在select
上的線程有如下三種方式能夠被喚醒:
time out
。non-block
的信號。可由kill
或pthread_kill
發出。因此,Selector.wakeup()
要喚醒阻塞的select
,那麼也只能經過這三種方法,其中:
select
一旦阻塞,沒法修改其time out
時間。Linux
上實現,Windows
上沒有這種信號通知的機制。看來只有第一種方法了。假如咱們屢次調用Selector.open()
,那麼在Windows
上會每調用一次,就會創建一對本身和本身的loopback
的TCP
鏈接;在Linux上的話,每調用一次,會開一對pipe
(pipe在Linux下通常都成對打開),到這裏,估計咱們可以猜得出來——那就是若是想要喚醒select
,只須要朝着本身的這個loopback
鏈接發點數據過去,因而,就能夠喚醒阻塞在select
上的線程了。
咱們對上面所述作下總結:在Windows
下,Java
虛擬機在Selector.open()
時會本身和本身創建loopback
的TCP
鏈接;在Linux
下,Selector
會建立pipe
。這主要是爲了Selector.wakeup()
能夠方便喚醒阻塞在select()
系統調用上的線程(經過向本身所創建的TCP
連接和管道上隨便寫點什麼就能夠喚醒阻塞線程)。
在WindowsSelectorImpl
構造器最後,咱們看到這一句代碼:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
,即把pipe內Source端的文件描述符(wakeupSourceFd
)放到pollWrapper
裏。pollWrapper
做爲PollArrayWrapper
的實例,它究竟是什麼,這一節,咱們就來對其探索一番。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
複製代碼
這裏將wakeupSourceFd
的POLLIN
事件標識爲pollArray
的EventOps
的對應的值,這裏使用的是unsafe直接操做的內存,也就是相對於這個pollArray
所在內存地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET
這個位置上寫入Net.POLLIN
所表明的值,即參考下面本地方法相關源碼所展現的值。putDescriptor
一樣是這種相似操做。當sink端
有數據寫入時,source
對應的文件描述符wakeupSourceFd
就會處於就緒狀態。
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002
複製代碼
AllocatedNativeObject
這個類的父類有大量的unsafe
類的操做,這些都是直接基於內存級別的操做。從其父類的構造器中,咱們能也清楚的看到pollArray
是經過unsafe.allocateMemory(size + ps)
分配的一塊系統內存。
class AllocatedNativeObject // package-private extends NativeObject {
/** * Allocates a memory area of at least {@code size} bytes outside of the * Java heap and creates a native object for that area. */
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
/** * Frees the native memory area associated with this object. */
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}
}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}
複製代碼
至此,咱們算是完成了對Selector.open()
的解讀,其主要任務就是完成創建Pipe
,並把pipe
source
端的wakeupSourceFd
放入pollArray
中,這個pollArray
是Selector
完成其角色任務的樞紐。本篇主要圍繞Windows的實現來進行分析,即在windows下經過兩個鏈接的socketChannel
實現了Pipe
,linux
下則直接使用系統的pipe
便可。
所謂的註冊,其實就是將一個對象放到註冊地對象內的一個容器字段上,這個字段能夠是數組,隊列,也能夠是一個set集合,也能夠是一個list。這裏,一樣是這樣,只不過,其須要有個返回值,那麼把這個要放入集合的對象返回便可。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
複製代碼
這段代碼咱們以前已經有看過,這裏咱們再次溫習下。 首先會新建一個SelectionKeyImpl
對象,這個對象就是對Channel
的包裝,不只如此,還順帶把當前這個Selector
對象給收了進去,這樣,咱們也能夠經過SelectionKey
的對象來拿到其對應的Selector
對象。
接着,基於windows
平臺實現的implRegister
,先經過ensureOpen()
來確保該Selector
是打開的。接着將這個SelectionKeyImpl
加入到WindowsSelectorImpl
內針對於新註冊SelectionKey進行管理的newKeys
之中,newKeys
是一個ArrayDeque
對象。對於ArrayDeque
有不懂的,能夠參考Java 容器源碼分析之 Deque 與 ArrayDeque這篇文章。
而後再將此這個SelectionKeyImpl
加入到sun.nio.ch.SelectorImpl#keys
中去,這個Set<SelectionKey>
集合表明那些已經註冊到當前這個Selector
對象上的SelectionKey
集合。咱們來看sun.nio.ch.SelectorImpl
的構造函數:
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
複製代碼
也就是說,這裏的publicKeys
就來源於keys
,只是publicKeys
屬於只讀的,咱們想要知道當前Selector
對象上所註冊的keys
,就能夠調用sun.nio.ch.SelectorImpl#keys
來獲得:
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
複製代碼
再回到這個構造函數中,selectedKeys
,顧名思義,其屬於已選擇Keys,即前一次操做期間,已經準備就緒的Channel
所對應的SelectionKey
。此集合爲keys
的子集。經過selector.selectedKeys()
獲取。
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
複製代碼
咱們看到其返回的是publicSelectedKeys
,針對這個字段裏的元素操做能夠作刪除,但不能作增長。 在前面的內容中,咱們有涉及到SelectionKey
的取消,因此,咱們在java.nio.channels.spi.AbstractSelector
方法內,是有定義cancelledKeys
的,也是一個HashSet
對象。其表明已經被取消但還沒有取消註冊(deregister)的SelectionKey
。此Set集合沒法直接訪問,一樣,它也是keys()的子集。
對於新的Selector
實例,上面幾個集合均爲空。由上面展現的源碼可知,經過channel.register
將SelectionKey
添加keys
中,此爲key的來源。 若是某個selectionKey.cancel()
被調用,那麼此key將會被添加到cancelledKeys
這個集合中,而後在下一次調用selector select
方法期間,此時canceldKeys
不爲空,將會觸發此SelectionKey
的deregister
操做(釋放資源,並從keys
中移除)。不管經過channel.close()
仍是經過selectionKey.cancel()
,都會致使SelectionKey
被加入到cannceldKey
中.
每次選擇操做(select)期間,均可以將key添加到selectedKeys
中或者將從cancelledKeys
中移除。
瞭解了上面的這些,咱們來進入到select
方法中,觀察下它的細節。由Selector
的api可知,select
操做有兩種形式,一種爲 select(),selectNow(),select(long timeout);另外一種爲select(Consumer<SelectionKey> action, long timeout)
,select(Consumer<SelectionKey> action)
,selectNow(Consumer<SelectionKey> action)
。後者爲JDK11新加入的api,主要針對那些準備好進行I/O操做的channels在select過程當中對相應的key進行的一個自定義操做。
須要注意的是,有Consumer<SelectionKey> action
參數的select操做是阻塞的,只有在選擇了至少一個Channel的狀況下,纔會調用此Selector
實例的wakeup
方法來喚醒,一樣,其所在線程被打斷也能夠。
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout) throws IOException {
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
複製代碼
咱們能夠觀察,不管哪一種,它們最後都落在了lockAndDoSelect
這個方法上,最終會執行特定系統上的doSelect(action, timeout)
實現。 這裏咱們以sun.nio.ch.WindowsSelectorImpl#doSelect
爲例來說述其操做執行的步驟:
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}
複製代碼
首先經過相應操做系統實現類(此處是WindowsSelectorImpl)的具體實現咱們能夠知道,經過<1>
處的 processUpdateQueue()
得到關於每一個剩餘Channel
(有些Channel取消了)的在此刻的interestOps
,這裏包括新註冊的和updateKeys
,並對其進行pollWrapper
的管理操做。
即對於新註冊的
SelectionKeyImpl
,咱們在相對於這個pollArray
所在內存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
與SIZE_POLLFD * totalChannels + EVENT_OFFSET
分別存入SelectionKeyImpl
的文件描述符fd
與其對應的EventOps
(初始爲0)。對
updateKeys
,由於是其以前已經在pollArray
的某個相對位置上存儲過,這裏咱們還須要對拿到的key的有效性進行判斷,若是有效,只須要將正在操做的這個SelectionKeyImpl
對象的interestOps
寫入到在pollWrapper
中的存放它的EventOps
位置上。
注意: 在對
newKeys
進行key的有效性判斷以後,若是有效,會調用growIfNeeded()
方法,這裏首先會判斷channelArray.length == totalChannels
,此爲一個SelectionKeyImpl
的數組,初始容量大小爲8。channelArray
其實就是方便Selector
管理在冊SelectionKeyImpl
數量的一個數組而已,經過判斷它的數組長度大小,若是和totalChannels
(初始值爲1)相等,不只僅是爲了channelArray
擴容,更重要的是爲了輔助pollWrapper
,讓pollWrapper
擴容纔是目的所在。而當
totalChannels % MAX_SELECTABLE_FDS == 0
時,則多開一個線程處理selector
。windows
上select
系統調用有最大文件描述符限制,一次只能輪詢1024
個文件描述符,若是多於1024個,須要多線程進行輪詢。同時調用pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)
在相對於這個pollArray
所在內存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
這個位置上寫入wakeupSourceFd
所表明的fdVal
值。這樣在新起的線程就能夠經過MAX_SELECTABLE_FDS
來肯定這個用來監控的wakeupSourceFd
,方便喚醒selector
。經過ski.setIndex(totalChannels)
記錄下SelectionKeyImpl
在數組中的索引位置,以待後續使用。
/** * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue * Process new registrations and changes to the interest ops. */
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
while ((ski = updateKeys.pollFirst()) != null) {
int events = ski.translateInterestOps();
int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.getFDVal());
putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;
//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
replaceEntry(this, i, temp, i);
pollArray.free();
pollArray = temp.pollArray;
this.size = temp.size;
pollArrayAddress = pollArray.address();
}
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(Integer.valueOf(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = Integer.valueOf(ski.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel() == ski.channel()))
return remove(fd);
return null;
}
}
// class for fdMap entries
private static final class MapEntry {
final SelectionKeyImpl ski;
long updateCount = 0;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
private final FdMap fdMap = new FdMap();
複製代碼
上面WindowsSelectorImpl#doSelect展現源碼中<2>
處的 processDeregisterQueue()
。
cancelledKeys
進行清除,遍歷cancelledKeys
,並對每一個key
進行deregister
操做,而後從cancelledKeys
集合中刪除,從keys
集合與selectedKeys
中刪除,以此來釋放引用,方便gc回收,implDereg
方法,將會從channelArray
中移除對應的Channel
表明的SelectionKeyImpl
,調整totalChannels
和線程數,從map
和keys
中移除SelectionKeyImpl
,移除Channel
上的SelectionKeyImpl
並關閉Channel
。processDeregisterQueue()
方法在調用poll
方法先後都進行調用,這是確保可以正確處理在調用poll
方法阻塞的這一段時間以內取消的鍵能被及時清理。cancelledKey
所表明的channel
是否打開和解除註冊,若是關閉並解除註冊,則應該將相應的文件描述符對應占用的資源給關閉掉。/** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
assert !ski.isValid();
assert Thread.holdsLock(this);
if (fdMap.remove(ski) != null) {
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
channelArray[totalChannels - 1] = null;
totalChannels--;
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
}
}
//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
IOUtil.load();
nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
複製代碼
上面WindowsSelectorImpl#doSelect
展現源碼中adjustThreadsCount()
方法的調用。
totalChannels % MAX_SELECTABLE_FDS == 0
,則多開一個線程處理selector
。這裏就是根據分配的線程數量值來增長或減小線程,其實就是針對操做系統的最大select
操做的文件描述符限制對線程個數進行調整。SelectThread
的run
方法實現。經過觀察其源碼能夠看到它首先是while (true)
,經過startLock.waitForStart(this)
來控制該線程是否運行仍是等待,運行狀態的話,會進而調用subSelector.poll(index)
(這個咱們後面內容詳細解讀),poll
結束,並且相對於當前主線程假若有多條SelectThread
子線程的話,當前這條SelectThread
線程第一個結束poll
的話,就調用finishLock.threadFinished()
來通知主線程。在剛新建這個線程並調用其run
方法的時候,此時lastRun = 0
,在第一次啓動的時候sun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter
一樣爲0,因此會調用startLock.wait()
進而進入等待狀態。注意:
sun.nio.ch.WindowsSelectorImpl.StartLock
一樣會判斷當前其所檢測的線程是否廢棄,廢棄的話就返回true
,這樣被檢測線程也就能跳出其內run方法的while
循環從而結束線程運行。- 在調整線程的時候(調用
adjustThreadsCount
方法)與Selector
調用close
方法會間接調用到sun.nio.ch.WindowsSelectorImpl#implClose
,這兩個方法都會涉及到Selector
線程的釋放,即調用sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie
。finishLock.threadFinished()
會調用wakeup()
方法來通知主線程,這裏,咱們能夠學到一個細節,若是線程正阻塞在select
方法上,就能夠調用wakeup
方法會使阻塞的選擇操做當即返回,經過Windows
的相關實現,原理實際上是向pipe
的sink
端寫入了一個字節,source
文件描述符就會處於就緒狀態,poll
方法會返回,從而致使select
方法返回。而在其餘solaris或者linux系統上其實採用系統調用pipe
來完成管道的建立,至關於直接用了系統的管道。經過wakeup()
相關實現還能夠看出,調用wakeup
會設置interruptTriggered
的標誌位,因此連續屢次調用wakeup
的效果等同於一次調用,不會引發無所謂的bug出現。
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}
// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) {
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
複製代碼
subSelector.poll()
是select的核心,由native
函數poll0
實現,並把pollWrapper.pollArrayAddress
做爲參數傳給poll0
,readFds
、writeFds
和exceptFds
數組用來保存底層select
的結果,數組的第一個位置都是存放發生事件的socket
的總數,其他位置存放發生事件的socket
句柄fd
。 咱們經過下面的代碼可知: 這個poll0()
會監聽pollWrapper
中的FD
有沒有數據進出,這裏會形成IO
阻塞,直到有數據讀寫事件發生。因爲pollWrapper
中保存的也有ServerSocketChannel
的FD
,因此只要ClientSocket
發一份數據到ServerSocket
,那麼poll0()
就會返回;又因爲pollWrapper
中保存的也有pipe
的write
端的FD
,因此只要pipe
的write
端向FD
發一份數據,也會形成poll0()
返回;若是這兩種狀況都沒有發生,那麼poll0()
就一直阻塞,也就是selector.select()
會一直阻塞;若是有任何一種狀況發生,那麼selector.select()
就會返回,全部在SelectThread
的run()
裏要用while (true) {}
,這樣就能夠保證在selector
接收到數據並處理完後繼續監聽poll()
;能夠看出,NIO依然是阻塞式的IO,那麼它和BIO的區別究竟在哪呢。 其實它的區別在於阻塞的位置不一樣,
BIO
是阻塞在read
方法(recvfrom),而NIO
阻塞在select
方法。那麼這樣作有什麼好處呢。若是單純的改變阻塞的位置,天然是沒有什麼變化的,但epoll等
的實現的巧妙之處就在於,它利用回調機制,讓監聽可以只須要知曉哪些socket
上的數據已經準備好了,只須要處理這些線程上面的數據就好了。採用BIO
,假設有1000
個鏈接,須要開1000
個線程,而後有1000
個read
的位置在阻塞(咱們在講解BIO部分已經經過Demo體現),採用NIO
編程,只須要1個線程,它利用select
的輪詢策略配合epoll
的事件機制及紅黑樹數據結構,下降了其內部輪詢的開銷,同時極大的減少了線程上下文切換的開銷。
//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
// 保存發生read的FD
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
// 保存發生write的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
//保存發生except的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private SubSelector() {
this.pollArrayIndex = 0; // main thread
}
private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
...
}
複製代碼
上面WindowsSelectorImpl#doSelect展現源碼中<5>
處的 updateSelectedKeys(action)
來處理每一個channel
的 準備就緒的信息。key
還沒有在selectedKeys
中存在,則將其添加到該集合中。key
已經存在selectedKeys
中,即這個channel
存在所支持的ReadyOps
就緒操做中必須包含一個這種操做(由(ski.nioReadyOps() & ski.nioInterestOps()) != 0
來肯定),此時修改其ReadyOps
爲當前所要進行的操做。而咱們以前看到的Consumer<SelectionKey>
這個動做也是在此處進行。而由下面源碼可知,先前記錄在ReadyOps
中的任何就緒信息在調用此action
以前被丟棄掉,直接進行設定。//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(publicSelectedKeys);
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
/** * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet * updateCount is used to tell if a key has been counted as updated * in this select operation. * * me.updateCount <= updateCount */
private int processFDSet(long updateCount, Consumer<SelectionKey> action, int[] fds, int rOps, boolean isExceptFds) {
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//咱們應該關注的
int updated = processReadyEvents(rOps, sk, action);
if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
return numKeysUpdated;
}
複製代碼
至此,關於Selector的內容就暫時告一段落,在下一篇中,我會針對Java NIO Buffer進行相關解讀。