NIO-SocketChannel源碼分析



NIO-SocketChannel源碼分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析java

前言

原本是想學習Netty的,可是Netty是一個NIO框架,所以在學習netty以前,仍是先梳理一下NIO的知識。經過剖析源碼理解NIO的設計原理。linux

本系列文章針對的是JDK1.8.0.161的源碼。git

上一篇介紹了Channel的接口,本篇對SocektChannel的源碼進行解析。github

ServerSocketChannelImpl

建立ServerSocketChannel

咱們經過ServerSocketChannel.open()建立一個ServerSocketChannel,它實際經過provider建立。編程

public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel
{
    protected ServerSocketChannel(SelectorProvider provider) {
        super(provider);
    }

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
    ...
}

在首次建立時,建立初始化SelectorProvider對象。windows

public static SelectorProvider provider() {
        synchronized(lock) {
            return provider != null ? provider : (SelectorProvider)AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (SelectorProvider.loadProviderFromProperty()) {
                        return SelectorProvider.provider;
                    } else if (SelectorProvider.loadProviderAsService()) {
                        return SelectorProvider.provider;
                    } else {
                        SelectorProvider.provider = DefaultSelectorProvider.create();
                        return SelectorProvider.provider;
                    }
                }
            });
        }
    }

具體SelcetProvider實如今《NIO-Selector》一節講解。數組

privoder建立完成後經過openServerSocketChannel建立ServiceSocketChannel緩存

public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl靜態構造函數中會進行一些初始化操做。安全

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    private static NativeDispatcher nd;
    static {
        IOUtil.load();
        initIDs();
        nd = new SocketDispatcher();
    }
    private static native void initIDs();
    ...
}

初始化

經過靜態構造函數在首次建立時經過IOUtil.load()初始化一些必要的參數。IOUtil的靜態構造方法來加載net和nio的類庫。

public static void load() { }
static {
    java.security.AccessController.doPrivileged(
            new java.security.PrivilegedAction<Void>() {
                public Void run() {
                    //加載net和nio的庫
                    //net主要是應用層的一些協議實現,如FTP,Http
                    System.loadLibrary("net");
                    System.loadLibrary("nio");
                    return null;
                }
            });
    initIDs();
    IOV_MAX = iovMax();
}

沒有查詢到initIDs的具體做用,知道的同窗麻煩說明一下。
IOV_MAX用於獲取最大的可一次性寫入的緩存個數,當咱們經過IOUtils.write一次性向Channel寫入多個Buffer時,會有Buffer最大數量限制的。

IOUtil.load初始化完成,則會建立SocketDispatcher,它提供了Socket的native方法,不一樣平臺對於SocketDispatcher實現不同,最終都是調用FileDispatcherImpl執行相關的文件操做。

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    private final FileDescriptor fd;
    private int fdVal;
    private static final int ST_INUSE = 0;
    private int state = -1;
    ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        //首先經過Net.serverSocket(true)建立Socket並建立一個文件描述符與其關聯。
        this.fd =  Net.serverSocket(true);
        //在註冊selector的時候須要獲取到文件描述符的值。
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    }

    ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException {
        super(sp);
        this.fd =  fd;
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
        //已綁定則直接獲取地址
        if (bound)
            localAddress = Net.localAddress(fd);//獲取傳入的文件描述符的socket地址
    }
    ...
}

文件描述符簡稱fd,它是一個抽象概念,在C庫編程中能夠叫作文件流或文件流指針,在其它語言中也能夠叫作文件句柄(handler),並且這些不一樣名詞的隱含意義多是不徹底相同的。不過在系統層,咱們統一把它叫作文件描述符。

綁定和監聽

咱們經過channel.bind能夠將socket綁定到一個端口上。

public final ServerSocketChannel bind(SocketAddress local)
        throws IOException
{
    return bind(local, 0);
}
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        ...
        InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
            Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        //檢查是否端口已被監聽
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        //默認tcp待鏈接隊列長度最小爲50
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            //從文件描述符中獲取地址信息
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}

首先會作一下基本校驗,包括檢查端口是否被佔用。最終綁定並監聽端口。

