原本是想學習Netty的,可是Netty是一個NIO框架,所以在學習netty以前,仍是先梳理一下NIO的知識。經過剖析源碼理解NIO的設計原理。html
本系列文章針對的是JDK1.8.0.161的源碼。java
上一篇介紹了Channel的接口,本篇對SocektChannel的源碼進行解析。linux
咱們經過ServerSocketChannel.open()
建立一個ServerSocketChannel
,它實際經過provider
建立。c++
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel {
protected ServerSocketChannel(SelectorProvider provider) {
super(provider);
}
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
...
}
複製代碼
在首次建立時,建立初始化SelectorProvider
對象。git
public static SelectorProvider provider() {
synchronized(lock) {
return provider != null ? provider : (SelectorProvider)AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (SelectorProvider.loadProviderFromProperty()) {
return SelectorProvider.provider;
} else if (SelectorProvider.loadProviderAsService()) {
return SelectorProvider.provider;
} else {
SelectorProvider.provider = DefaultSelectorProvider.create();
return SelectorProvider.provider;
}
}
});
}
}
複製代碼
具體
SelcetProvider
實如今《NIO-Selector》一節講解。github
privoder
建立完成後經過openServerSocketChannel
建立ServiceSocketChannel
。編程
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
複製代碼
在ServerSocketChannelImpl
靜態構造函數中會進行一些初始化操做。windows
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
private static NativeDispatcher nd;
static {
IOUtil.load();
initIDs();
nd = new SocketDispatcher();
}
private static native void initIDs();
...
}
複製代碼
經過靜態構造函數在首次建立時經過IOUtil.load()
初始化一些必要的參數。IOUtil的靜態構造方法來加載net和nio的類庫。數組
public static void load() { }
static {
java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Void>() {
public Void run() {
//加載net和nio的庫
//net主要是應用層的一些協議實現,如FTP,Http
System.loadLibrary("net");
System.loadLibrary("nio");
return null;
}
});
initIDs();
IOV_MAX = iovMax();
}
複製代碼
經過System.loadLibrary
加載後就能夠調用該類的native方法。緩存
initIDs
是一個native方法,這裏對windows下的native源碼進行說明。在native\sun\nio\ch\ServerSocketChannelImpl.c
能夠看到ServerSocketChannelImpl的initIDs的代碼
Java_sun_nio_ch_ServerSocketChannelImpl_initIDs(JNIEnv *env, jclass cls)
{
//查找類型FileDescriptor
cls = (*env)->FindClass(env, "java/io/FileDescriptor");
//獲取class的字段fd,I是int類型的意思,將字段id保存到fd_fdID,之後要獲取該字段就能夠直接用fd_fdID去獲取
fd_fdID = (*env)->GetFieldID(env, cls, "fd", "I");
//查找InetSocketAddress類型
cls = (*env)->FindClass(env, "java/net/InetSocketAddress");
//建立一個引用指向cls,後面能夠經過cls操做這個java對象
isa_class = (*env)->NewGlobalRef(env, cls);
//獲取構造函數的方法id,(Ljava/net/InetAddress;I)V 是反彙編後的構造函數簽名
isa_ctorID = (*env)->GetMethodID(env, cls, "<init>",
"(Ljava/net/InetAddress;I)V");
}
複製代碼
將fd_fdID
、isa_ctorID
和isa_class
保存起來後面都用獲得。
FindClass
:是查找指定的類型GetFieldID
:獲取類型字段idNewGlobalRef
:建立一個引用GetMethodID
:獲取方法id關於JNI的字段和方法解釋能夠看JNI GetFieldID和GetMethodID函數解釋及方法簽名
咱們能夠經過javap
查看類的方法簽名,經過-p
顯示全部類和成員,-l
顯示行號和本地變量表,-c
對代碼進行反彙編。
C:\Program Files\Java\jdk1.8.0_161\bin>javap 用法: javap <options> <classes> 其中, 可能的選項包括: -help --help -? 輸出此用法消息 -version 版本信息 -v -verbose 輸出附加信息 -l 輸出行號和本地變量表 -public 僅顯示公共類和成員 -protected 顯示受保護的/公共類和成員 -package 顯示程序包/受保護的/公共類 和成員 (默認) -p -private 顯示全部類和成員 -c 對代碼進行反彙編 -s 輸出內部類型簽名 -sysinfo 顯示正在處理的類的 系統信息 (路徑, 大小, 日期, MD5 散列) -constants 顯示最終常量 -classpath <path> 指定查找用戶類文件的位置 -cp <path> 指定查找用戶類文件的位置 -bootclasspath <path> 覆蓋引導類文件的位置 複製代碼
在命令行輸入javap -p -c -l java.net.InetSocketAddress
查看構造函數的方法簽名爲Method "<init>":(Ljava/net/InetAddress;I)V
C:\Program Files\Java\jdk1.8.0_161\bin>javap -p -c -l java.net.InetSocketAddress Compiled from "InetSocketAddress.java" public class java.net.InetSocketAddress extends java.net.SocketAddress { private final transient java.net.InetSocketAddress$InetSocketAddressHolder holder; private static final long serialVersionUID; ... public java.net.InetSocketAddress(int); Code: 0: aload_0 1: invokestatic #215 // Method java/net/InetAddress.anyLocalAddress:()Ljava/net/InetAddress; 4: iload_1 5: invokespecial #219 // Method "<init>":(Ljava/net/InetAddress;I)V 8: return LineNumberTable: line 166: 0 line 167: 8 ... 複製代碼
IOV_MAX用於獲取最大的可一次性寫入的緩存個數,當咱們經過IOUtils.write一次性向Channel寫入多個Buffer時,會有Buffer最大數量限制的。
在IOUtil.load
初始化完成,則會建立SocketDispatcher
,它提供了Socket的native方法,不一樣平臺對於SocketDispatcher實現不同,最終都是調用FileDispatcherImpl
執行相關的文件操做。
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
private final FileDescriptor fd;
private int fdVal;
private static final int ST_INUSE = 0;
private int state = -1;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//首先經過Net.serverSocket(true)建立Socket並建立一個文件描述符與其關聯。
this.fd = Net.serverSocket(true);
//在註冊selector的時候須要獲取到文件描述符的值。
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException {
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
//已綁定則直接獲取地址
if (bound)
localAddress = Net.localAddress(fd);//獲取傳入的文件描述符的socket地址
}
...
}
複製代碼
文件描述符簡稱fd,它是一個抽象概念,在C庫編程中能夠叫作文件流或文件流指針,在其它語言中也能夠叫作文件句柄(handler),並且這些不一樣名詞的隱含意義多是不徹底相同的。不過在系統層,咱們統一把它叫作文件描述符。
咱們經過channel.bind
能夠將socket綁定到一個端口上。
public final ServerSocketChannel bind(SocketAddress local) throws IOException {
return bind(local, 0);
}
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
...
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
//檢查是否端口已被監聽
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
//默認tcp待鏈接隊列長度最小爲50
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
//從文件描述符中獲取地址信息
localAddress = Net.localAddress(fd);
}
}
return this;
}
複製代碼
首先會作一下基本校驗,包括檢查端口是否被佔用。最終綁定並監聽端口。
文件描述符能夠關聯到一個文件設備,最終能夠關聯到socket結構體,經過socket結構體能夠提取出地址信息。如何獲取地址信息能夠看下根據文件描述符fd獲取socket結構體,socket結構體能夠看下struct socket結構體詳解 關於backlog,因爲TCP鏈接時會有3次握手,當服務端接收到
SYN
包時,會將該請求socket加入到待鏈接隊列中,而後返回SYN+ACK
繼續進行鏈接握手。若鏈接併發量很高,而服務端來不及accept致使待鏈接隊列滿了,則後續的鏈接請求將會被拒絕。
在創建在綁定地址以前,咱們須要調用NetHooks.beforeTcpBind
,這個方法是將fd轉換爲SDP(Sockets Direct Protocol,Java套接字直接協議) socket。
SDP須要網卡支持InfiniBand高速網絡通訊技術,windows不支持該協議。
public final class NetHooks {
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
}
}
複製代碼
public final class NetHooks {
public static abstract class Provider {
protected Provider() {}
public abstract void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
public abstract void implBeforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
}
private static final Provider provider = new sun.net.sdp.SdpProvider();
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
provider.implBeforeTcpBind(fdObj, address, port);
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
provider.implBeforeTcpConnect(fdObj, address, port);
}
}
複製代碼
實際調用了SdpProvider的方法
provider.implBeforeTcpBind(fdObj, address, port);
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
if (enabled)
convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
boolean matched = false;
//匹配規則,通常有PortRangeRule校驗器校驗端口範圍是否符合規則
for (Rule rule: rules) {
if (rule.match(action, address, port)) {
//將fd轉換爲socket
SdpSupport.convertSocket(fdObj);
matched = true;
break;
}
}
...
}
public static void convertSocket(FileDescriptor fd) throws IOException {
...
//獲取fd索引
int fdVal = fdAccess.get(fd);
convert0(fdVal);
}
//調用native方法轉換
private static native int create0() throws IOException;
//rules 是在初始化SdpProvider加載的
public SdpProvider() {
String file = AccessController.doPrivileged(
new GetPropertyAction("com.sun.sdp.conf"));
...
list = loadRulesFromFile(file);
...
this.rules = list;
this.log = out;
}
複製代碼
咱們除了使用chennel.bind
綁定地址之外,還能夠經過channel.socket().bind
綁定。channel.socket()
會建立一個內部的ServerSocket,ServerSocketAdaptor
把實現了SocketServer的配置、綁定Socket Server的功能抽出提取到了ServerSocket
中。
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl {
...
ServerSocket socket;
public ServerSocket socket() {
synchronized(this.stateLock) {
if (this.socket == null) {
this.socket = ServerSocketAdaptor.create(this);
}
return this.socket;
}
}
...
}
複製代碼
ServerSocketAdaptor
實現了SocketServer的綁定,監聽等功能,但實際仍是調用ServerSocketChannelImpl
的方法。
public class ServerSocketAdaptor extends ServerSocket {
private final ServerSocketChannelImpl ssc;
public static ServerSocket create(ServerSocketChannelImpl ssc) {
try {
return new ServerSocketAdaptor(ssc);
} catch (IOException x) {
throw new Error(x);
}
}
private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException {
this.ssc = ssc;
}
public void bind(SocketAddress local, int backlog) throws IOException {
...
ssc.bind(local, backlog);
...
}
public Socket accept() throws IOException {
synchronized (ssc.blockingLock()) {
...
SocketChannel sc;
if ((sc = ssc.accept()) != null)
...
}
}
public void close() throws IOException {
ssc.close();
}
複製代碼
咱們能夠經過channel.accept
接收一個新的通道。
public SocketChannel accept() throws IOException {
synchronized (lock) {
//基本的校驗
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
//建立文件描述符和接收到的客戶端socket進行綁定
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
//I/O開始前調用begin
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
//接收新鏈接,將給定的文件描述符與socket客戶端綁定,並設置套接字客戶端的地址。
n = accept(this.fd, newfd, isaa);
//返回 1 成功
//若返回IOStaus.INTERRUPTED表示系統調用了中斷操做,繼續等待接收。
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
//I/O結束調用end
end(n > 0);
assert IOStatus.check(n);
}
//返回 1 成功
if (n < 1)
return null;
//默認都是阻塞模式
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//初始化客戶端socketchannel
sc = new SocketChannelImpl(provider(), newfd, isa);
//檢查權限
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
複製代碼
SecurityManager是安全管理器,用於權限校驗的類,若權限校驗不經過則會拋出
AccessControlException
異常。
SocketChannel
和ServerSocketChannel
相似,經過SocketChannel.open
建立channel,和服務端相似也是經過provider調用SocketChannle的構造函數。
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
複製代碼
咱們也能夠經過open(SocketAddress remote)
傳入鏈接地址,在建立後直接。
public static SocketChannel open(SocketAddress remote) throws IOException {
SocketChannel sc = open();
try {
sc.connect(remote);
}
...
return sc;
}
public boolean connect(SocketAddress sa) throws IOException {
...
begin();
if (localAddress == null) {
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
}
...
//創建鏈接
n = Net.connect(fd, ia, isa.getPort());
...
end((n > 0) || (n == IOStatus.UNAVAILABLE));
...
//更新狀態
state = ST_CONNECTED;
// 非阻塞socket則更新狀態爲ST_PENDING
if (!isBlocking())
state = ST_PENDING;
...
}
複製代碼
NetHooks.beforeTcpConnect
和NetHooks.implBeforeTcpBind
同樣,最終都是調用SdpProvider.convertTcpToSdpIfMatch
將數據寫入channel時會調用IOUtil.write
public int write(ByteBuffer buf) throws IOException {
...
begin();
...
IOUtil.write(fd, buf, -1, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException {
//使用直接緩衝區,則直接寫入到緩衝區中
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// 不是直接緩衝區
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
//獲取一個臨時的直接緩衝區地址。
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
//寫入到臨時的直接緩衝區中
bb.put(src);
bb.flip();
src.position(pos);
//將直接緩衝區數據寫入
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
//更新實際寫入量
src.position(pos + n);
}
return n;
} finally {
//使用完緩衝釋放
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException {
...
if (position != -1) {
//socket不支持,文件支持
written = nd.pwrite(fd, ((DirectBuffer)bb).address() + pos, rem, position);
} else {
//調用native方法
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
...
}
int write(FileDescriptor fd, long address, int len) throws IOException {
return write0(fd, address, len, append);
}
static native int write0(FileDescriptor fd, long address, int len, boolean append) throws IOException;
複製代碼
當咱們使用堆緩衝區時,須要從線程本地緩衝區申請一塊臨時的直接緩衝區用於存放臨時數據,會多一次內存複製。
關於爲何須要使用一塊臨時緩衝區作中間處理能夠看下《Java NIO爲何須要DirectByteBuffer做爲中間緩衝區》
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
private static ThreadLocal<BufferCache> bufferCache = new ThreadLocal<BufferCache>()
{
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
//從線程緩衝區獲取一個緩衝區
BufferCache cache = bufferCache.get();
//獲取能容納的下的地址
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
//當沒有合適的緩衝區時,
if (!cache.isEmpty()) {
//刪除第一個未用的緩衝區
buf = cache.removeFirst();
//釋放
free(buf);
}
//從新分配一個合適大小的緩衝區
return ByteBuffer.allocateDirect(size);
}
}
複製代碼
ThreadLocal是利用空間換時間的一種方案性能優化方案,在每一個線程會有一個ThreadLocalMap用於存放數據。這樣每一個線程就訪問本身的數據,從而避免了非線程安全的數據因爲併發問題須要經過加鎖的方式同步帶來的性能損耗。
當咱們一次性向Channel寫入多個Buffer時,會有最大Buffer數量限制,也就是在Channel靜態構造函數獲取的IOV_MAX
的值。一次性寫入多個Buffer在Linux中稱之爲彙集寫,即將內存中分散在的若干緩衝區中的數據按順序寫至文件中。
調用Channel的write(ByteBuffer[] srcs, int offset, int length)
一次性寫入多個Buffer。
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
...
begin();
...
IOUtil.write(fd, srcs, offset, length, nd);
...
end((n > 0) || (n == IOStatus.UNAVAILABLE));
...
}
write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd)
{
//獲取或建立一個指定長度的IOVecWrapper結構,它能夠存放多個Buffer數據。
IOVecWrapper vec = IOVecWrapper.get(length);
...
while (i < count && iov_len < IOV_MAX) {
//遍歷每一塊待寫入緩衝區
ByteBuffer buf = bufs[i];
//計算buffer的可讀大小存放rem中
...
//將buffer放入IOVecWrapper中
if (rem > 0) {
vec.setBuffer(iov_len, buf, pos, rem);
if (!(buf instanceof DirectBuffer)) {
//若buf不是直接緩衝區則須要建立一個臨時的直接緩衝區
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
...
vec.setShadow(iov_len, shadow);
...
//更新待寫入緩衝區引用指向直接緩衝區。
buf = shadow;
}
//設置當前緩衝區的起始地址
vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
//設置當前緩衝區的長度
vec.putLen(iov_len, rem);
iov_len++;
}
i++;
}
//調用writev一次性寫入多個緩衝區的數據。
long bytesWritten = nd.writev(fd, vec.address, iov_len);
...
long left = bytesWritten;
//清理工做
for (int j=0; j<iov_len; j++) {
if (left > 0) {
ByteBuffer buf = vec.getBuffer(j);
int pos = vec.getPosition(j);
int rem = vec.getRemaining(j);
int n = (left > rem) ? rem : (int)left;
buf.position(pos + n);
left -= n;
}
ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
//將已寫入完成的緩衝區放回到臨時直接緩衝區中
Util.offerLastTemporaryDirectBuffer(shadow);
// 清楚IOVcMrapper的緩存
vec.clearRefs(j);
}
return bytesWritten;
}
複製代碼
與彙集寫對應的還有
readv()
稱爲分散(散佈)讀,即將文件中若干連續的數據塊讀入內存分散的緩衝區中。 關於linux的彙集寫和散步讀具體內容能夠閱讀readv()和writev()函數和分散讀取與彙集寫入
經過IOVecWrapper
結構構建了一個字節緩衝區數組用於存放多個Buffer,其內部實際維護了一個Native數組結構。
class IOVecWrapper {
...
private final AllocatedNativeObject vecArray;
final long address;
static int addressSize;
private IOVecWrapper(int size) {
...
//false表示無需頁面對齊
this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
this.address = vecArray.address();
}
...
static {
//獲取本機指針的大小
addressSize = Util.unsafe().addressSize();
//保存每一個指針偏移量
LEN_OFFSET = addressSize;
//用於保存每一個AllocatedNativeObject對象的元素的大小
//每一個NativeObject有兩個long屬性,所以須要×2
SIZE_IOVEC = (short) (addressSize * 2);
}
}
複製代碼
AllocatedNativeObject
在堆外申請一片內存存放本機對象。
class AllocatedNativeObject extends NativeObject {
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
}
class NativeObject {
// native 分配地址,可能小於基地址,因爲頁面大小會對齊,因此實際的及地址時頁面對其後的地址。
protected long allocationAddress;
// native 基地址
private final long address;
}
複製代碼
經過上面咱們知道當一次性批量寫入緩存時會調用native的writev
彙集寫方法,調用以前會申請一塊地址用於存放可寫入的多塊緩衝區數據。如今咱們在回過頭看看IOVecWrapper
具體是怎麼作的。
class IOVecWrapper {
...
private final AllocatedNativeObject vecArray;
private final ByteBuffer[] buf;
private final int[] position;
private final int[] remaining;
private final ByteBuffer[] shadow;
//每一個線程保存一份IOVecWrapper緩存
private static final ThreadLocal<IOVecWrapper> cached =
new ThreadLocal<IOVecWrapper>();
//經過get獲取一塊適合大小的空間
static IOVecWrapper get(int size) {
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
//若獲取到空間不夠大,則從新初始化一個空間。
wrapper.vecArray.free();
wrapper = null;
}
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
//native資源,當對象釋放時使得操做系統能夠釋放內存,在將Buffer的時候關於直接緩衝區提到過相關的知識
Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
cached.set(wrapper);
}
return wrapper;
}
//將buffer保存起來
void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
this.buf[i] = buf;
this.position[i] = pos;
this.remaining[i] = rem;
}
...
}
複製代碼
因爲IOVecWrapper內部使用的是直接緩衝區,所以將它存放於ThreadLocalMap中複用以提升性能。
從channel讀取數據時會調用IOUtil.read
public int read(ByteBuffer buf) throws IOException {
...
begin();
...
n = IOUtil.read(fd, buf, -1, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);//使用直接緩衝區
//當不是使用直接內存時,則從線程本地緩衝獲取一塊臨時的直接緩衝區存放待讀取的數據
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
//讀取
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
//將直接緩衝區的數據寫入到堆緩衝區中
dst.put(bb);
return n;
} finally {
//釋放臨時的直接緩衝區
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException {
...
if (position != -1) {
//socket不支持,文件支持
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
}
...
}
複製代碼
前面說到,Channel也支持分散讀取。
調用Channel的read(ByteBuffer[] dsts, int offset, int length)
讀取到多個Buffer中。
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
...
begin();
...
n = IOUtil.read(fd, dsts, offset, length, nd);
...
end(n > 0 || (n == IOStatus.UNAVAILABLE));
...
}
static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) throws IOException {
return read(fd, bufs, 0, bufs.length, nd);
}
static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd) throws IOException {
IOVecWrapper vec = IOVecWrapper.get(length);
...
while (i < count && iov_len < IOV_MAX) {
ByteBuffer buf = bufs[i];
...
vec.setBuffer(iov_len, buf, pos, rem);
if (!(buf instanceof DirectBuffer)) {
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
vec.setShadow(iov_len, shadow);
...
}
...
long bytesRead = nd.readv(fd, vec.address, iov_len);
//清理工做
...
return bytesRead;
//清理工做
...
}
複製代碼
分散讀和彙集寫邏輯相似,都須要經過IOVecWrapper進行一個「中轉」。
經過調用channel.close
關閉通道,在接口實現裏咱們看過它實際是AbstractInterruptibleChannel
聲明的。
public final void close() throws IOException {
synchronized(this.closeLock) {
if (this.open) {
this.open = false;
this.implCloseChannel();
}
}
}
複製代碼
在AbstractSelectableChannel
實現了implCloseChannel
protected final void implCloseChannel() throws IOException {
//關閉當前channel
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
//註冊的channel都須要取消
k.cancel();
}
}
}
複製代碼
當channel註冊到selector時會建立SelectionKey保存到keys中。
this.implCloseSelectableChannel();
protected void implCloseSelectableChannel() throws IOException {
synchronized(this.stateLock) {
this.isInputOpen = false;
this.isOutputOpen = false;
if (this.state != 4) {
//windows不作處理
//linux和Solaris須要,關閉前將fd複製到另外一個待關閉fd中,以防止被fd回收
nd.preClose(this.fd);
}
if (this.readerThread != 0L) {
//發送信號給讀線程,將其從阻塞I/O中釋放,避免一直被阻塞。
NativeThread.signal(this.readerThread);
}
if (this.writerThread != 0L) {
//發送信號給寫線程,將其從阻塞I/O中釋放,避免一直被阻塞。
NativeThread.signal(this.writerThread);
}
//若還有註冊的channel,則不處理,等待key所有註銷後再kill
//若沒有的話能夠直接kill當前channel
if (!this.isRegistered()) {
this.kill();
}
}
}
複製代碼
kill
方法是在具體Channel中實現的,最終調用nd的close方法關閉文件描述符。
//SocketChannel
public void kill() throws IOException {
synchronized(this.stateLock) {
if (this.state != 4) {
if (this.state == -1) {
this.state = 4;
} else {
assert !this.isOpen() && !this.isRegistered();
//若仍有線程還沒釋放,則等線程I/O執行完後再kill
if (this.readerThread == 0L && this.writerThread == 0L) {
nd.close(this.fd);
this.state = 4;
} else {
this.state = 3;
}
}
}
}
}
//ServerSocketChannel
public void kill() throws IOException {
synchronized(this.stateLock) {
if (this.state != 1) {
if (this.state == -1) {
this.state = 1;
} else {
assert !this.isOpen() && !this.isRegistered();
nd.close(this.fd);
this.state = 1;
}
}
}
}
複製代碼
ServerSocketChannel的僅有-1(未初始化)、0(使用中)和1(已釋放)三個狀態。
NIO支持tcp的半鏈接,因爲TCP是全雙工的,即有輸入流和輸出流。在某些時候咱們能夠中斷其中一個流,而另外一個流仍然能夠繼續工做。好比做爲客戶端咱們能夠關閉輸出流,可是仍然能繼續接收到服務端發送的數據。
//關閉輸入流
client.socket().shutdownInput();
//關閉輸出流
client.socket().shutdownOutput();
複製代碼
當客戶端關閉了輸出流,實際上會送FIN
包到服務端,服務端接收到後響應ACK
,若服務端不發送FIN
包,關閉服務端的輸出流(客戶端的輸入流)時,則服務端仍然能繼續發送(響應)數據給客戶端,客戶端也仍然能夠繼續接收到數據。NIO的Pipe就是經過兩個socket的半鏈接實現的單項數據傳輸。
本篇對客戶端和服務端的socket,綁定、鏈接、讀、寫以及關閉等經常使用操做進行源碼分析,下一篇將繼續對FileChannel源碼進行探究。
![]()
- 微信掃一掃二維碼關注訂閱號傑哥技術分享
- 出處:www.cnblogs.com/Jack-Blog/p…
- 做者:傑哥很忙
- 本文使用「CC BY 4.0」創做共享協議。歡迎轉載,請在明顯位置給出出處及連接。