BIO到NIO源碼的一些事兒之NIO 中

前言

此係列文章會詳細解讀NIO的功能逐步豐滿的路程,爲Reactor-Netty 庫的講解鋪平道路。java

關於Java編程方法論-Reactor與Webflux的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:linux

Rxjava源碼解讀與分享:www.bilibili.com/video/av345…git

Reactor源碼解讀與分享:www.bilibili.com/video/av353…github

本系列源碼解讀基於JDK11 api細節可能與其餘版本有所差異,請自行解決jdk版本問題。shell

Channel解讀

接上一篇BIO到NIO源碼的一些事兒之NIO 上編程

賦予Channel支持網絡socket的能力

咱們最初的目的就是爲了加強Socket,基於這個基本需求,沒有條件創造條件,因而爲了讓Channel擁有網絡socket的能力,這裏定義了一個java.nio.channels.NetworkChannel接口。花很少說,咱們來看這個接口的定義:api

public interface NetworkChannel extends Channel {
    NetworkChannel bind(SocketAddress local) throws IOException;

    SocketAddress getLocalAddress() throws IOException;

    <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;

    <T> T getOption(SocketOption<T> name) throws IOException;

    Set<SocketOption<?>> supportedOptions();
}
複製代碼

經過bind(SocketAddress) 方法將socket綁定到本地 SocketAddress上,經過getLocalAddress()方法返回socket綁定的地址, 經過 setOption(SocketOption,Object)getOption(SocketOption)方法設置和查詢socket支持的配置選項。緩存

bind

接下來咱們來看 java.nio.channels.ServerSocketChannel抽象類及其實現類sun.nio.ch.ServerSocketChannelImpl對之實現的細節。 首先咱們來看其對於bind的實現:服務器

//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        //經過localAddress判斷是否已經調用過bind
        if (localAddress != null)
            throw new AlreadyBoundException();
        //InetSocketAddress(0)表示綁定到本機的全部地址,由操做系統選擇合適的端口
        InetSocketAddress isa = (local == null)
                                ? new InetSocketAddress(0)
                                : Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        //開啓監聽,s若是參數backlog小於1,默認接受50個鏈接 
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        localAddress = Net.localAddress(fd);
    }
    return this;
}
複製代碼

下面咱們來看看Net中的bind和listen方法是如何實現的。網絡

Net.bind
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port) throws IOException {
        bind(UNSPEC, fd, addr, port);
    }

static void bind(ProtocolFamily family, FileDescriptor fd, InetAddress addr, int port) throws IOException {
    //若是傳入的協議域不是IPV4並且支持IPV6,則使用ipv6
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    bind0(fd, preferIPv6, exclusiveBind, addr, port);
}

private static native void bind0(FileDescriptor fd, boolean preferIPv6, boolean useExclBind, InetAddress addr, int port) throws IOException;
複製代碼

bind0爲native方法實現:

JNIEXPORT void JNICALL Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6, jboolean useExclBind, jobject iao, int port) {
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv = 0;
    //將java的InetAddress轉換爲c的struct sockaddr
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
                                  preferIPv6) != 0) {
        return;//轉換失敗,方法返回
    }
    //調用bind方法:int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen) 
    rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}

複製代碼

socket是用戶程序與內核交互信息的樞紐,它自身沒有網絡協議地址和端口號等信息,在進行網絡通訊的時候,必須把一個socket與一個地址相關聯。 不少時候內核會咱們自動綁定一個地址,然而有時用戶可能須要本身來完成這個綁定的過程,以知足實際應用的須要; 最典型的狀況是一個服務器進程須要綁定一個衆所周知的地址或端口以等待客戶來鏈接。 對於客戶端,不少時候並不須要調用bind方法,而是由內核自動綁定;

這裏要注意,綁定歸綁定,在有鏈接過來的時候會建立一個新的Socket,而後服務端操做這個新的Socket便可。這裏就能夠關注accept方法了。由sun.nio.ch.ServerSocketChannelImpl#bind最後,咱們知道其經過Net.listen(fd, backlog < 1 ? 50 : backlog)開啓監聽,若是參數backlog小於1,默認接受50個鏈接。由此,咱們來關注下Net.listen方法細節。

Net.listen
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;
複製代碼

能夠知道,Net.listennative方法,源碼以下:

JNIEXPORT void JNICALL Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog) {
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}
複製代碼

能夠看到底層是調用listen實現的,listen函數在通常在調用bind以後到調用accept以前調用,它的函數原型是: int listen(int sockfd, int backlog)返回值:0表示成功, -1表示失敗

咱們再來關注下bind操做中的其餘細節,最開始時的ensureOpen()方法判斷:

//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
    if (!isOpen())
        throw new ClosedChannelException();
}
//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen() {
        return !closed;
    }
複製代碼

若是socket關閉,則拋出ClosedChannelException

咱們再來看下Net#checkAddress

//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
    if (sa == null)//地址爲空 
        throw new NullPointerException();
        //非InetSocketAddress類型地址 
    if (!(sa instanceof InetSocketAddress))
        throw new UnsupportedAddressTypeException(); // ## needs arg
    InetSocketAddress isa = (InetSocketAddress)sa;
    //地址不可識別 
    if (isa.isUnresolved())
        throw new UnresolvedAddressException(); // ## needs arg
    InetAddress addr = isa.getAddress();
        //非ip4和ip6地址 
    if (!(addr instanceof Inet4Address || addr instanceof Inet6Address))
        throw new IllegalArgumentException("Invalid address type");
    return isa;
}
複製代碼

