Java NIO 由如下幾個核心部分組成:
一、Buffer
二、Channel
三、Selectorlinux
Buffer和Channel在深刻淺出NIO之Channel、Buffer一文中已經介紹過,本文主要講解NIO的Selector實現原理。面試
以前進行socket編程時,accept方法會一直阻塞,直到有客戶端請求的到來,並返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取並處理後面的請求,固然也能夠把獲取socket和處理socket的過程分開,一個線程負責accept,一個線程池負責處理請求。編程
但NIO提供了更好的解決方案,採用選擇器(Selector)返回已經準備好的socket,並按順序處理,基於通道(Channel)和緩衝區(Buffer)來進行數據的傳輸。windows
這裏出來一個新概念,selector,具體是一個什麼樣的東西?數組
想一想一個場景:在一個養雞場,有這麼一我的,天天的工做就是不停檢查幾個特殊的雞籠,若是有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的狀況記錄下來,若是雞場的負責人想知道狀況,只須要詢問那我的便可。緩存
在這裏,這我的就至關Selector,每一個雞籠至關於一個SocketChannel,每一個線程經過一個Selector能夠管理多個SocketChannel。服務器
爲了實現Selector管理多個SocketChannel,必須將具體的SocketChannel對象註冊到Selector,並聲明須要監聽的事件(這樣Selector才知道須要記錄什麼數據),一共有4種事件:架構
一、connect:客戶端鏈接服務端事件,對應值爲SelectionKey.OP_CONNECT(8)
二、accept:服務端接收客戶端鏈接事件,對應值爲SelectionKey.OP_ACCEPT(16)
三、read:讀事件,對應值爲SelectionKey.OP_READ(1)
四、write:寫事件,對應值爲SelectionKey.OP_WRITE(4)app
這個很好理解,每次請求到達服務器,都是從connect開始,connect成功後,服務端開始準備accept,準備就緒,開始讀數據,並處理,最後寫回數據返回。異步
因此,當SocketChannel有對應的事件發生時,Selector均可以觀察到,並進行相應的處理。
爲了更好的理解,先看一段服務端的示例代碼
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if (n == 0) continue; Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); if (key.isAcceptable()){ SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); //將選擇器註冊到鏈接到的客戶端信道, //並指定該信道key值的屬性爲OP_READ, //同時爲該信道指定關聯的附件 clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize)); } if (key.isReadable()){ handleRead(key); } if (key.isWritable() && key.isValid()){ handleWrite(key); } if (key.isConnectable()){ System.out.println("isConnectable = true"); } ite.remove(); } }
一、建立ServerSocketChannel實例,並綁定指定端口;
二、建立Selector實例;
三、將serverSocketChannel註冊到selector,並指定事件OP_ACCEPT,最底層的socket經過channel和selector創建關聯;
四、若是沒有準備好的socket,select方法會被阻塞一段時間並返回0;
五、若是底層有socket已經準備好,selector的select方法會返回socket的個數,並且selectedKeys方法會返回socket對應的事件(connect、accept、read or write);
六、根據事件類型,進行不一樣的處理邏輯;
在步驟3中,selector只註冊了serverSocketChannel的OP_ACCEPT事件
一、若是有客戶端A鏈接服務,執行select方法時,能夠經過serverSocketChannel獲取客戶端A的socketChannel,並在selector上註冊socketChannel的OP_READ事件。
二、若是客戶端A發送數據,會觸發read事件,這樣下次輪詢調用select方法時,就能經過socketChannel讀取數據,同時在selector上註冊該socketChannel的OP_WRITE事件,實現服務器往客戶端寫數據。
SocketChannel、ServerSocketChannel和Selector的實例初始化都經過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
SelectorProvider在windows和linux下有不一樣的實現,provider方法會返回對應的實現。
這裏不由要問,Selector是如何作到同時管理多個socket?
下面咱們看看Selector的具體實現,Selector初始化時,會實例化PollWrapper、SelectionKeyImpl數組和Pipe。
WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
pollWrapper用Unsafe類申請一塊物理內存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。
pollWrapper提供了fdVal和event數據的相應操做,如添加操做經過Unsafe的putInt和putShort實現。
void putDescriptor(int i, int fd) { pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd); } void putEventOps(int i, int event) { pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event); }
先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)
是如何實現的
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (pollWrapper == null) throw new ClosedSelectorException(); growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } }
一、以當前channel和selector爲參數,初始化SelectionKeyImpl 對象selectionKeyImpl ,並添加附件attachment。
二、若是當前channel的數量totalChannels等於SelectionKeyImpl數組大小,對SelectionKeyImpl數組和pollWrapper進行擴容操做。
三、若是totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
四、pollWrapper.addEntry將把selectionKeyImpl中的socket句柄添加到對應的pollfd。
五、k.interestOps(ops)方法最終也會把event添加到對應的pollfd。
因此,無論serverSocketChannel,仍是socketChannel,在selector註冊的事件,最終都保存在pollArray中。
接着,再來看看selector中的select是如何實現一次獲取多個有事件發生的channel的,底層由selector實現類的doSelect方法實現,以下:
protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; }
其中 subSelector.poll() 是select的核心,由native函數poll0實現,readFds、writeFds 和exceptFds數組用來保存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其他位置存放發生事件的socket句柄fd。
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); }
執行 selector.select() ,poll0函數把指向socket句柄和事件的內存地址傳給底層函數。
一、若是以前沒有發生事件,程序就阻塞在select處,固然不會一直阻塞,由於epoll在timeout時間內若是沒有事件,也會返回;
二、一旦有對應的事件發生,poll0方法就會返回;
三、processDeregisterQueue方法會清理那些已經cancelled的SelectionKey;
四、updateSelectedKeys方法統計有事件發生的SelectionKey數量,並把符合條件發生事件的SelectionKey添加到selectedKeys哈希表中,提供給後續使用。
在早期的JDK1.4和1.5 update10版本以前,Selector基於select/poll模型實現,是基於IO複用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。
經過遍歷selector中的SelectionKeyImpl數組,獲取發生事件的socketChannel對象,其中保存了對應的socket,實現以下
public int read(ByteBuffer buf) throws IOException { if (buf == null) throw new NullPointerException(); synchronized (readLock) { if (!ensureReadOpen()) return -1; int n = 0; try { begin(); synchronized (stateLock) { if (!isOpen()) { return 0; } readerThread = NativeThread.current(); } for (;;) { n = IOUtil.read(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { // The system call was interrupted but the channel // is still open, so retry continue; } return IOStatus.normalize(n); } } finally { readerCleanup(); // Clear reader thread // The end method, which end(n > 0 || (n == IOStatus.UNAVAILABLE)); // Extra case for socket channels: Asynchronous shutdown // synchronized (stateLock) { if ((n <= 0) && (!isInputOpen)) return IOStatus.EOF; } assert IOStatus.check(n); } } }
最終經過Buffer的方式讀取socket的數據。
public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd);
看來wakeupSinkFd這個變量是爲wakeup方法使用的。
其中interruptTriggered爲中斷已觸發標誌,當pollWrapper.interrupt()以後,該標誌即爲true了;由於這個標誌,連續兩次wakeup,只會有一次效果。
epoll是Linux下的一種IO多路複用技術,能夠很是高效的處理數以百萬計的socket句柄。
三個epoll相關的系統調用:
epoll內部實現大概以下:
以爲不錯請點贊支持,歡迎留言或進個人我的羣855801563領取【架構資料專題目合集90期】、【BATJTMD大廠JAVA面試真題1000+】,本羣專用於學習交流技術、分享面試機會,拒絕廣告,我也會在羣內不按期答題、探討。