As mentioned in the previous articles, once the students are ready we need to set their status through a SelectionKey so that the Selector can handle them.
In the previous article we did not say much about register in SelectableChannel, but we did mention that SelectableChannel is what lets a channel be multiplexed by a Selector.
The Selector acts as an administrator: if a channel wants to be multiplexed, it must register its information with that administrator. The register method of SelectableChannel is therefore the focus of what follows. For the details of register, see the previous article.
Note that SelectableChannel implements Channel, and this relationship is central to how SelectionKey is designed. It resembles table design in a database: we could put every property into one table, but to keep the code easy to write and extend we split it into two tables. It is also like the organs of a body: each has its own function and partly relies on the others, and together they work well.
Thus, we can think of SelectionKey as the link between a SelectableChannel and a Selector. In short, it is a token that a user presents to access the service.
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
From the two code blocks above, we can see that when a Channel is registered with a Selector through register, a SelectionKey is created and returned. As mentioned before, once a Channel is registered with a Selector it stays registered until it is deregistered, at which point the Selector releases all resources it allocated for that Channel. A SelectionKey remains valid until it is cancelled, its Channel is closed, or its Selector is closed. We also discussed key validity: cancelling a key does not remove it from the Selector immediately. The key is added to the Selector's cancelledKeys set and is removed during the next selection operation. Validity can be checked with java.nio.channels.SelectionKey#isValid.
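The deferred removal described above can be observed through the public API. The following is a minimal sketch, not JDK code: the class name KeyCancelDemo is made up for illustration, and an unbound ServerSocketChannel is used only because it is easy to register.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Arrays;

class KeyCancelDemo {
    /** Returns {valid after register, valid after cancel, in keys() before select, in keys() after select}. */
    static boolean[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            boolean validAfterRegister = key.isValid();

            key.cancel(); // invalidates the key and queues it in the cancelled-key set
            boolean validAfterCancel = key.isValid();
            boolean inKeysBeforeSelect = selector.keys().contains(key);

            selector.selectNow(); // the next selection operation deregisters the key
            boolean inKeysAfterSelect = selector.keys().contains(key);

            return new boolean[] {validAfterRegister, validAfterCancel,
                                  inKeysBeforeSelect, inKeysAfterSelect};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The key becomes invalid as soon as cancel() returns, but it only disappears from keys() after the following selection operation.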
SelectionKey defines four operation-set bits, each represented by an int value.
/** * Operation-set bit for read operations. */
public static final int OP_READ = 1 << 0;
/** * Operation-set bit for write operations. */
public static final int OP_WRITE = 1 << 2;
/** * Operation-set bit for socket-connect operations. */
public static final int OP_CONNECT = 1 << 3;
/** * Operation-set bit for socket-accept operations. */
public static final int OP_ACCEPT = 1 << 4;
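Because each constant occupies its own bit, operation sets are combined and tested with bitwise operators. The sketch below illustrates this; the class OpBitsDemo and its describe helper are hypothetical names, not part of the JDK.

```java
import java.nio.channels.SelectionKey;

class OpBitsDemo {
    /** Renders an operation set as text, e.g. "READ|WRITE"; returns "NONE" for 0. */
    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ|");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE|");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT|");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT|");
        if (sb.length() == 0) return "NONE";
        sb.setLength(sb.length() - 1); // drop the trailing '|'
        return sb.toString();
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // 1 | 4 = 5
        System.out.println(ops + " -> " + describe(ops));
        ops &= ~SelectionKey.OP_WRITE; // clear the write bit, leaving 1
        System.out.println(ops + " -> " + describe(ops));
    }
}
```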
The interest set, interestOps, determines which operations the Selector will test for on the Channel during the next selection operation. It is initialized when the SelectionKey is created and can be changed afterwards with sun.nio.ch.SelectionKeyImpl#interestOps(int), as the following shows:
//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl extends AbstractSelectionKey {
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);
private final SelChImpl channel;
private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
// registered events in kernel, used by some Selector implementations
private int registeredEvents;
// index of key in pollfd array, used by some Selector implementations
private int index;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}
readyOps is the set of operations for which the Selector has detected the Channel to be ready. When a SelectionKey is created, readyOps is 0. It may be updated later during a select operation of the Selector, but it cannot be updated directly through the API.
readyOps indicates that the Channel is ready for some operation, but this is a hint, not a guarantee that the operation will not block. The value is most likely to be accurate immediately after a select operation completes; external events and I/O operations performed on the channel can make it stale. The field is declared volatile so that updates are visible across threads.
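A small sketch can make the life cycle of readyOps visible. It assumes a java.nio.channels.Pipe as a convenient selectable channel; the class name ReadyOpsDemo is invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class ReadyOpsDemo {
    /** Returns {readyOps right after registration, readyOps after a selection operation}. */
    static int[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                int before = key.readyOps(); // nothing has been selected yet

                pipe.sink().write(ByteBuffer.wrap(new byte[] {1})); // make the source readable
                selector.select(5000); // only a selection operation updates readyOps
                return new int[] {before, key.readyOps()};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("before=" + r[0] + ", after=" + r[1]);
    }
}
```

Even after the write, readyOps stays unchanged until the selector runs a selection operation, which is why the value should be read right after select returns.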
SelectionKey defines all the operations, but which ones are supported depends on the channel. Every selectable channel (a subclass of SelectableChannel) exposes SelectableChannel#validOps to report which operations it supports, so each subclass provides its own validOps implementation that returns a number marking the supported operations. Trying to set or test an operation that the Channel does not support results in a run-time exception (register, for example, throws IllegalArgumentException). The supported operations therefore differ from one channel type to another:
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
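The values in the comments above can be checked at run time, along with the exception thrown for an unsupported operation. This is a sketch only; ValidOpsDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

class ValidOpsDemo {
    /** Returns validOps() of SocketChannel, ServerSocketChannel and DatagramChannel, in that order. */
    static int[] validOps() {
        try (SocketChannel socket = SocketChannel.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             DatagramChannel datagram = DatagramChannel.open()) {
            return new int[] {socket.validOps(), server.validOps(), datagram.validOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if registering a server channel for OP_READ is rejected. */
    static boolean rejectsUnsupportedOp() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            try {
                server.register(selector, SelectionKey.OP_READ); // OP_READ is not in validOps()
                return false;
            } catch (IllegalArgumentException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(validOps()));
        System.out.println(rejectsUnsupportedOp());
    }
}
```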
If we need to associate some specific data with a SelectionKey, we can use attach on SelectionKey to store an Object in the key's attachment field. The attachment can then be read with java.nio.channels.SelectionKey#attachment. To clear it, call selectionKey.attach(null).
If the attached object is no longer needed, it must be cleared manually. Otherwise it stays reachable for as long as the SelectionKey exists: the key holds a strong reference to it, so the garbage collector cannot reclaim it, which can lead to a memory leak.
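A minimal sketch of attaching and clearing an attachment follows. The class name AttachDemo and the string "session-1" are placeholders chosen for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class AttachDemo {
    /** Returns {attachment after register, value returned by attach(null), attachment afterwards}. */
    static Object[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // The third argument of register becomes the key's attachment.
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT, "session-1");
            Object attached = key.attachment();
            Object previous = key.attach(null); // clears the strong reference, returns the old value
            Object afterClear = key.attachment();
            return new Object[] {attached, previous, afterClear};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println(r[0] + ", " + r[1] + ", " + r[2]);
    }
}
```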
SelectionKey is safe for use by multiple concurrent threads. The one thing to keep in mind is that a select operation of the Selector always uses the interestOps value that was current when the operation began.
Now that we know the role the Selector plays, the following sections analyse how it works.
From the name SelectableChannel, we can tell that it can be multiplexed by a Selector. A selector is created by calling java.nio.channels.Selector#open:
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
SelectorProvider.provider() uses the system-wide default implementation. On Windows, the default is sun.nio.ch.WindowsSelectorProvider, through which the platform-specific implementation is invoked:
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/** - Prevent instantiation. */
private DefaultSelectorProvider() { }
/** - Returns the default SelectorProvider. */
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}
On Windows, the selector ultimately relies on sun.nio.ch.WindowsSelectorImpl to do the core work.
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
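To see which provider and selector implementation are actually chosen on the current platform, the public API is enough. This is an illustrative sketch; ProviderDemo is a made-up class name, and the printed class names depend on the operating system and JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class ProviderDemo {
    /** Returns true if Selector.open() used the system-wide default provider. */
    static boolean selectorUsesDefaultProvider() {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Platform-dependent output, so no expected values are shown here.
        System.out.println(SelectorProvider.provider().getClass().getName());
        try (Selector selector = Selector.open()) {
            System.out.println(selector.getClass().getName());
        }
        System.out.println(selectorUsesDefaultProvider());
    }
}
```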
Now let's look at the WindowsSelectorImpl constructor:
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
A Selector stays open until its close method is called, and closing it also closes the wakeup Pipe.
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
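The effect of the close path above on registered keys and channels can be observed from outside. The sketch below uses only the public API; SelectorCloseDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Arrays;

class SelectorCloseDemo {
    /** Returns {selector open, key valid, channel registered, channel open, keys() throws} after close. */
    static boolean[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            selector.close(); // invalidates the keys and deregisters their channels

            boolean keysThrows;
            try {
                selector.keys();
                keysThrows = false;
            } catch (ClosedSelectorException expected) {
                keysThrows = true;
            }
            return new boolean[] {selector.isOpen(), key.isValid(),
                                  server.isRegistered(), server.isOpen(), keysThrows};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Closing the selector deregisters the channel but does not close it; the channel stays open until it is closed separately.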
As the code shows, close also shuts down the wakeupPipe: implClose closes wakeupPipe.sink() and wakeupPipe.source(), and then releases the poll array with pollWrapper.free(). This is also the hardest part of this article, so let's trace how it works.
First, look at what the WindowsSelectorImpl(SelectorProvider sp) constructor does:
1. It creates a PollArrayWrapper object, pollWrapper.
2. It opens a pipe with Pipe.open().
3. It obtains wakeupSourceFd and wakeupSinkFd from the pipe.
4. It puts wakeupSourceFd into pollWrapper.
This raises a question: why create a pipe, and what is it used for?
Let's look at the implementation of Pipe.open():
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer implements PrivilegedExceptionAction<Void> {
private final SelectorProvider sp;
private IOException ioe = null;
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}
if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);
return null;
}
As the code shows, a PipeImpl object is created; its constructor calls AccessController.doPrivileged, which in turn executes the run() method of Initializer.
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();
if (bb.equals(secret))
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}
This is how the pipe is created. The Windows implementation creates two local SocketChannels and connects them; they serve as the source and the sink of the pipe respectively. But we still do not know what the pipe is used for.
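If you are familiar with C/C++, you will know that a thread blocked in select can be woken up in three ways:
1. Data becomes readable or writable, or an error occurs, on one of the watched descriptors.
2. The timeout expires.
3. A signal that is not blocked is received, sent by kill or pthread_kill.
So if Selector.wakeup() is to wake up a blocked select, it has to use one of these three ways. Of these:
The second way is ruled out, because once a thread is blocked in select, the timeout value can no longer be changed.
The third way appears to be possible only on Linux; there is no similar notification mechanism on Windows.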
That leaves only the first way. Each time Selector.open() is called on Windows, it sets up a pair of connected TCP sockets over the loopback address. On Linux, it opens a pipe each time. So to wake up select on Windows, it is enough to send a little data over the loopback connection: the thread blocked in select wakes up.
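The same idea can be reproduced by hand with a public java.nio.channels.Pipe: a thread blocks in select on the pipe's source, and writing one byte to the sink wakes it up. This is only a sketch of the technique, not the JDK's internal wakeup code; PipeWakeDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class PipeWakeDemo {
    /** Returns the value select() produced in the waiting thread, or -1 if it never returned. */
    static int run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                int[] ready = {-1};
                Thread waiter = new Thread(() -> {
                    try {
                        ready[0] = selector.select(); // blocks until the source is readable
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                waiter.start();

                pipe.sink().write(ByteBuffer.wrap(new byte[] {0})); // "a little data" wakes select
                waiter.join(5000);
                return ready[0];
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```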
To sum up: on Windows the JVM sets up a TCP connection over the loopback interface, while on Linux the Selector sets up a pipe. Either way, Selector.wakeup() can easily wake a thread blocked in select.
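From the caller's point of view, all of this is hidden behind Selector.wakeup(). The sketch below blocks a thread in select() on a selector with no registered channels and then wakes it; WakeupDemo is a made-up class name, and the short sleep only makes it likely that the thread is already blocked (wakeup also works if it is called first).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    /** Returns the value select() produced after wakeup(), or -1 if it never returned. */
    static int run() {
        try (Selector selector = Selector.open()) {
            int[] result = {-1};
            Thread waiter = new Thread(() -> {
                try {
                    result[0] = selector.select(); // no channels registered: blocks until woken
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            waiter.start();

            Thread.sleep(100);  // give the waiter time to block; not needed for correctness
            selector.wakeup();  // makes the current or next select return immediately
            waiter.join(5000);
            return result[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```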
At the end of the WindowsSelectorImpl constructor we saw pollWrapper.addWakeupSocket(wakeupSourceFd, 0), which puts the file descriptor of the pipe's source end (wakeupSourceFd) into pollWrapper. pollWrapper is an instance of PollArrayWrapper, so what is that? This section explores it.
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
At the end of this code, addWakeupSocket sets POLLIN as the EventOps value of the entry in pollArray. This is done directly in memory through Unsafe: the value of Net.POLLIN is written at offset SIZE_POLLFD * i + EVENT_OFFSET from the start address of pollArray. putDescriptor works the same way. Once data is written to the sink, the file descriptor wakeupSourceFd of the corresponding source becomes ready.
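The offset arithmetic is easier to follow on a small model. The sketch below mirrors the 8-byte entry layout (a 4-byte fd at offset 0, a 2-byte events field at offset 4) on top of an ordinary ByteBuffer. It is an assumption-laden illustration: the real class writes to off-heap memory through Unsafe, and PollFdLayoutDemo with its getter methods is invented here.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class PollFdLayoutDemo {
    static final int SIZE_POLLFD = 8;  // size of one entry in bytes
    static final int FD_OFFSET = 0;    // fd field offset inside an entry
    static final int EVENT_OFFSET = 4; // events field offset inside an entry
    static final short POLLIN = 0x0001;

    private final ByteBuffer pollArray; // heap buffer standing in for native memory

    PollFdLayoutDemo(int entries) {
        pollArray = ByteBuffer.allocate(entries * SIZE_POLLFD).order(ByteOrder.nativeOrder());
    }

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event);
    }

    int getDescriptor(int i) {
        return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

    int getEventOps(int i) {
        return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
    }

    /** Same shape as addWakeupSocket: store the fd and mark it as interested in POLLIN. */
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, POLLIN);
    }

    public static void main(String[] args) {
        PollFdLayoutDemo array = new PollFdLayoutDemo(8);
        array.addWakeupSocket(42, 0); // 42 is an arbitrary example fd
        System.out.println(array.getDescriptor(0) + " " + array.getEventOps(0));
    }
}
```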
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002
The parent class of AllocatedNativeObject contains many unsafe operations, all of which work at the memory level. Its parent class's constructor also shows clearly that pollArray is allocated with unsafe.allocateMemory(size + ps).
class AllocatedNativeObject extends NativeObject { // package-private
/** * Allocates a memory area of at least {@code size} bytes outside of the * Java heap and creates a native object for that area. */
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
/** * Frees the native memory area associated with this object. */
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}
}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}
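The page-aligned branch above allocates one extra page and then rounds the start address up to the next page boundary. The arithmetic can be tried in isolation; PageAlignDemo and the sample addresses are made up for the illustration, and the formula assumes the page size is a power of two.

```java
class PageAlignDemo {
    /** Same formula as in NativeObject: the next page boundary strictly above a. */
    static long align(long a, int ps) {
        return a + ps - (a & (ps - 1)); // (a & (ps - 1)) is a modulo ps for a power-of-two ps
    }

    public static void main(String[] args) {
        int ps = 4096;
        // 10000 lies inside the page that starts at 8192, so it is moved up to 12288.
        System.out.println(align(10000, ps));
        // An already aligned address still moves up a full page, which is why size + ps is allocated.
        System.out.println(align(8192, ps));
    }
}
```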
That completes the Selector.open() part. Its main job is to build the pipe and put the pipe's source file descriptor, wakeupSourceFd, into pollArray. pollArray plays a very important role in the Selector's work. This part focused mainly on the Windows implementation, which builds the pipe from two connected SocketChannels. On Linux it only needs to call the system's pipe.
So what is registration? It means putting an object into a container field of the target class. That field may be an array, a queue, a set or a list. The same holds here, with one small difference: registration also needs a return value. So we put the object into a collection and return the resulting object.
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
We have seen this before and are only reviewing it here.
First a SelectionKeyImpl object is built. It wraps the Channel and also holds a reference to the Selector, so the Selector can be obtained from the SelectionKey.
Next, the Windows implementation of implRegister first checks with ensureOpen() that the Selector is open. It then adds the SelectionKeyImpl to newKeys, the queue that WindowsSelectorImpl uses to track newly registered keys. newKeys is an ArrayDeque.
After that, the SelectionKeyImpl is added to sun.nio.ch.SelectorImpl#keys, a Set<SelectionKey> that holds every SelectionKey registered with the selector. See the sun.nio.ch.SelectorImpl constructor:
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
publicKeys is a view of keys, but it is read-only. To get the keys registered with the current Selector, call sun.nio.ch.SelectorImpl#keys:
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
Back to the constructor: selectedKeys holds the keys that have been selected, that is, the SelectionKeys whose Channel was found ready during a previous selection operation. It is a subset of keys and is obtained through selector.selectedKeys().
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
As we can see, it returns publicSelectedKeys. Elements can be removed from this set, but nothing can be added to it.
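The different restrictions on the two public sets can be verified directly. The following is a sketch using only the public Selector API; KeySetsDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Arrays;

class KeySetsDemo {
    /** Returns {keys() rejects remove, selectedKeys() rejects add, selectedKeys() allows remove}. */
    static boolean[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            boolean keysReadOnly;
            try {
                selector.keys().remove(key); // the key set cannot be modified directly
                keysReadOnly = false;
            } catch (UnsupportedOperationException expected) {
                keysReadOnly = true;
            }

            boolean addRejected;
            try {
                selector.selectedKeys().add(key); // only the selector itself may add keys
                addRejected = false;
            } catch (UnsupportedOperationException expected) {
                addRejected = true;
            }

            boolean removeAllowed;
            try {
                selector.selectedKeys().remove(key); // removal is permitted (a no-op here)
                removeAllowed = true;
            } catch (UnsupportedOperationException unexpected) {
                removeAllowed = false;
            }
            return new boolean[] {keysReadOnly, addRejected, removeAllowed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```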
In the previous section we talked about cancelling a SelectionKey. For that purpose, java.nio.channels.spi.AbstractSelector defines a HashSet named cancelledKeys. It holds the SelectionKeys that have been cancelled but not yet deregistered. This set cannot be accessed directly, and it is also a subset of keys().
For a newly created Selector, all of these sets are empty. As the source code shows, channel.register creates a SelectionKey and adds it to keys.
If selectionKey.cancel() is invoked, the key is added to cancelledKeys. When cancelledKeys is not empty, the next select operation of the Selector deregisters those keys and releases their resources. Whether we call channel.close() or selectionKey.cancel(), the SelectionKey ends up in cancelledKeys. During every select operation, keys may be added to selectedKeys or removed from cancelledKeys.
Now let's focus on select. In the Selector API, select comes in two forms. The first consists of select(), selectNow() and select(long timeout). The second consists of select(Consumer<SelectionKey> action, long timeout), select(Consumer<SelectionKey> action) and selectNow(Consumer<SelectionKey> action). The second form was added in JDK 11 and performs a custom action on each selected key whose channel is ready for I/O during the selection operation.
Note that select(Consumer<SelectionKey> action) and its timeout variant are blocking calls. They return only after at least one channel has been selected, after the selector's wakeup method is invoked, or after the current thread is interrupted.
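A minimal sketch of the action-based form follows. It requires JDK 11 or later and uses a Pipe as the ready channel; the class name SelectActionDemo is invented for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

class SelectActionDemo {
    /** Returns {value returned by select(action, timeout), number of keys passed to the action}. */
    static int[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1})); // make the source readable

                List<SelectionKey> seen = new ArrayList<>();
                // The action is invoked once for every key whose channel is ready.
                int handled = selector.select(seen::add, 5000);
                return new int[] {handled, seen.size()};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("handled=" + r[0] + ", seen=" + r[1]);
    }
}
```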
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout) throws IOException {
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
As we can see, whichever select we call, it eventually invokes lockAndDoSelect, which in turn executes doSelect(action, timeout), and doSelect has a different implementation on each platform.
Let's take sun.nio.ch.WindowsSelectorImpl#doSelect as an example and walk through it:
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}
From <1> in WindowsSelectorImpl#doSelect we can see that processUpdateQueue picks up the current interestOps of the remaining Channels, some of which may already have been cancelled. It covers both newly registered SelectionKeys (newKeys) and keys whose interest set changed (updateKeys); all of them are handed to pollWrapper.
For a newly registered SelectionKeyImpl, the file descriptor and the related EventOps (initially 0) are stored in pollArray at offsets SIZE_POLLFD * totalChannels + FD_OFFSET and SIZE_POLLFD * totalChannels + EVENT_OFFSET respectively.
For updateKeys, the entry already exists at its position in pollArray, so we only need to check that the key is still valid. If it is, the interestOps of the SelectionKeyImpl being processed is written to the EventOps of its entry in pollWrapper.
For each key in newKeys that passes the validity check, growIfNeeded() is invoked. It first checks channelArray.length == totalChannels. channelArray is an array of SelectionKeyImpl with an initial size of 8, which lets the Selector manage the registered SelectionKeyImpl objects conveniently. If the array length equals totalChannels (initially 1), growIfNeeded expands channelArray and, more importantly, also expands pollWrapper.
In addition, when totalChannels % MAX_SELECTABLE_FDS == 0, one more helper thread is needed to serve the selector. On Windows a single select call can handle at most 1024 file descriptors, so handling more than 1024 requires multiple threads. At the same time, pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels) writes the fdVal of wakeupSourceFd into pollArray at offset SIZE_POLLFD * totalChannels + FD_OFFSET. Each helper thread polls a slice of pollArray that starts at a multiple of MAX_SELECTABLE_FDS, so the first entry it watches is wakeupSourceFd, which lets the Selector wake it up. Finally, ski.setIndex(totalChannels) records the index of the SelectionKeyImpl for later use.
/** * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue * Process new registrations and changes to the interest ops. */
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
while ((ski = updateKeys.pollFirst()) != null) {
int events = ski.translateInterestOps();
int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.getFDVal());
putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;
//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
replaceEntry(this, i, temp, i);
pollArray.free();
pollArray = temp.pollArray;
this.size = temp.size;
pollArrayAddress = pollArray.address();
}
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(Integer.valueOf(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = Integer.valueOf(ski.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel() == ski.channel()))
return remove(fd);
return null;
}
}
// class for fdMap entries
private static final class MapEntry {
final SelectionKeyImpl ski;
long updateCount = 0;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
private final FdMap fdMap = new FdMap();
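The interplay between array growth and the extra wakeup entry in growIfNeeded can be followed with a counter-only model. The sketch below is a simplification that tracks just the three numbers involved and assumes every registered key is valid; GrowthModel is a hypothetical class, not JDK code.

```java
class GrowthModel {
    static final int INIT_CAP = 8;
    static final int MAX_SELECTABLE_FDS = 1024;

    int capacity = INIT_CAP; // stands in for channelArray.length
    int totalChannels = 1;   // entry 0 is taken by the wakeup socket
    int threadsCount = 0;    // number of helper threads required

    /** Models growIfNeeded() followed by storing one new key. */
    void register() {
        if (capacity == totalChannels) {
            capacity = totalChannels * 2; // both arrays double in size
        }
        if (totalChannels % MAX_SELECTABLE_FDS == 0) {
            totalChannels++; // this slot holds another copy of the wakeup socket
            threadsCount++;  // one more helper thread is needed
        }
        totalChannels++;     // the slot for the new key itself
    }

    static GrowthModel after(int registrations) {
        GrowthModel m = new GrowthModel();
        for (int i = 0; i < registrations; i++) {
            m.register();
        }
        return m;
    }

    public static void main(String[] args) {
        for (int n : new int[] {7, 8, 1023, 1024}) {
            GrowthModel m = after(n);
            System.out.println(n + " keys: totalChannels=" + m.totalChannels
                    + ", capacity=" + m.capacity + ", threadsCount=" + m.threadsCount);
        }
    }
}
```

In this model the 1024th registration is the first one that needs a helper thread, because the slot at index 1024 is reserved for the wakeup socket before the key itself is stored.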
Next is processDeregisterQueue, at <2> in WindowsSelectorImpl#doSelect.
It first iterates over cancelledKeys, deregisters each key and removes it from cancelledKeys. The key is also removed from keys and selectedKeys, which drops the references to it and helps garbage collection.
Internally it calls implDereg, which removes the SelectionKeyImpl associated with the channel from channelArray, adjusts totalChannels and the number of threads, and removes the SelectionKeyImpl from fdMap. The key is then removed from keys and from the Channel's own key set.
processDeregisterQueue() is invoked both before and after poll, so that keys cancelled while poll was blocking are also cleaned up promptly.
Finally, it checks whether the Channel of each cancelled key is still open and still registered. If the Channel is closed and no longer registered, its file descriptor is released and closed through kill().
/** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
assert !ski.isValid();
assert Thread.holdsLock(this);
if (fdMap.remove(ski) != null) {
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
channelArray[totalChannels - 1] = null;
totalChannels--;
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
}
}
//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
//sun.nio.ch.SocketChannelImpl static initializer (JDK 11.0.1, SocketChannelImpl.java:1126)
static {
IOUtil.load();
nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
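The way implDereg fills the freed slot with the last entry can be tried out in isolation. The sketch below is a simplified model, not the JDK code: the SwapRemoveDemo and Entry classes and their members are names assumed for illustration, and the pollWrapper and thread bookkeeping are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class SwapRemoveDemo {
    // Hypothetical stand-in for SelectionKeyImpl: it only remembers its slot.
    static final class Entry {
        final String name;
        int index = -1;
        Entry(String name) { this.name = name; }
    }

    private final Entry[] slots = new Entry[8]; // fixed capacity keeps the sketch short
    private int total = 0;

    void add(Entry e) {
        slots[total] = e;
        e.index = total;
        total++;
    }

    // Same idea as implDereg: copy the last entry over the removed one.
    void remove(Entry e) {
        int i = e.index;
        if (i < 0) {
            return; // not registered
        }
        if (i != total - 1) {
            Entry last = slots[total - 1];
            slots[i] = last;
            last.index = i;
        }
        e.index = -1;
        slots[total - 1] = null;
        total--;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            out.add(slots[i].name);
        }
        return out;
    }

    public static void main(String[] args) {
        SwapRemoveDemo demo = new SwapRemoveDemo();
        Entry a = new Entry("a"), b = new Entry("b"), c = new Entry("c"), d = new Entry("d");
        demo.add(a); demo.add(b); demo.add(c); demo.add(d);
        demo.remove(b);
        System.out.println(demo.names()); // [a, d, c]
    }
}
```

Removal stays O(1) because nothing is shifted; the price is that the order of the entries is not preserved, which is why every entry has to carry its own index.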
About adjustThreadsCount:
- When totalChannels % MAX_SELECTABLE_FDS == 0 is true, the selector needs one more thread to handle the extra channels. adjustThreadsCount then starts or retires helper threads until the number of running threads matches threadsCount. In other words, the number of threads follows from the maximum number of file descriptors that one poll can handle, a limit that depends on the system.
- Let's look at run in SelectThread. The loop condition is while (true), and startLock.waitForStart(this) decides whether the thread runs or waits. When it runs, it invokes subSelector.poll(index). Once poll finishes, the thread invokes finishLock.threadFinished(). If the main thread has several SelectThread helpers and the current one is the first to finish poll, threadFinished() wakes up the others, and the last helper to finish notifies the main thread. A newly created thread sets lastRun to startLock.runsCounter. At first startup both values are 0, so waitForStart invokes startLock.wait() and the thread waits for the next round of poll.
- sun.nio.ch.WindowsSelectorImpl.StartLock#waitForStart also checks whether the current thread has been discarded (marked as a zombie). If so, it returns true, and the thread leaves the while loop in run and terminates.
- Threads are released through sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie, which is invoked by adjustThreadsCount and, when the Selector is closed, by sun.nio.ch.WindowsSelectorImpl#implClose.
- finishLock.threadFinished() uses wakeup() so that the main thread and the other helper threads return from poll. There is a detail worth learning here: if a thread is currently blocked in select, calling wakeup makes that select return. On Windows, wakeup simply writes a byte to the sink end of a pipe; the source fd then becomes ready, poll returns, and so does select. On Solaris or Linux, the pipe is created through the system's pipe call. The implementation of wakeup also shows that it sets the interruptTriggered flag, so invoking wakeup many times in a row has the same effect as invoking it once and causes no bugs.
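The two properties of wakeup described above (it unblocks a pending select, and repeated calls count as one) can be observed through the public java.nio.channels.Selector API. The following is a minimal sketch; the class name WakeupDemo, its method names and the chosen delays are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    // Blocks in select() on a selector without channels until another thread calls wakeup().
    static int selectUntilWoken(long delayMillis) throws IOException, InterruptedException {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup(); // unblocks the select() below
            });
            waker.start();
            int ready = selector.select(); // no channel is registered, so only wakeup() can end this
            waker.join();
            return ready;
        }
    }

    // Calls wakeup() three times, then measures how long the next two selects block (in ms).
    static long[] coalescedWakeups(long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            selector.wakeup();
            selector.wakeup();
            long t0 = System.nanoTime();
            selector.select(timeoutMillis); // consumes the single pending wakeup
            long t1 = System.nanoTime();
            selector.select(timeoutMillis); // nothing pending: waits for the timeout
            long t2 = System.nanoTime();
            return new long[] {(t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000};
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready keys after wakeup: " + selectUntilWoken(100));
        long[] ms = coalescedWakeups(300);
        System.out.println("first select: ~" + ms[0] + " ms, second select: ~" + ms[1] + " ms");
    }
}
```

On a typical run the first select returns almost immediately and the second one waits for roughly the full timeout, which matches the interruptTriggered guard in wakeup.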
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}
// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
// Native implementation in C (WindowsSelectorImpl.c)
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) {
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
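To see how threadsCount follows totalChannels, the counters can be modelled without any real channels. This is only a sketch under stated assumptions: the class HelperThreadModel is hypothetical, deregister() copies the counter updates at the end of implDereg shown earlier, and register() assumes that the counter starts at 1 for the wakeup socket and that every additional block of MAX_SELECTABLE_FDS slots gets its own wakeup socket when the condition totalChannels % MAX_SELECTABLE_FDS == 0 holds.

```java
public class HelperThreadModel {
    final int maxSelectableFds; // slots one thread can poll (MAX_SELECTABLE_FDS in the JDK)
    int totalChannels = 1;      // assumption: slot 0 holds the wakeup socket
    int threadsCount = 0;       // helper threads only, the main thread is not counted

    HelperThreadModel(int maxSelectableFds) {
        this.maxSelectableFds = maxSelectableFds;
    }

    // Assumed registration rule, based on the condition quoted in the article.
    void register() {
        if (totalChannels % maxSelectableFds == 0) {
            totalChannels++; // assumption: the new block starts with its own wakeup socket
            threadsCount++;
        }
        totalChannels++;
    }

    // Mirrors the counter updates at the end of implDereg.
    void deregister() {
        totalChannels--;
        if (totalChannels != 1 && totalChannels % maxSelectableFds == 1) {
            totalChannels--;
            threadsCount--; // the last thread has become redundant
        }
    }

    // Number of helper threads needed after registering the given number of channels.
    static int helpersFor(int channels, int maxSelectableFds) {
        HelperThreadModel model = new HelperThreadModel(maxSelectableFds);
        for (int i = 0; i < channels; i++) {
            model.register();
        }
        return model.threadsCount;
    }

    public static void main(String[] args) {
        for (int channels : new int[] {1023, 1024, 2046, 2047}) {
            System.out.println(channels + " channels -> " + helpersFor(channels, 1024) + " helper thread(s)");
        }
    }
}
```

In this model each block of 1024 slots holds one wakeup socket and 1023 channels, so the first helper thread appears with the 1024th channel and disappears again as soon as that channel is deregistered.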
subSelector.poll() is the core of select. It is implemented by the native function poll0, which takes pollWrapper.pollArrayAddress as a parameter. The readFds, writeFds and exceptFds arrays hold the results of the underlying select: the first element of each array stores the number of selected sockets, and the remaining elements store the fd of each selected socket. From the code below, we can see the following:
poll0 watches the FDs stored in pollWrapper for input or output and blocks until an IO event occurs. Since pollWrapper holds the FD of the ServerSocketChannel, poll0 returns as soon as a ClientSocket sends data to the ServerSocket. pollWrapper also holds the FD of the source (read) end of the wakeup pipe, so when the sink (write) end writes a byte, that FD becomes ready and poll0 returns as well. If neither happens, poll0() stays blocked; if either happens, selector.select returns. This is why run() in SelectThread uses while (true) {}: after the selector has received and handled the data, the thread can go back to listening in poll().
So NIO is still a blocking kind of IO. What, then, is the difference between NIO and BIO? The difference is where the blocking happens: BIO blocks at read (recvfrom), while NIO blocks at select. What do we gain from that? Merely moving the blocking point changes nothing by itself. The real gain comes from implementations such as epoll, which uses callbacks to tell us which sockets have data ready, so we only need to handle those sockets. With BIO and 1000 connections, 1000 threads would sit blocked at read. With NIO, a single thread is enough. It uses select's polling strategy, combined with epoll and its red-black tree data structure, to lower the performance cost and reduce the overhead of context switching.
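The claim that one thread is enough can be checked with the public NIO API. The sketch below serves several loopback connections from the calling thread, which blocks only in select(). The class name SingleThreadServerDemo, the message format and the use of in-process test clients are assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SingleThreadServerDemo {
    // Accepts `clients` loopback connections and reads one short message from
    // each of them, using only the calling thread.
    static List<String> serve(int clients) throws IOException {
        List<String> received = new ArrayList<>();
        List<SocketChannel> senders = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Test clients: connect and send one short message each. The connects
            // complete through the listen backlog, so keep `clients` small.
            for (int i = 0; i < clients; i++) {
                SocketChannel c = SocketChannel.open(server.getLocalAddress());
                senders.add(c);
                c.write(ByteBuffer.wrap(("hello-" + i).getBytes(StandardCharsets.UTF_8)));
            }

            ByteBuffer buf = ByteBuffer.allocate(64);
            while (received.size() < clients) {
                selector.select(); // the only place where this thread blocks
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        buf.clear();
                        int n = ch.read(buf);
                        if (n > 0) {
                            // Assumes the short message arrives in a single read.
                            received.add(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
                        }
                        ch.close(); // closing the channel also cancels its key
                    }
                }
            }
        } finally {
            for (SocketChannel c : senders) {
                c.close();
            }
        }
        return received;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serve(3));
    }
}
```

The order of the received messages is not guaranteed, since it depends on the order in which the selector reports the keys.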
//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
// Holds the FDs on which a read event occurred
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
// Holds the FDs on which a write event occurred
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
// Holds the FDs on which an exception event occurred
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private SubSelector() {
this.pollArrayIndex = 0; // main thread
}
private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
...
}
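The arguments that poll() and poll(int index) pass to poll0 amount to cutting pollArray into slices of at most MAX_SELECTABLE_FDS entries. A small sketch of that arithmetic follows; the class PollSliceDemo and the convention of using -1 for the main thread are assumptions introduced here, and the limit is passed in as a parameter.

```java
public class PollSliceDemo {
    // Returns {startIndex, count} of the pollArray slice polled by the main
    // thread (helperIndex == -1) or by the helper thread with the given index.
    static int[] slice(int helperIndex, int totalChannels, int maxSelectableFds) {
        int start = (helperIndex + 1) * maxSelectableFds;
        int count = Math.min(maxSelectableFds, totalChannels - start);
        return new int[] {start, count};
    }

    public static void main(String[] args) {
        int totalChannels = 2500;
        int max = 1024;
        for (int helper = -1; helper <= 1; helper++) {
            int[] s = slice(helper, totalChannels, max);
            String who = helper < 0 ? "main thread" : "helper " + helper;
            System.out.println(who + ": start=" + s[0] + ", count=" + s[1]);
        }
    }
}
```

With 2500 entries and a limit of 1024, the main thread polls entries 0 to 1023, helper 0 polls 1024 to 2047, and helper 1 polls the remaining 452.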
updateSelectedKeys(action) at <5> in WindowsSelectorImpl#doSelect handles every channel that is ready:
- If the channel's key has not been added to selectedKeys yet, its ReadyOps is set to exactly the operations the channel is now ready for, and the key is added to selectedKeys, provided that ReadyOps contains at least one operation the key is interested in. This is checked by (ski.nioReadyOps() & ski.nioInterestOps()) != 0.
- If the key is already in selectedKeys, its ReadyOps is updated to include the newly ready operations.
- The Consumer<SelectionKey> action is also handled here. As the code below shows, when an action is supplied, any readiness information previously recorded in ReadyOps is discarded and ReadyOps is set directly before action is invoked.
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(publicSelectedKeys);
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
/**
 * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet
 * updateCount is used to tell if a key has been counted as updated
 * in this select operation.
 *
 * me.updateCount <= updateCount
 */
private int processFDSet(long updateCount, Consumer<SelectionKey> action, int[] fds, int rOps, boolean isExceptFds) {
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
// This is the call we should focus on
int updated = processReadyEvents(rOps, sk, action);
if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
return numKeysUpdated;
}
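The action branch of processReadyEvents can be observed from user code through Selector.select(Consumer<SelectionKey>, long), which is available from Java 11 on. The sketch below uses a java.nio.channels.Pipe as a convenient selectable channel; the class name SelectActionDemo and the layout of the returned array are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectActionDemo {
    // Returns {keys handled, readyOps seen by the action, size of selectedKeys afterwards}.
    static int[] run() throws IOException {
        Pipe pipe = Pipe.open();
        int[] seenReadyOps = {0};
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(new byte[] {1})); // makes the source readable

            // The action is invoked for each ready key; the key is not added to selectedKeys.
            int handled = selector.select(key -> seenReadyOps[0] = key.readyOps(), 1000);
            return new int[] {handled, seenReadyOps[0], selector.selectedKeys().size()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = run();
        System.out.println("handled=" + r[0] + ", readyOps=" + r[1] + ", selectedKeys=" + r[2]);
    }
}
```

Because the action consumes the key directly, there is no selected-key set to iterate and clear afterwards, which is the main convenience of this variant.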
That concludes our look at Selector. In the next article, we will talk about Java NIO Buffer.