從上面能夠看出,bind首先檢查ServerSocket是否關閉,是否綁定地址, 若是既沒有綁定也沒關閉,則檢查綁定的socketaddress是否正確或合法; 而後經過Net工具類的bindlisten,完成實際的ServerSocket地址綁定和開啓監聽,若是綁定是開啓的參數小於1,則默認接受50個鏈接。

對照咱們以前在第一篇中接觸的BIO,咱們來看些accept()方法的實現:

//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept() throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                n = accept(this.fd, newfd, isaa);
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;
        //針對接受鏈接的處理通道socketchannelimpl,默認爲阻塞模式 
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        //構建SocketChannelImpl,這個具體在SocketChannelImpl再說 
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                //檢查地址和port權限
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
         //返回socketchannelimpl 
        return sc;

    } finally {
        acceptLock.unlock();
    }
}
複製代碼

對於accept(this.fd, newfd, isaa),調用accept接收socket中已創建的鏈接,咱們以前有在BIO中瞭解過,函數最終會調用:int accept(int sockfd,struct sockaddr *addr, socklen_t *addrlen);

  • 若是fd監聽socket的隊列中沒有等待的鏈接,socket也沒有被標記爲Non-blocking,accept()會阻塞直到鏈接出現;
  • 若是socket被標記爲Non-blocking,隊列中也沒有等待的鏈接,accept()返回錯誤EAGAIN或EWOULDBLOCK

這裏begin(blocking);end(blocking, n > 0);的合做模式咱們在InterruptibleChannel 與可中斷 IO這一篇文章中已經涉及過,這裏再次提一下,讓你們看到其應用,此處專一的是等待鏈接這個過程,期間能夠出現異常打斷,這個過程正常結束的話,就會正常往下執行邏輯,不要搞的好像這個Channel要結束了同樣,end(blocking, n > 0)的第二個參數completed也只是在判斷這個等待過程是否結束而已,不要功能範圍擴大化。

supportedOptions

咱們再來看下NetworkChannel的其餘方法實現,首先來看supportedOptions

//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public final Set<SocketOption<?>> supportedOptions() {
    return DefaultOptionsHolder.defaultOptions;
}
//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
    static final Set<SocketOption<?>> defaultOptions = defaultOptions();

    private static Set<SocketOption<?>> defaultOptions() {
        HashSet<SocketOption<?>> set = new HashSet<>();
        set.add(StandardSocketOptions.SO_RCVBUF);
        set.add(StandardSocketOptions.SO_REUSEADDR);
        if (Net.isReusePortAvailable()) {
            set.add(StandardSocketOptions.SO_REUSEPORT);
        }
        set.add(StandardSocketOptions.IP_TOS);
        set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
        //返回不可修改的HashSet 
        return Collections.unmodifiableSet(set);
    }
}
複製代碼

對上述配置中的一些配置咱們大體來瞅眼:

//java.net.StandardSocketOptions
//socket接受緩存大小 
public static final SocketOption<Integer> SO_RCVBUF =
        new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
//是否可重用地址 
public static final SocketOption<Boolean> SO_REUSEADDR =
        new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
//是否可重用port
public static final SocketOption<Boolean> SO_REUSEPORT =
        new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
//Internet協議(IP)標頭(header)中的服務類型(ToS)。
public static final SocketOption<Integer> IP_TOS =
        new StdSocketOption<Integer>("IP_TOS", Integer.class);
複製代碼

setOption實現

知道了上面的支持配置,咱們來看下setOption實現細節:

//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) throws IOException {
    Objects.requireNonNull(name);
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");
    synchronized (stateLock) {
        ensureOpen();

        if (name == StandardSocketOptions.IP_TOS) {
            ProtocolFamily family = Net.isIPv6Available() ?
                StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
            Net.setSocketOption(fd, family, name, value);
            return this;
        }

        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            isReuseAddress = (Boolean)value;
        } else {
            // no options that require special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
        }
        return this;
    }
}
複製代碼

這裏,你們就能看到supportedOptions().contains(name)的做用了,首先會進行支持配置的判斷,而後進行正常的設置邏輯。裏面對於Socket配置設定主要執行了Net.setSocketOption,這裏,就只對其代碼作中文註釋就好,整個邏輯過程沒有太複雜的。

static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name, Object value) throws IOException {
    if (value == null)
        throw new IllegalArgumentException("Invalid option value");

    // only simple values supported by this method
    Class<?> type = name.type();

    if (extendedOptions.isOptionSupported(name)) {
        extendedOptions.setOption(fd, name, value);
        return;
    }
    //非整形和布爾型,則拋出斷言錯誤 
    if (type != Integer.class && type != Boolean.class)
        throw new AssertionError("Should not reach here");

    // special handling
    if (name == StandardSocketOptions.SO_RCVBUF ||
        name == StandardSocketOptions.SO_SNDBUF)
    {
        //判斷接受和發送緩衝區大小 
        int i = ((Integer)value).intValue();
        if (i < 0)
            throw new IllegalArgumentException("Invalid send/receive buffer size");
    }
        //緩衝區有數據,延遲關閉socket的的時間 
    if (name == StandardSocketOptions.SO_LINGER) {
        int i = ((Integer)value).intValue();
        if (i < 0)
            value = Integer.valueOf(-1);
        if (i > 65535)
            value = Integer.valueOf(65535);
    }
    //UDP單播 
    if (name == StandardSocketOptions.IP_TOS) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid IP_TOS value");
    }
    //UDP多播 
    if (name == StandardSocketOptions.IP_MULTICAST_TTL) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid TTL/hop value");
    }

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    int arg;
    //轉換配置參數值 
    if (type == Integer.class) {
        arg = ((Integer)value).intValue();
    } else {
        boolean b = ((Boolean)value).booleanValue();
        arg = (b) ? 1 : 0;
    }

    boolean mayNeedConversion = (family == UNSPEC);
    boolean isIPv6 = (family == StandardProtocolFamily.INET6);
    //設置文件描述符的值及其餘
    setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);
}
複製代碼