文件描述符能夠關聯到一個文件設備,最終能夠關聯到socket結構體,經過socket結構體能夠提取出地址信息。如何獲取地址信息能夠看下根據文件描述符fd獲取socket結構體,socket結構體能夠看下struct socket結構體詳解
關於backlog,因爲TCP鏈接時會有3次握手,當服務端接收到SYN包時,會將該請求socket加入到待鏈接隊列中,而後返回SYN+ACK繼續進行鏈接握手。若鏈接併發量很高,而服務端來不及accept致使待鏈接隊列滿了,則後續的鏈接請求將會被拒絕。

在創建在綁定地址以前,咱們須要調用NetHooks.beforeTcpBind,這個方法是將fd轉換爲SDP(Sockets Direct Protocol,Java套接字直接協議) socket。

SDP須要網卡支持InfiniBand高速網絡通訊技術,windows不支持該協議。

  • windows下的NetHooks
public final class NetHooks {
    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
    }

    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
    }
}
  • Solaris下的NetHooks
public final class NetHooks {

    public static abstract class Provider {
        protected Provider() {}
        public abstract void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
        public abstract void implBeforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
    }

    private static final Provider provider = new sun.net.sdp.SdpProvider();

    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpBind(fdObj, address, port);
    }
    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpConnect(fdObj, address, port);
    }
}

實際調用了SdpProvider的方法

provider.implBeforeTcpBind(fdObj, address, port);
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
    if (enabled)
        convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port)
    throws IOException
{
    boolean matched = false;
    //匹配規則,通常有PortRangeRule校驗器校驗端口範圍是否符合規則
    for (Rule rule: rules) {
        if (rule.match(action, address, port)) {
            //將fd轉換爲socket
            SdpSupport.convertSocket(fdObj);
            matched = true;
            break;
        }
    }
    ...
}

public static void convertSocket(FileDescriptor fd) throws IOException {
    ...
    //獲取fd索引
    int fdVal = fdAccess.get(fd);
    convert0(fdVal);
}
//調用native方法轉換
private static native int create0() throws IOException;

//rules 是在初始化SdpProvider加載的
public SdpProvider() {
    String file = AccessController.doPrivileged(
        new GetPropertyAction("com.sun.sdp.conf"));
    ...
    list = loadRulesFromFile(file);
    ...
    this.rules = list;
    this.log = out;
}

咱們除了使用chennel.bind綁定地址之外,還能夠經過channel.socket().bind綁定。channel.socket()會建立一個內部的ServerSocket,ServerSocketAdaptor把實現了SocketServer的配置、綁定Socket Server的功能抽出提取到了ServerSocket中。

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    ...
    ServerSocket socket;
    public ServerSocket socket() {
        synchronized(this.stateLock) {
            if (this.socket == null) {
                this.socket = ServerSocketAdaptor.create(this);
            }
            return this.socket;
        }
    }
    ...
}

ServerSocketAdaptor實現了SocketServer的綁定,監聽等功能,但實際仍是調用ServerSocketChannelImpl的方法。

