This article introduces the development of NIO, which is foundational knowledge for Reactor-Netty. We have released our videos on the Methodology of Java Programming for RxJava and Reactor Java.
PS: Chinese version of this series:…
Methodology of Java Programming(RxJava)
Methodology of Java Programming(Reactor Java)
Following the previous article, From BIO to NIO series: BIO source code interpretation, we can now start to look at NIO.
In the last article, we saw that to get asynchronous, non-blocking behavior out of BIO, all we can do is create a thread pool to optimize some parts and set a timeout to improve how clients are handled. However, the drawbacks are also very obvious.
For example, suppose we have two classrooms, A and B. Each student in A needs to work one-to-one with a student in B. The task for every pair is completely different, so the time each one takes is also different. The tasks also carry different rewards, so students compete for them. In the traditional mode, which is BIO, if nobody manages this, tasks sit waiting and nobody accepts them. We can map this onto the relationship between Server and Client: Clients may have no data to send, yet they still need to tell the Server that they are alive. In the example, suppose a new student in classroom B takes on the job of administrator (or manager); the other students in B can then be thought of as threads in the Server. So we need an administrator role, and Selector appeared to meet this requirement.
As the administrator, we need to track the status of each student: waiting for a job, receiving data, or needing to send data. The Selector focuses only on these actions and acts according to the tag each one carries. These tags need to be managed too, so SelectionKey came into being. Back to the example: we can decorate the students so that they carry these tags and can be classified. At the same time, we can give each student a computer to free their hands, so they work more efficiently and can do more. The computer plays the role of Buffer.
Therefore, there are three main roles in NIO: Buffer, Channel and Selector. In the following sections, we will introduce them one by one based on their source code.
From the previous example, we can think of the students as Sockets, with Channel acting as a decorator for them. The design of a Channel therefore carries the keyword Socket in its class name, and the implementation class centers on the functions and properties of Socket. Let's look at the design of the Channel interface, java.nio.channels.Channel:
public interface Channel extends Closeable {
    /**
     * Tells whether or not this channel is open.
     *
     * @return {@code true} if, and only if, this channel is open
     */
    public boolean isOpen();
    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time. If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws IOException If an I/O error occurs
     */
    public void close() throws IOException;
}
The code above defines only the open/closed status of the Socket and its ability to be closed. Next, we will look at how ClosedChannelException arises in everyday code.
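As a minimal sketch of the contract described by the Channel interface, the snippet below closes a channel twice and then attempts a read. The class name ClosedChannelDemo and its helper method are hypothetical, and a Pipe is used only as a convenient stand-in for any channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;

// Hypothetical demo class, not part of the JDK or the original article.
class ClosedChannelDemo {

    /** Returns true if reading from an already closed channel raised ClosedChannelException. */
    static boolean readAfterCloseFails() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            try {
                source.close();
                source.close(); // closing an already closed channel has no effect
                source.read(ByteBuffer.allocate(8)); // I/O on a closed channel
                return false;
            } catch (ClosedChannelException expected) {
                return !source.isOpen();
            } finally {
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read after close failed: " + readAfterCloseFails());
    }
}
```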
Sometimes we want a Channel that can be closed or interrupted asynchronously. To support this, a dedicated interface is needed. If one thread is blocked in an IO operation on a Channel, another thread can invoke close() on that Channel. As a result, the blocked thread receives an AsynchronousCloseException.
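The asynchronous close case can be sketched roughly as follows. The class AsyncCloseDemo is hypothetical, the Pipe is only a stand-in for any interruptible channel, and the sleep is a crude way to let the reader block first. If close() wins the race before the read begins, the reader would see a plain ClosedChannelException, which is the parent type of AsynchronousCloseException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK or the original article.
class AsyncCloseDemo {

    /** Closes a channel while another thread is blocked reading it; returns what the reader saw. */
    static Exception closeWhileReaderBlocks() {
        try {
            Pipe pipe = Pipe.open();
            AtomicReference<Exception> seen = new AtomicReference<>();
            Thread reader = new Thread(() -> {
                try {
                    // Blocks: nothing is ever written to the sink.
                    pipe.source().read(ByteBuffer.allocate(8));
                } catch (IOException e) {
                    seen.set(e);
                }
            });
            reader.setDaemon(true);
            reader.start();
            Thread.sleep(300);      // assumption: long enough for the reader to block
            pipe.source().close();  // close from a different thread
            reader.join(5000);
            pipe.sink().close();
            return seen.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reader saw: " + closeWhileReaderBlocks());
    }
}
```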
Another case is also worth noting. If a thread is blocked in an IO operation on a Channel, another thread may invoke the blocked thread's interrupt() method. This causes the Channel to be closed; the blocked thread receives a ClosedByInterruptException, and its interrupted status is set.
If a thread's interrupted status is already set and it then invokes a blocking IO operation on a Channel, the Channel is closed and the thread immediately receives a ClosedByInterruptException. Its interrupted status remains set.
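The case where the interrupted status is set before the IO call needs no second thread, so it is easy to try out. The sketch below assumes a hypothetical class PreInterruptedDemo and again uses a Pipe as a stand-in channel. It records whether ClosedByInterruptException was thrown, whether the channel ended up closed, and whether the interrupted status survived.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

// Hypothetical demo class, not part of the JDK or the original article.
class PreInterruptedDemo {

    /** Returns {closedByInterrupt, channelClosed, interruptStatusStillSet}. */
    static boolean[] readWithInterruptStatusSet() {
        try {
            Pipe pipe = Pipe.open();
            boolean closedByInterrupt = false;
            boolean stillInterrupted;
            Thread.currentThread().interrupt(); // set the status before doing any I/O
            try {
                pipe.source().read(ByteBuffer.allocate(8));
            } catch (ClosedByInterruptException e) {
                closedByInterrupt = true;
            } finally {
                // Thread.interrupted() reads the flag and clears it for the cleanup below.
                stillInterrupted = Thread.interrupted();
            }
            boolean channelClosed = !pipe.source().isOpen();
            pipe.sink().close();
            return new boolean[] {closedByInterrupt, channelClosed, stillInterrupted};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = readWithInterruptStatusSet();
        System.out.println("closedByInterrupt=" + r[0]
                + " channelClosed=" + r[1] + " stillInterrupted=" + r[2]);
    }
}
```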
Now let's look at the definition of InterruptibleChannel:
public interface InterruptibleChannel extends Channel {
    /**
     * Closes this channel.
     *
     * <p> Any thread currently blocked in an I/O operation upon this channel
     * will receive an {@link AsynchronousCloseException}.
     *
     * <p> This method otherwise behaves exactly as specified by the {@link
     * Channel#close Channel} interface. </p>
     *
     * @throws IOException If an I/O error occurs
     */
    public void close() throws IOException;
}
The detailed implementation is in java.nio.channels.spi.AbstractInterruptibleChannel. We will analyze this class in the next article, Interruptible Channel and interruptible IO.
We have mentioned that a Channel can be used by a Selector, and that the Selector dispatches tasks according to the status of each Channel. For this to work, a Channel must provide a method that registers it with, and binds it to, a Selector. So a Channel instance invokes register(Selector sel, int ops, Object att). Note that because the Selector manages a Channel by its status, this method returns a SelectionKey object that represents the Channel's status on that Selector. We will explain SelectionKey later, since it is a long topic.
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
    if ((ops & ~validOps()) != 0)
        throw new IllegalArgumentException();
    if (!isOpen())
        throw new ClosedChannelException();
    synchronized (regLock) {
        if (isBlocking())
            throw new IllegalBlockingModeException();
        synchronized (keyLock) {
            // re-check if channel has been closed
            if (!isOpen())
                throw new ClosedChannelException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                // Already registered on this selector: update the existing key
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}
private void addKey(SelectionKey k) {
    assert Thread.holdsLock(keyLock);
    int i = 0;
    if ((keys != null) && (keyCount < keys.length)) {
        // Find empty element of key array
        for (i = 0; i < keys.length; i++)
            if (keys[i] == null)
                break;
    } else if (keys == null) {
        keys = new SelectionKey[2];
    } else {
        // Grow key array
        int n = keys.length * 2;
        SelectionKey[] ks = new SelectionKey[n];
        for (i = 0; i < keys.length; i++)
            ks[i] = keys[i];
        keys = ks;
        i = keyCount;
    }
    keys[i] = k;
    keyCount++;
}
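The checks at the top of register() can be observed from application code. The following is a small sketch, assuming a hypothetical class RegisterChecksDemo; it uses a ServerSocketChannel, whose only valid operation is OP_ACCEPT, and reports which exception (if any) each combination of blocking mode and ops produces.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the JDK or the original article.
class RegisterChecksDemo {

    /** Tries to register a fresh ServerSocketChannel and describes the outcome. */
    static String tryRegister(boolean blocking, int ops) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(blocking);
            try {
                SelectionKey key = server.register(selector, ops, "attachment");
                return "registered, interestOps=" + key.interestOps();
            } catch (IllegalArgumentException | IllegalBlockingModeException e) {
                return e.getClass().getSimpleName();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRegister(false, SelectionKey.OP_ACCEPT));
        System.out.println(tryRegister(true, SelectionKey.OP_ACCEPT));
        System.out.println(tryRegister(false, SelectionKey.OP_READ));
    }
}
```

Because the ops check runs before the blocking-mode check, an unsupported op on a blocking channel reports IllegalArgumentException rather than IllegalBlockingModeException.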
Once a Channel is registered on a Selector, it stays registered until the Selector deregisters it. Put another way, we can cancel the SelectionKey that represents the Channel on the Selector, which has the same effect. So we can invoke SelectionKey#cancel() to cancel the key explicitly; the Channel is then deregistered during the Selector's next selection operation.
/**
 * Cancels this key.
 *
 * <p> If this key has not yet been cancelled then it is added to its
 * selector's cancelled-key set while synchronized on that set. </p>
 */
public final void cancel() {
    // Synchronizing "this" to prevent this key from getting canceled
    // multiple times by different threads, which might cause race
    // condition between selector's select() and channel's close().
    synchronized (this) {
        if (valid) {
            valid = false;
            // This still delegates to the Selector's cancel()
            ((AbstractSelector)selector()).cancel(this);
        }
    }
}
void cancel(SelectionKey k) {
    synchronized (cancelledKeys) {
        cancelledKeys.add(k);
    }
}
// At the next selection, the keys in the cancelled-key set are deregistered, which deregisters the Channel.
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
    synchronized (this) {
        ensureOpen();
        if (inSelect)
            throw new IllegalStateException("select in progress");
        inSelect = true;
        try {
            synchronized (publicSelectedKeys) {
                return doSelect(action, timeout);
            }
        } finally {
            inSelect = false;
        }
    }
}
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
    assert Thread.holdsLock(this);
    this.timeout = timeout; // set selector timeout
    processUpdateQueue();
    // Cancelled keys are deregistered here
    processDeregisterQueue();
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    // ... remainder of the selection logic omitted
}
/**
 * Invoked by selection operations to process the cancelled-key set
 */
protected final void processDeregisterQueue() throws IOException {
    assert Thread.holdsLock(this);
    assert Thread.holdsLock(publicSelectedKeys);
    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            Iterator<SelectionKey> i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                i.remove();
                // remove the key from the selector
                implDereg(ski);
                selectedKeys.remove(ski);
                keys.remove(ski);
                // remove from channel's key set
                deregister(ski);
                SelectableChannel ch = ski.channel();
                if (!ch.isOpen() && !ch.isRegistered())
                    ((SelChImpl)ch).kill();
            }
        }
    }
}
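The delay between cancelling a key and the actual deregistration is visible through the public API. Here is a minimal sketch, assuming a hypothetical class CancelKeyDemo, in which selectNow() plays the role of the "next selection operation".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the JDK or the original article.
class CancelKeyDemo {

    /** Returns {validAfterCancel, registeredBeforeSelect, registeredAfterSelect}. */
    static boolean[] cancelThenSelect() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            key.cancel();
            boolean validAfterCancel = key.isValid();               // key is invalid at once
            boolean registeredBeforeSelect = server.isRegistered(); // channel not yet deregistered

            selector.selectNow(); // processes the cancelled-key set
            boolean registeredAfterSelect = server.isRegistered();

            return new boolean[] {validAfterCancel, registeredBeforeSelect, registeredAfterSelect};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelThenSelect();
        System.out.println("validAfterCancel=" + r[0]
                + " registeredBeforeSelect=" + r[1]
                + " registeredAfterSelect=" + r[2]);
    }
}
```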
So when a Channel is closed, whether by invoking Channel#close or by interrupting a thread blocked on it, all keys that represent this Channel are cancelled implicitly; internally, k.cancel() is invoked.
/**
 * Closes this channel.
 *
 * <p> If the channel has already been closed then this method returns
 * immediately. Otherwise it marks the channel as closed and then invokes
 * the {@link #implCloseChannel implCloseChannel} method in order to
 * complete the close operation. </p>
 *
 * @throws IOException
 *         If an I/O error occurs
 */
public final void close() throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true;
        implCloseChannel();
    }
}
protected final void implCloseChannel() throws IOException {
    implCloseSelectableChannel();
    // clone keys to avoid calling cancel when holding keyLock
    SelectionKey[] copyOfKeys = null;
    synchronized (keyLock) {
        if (keys != null) {
            copyOfKeys = keys.clone();
        }
    }
    if (copyOfKeys != null) {
        for (SelectionKey k : copyOfKeys) {
            if (k != null) {
                k.cancel(); // invalidate and adds key to cancelledKey set
            }
        }
    }
}
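The implicit cancellation on close can be checked with a few lines. This sketch assumes a hypothetical class CloseChannelDemo; it closes a registered channel and then inspects its key before and after the next selection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the JDK or the original article.
class CloseChannelDemo {

    /** Returns {validBeforeClose, validAfterClose, stillInKeySetAfterSelect}. */
    static boolean[] closeRegisteredChannel() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            boolean validBefore = key.isValid();

            server.close(); // cancels every key of this channel
            boolean validAfter = key.isValid();

            selector.selectNow(); // the cancelled key is removed from the selector here
            boolean stillInKeySet = selector.keys().contains(key);

            return new boolean[] {validBefore, validAfter, stillInKeySet};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = closeRegisteredChannel();
        System.out.println("validBefore=" + r[0] + " validAfter=" + r[1]
                + " stillInKeySet=" + r[2]);
    }
}
```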
If the Selector itself is closed, the Channel is deregistered as well, and all keys that represent the Channel become invalid.
public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
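Closing the Selector has the mirror-image effect: the key is invalidated and the channel is deregistered, but the channel itself stays open. A minimal sketch, assuming a hypothetical class CloseSelectorDemo:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the JDK or the original article.
class CloseSelectorDemo {

    /** Returns {keyValid, channelRegistered, channelOpen} after the selector is closed. */
    static boolean[] closeSelector() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            Selector selector = Selector.open();
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            selector.close(); // deregisters all channels and invalidates their keys

            return new boolean[] {key.isValid(), server.isRegistered(), server.isOpen()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = closeSelector();
        System.out.println("keyValid=" + r[0] + " channelRegistered=" + r[1]
                + " channelOpen=" + r[2]);
    }
}
```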
Even if a Channel supports multiple Ops, it cannot be registered more than once on the same Selector. When we invoke java.nio.channels.spi.AbstractSelectableChannel#register a second time on that Selector, it only updates the Ops on the existing key instead of registering again, because a real registration would produce a completely new SelectionKey object. We can check whether a Channel is currently registered on one or more Selectors by invoking java.nio.channels.SelectableChannel#isRegistered.
// -- Registration --
public final boolean isRegistered() {
    synchronized (keyLock) {
        // addKey() is invoked when we register on a Selector, so keyCount grows with the number of registrations.
        return keyCount != 0;
    }
}
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
    if ((ops & ~validOps()) != 0)
        throw new IllegalArgumentException();
    if (!isOpen())
        throw new ClosedChannelException();
    synchronized (regLock) {
        // A channel in blocking mode cannot be registered: isBlocking() returns true and an exception is thrown.
        if (isBlocking())
            throw new IllegalBlockingModeException();
        synchronized (keyLock) {
            // re-check if channel has been closed
            if (!isOpen())
                throw new ClosedChannelException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                // Second call on the same selector: only the attachment and Ops change
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}
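Registering the same channel twice on one Selector can be tried directly. The sketch below assumes a hypothetical class ReRegisterDemo and an unconnected SocketChannel, chosen only because it supports more than one operation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK or the original article.
class ReRegisterDemo {

    /** Returns {sameKey, opsUpdated, attachmentUpdated, singleKeyOnSelector}. */
    static boolean[] registerTwice() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey first = channel.register(selector, SelectionKey.OP_CONNECT, "first");
            SelectionKey second = channel.register(selector, SelectionKey.OP_READ, "second");
            return new boolean[] {
                first == second,                               // no new key is created
                second.interestOps() == SelectionKey.OP_READ,  // Ops were replaced
                "second".equals(second.attachment()),          // attachment was replaced
                selector.keys().size() == 1                    // still a single registration
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = registerTwice();
        System.out.println("sameKey=" + r[0] + " opsUpdated=" + r[1]
                + " attachmentUpdated=" + r[2] + " singleKey=" + r[3]);
    }
}
```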
Now we can use the following demo as an example:
public NIOServerSelectorThread(int port) {
    try {
        //Open a ServerSocketChannel. It listens for client connections and is the parent Channel of all client connections.
        serverSocketChannel = ServerSocketChannel.open();
        //Set the Channel to non-blocking mode
        serverSocketChannel.configureBlocking(false);
        //Use the ServerSocketChannel to obtain a server socket
        serverSocket = serverSocketChannel.socket();
        //Bind a port for the server socket
        serverSocket.bind(new InetSocketAddress(port));
        //Create the Selector (the multiplexer)
        selector = Selector.open();
        //Register the ServerSocketChannel on the Selector and listen for accept events
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("The server is started on port: " + port);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
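To see the registration from the demo do something, here is a self-contained sketch that accepts a single connection. The class AcceptOnceDemo is hypothetical and not the article's NIOServerSelectorThread; it binds to an ephemeral loopback port and connects a client from the same process, and the 5 second select timeout is an arbitrary assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo class, not part of the JDK or the original article.
class AcceptOnceDemo {

    /** Returns true if the selector reported OP_ACCEPT and accept() produced a connection. */
    static boolean acceptOneConnection() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(loopback, 0)); // port 0: let the OS pick one
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            // A blocking client in the same process triggers the accept event.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                if (selector.select(5000) == 0) {
                    return false; // nothing became ready in time
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // selected keys must be removed by hand
                    if (key.isAcceptable()) {
                        ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                        try (SocketChannel accepted = ready.accept()) {
                            return accepted != null;
                        }
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted a connection: " + acceptOneConnection());
    }
}
```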
Due to length limits, we will cover the rest in the next article.