本文主要經過對Selector的使用流程講解來展開其中的實現原理。
首先先來段Selector最簡單使用片斷java
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); int port = 5566; serverChannel.socket().bind(new InetSocketAddress(port)); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if(n > 0) { Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey selectionKey = iter.next(); ...... iter.remove(); } } }
SocketChannel、ServerSocketChannel和Selector的實例初始化都經過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。linux
ServerSocketChannel.open();數組
public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
SocketChannel.open();緩存
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
Selector.open();app
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
若是配置了「java.nio.channels.spi.SelectorProvider」屬性,則經過該屬性值load對應的SelectorProvider對象,若是構建失敗則拋異常。
若是SystemClassLoader中已經加載過了SelectorProvider類,則是直接使用。不然從系統類加載器中獲取失敗,則拋異常。
若是上面兩種狀況都不存在,則返回系統默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
隨後在調用該方法,即SelectorProvider.provider()。則返回第一次調用的結果。socket
不一樣系統對應着不一樣的sun.nio.ch.DefaultSelectorProvider ide
這裏咱們看linux下面的sun.nio.ch.DefaultSelectorProvider函數
public class DefaultSelectorProvider { /** * Prevent instantiation. */ private DefaultSelectorProvider() { } /** * Returns the default SelectorProvider. */ public static SelectorProvider create() { return new sun.nio.ch.EPollSelectorProvider(); } }
能夠看見,linux系統下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這裏對應於linux系統的epollui
/** * Opens a selector. * * <p> The new selector is created by invoking the {[@link](https://my.oschina.net/u/393) * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method * of the system-wide default {[@link](https://my.oschina.net/u/393) * java.nio.channels.spi.SelectorProvider} object. </p> * * [@return](https://my.oschina.net/u/556800) A new selector * * [@throws](https://my.oschina.net/throws) IOException * If an I/O error occurs */ public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
在獲得sun.nio.ch.EPollSelectorProvider後調用openSelector()方法構建Selector,這裏會構建一個EPollSelectorImpl對象。this
class EPollSelectorImpl extends SelectorImpl { // File descriptors used for interrupt protected int fd0; protected int fd1; // The poll object EPollArrayWrapper pollWrapper; // Maps from file descriptors to keys private Map<Integer,SelectionKeyImpl> fdToKey;
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; try { pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); } catch (Throwable t) { try { FileDispatcherImpl.closeIntFD(fd0); } catch (IOException ioe0) { t.addSuppressed(ioe0); } try { FileDispatcherImpl.closeIntFD(fd1); } catch (IOException ioe1) { t.addSuppressed(ioe1); } throw t; } }
EPollSelectorImpl構造函數完成:
① EPollArrayWrapper的構建,EpollArrayWapper將Linux的epoll相關係統調用封裝成了native方法供EpollSelectorImpl使用。
② 經過EPollArrayWrapper向epoll註冊中斷事件
void initInterrupt(int fd0, int fd1) { outgoingInterruptFD = fd1; incomingInterruptFD = fd0; epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); }
③ fdToKey:構建文件描述符-SelectorKeyImpl映射表
④ EPollSelectorImpl還持有已經註冊到selector的Channel的SelectionKey。
EPollSelectorImpl —> SelectorImpl
public abstract class SelectorImpl extends AbstractSelector { // The set of keys with data ready for an operation protected Set<SelectionKey> selectedKeys; // The set of keys registered with this Selector protected HashSet<SelectionKey> keys;
EPollArrayWrapper完成了對epoll文件描述符的構建,以及對linux系統的epoll指令操縱的封裝。維護每次selector.select(…)的結果,即epoll_wait結果的epoll_event數組。
EPollArrayWrapper操縱了一個linux系統下epoll_event結構的本地數組。
* typedef union epoll_data { * void *ptr; * int fd; * __uint32_t u32; * __uint64_t u64; * } epoll_data_t; * * struct epoll_event { * __uint32_t events; * epoll_data_t data; * };
epoll_event結構包含的數據成員(epoll_data_t data)和經過epoll_ctl註冊到epoll的文件描述符是同樣的。這裏data.fd爲咱們註冊的文件描述符。這樣咱們在處理事件的時候就能夠使用文件描述符。
EPollArrayWrapper將Linux的epoll相關係統調用封裝成了native方法供EpollSelectorImpl使用。
private native int epollCreate(); private native void epollCtl(int epfd, int opcode, int fd, int events); private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
上述三個native方法就對應Linux下epoll相關的三個系統調用
// The fd of the epoll driver private final int epfd; // The epoll_event array for results from epoll_wait private final AllocatedNativeObject pollArray; // Base address of the epoll_event array private final long pollArrayAddress;
EPollArrayWrapper() throws IOException { // creates the epoll file descriptor epfd = epollCreate(); // the epoll_event array passed to epoll_wait int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); }
EPoolArrayWrapper構造函數,建立了epoll文件描述符。構建了一個用於存放epoll_wait返回結果的epoll_event數組。
返回ServerSocketChannelImpl對象,構建linux系統下ServerSocket的文件描述符。 ServerSocketChannelImpl:
// Our file descriptor private final FileDescriptor fd; // fd value needed for dev/poll. This value will remain valid // even after the value in the file descriptor object has been set to -1 private int fdVal;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; }
ServerSocketChannelImpl (其實是AbstractSelectableChannel) 中持有全部已經註冊到selector的SelectionKey對象,以下:
// Keys that have been created by registering this channel with selectors. // They are saved because if this channel is closed the keys must be // deregistered. Protected by keyLock. // private SelectionKey[] keys = null;
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
將事件註冊到Selector中,並將SelectionKey放入ServerSocketChannel中的SelectionKey集合中。
👇 SelectorImpl. register
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; }
EPollSelectorImpl. implRegister
protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; int fd = Integer.valueOf(ch.getFDVal()); fdToKey.put(fd, ski); pollWrapper.add(fd); keys.add(ski); }
① 將channel對應的fd(文件描述符)和對應的selectionKey放到fdToKey映射表中。
② 將channel對應的fd(文件描述符)添加到pollWrapper中,並初始化fd的事件爲0 ( 強制初始更新事件爲0,由於該事件可能存在於以前被殺死的註冊。)
③ 將selectionKey所對應的channel的文件描述符加入到pollWrapper中
④ 將selectionKey放入到 SelectionKey HashSet中。
⑤ k.interestOps(int)也會調用調EPollSelectorImpl的putEventOps(…)將事件存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中。
SelectionKeyImpl:
public class SelectionKeyImpl extends AbstractSelectionKey { final SelChImpl channel; // package-private public final SelectorImpl selector; // Index for a pollfd array in Selector that this key is registered with private int index; private volatile int interestOps; private int readyOps;
維護了channel (ServerSocketChannel or SocketChannel )和selector的關聯關係,以及interesOps和readOps。
public int select() throws IOException { return select(0); }
最終會調用到EPollSelectorImpl的doSelect
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
先來看processDeregisterQueue():
void processDeregisterQueue() throws IOException { Set var1 = this.cancelledKeys(); synchronized(var1) { if (!var1.isEmpty()) { Iterator var3 = var1.iterator(); while(var3.hasNext()) { SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next(); try { this.implDereg(var4); } catch (SocketException var12) { IOException var6 = new IOException("Error deregistering key"); var6.initCause(var12); throw var6; } finally { var3.remove(); } } } } }
protected void implDereg(SelectionKeyImpl ski) throws IOException { assert (ski.getIndex() >= 0); SelChImpl ch = ski.channel; int fd = ch.getFDVal(); fdToKey.remove(Integer.valueOf(fd)); pollWrapper.remove(fd); ski.setIndex(-1); keys.remove(ski); selectedKeys.remove(ski); deregister((AbstractSelectionKey)ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }
該方法會處理已經註銷的SelectionKey集合:
① 將已經註銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除
② 將selectionKey所表明的channel的文件描述符從pollWrapper中移除
③ 將selectionKey從selectionKey集合中移除,這樣下次selector.select()就不會再講該selectionKey註冊到epoll中監聽
④ 也會將selectionKey從對應的channel中註銷
⑤ 最後若是對應的channel已經關閉而且沒有註冊其餘的selector了,則將該channel關閉
接着咱們來看EPollArrayWrapper.poll(timeout):
int poll(long timeout) throws IOException { updateRegistrations(); updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; }
updateRegistrations()方法會將已經註冊到該selector的事件(eventsLow或eventsHigh)經過調用epollCtl(epfd, opcode, fd, events); 註冊到linux系統中。
這裏epollWait就會調用linux底層的epoll_wait方法,並返回在epoll_wait期間有事件觸發的entry的個數
再看updateSelectedKeys():
private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated; }
該方法會從經過EPollArrayWrapper中獲取到有事件觸發的SelectionKeyImpl對象,而後將SelectionKeyImpl放到selectedKey集合( 有事件觸發的selectionKey集合,能夠經過selector.selectedKeys()方法得到 )中,即selectedKeys。並設置SelectionKeyImpl中相關的readyOps值。
可是,這裏要注意兩點:
① 若是SelectionKeyImpl發現觸發的事件已經存在於readyOps中了,則不會使numKeysUpdated++;這樣會使得咱們沒法得知該事件的變化
② 若是SelectionKeyImpl已經存在於selectedKey集合中,則不會講該事件加入到readyOps中,也不會使numKeysUpdated++
👆以上兩點都說明,爲何咱們要在每次從selectedKey中獲取到Selectionkey後,將其從selectedKey集合移除,就是爲了當有事件觸發使selectionKey能正確到放入selectedKey集合中,並正確的通知給調用者。
epoll是Linux下的一種IO多路複用技術,能夠很是高效的處理數以百萬計的socket句柄。
先看看使用c封裝的3個epoll系統調用:
大概看看epoll內部是怎麼實現的:
epoll的兩種工做模式:
socket讀數據
socket寫數據
最後順便說下在Linux系統中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。
http://www.jianshu.com/p/0d497fe5484a
http://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/