public class ServerSocketAdaptor
    extends ServerSocket
{
    private final ServerSocketChannelImpl ssc;

    public static ServerSocket create(ServerSocketChannelImpl ssc) {
        try {
            return new ServerSocketAdaptor(ssc);
        } catch (IOException x) {
            throw new Error(x);
        }
    }
    private ServerSocketAdaptor(ServerSocketChannelImpl ssc)
        throws IOException
    {
        this.ssc = ssc;
    }
    
    public void bind(SocketAddress local, int backlog) throws IOException {
        ...
        ssc.bind(local, backlog);
        ...
    }
    public Socket accept() throws IOException {
        synchronized (ssc.blockingLock()) {
            ...
            SocketChannel sc;
            if ((sc = ssc.accept()) != null)
            ...
        }
    }
    public void close() throws IOException {
        ssc.close();
    }

接收

咱們能夠經過channel.accept接收一個新的通道。

public SocketChannel accept() throws IOException {
    synchronized (lock) {
        //基本的校驗
        if (!isOpen())
            throw new ClosedChannelException();
        if (!isBound())
            throw new NotYetBoundException();
        SocketChannel sc = null;

        int n = 0;
        //建立文件描述符和接收到的客戶端socket進行綁定
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        try {
            //I/O開始前調用begin
            begin();
            if (!isOpen())
                return null;
            thread = NativeThread.current();
            for (;;) {
                //接收新鏈接,將給定的文件描述符與socket客戶端綁定,並設置套接字客戶端的地址。
                n = accept(this.fd, newfd, isaa);
                //返回 1 成功
                //若返回IOStaus.INTERRUPTED表示系統調用了中斷操做,繼續等待接收。
                if ((n == IOStatus.INTERRUPTED) && isOpen())
                    continue;
                break;
            }
        } finally {
            thread = 0;
            //I/O結束調用end
            end(n > 0);
            assert IOStatus.check(n);
        }
        //返回 1 成功
        if (n < 1)
            return null;
        //默認都是阻塞模式
        IOUtil.configureBlocking(newfd, true);
        InetSocketAddress isa = isaa[0];
        //初始化客戶端socketchannel
        sc = new SocketChannelImpl(provider(), newfd, isa);
        //檢查權限
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(),
                                isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    }
}

SecurityManager是安全管理器,用於權限校驗的類,若權限校驗不經過則會拋出AccessControlException異常。

SocketChannelImpl

SocketChannel生命週期

20191209174933.png

  • SocketChannel會有一個state標記當前的狀態,默認爲-1表示ST_UNINITIALIZED(未初始化)
  • 在構造函數最後會將state更新爲0(ST_UNCONNECTED,未鏈接)
  • 調用connect鏈接服務端,鏈接成功以前更新state爲1(ST_PENDING,待鏈接)
  • 鏈接成功時會更新state爲2(ST_CONNECTED,已鏈接)
  • 關閉通道時若I/O未完成時會將state更新爲3(ST_KILLPENDING,待釋放)
  • 當關閉通道後,且全部I/O已完成,會將state更新爲4(ST_KILLED,已釋放)

建立SocketChannel

SocketChannelServerSocketChannel相似,經過SocketChannel.open建立channel,和服務端相似也是經過provider調用SocketChannle的構造函數。

public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}

鏈接

咱們也能夠經過open(SocketAddress remote)傳入鏈接地址,在建立後直接。

public static SocketChannel open(SocketAddress remote)
    throws IOException
{
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    }
    ...
    return sc;
}

public boolean connect(SocketAddress sa) throws IOException {
    
    ...
    begin();
    if (localAddress == null) {
        NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
    }
    ...
    //創建鏈接
    n = Net.connect(fd, ia, isa.getPort());
    ...
    end((n > 0) || (n == IOStatus.UNAVAILABLE));
    ...
    //更新狀態
    state = ST_CONNECTED;
    // 非阻塞socket則更新狀態爲ST_PENDING
    if (!isBlocking())
        state = ST_PENDING;
    ...
}

NetHooks.beforeTcpConnectNetHooks.implBeforeTcpBind同樣,最終都是調用SdpProvider.convertTcpToSdpIfMatch

寫數據

將數據寫入channel時會調用IOUtil.write