getOption

接下來,咱們來看getOption實現,源碼以下:

//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name) throws IOException {
    Objects.requireNonNull(name);
    //非通道支持選項,則拋出UnsupportedOperationException 
    if (!supportedOptions().contains(name))
        throw new UnsupportedOperationException("'" + name + "' not supported");

    synchronized (stateLock) {
        ensureOpen();
        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            return (T)Boolean.valueOf(isReuseAddress);
        }
        //假如獲取的不是上面的配置,則委託給Net來處理 
        // no options that require special handling
        return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
    }
}
//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption<?> name) throws IOException {
    Class<?> type = name.type();

    if (extendedOptions.isOptionSupported(name)) {
        return extendedOptions.getOption(fd, name);
    }
    //只支持整形和布爾型,不然拋出斷言錯誤 
    // only simple values supported by this method
    if (type != Integer.class && type != Boolean.class)
        throw new AssertionError("Should not reach here");

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    boolean mayNeedConversion = (family == UNSPEC);
    //獲取文件描述的選項配置 
    int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());

    if (type == Integer.class) {
        return Integer.valueOf(value);
    } else {
        //咱們要看到前面支持配置處的源碼其支持的類型要麼是Boolean,要麼是Integer
        //因此,返回值爲Boolean.FALSE 或 Boolean.TRUE也就不足爲奇了
        return (value == 0) ? Boolean.FALSE : Boolean.TRUE;
    }
}
複製代碼

ServerSocketChannel與ServerSocket在bind處的異同

Net.bind一節中,咱們最後說了一個注意點,每一個鏈接過來的時候都會建立一個Socket來供此鏈接進行操做,這個在accept方法中能夠看到,其在獲得鏈接以後,就 new SocketChannelImpl(provider(), newfd, isa)這個對象。那這裏,就引出一個話題,咱們在使用bind方法的時候,是否是也應該綁定到一個Socket之上呢,那以前bio是怎麼作呢,咱們先來回顧一下。 咱們以前在調用java.net.ServerSocket#ServerSocket(int, int, java.net.InetAddress)方法的時候,裏面有一個setImpl():

//java.net.ServerSocket
 public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException(
                       "Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throw e;
        }
    }
//java.net.ServerSocket#setImpl
private void setImpl() {
        if (factory != null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if (impl != null)
            impl.setServerSocket(this);
    }
複製代碼

可是,咱們此處的重點在bind(new InetSocketAddress(bindAddr, port), backlog);,這裏的代碼以下:

//java.net.ServerSocket
public void bind(SocketAddress endpoint, int backlog) throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!oldImpl && isBound())
            throw new SocketException("Already bound");
        if (endpoint == null)
            endpoint = new InetSocketAddress(0);
        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if (security != null)
                security.checkListen(epoint.getPort());
                //重點!!
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throw e;
        }
    }
複製代碼

咱們有看到 getImpl()我標示了重點,這裏面作了什麼,咱們走進去:

//java.net.ServerSocket#getImpl
SocketImpl getImpl() throws SocketException {
    if (!created)
        createImpl();
    return impl;
}
複製代碼

在整個過程當中created仍是對象剛建立時的初始值,爲false,那麼,鐵定會進入createImpl()方法中:

//java.net.ServerSocket#createImpl
void createImpl() throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(true);
        created = true;
    } catch (IOException e) {
        throw new SocketException(e.getMessage());
    }
}
複製代碼

而此處,由於前面impl已經賦值,因此,會走impl.create(true),進而將created設定爲true。而此刻,終於到我想講的重點了:

//java.net.AbstractPlainSocketImpl#create
protected synchronized void create(boolean stream) throws IOException {
    this.stream = stream;
    if (!stream) {
        ResourceManager.beforeUdpCreate();
        // only create the fd after we know we will be able to create the socket
        fd = new FileDescriptor();
        try {
            socketCreate(false);
            SocketCleanable.register(fd);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            fd = null;
            throw ioe;
        }
    } else {
        fd = new FileDescriptor();
        socketCreate(true);
        SocketCleanable.register(fd);
    }
    if (socket != null)
        socket.setCreated();
    if (serverSocket != null)
        serverSocket.setCreated();
}

複製代碼

能夠看到,socketCreate(true);,它的實現以下:

@Override
void socketCreate(boolean stream) throws IOException {
    if (fd == null)
        throw new SocketException("Socket closed");

    int newfd = socket0(stream);

    fdAccess.set(fd, newfd);
}
複製代碼

經過本地方法socket0(stream)獲得了一個文件描述符,由此,Socket建立了出來,而後進行相應的綁定。 咱們再把眼光放回到sun.nio.ch.ServerSocketChannelImpl#accept()中,這裏new的SocketChannelImpl對象是獲得鏈接以後作的事情,那對於服務器來說,綁定時候用的Socket呢,這裏,咱們在使用ServerSocketChannel的時候,每每要使用JDK給咱們提供的對我統一的方法open,也是爲了下降咱們使用的複雜度,這裏是java.nio.channels.ServerSocketChannel#open:

//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open() throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}
//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}
//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    this.fd =  Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
複製代碼

