From BIO to NIO —— NIO source code interpretation 2

Preface

This article continues our look at how NIO evolved; it is background knowledge for Reactor-Netty. We have also released videos for Methodology of Java Programming, covering RxJava and Reactor Java.

PS: Chinese version of this series: juejin.im/post/5c2e23…

Methodology of Java Programming (RxJava)

Youtube: www.youtube.com/playlist?li…

Bilibili: www.bilibili.com/video/av345…

Methodology of Java Programming (Reactor Java)

Youtube: www.youtube.com/playlist?li…

Bilibili: www.bilibili.com/video/av353…

All versions are based on JDK 11.

Introduction

This article follows on from the previous one, From BIO to NIO —— NIO source code interpretation 1.

Making Channel implement socket abilities

Our original aim was to enhance and enrich what a Socket can do. To that end, we need a way to give a Channel the abilities of a socket. The JDK defines the interface java.nio.channels.NetworkChannel for this purpose:

public interface NetworkChannel extends Channel {
    NetworkChannel bind(SocketAddress local) throws IOException;

    SocketAddress getLocalAddress() throws IOException;

    <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;

    <T> T getOption(SocketOption<T> name) throws IOException;

    Set<SocketOption<?>> supportedOptions();
}

bind(SocketAddress) binds the channel's socket to a local address, and getLocalAddress() returns the address the socket is bound to. setOption(SocketOption, Object) and getOption(SocketOption) set and read the socket's configuration options.
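
The four NetworkChannel methods can be tried out through ServerSocketChannel, which implements the interface. The following is a minimal sketch, not code from the JDK: the class name NetworkChannelDemo and the choice of the loopback address with port 0 are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: exercises the NetworkChannel API only.
final class NetworkChannelDemo {

    /** Sets an option, binds to an ephemeral loopback port and returns that port. */
    static int bindAndReport() {
        try (NetworkChannel ch = ServerSocketChannel.open()) {
            // Configure the socket before binding.
            ch.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            // Port 0 lets the system choose a free port.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress local = (InetSocketAddress) ch.getLocalAddress();
            boolean reuse = ch.getOption(StandardSocketOptions.SO_REUSEADDR);
            System.out.println("bound to " + local + ", SO_REUSEADDR=" + reuse);
            return local.getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        bindAndReport();
    }
}
```

The code is written against the NetworkChannel type on purpose, so the same calls would work for any other implementation of the interface.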

bind() and accept()

Now we return to java.nio.channels.ServerSocketChannel and sun.nio.ch.ServerSocketChannelImpl. First, look at the implementation of bind:

//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        //localAddress is non-null once bind has succeeded, so it is used to detect a repeated bind
        if (localAddress != null)
            throw new AlreadyBoundException();
        //new InetSocketAddress(0) is the wildcard address with port 0, so the system picks an ephemeral port
        InetSocketAddress isa = (local == null)
                                ? new InetSocketAddress(0)
                                : Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        //Start listening; if backlog is less than 1, default to a pending-connection queue of 50.
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        localAddress = Net.localAddress(fd);
    }
    return this;
}

Next, let's see how bind and listen are implemented in Net.

Net.bind
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port) throws IOException {
        bind(UNSPEC, fd, addr, port);
    }

static void bind(ProtocolFamily family, FileDescriptor fd, InetAddress addr, int port) throws IOException {
    //Prefer IPv6 if it is available and the protocol family is not IPv4 (INET).
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    bind0(fd, preferIPv6, exclusiveBind, addr, port);
}

private static native void bind0(FileDescriptor fd, boolean preferIPv6, boolean useExclBind, InetAddress addr, int port) throws IOException;

bind0 is a native method; its implementation is:

JNIEXPORT void JNICALL Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6, jboolean useExclBind, jobject iao, int port) {
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv = 0;
    //Convert the Java InetAddress into a C struct sockaddr
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
                                  preferIPv6) != 0) {
        return;//conversion failed, return
    }
    //Calls bind: int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
    rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}

A socket is the endpoint through which a program communicates with the kernel, but a newly created socket has no address or port associated with it. To fix this, we need to bind the socket to an address.

In general, the kernel can bind an address for us automatically. In some cases, though, users must do the binding themselves.

The most typical case is a server, which needs to bind a public address and wait for connections from clients. Clients usually don't need to invoke bind(); the kernel does it for them automatically.

One thing is worth noting: bind only binds. When a new connection arrives, a new socket is created for it and the server operates on that new socket, so what we really need to focus on is accept. In sun.nio.ch.ServerSocketChannelImpl#bind, we can see that it calls Net.listen(fd, backlog < 1 ? 50 : backlog) to start listening. If backlog < 1, the queue of pending connections defaults to a length of 50. Let's look at it now.

Net.listen
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;

Net.listen is also a native method:

JNIEXPORT void JNICALL Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog) {
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}

From the code, we can see that underneath it calls listen, which is used after bind and before accept. Its prototype is int listen(int sockfd, int backlog); a return value of 0 means success and -1 means failure.

Now let's go back to ensureOpen():

//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
    if (!isOpen())
        throw new ClosedChannelException();
}
//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen() {
        return !closed;
    }

If the channel is closed, it throws ClosedChannelException. Next, we look at Net#checkAddress:

//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
    if (sa == null)//null address
        throw new NullPointerException();
        //non InetSocketAddress type address 
    if (!(sa instanceof InetSocketAddress))
        throw new UnsupportedAddressTypeException(); // ## needs arg
    InetSocketAddress isa = (InetSocketAddress)sa;
    //unresolved address (the host name could not be resolved)
    if (isa.isUnresolved())
        throw new UnresolvedAddressException(); // ## needs arg
    InetAddress addr = isa.getAddress();
        //non-ipv4 and non-ipv6 address 
    if (!(addr instanceof Inet4Address || addr instanceof Inet6Address))
        throw new IllegalArgumentException("Invalid address type");
    return isa;
}

From the code above, we can see that bind first checks whether the ServerSocketChannel is open and whether it is already bound to an address. If it is open and not yet bound, it validates the given SocketAddress. It then calls bind and listen in the Net utility class to actually bind the server socket's address and start listening. If the backlog argument is less than 1, the queue of pending connections defaults to 50.
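
The checks described above can be observed from the public API. This is a small sketch under stated assumptions: the class name BindChecksDemo and the marker strings it returns are made up for illustration, and passing null with a backlog of 0 is just one way to reach the default branches.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: triggers the checks made by ServerSocketChannel#bind.
final class BindChecksDemo {

    static String probe() {
        StringBuilder out = new StringBuilder();
        try {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            // null address: an address is assigned automatically;
            // backlog < 1: the implementation falls back to its default.
            ssc.bind(null, 0);
            InetSocketAddress local = (InetSocketAddress) ssc.getLocalAddress();
            out.append(local.getPort() > 0 ? "bound;" : "unbound;");

            try {
                ssc.bind(null); // second bind on the same channel
            } catch (AlreadyBoundException e) {
                out.append("already-bound;");
            }

            ssc.close();
            try {
                ssc.bind(null); // bind on a closed channel
            } catch (ClosedChannelException e) {
                out.append("closed");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```

Each marker corresponds to one branch of the bind implementation: the successful path, the localAddress != null check, and ensureOpen().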

accept

Having covered accept() for BIO in the first article, we now turn to accept() in ServerSocketChannel:

//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept() throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                n = accept(this.fd, newfd, isaa);
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;
        //A newly created SocketChannelImpl is in blocking mode by default
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        //Build the SocketChannelImpl; it will be explained in the SocketChannelImpl section
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                //check address and port access
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        //return the SocketChannelImpl
        return sc;

    } finally {
        acceptLock.unlock();
    }
}

accept(this.fd, newfd, isaa) invokes accept to take a connection that has arrived on the listening socket. As mentioned before, this function finally calls int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen):

  • If there is no pending connection in the listening socket's queue and the socket is not marked as non-blocking, accept() blocks until a connection arrives.
  • If the socket is marked as non-blocking and there is no pending connection in the queue, accept() returns the error EAGAIN or EWOULDBLOCK.

The pair begin(blocking) and end(blocking, n > 0) was covered in InterruptibleChannel and interruptible IO. We mention it again here to show how it is applied. The focus is on the process of waiting for a connection, which may be interrupted. If that process finishes normally, execution simply continues with the code that follows; it does not mean the Channel has been closed. The second argument of end(blocking, n > 0), completed, only reports whether the waiting process completed, so there is no need to read more into it.
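
The two accept modes can be compared with a short experiment. This is a sketch under assumptions: the class name AcceptModesDemo and its two helper methods are hypothetical, and the server is bound to an ephemeral loopback port so that nothing else connects to it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class: contrasts non-blocking and blocking accept().
final class AcceptModesDemo {

    private static ServerSocketChannel openBound() throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return ssc;
    }

    /** Non-blocking mode with no pending connection: accept() returns null at once. */
    static boolean nonBlockingAcceptReturnsNull() {
        try (ServerSocketChannel ssc = openBound()) {
            ssc.configureBlocking(false);
            SocketChannel sc = ssc.accept();
            if (sc != null) {
                sc.close();
            }
            return sc == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Blocking mode: accept() returns a channel that is itself in blocking mode. */
    static boolean blockingAcceptReturnsChannel() {
        try (ServerSocketChannel ssc = openBound();
             // The client connects first, so accept() finds a pending connection.
             SocketChannel client = SocketChannel.open(ssc.getLocalAddress());
             SocketChannel accepted = ssc.accept()) {
            return accepted != null && accepted.isBlocking() && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(nonBlockingAcceptReturnsNull());
        System.out.println(blockingAcceptReturnsChannel());
    }
}
```

In the Java API the EAGAIN / EWOULDBLOCK case does not surface as an exception; it shows up as the null return value handled by the if (n < 1) return null; branch above.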

supportedOptions

Now let's look at the other NetworkChannel methods it implements, starting with supportedOptions:

//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public final Set<SocketOption<?>> supportedOptions() {
   return DefaultOptionsHolder.defaultOptions;
}
//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
   static final Set<SocketOption<?>> defaultOptions = defaultOptions();

   private static Set<SocketOption<?>> defaultOptions() {
       HashSet<SocketOption<?>> set = new HashSet<>();
       set.add(StandardSocketOptions.SO_RCVBUF);
       set.add(StandardSocketOptions.SO_REUSEADDR);
       if (Net.isReusePortAvailable()) {
           set.add(StandardSocketOptions.SO_REUSEPORT);
       }
       set.add(StandardSocketOptions.IP_TOS);
       set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
       //return HashSet which can't be changed
       return Collections.unmodifiableSet(set);
   }
}

The options used in the code above are defined as follows:

//java.net.StandardSocketOptions
//size of the socket receive buffer
public static final SocketOption<Integer> SO_RCVBUF =
        new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
//whether the address can be reused
public static final SocketOption<Boolean> SO_REUSEADDR =
        new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
//whether the port can be reused
public static final SocketOption<Boolean> SO_REUSEPORT =
        new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
//Type of Service (ToS) octet in the Internet Protocol (IP) header
public static final SocketOption<Integer> IP_TOS =
        new StdSocketOption<Integer>("IP_TOS", Integer.class);
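
To see which options a server channel reports on a given platform, the set can simply be printed. A minimal sketch, assuming a hypothetical class name SupportedOptionsDemo; the exact contents of the set depend on the platform and JDK build, so the code only checks for the two options that the source above always adds.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;

// Hypothetical demo class: inspects ServerSocketChannel#supportedOptions.
final class SupportedOptionsDemo {

    static boolean check() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            Set<SocketOption<?>> options = ssc.supportedOptions();
            options.forEach(o ->
                    System.out.println(o.name() + " : " + o.type().getSimpleName()));

            // The set is wrapped as unmodifiable, so mutation should be rejected.
            boolean readOnly;
            try {
                options.add(StandardSocketOptions.TCP_NODELAY);
                readOnly = false;
            } catch (UnsupportedOperationException e) {
                readOnly = true;
            }
            return readOnly
                    && options.contains(StandardSocketOptions.SO_RCVBUF)
                    && options.contains(StandardSocketOptions.SO_REUSEADDR);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(check());
    }
}
```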

Impl for setOption

Now that we know which options are supported, let's look at the details of setOption:

//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) throws IOException {
    Objects.requireNonNull(name);
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");
    synchronized (stateLock) {
        ensureOpen();

        if (name == StandardSocketOptions.IP_TOS) {
            ProtocolFamily family = Net.isIPv6Available() ?
                StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
            Net.setSocketOption(fd, family, name, value);
            return this;
        }

        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            isReuseAddress = (Boolean)value;
        } else {
            // no options that require special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
        }
        return this;
    }
}

We can see how supportedOptions().contains(name) is used: setOption first checks that the option is supported and then applies it. For socket options, the work is mainly done by Net.setSocketOption:

static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name, Object value) throws IOException {
    if (value == null)
        throw new IllegalArgumentException("Invalid option value");

    // only simple values supported by this method
    Class<?> type = name.type();

    if (extendedOptions.isOptionSupported(name)) {
        extendedOptions.setOption(fd, name, value);
        return;
    }
    //If the option type is neither Integer nor Boolean, throw AssertionError
    if (type != Integer.class && type != Boolean.class)
        throw new AssertionError("Should not reach here");

    // special handling
    if (name == StandardSocketOptions.SO_RCVBUF ||
        name == StandardSocketOptions.SO_SNDBUF)
    {
        //determine the size of receive and send buffer
        int i = ((Integer)value).intValue();
        if (i < 0)
            throw new IllegalArgumentException("Invalid send/receive buffer size");
    }
        //SO_LINGER: linger on close if unsent data is present; the value is clamped to the range [-1, 65535]
    if (name == StandardSocketOptions.SO_LINGER) {
        int i = ((Integer)value).intValue();
        if (i < 0)
            value = Integer.valueOf(-1);
        if (i > 65535)
            value = Integer.valueOf(65535);
    }
    //IP_TOS: Type of Service octet in the IP header, valid range 0..255
    if (name == StandardSocketOptions.IP_TOS) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid IP_TOS value");
    }
    //IP_MULTICAST_TTL: time-to-live for multicast datagrams, valid range 0..255
    if (name == StandardSocketOptions.IP_MULTICAST_TTL) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid TTL/hop value");
    }

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    int arg;
    //convert config 
    if (type == Integer.class) {
        arg = ((Integer)value).intValue();
    } else {
        boolean b = ((Boolean)value).booleanValue();
        arg = (b) ? 1 : 0;
    }

    boolean mayNeedConversion = (family == UNSPEC);
    boolean isIPv6 = (family == StandardProtocolFamily.INET6);
    //set file descriptor and other arguments
    setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);
}
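
The validation steps in setOption and Net.setSocketOption can be triggered from application code. The following is a sketch under assumptions: the class name SetOptionChecksDemo and its marker strings are hypothetical, and IP_MULTICAST_TTL is used only as an example of an option a server socket channel does not list as supported.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: triggers the checks performed when setting options.
final class SetOptionChecksDemo {

    static String probe() {
        StringBuilder out = new StringBuilder();
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            try {
                // Not in supportedOptions(): rejected before reaching Net.
                ssc.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 1);
            } catch (UnsupportedOperationException e) {
                out.append("unsupported;");
            }
            try {
                // Supported option, but a negative buffer size is invalid.
                ssc.setOption(StandardSocketOptions.SO_RCVBUF, -1);
            } catch (IllegalArgumentException e) {
                out.append("invalid;");
            }
            // A sensible value passes both checks.
            ssc.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
            out.append("ok");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```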

getOption

Next, the implementation of getOption:

//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name) throws IOException {
    Objects.requireNonNull(name);
    //If the option is not supported, throw UnsupportedOperationException
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");

    synchronized (stateLock) {
        ensureOpen();
        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            return (T)Boolean.valueOf(isReuseAddress);
        }
        //Everything else needs no special handling and is delegated to Net
        // no options that require special handling
        return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
    }
}
//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name) throws IOException {
    Class<?> type = name.type();

    if (extendedOptions.isOptionSupported(name)) {
        return extendedOptions.getOption(fd, name);
    }
    //Only Integer and Boolean are supported
    // only simple values supported by this method
    if (type != Integer.class && type != Boolean.class)
        throw new AssertionError("Should not reach here");

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    boolean mayNeedConversion = (family == UNSPEC);
    //get options from file descriptor
    int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());

    if (type == Integer.class) {
        return Integer.valueOf(value);
    } else {
        return (value == 0) ? Boolean.FALSE : Boolean.TRUE;
    }
}
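
Reading options back is the mirror image of setting them. A minimal sketch, assuming a hypothetical class name GetOptionDemo; the actual receive buffer size is chosen by the operating system, so the code only checks that it is positive.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: sets one option and reads two back.
final class GetOptionDemo {

    static boolean roundTrip() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            // Boolean option: the native int is mapped back to a Boolean.
            boolean reuse = ssc.getOption(StandardSocketOptions.SO_REUSEADDR);
            // Integer option: the native int is returned as an Integer.
            int rcvbuf = ssc.getOption(StandardSocketOptions.SO_RCVBUF);
            System.out.println("SO_REUSEADDR=" + reuse + ", SO_RCVBUF=" + rcvbuf);
            return reuse && rcvbuf > 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        roundTrip();
    }
}
```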

Differences and similarities in bind between ServerSocketChannel and ServerSocket

In the Net.bind section we said that when a connection is received, a new Socket is created to work with it. This can be seen in accept: once the connection is obtained, it calls new SocketChannelImpl(provider(), newfd, isa). That raises a question: when we call bind, do we need a Socket to bind to? And how does BIO handle this? Let's review.

Recall that setImpl() is called when we invoke java.net.ServerSocket#ServerSocket(int, int, java.net.InetAddress):

//java.net.ServerSocket
 public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException(
                       "Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throw e;
        }
    }
//java.net.ServerSocket#setImpl
private void setImpl() {
        if (factory != null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if (impl != null)
            impl.setServerSocket(this);
    }

However, we should focus on bind(new InetSocketAddress(bindAddr, port), backlog);:

//java.net.ServerSocket
public void bind(SocketAddress endpoint, int backlog) throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!oldImpl && isBound())
            throw new SocketException("Already bound");
        if (endpoint == null)
            endpoint = new InetSocketAddress(0);
        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if (security != null)
                security.checkListen(epoint.getPort());
                //Important!!
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throw e;
        }
    }

I marked getImpl as important, so let's follow the source code to see what it does:

//java.net.ServerSocket#getImpl
SocketImpl getImpl() throws SocketException {
    if (!created)
        createImpl();
    return impl;
}

Since the initial value of created is false, it must enter the createImpl() method:

//java.net.ServerSocket#createImpl
void createImpl() throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(true);
        created = true;
    } catch (IOException e) {
        throw new SocketException(e.getMessage());
    }
}

Since impl has already been assigned, it goes straight to impl.create(true), and then created is set to true. This brings us to the point I want to make:

//java.net.AbstractPlainSocketImpl#create
protected synchronized void create(boolean stream) throws IOException {
    this.stream = stream;
    if (!stream) {
        ResourceManager.beforeUdpCreate();
        // only create the fd after we know we will be able to create the socket
        fd = new FileDescriptor();
        try {
            socketCreate(false);
            SocketCleanable.register(fd);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            fd = null;
            throw ioe;
        }
    } else {
        fd = new FileDescriptor();
        socketCreate(true);
        SocketCleanable.register(fd);
    }
    if (socket != null)
        socket.setCreated();
    if (serverSocket != null)
        serverSocket.setCreated();
}

We can see socketCreate(true); its implementation is as follows:

@Override
void socketCreate(boolean stream) throws IOException {
    if (fd == null)
        throw new SocketException("Socket closed");

    int newfd = socket0(stream);

    fdAccess.set(fd, newfd);
}

socket0(stream) gives us a file descriptor. Thus, the socket is created here and then bound. Now let's return to sun.nio.ch.ServerSocketChannelImpl#accept(). accept() deals with what happens after the channel exists and a connection has been obtained. So which socket does the server bind? When we use ServerSocketChannel, we start from the general factory method the JDK provides, open(), which keeps things simple for the user.

java.nio.channels.ServerSocketChannel#open:

//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open() throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}
//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}
//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    this.fd =  Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}

As we can see, simply creating a ServerSocketChannelImpl object gives us a socket, which we can then bind. Note, however, that the type returned by ServerSocketChannel#open is ServerSocketChannel. When we accept a connection from a client, a Socket has to be created to carry the communication between the two sides. That is why sun.nio.ch.ServerSocketChannelImpl#accept() executes SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

and we get an object of type SocketChannel. As a result, methods that operate on the Socket, such as read and write, can be defined in this class.
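
The difference in when binding happens can be seen side by side. This is a sketch under assumptions: the class name BioVsNioBindDemo is hypothetical, and port 0 is used so the system picks free ports.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: compares bind timing in BIO and NIO.
final class BioVsNioBindDemo {

    static String compare() {
        try (ServerSocket unbound = new ServerSocket();   // no-arg constructor: not bound yet
             ServerSocket bound = new ServerSocket(0);    // constructor binds and listens
             ServerSocketChannel ch = ServerSocketChannel.open()) { // open(): socket exists, not bound
            boolean channelUnbound = ch.getLocalAddress() == null;
            ch.bind(null); // explicit bind to an automatically assigned address
            boolean channelBound = ch.getLocalAddress() != null;
            return unbound.isBound() + "," + bound.isBound() + ","
                    + channelUnbound + "," + channelBound;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(compare());
    }
}
```

In both worlds the underlying socket is created before bind is called; what differs is whether the constructor or the caller triggers the bind.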

Extension of socket method in ServerSocketChannel

For ServerSocketChannel, we also need to look at socket():

//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket() {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);
        return socket;
    }
}

We can see that socket is created by ServerSocketAdaptor. This class is implemented on top of ServerSocketChannelImpl, and its purpose is to accommodate our habit of using ServerSocket. To that end, ServerSocketAdaptor extends ServerSocket and overrides its methods one by one, which gives us more options when writing this part of a program.
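
The adaptor can be exercised like an ordinary ServerSocket while the channel remains the owner of the state. A minimal sketch, assuming a hypothetical class name SocketAdaptorDemo and an ephemeral loopback port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: binds through the ServerSocket view of a channel.
final class SocketAdaptorDemo {

    static boolean adaptorSharesState() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ServerSocket ss = ssc.socket();
            // Binding through the ServerSocket API is delegated to the channel.
            ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress local = (InetSocketAddress) ssc.getLocalAddress();
            return ss.getChannel() == ssc
                    && local != null
                    && local.getPort() == ss.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(adaptorSharesState());
    }
}
```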

We covered the implementation of java.nio.channels.spi.AbstractInterruptibleChannel#close in InterruptibleChannel and interruptible IO. We review it here and add something new:

//java.nio.channels.spi.AbstractInterruptibleChannel#close
public final void close() throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true;
        implCloseChannel();
    }
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if (keys != null) {
                copyOfKeys = keys.clone();
            }
        }

        if (copyOfKeys != null) {
            for (SelectionKey k : copyOfKeys) {
                if (k != null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set
                }
            }
        }
    }
//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel
@Override
protected void implCloseSelectableChannel() throws IOException {
    assert !isOpen();

    boolean interrupted = false;
    boolean blocking;

    // set state to ST_CLOSING
    synchronized (stateLock) {
        assert state < ST_CLOSING;
        state = ST_CLOSING;
        blocking = isBlocking();
    }

    // wait for any outstanding accept to complete
    if (blocking) {
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            long th = thread;
            if (th != 0) {
                //If a native thread is blocked in accept, pre-close the socket and signal that thread to wake it up.
                nd.preClose(fd);
                NativeThread.signal(th);

                // wait for accept operation to end
                while (thread != 0) {
                    try {
                        stateLock.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        }
    } else {
        // non-blocking mode: wait for accept to complete
        acceptLock.lock();
        acceptLock.unlock();
    }

    // set state to ST_KILLPENDING
    synchronized (stateLock) {
        assert state == ST_CLOSING;
        state = ST_KILLPENDING;
    }

    // close socket if not registered with Selector
    //If the channel is not registered with a selector, kill it directly, which closes the file descriptor.
    if (!isRegistered())
        kill();

    // restore interrupt status
    //This confirms what we said in the last article: if the thread was interrupted while waiting, we restore its interrupt status.
    if (interrupted)
        Thread.currentThread().interrupt();
}

@Override
public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLPENDING) {
            state = ST_KILLED;
            nd.close(fd);
        }
    }
}
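
The effect of closing a channel while another thread is blocked in accept() can be observed with two threads. This is a rough sketch under assumptions: the class name AsyncCloseDemo is hypothetical, and the 200 ms sleep is only a crude way to give the acceptor thread time to block. Depending on timing, the acceptor sees AsynchronousCloseException (closed while blocked) or its superclass ClosedChannelException (closed before accept started), so the code reports whichever one was thrown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: closes a channel while a thread waits in accept().
final class AsyncCloseDemo {

    /** Returns the exception seen by the thread that was waiting in accept(). */
    static Throwable closeDuringAccept() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        try {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            Thread acceptor = new Thread(() -> {
                try {
                    ssc.accept(); // blocks: nobody connects
                } catch (IOException e) {
                    seen.set(e);
                }
            });
            acceptor.start();

            Thread.sleep(200); // crude wait so the acceptor is likely blocked
            ssc.close();       // pre-closes the socket and wakes the acceptor
            acceptor.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(closeDuringAccept());
    }
}
```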

close() in channel

We didn't say much about close() in the last article. It matters most for SocketChannel, which is about exchanging data between clients and the server: if the connection breaks, Channels that are no longer useful should be closed.

sun.nio.ch.ServerSocketChannelImpl#accept():

@Override
public SocketChannel accept() throws IOException {
        ...
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    } finally {
        acceptLock.unlock();
    }
}

The code above checks whether accepting a connection from the remote address is permitted. If an exception occurs, it closes the SocketChannel that has just been created.

Another meaningful use of close() is on the client side: if an exception occurs while establishing a connection, the Socket that has been created also needs to be closed:

//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote) throws IOException {
        SocketChannel sc = open();
        try {
            sc.connect(remote);
        } catch (Throwable x) {
            try {
                sc.close();
            } catch (Throwable suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
        assert sc.isConnected();
        return sc;
    }

Next, when the ServerSocketChannelImpl class is loaded, it creates a SocketDispatcher. Through SocketDispatcher, I/O operations are carried out by different native methods on different platforms, and the same is done for Socket:

//sun.nio.ch.SocketDispatcher
class SocketDispatcher extends NativeDispatcher {

    static {
        IOUtil.load();
    }
    //read
    int read(FileDescriptor fd, long address, int len) throws IOException {
        return read0(fd, address, len);
    }

    long readv(FileDescriptor fd, long address, int len) throws IOException {
        return readv0(fd, address, len);
    }
    //write 
    int write(FileDescriptor fd, long address, int len) throws IOException {
        return write0(fd, address, len);
    }

    long writev(FileDescriptor fd, long address, int len) throws IOException {
        return writev0(fd, address, len);
    }
    //pre-close the FileDescriptor
    void preClose(FileDescriptor fd) throws IOException {
        preClose0(fd);
    }
    //close
    void close(FileDescriptor fd) throws IOException {
        close0(fd);
    }

    //-- Native methods
    static native int read0(FileDescriptor fd, long address, int len) throws IOException;

    static native long readv0(FileDescriptor fd, long address, int len) throws IOException;

    static native int write0(FileDescriptor fd, long address, int len) throws IOException;

    static native long writev0(FileDescriptor fd, long address, int len) throws IOException;

    static native void preClose0(FileDescriptor fd) throws IOException;

    static native void close0(FileDescriptor fd) throws IOException;
}

FileDescriptor

FileDescriptor has come up many times already, so let's introduce it. A FileDescriptor is an opaque handle to an open file, an open socket, or another source or sink of bytes. Its main use is to create a FileInputStream or FileOutputStream that contains it.

PS: Applications shouldn't create their own file descriptors.

Now we turn to its source code:

public final class FileDescriptor {

    private int fd;

    private long handle;

    private Closeable parent;
    private List<Closeable> otherParents;
    private boolean closed;

    /** * true, if file is opened for appending. */
    private boolean append;

    static {
        initIDs();
    }
    /** * Cleanup in case the FileDescriptor is not explicitly closed. */
    private PhantomCleanable<FileDescriptor> cleanup;

    /** * Constructs an (invalid) FileDescriptor; fd or handle is set later. */
    public FileDescriptor() {
        fd = -1;
        handle = -1;
    }

    /** * Used for standard input, output, and error only. * For Windows the corresponding handle is initialized. * For Unix the append mode is cached. * @param fd the raw fd number (0, 1, 2) */
    private FileDescriptor(int fd) {
        this.fd = fd;
        this.handle = getHandle(fd);
        this.append = getAppend(fd);
    }
    ...
}

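We usually work with java.lang.System#in, java.lang.System#out and java.lang.System#err for standard input, output and error. They are backed by the following constants in FileDescriptor: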

public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);

We can test whether a file descriptor is valid with the following method:

//java.io.FileDescriptor#valid
public boolean valid() {
        return (handle != -1) || (fd != -1);
    }

If the return value is true, the file descriptor represents a valid, open file, socket, or other active I/O connection. If it is false, the descriptor is invalid.
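As a small illustration of valid() and of wrapping a descriptor in a stream, here is a minimal sketch. The class name FileDescriptorDemo and its helper method are made up for this example; only java.io.FileDescriptor and java.io.FileOutputStream come from the JDK.

```java
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FileDescriptorDemo {
    // Hypothetical helper: a descriptor built with the public constructor
    // has fd == -1 and handle == -1, so valid() reports false.
    static boolean freshDescriptorIsValid() {
        return new FileDescriptor().valid();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("fresh descriptor valid: " + freshDescriptorIsValid());
        System.out.println("stdout descriptor valid: " + FileDescriptor.out.valid());

        // Wrap the standard output descriptor in a stream. It is deliberately
        // not closed, because closing it would close the process's standard output.
        FileOutputStream stdout = new FileOutputStream(FileDescriptor.out);
        stdout.write("written through FileDescriptor.out\n".getBytes(StandardCharsets.UTF_8));
        stdout.flush();
    }
}
```

The result for the standard output descriptor depends on how the process was started, so the sketch only prints it.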

If you are interested, explore the rest of the source code yourself.

SocketChannel in NIO

We have already met SocketChannel; now let's explore its details.

A socket channel is created by invoking one of the open methods. A few points deserve attention:

  • It is not possible to create a channel for an arbitrary, pre-existing socket.
  • A newly created SocketChannel is open but not yet connected.
  • Attempting an I/O operation on an unconnected channel throws NotYetConnectedException.
  • A socket channel is connected by invoking its connect method.
  • Once connected, a socket channel stays connected until it is closed.
  • Whether a socket channel is connected can be checked with isConnected().
  • SocketChannel supports non-blocking connection: the channel is created first, and connect() then initiates the link to the remote socket.
  • Invoking finishConnect completes a connection that was started in non-blocking mode.
  • isConnectionPending tells whether a connection operation is in progress.
  • SocketChannel supports asynchronous shutdown, which is similar to the asynchronous close operation specified in the Channel class.
  • If the input side of a socket is shut down by one thread while another thread is blocked in a read on the socket's channel, the blocked read completes without reading any bytes and returns -1.
  • If the output side of a socket is shut down by one thread while another thread is blocked in a write on the socket's channel, the blocked thread receives an AsynchronousCloseException.
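The first few points can be observed directly through the public API. The following is a minimal sketch, assuming a throwaway ServerSocketChannel bound to the loopback interface as the peer; the class name SocketChannelLifecycle and the trace format are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SocketChannelLifecycle {
    /** Returns a short trace of the channel's state at each step. */
    static String run() {
        StringBuilder trace = new StringBuilder();
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Freshly opened: open, but not connected.
            trace.append("open=").append(client.isOpen())
                 .append(" connected=").append(client.isConnected());

            // I/O before connect is rejected.
            try {
                client.read(ByteBuffer.allocate(16));
            } catch (NotYetConnectedException e) {
                trace.append(" read-before-connect=NotYetConnectedException");
            }

            // Blocking connect: returns only once the connection is established.
            client.connect(server.getLocalAddress());
            trace.append(" connected-after-connect=").append(client.isConnected());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The server never calls accept() here; the kernel completes the handshake from the listen backlog, which is enough for the client side to become connected.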

Now let's look at the implementation.

SocketChannel and its open()

//java.nio.channels.SocketChannel#open()
public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
//We don't need to invoke connect again
public static SocketChannel open(SocketAddress remote) throws IOException {
    //default is blocking
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    } catch (Throwable x) {
        try {
            sc.close();
        } catch (Throwable suppressed) {
            x.addSuppressed(suppressed);
        }
        throw x;
    }
    assert sc.isConnected();
    return sc;
}
//sun.nio.ch.SelectorProviderImpl#openSocketChannel
public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}
//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)
SocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    //call the socket function; true means a stream (TCP) socket
    this.fd = Net.socket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#socket(boolean)
static FileDescriptor socket(boolean stream) throws IOException {
    return socket(UNSPEC, stream);
}
//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)
static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException {
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));
}
//sun.nio.ch.IOUtil#newFD
public static FileDescriptor newFD(int i) {
    FileDescriptor fd = new FileDescriptor();
    setfdVal(fd, i);
    return fd;
}
static native void setfdVal(FileDescriptor fd, int value);

Net.socket(true) ends up in the native function socket0:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    //TCP uses SOCK_STREAM, UDP uses SOCK_DGRAM; here stream=true
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
    //choose IPv6 or IPv4
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;

    //invoke the socket function: domain is the protocol family,
    //type is the socket type, and protocol 0 selects the default transport protocol for that type
    fd = socket(domain, type, 0);
    //Error
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        //arg=1 restricts the IPv6 socket to IPv6 traffic only; arg=0 lets it also accept IPv4 traffic
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }

    //SO_REUSEADDR has 4 applications:
    //1. socket1 holds a local address and port and is in the TIME_WAIT state, and socket2 wants to use the same address and port: the program needs this option
    //2. SO_REUSEADDR allows multiple instances of the same server to run on the same port, as long as each instance binds a different IP address
    //3. SO_REUSEADDR allows a single process to bind the same port on multiple sockets, as long as each socket binds a different IP address
    //4. SO_REUSEADDR allows binding exactly the same address and port repeatedly, but only for UDP multicast, not for TCP
    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }

#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }

     //IPV6_MULTICAST_HOPS controls the hop limit of outgoing multicast packets;
     //1 means they are not forwarded beyond the local network.
     //See http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}

Linux 3.9 added the SO_REUSEPORT option, which allows multiple sockets to be bound to exactly the same address and port. It does not matter whether the sockets are listening or not, or whether they are TCP or UDP, as long as SO_REUSEPORT is set on every one of them.

To prevent port hijacking there is one restriction: all sockets that share an address and port must have the same effective user ID, so one user cannot steal a port from another. The kernel also treats SO_REUSEPORT sockets specially:

  • For UDP sockets, the kernel tries to distribute incoming datagrams evenly across the sockets.
  • For listening TCP sockets, the kernel tries to distribute new client connections (the ones returned by accept()) evenly across the sockets that share the address and port.

For example, a simple server can run several instances on the same port with SO_REUSEPORT and get a simple form of load balancing, because the kernel does the dispatching.
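From Java, the option is exposed as StandardSocketOptions.SO_REUSEPORT (available since Java 9). Here is a minimal sketch that binds two listening channels to one port, assuming the platform supports the option; the class name ReusePortDemo and its two helper methods are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

public class ReusePortDemo {
    /** True if ServerSocketChannel exposes SO_REUSEPORT on this platform. */
    static boolean supported() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            return ch.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Tries to bind two listening channels to the same address and port. */
    static boolean bindTwice() {
        try (ServerSocketChannel first = ServerSocketChannel.open();
             ServerSocketChannel second = ServerSocketChannel.open()) {
            // The option must be set on every socket before it is bound.
            first.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            second.setOption(StandardSocketOptions.SO_REUSEPORT, true);

            // Port 0 lets the OS pick a free port; the second bind reuses it.
            first.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            second.bind(first.getLocalAddress());
            return true;
        } catch (IOException e) {
            return false; // the second bind was refused
        }
    }

    public static void main(String[] args) {
        if (supported()) {
            System.out.println("both binds succeeded: " + bindTwice());
        } else {
            System.out.println("SO_REUSEPORT is not supported on this platform");
        }
    }
}
```

The supported() check matters because setOption throws UnsupportedOperationException for options the channel does not support.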

From the previous code, once the socket has been created, IOUtil.newFD wraps the native descriptor in a FileDescriptor. How do we learn whether the socket is readable, writable, or in error? These are the only three conditions we care about. Because the socket belongs to the SocketChannel, the FileDescriptor is stored in the channel as well, and the status can be obtained through it. FileDescriptor offers no public method for setting fd, so setfdVal is implemented as a native method:

JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val) {
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}

Anyone familiar with the Linux shell knows that redirection is expressed in terms of descriptor numbers, for example sending error output to the channel that another descriptor points to. A descriptor therefore also tells us something about status. In the same way, we can act on the socket's status to change the interest ops of a SelectionKey, so the states of a SelectionKey are classified by input and output, which helps with I/O operations.

Now back to open() in SocketChannel. SelectorProvider.provider().openSocketChannel() returns a SocketChannelImpl instance. The constructor SocketChannelImpl(SelectorProvider sp) never assigns this.state, so it keeps the default value 0, which is ST_UNCONNECTED. The socket is also blocking by default.

In general, for asynchronous use, we call open() without arguments and then invoke configureBlocking(false) to switch the channel to non-blocking mode.
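The defaults described above can be checked through the public API. A minimal sketch follows; the class name BlockingModeDemo and the output format are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SocketChannel;

public class BlockingModeDemo {
    /** Reports the channel's flags right after open() and after configureBlocking(false). */
    static String run() {
        try (SocketChannel ch = SocketChannel.open()) {
            // A new channel is blocking, unconnected, and has no pending connect.
            String before = "blocking=" + ch.isBlocking()
                    + " connected=" + ch.isConnected()
                    + " pending=" + ch.isConnectionPending();

            // Switch to non-blocking mode before using the channel with a Selector.
            ch.configureBlocking(false);
            return before + " -> blocking=" + ch.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```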

connect in SocketChannel

Source code for connect():

//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
    InetSocketAddress isa = Net.checkAddress(sa);
    SecurityManager sm = System.getSecurityManager();
    if (sm != null)
        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

    InetAddress ia = isa.getAddress();
    if (ia.isAnyLocalAddress())
        ia = InetAddress.getLocalHost();

    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                int n = 0;
                boolean blocking = isBlocking();
                try {
                    //support interruption by installing an Interruptible blocker on the current thread
                    beginConnect(blocking, isa);
                    do {
                    //invoke the native connect; in blocking mode it waits until it succeeds or throws an exception
                        n = Net.connect(fd, ia, isa.getPort());
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } finally {
                    endConnect(blocking, (n > 0));
                }
                assert IOStatus.check(n);
                //connection success
                return n > 0;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}

beginConnect and endConnect build on begin() and end() in AbstractInterruptibleChannel. For a non-blocking channel we do not need to worry about interruption during connect; interruption handling only matters when the thread can actually block and wait.

//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa) throws IOException {
    //begin() is only entered in blocking mode
    if (blocking) {
        // set hook for Thread.interrupt
        begin();
    }
    synchronized (stateLock) {
        //the channel is open unless close has been invoked
        ensureOpen();
        //check state of connection
        int state = this.state;
        if (state == ST_CONNECTED)
            throw new AlreadyConnectedException();
        if (state == ST_CONNECTIONPENDING)
            throw new ConnectionPendingException();
        //the channel must be unconnected at this point; mark it as connecting
        assert state == ST_UNCONNECTED;
        //connecting
        this.state = ST_CONNECTIONPENDING;
        //only executed if bind has not been invoked
        if (localAddress == null)
            NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
        remoteAddress = isa;

        if (blocking) {
            // record thread so it can be signalled if needed
            readerThread = NativeThread.current();
        }
    }
}

During the connection we need to keep an eye on the connection state, which is one of ST_UNCONNECTED, ST_CONNECTIONPENDING, ST_CONNECTED, ST_CLOSING, ST_KILLPENDING and ST_KILLED. The state is shared, so several threads may read or change it. For that reason state is declared volatile, and every change must be made while holding stateLock:

//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed) throws IOException {
    endRead(blocking, completed);
    //when n>0 the connection succeeded and the state becomes ST_CONNECTED
    if (completed) {
        synchronized (stateLock) {
            if (state == ST_CONNECTIONPENDING) {
                localAddress = Net.localAddress(fd);
                state = ST_CONNECTED;
            }
        }
    }
}
//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed) throws AsynchronousCloseException {   //the body only executes when blocking is true
    if (blocking) {
        synchronized (stateLock) {
            readerThread = 0;
            // notify any thread waiting in implCloseSelectableChannel
            if (state == ST_CLOSING) {
                stateLock.notifyAll();
            }
        }
        //end is paired with begin, when thread is interrupted, throw ClosedByInterruptException
        // remove hook for Thread.interrupt
        end(completed);
    }
}
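The state check in beginConnect is visible from the public API: once a channel has reached ST_CONNECTED, a second connect is rejected. A minimal sketch, assuming a local ServerSocketChannel on the loopback interface as the peer; the class name ConnectStateDemo is made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ConnectStateDemo {
    /** Connects once, then tries to connect again on the same channel. */
    static String secondConnect() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // First (blocking) connect moves the channel to the connected state.
            client.connect(server.getLocalAddress());
            try {
                // The state check rejects a connect on a connected channel.
                client.connect(server.getLocalAddress());
                return "second connect accepted";
            } catch (AlreadyConnectedException e) {
                return "AlreadyConnectedException, still connected=" + client.isConnected();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(secondConnect());
    }
}
```

AlreadyConnectedException is an unchecked exception, so it does not pass through the catch (IOException) branch of connect that closes the channel; the existing connection stays usable.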

Now we go to Net.connect(fd, ia, isa.getPort()):

//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
    return connect(UNSPEC, fd, remote, remotePort);
}
//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    return connect0(preferIPv6, fd, remote, remotePort);
}

This eventually invokes the native method:

JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo, jobject iao, jint port) {
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv;
    //convert the address into struct sockaddr format
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
        return IOS_THROWN;
    }
    //pass fd and sockaddr to connect to the remote server, i.e. the TCP three-way handshake
    //with configureBlocking(false) the call returns at once; otherwise it blocks until it succeeds, times out or fails
    rv = connect(fdval(env, fdo), &sa.sa, sa_len);
    //0 means success; on failure, errno carries the reason
    if (rv != 0) {
        //non-blocking and the connection is not established yet (-2)
        if (errno == EINPROGRESS) {
            return IOS_UNAVAILABLE;
        } else if (errno == EINTR) {
            //interrupt(-3)
            return IOS_INTERRUPTED;
        }
        return handleSocketError(env, errno);
    }
    //connection established; a TCP connect normally takes time, but a connect within the local machine may complete at once
    //a non-blocking connect generally returns IOS_UNAVAILABLE
    return 1;
}

As the comments above show, a non-blocking connect whose connection is not yet established returns -2. From beginConnect we know that the state is ST_CONNECTIONPENDING at that point. So when does it become ST_CONNECTED in non-blocking mode? For that, look at sun.nio.ch.SocketChannelImpl#finishConnect.

finishConnect in SocketChannelImpl

//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr, boolean stream) throws IOException {
    setImpl();

    // backward compatibility
    if (address == null)
        throw new NullPointerException();

    try {
        createImpl(stream);
        if (localAddr != null)
            bind(localAddr);
        connect(address);
    } catch (IOException | IllegalArgumentException | SecurityException e) {
        try {
            close();
        } catch (IOException ce) {
            e.addSuppressed(ce);
        }
        throw e;
    }
}

Here, we can get a new instance of sun.nio.ch.SocketAdaptor by first invoking java.nio.channels.SocketChannel#open() and then invoking socket().

Now let's check connect in SocketAdaptor:

//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
    connect(remote, 0);
}

public void connect(SocketAddress remote, int timeout) throws IOException {
    if (remote == null)
        throw new IllegalArgumentException("connect: The address can't be null");
    if (timeout < 0)
        throw new IllegalArgumentException("connect: timeout can't be negative");

    synchronized (sc.blockingLock()) {
        if (!sc.isBlocking())
            throw new IllegalBlockingModeException();

        try {
            //with no timeout set, wait until the connection succeeds or an exception is thrown
            // no timeout
            if (timeout == 0) {
                sc.connect(remote);
                return;
            }
            //with a timeout set, the channel is switched to non-blocking for the connect
            // timed connect
            sc.configureBlocking(false);
            try {
                if (sc.connect(remote))
                    return;
            } finally {
                try {
                    sc.configureBlocking(true);
                } catch (ClosedChannelException e) { }
            }

            long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
            long to = timeout;
            for (;;) {
                //keep polling for completion until the remaining timeout is used up
                //on timeout, close the socket
                long startTime = System.nanoTime();
                if (sc.pollConnected(to)) {
                    boolean connected = sc.finishConnect();
                    //see the explanation of assert below
                    assert connected;
                    break;
                }
                timeoutNanos -= System.nanoTime() - startTime;
                if (timeoutNanos <= 0) {
                    try {
                        sc.close();
                    } catch (IOException x) { }
                    throw new SocketTimeoutException();
                }
                to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
            }

        } catch (Exception x) {
            Net.translateException(x, true);
        }
    }

}
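Both paths of the adaptor's connect can be exercised from application code. The sketch below assumes a local ServerSocketChannel on the loopback interface as the peer and an arbitrary 2000 ms timeout; the class name AdaptorConnectDemo is made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AdaptorConnectDemo {
    static String run() {
        StringBuilder trace = new StringBuilder();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Timed connect through the adaptor returned by socket().
            try (SocketChannel sc = SocketChannel.open()) {
                sc.socket().connect(server.getLocalAddress(), 2000);
                // The channel is back in blocking mode once connect returns.
                trace.append("timed-connect=").append(sc.isConnected())
                     .append(" blocking=").append(sc.isBlocking());
            }

            // The adaptor refuses to connect a channel that is in non-blocking mode.
            try (SocketChannel sc = SocketChannel.open()) {
                sc.configureBlocking(false);
                try {
                    sc.socket().connect(server.getLocalAddress(), 2000);
                } catch (IllegalBlockingModeException e) {
                    trace.append(" non-blocking=IllegalBlockingModeException");
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```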

The assert keyword was introduced in Java 1.4, so it has been around for quite a while. To avoid conflicts with old code that used assert as an identifier, assertions are disabled by default at run time, and every assert statement is ignored. To turn them on, start the JVM with -enableassertions or -ea.
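A common idiom for checking whether assertions are enabled relies on a side effect inside the assert statement. This is a minimal sketch; the class name AssertStatus is made up for this example.

```java
public class AssertStatus {
    /** Returns true only if assertions are enabled for this class. */
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment runs only when the JVM evaluates assert statements,
        // that is, when it was started with -ea / -enableassertions.
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Running it with `java AssertStatus` and then with `java -ea AssertStatus` shows the difference.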

//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect() throws IOException {
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                // no-op if already connected
                if (isConnected())
                    return true;

                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginFinishConnect(blocking);
                    int n = 0;
                    if (blocking) {
                        do {
                            
                            n = checkConnect(fd, true);
                        } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
                    } else {
                        
                        n = checkConnect(fd, false);
                    }
                    connected = (n > 0);
                } finally {
                    endFinishConnect(blocking, connected);
                }
                assert (blocking && connected) ^ !blocking;
                return connected;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, remoteAddress);
    }
}
//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block) throws IOException;

beginFinishConnect and endFinishConnect are similar to sun.nio.ch.SocketChannelImpl#beginConnect and sun.nio.ch.SocketChannelImpl#endConnect. For the rest, we only need to focus on the core logic, checkConnect(fd, true), which is also a native method:

JNIEXPORT jint JNICALL Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this, jobject fdo, jboolean block) {
    int error = 0;
    socklen_t n = sizeof(int);
    //get FileDescriptor's fd
    jint fd = fdval(env, fdo);
    int result = 0;
    struct pollfd poller;
    //FileDescriptor
    poller.fd = fd;
    //requested event: writable
    poller.events = POLLOUT;
    //returned events
    poller.revents = 0;
    
    //timeout -1 blocks until an event arrives; 0 returns immediately without blocking the thread
    result = poll(&poller, 1, block ? -1 : 0);
    //if result<0, the call failed
    if (result < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "poll failed");
            return IOS_THROWN;
        }
    }
    //non-blocking and nothing is ready: the connection is still pending
    if (!block && (result == 0))
        return IOS_UNAVAILABLE;
    //at least one descriptor is ready to write or has an error
    if (result > 0) {
        errno = 0;
        result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
        //error
        if (result < 0) {
            return handleSocketError(env, errno);
        //the socket reports a pending error
        } else if (error) {
            return handleSocketError(env, error);
        } else if ((poller.revents & POLLHUP) != 0) {
            return handleSocketError(env, ENOTCONN);
        }
        //socket is ready to write which means connection has been built
        // connected
        return 1;
    }
    return 0;
}

The comments explain the process in detail. The blocking and non-blocking cases correspond to the two branches in sun.nio.ch.SocketChannelImpl#finishConnect. As the source shows, poll is used to query the socket's status, and from that the code decides whether the connection has succeeded. In non-blocking mode finishConnect returns immediately, which is why sun.nio.ch.SocketAdaptor#connect wraps it in a loop that keeps checking the connection status. That approach is not recommended in NIO programming; it is only a half-finished solution. We suggest registering the channel with a Selector with ops=OP_CONNECT, and invoking finishConnect to complete the connection once the SelectionKey is ready.

finishConnect is what updates the state to ST_CONNECTED, and read and write check that state. Therefore, in non-blocking mode, finishConnect must be invoked before we read or write.
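The suggested pattern can be sketched as follows, assuming a local ServerSocketChannel on the loopback interface as the peer and an arbitrary 5 second select timeout. The class name NonBlockingConnect is made up for this example, and a real program would normally run this logic inside its event loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NonBlockingConnect {
    /** Returns true once the non-blocking connect has been completed. */
    static boolean connect() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // In non-blocking mode connect may return true at once,
            // or false while the connection is still pending.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
                while (!connected) {
                    if (selector.select(5000) == 0) {
                        return false; // nothing became ready within the timeout
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey ready = it.next();
                        it.remove();
                        if (ready.isConnectable()) {
                            // Completes the handshake and moves the channel to connected.
                            connected = client.finishConnect();
                        }
                    }
                }
                // Stop selecting for OP_CONNECT once the connection is complete.
                key.interestOps(0);
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connect());
    }
}
```

After this point the key's interest set would typically be switched to OP_READ or OP_WRITE, depending on what the application wants to do next.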

This brings up Selector and SelectionKey, which we will explain in detail in the next article.