public int write(ByteBuffer buf) throws IOException 
{
    ...
    begin();
    ...
    IOUtil.write(fd, buf, -1, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException
{
    //使用直接緩衝區,則直接寫入到緩衝區中
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // 不是直接緩衝區
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    //獲取一個臨時的直接緩衝區地址。
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        //寫入到臨時的直接緩衝區中
        bb.put(src);
        bb.flip();
        src.position(pos);
        //將直接緩衝區數據寫入
        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            //更新實際寫入量
            src.position(pos + n);
        }
        return n;
    } finally {
        //使用完緩衝釋放
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                             long position, NativeDispatcher nd)
    throws IOException
{
    ...
    if (position != -1) {
        //socket不支持,文件支持
        written = nd.pwrite(fd, ((DirectBuffer)bb).address() + pos, rem, position);
    } else {
        //調用native方法
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    ...
}
int write(FileDescriptor fd, long address, int len) throws IOException {
        return write0(fd, address, len, append);
}
static native int write0(FileDescriptor fd, long address, int len, boolean append)
    throws IOException;

當咱們使用堆緩衝區時,須要從線程本地緩衝區申請一塊臨時的直接緩衝區用於存放臨時數據,會多一次內存複製。

因爲操做系統內核從設計上通常不容許直接訪問用戶空間,所以若使用堆緩衝區,則須要在堆外申請一個(內核)空間,而後將堆緩衝區的數據複製到堆外(直接/內核)緩衝區中。

ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
private static ThreadLocal<BufferCache> bufferCache = new ThreadLocal<BufferCache>()
{
    @Override
    protected BufferCache initialValue() {
        return new BufferCache();
    }
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    //從線程緩衝區獲取一個緩衝區
    BufferCache cache = bufferCache.get();
    //獲取能容納的下的地址
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        //當沒有合適的緩衝區時,
        if (!cache.isEmpty()) {
            //刪除第一個未用的緩衝區
            buf = cache.removeFirst();
            //釋放
            free(buf);
        }
        //從新分配一個合適大小的緩衝區
        return ByteBuffer.allocateDirect(size);
    }
}

ThreadLocal是利用空間換時間的一種方案性能優化方案,在每一個線程會有一個ThreadLocalMap用於存放數據。這樣每一個線程就訪問本身的數據,從而避免了非線程安全的數據因爲併發問題須要經過加鎖的方式同步帶來的性能損耗。

彙集寫

當咱們一次性向Channel寫入多個Buffer時,會有最大Buffer數量限制,也就是在Channel靜態構造函數獲取的IOV_MAX的值。一次性寫入多個Buffer在Linux中稱之爲彙集寫,即將內存中分散在的若干緩衝區中的數據按順序寫至文件中。

調用Channel的write(ByteBuffer[] srcs, int offset, int length)一次性寫入多個Buffer。

public long write(ByteBuffer[] srcs, int offset, int length) throws IOException 
{
    ...
    begin();
    ...
    IOUtil.write(fd, srcs, offset, length, nd);
    ...
    end((n > 0) || (n == IOStatus.UNAVAILABLE));
    ...        
}
write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd)
{
    //獲取或建立一個指定長度的IOVecWrapper結構,它能夠存放多個Buffer數據。
    IOVecWrapper vec = IOVecWrapper.get(length);
    ...
    while (i < count && iov_len < IOV_MAX) {
        //遍歷每一塊待寫入緩衝區
        ByteBuffer buf = bufs[i];
        //計算buffer的可讀大小存放rem中
        ...
        //將buffer放入IOVecWrapper中
        if (rem > 0) {
            vec.setBuffer(iov_len, buf, pos, rem);  
            if (!(buf instanceof DirectBuffer)) {
                //若buf不是直接緩衝區則須要建立一個臨時的直接緩衝區
                ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
                ...
                vec.setShadow(iov_len, shadow);
                ...
                //更新待寫入緩衝區引用指向直接緩衝區。
                buf = shadow;
            }
            //設置當前緩衝區的起始地址
            vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
            //設置當前緩衝區的長度
            vec.putLen(iov_len, rem);
            iov_len++;
        }
        i++;
    }
    //調用writev一次性寫入多個緩衝區的數據。
    long bytesWritten = nd.writev(fd, vec.address, iov_len);
    ...
    long left = bytesWritten;
    //清理工做
    for (int j=0; j<iov_len; j++) {
        if (left > 0) {
            ByteBuffer buf = vec.getBuffer(j);
            int pos = vec.getPosition(j);
            int rem = vec.getRemaining(j);
            int n = (left > rem) ? rem : (int)left;
            buf.position(pos + n);
            left -= n;
        }
        ByteBuffer shadow = vec.getShadow(j);
        if (shadow != null)
        //將已寫入完成的緩衝區放回到臨時直接緩衝區中
            Util.offerLastTemporaryDirectBuffer(shadow);
        // 清楚IOVcMrapper的緩存
        vec.clearRefs(j);
    }
    return bytesWritten;
}

與彙集寫對應的還有readv()稱爲分散(散佈)讀,即將文件中若干連續的數據塊讀入內存分散的緩衝區中。
關於linux的彙集寫和散步讀具體內容能夠閱讀readv()和writev()函數分散讀取與彙集寫入

經過IOVecWrapper結構構建了一個字節緩衝區數組用於存放多個Buffer,其內部實際維護了一個Native數組結構。

class IOVecWrapper {
    ...
    private final AllocatedNativeObject vecArray;
    final long address;
    static int addressSize;
    private IOVecWrapper(int size) {
        ...
        //false表示無需頁面對齊
        this.vecArray  = new AllocatedNativeObject(size * SIZE_IOVEC, false);
        this.address   = vecArray.address();
    }
    ...
    static {
        //獲取本機指針的大小
        addressSize = Util.unsafe().addressSize();
        //保存每一個指針偏移量
        LEN_OFFSET = addressSize;
        //用於保存每一個AllocatedNativeObject對象的元素的大小
        //每一個NativeObject有兩個long屬性,所以須要×2
        SIZE_IOVEC = (short) (addressSize * 2);
    }
}