能夠看到,只要new了一個ServerSocketChannelImpl對象,就至關於拿到了一個socket而後bind也就有着落了。可是,咱們要注意下細節ServerSocketChannel#open獲得的是ServerSocketChannel類型。咱們accept到一個客戶端來的鏈接後,應該在客戶端與服務器之間建立一個Socket通道來供二者通訊操做的,因此,sun.nio.ch.ServerSocketChannelImpl#accept()中所作的是SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);,獲得的是SocketChannel類型的對象,這樣,就能夠將Socket的讀寫數據的方法定義在這個類裏面。

由ServerSocketChannel的socket方法延伸的

關於ServerSocketChannel,咱們還有方法須要接觸一下,如socket():

//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket() {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);
        return socket;
    }
}
複製代碼

咱們看到了ServerSocketAdaptor,咱們經過此類的註釋可知,這是一個和ServerSocket調用同樣,可是底層是用ServerSocketChannelImpl來實現的一個類,其適配是的目的是適配咱們使用ServerSocket的方式,因此該ServerSocketAdaptor繼承ServerSocket並按順序重寫了它的方法,因此,咱們在寫這塊兒代碼的時候也就有了新的選擇。

InterruptibleChannel 與可中斷 IO這一篇文章中已經涉及過java.nio.channels.spi.AbstractInterruptibleChannel#close的實現,這裏,咱們再來回顧下其中的某些細節,順帶引出咱們新的話題:

//java.nio.channels.spi.AbstractInterruptibleChannel#close
public final void close() throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true;
        implCloseChannel();
    }
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if (keys != null) {
                copyOfKeys = keys.clone();
            }
        }

        if (copyOfKeys != null) {
            for (SelectionKey k : copyOfKeys) {
                if (k != null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set
                }
            }
        }
    }
//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel
@Override
protected void implCloseSelectableChannel() throws IOException {
    assert !isOpen();

    boolean interrupted = false;
    boolean blocking;

    // set state to ST_CLOSING
    synchronized (stateLock) {
        assert state < ST_CLOSING;
        state = ST_CLOSING;
        blocking = isBlocking();
    }

    // wait for any outstanding accept to complete
    if (blocking) {
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            long th = thread;
            if (th != 0) {
                //本地線程不爲null,則本地Socket預先關閉
                //並通知線程通知關閉
                nd.preClose(fd);
                NativeThread.signal(th);

                // wait for accept operation to end
                while (thread != 0) {
                    try {
                        stateLock.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        }
    } else {
        // non-blocking mode: wait for accept to complete
        acceptLock.lock();
        acceptLock.unlock();
    }

    // set state to ST_KILLPENDING
    synchronized (stateLock) {
        assert state == ST_CLOSING;
        state = ST_KILLPENDING;
    }

    // close socket if not registered with Selector
    //若是未在Selector上註冊,直接kill掉
    //即關閉文件描述 
    if (!isRegistered())
        kill();

    // restore interrupt status
    //印證了咱們上一篇中在異步打斷中如果經過線程的中斷方法中斷線程的話
    //最後要設定該線程狀態是interrupt
    if (interrupted)
        Thread.currentThread().interrupt();
}

@Override
public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLPENDING) {
            state = ST_KILLED;
            nd.close(fd);
        }
    }
}
複製代碼
channel的close()應用

也是由於close()並無在InterruptibleChannel 與可中斷 IO這一篇文章中進行具體的講解應用,這裏其應用的更可能是在SocketChannel這裏,其更多的涉及到客戶端與服務端創建鏈接交換數據,因此斷開鏈接後,將不用的Channel關閉是很正常的。 這裏,在sun.nio.ch.ServerSocketChannelImpl#accept()中的源碼中:

@Override
public SocketChannel accept() throws IOException {
        ...
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    } finally {
        acceptLock.unlock();
    }
}
複製代碼

這裏經過對所接收的鏈接的遠程地址作合法性判斷,假如驗證出現異常,則關閉上面建立的SocketChannel。 還有一個關於close()的實際用法,在客戶端創建鏈接的時候,若是鏈接出異常,一樣是要關閉所建立的Socket:

