上篇[【從入門到放棄-Java】併發編程-NIO使用]()簡單介紹了nio的基礎使用,本篇將深刻源碼分析nio中channel的實現。java
channel即通道,能夠用來讀、寫數據,它是全雙工的能夠同時用來讀寫操做。這也是它與stream流的最大區別。編程
channel須要與buffer配合使用,channel通道的一端是buffer,一端是數據源實體,如文件、socket等。在nio中,經過channel的不一樣實現來處理 不一樣實體與數據buffer中的數據傳輸。數組
channel接口:安全
package java.nio.channels; import java.io.IOException; import java.io.Closeable; /** * A nexus for I/O operations. * * <p> A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * * <p> A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * * <p> Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */ public interface Channel extends Closeable { /** * Tells whether or not this channel is open. * * @return <tt>true</tt> if, and only if, this channel is open */ public boolean isOpen(); /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */ public void close() throws IOException; }
常見的channel實現有:網絡
FileChannel是一個抽象類,它繼承了AbstractInterruptibleChannel類,並實現了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。
具體的實現類主要是sun.nio.ch.FileChannelImpl。下面詳細分析下FileChannelImpl中每一個方法的具體實現。併發
private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) { //主要記載操做系統維護的文件描述符 this.fd = var1; //是否可讀 this.readable = var3; //是否可寫 this.writable = var4; //是否以追加的方式打開 this.append = var5; this.parent = var6; this.path = var2; //底層使用native的read和write來處理文件的 this.nd = new FileDispatcherImpl(var5); } //FileInputStream::getChannel 調用 FileChannelImpl.open(fd, path, true, false, this) 獲取只讀channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) { return new FileChannelImpl(var0, var1, var2, var3, false, var4); } //FileOutputStream::getChannel 調用 FileChannelImpl.open(fd, path, false, true, append, this) 獲取只寫channel public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) { return new FileChannelImpl(var0, var1, var2, var3, var4, var5); } private FileChannelImpl(FileDescriptor fd, String path, boolean readable, boolean writable, boolean direct, Object parent) { this.fd = fd; //是否可讀 this.readable = readable; //是否可寫 this.writable = writable; //對於從流建立的channel,在結束時要作不一樣的清理動做,(openJDK中才有,sun的jdk中沒有) this.parent = parent; //源文件的path this.path = path; //是否使用DirectIO this.direct = direct; this.nd = new FileDispatcherImpl(); if (direct) { assert path != null; this.alignment = nd.setDirectIO(fd, path); } else { this.alignment = -1; } //當parent不存在時,則註冊一個cleaner,不然交由parent作清理動做。 // Register a cleaning action if and only if there is no parent // as the parent will take care of closing the file descriptor. // FileChannel is used by the LambdaMetaFactory so a lambda cannot // be used here hence we use a nested class instead. this.closer = parent != null ? null : CleanerFactory.cleaner().register(this, new Closer(fd)); } // Used by FileInputStream.getChannel(), FileOutputStream.getChannel // and RandomAccessFile.getChannel() public static FileChannel open(FileDescriptor fd, String path, boolean readable, boolean writable, boolean direct, Object parent) { return new FileChannelImpl(fd, path, readable, writable, direct, parent); }
//實現自SeekableByteChannel接口的方法,將文件中的內容讀取到給定的byteBuffer中 public int read(ByteBuffer dst) throws IOException { //保證讀寫時,channel處於開啓狀態 ensureOpen(); //判斷是否可讀 if (!readable) throw new NonReadableChannelException(); synchronized (positionLock) { if (direct) Util.checkChannelPositionAligned(position(), alignment); int n = 0; int ti = -1; try { //開始阻塞,並註冊爲Interruptible,能夠被中斷 beginBlocking(); //將當前線程添加到NativeThreadSet中,並返回索引,方便後續操做。 //NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號 ti = threads.add(); if (!isOpen()) return 0; do { //當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中 n = IOUtil.read(fd, dst, -1, direct, alignment, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { //把當前線程從set中移出 threads.remove(ti); //結束,釋放鎖 endBlocking(n > 0); assert IOStatus.check(n); } } } //實現自ScatteringByteChannel接口的方法,將文件中的內容依次讀取到給定的byteBuffer數組中。 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) throw new IndexOutOfBoundsException(); //保證讀寫時,channel處於開啓狀態 ensureOpen(); //判斷是否可讀 if (!readable) throw new NonReadableChannelException(); synchronized (positionLock) { if (direct) Util.checkChannelPositionAligned(position(), alignment); long n = 0; int ti = -1; try { //開始阻塞,並註冊爲Interruptible,能夠被中斷 beginBlocking(); //將當前線程添加到NativeThreadSet中,並返回索引,方便後續操做。 //NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號 ti = threads.add(); if (!isOpen()) return 0; do { //當未被系統中斷(即讀取完畢)或channel未被關閉,則一直讀,將內容寫入到byteBuffer(dst)中 n = IOUtil.read(fd, dsts, offset, length, direct, alignment, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { //把當前線程從set中移出 threads.remove(ti); //結束,釋放鎖 endBlocking(n > 0); assert IOStatus.check(n); } } }
//實現自SeekableByteChannel接口的方法,將byteBuffer中的內容寫入到文件中 public int write(ByteBuffer src) throws IOException { //保證寫時,channel處於開啓狀態 ensureOpen(); //判斷是否可寫 if (!writable) throw new NonWritableChannelException(); synchronized (positionLock) { if (direct) Util.checkChannelPositionAligned(position(), alignment); int n = 0; int ti = -1; try { //開始阻塞,並註冊爲Interruptible,能夠被中斷 beginBlocking(); //將當前線程添加到NativeThreadSet中,並返回索引,方便後續操做。 //NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號 ti = threads.add(); if (!isOpen()) return 0; do { //當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到文件中 n = IOUtil.write(fd, src, -1, direct, alignment, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { //把當前線程從set中移出 threads.remove(ti); //結束,釋放鎖 assert IOStatus.check(n); } } } //實現自GatheringByteChannel接口的方法,將byteBuffer數組中的內容依次寫入到文件中 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) throw new IndexOutOfBoundsException(); //保證寫時,channel處於開啓狀態 ensureOpen(); //判斷是否可寫 if (!writable) throw new NonWritableChannelException(); synchronized (positionLock) { if (direct) Util.checkChannelPositionAligned(position(), alignment); long n = 0; int ti = -1; try { //開始阻塞,並註冊爲Interruptible,能夠被中斷 beginBlocking(); //將當前線程添加到NativeThreadSet中,並返回索引,方便後續操做。 //NativeThreadSet是一個線程安全的本地線程集合,方便管理,用來發送信號 ti = threads.add(); if (!isOpen()) return 0; do { //當未被系統中斷(即寫入完畢)或channel未被關閉,則一直寫,將內容寫入到文件中 n = IOUtil.write(fd, srcs, offset, length, direct, alignment, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { //把當前線程從set中移出 threads.remove(ti); //結束,釋放鎖 assert IOStatus.check(n); } } }
//實現自SeekableByteChannel接口的方法,獲取當前channel的position public long position() throws IOException { ensureOpen(); synchronized (positionLock) { long p = -1; int ti = -1; try { beginBlocking(); ti = threads.add(); if (!isOpen()) return 0; boolean append = fdAccess.getAppend(fd); do { //append模式下,position在channel的末尾 // in append-mode then position is advanced to end before writing p = (append) ? nd.size(fd) : nd.seek(fd, -1); } while ((p == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(p); } finally { threads.remove(ti); endBlocking(p > -1); assert IOStatus.check(p); } } } //實現自SeekableByteChannel接口的方法,設置當前channel的position爲newPosition public FileChannel position(long newPosition) throws IOException { ensureOpen(); if (newPosition < 0) throw new IllegalArgumentException(); synchronized (positionLock) { long p = -1; int ti = -1; try { beginBlocking(); ti = threads.add(); if (!isOpen()) return null; do { //設置當前position爲newPosition p = nd.seek(fd, newPosition); } while ((p == IOStatus.INTERRUPTED) && isOpen()); return this; } finally { threads.remove(ti); endBlocking(p > -1); assert IOStatus.check(p); } } }
實現自SeekableByteChannel接口的方法,返回當前實體(文件)的大小app
實現自SeekableByteChannel接口的方法,用來截取文件至newSize大小dom
實現自SeekableByteChannel接口的方法,用來將channel中還沒有寫入磁盤的數據強制落盤socket
將fileChannel中的數據傳遞至另外一個channeltcp
從其它channel讀取數據至fileChannel
/** * Opens a socket channel. * * <p> The new channel is created by invoking the {@link * java.nio.channels.spi.SelectorProvider#openSocketChannel * openSocketChannel} method of the system-wide default {@link * java.nio.channels.spi.SelectorProvider} object. </p> * * @return A new socket channel * * @throws IOException * If an I/O error occurs */ public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
open方法是調用SelectorProvider中實現了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法,底層實際是new SocketChannelImpl,調用native方法建立socket
public boolean connect(SocketAddress sa) throws IOException { //校驗Address是否合法 InetSocketAddress isa = Net.checkAddress(sa); //獲取系統安全管理器 SecurityManager sm = System.getSecurityManager(); if (sm != null) //校驗IP和端口是否被容許鏈接 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); InetAddress ia = isa.getAddress(); //若是是本機地址,則獲取本機的host if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); try { //加讀鎖 readLock.lock(); try { //加寫鎖 writeLock.lock(); try { int n = 0; //是否阻塞 boolean blocking = isBlocking(); try { //開啓connect前的校驗並設置爲ST_CONNECTIONPENDING,若是blocking是true 即阻塞模式,則記錄當前線程的ID,以便接收信號處理。 beginConnect(blocking, isa); do { //調用native connect方法 n = Net.connect(fd, ia, isa.getPort()); } while (n == IOStatus.INTERRUPTED && isOpen()); } finally { //結束鏈接 endConnect(blocking, (n > 0)); } assert IOStatus.check(n); return n > 0; } finally { //釋放寫鎖 writeLock.unlock(); } } finally { //釋放讀鎖 readLock.unlock(); } } catch (IOException ioe) { // connect failed, close the channel close(); throw SocketExceptions.of(ioe, isa); } }
實現自SelectableChannel的接口方法,調用native方法設置socket的阻塞狀態
在AbstractSelectableChannel中定義,註冊要監聽的事件。
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (!isOpen()) throw new ClosedChannelException(); synchronized (regLock) { if (isBlocking()) throw new IllegalBlockingModeException(); synchronized (keyLock) { // re-check if channel has been closed if (!isOpen()) throw new ClosedChannelException(); SelectionKey k = findKey(sel); if (k != null) { k.attach(att); k.interestOps(ops); } else { // 向Selector中註冊事件 // New registration k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } return k; } } }
//實現自ReadableByteChannel接口的方法,從socket中讀取數據至ByteBuffer @Override public int read(ByteBuffer buf) throws IOException { Objects.requireNonNull(buf); readLock.lock(); try { boolean blocking = isBlocking(); int n = 0; try { //檢查channel是否開啓並已是connected的狀態。若是blocking是true 即阻塞模式,則記錄當前線程的ID,以便接收信號處理。 beginRead(blocking); // check if input is shutdown if (isInputClosed) return IOStatus.EOF; //若是是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不須要等待。 if (blocking) { do { n = IOUtil.read(fd, buf, -1, nd); } while (n == IOStatus.INTERRUPTED && isOpen()); } else { n = IOUtil.read(fd, buf, -1, nd); } } finally { endRead(blocking, n > 0); if (n <= 0 && isInputClosed) return IOStatus.EOF; } return IOStatus.normalize(n); } finally { readLock.unlock(); } } //實現自ScatteringByteChannel接口的方法,從socket中依次讀取數據至ByteBuffer數組 @Override public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { Objects.checkFromIndexSize(offset, length, dsts.length); readLock.lock(); try { boolean blocking = isBlocking(); long n = 0; try { beginRead(blocking); // check if input is shutdown if (isInputClosed) return IOStatus.EOF; //若是是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不須要等待。 if (blocking) { do { n = IOUtil.read(fd, dsts, offset, length, nd); } while (n == IOStatus.INTERRUPTED && isOpen()); } else { n = IOUtil.read(fd, dsts, offset, length, nd); } } finally { endRead(blocking, n > 0); if (n <= 0 && isInputClosed) return IOStatus.EOF; } return IOStatus.normalize(n); } finally { readLock.unlock(); } }
//實現自ReadableByteChannel接口的方法,將ByteBuffer中的數據寫入socket @Override public int write(ByteBuffer buf) throws IOException { Objects.requireNonNull(buf); writeLock.lock(); try { boolean blocking = isBlocking(); int n = 0; try { beginWrite(blocking); //若是是阻塞模式,則一直讀取直到數據讀取完畢;非阻塞模式則直接調用native方法不須要等待。 if (blocking) { do { n = IOUtil.write(fd, buf, -1, nd); } while (n == IOStatus.INTERRUPTED && isOpen()); } else { n = IOUtil.write(fd, buf, -1, nd); } } finally { endWrite(blocking, n > 0); if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } return IOStatus.normalize(n); } finally { writeLock.unlock(); } } @Override public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { Objects.checkFromIndexSize(offset, length, srcs.length); writeLock.lock(); try { boolean blocking = isBlocking(); long n = 0; try { beginWrite(blocking); //若是是阻塞模式,則一直等待直到數據寫入完畢;非阻塞模式則直接調用native方法不須要等待。 if (blocking) { do { n = IOUtil.write(fd, srcs, offset, length, nd); } while (n == IOStatus.INTERRUPTED && isOpen()); } else { n = IOUtil.write(fd, srcs, offset, length, nd); } } finally { endWrite(blocking, n > 0); if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } return IOStatus.normalize(n); } finally { writeLock.unlock(); } } //實現自ReadableByteChannel接口的方法,將ByteBuffer數組中的數據依次寫入socket /** * Writes a byte of out of band data. */ int sendOutOfBandData(byte b) throws IOException { writeLock.lock(); try { boolean blocking = isBlocking(); int n = 0; try { beginWrite(blocking); //若是是阻塞模式,則一直等待直到數據寫入完畢;非阻塞模式則直接調用native方法不須要等待。 if (blocking) { do { n = sendOutOfBandData(fd, b); } while (n == IOStatus.INTERRUPTED && isOpen()); } else { n = sendOutOfBandData(fd, b); } } finally { endWrite(blocking, n > 0); if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } return IOStatus.normalize(n); } finally { writeLock.unlock(); } }
@Override public ServerSocket socket() { synchronized (stateLock) { if (socket == null) socket = ServerSocketAdaptor.create(this); return socket; } }
@Override public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { synchronized (stateLock) { ensureOpen(); if (localAddress != null) throw new AlreadyBoundException(); InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : Net.checkAddress(local); SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkListen(isa.getPort()); //綁定前作一些前置處理,如將tcp socket文件描述符轉換成SDP NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); //綁定IP和地址 Net.bind(fd, isa.getAddress(), isa.getPort()); //開始監聽,設置socket上最多能夠掛起backlog個鏈接,若backlog小於1 則默認設置50個 Net.listen(fd, backlog < 1 ? 50 : backlog); localAddress = Net.localAddress(fd); } return this; }
@Override public SocketChannel accept() throws IOException { acceptLock.lock(); try { int n = 0; FileDescriptor newfd = new FileDescriptor(); InetSocketAddress[] isaa = new InetSocketAddress[1]; boolean blocking = isBlocking(); try { begin(blocking); do { //阻塞等待接收客戶端連接 n = accept(this.fd, newfd, isaa); } while (n == IOStatus.INTERRUPTED && isOpen()); } finally { end(blocking, n > 0); assert IOStatus.check(n); } if (n < 1) return null; //新接收的socket初始設置爲阻塞模式(所以非阻塞模式的每次須要顯示設置) // newly accepted socket is initially in blocking mode IOUtil.configureBlocking(newfd, true); InetSocketAddress isa = isaa[0]; //用新接收的socket建立SocketChannel SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa); // check permitted to accept connections from the remote address SecurityManager sm = System.getSecurityManager(); if (sm != null) { try { sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); } catch (SecurityException x) { sc.close(); throw x; } } return sc; } finally { acceptLock.unlock(); } }
ServerSocketChannel並無read和write方法,只是繼承了AbstractSelectableChannel,以便在selector中使用
public DatagramChannelImpl(SelectorProvider sp) throws IOException { super(sp); ResourceManager.beforeUdpCreate(); try { //若是不支持IPv6則使用IPv4 this.family = Net.isIPv6Available() ? StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; //設置非流式的socket(tcp是流模式協議,udp是數據報模式協議) this.fd = Net.socket(family, false); this.fdVal = IOUtil.fdVal(fd); } catch (IOException ioe) { ResourceManager.afterUdpClose(); throw ioe; } }
public SocketAddress receive(ByteBuffer dst) throws IOException { if (dst.isReadOnly()) throw new IllegalArgumentException("Read-only buffer"); readLock.lock(); try { boolean blocking = isBlocking(); int n = 0; ByteBuffer bb = null; try { SocketAddress remote = beginRead(blocking, false); boolean connected = (remote != null); SecurityManager sm = System.getSecurityManager(); if (connected || (sm == null)) { // connected or no security manager do { n = receive(fd, dst, connected); } while ((n == IOStatus.INTERRUPTED) && isOpen()); if (n == IOStatus.UNAVAILABLE) return null; } else { // Cannot receive into user's buffer when running with a // security manager and not connected bb = Util.getTemporaryDirectBuffer(dst.remaining()); for (;;) { do { n = receive(fd, bb, connected); } while ((n == IOStatus.INTERRUPTED) && isOpen()); if (n == IOStatus.UNAVAILABLE) return null; InetSocketAddress isa = (InetSocketAddress)sender; try { sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); } catch (SecurityException se) { // Ignore packet bb.clear(); n = 0; continue; } bb.flip(); dst.put(bb); break; } } //sender:發送方地址, Set by receive0 (## ugh) assert sender != null; return sender; } finally { if (bb != null) Util.releaseTemporaryDirectBuffer(bb); endRead(blocking, n > 0); assert IOStatus.check(n); } } finally { readLock.unlock(); } }
public int send(ByteBuffer src, SocketAddress target) throws IOException { Objects.requireNonNull(src); InetSocketAddress isa = Net.checkAddress(target, family); writeLock.lock(); try { boolean blocking = isBlocking(); int n = 0; try { //當connect後,remote會設置爲鏈接的地址 SocketAddress remote = beginWrite(blocking, false); if (remote != null) { // connected if (!target.equals(remote)) { throw new AlreadyConnectedException(); } do { n = IOUtil.write(fd, src, -1, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); } else { // not connected SecurityManager sm = System.getSecurityManager(); if (sm != null) { InetAddress ia = isa.getAddress(); if (ia.isMulticastAddress()) { sm.checkMulticast(ia); } else { sm.checkConnect(ia.getHostAddress(), isa.getPort()); } } do { n = send(fd, src, isa); } while ((n == IOStatus.INTERRUPTED) && isOpen()); } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); } return IOStatus.normalize(n); } finally { writeLock.unlock(); } }
@Override public DatagramChannel connect(SocketAddress sa) throws IOException { InetSocketAddress isa = Net.checkAddress(sa, family); SecurityManager sm = System.getSecurityManager(); if (sm != null) { InetAddress ia = isa.getAddress(); if (ia.isMulticastAddress()) { sm.checkMulticast(ia); } else { sm.checkConnect(ia.getHostAddress(), isa.getPort()); sm.checkAccept(ia.getHostAddress(), isa.getPort()); } } readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { ensureOpen(); if (state == ST_CONNECTED) throw new AlreadyConnectedException(); int n = Net.connect(family, fd, isa.getAddress(), isa.getPort()); if (n <= 0) throw new Error(); // Can't happen // connected remoteAddress = isa; state = ST_CONNECTED; // refresh local address localAddress = Net.localAddress(fd); // flush any packets already received. boolean blocking = isBlocking(); if (blocking) { IOUtil.configureBlocking(fd, false); } try { ByteBuffer buf = ByteBuffer.allocate(100); while (receive(buf) != null) { buf.clear(); } } finally { if (blocking) { IOUtil.configureBlocking(fd, true); } } } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } return this; }
udp是數據報模式的協議,是沒有connect的。這裏的connect其實是在底層忽略了與其餘地址的數據傳輸。
在connect後,就能夠像socketChannel似得使用read和write了
本文學習了各類channel的實現,主要是對底層native方法的一些封裝,針對不一樣屬性的實體(文件、socket),使用對應的channel與byteBuffer傳輸數據。再經過byteBuffer與byte數據進行轉換。
channel的實現中,封裝了大量的native方法,重要的底層實現全在native中,後續能夠深刻學習下。
本文中出現的byteBuffer和selector將在接下來的文章中,單獨分析。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。