AllocatedNativeObject在堆外申請一片內存存放本機對象。

class AllocatedNativeObject extends NativeObject
{
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }    
}

class NativeObject {
    // native 分配地址,可能小於基地址,因爲頁面大小會對齊,因此實際的及地址時頁面對其後的地址。
    protected long allocationAddress;
    // native 基地址
    private final long address;

}

經過上面咱們知道當一次性批量寫入緩存時會調用native的writev彙集寫方法,調用以前會申請一塊地址用於存放可寫入的多塊緩衝區數據。如今咱們在回過頭看看IOVecWrapper具體是怎麼作的。

class IOVecWrapper {
    ...
    private final AllocatedNativeObject vecArray;
    private final ByteBuffer[] buf;
    private final int[] position;
    private final int[] remaining;
    private final ByteBuffer[] shadow;
    //每一個線程保存一份IOVecWrapper緩存
    private static final ThreadLocal<IOVecWrapper> cached =
        new ThreadLocal<IOVecWrapper>();
    //經過get獲取一塊適合大小的空間
    static IOVecWrapper get(int size) {
        IOVecWrapper wrapper = cached.get();
        if (wrapper != null && wrapper.size < size) {
            //若獲取到空間不夠大,則從新初始化一個空間。
            wrapper.vecArray.free();
            wrapper = null;
        }
        if (wrapper == null) {
            wrapper = new IOVecWrapper(size);
            //native資源,當對象釋放時使得操做系統能夠釋放內存,在將Buffer的時候關於直接緩衝區提到過相關的知識
            Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
            cached.set(wrapper);
        }
        return wrapper;
    }
    //將buffer保存起來
    void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
        this.buf[i] = buf;
        this.position[i] = pos;
        this.remaining[i] = rem;
    }
    ...
}

因爲IOVecWrapper內部使用的是直接緩衝區,所以將它存放於ThreadLocalMap中複用以提升性能。

讀數據

從channel讀取數據時會調用IOUtil.read