//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote) throws IOException {
        SocketChannel sc = open();
        try {
            sc.connect(remote);
        } catch (Throwable x) {
            try {
                sc.close();
            } catch (Throwable suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
        assert sc.isConnected();
        return sc;
    }
複製代碼

接着,咱們在implCloseSelectableChannel中會發現nd.preClose(fd);nd.close(fd);,這個在SocketChannelImplServerSocketChannelImpl二者對於implCloseSelectableChannel實現中均可以看到,這個nd是什麼,這裏,咱們拿ServerSocketChannelImpl來說,在這個類的最後面有一段靜態代碼塊(SocketChannelImpl同理),也就是在這個類加載的時候就會執行:

//C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/ServerSocketChannelImpl.java:550
static {
     //加載nio,net資源庫
        IOUtil.load();
        initIDs();
        nd = new SocketDispatcher();
    }
複製代碼

也就是說,在ServerSocketChannelImpl這個類字節碼加載的時候,就會建立SocketDispatcher對象。經過SocketDispatcher容許在不一樣的平臺調用不一樣的本地方法進行讀寫操做,而後基於這個類,咱們就能夠在sun.nio.ch.SocketChannelImpl作Socket的I/O操做。

//sun.nio.ch.SocketDispatcher
class SocketDispatcher extends NativeDispatcher {

    static {
        IOUtil.load();
    }
    //讀操做 
    int read(FileDescriptor fd, long address, int len) throws IOException {
        return read0(fd, address, len);
    }

    long readv(FileDescriptor fd, long address, int len) throws IOException {
        return readv0(fd, address, len);
    }
    //寫操做 
    int write(FileDescriptor fd, long address, int len) throws IOException {
        return write0(fd, address, len);
    }

    long writev(FileDescriptor fd, long address, int len) throws IOException {
        return writev0(fd, address, len);
    }
    //預關閉文件描述符
    void preClose(FileDescriptor fd) throws IOException {
        preClose0(fd);
    }
    //關閉文件描述
    void close(FileDescriptor fd) throws IOException {
        close0(fd);
    }

    //-- Native methods
    static native int read0(FileDescriptor fd, long address, int len) throws IOException;

    static native long readv0(FileDescriptor fd, long address, int len) throws IOException;

    static native int write0(FileDescriptor fd, long address, int len) throws IOException;

    static native long writev0(FileDescriptor fd, long address, int len) throws IOException;

    static native void preClose0(FileDescriptor fd) throws IOException;

    static native void close0(FileDescriptor fd) throws IOException;
}
複製代碼

FileDescriptor

咱們有看到FileDescriptor在前面代碼中有大量的出現,這裏,咱們對它來專門介紹。經過FileDescriptor 這個類的實例來充當底層機器特定結構的不透明處理,表示打開文件,打開socket或其餘字節源或接收器。 文件描述符的主要用途是建立一個 FileInputStream或 FileOutputStream來包含它。 注意: 應用程序不該建立本身的文件描述符。 咱們來看其部分源碼:

public final class FileDescriptor {

    private int fd;

    private long handle;

    private Closeable parent;
    private List<Closeable> otherParents;
    private boolean closed;

    /** * true, if file is opened for appending. */
    private boolean append;

    static {
        initIDs();
    }
    /** * 在未明確關閉FileDescriptor的狀況下進行清理. */
    private PhantomCleanable<FileDescriptor> cleanup;

    /** * 構造一個無效的FileDescriptor對象,fd或handle會在以後進行設定 */
    public FileDescriptor() {
        fd = -1;
        handle = -1;
    }

    /** * Used for standard input, output, and error only. * For Windows the corresponding handle is initialized. * For Unix the append mode is cached. * 僅用於標準輸入,輸出和錯誤。     * 對於Windows,初始化相應的句柄。     * 對於Unix,緩存附加模式。 * @param fd the raw fd number (0, 1, 2) */
    private FileDescriptor(int fd) {
        this.fd = fd;
        this.handle = getHandle(fd);
        this.append = getAppend(fd);
    }
    ...
}
複製代碼

咱們平時所用的標準輸入,輸出,錯誤流的句柄能夠以下,一般,咱們不會直接使用它們,而是使用java.lang.System.injava.lang.System#outjava.lang.System#err:

public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
複製代碼

測試該文件描述符是否有效可使用以下方法:

//java.io.FileDescriptor#valid
public boolean valid() {
        return (handle != -1) || (fd != -1);
    }
複製代碼

返回值爲true的話,那麼這個文件描述符對象所表明的socket 文件操做或其餘活動的網絡鏈接都是有效的,反之,false則是無效。 更多內容,讀者能夠自行深刻源碼,此處就不過多解釋了。爲了讓你們能夠更好的理解上述內容,咱們會在後面的部分還要進一步涉及一下。

NIO包下SocketChannel解讀

在前面,咱們已經接觸了SocketChannel,這裏,來接觸下細節。

一樣,咱們也能夠經過調用此類的open方法來建立socket channel。這裏須要注意:

  • 沒法爲任意預先存在的socket建立channel
  • 新建立的socket channel已打開但還沒有鏈接。
  • 嘗試在未鏈接的channel上調用I/O操做將致使拋出NotYetConnectedException
  • 能夠經過調用connect方法鏈接socket channel;
  • 一旦鏈接後,socket channel會保持鏈接狀態,直到它關閉。
  • 是否有鏈接socket channel能夠經過肯定調用其isConnected方法。

socket channel支持 非阻塞鏈接:

  • 能夠先建立socket channel,而後能夠經過 connect 方法創建到遠程socket的鏈接。
  • 經過調用finishConnect方法來結束鏈接。
  • 判斷是否正在進行鏈接操做能夠經過調用isConnectionPending方法來肯定。

socket channel支持異步關閉,相似於Channel類中的異步關閉操做。

  • 若是socket的輸入端被一個線程關閉而另外一個線程在此socket channel上因在進行讀操做而被阻塞,那麼被阻塞線程中的讀操做將不讀取任何字節並將返回 -1
  • 若是socket的輸出端被一個線程關閉而另外一個線程在socket channel上因在進行寫操做而被阻塞,則被阻塞的線程將收到AsynchronousCloseException

接下來,咱們來看其具體實現方法。

ServerSocketChannel與SocketChannel的open()

//java.nio.channels.SocketChannel#open()
public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
//這個方法省的咱們再次調用connect了
public static SocketChannel open(SocketAddress remote) throws IOException {
    //默認是堵塞的,這個在AbstractSelectableChannel處討論過了
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    } catch (Throwable x) {
        try {
            sc.close();
        } catch (Throwable suppressed) {
            x.addSuppressed(suppressed);
        }
        throw x;
    }
    assert sc.isConnected();
    return sc;
}
//sun.nio.ch.SelectorProviderImpl#openSocketChannel
public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}
//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)
SocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
     //調用socket函數,true表示TCP
    this.fd = Net.socket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#socket(boolean)
static FileDescriptor socket(boolean stream) throws IOException {
    return socket(UNSPEC, stream);
}
//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)
static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException {
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));
}
//sun.nio.ch.IOUtil#newFD
public static FileDescriptor newFD(int i) {
    FileDescriptor fd = new FileDescriptor();
    setfdVal(fd, i);
    return fd;
}
static native void setfdVal(FileDescriptor fd, int value);
複製代碼

