This article introduces how NIO developed, which is background knowledge for Reactor-Netty. We have also released video courses, Methodology of Java Programming, for RxJava and Reactor Java.
PS: Chinese version of this series: juejin.im/post/5c2e23…
Methodology of Java Programming (RxJava)
Youtube: www.youtube.com/playlist?li…
Bilibili: www.bilibili.com/video/av345…
Methodology of Java Programming (Reactor Java)
Youtube: www.youtube.com/playlist?li…
Bilibili: www.bilibili.com/video/av353…
All versions are based on JDK 11.
This article follows the previous one, From BIO to NIO (NIO source code interpretation 1).
Our first aim is to extend the capabilities of Socket. To do that, Channel needs an environment that gives it the abilities of a Socket, so the JDK defines the interface java.nio.channels.NetworkChannel:
public interface NetworkChannel extends Channel {
NetworkChannel bind(SocketAddress local) throws IOException;
SocketAddress getLocalAddress() throws IOException;
<T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;
<T> T getOption(SocketOption<T> name) throws IOException;
Set<SocketOption<?>> supportedOptions();
}
bind(SocketAddress) binds the channel's socket to a local SocketAddress, and getLocalAddress() returns the address that socket is bound to. setOption(SocketOption,Object) and getOption(SocketOption) set and read the socket's configuration options.
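As a minimal sketch of how these four methods fit together, the following uses ServerSocketChannel, one of the JDK's NetworkChannel implementations. The class name NetworkChannelDemo and the helper bindToLoopback are made up for illustration, and binding to the loopback address with port 0 is an assumption chosen so that the example runs on any machine.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;

final class NetworkChannelDemo {

    // Hypothetical helper: binds any NetworkChannel to an ephemeral loopback port
    // and returns the address the channel reports afterwards.
    static InetSocketAddress bindToLoopback(NetworkChannel channel) throws IOException {
        channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return (InetSocketAddress) channel.getLocalAddress();
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Options are usually set before bind so they apply to the listening socket.
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            InetSocketAddress local = bindToLoopback(server);
            System.out.println("bound to " + local);
            System.out.println("SO_REUSEADDR = "
                    + server.getOption(StandardSocketOptions.SO_REUSEADDR));
            Set<SocketOption<?>> supported = server.supportedOptions();
            System.out.println("supported options: " + supported);
        }
    }
}
```

Because the helper only depends on NetworkChannel, the same call would also work for a SocketChannel or a DatagramChannel.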
Now we return to java.nio.channels.ServerSocketChannel and sun.nio.ch.ServerSocketChannelImpl. First, look at the implementation of bind:
//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (stateLock) {
ensureOpen();
//Using localAddress to check whether bind has been used
if (localAddress != null)
throw new AlreadyBoundException();
//InetSocketAddress(0) is the wildcard address with port 0, so the system picks an ephemeral port
InetSocketAddress isa = (local == null)
? new InetSocketAddress(0)
: Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
//Start listening. If backlog is smaller than 1, the pending-connection queue defaults to a length of 50.
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
}
return this;
}
Next, see how bind and listen are implemented in Net.
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port) throws IOException {
bind(UNSPEC, fd, addr, port);
}
static void bind(ProtocolFamily family, FileDescriptor fd, InetAddress addr, int port) throws IOException {
//Prefer IPv6 when it is available and the requested family is not IPv4
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
bind0(fd, preferIPv6, exclusiveBind, addr, port);
}
private static native void bind0(FileDescriptor fd, boolean preferIPv6, boolean useExclBind, InetAddress addr, int port) throws IOException;
bind0 is a native method; its implementation is:
JNIEXPORT void JNICALL Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6, jboolean useExclBind, jobject iao, int port) {
SOCKETADDRESS sa;
int sa_len = 0;
int rv = 0;
//Convert Java's InetAddress to C's struct sockaddr
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
preferIPv6) != 0) {
return;//conversion failed, return
}
//Call bind: int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
if (rv != 0) {
handleSocketError(env, errno);
}
}
A socket is the interface between the program and the kernel for communication, but a newly created socket has no address, port or other network identity attached. To give it one, the socket has to be bound to an address.
In general the kernel binds an address automatically. Sometimes, though, users have to bind one themselves to meet a requirement.
The most typical case is a server, which needs to bind a well-known address and wait for client connections. Clients do not need to invoke bind(); the kernel does it for them automatically.
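The difference between the server's explicit bind and the client's implicit one can be observed from Java. This is only a sketch: ImplicitBindDemo and connectWithoutBind are illustrative names, and the loopback server exists purely so the client has something to connect to.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ImplicitBindDemo {

    // Connects a client that never calls bind() and returns the local port
    // that was assigned to it during connect().
    static int connectWithoutBind() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(loopback, 0)); // server side: explicit bind
            try (SocketChannel client = SocketChannel.open()) {
                if (client.getLocalAddress() != null) {
                    throw new IllegalStateException("client should not be bound yet");
                }
                client.connect(server.getLocalAddress());    // client side: no bind() call
                return ((InetSocketAddress) client.getLocalAddress()).getPort();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("client was given local port " + connectWithoutBind());
    }
}
```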
One thing is worth noting: bind only binds. When a new connection arrives, a new socket is created for it and the server works with that new Socket, so what we need to focus on is accept. In sun.nio.ch.ServerSocketChannelImpl#bind we can see that Net.listen(fd, backlog < 1 ? 50 : backlog) starts listening. If backlog < 1, the queue of pending connections defaults to a length of 50. Now look at the method:
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;
Net.listen is a native method:
JNIEXPORT void JNICALL Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog) {
if (listen(fdval(env, fdo), backlog) < 0)
handleSocketError(env, errno);
}
From the code we can see that it ultimately calls the system function listen, which is used after bind and before accept. Its prototype is int listen(int sockfd, int backlog); a return value of 0 means success and -1 means failure.
Now turn back to ensureOpen():
//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
if (!isOpen())
throw new ClosedChannelException();
}
//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen() {
return !closed;
}
If the channel has been closed, ClosedChannelException is thrown. Next, look at Net#checkAddress:
//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
if (sa == null)//null address
throw new NullPointerException();
//non InetSocketAddress type address
if (!(sa instanceof InetSocketAddress))
throw new UnsupportedAddressTypeException(); // ## needs arg
InetSocketAddress isa = (InetSocketAddress)sa;
//unresolved address
if (isa.isUnresolved())
throw new UnresolvedAddressException(); // ## needs arg
InetAddress addr = isa.getAddress();
//non-ipv4 and non-ipv6 address
if (!(addr instanceof Inet4Address || addr instanceof Inet6Address))
throw new IllegalArgumentException("Invalid address type");
return isa;
}
From the code above, we can see that bind first checks whether the server socket channel is open and whether it is already bound. If it is open and not yet bound, bind validates the given SocketAddress. It then uses bind and listen in the Net utility class to actually bind the server socket to the address and start listening. If the backlog argument is smaller than 1, the pending-connection queue defaults to a length of 50.
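Each of the checks described above surfaces as a specific exception, which a short experiment can make visible. The sketch below is illustrative only: BindChecksDemo, IoAction and outcome are invented names, and example.invalid is just a placeholder host that is never resolved.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindChecksDemo {

    interface IoAction { void run() throws IOException; }

    // Runs the action and reports the simple name of what it threw, or "none".
    static String outcome(IoAction action) {
        try {
            action.run();
            return "none";
        } catch (IOException | RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // localAddress != null on the second call, so bind refuses to run again.
    static String bindTwice() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.bind(null); // null: wildcard address, ephemeral port
            return outcome(() -> ch.bind(null));
        }
    }

    // Net.checkAddress rejects an address that was never resolved.
    static String bindUnresolved() throws IOException {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            return outcome(() -> ch.bind(InetSocketAddress.createUnresolved("example.invalid", 8080)));
        }
    }

    // ensureOpen() runs before anything else.
    static String bindAfterClose() throws IOException {
        ServerSocketChannel ch = ServerSocketChannel.open();
        ch.close();
        return outcome(() -> ch.bind(null));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(bindTwice());      // AlreadyBoundException
        System.out.println(bindUnresolved()); // UnresolvedAddressException
        System.out.println(bindAfterClose()); // ClosedChannelException
    }
}
```

The two-argument overload bind(SocketAddress, int) takes the backlog explicitly; the one-argument form used here passes 0, which ends up as the default of 50.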
With the first article's discussion of BIO in mind, we now turn to accept() in ServerSocketChannel:
//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept() throws IOException {
acceptLock.lock();
try {
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean blocking = isBlocking();
try {
begin(blocking);
do {
n = accept(this.fd, newfd, isaa);
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
end(blocking, n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
//The accepted SocketChannelImpl starts in blocking mode by default
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//Build the SocketChannelImpl; it is explained in the SocketChannelImpl part
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
//check address and port access
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
//return socketchannelimpl
return sc;
} finally {
acceptLock.unlock();
}
}
accept(this.fd, newfd, isaa) invokes accept to take a connection that the socket has received. As mentioned before, the call finally reaches int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen):
If fd is listening, there is no pending connection in the socket's queue, and the socket is not marked as non-blocking, accept() blocks until a connection appears.
If the socket is marked as non-blocking and there is no pending connection in the queue, accept() fails with the error EAGAIN or EWOULDBLOCK.
The begin(blocking) and end(blocking, n>0) pair was covered in InterruptibleChannel and interrupted IO. We mention it again to show how it is applied. Here the focus is on the wait for a connection, and that wait can be interrupted. If the wait finishes normally, the code after it runs; finishing does not mean the Channel has been closed. The second argument of end(blocking, n>0), completed, only reports whether the wait completed, so its meaning should not be stretched any further.
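At the Java level, the EAGAIN / EWOULDBLOCK case shows up as accept() returning null (the n < 1 branch in the code above), and the accepted channel is always switched to blocking mode. A small sketch, with AcceptModesDemo and its method names chosen only for this example:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptModesDemo {

    private static ServerSocketChannel openOnLoopback() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return server;
    }

    // Non-blocking mode: with nothing in the queue, accept() returns null immediately.
    static boolean nonBlockingAcceptReturnsNull() throws IOException {
        try (ServerSocketChannel server = openOnLoopback()) {
            server.configureBlocking(false);
            return server.accept() == null;
        }
    }

    // Blocking mode: accept() waits for the client; the new channel starts out blocking.
    static boolean acceptedChannelIsBlocking() throws IOException {
        try (ServerSocketChannel server = openOnLoopback();
             SocketChannel client = SocketChannel.open(server.getLocalAddress());
             SocketChannel accepted = server.accept()) {
            return client.isConnected() && accepted.isBlocking();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("null when idle: " + nonBlockingAcceptReturnsNull());
        System.out.println("accepted channel blocking: " + acceptedChannelIsBlocking());
    }
}
```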
Next come the other NetworkChannel methods that are implemented here, starting with supportedOptions:
//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public final Set<SocketOption<?>> supportedOptions() {
return DefaultOptionsHolder.defaultOptions;
}
//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
HashSet<SocketOption<?>> set = new HashSet<>();
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_REUSEADDR);
if (Net.isReusePortAvailable()) {
set.add(StandardSocketOptions.SO_REUSEPORT);
}
set.add(StandardSocketOptions.IP_TOS);
set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
//return an unmodifiable view of the HashSet
return Collections.unmodifiableSet(set);
}
}
The options used in the code above are defined as follows:
//java.net.StandardSocketOptions
//size of the socket receive buffer
public static final SocketOption<Integer> SO_RCVBUF =
new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
//whether the address may be reused
public static final SocketOption<Boolean> SO_REUSEADDR =
new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
//whether the port may be reused
public static final SocketOption<Boolean> SO_REUSEPORT =
new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
//Type of Service (ToS) octet in the IP header
public static final SocketOption<Integer> IP_TOS =
new StdSocketOption<Integer>("IP_TOS", Integer.class);
Now that we know which options are supported, we can look at the details of setOption:
//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) throws IOException {
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
Net.setSocketOption(fd, family, name, value);
return this;
}
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
isReuseAddress = (Boolean)value;
} else {
// no options that require special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
}
return this;
}
}
Here we can see how supportedOptions().contains(name) is used: setOption first checks that the option is supported and then applies the setting. For ordinary socket options, the main work is done by Net.setSocketOption:
static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name, Object value) throws IOException {
if (value == null)
throw new IllegalArgumentException("Invalid option value");
// only simple values supported by this method
Class<?> type = name.type();
if (extendedOptions.isOptionSupported(name)) {
extendedOptions.setOption(fd, name, value);
return;
}
//If the type is neither Integer nor Boolean, throw AssertionError
if (type != Integer.class && type != Boolean.class)
throw new AssertionError("Should not reach here");
// special handling
if (name == StandardSocketOptions.SO_RCVBUF ||
name == StandardSocketOptions.SO_SNDBUF)
{
//validate the size of the receive or send buffer
int i = ((Integer)value).intValue();
if (i < 0)
throw new IllegalArgumentException("Invalid send/receive buffer size");
}
//SO_LINGER: how long close waits when unsent data remains; the value is clamped to the range -1..65535
if (name == StandardSocketOptions.SO_LINGER) {
int i = ((Integer)value).intValue();
if (i < 0)
value = Integer.valueOf(-1);
if (i > 65535)
value = Integer.valueOf(65535);
}
//IP_TOS: the Type of Service octet, which must be in the range 0..255
if (name == StandardSocketOptions.IP_TOS) {
int i = ((Integer)value).intValue();
if (i < 0 || i > 255)
throw new IllegalArgumentException("Invalid IP_TOS value");
}
//IP_MULTICAST_TTL: time-to-live for multicast datagrams, which must be in the range 0..255
if (name == StandardSocketOptions.IP_MULTICAST_TTL) {
int i = ((Integer)value).intValue();
if (i < 0 || i > 255)
throw new IllegalArgumentException("Invalid TTL/hop value");
}
// map option name to platform level/name
OptionKey key = SocketOptionRegistry.findOption(name, family);
if (key == null)
throw new AssertionError("Option not found");
int arg;
//convert config
if (type == Integer.class) {
arg = ((Integer)value).intValue();
} else {
boolean b = ((Boolean)value).booleanValue();
arg = (b) ? 1 : 0;
}
boolean mayNeedConversion = (family == UNSPEC);
boolean isIPv6 = (family == StandardProtocolFamily.INET6);
//set file descriptor and other arguments
setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);
}
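The validation steps in setOption and Net.setSocketOption can be triggered deliberately. The sketch below assumes, as in the supportedOptions listing earlier, that TCP_NODELAY is not among the options of a server socket channel; OptionChecksDemo and its helpers are names made up for the example.

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

final class OptionChecksDemo {

    interface IoAction { void run() throws IOException; }

    // Runs the action and reports the simple name of what it threw, or "none".
    static String outcome(IoAction action) {
        try {
            action.run();
            return "none";
        } catch (IOException | RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Rejected by the supportedOptions().contains(name) check.
    static String setUnsupportedOption() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return outcome(() -> server.setOption(StandardSocketOptions.TCP_NODELAY, true));
        }
    }

    // Rejected by the buffer-size check in Net.setSocketOption.
    static String setNegativeReceiveBuffer() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return outcome(() -> server.setOption(StandardSocketOptions.SO_RCVBUF, -1));
        }
    }

    // Rejected by the value == null check in Net.setSocketOption.
    static String setNullValue() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return outcome(() -> server.setOption(StandardSocketOptions.SO_RCVBUF, null));
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(setUnsupportedOption());     // UnsupportedOperationException
        System.out.println(setNegativeReceiveBuffer()); // IllegalArgumentException
        System.out.println(setNullValue());             // IllegalArgumentException
    }
}
```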
Next, look at the implementation of getOption:
//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name) throws IOException {
Objects.requireNonNull(name);
//If it is unsupported options, just throw UnsupportedOperationException
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
//Everything else is delegated to Net
// no options that require special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
}
}
//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name) throws IOException {
Class<?> type = name.type();
if (extendedOptions.isOptionSupported(name)) {
return extendedOptions.getOption(fd, name);
}
//Only Integer and Boolean are supported
// only simple values supported by this method
if (type != Integer.class && type != Boolean.class)
throw new AssertionError("Should not reach here");
// map option name to platform level/name
OptionKey key = SocketOptionRegistry.findOption(name, family);
if (key == null)
throw new AssertionError("Option not found");
boolean mayNeedConversion = (family == UNSPEC);
//get options from file descriptor
int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());
if (type == Integer.class) {
return Integer.valueOf(value);
} else {
return (value == 0) ? Boolean.FALSE : Boolean.TRUE;
}
}
In the part about Net.bind we said that when a connection is received, a new Socket is created to handle it. This can be seen in accept: once the connection is obtained, it calls new SocketChannelImpl(provider(), newfd, isa). That raises a question. When we call bind, there must already be a socket to bind, so where does it come from? And how is this done in BIO? Let us review that first.
In BIO, there is a setImpl() call when we invoke java.net.ServerSocket#ServerSocket(int, int, java.net.InetAddress):
//java.net.ServerSocket
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
//java.net.ServerSocket#setImpl
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
What we should focus on, however, is bind(new InetSocketAddress(bindAddr, port), backlog):
//java.net.ServerSocket
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
//Important!!
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
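The checks in the ServerSocket constructor and in ServerSocket#bind can be exercised directly. This is a sketch for illustration: ServerSocketBindDemo and its method names are invented, and the port value 70000 is simply an arbitrary number above 0xFFFF.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class ServerSocketBindDemo {

    interface IoAction { void run() throws IOException; }

    // Runs the action and reports the simple name of what it threw, or "none".
    static String outcome(IoAction action) {
        try {
            action.run();
            return "none";
        } catch (IOException | RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // The no-arg constructor leaves the socket unbound; bind(null, 0) then picks
    // an ephemeral port, and the backlog of 0 falls back to the default of 50.
    static boolean unboundUntilBind() throws IOException {
        try (ServerSocket ss = new ServerSocket()) {
            boolean boundBefore = ss.isBound();
            ss.bind(null, 0);
            return !boundBefore && ss.isBound() && ss.getLocalPort() > 0;
        }
    }

    // A second bind hits the "Already bound" check.
    static String bindTwice() throws IOException {
        try (ServerSocket ss = new ServerSocket()) {
            ss.bind(null);
            return outcome(() -> ss.bind(null));
        }
    }

    // The constructor rejects ports outside 0..0xFFFF before binding.
    static String portOutOfRange() {
        return outcome(() -> new ServerSocket(70000).close());
    }

    public static void main(String[] args) throws IOException {
        System.out.println(unboundUntilBind()); // true
        System.out.println(bindTwice());        // SocketException
        System.out.println(portOutOfRange());   // IllegalArgumentException
    }
}
```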
The line marked important in ServerSocket#bind calls getImpl, so we follow the source to see what it does:
//java.net.ServerSocket#getImpl
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
Since the initial value of created is false, execution must enter createImpl():
//java.net.ServerSocket#createImpl
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
Since impl has already been assigned, execution goes on to impl.create(true), after which created is set to true. This leads to the conclusion:
//java.net.AbstractPlainSocketImpl#create
protected synchronized void create(boolean stream) throws IOException {
this.stream = stream;
if (!stream) {
ResourceManager.beforeUdpCreate();
// only create the fd after we know we will be able to create the socket
fd = new FileDescriptor();
try {
socketCreate(false);
SocketCleanable.register(fd);
} catch (IOException ioe) {
ResourceManager.afterUdpClose();
fd = null;
throw ioe;
}
} else {
fd = new FileDescriptor();
socketCreate(true);
SocketCleanable.register(fd);
}
if (socket != null)
socket.setCreated();
if (serverSocket != null)
serverSocket.setCreated();
}
Look at socketCreate(true); its implementation is as follows:
@Override
void socketCreate(boolean stream) throws IOException {
if (fd == null)
throw new SocketException("Socket closed");
int newfd = socket0(stream);
fdAccess.set(fd, newfd);
}
socket0(stream) returns a file descriptor, so this is where the socket is created, ready to be bound. Now turn back to sun.nio.ch.ServerSocketChannelImpl#accept(). accept() handles what happens after the channel is set up and a connection arrives. So where does the server's socket come from when we bind it? With ServerSocketChannel, we go through the JDK's general factory method open(), which keeps things simple for the caller.
java.nio.channels.ServerSocketChannel#open:
//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
As we can see, creating a ServerSocketChannelImpl object is enough to obtain a socket, which we can then bind. Note that ServerSocketChannel#open returns a ServerSocketChannel. When a client connection is accepted, a separate Socket has to be created to carry the communication with that client. That is why sun.nio.ch.ServerSocketChannelImpl#accept() executes SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa); and returns an object of type SocketChannel. The methods for working with that Socket, such as read and write, can therefore be defined in that class.
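To make the division of labour concrete, here is a minimal round trip in which the ServerSocketChannel only accepts and the two SocketChannel objects do the reading and writing. EchoRoundTripDemo and roundTrip are illustrative names, and the 1024-byte buffer is an arbitrary limit that assumes a short message.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

final class EchoRoundTripDemo {

    // Sends the message from a client channel and returns what the accepted
    // channel on the server side read. Assumes the message fits in 1024 bytes.
    static String roundTrip(String message) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {

                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    client.write(out);
                }
                client.shutdownOutput(); // lets the peer see end-of-stream

                ByteBuffer in = ByteBuffer.allocate(1024);
                while (in.hasRemaining() && peer.read(in) != -1) {
                    // keep reading until end-of-stream or the buffer is full
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello nio"));
    }
}
```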
For ServerSocketChannel, we also need to know about socket():
//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket() {
synchronized (stateLock) {
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}
Here socket is created by ServerSocketAdaptor, a class backed by ServerSocketChannelImpl. Its purpose is to fit the habits of people used to ServerSocket: ServerSocketAdaptor extends ServerSocket and overrides its methods one by one, which gives us one more way to write this kind of code.
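A short check shows that the adaptor and the channel are two views of the same socket. The names SocketAdaptorDemo and adaptorSharesState are made up for this sketch.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

final class SocketAdaptorDemo {

    // Binds through the ServerSocket-style adaptor and verifies that the
    // channel observes the same state.
    static boolean adaptorSharesState() throws IOException {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            ServerSocket adaptor = channel.socket();
            adaptor.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            InetSocketAddress viaChannel = (InetSocketAddress) channel.getLocalAddress();
            return adaptor == channel.socket()          // the adaptor is created once and cached
                    && adaptor.getChannel() == channel  // and points back at its channel
                    && viaChannel != null
                    && viaChannel.getPort() == adaptor.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("adaptor and channel agree: " + adaptorSharesState());
    }
}
```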
The implementation of java.nio.channels.spi.AbstractInterruptibleChannel#close was covered in InterruptibleChannel and Interruptible IO, so here we review it briefly and add something new:
//java.nio.channels.spi.AbstractInterruptibleChannel#close
public final void close() throws IOException {
synchronized (closeLock) {
if (closed)
return;
closed = true;
implCloseChannel();
}
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
// clone keys to avoid calling cancel when holding keyLock
SelectionKey[] copyOfKeys = null;
synchronized (keyLock) {
if (keys != null) {
copyOfKeys = keys.clone();
}
}
if (copyOfKeys != null) {
for (SelectionKey k : copyOfKeys) {
if (k != null) {
k.cancel(); // invalidate and adds key to cancelledKey set
}
}
}
}
//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel
@Override
protected void implCloseSelectableChannel() throws IOException {
assert !isOpen();
boolean interrupted = false;
boolean blocking;
// set state to ST_CLOSING
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
}
// wait for any outstanding accept to complete
if (blocking) {
synchronized (stateLock) {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
//If a thread is blocked in accept, pre-close the socket and signal that thread to wake it up
nd.preClose(fd);
NativeThread.signal(th);
// wait for accept operation to end
while (thread != 0) {
try {
stateLock.wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
}
} else {
// non-blocking mode: wait for accept to complete
acceptLock.lock();
acceptLock.unlock();
}
// set state to ST_KILLPENDING
synchronized (stateLock) {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
}
// close socket if not registered with Selector
//If the channel is not registered with a selector, kill it directly, which closes the file descriptor.
if (!isRegistered())
kill();
// restore interrupt status
//As mentioned in the last article, if the thread was interrupted while waiting, its interrupt status must be set again.
if (interrupted)
Thread.currentThread().interrupt();
}
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
}
}
We did not say much about close() in the last article. It matters more for SocketChannel, which is what actually exchanges data between client and server: if a connection breaks, the Channel that is no longer useful has to be closed.
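The effect of close() on a ServerSocketChannel can be seen from the outside. In the sketch below (CloseDemo and its methods are illustrative names), the first method closes a channel twice and then tries to accept, and the second closes a channel while another thread is waiting in accept(). The 200 ms sleep is only a crude way to let that thread block first, not a recommended synchronisation technique.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicReference;

final class CloseDemo {

    private static ServerSocketChannel openOnLoopback() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return server;
    }

    // close() may be called repeatedly; afterwards accept() is rejected.
    static String acceptAfterClose() throws IOException {
        ServerSocketChannel server = openOnLoopback();
        server.close();
        server.close(); // second call returns immediately because closed is already true
        try {
            server.accept();
            return "none";
        } catch (ClosedChannelException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Closing from another thread releases a thread that is waiting in accept().
    static boolean closeUnblocksAccept() throws Exception {
        ServerSocketChannel server = openOnLoopback();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread acceptor = new Thread(() -> {
            try {
                server.accept();
            } catch (IOException e) {
                seen.set(e); // normally an AsynchronousCloseException
            }
        });
        acceptor.start();
        Thread.sleep(200);
        server.close();
        acceptor.join(5000);
        return !acceptor.isAlive() && seen.get() instanceof ClosedChannelException;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptAfterClose());
        System.out.println("accept released by close: " + closeUnblocksAccept());
    }
}
```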
sun.nio.ch.ServerSocketChannelImpl#accept():
@Override
public SocketChannel accept() throws IOException {
...
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
} finally {
acceptLock.unlock();
}
}
The code above checks whether accepting from the remote address is permitted. If the check throws an exception, the SocketChannel that has just been created is closed.
Another meaningful use of close() is on the client side: if an exception occurs while establishing a connection, the Socket that was created also has to be closed:
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote) throws IOException {
SocketChannel sc = open();
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
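The same open, connect, close-on-failure pattern can be reproduced by hand to confirm that no open channel is left behind. This is a sketch under an assumption: probablyClosedPort (a made-up helper, like the class name ConnectCleanupDemo) finds a port by binding and releasing it, so there is a small chance that another process grabs the port in between.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ConnectCleanupDemo {

    // Binds an ephemeral loopback port and releases it again, so that
    // (very likely) nothing is listening on it afterwards.
    static InetSocketAddress probablyClosedPort() throws IOException {
        try (ServerSocketChannel probe = ServerSocketChannel.open()) {
            probe.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return (InetSocketAddress) probe.getLocalAddress();
        }
    }

    // Mirrors the pattern of SocketChannel.open(SocketAddress) and reports
    // whether the channel is still open after the connect attempt failed.
    static boolean openAfterFailedConnect() throws IOException {
        SocketChannel sc = SocketChannel.open();
        try {
            sc.connect(probablyClosedPort());
        } catch (IOException x) {
            sc.close(); // release the socket that was created by open()
        }
        boolean stillOpen = sc.isOpen();
        sc.close(); // harmless if the channel is already closed
        return stillOpen;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("channel left open: " + openAfterFailedConnect());
    }
}
```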
When the ServerSocketChannelImpl class is loaded, it creates a SocketDispatcher. Through SocketDispatcher, I/O operations are carried out by different native methods on different platforms, and the same applies to Socket:
//sun.nio.ch.SocketDispatcher
class SocketDispatcher extends NativeDispatcher {
static {
IOUtil.load();
}
//read
int read(FileDescriptor fd, long address, int len) throws IOException {
return read0(fd, address, len);
}
long readv(FileDescriptor fd, long address, int len) throws IOException {
return readv0(fd, address, len);
}
//write
int write(FileDescriptor fd, long address, int len) throws IOException {
return write0(fd, address, len);
}
long writev(FileDescriptor fd, long address, int len) throws IOException {
return writev0(fd, address, len);
}
//preclose FileDescriptor
void preClose(FileDescriptor fd) throws IOException {
preClose0(fd);
}
//close
void close(FileDescriptor fd) throws IOException {
close0(fd);
}
//-- Native methods
static native int read0(FileDescriptor fd, long address, int len) throws IOException;
static native long readv0(FileDescriptor fd, long address, int len) throws IOException;
static native int write0(FileDescriptor fd, long address, int len) throws IOException;
static native long writev0(FileDescriptor fd, long address, int len) throws IOException;
static native void preClose0(FileDescriptor fd) throws IOException;
static native void close0(FileDescriptor fd) throws IOException;
}
FileDescriptor has appeared many times by now, so we introduce it here. A FileDescriptor is an opaque handle to an open file, an open socket, or another source or sink of bytes. Its main practical use is to create a FileInputStream or FileOutputStream that contains it.
PS: Applications should not create their own file descriptors.
Now we turn to its source code:
public final class FileDescriptor {
private int fd;
private long handle;
private Closeable parent;
private List<Closeable> otherParents;
private boolean closed;
/** * true, if file is opened for appending. */
private boolean append;
static {
initIDs();
}
/** * Cleanup in case the FileDescriptor is not explicitly closed. */
private PhantomCleanable<FileDescriptor> cleanup;
/** * Constructs an (invalid) FileDescriptor; fd or handle is set later. */
public FileDescriptor() {
fd = -1;
handle = -1;
}
/** * Used for standard input, output, and error only. * For Windows the corresponding handle is initialized. * For Unix the append mode is cached. * @param fd the raw fd number (0, 1, 2) */
private FileDescriptor(int fd) {
this.fd = fd;
this.handle = getHandle(fd);
this.append = getAppend(fd);
}
...
}
The standard streams java.lang.System#in, java.lang.System#out and java.lang.System#err are built on the following descriptors:
public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
We can test whether a file descriptor is valid with the following method:
//java.io.FileDescriptor#valid
public boolean valid() {
return (handle != -1) || (fd != -1);
}
A return value of true means the file descriptor represents a valid open file, socket or other I/O connection; false means it is invalid.
Interested readers can explore the rest of the source code.
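A quick way to watch valid() change is to take the descriptor of a stream and close the stream. The sketch below uses invented names (FileDescriptorDemo and its methods) and a temporary file; it constructs a bare FileDescriptor only to show the invalid initial state, which is not something application code normally needs to do.

```java
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileDescriptorDemo {

    // A freshly constructed descriptor has fd = -1 and handle = -1.
    static boolean freshDescriptorIsValid() {
        return new FileDescriptor().valid();
    }

    // Returns { valid while the stream is open, valid after it was closed }.
    static boolean[] validBeforeAndAfterClose() throws IOException {
        Path tmp = Files.createTempFile("fd-demo", ".txt");
        try {
            FileInputStream in = new FileInputStream(tmp.toFile());
            FileDescriptor fd = in.getFD();
            boolean before = fd.valid();
            in.close();
            return new boolean[] { before, fd.valid() };
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("fresh descriptor valid: " + freshDescriptorIsValid());
        boolean[] states = validBeforeAndAfterClose();
        System.out.println("open stream: " + states[0] + ", closed stream: " + states[1]);
        System.out.println("stdout descriptor valid: " + FileDescriptor.out.valid());
    }
}
```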
We have already met SocketChannel; now we explore it in more detail.
A socket channel is created by invoking open. A few points need attention:
It is not possible to create a channel for an arbitrary socket that already exists.
A newly created SocketChannel is open but not yet connected.
Attempting an I/O operation on a socket that is not connected throws NotYetConnectedException.
Invoke connect to connect the socket channel.
Once connected, a socket channel stays connected until it is closed.
Whether a SocketChannel is connected can be checked with isConnected().
SocketChannel supports non-blocking connection:
A SocketChannel can be created first, and connect() then starts the connection to the remote socket.
Invoke finishConnect to complete the connection.
Invoke isConnectionPending to check whether a connection operation is in progress.
SocketChannel supports asynchronous shutdown, which is very similar to the asynchronous close in the Channel class:
If the socket's input side is shut down by one thread while another thread is blocked in a read on the socket channel, the read completes without reading anything and returns -1.
If the socket's output side is shut down by one thread while another thread is blocked in a write on the socket channel, the blocked thread receives AsynchronousCloseException.
The implementation is examined next.
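Before reading the implementation, the states listed above can be observed from the public API. The following is a sketch only: SocketChannelStatesDemo and its methods are invented names, and polling finishConnect() with a short sleep stands in for what would normally be done with a Selector.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class SocketChannelStatesDemo {

    private static ServerSocketChannel openOnLoopback() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return server;
    }

    // A new channel is open but unconnected, so I/O on it is rejected.
    static String readBeforeConnect() throws IOException {
        try (SocketChannel sc = SocketChannel.open()) {
            try {
                sc.read(ByteBuffer.allocate(8));
                return "none";
            } catch (NotYetConnectedException e) {
                return "NotYetConnectedException";
            }
        }
    }

    // Non-blocking connect: connect() starts the attempt, finishConnect() completes it.
    static boolean nonBlockingConnect() throws Exception {
        try (ServerSocketChannel server = openOnLoopback();
             SocketChannel sc = SocketChannel.open()) {
            sc.configureBlocking(false);
            boolean done = sc.connect(server.getLocalAddress());
            while (!done) {
                // between connect() and a successful finishConnect(),
                // isConnectionPending() reports true
                Thread.sleep(10);
                done = sc.finishConnect();
            }
            return sc.isConnected() && !sc.isConnectionPending();
        }
    }

    // Once the input side is shut down, a read reports end-of-stream.
    static int readAfterShutdownInput() throws IOException {
        try (ServerSocketChannel server = openOnLoopback();
             SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
            client.shutdownInput();
            return client.read(ByteBuffer.allocate(8));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readBeforeConnect());
        System.out.println("connected: " + nonBlockingConnect());
        System.out.println("read after shutdownInput: " + readAfterShutdownInput());
    }
}
```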
//java.nio.channels.SocketChannel#open()
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
//We don't need to invoke connect again
public static SocketChannel open(SocketAddress remote) throws IOException {
//default is blocking
SocketChannel sc = open();
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
//sun.nio.ch.SelectorProviderImpl#openSocketChannel
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//invoke the socket function; true means a stream (TCP) socket
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#socket(boolean)
static FileDescriptor socket(boolean stream) throws IOException {
return socket(UNSPEC, stream);
}
//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)
static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException {
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));
}
//sun.nio.ch.IOUtil#newFD
public static FileDescriptor newFD(int i) {
FileDescriptor fd = new FileDescriptor();
setfdVal(fd, i);
return fd;
}
static native void setfdVal(FileDescriptor fd, int value);
The native source behind Net.socket(true), socket0:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
jboolean stream, jboolean reuse, jboolean ignored)
{
int fd;
//TCP is SOCK_STREAM, UDP is SOCK_DGRAM; here stream=true
int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
//choose IPv6 or IPv4
int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
//invoke Linux's socket function; domain is the protocol family,
//type is the socket type, and protocol 0 selects the default transport protocol
fd = socket(domain, type, 0);
//Error
if (fd < 0) {
return handleSocketError(env, errno);
}
/* Disable IPV6_V6ONLY to ensure dual-socket support */
if (domain == AF_INET6) {
int arg = 0;
//arg=1 restricts the IPv6 socket to IPv6 traffic only; arg=0 lets it accept IPv4 traffic as well
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
sizeof(int)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_V6ONLY");
close(fd);
return -1;
}
}
//SO_REUSEADDR has 4 applications:
//1. When socket1 with the same local address and port is in the TIME_WAIT state and socket2 in your program wants to use that address and port, your program needs this option.
//2. SO_REUSEADDR allows multiple instances (processes) of the same server to run on the same port, as long as each instance binds a different IP address.
//3. SO_REUSEADDR allows a single process to bind the same port to multiple sockets, as long as each socket binds a different IP address.
//4. SO_REUSEADDR allows a completely identical address and port to be bound repeatedly, but only for UDP multicast, not for TCP.
if (reuse) {
int arg = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set SO_REUSEADDR");
close(fd);
return -1;
}
}
#if defined(__linux__)
if (type == SOCK_DGRAM) {
int arg = 0;
int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
(errno != ENOPROTOOPT)) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IP_MULTICAST_ALL");
close(fd);
return -1;
}
}
//IPV6_MULTICAST_HOPS sets the hop limit for outgoing multicast packets;
//1 keeps them on the local network.
//Please see(http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4);
/* By default, Linux uses the route default */
if (domain == AF_INET6 && type == SOCK_DGRAM) {
int arg = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
sizeof(arg)) < 0) {
JNU_ThrowByNameWithLastError(env,
JNU_JAVANETPKG "SocketException",
"Unable to set IPV6_MULTICAST_HOPS");
close(fd);
return -1;
}
}
#endif
return fd;
}
Linux 3.9 added the SO_REUSEPORT option, which allows multiple sockets to bind to exactly the same address and port. It does not matter whether the sockets are listening or not, or whether they are TCP or UDP: as long as every socket sets SO_REUSEPORT before binding, the bind succeeds.
To prevent port hijacking there is one restriction: all sockets that want to share an address and port must belong to processes with the same effective user ID, so one user cannot steal a port from another. The kernel also treats SO_REUSEPORT sockets specially: incoming connection requests (those obtained by calling accept()) are distributed across the sockets that share the address and port. For example, a simple server can run multiple instances with SO_REUSEPORT sockets and get a simple form of load balancing, because the kernel does the dispatching.
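From the Java side, the same idea can be tried through the standard socket option API. The following is only a sketch under some assumptions: it uses StandardSocketOptions.SO_REUSEPORT (available since JDK 9), checks supportedOptions() because not every platform offers the option, and the class and method names (ReusePortSketch, bindTwice) are made up for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, not part of the JDK.
class ReusePortSketch {

    /**
     * Tries to bind two listening channels to the same loopback address and port.
     * Returns 2 when both binds succeed, or 0 when SO_REUSEPORT is not supported here.
     */
    static int bindTwice() throws IOException {
        try (ServerSocketChannel first = ServerSocketChannel.open();
             ServerSocketChannel second = ServerSocketChannel.open()) {
            if (!first.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT)) {
                return 0;
            }
            // Every socket that shares the port has to set the option before bind.
            first.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            second.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            // Port 0 lets the system pick a free port for the first listener.
            first.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // The second listener binds to exactly the same address and port.
            second.bind(first.getLocalAddress());
            return 2;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("listeners sharing one port: " + bindTwice());
    }
}
```

Both listeners belong to the same process here, so the effective user ID restriction is trivially satisfied.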
As the previous code shows, once the socket has been created successfully, IOUtil.newFD is called to create the file descriptor. How do we know whether the socket is readable, writable, or in error? There are only three states to track: read, write and error. Since the socket belongs to the SocketChannel, we attach the FileDescriptor to the channel as well, and through it we can obtain the socket's status. FileDescriptor does not expose any public method for setting fd, so setfdVal is implemented as a native method:
JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val) {
(*env)->SetIntField(env, fdo, fd_fdID, val);
}
Anyone who knows the Linux shell will recognize this: the shell handles error output through redirection, which means the output goes to whichever channel the descriptor has been redirected to. Descriptors 0 and 1 point to channels in the same way, and a descriptor also represents status. That is how we can act on the socket's status to change the interest ops of a SelectionKey, and why the states of a SelectionKey are classified by input and output: it helps us with IO operations. Now back to open() in SocketChannel. SelectorProvider.provider().openSocketChannel() returns a SocketChannelImpl instance. The constructor SocketChannelImpl(SelectorProvider sp) never assigns this.state, so it keeps the default value 0, which is ST_UNCONNECTED. The socket is also in blocking mode by default.
In general, when we work asynchronously, we use open() without arguments and then invoke configureBlocking(false) to switch the channel to non-blocking mode.
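The defaults described above can be observed through the public API. This is a minimal sketch; the class and method names (OpenDefaultsSketch, inspect) are made up for illustration, and it needs no network access because the channel is never connected.

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

// Hypothetical helper, not part of the JDK.
class OpenDefaultsSketch {

    /** Returns {blocking after open(), connected after open(), blocking after configureBlocking(false)}. */
    static boolean[] inspect() throws IOException {
        try (SocketChannel sc = SocketChannel.open()) {
            boolean blockingAfterOpen = sc.isBlocking();    // a new channel starts in blocking mode
            boolean connectedAfterOpen = sc.isConnected();  // and is not connected yet
            sc.configureBlocking(false);                    // switch to non-blocking mode
            return new boolean[] { blockingAfterOpen, connectedAfterOpen, sc.isBlocking() };
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(Arrays.toString(inspect()));
    }
}
```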
Source code for connect():
//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
readLock.lock();
try {
writeLock.lock();
try {
int n = 0;
boolean blocking = isBlocking();
try {
//make the thread interruptible by installing an Interruptible blocker on the current thread
beginConnect(blocking, isa);
do {
//invoke the native connect; in blocking mode it waits until it succeeds or throws an exception
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
//true if the connection has been established
return n > 0;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, isa);
}
}
beginConnect and endConnect build on begin() and end() in AbstractInterruptibleChannel. For a non-blocking channel we do not need to care about interruption during connect; interruption only matters when a blocking wait can occur.
//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa) throws IOException { //begin() is entered only in blocking mode
if (blocking) {
// set hook for Thread.interrupt
begin();
}
synchronized (stateLock) {
//the channel is open unless close() has been invoked
ensureOpen();
//check state of connection
int state = this.state;
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
//the channel must be unconnected at this point; it is marked as connecting just below
assert state == ST_UNCONNECTED;
//connecting
this.state = ST_CONNECTIONPENDING;
//executed only if bind has not been invoked
if (localAddress == null)
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
remoteAddress = isa;
if (blocking) {
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
}
}
}
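The state checks in beginConnect are visible from application code. The sketch below is an illustration only: it connects to a listener on the loopback interface, and the class and method names (ConnectStateSketch, probe) are made up. It also calls read before connecting, which performs a comparable state check and is documented to throw NotYetConnectedException.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper, not part of the JDK.
class ConnectStateSketch {

    /** Returns the names of the state-related exceptions that were observed, in order. */
    static String probe() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            StringBuilder seen = new StringBuilder();
            try {
                client.read(ByteBuffer.allocate(1)); // channel is still unconnected
            } catch (NotYetConnectedException e) {
                seen.append("NotYetConnected");
            }
            client.connect(server.getLocalAddress()); // blocking connect, state becomes connected
            try {
                client.connect(server.getLocalAddress()); // second connect on a connected channel
            } catch (AlreadyConnectedException e) {
                seen.append(",AlreadyConnected");
            }
            return seen.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(probe());
    }
}
```

ConnectionPendingException is left out on purpose: it needs a non-blocking connect that is still in progress, which is hard to reproduce reliably on loopback.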
During connection we need to pay attention to the connection states: ST_UNCONNECTED, ST_CONNECTED, ST_CONNECTIONPENDING, ST_CLOSING, ST_KILLPENDING and ST_KILLED. The state is shared, so it can be accessed by multiple threads. For this reason state is declared volatile, and any change to it must be made while holding stateLock in a synchronized block:
//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed) throws IOException {
endRead(blocking, completed);
//when n>0 the connection succeeded and the state becomes ST_CONNECTED
if (completed) {
synchronized (stateLock) {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
}
}
}
//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed) throws AsynchronousCloseException { //the body runs only when blocking is true
if (blocking) {
synchronized (stateLock) {
readerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateLock.notifyAll();
}
}
//end() is paired with begin(); it throws ClosedByInterruptException if the thread was interrupted
// remove hook for Thread.interrupt
end(completed);
}
}
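The begin()/end() pairing is what lets Thread.interrupt() break a blocking channel operation. A blocking connect on loopback finishes too quickly to interrupt, so the sketch below uses a blocking read as a stand-in; that substitution, the 200 ms pause and the names (InterruptSketch, interruptBlockedRead) are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the JDK.
class InterruptSketch {

    /** Interrupts a thread that is reading in blocking mode and reports what that thread saw. */
    static String interruptBlockedRead() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                AtomicReference<String> outcome = new AtomicReference<>("no exception");
                Thread reader = new Thread(() -> {
                    try {
                        // Blocks: the connection is never accepted, so no data ever arrives.
                        client.read(ByteBuffer.allocate(16));
                    } catch (ClosedByInterruptException e) {
                        outcome.set("ClosedByInterruptException, open=" + client.isOpen());
                    } catch (IOException e) {
                        outcome.set(e.getClass().getSimpleName());
                    }
                });
                reader.start();
                Thread.sleep(200);   // give the reader time to enter read()
                reader.interrupt();  // the interrupt closes the channel and wakes the reader
                reader.join();
                return outcome.get();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptBlockedRead());
    }
}
```

The channel ends up closed as a side effect of the interrupt, which is why interruption is only a concern for blocking operations.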
Now we turn to Net.connect(fd, ia, isa.getPort()):
//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
return connect(UNSPEC, fd, remote, remotePort);
}
//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
boolean preferIPv6 = isIPv6Available() &&
(family != StandardProtocolFamily.INET);
return connect0(preferIPv6, fd, remote, remotePort);
}
These calls finally invoke the native method connect0:
JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo, jobject iao, jint port) {
SOCKETADDRESS sa;
int sa_len = 0;
int rv;
//the address is converted into struct sockaddr format
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
return IOS_THROWN;
}
//pass fd and sockaddr to connect to the remote server, which performs the TCP three-way handshake
//after configureBlocking(false) the call does not block; otherwise it blocks until it succeeds, times out or fails
rv = connect(fdval(env, fdo), &sa.sa, sa_len);
//0 means success; on failure errno gives the reason
if (rv != 0) {
//non-blocking and the connection has not been established yet (-2)
if (errno == EINPROGRESS) {
return IOS_UNAVAILABLE;
} else if (errno == EINTR) {
//interrupted (-3)
return IOS_INTERRUPTED;
}
return handleSocketError(env, errno);
}
//connected. A TCP connection normally takes time to establish, although on the local network it can complete at once.
//In non-blocking mode the usual result is IOS_UNAVAILABLE.
return 1;
}
As the comments above show, a non-blocking connect whose connection has not been established yet returns -2. From beginConnect we know the state is ST_CONNECTIONPENDING at that point. So when does it become ST_CONNECTED in non-blocking mode? The answer is in sun.nio.ch.SocketChannelImpl#finishConnect. Before reading it, look at how java.net.Socket connects:
//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr, boolean stream) throws IOException {
setImpl();
// backward compatibility
if (address == null)
throw new NullPointerException();
try {
createImpl(stream);
if (localAddr != null)
bind(localAddr);
connect(address);
} catch (IOException | IllegalArgumentException | SecurityException e) {
try {
close();
} catch (IOException ce) {
e.addSuppressed(ce);
}
throw e;
}
}
Here we can obtain an instance of sun.nio.ch.SocketAdaptor by first invoking java.nio.channels.SocketChannel#open() and then socket() on the returned channel.
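As a rough illustration of that adaptor view, the sketch below opens a channel, takes its Socket through socket(), and performs a timed connect against a listener on the loopback interface. The class and method names (AdaptorSketch, timedConnect) and the 1000 ms timeout are assumptions for the example; only public API is used, so it does not depend on the adaptor's internal class name.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper, not part of the JDK.
class AdaptorSketch {

    /** Returns true if the timed connect through the Socket view left the channel connected and blocking. */
    static boolean timedConnect() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel sc = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Socket socket = sc.socket();                      // Socket view backed by the channel
            boolean sameChannel = socket.getChannel() == sc;  // the view points back at the channel
            socket.connect(server.getLocalAddress(), 1000);   // timed connect, timeout in milliseconds
            // After the call returns, the channel is connected and back in blocking mode.
            return sameChannel && sc.isConnected() && sc.isBlocking();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected through adaptor: " + timedConnect());
    }
}
```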
Now look at connect in SocketAdaptor:
//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
connect(remote, 0);
}
public void connect(SocketAddress remote, int timeout) throws IOException {
if (remote == null)
throw new IllegalArgumentException("connect: The address can't be null");
if (timeout < 0)
throw new IllegalArgumentException("connect: timeout can't be negative");
synchronized (sc.blockingLock()) {
if (!sc.isBlocking())
throw new IllegalBlockingModeException();
try {
//If no timeout is set, wait until the connection succeeds or an exception is thrown
// no timeout
if (timeout == 0) {
sc.connect(remote);
return;
}
//A timeout is set, so the socket is switched to non-blocking mode
// timed connect
sc.configureBlocking(false);
try {
if (sc.connect(remote))
return;
} finally {
try {
sc.configureBlocking(true);
} catch (ClosedChannelException e) { }
}
long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
long to = timeout;
for (;;) {
//Keep polling for the connection until the remaining timeout is used up
//On timeout, close the socket
long startTime = System.nanoTime();
if (sc.pollConnected(to)) {
boolean connected = sc.finishConnect();
//See the explanation below
assert connected;
break;
}
timeoutNanos -= System.nanoTime() - startTime;
if (timeoutNanos <= 0) {
try {
sc.close();
} catch (IOException x) { }
throw new SocketTimeoutException();
}
to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
}
} catch (Exception x) {
Net.translateException(x, true);
}
}
}
The assert keyword was introduced in Java 1.4, so it has been around for quite a while. To avoid conflicts with uses of assert in older Java code, **Java does not enable assertion checks by default at run time**, and every assert statement is ignored. To turn them on, pass -enableassertions or -ea to the JVM.
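A common way to see whether assertions are active is to put a side effect inside an assert statement. This is a small sketch; the class and method names (AssertStatusSketch, assertionsEnabled) are made up for illustration.

```java
// Hypothetical helper, not part of the JDK.
class AssertStatusSketch {

    /** Returns true only when the JVM runs this class with assertions enabled. */
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment is executed only if assertions are enabled; otherwise the statement is skipped.
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Running the class with and without -ea shows the two behaviours, which is why `assert connected;` in SocketAdaptor#connect is not checked in a default run.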
//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect() throws IOException {
try {
readLock.lock();
try {
writeLock.lock();
try {
// no-op if already connected
if (isConnected())
return true;
boolean blocking = isBlocking();
boolean connected = false;
try {
beginFinishConnect(blocking);
int n = 0;
if (blocking) {
do {
n = checkConnect(fd, true);
} while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
} else {
n = checkConnect(fd, false);
}
connected = (n > 0);
} finally {
endFinishConnect(blocking, connected);
}
assert (blocking && connected) ^ !blocking;
return connected;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, remoteAddress);
}
}
//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block) throws IOException;
beginFinishConnect and endFinishConnect are similar to sun.nio.ch.SocketChannelImpl#beginConnect and sun.nio.ch.SocketChannelImpl#endConnect. Beyond that, we only need to focus on the core logic, checkConnect(fd, true), which is also a native method:
JNIEXPORT jint JNICALL Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this, jobject fdo, jboolean block) {
int error = 0;
socklen_t n = sizeof(int);
//get FileDescriptor's fd
jint fd = fdval(env, fdo);
int result = 0;
struct pollfd poller;
//FileDescriptor
poller.fd = fd;
//request event is writing
poller.events = POLLOUT;
//return event
poller.revents = 0;
//-1->blocking,0->return directly and don't block thread
result = poll(&poller, 1, block ? -1 : 0);
//if result<0, invoke failed
if (result < 0) {
if (errno == EINTR) {
return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "poll failed");
return IOS_THROWN;
}
}
//non-blocking and result is 0: the connection is still pending
if (!block && (result == 0))
return IOS_UNAVAILABLE;
//at least one socket is ready to write or has an error
if (result > 0) {
errno = 0;
result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
//error
if (result < 0) {
return handleSocketError(env, errno);
//the socket itself reported an error
} else if (error) {
return handleSocketError(env, error);
} else if ((poller.revents & POLLHUP) != 0) {
return handleSocketError(env, ENOTCONN);
}
//socket is ready to write which means connection has been built
// connected
return 1;
}
return 0;
}
The detailed process is explained in the comments. The blocking and non-blocking cases correspond to the two branches in sun.nio.ch.SocketChannelImpl#finishConnect. As the source shows, poll is used to check the socket status, and from that result we know whether the connection succeeded. In non-blocking mode finishConnect returns immediately, so sun.nio.ch.SocketAdaptor#connect handles this by checking the connection status in a loop. That approach is not recommended in NIO programming; it is only a half-measure. We suggest registering the channel with a Selector for OP_CONNECT and, when the SelectionKey is selected, invoking finishConnect to complete the connection.
Because finishConnect is what updates the state to ST_CONNECTED, and read and write depend on that state, we must invoke finishConnect before reading or writing.
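The recommended flow can be sketched as follows: connect in non-blocking mode, register for OP_CONNECT, and call finishConnect once the key is connectable. This is a minimal illustration rather than production code; the names (SelectorConnectSketch, connect, demo), the loopback listener and the 2000 ms timeout are assumptions, and a real event loop would keep the channel open and go on to handle reads and writes.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper, not part of the JDK.
class SelectorConnectSketch {

    /** Connects in non-blocking mode; returns false if nothing is ready within timeoutMillis (must be > 0). */
    static boolean connect(SocketAddress remote, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sc = SocketChannel.open()) {
            sc.configureBlocking(false);
            if (sc.connect(remote)) {
                return true; // connected immediately, which can happen on loopback
            }
            // The connection is pending: ask the selector to report when it can be finished.
            sc.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMillis) == 0) {
                return false; // nothing became ready in time
            }
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {
                    // Completes the connection, or throws if the connect attempt failed.
                    return sc.finishConnect();
                }
            }
            return false;
        }
    }

    /** Runs connect against a listener bound to the loopback interface. */
    static boolean demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connect(server.getLocalAddress(), 2000);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + demo());
    }
}
```

The channel is closed when connect returns, so the sketch only demonstrates the handshake itself.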
Along the way we have met Selector and SelectionKey. We will explain them in detail in the next article.