NIO 源碼分析(05) Channel 源碼分析java
Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)linux
功能說明:編程
InterruptibleChannel、AbstractInterruptibleChannel
提供了 Channel 響應 thread.interrupt(),支持中斷操做,最重要的兩個方法是 begin 和 end。SelectableChannel、AbstractSelectableChannel
提供了 Channel 註冊到 Selector 的各類方法。ReadableByteChannel、ScatteringByteChannel
Channel 讀數據。WritableByteChannel、GatheringByteChannel
Channel 寫數據。NetworkChannel
Channel 進行端口綁定、參數設置等網絡相關的操做。SocketChannel
Socket 門面,由 SelectorProvider.openSocketChannel 提供具體的實現類。ServerSocketChannel
ServerSocket 門面,由 SelectorProvider.openServerSocketChannel 提供具體的實現類。public ServerSocketChannel openServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this); } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); }
在 SelectorProviderImpl 中默認的 ServerSocketChannel 實現類是 ServerSocketChannelImpl。windows
begin() 和 end() 老是配對使用的,Channel 和 Selector 均有本身的實現,所完成的功能也是有所區別的。一般這兩個方法的使用以下:安全
boolean completed = false; try { begin(); completed = ...; // Perform blocking I/O operation return ...; // Return result } finally { end(completed); }
AbstractInterruptibleChannel 中最重要的方法是 begin 和 end,它們的功能是什麼呢?網絡
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { if (!open) return; open = false; interrupted = target; try { // Channel 中的 begin 就作了一件事,關閉 channel AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } blockedOn(interruptor); // t.blockedOn(b) Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end(boolean completed) throws AsynchronousCloseException { blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); } if (!completed && !open) throw new AsynchronousCloseException(); }
總結: Channel 中的 begin 就幹了一件事,關閉 channel。咱們先試想這樣一個場景,Channel 要讀寫數據時所在線程被中斷了,會發生什麼事?線程既然被中斷了,Channel 總要關閉吧。事實上 Channel 的 begin 和 end 就是作這個事情的。不過要理解 begin 和 end,彷佛咱們先得弄明白 AbstractInterruptibleChannel.blockedOn 究竟在幹什麼:多線程
static void blockedOn(Interruptible intr) { // package-private sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr); }
其中 JavaLangAccess 接口在 java.lang.System 中被實例化,它是這樣寫的:app
private static void setJavaLangAccess() { // Allow privileged classes outside of java.lang sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){ public void blockedOn(Thread t, Interruptible b) { t.blockedOn(b); } } }
如今咱們發現,JavaLangAccess 的 blockedOn 實現,竟然只有這麼一句 t.blockedOn(b)。繼續跟蹤到 java.lang.Thread 中 blockedOn 的實現了:socket
private volatile Interruptible blocker; void blockedOn(Interruptible b) { synchronized (blockerLock) { blocker = b; } }
實際上就是 Thread 線程上註冊了一個鉤子方法,當線程中斷時,即調用 thread.interrupt() 時會回調這個 b.interrupt(this) 方法,進而關閉線程對應的 Channel。
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); }
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread ignore) { // 線程中斷時喚醒 selector AbstractSelector.this.wakeup(); }}; } AbstractInterruptibleChannel.blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end() { AbstractInterruptibleChannel.blockedOn(null); }
總結: Selector 中的 begin 和 end 就是在線程中斷時喚醒對應的 selector,對應的使用以下:
protected int doSelect(long timeout) throws IOException { try { begin(); pollWrapper.poll(timeout); } finally { end(); } // ... }
private SelectionKey[] keys = null; private int keyCount = 0; // Lock for key set and count private final Object keyLock = new Object(); // Blocking mode, protected by regLock boolean blocking = true; // Lock for registration and configureBlocking operations private final Object regLock = new Object();
keys、keyCount
Channel 註冊後的 SelectionKey 集合和數量,也就是說一個 Channel 能夠註冊到多個 Selector 上。keyLock 保證多線程下 keys、keyCount 操做時的數據安全。
blocking
Channel 是不是阻塞的。regLock 保證多線程下 blocking 操做的線程安全,同時註冊時也須要加鎖。
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); // 註冊的感興趣事件不合法 if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); // 1. 判斷 Channel 是不是在這個 Selector 上是否已經註冊 SelectionKey k = findKey(sel); // 2. 若是已經註冊,更新感興趣事件 if (k != null) { k.interestOps(ops); k.attach(att); } // 3. 若是沒有註冊,委託 seletor.register 完成註冊 if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
總結: AbstractSelectableChannel 能夠註冊到多個 Selector 上,keys 屬性管理了全部已經註冊的 SelectionKey。對於已經註冊的 SelectionKey 和未註冊的處理邏輯並不一樣。具體步驟以下:
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) { channel = ch; selector = sel; }
SelectionKeyImpl 是對 Channel 和 Seletor 的封裝,具體的事件註冊仍是委託給了 channel 完成。
public SelectionKey interestOps(int ops) { ensureValid(); return nioInterestOps(ops); } public SelectionKey nioInterestOps(int ops) { if ((ops & ~channel().validOps()) != 0) throw new IllegalArgumentException(); channel.translateAndSetInterestOps(ops, this); interestOps = ops; return this; }
總結: SelectionKey.interestOps 事件註冊繞了一圈,最後發現又委託給了 Channel 來完成。
// ServerSocketChannel 事件註冊 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { int newOps = 0; // Translate ops if ((ops & SelectionKey.OP_ACCEPT) != 0) newOps |= Net.POLLIN; // Place ops into pollfd array sk.selector.putEventOps(sk, newOps); }
總結: 事件註冊須要注意的是 OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT 只是 JDK 對底層 IO 事件的封裝,在註冊前須要將 JDK IO 事件轉換成 Linux NIO 事件。最終事件的註冊都是委託給了 Selector 完成,因此才說 Selector 纔是 NIO 的核心。
// Net public static final short POLLIN; // 讀事件 public static final short POLLOUT; // 寫事件 public static final short POLLERR; public static final short POLLHUP; public static final short POLLNVAL; public static final short POLLCONN;
public SocketChannel accept() throws IOException { synchronized (lock) { // 1. 鏈接校驗 if (!isOpen()) throw new ClosedChannelException(); if (!isBound()) throw new NotYetBoundException(); SocketChannel sc = null; // 2. newfd、isaa 若是有新的 socket 鏈接,則會經過 accept 賦值 int n = 0; FileDescriptor newfd = new FileDescriptor(); InetSocketAddress[] isaa = new InetSocketAddress[1]; // 3. begin、end 配套使用則能夠響應線程中斷,關閉 channel try { begin(); if (!isOpen()) return null; thread = NativeThread.current(); for (;;) { // 4. 接收新的鏈接請求 n = accept(this.fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } } finally { thread = 0; end(n > 0); assert IOStatus.check(n); } if (n < 1) return null; // 4. socket 默認是阻塞的,因此若是是 NIO 編程須要手動設置成 false IOUtil.configureBlocking(newfd, true); InetSocketAddress isa = isaa[0]; sc = new SocketChannelImpl(provider(), newfd, isa); // 5. 返回新的 socket 請求 return sc; } } private int accept(FileDescriptor ssfd, FileDescriptor newfd, InetSocketAddress[] isaa) throws IOException { return accept0(ssfd, newfd, isaa); }
總結: ServerSocketChannel 接收新的鏈接請求步驟和 BIO 相似,沒有什麼區別,大體有如下步驟:
close() 操做限於通道,並且仍是實現了 InterruptibleChannel 接口的通道,例如 FileChannel 就沒有 close 操做。
在分析 close() 具體實現以前,咱們先得理解爲何要有 close() 這個操做:一個可選擇的通道,在建立之初會生成一個 FileDescriptor,linux 下即爲 fd,windows 下即爲句柄,這些都是系統資源,不能無限佔用,當在不使用的時候,就應該將其釋放,close 便是完成這個工做的。
抽象類 AbstractInterruptibleChannel 實現了 InterruptibleChannel 接口,而 SelectableChannel 繼承自 AbstractInterruptibleChannel,所以,可選擇的通道同時也是能夠 close 的。AbstractInterruptibleChannel 的 close 實現以下:
public final void close() throws IOException { synchronized (closeLock) { if (!open) return; open = false; implCloseChannel(); } }
來具體關閉邏輯就在 implCloseChannel() 中了,因而再看 AbstractSelectableChannel:
protected final void implCloseChannel() throws IOException { implCloseSelectableChannel(); synchronized (keyLock) { int count = (keys == null) ? 0 : keys.length; for (int i = 0; i < count; i++) { SelectionKey k = keys[i]; if (k != null) k.cancel(); } } }
先看 synchronized 同步塊,它將當前通道保存的 SelectionKey 所有 cancel,意思就是說,當前通關閉了,與它相關的全部 SelectionKey 都沒有意義了,因此要所有取消掉,以前講解 cancel 過程已經說明了,cancel 操做只是將 SelectionKey 加入對 應選擇器的 cancelKeys 集合中,在下次正式選擇開始的時候再一一清除;
這麼看來,仍是應該追究一下 implCloseSelectableChannel() 的實現了,下面分別從 ServerSocketChannel 和 SocketChannel 實現出發:
先看 ServerSocketChannelImpl
protected void implCloseSelectableChannel() throws IOException { synchronized (stateLock) { if (state != ST_KILLED) nd.preClose(fd); long th = thread; if (th != 0) NativeThread.signal(th); if (!isRegistered()) kill(); } }
出現了兩個很奇怪的東西,看來要徹底弄懂這段代碼,是得好好分析一下它們了,它們是:NativeDispatcher nd 和 NativeThread;
若是已經對 linux 信號機制很是熟悉,應該很容易猜想到 NativeThread.signal(th) 在作什麼,是的,它在喚醒阻塞的線程 th,下面咱們來看看它是如何作到的:
NativeThread 類很是簡單,幾乎全是 native 方法:
class NativeThread { static native long current(); static native void signal(long nt); static native void init(); static { Util.load(); init(); } }
在看其本地實現:
//自定義中斷信號,kill –l #define INTERRUPT_SIGNAL (__SIGRTMAX - 2) //自定義的信號處理函數,當前函數什麼都不作 static void nullHandler(int sig) { } #endif //NativeThread.init()的本地實現,能夠看到它用到了sigaction //sigaction用來install一個信號 JNIEXPORT void JNICALL Java_sun_nio_ch_NativeThread_init(JNIEnv *env, jclass cl) { #ifdef __linux__ sigset_t ss; // 如下這段代碼是常見的信號安裝過程 // 講解這段代碼的目的只是爲了讓你們理解NativeThread.signal // 的工做原理,故不少細節就簡單帶過了 struct sigaction sa, osa; // sa用於定製信號INTERRUPT_SIGNAL的處理方式的 // 如sa_handler = nullHandler即用來指定信號處理函數的 // 即線程收到信號時,爲執行這個函數,nullHandler是個空殼 // 函數,因此它什麼都不作 // 不用理解sa_flags各個標識表明什麼 // sigemptyset顧名思義,它是初始化sigaction的sa_mask位 // sigaction(INTERRUPT_SIGNAL, &sa, &osa)執行後 // 若是成功,則表示INTERRUPT_SIGNAL這個信號安裝成功了 // 爲何要有這個init呢,其實不用這不操做也許不會有問題 // 但由於不能確保INTERRUPT_SIGNAL沒有被其餘線程install // 過,若是sa_handler對應函數不是空操做,則在使用這個信號 // 時會對當前線程有影響 sa.sa_handler = nullHandler; sa.sa_flags = 0; sigemptyset(&sa.sa_mask); if (sigaction(INTERRUPT_SIGNAL, &sa, &osa) < 0) JNU_ThrowIOExceptionWithLastError(env, "sigaction"); #endif } JNIEXPORT jlong JNICALL Java_sun_nio_ch_NativeThread_current(JNIEnv *env, jclass cl) { #ifdef __linux__ // pthread_self()便是獲取當前線程ID,它與getpid()是不一樣的 // 具體細節沒有研究 return (long)pthread_self(); #else return -1; #endif } JNIEXPORT void JNICALL Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread) { #ifdef __linux__ //這個就是最關鍵的signal實現了,能夠看到,它調用了pthread庫的pthread_kill //像thread線程發送一個INTERRUPT_SIGNAL信號,這個信號就是在init中install //的,對應的處理函數是空函數,也就是說,往thread線程發送一個信號,若是該線程處於 //阻塞狀態,則會由於受到信號而終止阻塞,而若是處於非阻塞,則無影響 if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL)) JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed"); #endif }
Java 的 NativeThread 作靜態初始化時已經執行了 init,也就是說 INTERRUPT_SIGNAL 信號已經被安裝,而 ServerSocketChannelImpl 在上述 accept 時可能賦值。
try { begin(); if (!isOpen()) return null; thread = NativeThread.current(); for (;;) { n = accept0(this.fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } } finally { thread = 0; end(n > 0); assert IOStatus.check(n); }
try 的內部,for 循環以前,thread 被複製爲 NativeThread.current() 即爲當前線程 id;finally 時 thread 又被修改回 0,所以在 implCloseSelectableChannel 纔有這樣一段:
if (th != 0) NativeThread.signal(th);
NativeThread.signal(th) 經過像當前線程發送 INTERRUPT_SIGNAL 信號而確保 th 線程沒有被阻塞,即若是阻塞就中止阻塞。
如今理解了 NativeThread 了,咱們再看 NativeDispatcher
首先咱們得知道在 ServerSocketChannelImpl 中,nd 被初始化爲 SocketDispatcher,見:
static { Util.load(); initIDs(); nd = new SocketDispatcher(); }
又由於 linux 下一切皆文件的思想(現實雖然不絕對),SocketDispatcher 其實就是用 FileDispatcher 實現的,最終 FileDispatcher 也只是封裝了一大堆 native 方法,一波三折,關於 FileDispatcher,這裏先不詳細講解了,先針對 nd.preClose(fd) 和 kill 將 implCloseSelectableChannel 的過程說明白吧:
首先,咱們要明白這樣一個道理:在多線程環境下,老是很難知道何時可安全的關閉或釋放資源(如fd),當一個線程 A 使用 fd 來讀寫,而另外一個線程 B 關閉或釋放了 fd,則 A 線程就會讀寫一個錯誤的文件或 socket;爲了防止這種狀況出現,因而 NIO 就採用了經典的 two-step 處理方案:
第一步:建立一個 socket pair,假設 FDs 爲 sp[2],先 close 掉 sp[1],這樣,該 socket pair 就成爲了一個半關閉的連接;複製 (dup2)sp[0] 到 fd(即爲咱們想關閉或釋放的fd),這個時候,其餘線程若是正在讀寫當即會得到 EOF 或者 Pipe Error,read 或 write 方法裏會檢測這些狀態作相應處理;
第二步:最後一個會使用到 fd 的線程負責釋放
nd.preClose(fd) 即爲兩步曲中的第一步,咱們先來看其實現,最終定位到 FileDispatcher.c,相關代碼以下:
static int preCloseFD = -1; JNIEXPORT void JNICALL Java_sun_nio_ch_FileDispatcher_init(JNIEnv *env, jclass cl) { int sp[2]; if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) { JNU_ThrowIOExceptionWithLastError(env, "socketpair failed"); return; } preCloseFD = sp[0]; close(sp[1]); } JNIEXPORT void JNICALL Java_sun_nio_ch_FileDispatcher_preClose0(JNIEnv *env, jclass clazz, jobject fdo) { jint fd = fdval(env, fdo); if (preCloseFD >= 0) { if (dup2(preCloseFD, fd) < 0) JNU_ThrowIOExceptionWithLastError(env, "dup2 failed"); } }
從上面兩個函數實現,咱們能夠看到,在 init 函數中,建立了一個半關閉的 socket pair,preCloseFD 即爲未關閉的一端,init 在靜態初始化時就會被執行;再來看關鍵的 preClose0,它的確是採用 dup2 來複制 preCloseFD,這樣一來,fd 就被替換成了 preCloseFD,這正是 socket pair 中未被關閉的一端。
既然 nd.preClose(fd) 只是預關閉,則真正執行關閉的邏輯確定在這個 kill 中了,從代碼邏輯上仍是比較好懂的,if (!isRegistered()) 即表示該通道沒有被註冊,表示全部 Selector 都沒有意願關心這個通道了,則天然能夠放心的關閉 fd。
果斷猜想 kill 中有 nd.close(fd) 這樣的代碼,不信請看:
public void kill() throws IOException { synchronized (stateLock) { if (state == ST_KILLED) return; if (state == ST_UNINITIALIZED) { state = ST_KILLED; return; } assert !isOpen() && !isRegistered(); nd.close(fd); state = ST_KILLED; } }
果真如此,這樣一來,關閉二步曲就可以較安全的釋放咱們的fd資源了,至於 nd.close(fd)的本地實現,這裏就不講了,確定是採用了 close(fd) 的系統調用。總的來講,通道的 close 就是爲了斷開它與內核 fd 的那點聯繫。
天天用心記錄一點點。內容也許不重要,但習慣很重要!