關於Net.socket(true),咱們前面已經提到過了,這裏,經過其底層源碼來再次調教下 (此處不想看能夠跳過):

JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    //字節流仍是數據報,TCP對應SOCK_STREAM,UDP對應SOCK_DGRAM,此處傳入的stream=true;
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
    //判斷是IPV6仍是IPV4
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;

    //調用Linux的socket函數,domain爲表明協議;
    //type爲套接字類型,protocol設置爲0來表示使用默認的傳輸協議
    fd = socket(domain, type, 0);
    //出錯
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        //arg=1設置ipv6的socket只接收ipv6地址的報文,arg=0表示也可接受ipv4的請求
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }

    //SO_REUSEADDR有四種用途:
    //1.當有一個有相同本地地址和端口的socket1處於TIME_WAIT狀態時,而你啓動的程序的socket2要佔用該地址和端口,你的程序就要用到該選項。 
    //2.SO_REUSEADDR容許同一port上啓動同一服務器的多個實例(多個進程)。但每一個實例綁定的IP地址是不能相同的。
    //3.SO_REUSEADDR容許單個進程綁定相同的端口到多個socket上,但每一個socket綁定的ip地址不一樣。 
   //4.SO_REUSEADDR容許徹底相同的地址和端口的重複綁定。但這隻用於UDP的多播,不用於TCP;
    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }

#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }

     //IPV6_MULTICAST_HOPS用於控制多播的範圍,
     // 1表示只在本地網絡轉發,
     //更多介紹請參考(http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4);
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}

複製代碼

Linux 3.9以後加入了SO_REUSEPORT配置,這個配置很強大,多個socket(無論是處於監聽仍是非監聽,無論是TCP仍是UDP)只要在綁定以前設置了SO_REUSEPORT屬性,那麼就能夠綁定到徹底相同的地址和端口。 爲了阻止"port 劫持"(Port hijacking)有一個特別的限制:全部但願共享源地址和端口的socket都必須擁有相同的有效用戶id(effective user ID)。這樣一個用戶就不能從另外一個用戶那裏"偷取"端口。另外,內核在處理SO_REUSEPORT socket的時候使用了其它系統上沒有用到的"特殊技巧":

  • 對於UDP socket,內核嘗試平均的轉發數據報;
  • 對於TCP監聽socket,內核嘗試將新的客戶鏈接請求(由accept返回)平均的交給共享同一地址和端口的socket(服務器監聽socket)。

例如:一個簡單的服務器程序的多個實例可使用SO_REUSEPORT socket,這樣就實現一個簡單的負載均衡,由於內核已經把請求的分配都作了。

在前面的代碼中能夠看到,在這個socket建立成功以後,調用IOUtil.newFD建立了文件描述符 。這裏,我只是想知道這個Socket是能夠輸入呢,仍是能夠讀呢,仍是有錯呢,參考FileDescriptor這一節最後那幾個標準狀態的設定,其實這裏也是同樣,由於咱們要往Socket中寫和讀,其標準狀態無非就這三種:輸入,輸出,出錯。而這個Socket是綁定在SocketChannel上的,那就把FileDescriptor也綁定到上面便可,這樣咱們就能夠獲取到它的狀態了。因爲FileDescriptor沒有提供外部設置fd的方法,setfdVal是經過本地方法實現的:

JNIEXPORT void JNICALL Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val) {
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}
複製代碼

假如各位有對Linux下的shell編程或者命令有了解的話,咱們知道,shell對報錯進行重定向要使用2>,也就是將錯誤信息由2號所指向的通道寫出,這裏0和1 一樣指向一個通道。此處一樣也表明了狀態,這樣就能夠對錶明Socket的狀態進行操做了,也就是改變SelectionKeyinterest ops,即首先對SelectionKey按輸入輸出類型進行分類,而後咱們的讀寫狀態的操做也就有着落了。此處咱們打個戳,在下一篇中會對其進行細節講解。

咱們迴歸到SocketChannelopen方法中。咱們能夠看到,SelectorProvider.provider().openSocketChannel()返回的是SocketChannelImpl對象實例。在SocketChannelImpl(SelectorProvider sp)中咱們並未看到其對this.state進行值操做,也就是其默認爲0,即ST_UNCONNECTED(未鏈接狀態),同時Socket默認是堵塞的。 因此,通常狀況下,當採用異步方式時,使用不帶參數的open方法比較常見,這樣,咱們會隨之調用configureBlocking來設置非堵塞。

SocketChannel的connect解讀

由前面可知,咱們調用connect方法鏈接到遠程服務器,其源碼以下:

//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
    InetSocketAddress isa = Net.checkAddress(sa);
    SecurityManager sm = System.getSecurityManager();
    if (sm != null)
        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

    InetAddress ia = isa.getAddress();
    if (ia.isAnyLocalAddress())
        ia = InetAddress.getLocalHost();

    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                int n = 0;
                boolean blocking = isBlocking();
                try {
                    //支持線程中斷,經過設置當前線程的Interruptible blocker屬性實現
                    beginConnect(blocking, isa);
                    do {
                    //調用connect函數實現,若是採用堵塞模式,會一直等待,直到成功或出//現異常
                        n = Net.connect(fd, ia, isa.getPort());
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } finally {
                    endConnect(blocking, (n > 0));
                }
                assert IOStatus.check(n);
                //鏈接成功
                return n > 0;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}
複製代碼