public int read(ByteBuffer buf) throws IOException {
    ...
    begin();
    ...
    n = IOUtil.read(fd, buf, -1, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException
{
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);//使用直接緩衝區

    //當不是使用直接內存時,則從線程本地緩衝獲取一塊臨時的直接緩衝區存放待讀取的數據
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        //讀取
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
        //將直接緩衝區的數據寫入到堆緩衝區中
            dst.put(bb);
        return n;
    } finally {
        //釋放臨時的直接緩衝區
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException
{
    ...
    if (position != -1) {
        //socket不支持,文件支持
        n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                        rem, position);
    } else {
        n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    ...
}

分散讀

前面說到,Channel也支持分散讀取。

調用Channel的read(ByteBuffer[] dsts, int offset, int length)讀取到多個Buffer中。

public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    ...
    begin();
    ...
    n = IOUtil.read(fd, dsts, offset, length, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
        throws IOException
{
    return read(fd, bufs, 0, bufs.length, nd);
}

static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
                    NativeDispatcher nd)
    throws IOException
{
    IOVecWrapper vec = IOVecWrapper.get(length);
    ...
    while (i < count && iov_len < IOV_MAX) {
        ByteBuffer buf = bufs[i];
        ...
        vec.setBuffer(iov_len, buf, pos, rem);

        if (!(buf instanceof DirectBuffer)) {
            ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
            vec.setShadow(iov_len, shadow);
        ...
    }
    ...
    long bytesRead = nd.readv(fd, vec.address, iov_len);
    //清理工做
    ...
    return bytesRead;
    //清理工做
    ...
}

分散讀和彙集寫邏輯相似,都須要經過IOVecWrapper進行一個「中轉」。

關閉通道

經過調用channel.close關閉通道,在接口實現裏咱們看過它實際是AbstractInterruptibleChannel聲明的。

public final void close() throws IOException {
    synchronized(this.closeLock) {
        if (this.open) {
            this.open = false;
            this.implCloseChannel();
        }
    }
}

AbstractSelectableChannel實現了implCloseChannel

protected final void implCloseChannel() throws IOException {
    //關閉當前channel
    implCloseSelectableChannel();
    synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
        for (int i = 0; i < count; i++) {
            SelectionKey k = keys[i];
            if (k != null)
            //註冊的channel都須要取消
                k.cancel();
        }
    }
}

當channel註冊到selector時會建立SelectionKey保存到keys中。

this.implCloseSelectableChannel();
protected void implCloseSelectableChannel() throws IOException {
    synchronized(this.stateLock) {
        this.isInputOpen = false;
        this.isOutputOpen = false;
        if (this.state != 4) {
            //windows不作處理
            //linux和Solaris須要,關閉前將fd複製到另外一個待關閉fd中,以防止被fd回收
            nd.preClose(this.fd);
        }

        if (this.readerThread != 0L) {
            //發送信號給讀線程,將其從阻塞I/O中釋放,避免一直被阻塞。
            NativeThread.signal(this.readerThread);
        }

        if (this.writerThread != 0L) {
            //發送信號給寫線程,將其從阻塞I/O中釋放,避免一直被阻塞。
            NativeThread.signal(this.writerThread);
        }
        //若還有註冊的channel,則不處理,等待key所有註銷後再kill
        //若沒有的話能夠直接kill當前channel
        if (!this.isRegistered()) {
            this.kill();
        }
    }
}

kill方法是在具體Channel中實現的,最終調用nd的close方法關閉文件描述符。

//SocketChannel
public void kill() throws IOException {
    synchronized(this.stateLock) {
        if (this.state != 4) {
            if (this.state == -1) {
                this.state = 4;
            } else {
                assert !this.isOpen() && !this.isRegistered();
                //若仍有線程還沒釋放,則等線程I/O執行完後再kill
                if (this.readerThread == 0L && this.writerThread == 0L) {
                    nd.close(this.fd);
                    this.state = 4;
                } else {
                    this.state = 3;
                }

            }
        }
    }
}
//ServerSocketChannel
public void kill() throws IOException {
        synchronized(this.stateLock) {
            if (this.state != 1) {
                if (this.state == -1) {
                    this.state = 1;
                } else {
                    assert !this.isOpen() && !this.isRegistered();

                    nd.close(this.fd);
                    this.state = 1;
                }
            }
        }
    }

ServerSocketChannel的僅有-1(未初始化)、0(使用中)和1(已釋放)三個狀態。

半鏈接

NIO支持tcp的半鏈接,因爲TCP是全雙工的,即有輸入流和輸出流。在某些時候咱們能夠中斷其中一個流,而另外一個流仍然能夠繼續工做。好比做爲客戶端咱們能夠關閉輸出流,可是仍然能繼續接收到服務端發送的數據。

//關閉輸入流
client.socket().shutdownInput();
//關閉輸出流
client.socket().shutdownOutput();

當客戶端關閉了輸出流,實際上會送FIN包到服務端,服務端接收到後響應ACK,若服務端不發送FIN包,關閉服務端的輸出流(客戶端的輸入流)時,則服務端仍然能繼續發送(響應)數據給客戶端,客戶端也仍然能夠繼續接收到數據。NIO的Pipe就是經過兩個socket的半鏈接實現的單項數據傳輸。

總結

本篇對客戶端和服務端的socket,綁定、鏈接、讀、寫以及關閉等經常使用操做進行源碼分析,下一篇將繼續對FileChannel源碼進行探究。

相關文獻

  1. 史上最強Java NIO入門:擔憂從入門到放棄的,請讀這篇!
  2. Java NIO系列教程
  3. 基於Java的RDMA高性能通訊庫(六):SDP - Java Socket Direct Protocol
  4. Java安全管理器SecurityManager
  5. 操做系統內核空間和用戶空間的互訪問
  6. BIO到NIO源碼的一些事兒之NIO中
  7. ThreadLocal
  8. 根據文件描述符fd獲取socket結構體
  9. struct socket結構體詳解
  10. readv()和writev()函數
  11. 分散讀取與彙集寫入

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:http://www.javashuo.com/article/p-tduwjpgh-ge.html 做者:傑哥很忙 本文使用「CC BY 4.0」創做共享協議。歡迎轉載,請在明顯位置給出出處及連接。

相關文章
相關標籤/搜索