關於beginConnectendConnect,是針對AbstractInterruptibleChannelbegin()end方法的一種加強。這裏咱們須要知道的是,假如是非阻塞Channel的話,咱們無須去關心鏈接過程的打斷。顧名思義,只有阻塞等待才須要去考慮打斷這一場景的出現。剩下的細節我已經在代碼中進行了完整的註釋,讀者可自行查看。

//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa) throws IOException {   //只有阻塞的時候纔會進入begin
    if (blocking) {
        // set hook for Thread.interrupt
        //支持線程中斷,經過設置當前線程的Interruptible blocker屬性實現
        begin();
    }
    synchronized (stateLock) {
        //默認爲open, 除非調用了close方法
        ensureOpen();
        //檢查鏈接狀態
        int state = this.state;
        if (state == ST_CONNECTED)
            throw new AlreadyConnectedException();
        if (state == ST_CONNECTIONPENDING)
            throw new ConnectionPendingException();
        //斷言當前的狀態是不是未鏈接狀態,若是是,賦值表示正在鏈接中
        assert state == ST_UNCONNECTED;
        //表示正在鏈接中
        this.state = ST_CONNECTIONPENDING;
        //只有未綁定本地地址也就是說未調用bind方法才執行,
        //該方法在ServerSocketChannel中也見過
        if (localAddress == null)
            NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
        remoteAddress = isa;

        if (blocking) {
            // record thread so it can be signalled if needed
            readerThread = NativeThread.current();
        }
    }
}
複製代碼

在鏈接過程當中,咱們須要注意的就是幾個鏈接的狀態:ST_UNCONNECTEDST_CONNECTEDST_CONNECTIONPENDINGST_CLOSINGST_KILLPENDINGST_KILLED,也是由於其是一個公共狀態,可能會有多個線程對其進行鏈接操做的。因此,state被定義爲一個volatile變量,這個變量在改變的時候須要有stateLock這個對象來做爲synchronized鎖對象來控制同步操做的。

//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed) throws IOException {
    endRead(blocking, completed);
    //當上面代碼中n>0,說明鏈接成功,更新狀態爲ST_CONNECTED
    if (completed) {
        synchronized (stateLock) {
            if (state == ST_CONNECTIONPENDING) {
                localAddress = Net.localAddress(fd);
                state = ST_CONNECTED;
            }
        }
    }
}
//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed) throws AsynchronousCloseException {   //當阻塞狀態下的話,才進入
    if (blocking) {
        synchronized (stateLock) {
            readerThread = 0;
            // notify any thread waiting in implCloseSelectableChannel
            if (state == ST_CLOSING) {
                stateLock.notifyAll();
            }
        }
        //和begin成對出現,當線程中斷時,拋出ClosedByInterruptException
        // remove hook for Thread.interrupt
        end(completed);
    }
}
複製代碼

咱們來關注connect中的Net.connect(fd, ia, isa.getPort())方法:

//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
    return connect(UNSPEC, fd, remote, remotePort);
}
//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort) throws IOException {
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    return connect0(preferIPv6, fd, remote, remotePort);
}
複製代碼

該方法最終會調用native方法,具體註釋以下:

JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo, jobject iao, jint port) {
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv;
    //地址轉換爲struct sockaddr格式
    if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
        return IOS_THROWN;
    }
    //傳入fd和sockaddr,與遠程服務器創建鏈接,通常就是TCP三次握手
   //若是設置了configureBlocking(false),不會堵塞,不然會堵塞一直到超時或出現異常
    rv = connect(fdval(env, fdo), &sa.sa, sa_len);
    //0表示鏈接成功,失敗時經過errno獲取具體緣由
    if (rv != 0) {
        //非堵塞,鏈接還未創建(-2)
        if (errno == EINPROGRESS) {
            return IOS_UNAVAILABLE;
        } else if (errno == EINTR) {
            //中斷(-3)
            return IOS_INTERRUPTED;
        }
        return handleSocketError(env, errno);
    }
    //鏈接創建,通常TCP鏈接鏈接都須要時間,所以除非是本地網絡,
    //通常狀況下非堵塞模式返回IOS_UNAVAILABLE比較多;
    return 1;
}
複製代碼

從上面能夠經過註釋看到,若是是非堵塞,並且鏈接也並未立馬創建成功,其返回的是-2,也就是鏈接未創建成功,由以前beginConnect部分源碼可知,此時狀態爲ST_CONNECTIONPENDING,那麼,非阻塞條件下,何時會變爲ST_CONNECTED?有什麼方法能夠查詢狀態或者等待鏈接完成? 那就讓咱們來關注下sun.nio.ch.SocketChannelImpl#finishConnect

SocketChannelImpl中finishConnect解讀

首先,咱們回顧下,前面咱們涉及了sun.nio.ch.ServerSocketAdaptor的用法,方便咱們只有Socket編程習慣人羣使用,這裏,咱們也就能夠看到基本的核心實現邏輯,那麼有ServerSocketAdaptor就有SocketAdaptor,這裏,在BIO的Socket編程中最後也是調用了connect(address)操做:

//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr, boolean stream) throws IOException {
    setImpl();

    // backward compatibility
    if (address == null)
        throw new NullPointerException();

    try {
        createImpl(stream);
        if (localAddr != null)
            bind(localAddr);
        connect(address);
    } catch (IOException | IllegalArgumentException | SecurityException e) {
        try {
            close();
        } catch (IOException ce) {
            e.addSuppressed(ce);
        }
        throw e;
    }
}
複製代碼

這裏,咱們能夠調用java.nio.channels.SocketChannel#open(),而後調用所獲得的SocketChannel對象的socket()方法,就能夠獲得sun.nio.ch.SocketAdaptor對象實例了。咱們來查看SocketAdaptor的connect實現:

//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
    connect(remote, 0);
}

public void connect(SocketAddress remote, int timeout) throws IOException {
    if (remote == null)
        throw new IllegalArgumentException("connect: The address can't be null");
    if (timeout < 0)
        throw new IllegalArgumentException("connect: timeout can't be negative");

    synchronized (sc.blockingLock()) {
        if (!sc.isBlocking())
            throw new IllegalBlockingModeException();

        try {
            //未設定超時則會一直在此等待直到鏈接或者出現異常
            // no timeout
            if (timeout == 0) {
                sc.connect(remote);
                return;
            }
            //有超時設定,則會將Socket給設定爲非阻塞
            // timed connect
            sc.configureBlocking(false);
            try {
                if (sc.connect(remote))
                    return;
            } finally {
                try {
                    sc.configureBlocking(true);
                } catch (ClosedChannelException e) { }
            }

            long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
            long to = timeout;
            for (;;) {
                //經過計算超時時間,在容許的時間範圍內無限循環來進行鏈接,
                //若是超時,則關閉這個Socket
                long startTime = System.nanoTime();
                if (sc.pollConnected(to)) {
                    boolean connected = sc.finishConnect();
                    //看下文解釋
                    assert connected;
                    break;
                }
                timeoutNanos -= System.nanoTime() - startTime;
                if (timeoutNanos <= 0) {
                    try {
                        sc.close();
                    } catch (IOException x) { }
                    throw new SocketTimeoutException();
                }
                to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
            }

        } catch (Exception x) {
            Net.translateException(x, true);
        }
    }

}
複製代碼

這裏先解釋下一個小注意點:在Java中,assert關鍵字是從JAVA SE 1.4 引入的,爲了不和老版本的Java代碼中使用了assert關鍵字致使錯誤,Java在執行的時候默認是不啓動斷言檢查的(這個時候,全部的斷言語句都 將忽略!),若是要開啓斷言檢查,則須要用開關-enableassertions或-ea來開啓。 經過上面的源碼註釋,相信大夥已經知道大體的流程了,那關於sun.nio.ch.SocketChannelImpl#finishConnect到底作了什麼,此處,咱們來探索一番:

//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect() throws IOException {
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                // no-op if already connected
                if (isConnected())
                    return true;

                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginFinishConnect(blocking);
                    int n = 0;
                    if (blocking) {
                        do {
                            //阻塞狀況下,第二個參數傳入true
                            n = checkConnect(fd, true);
                        } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
                    } else {
                        //非阻塞狀況下,第二個參數傳入false
                        n = checkConnect(fd, false);
                    }
                    connected = (n > 0);
                } finally {
                    endFinishConnect(blocking, connected);
                }
                assert (blocking && connected) ^ !blocking;
                return connected;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, remoteAddress);
    }
}
//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block) throws IOException;
複製代碼

關於beginFinishConnectendFinishConnect和咱們以前分析的sun.nio.ch.SocketChannelImpl#beginConnectsun.nio.ch.SocketChannelImpl#endConnect過程差很少,不懂讀者可回看。剩下的,就是咱們關注的主要核心邏輯checkConnect(fd, true),它也是一個本地方法,涉及到的源碼以下:

JNIEXPORT jint JNICALL Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this, jobject fdo, jboolean block) {
    int error = 0;
    socklen_t n = sizeof(int);
    //獲取FileDescriptor中的fd
    jint fd = fdval(env, fdo);
    int result = 0;
    struct pollfd poller;
    //文件描述符
    poller.fd = fd;
    //請求的事件爲寫事件
    poller.events = POLLOUT;
    //返回的事件
    poller.revents = 0;
    
    //-1表示阻塞,0表示當即返回,不阻塞進程
    result = poll(&poller, 1, block ? -1 : 0);
    //小於0表示調用失敗
    if (result < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "poll failed");
            return IOS_THROWN;
        }
    }
    //非堵塞時,0表示沒有準備好的鏈接
    if (!block && (result == 0))
        return IOS_UNAVAILABLE;
    //準備好寫或出現錯誤的socket數量>0
    if (result > 0) {
        errno = 0;
        result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
        //出錯
        if (result < 0) {
            return handleSocketError(env, errno);
        //發生錯誤,處理錯誤
        } else if (error) {
            return handleSocketError(env, error);
        } else if ((poller.revents & POLLHUP) != 0) {
            return handleSocketError(env, ENOTCONN);
        }
        //socket已經準備好,可寫,即鏈接已經創建好
        // connected
        return 1;
    }
    return 0;
}
複製代碼

具體的過程如源碼註釋所示,其中是否阻塞咱們在本地方法源碼中和以前sun.nio.ch.SocketChannelImpl#finishConnect的行爲產生對應。另外,從上面的源碼看到,底層是經過poll查詢socket的狀態,從而判斷鏈接是否創建成功;因爲在非堵塞模式下,finishConnect方法會當即返回,根據此處sun.nio.ch.SocketAdaptor#connect的處理,其使用循環的方式判斷鏈接是否創建,在咱們的nio編程中,這個是不建議的,屬於半成品,而是建議註冊到Selector,經過ops=OP_CONNECT獲取鏈接完成的SelectionKey,而後調用finishConnect完成鏈接的創建; 那麼finishConnect是否能夠不調用呢?答案是否認的,由於只有finishConnect中才會將狀態更新爲ST_CONNECTED,而在調用readwrite時都會對狀態進行判斷。

這裏,咱們算是引出了咱們即將要涉及的SelectorSelectionKey,咱們會在下一篇中進行詳細講解。

相關文章
相關標籤/搜索