How Selector Works Internally

This article explains how Selector is implemented by walking through a typical Selector usage flow.
First, here is a minimal Selector usage snippet:

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
int port = 5566;
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    int n = selector.select();
    if (n > 0) {
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey selectionKey = iter.next();
            // ...... handle the ready event here (e.g. accept() when selectionKey.isAcceptable())
            iter.remove();
        }
    }
}
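The fragment above is not self-contained. Here is a minimal runnable sketch of the same loop, assuming JDK 8+ and loopback TCP. It binds to an ephemeral port (0) instead of 5566 and uses select(5000) so it cannot block forever. AcceptLoopDemo and acceptOnce are illustrative names, not part of the JDK.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class AcceptLoopDemo {
    // Returns true if the selector reports OP_ACCEPT and accept() yields a connection.
    static boolean acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS choose a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client connect; once it completes, the server socket is acceptable.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0) {
                    return false; // nothing became ready within 5 s
                }
                boolean accepted = false;
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove(); // so the key can be reported again by a later select()
                    if (key.isAcceptable()) {
                        SocketChannel conn = ((ServerSocketChannel) key.channel()).accept();
                        if (conn != null) {
                            accepted = true;
                            conn.close();
                        }
                    }
                }
                return accepted;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted: " + acceptOnce());
    }
}
```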

SocketChannel, ServerSocketChannel and Selector instances are all created through the SelectorProvider class, and Selector is the core of the whole NIO socket implementation.

ServerSocketChannel.open();

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

SocketChannel.open();

    public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }

Selector.open();

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

Let's take a closer look at SelectorProvider.provider():

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

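If the system property java.nio.channels.spi.SelectorProvider is set, the class it names is loaded and instantiated as the SelectorProvider; if that fails, an exception is thrown.
Otherwise, if a SelectorProvider implementation is registered as a service (a META-INF/services/java.nio.channels.spi.SelectorProvider entry) that the system class loader can find, the first one found is used; errors while loading it (other than a SecurityException) are rethrown.
If neither applies, the system-default SelectorProvider is returned, i.e. sun.nio.ch.DefaultSelectorProvider.create().
The result is cached, so every later call to SelectorProvider.provider() returns the provider produced by the first call.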
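The caching described above can be observed from the public API alone. This sketch (ProviderDemo is an illustrative name) checks that two calls return the same object and prints which provider class was chosen; the property line prints null unless the JVM was started with -Djava.nio.channels.spi.SelectorProvider=....

```java
import java.nio.channels.spi.SelectorProvider;

class ProviderDemo {
    // provider() resolves the provider once and caches it in a static field,
    // so repeated calls should return the very same object.
    static boolean sameProviderTwice() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    static String providerClassName() {
        return SelectorProvider.provider().getClass().getName();
    }

    public static void main(String[] args) {
        // The property consulted first by provider().
        System.out.println("property: " + System.getProperty("java.nio.channels.spi.SelectorProvider"));
        System.out.println("provider: " + providerClassName());
        System.out.println("cached:   " + sameProviderTwice());
    }
}
```

On a Linux JDK, given the DefaultSelectorProvider shown in this article, the printed provider should be sun.nio.ch.EPollSelectorProvider.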

Each platform's JDK has its own sun.nio.ch.DefaultSelectorProvider.

Here is the Linux version of sun.nio.ch.DefaultSelectorProvider:

public class DefaultSelectorProvider {

    /**
     * Prevent instantiation.
     */
    private DefaultSelectorProvider() { }

    /**
     * Returns the default SelectorProvider.
     */
    public static SelectorProvider create() {
        return new sun.nio.ch.EPollSelectorProvider();
    }

}

As you can see, on Linux sun.nio.ch.DefaultSelectorProvider.create() returns a sun.nio.ch.EPollSelectorProvider, i.e. a SelectorProvider backed by Linux epoll.

Next, let's look at Selector.open():

    /**
     * Opens a selector.
     *
     * <p> The new selector is created by invoking the {@link
     * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
     * of the system-wide default {@link
     * java.nio.channels.spi.SelectorProvider} object.  </p>
     *
     * @return  A new selector
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

Having obtained the sun.nio.ch.EPollSelectorProvider, openSelector() is called on it to build the Selector, which here creates an EPollSelectorImpl object.
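A quick way to confirm the delegation just described: a selector returned by Selector.open() should report the system-wide provider, and its class name reveals the implementation. OpenSelectorDemo is an illustrative name; only public JDK methods are used.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class OpenSelectorDemo {
    // Selector.open() is SelectorProvider.provider().openSelector(),
    // so the new selector's provider() should be the system-wide provider.
    static boolean usesSystemProvider() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        }
    }

    // The concrete implementation class chosen by the provider.
    static String selectorClassName() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(selectorClassName() + ", system provider: " + usesSystemProvider());
    }
}
```

On a Linux JDK 8 the class name is expected to be sun.nio.ch.EPollSelectorImpl.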

EPollSelectorImpl

class EPollSelectorImpl
    extends SelectorImpl
{

    // File descriptors used for interrupt
    protected int fd0;
    protected int fd1;

    // The poll object
    EPollArrayWrapper pollWrapper;

    // Maps from file descriptors to keys
    private Map<Integer,SelectionKeyImpl> fdToKey;

    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        try {
            pollWrapper = new EPollArrayWrapper();
            pollWrapper.initInterrupt(fd0, fd1);
            fdToKey = new HashMap<>();
        } catch (Throwable t) {
            try {
                FileDispatcherImpl.closeIntFD(fd0);
            } catch (IOException ioe0) {
                t.addSuppressed(ioe0);
            }
            try {
                FileDispatcherImpl.closeIntFD(fd1);
            } catch (IOException ioe1) {
                t.addSuppressed(ioe1);
            }
            throw t;
        }
    }

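The EPollSelectorImpl constructor does the following:
① Builds an EPollArrayWrapper. EPollArrayWrapper wraps the Linux epoll system calls as native methods for EPollSelectorImpl to use.
② Registers the interrupt (wakeup) event with epoll through EPollArrayWrapper. IOUtil.makePipe creates a pipe whose read end is fd0 and write end is fd1, and initInterrupt adds fd0 to epoll for EPOLLIN, so that writing to fd1 (which is what wakeup() does) makes a blocked epoll_wait return.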

    void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;
        incomingInterruptFD = fd0;
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }

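③ Creates fdToKey, the map from file descriptor to SelectionKeyImpl.
④ In addition, through its superclass SelectorImpl, EPollSelectorImpl holds the SelectionKeys of the channels registered with the selector (keys) and of those with ready events (selectedKeys).
EPollSelectorImpl extends SelectorImpl: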

public abstract class SelectorImpl
    extends AbstractSelector
{

    // The set of keys with data ready for an operation
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    protected HashSet<SelectionKey> keys;
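The interrupt pipe registered in step ② is what Selector.wakeup() relies on: in EPollSelectorImpl, wakeup() writes to fd1, fd0 becomes readable, and epoll_wait returns. The sketch below exercises this through the public API only (JDK 8+ for the lambda). WakeupDemo and its methods are illustrative names, and it measures elapsed time only to show that select() does return.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // With no select() in progress, wakeup() makes the *next* select() return
    // immediately. No channels are registered, so the result should be 0.
    static int selectAfterWakeup() throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select(); // would block forever without the wakeup
        }
    }

    // wakeup() from another thread ends a select() that is already blocking.
    static long millisBlockedBeforeWakeup(long delayMillis) throws IOException, InterruptedException {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup(); // writes to the interrupt pipe on Linux
            });
            long start = System.nanoTime();
            waker.start();
            selector.select(); // nothing registered: only wakeup() can end this
            waker.join();
            return (System.nanoTime() - start) / 1_000_000;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("select() after wakeup(): " + selectAfterWakeup());
        System.out.println("blocked about " + millisBlockedBeforeWakeup(200) + " ms");
    }
}
```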

EPollArrayWrapper

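EPollArrayWrapper creates the epoll file descriptor and wraps the Linux epoll operations. It also holds the result of each selector.select(…) call, i.e. the epoll_event array filled in by epoll_wait.
EPollArrayWrapper manages a native array of Linux epoll_event structures: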

typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

struct epoll_event {
    __uint32_t events;
    epoll_data_t data;
};

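The data member (epoll_data_t data) of an epoll_event is whatever was supplied when the file descriptor was registered with epoll_ctl, and the kernel hands it back unchanged with every event. The JDK stores the registered file descriptor itself in data.fd, so when handling an event it knows which file descriptor the event belongs to.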

EPollArrayWrapper exposes the Linux epoll system calls as native methods for EPollSelectorImpl:

    private native int epollCreate();
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    private native int epollWait(long pollAddress, int numfds, long timeout,
                                 int epfd) throws IOException;

These three native methods correspond to the three Linux epoll system calls: epoll_create, epoll_ctl and epoll_wait.

    // The fd of the epoll driver
    private final int epfd;

    // The epoll_event array for results from epoll_wait
    private final AllocatedNativeObject pollArray;

    // Base address of the epoll_event array
    private final long pollArrayAddress;

    EPollArrayWrapper() throws IOException {
        // creates the epoll file descriptor
        epfd = epollCreate();

        // the epoll_event array passed to epoll_wait
        int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
    }

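The EPollArrayWrapper constructor creates the epoll file descriptor and allocates a native epoll_event array (NUM_EPOLLEVENTS entries of SIZE_EPOLLEVENT bytes each) that receives the results of epoll_wait.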
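To picture that native array, here is a hypothetical model built on a direct ByteBuffer instead of AllocatedNativeObject. It ASSUMES x86-64 Linux, where struct epoll_event is packed to 12 bytes (events at offset 0, the data union at offset 4) and memory is little-endian; other architectures lay the struct out differently. EpollEventArrayModel and kernelFills are made-up names; getDescriptor and getEventOps only mirror the accessors used by updateSelectedKeys().

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class EpollEventArrayModel {
    static final int SIZE_EPOLLEVENT = 12; // ASSUMPTION: packed x86-64 struct epoll_event
    static final int EVENT_OFFSET = 0;     // __uint32_t events
    static final int DATA_OFFSET = 4;      // epoll_data_t data; data.fd sits at its start
    static final int EPOLLIN = 0x001;      // value from <sys/epoll.h>

    private final ByteBuffer pollArray;

    EpollEventArrayModel(int numEvents) {
        // Plays the role of new AllocatedNativeObject(NUM_EPOLLEVENTS * SIZE_EPOLLEVENT, true).
        // Direct buffers start zero-filled; x86-64 is little-endian.
        pollArray = ByteBuffer.allocateDirect(numEvents * SIZE_EPOLLEVENT)
                              .order(ByteOrder.LITTLE_ENDIAN);
    }

    int capacityBytes() {
        return pollArray.capacity();
    }

    // Simulates epoll_wait filling result slot i: the fired events plus the
    // data value given to epoll_ctl (the JDK puts the fd there).
    void kernelFills(int i, int fd, int events) {
        pollArray.putInt(i * SIZE_EPOLLEVENT + EVENT_OFFSET, events);
        pollArray.putInt(i * SIZE_EPOLLEVENT + DATA_OFFSET, fd);
    }

    // Read back the fd and events of slot i.
    int getDescriptor(int i) {
        return pollArray.getInt(i * SIZE_EPOLLEVENT + DATA_OFFSET);
    }

    int getEventOps(int i) {
        return pollArray.getInt(i * SIZE_EPOLLEVENT + EVENT_OFFSET);
    }

    public static void main(String[] args) {
        EpollEventArrayModel model = new EpollEventArrayModel(8);
        model.kernelFills(0, 42, EPOLLIN);
        System.out.println("fd=" + model.getDescriptor(0) + " events=" + model.getEventOps(0));
    }
}
```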

ServerSocketChannel.open();

This returns a ServerSocketChannelImpl object, whose constructor creates the file descriptor of the underlying Linux server socket. ServerSocketChannelImpl:

    // Our file descriptor
    private final FileDescriptor fd;

    // fd value needed for dev/poll. This value will remain valid
    // even after the value in the file descriptor object has been set to -1
    private int fdVal;

    ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd =  Net.serverSocket(true);
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    }

ServerSocketChannelImpl (more precisely, its superclass AbstractSelectableChannel) holds the SelectionKey objects created by every registration of this channel with a selector:

    // Keys that have been created by registering this channel with selectors.
    // They are saved because if this channel is closed the keys must be
    // deregistered.  Protected by keyLock.
    //
    private SelectionKey[] keys = null;
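Because the channel keeps one key per selector in that array, a single channel can be registered with several selectors at once. This sketch (KeysPerSelectorDemo is an illustrative name) registers one unbound ServerSocketChannel with two selectors and checks, with keyFor(), that each selector got its own key.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class KeysPerSelectorDemo {
    // One channel, two selectors: two distinct keys, both remembered by the channel.
    static boolean twoSelectorsTwoKeys() throws IOException {
        try (Selector a = Selector.open();
             Selector b = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // register() rejects blocking channels
            SelectionKey ka = ch.register(a, SelectionKey.OP_ACCEPT);
            SelectionKey kb = ch.register(b, SelectionKey.OP_ACCEPT);
            // keyFor() looks the key up in the channel's own keys array.
            return ka != kb
                && ch.keyFor(a) == ka
                && ch.keyFor(b) == kb
                && ch.isRegistered();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("separate key per selector: " + twoSelectorsTwoKeys());
    }
}
```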

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

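The two-argument call serverChannel.register(selector, SelectionKey.OP_ACCEPT) delegates to this method with a null attachment. After checking that the channel is open, that ops is valid for it and that it is non-blocking, the method either updates the key this channel already has for the selector, or registers the channel with the Selector and adds the new SelectionKey to the channel's keys array.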
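Each branch of register() shown above can be triggered from user code. In this sketch (RegisterChecksDemo is an illustrative name) a still-blocking channel hits the "if (blocking)" check, OP_READ fails the validOps() check because a ServerSocketChannel only supports OP_ACCEPT, and a second registration with the same selector goes through findKey() and reuses the existing key.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterChecksDemo {
    // A channel that is still in blocking mode is rejected.
    static boolean blockingRejected() throws IOException {
        try (Selector sel = Selector.open(); ServerSocketChannel ch = ServerSocketChannel.open()) {
            try {
                ch.register(sel, SelectionKey.OP_ACCEPT); // channels start out blocking
                return false;
            } catch (IllegalBlockingModeException expected) {
                return true;
            }
        }
    }

    // Operations outside validOps() are rejected.
    static boolean invalidOpsRejected() throws IOException {
        try (Selector sel = Selector.open(); ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            try {
                ch.register(sel, SelectionKey.OP_READ);
                return false;
            } catch (IllegalArgumentException expected) {
                return true;
            }
        }
    }

    // Registering again with the same selector only updates interest set and attachment.
    static boolean reRegisterReusesKey() throws IOException {
        try (Selector sel = Selector.open(); ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey first = ch.register(sel, SelectionKey.OP_ACCEPT);
            SelectionKey second = ch.register(sel, SelectionKey.OP_ACCEPT, "attachment");
            return first == second && "attachment".equals(second.attachment());
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(blockingRejected() + " " + invalidOpsRejected() + " " + reRegisterReusesKey());
    }
}
```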
👇 SelectorImpl.register

    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }

EPollSelectorImpl.implRegister

    protected void implRegister(SelectionKeyImpl ski) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        int fd = Integer.valueOf(ch.getFDVal());
        fdToKey.put(fd, ski);
        pollWrapper.add(fd);
        keys.add(ski);
    }

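① Puts the channel's fd (file descriptor) and its SelectionKey into the fdToKey map.
② Adds the channel's fd to pollWrapper and forces its initial update events to 0 (because the fd may still carry events from an earlier registration that was killed).
③ Adds the SelectionKey to the selector's keys HashSet.
④ Afterwards, k.interestOps(int) in SelectorImpl.register calls EPollSelectorImpl's putEventOps(…), which records the requested events in EPollArrayWrapper's eventsLow or eventsHigh. They are only passed to the kernel, via epoll_ctl, the next time poll() runs.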

SelectionKeyImpl:

public class SelectionKeyImpl
    extends AbstractSelectionKey
{

    final SelChImpl channel;                            // package-private
    public final SelectorImpl selector;

    // Index for a pollfd array in Selector that this key is registered with
    private int index;

    private volatile int interestOps;
    private int readyOps;

SelectionKeyImpl records the association between a channel (a ServerSocketChannel or SocketChannel) and a selector, together with the key's interestOps and readyOps.
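The same association is visible through the public SelectionKey API. This sketch uses a java.nio.channels.Pipe rather than a socket so it needs no network: after writing one byte to the sink, the source end is readable, and one select() fills in readyOps. SelectionKeyDemo and the attachment string "conn-1" are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectionKeyDemo {
    // Returns {interestOps, readyOps} of a pipe key after one select().
    static int[] interestAndReady() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ, "conn-1");

            // The key links one channel to one selector, like SelectionKeyImpl's fields.
            if (key.channel() != source || key.selector() != selector
                    || !"conn-1".equals(key.attachment())) {
                throw new IllegalStateException("unexpected key state");
            }

            sink.write(ByteBuffer.wrap(new byte[] {1})); // make the source end readable
            selector.select(5000);                        // sets readyOps on the key
            return new int[] {key.interestOps(), key.readyOps()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = interestAndReady();
        System.out.println("interestOps=" + ops[0] + " readyOps=" + ops[1]);
    }
}
```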

int n = selector.select();

    public int select() throws IOException {
        return select(0);
    }

This eventually reaches EPollSelectorImpl.doSelect (in the public API, a timeout of 0 means wait indefinitely):

    protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

First, processDeregisterQueue(), which doSelect calls both before and after polling:

    // SelectorImpl
    void processDeregisterQueue() throws IOException {
        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl) i.next();
                    try {
                        implDereg(ski);
                    } catch (SocketException se) {
                        IOException ioe = new IOException("Error deregistering key");
                        ioe.initCause(se);
                        throw ioe;
                    } finally {
                        i.remove();
                    }
                }
            }
        }
    }

    // EPollSelectorImpl
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert (ski.getIndex() >= 0);
        SelChImpl ch = ski.channel;
        int fd = ch.getFDVal();
        fdToKey.remove(Integer.valueOf(fd));
        pollWrapper.remove(fd);
        ski.setIndex(-1);
        keys.remove(ski);
        selectedKeys.remove(ski);
        deregister((AbstractSelectionKey)ski);
        SelectableChannel selch = ski.channel();
        if (!selch.isOpen() && !selch.isRegistered())
            ((SelChImpl)selch).kill();
    }

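processDeregisterQueue() walks the set of cancelled SelectionKeys and deregisters each one with implDereg, which:
① removes the key from fdToKey (the file descriptor to SelectionKeyImpl map);
② removes the channel's file descriptor from pollWrapper;
③ removes the key from the keys and selectedKeys sets, so later selector.select() calls no longer monitor it with epoll;
④ deregisters the key from its channel as well;
⑤ finally, if the channel has been closed and is no longer registered with any selector, calls kill() to finish closing it and release its file descriptor.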
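The deferred deregistration is observable: SelectionKey.cancel() only queues the key in cancelledKeys(), and the key stays in selector.keys() (and the channel stays registered) until the next selection operation runs processDeregisterQueue(). This sketch (CancelDemo is an illustrative name) uses a Pipe and selectNow() to show both states.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;

class CancelDemo {
    // Returns {inKeysAfterCancel, registeredAfterCancel, inKeysAfterSelect, registeredAfterSelect}.
    static boolean[] cancelThenSelect() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            key.cancel(); // key becomes invalid and is queued in cancelledKeys()
            boolean inKeysAfterCancel = selector.keys().contains(key);
            boolean registeredAfterCancel = source.isRegistered();

            selector.selectNow(); // processDeregisterQueue() deregisters the cancelled key
            boolean inKeysAfterSelect = selector.keys().contains(key);
            boolean registeredAfterSelect = source.isRegistered();
            return new boolean[] {inKeysAfterCancel, registeredAfterCancel,
                                  inKeysAfterSelect, registeredAfterSelect};
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(Arrays.toString(cancelThenSelect()));
    }
}
```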

Next, let's look at EPollArrayWrapper.poll(timeout):

    int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }

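updateRegistrations() applies the pending interest changes recorded for this selector (in eventsLow or eventsHigh) to the kernel by calling epollCtl(epfd, opcode, fd, events).
epollWait then calls the Linux epoll_wait system call and returns the number of entries that had events during the wait. The loop afterwards checks whether one of those entries is the interrupt pipe's read end (incomingInterruptFD, i.e. fd0), which means wakeup() was called.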
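To make the deferred update concrete, here is a hypothetical, much-simplified model, not JDK code. DeferredRegistrations, setInterest and the string log are made-up stand-ins for eventsLow/eventsHigh, the list of updated descriptors and the native epollCtl. The opcode choice roughly follows the JDK 8 updateRegistrations(): ADD for an fd the kernel does not know yet with non-zero events, MOD or DEL for an fd that is already registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DeferredRegistrations {
    final List<String> ctlCalls = new ArrayList<>();                 // log of simulated epoll_ctl calls
    private final Map<Integer, Integer> updateEvents = new HashMap<>(); // latest requested events per fd
    private final Set<Integer> updatedFds = new LinkedHashSet<>();     // fds changed since the last poll
    private final Set<Integer> registered = new HashSet<>();           // fds currently known to the "kernel"

    // The putEventOps path: just records the change, no system call.
    void setInterest(int fd, int events) {
        updateEvents.put(fd, events);
        updatedFds.add(fd);
    }

    // Called at the start of poll(): one simulated epoll_ctl per changed fd,
    // carrying only the most recent events recorded for it.
    void updateRegistrations() {
        for (int fd : updatedFds) {
            int events = updateEvents.get(fd);
            String opcode;
            if (registered.contains(fd)) {
                opcode = (events != 0) ? "MOD" : "DEL";
            } else {
                opcode = (events != 0) ? "ADD" : null; // nothing to tell the kernel
            }
            if (opcode == null) {
                continue;
            }
            ctlCalls.add(opcode + " fd=" + fd + " events=" + events);
            if (opcode.equals("ADD")) {
                registered.add(fd);
            } else if (opcode.equals("DEL")) {
                registered.remove(fd);
            }
        }
        updatedFds.clear();
    }

    public static void main(String[] args) {
        DeferredRegistrations r = new DeferredRegistrations();
        r.setInterest(5, 1);
        r.setInterest(5, 4); // changed again before any poll
        r.updateRegistrations();
        System.out.println(r.ctlCalls);
    }
}
```

Batching like this means interestOps() calls stay cheap, and several changes to the same fd between two polls cost a single epoll_ctl.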

Then updateSelectedKeys():

    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

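This method takes each file descriptor reported by EPollArrayWrapper, looks up its SelectionKeyImpl in fdToKey, sets the key's readyOps, and adds the key to selectedKeys (the set of keys with ready events, returned by selector.selectedKeys()). The return value counts only keys whose ready set was updated.
Two points deserve attention:
① If the key is already in selectedKeys, translateAndSetReadyOps recomputes its readyOps from the new events, but numKeysUpdated is incremented only if that yields an operation not already in readyOps. An event that fires again is therefore not counted, so the caller cannot see it in select()'s return value.
② Consequently, a key that is never removed from selectedKeys simply stays there, and select() can return 0 even though its channel has become ready again.
👆 Both points explain why each SelectionKey must be removed from selectedKeys after it has been taken out: only then is the key added back, and counted, when its next event fires, so that the caller is notified correctly.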
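The pitfall can be reproduced with a Pipe. One byte is written and never read, so the source stays readable (and, with LT epoll, is reported by every poll). The first select counts the key; a second selectNow() without removing the key returns 0; after removing the key, it is counted again. ForgetToRemoveDemo is an illustrative name.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;

class ForgetToRemoveDemo {
    // Returns the three select counts: {first, secondWithoutRemove, thirdAfterRemove}.
    static int[] selectCounts() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(new byte[] {42})); // never read: source stays readable

            int first = selector.select(5000); // key added to selectedKeys, readyOps = OP_READ
            // The mistake being demonstrated: the key is NOT removed here.
            int second = selector.selectNow(); // same event again, readyOps unchanged: not counted

            selector.selectedKeys().remove(key); // what iter.remove() does in the loop
            int third = selector.selectNow();    // key re-added and counted
            return new int[] {first, second, third};
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(Arrays.toString(selectCounts()));
    }
}
```

This is expected to print [1, 0, 1]: the middle 0 appears even though the pipe is still readable, because the key was left in selectedKeys.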

How epoll works

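epoll is Linux's I/O multiplexing facility. It handles very large numbers of socket descriptors efficiently, because the cost of epoll_wait depends on how many descriptors are ready rather than on how many are monitored.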

First, the three epoll system calls as declared in C:

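  • int epoll_create(int size): creates an epoll instance and returns a file descriptor referring to it. size was originally a hint for how many descriptors the kernel should be prepared to handle; since Linux 2.6.8 it is ignored, but it must be greater than zero.
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event): operates on an epoll instance created by epoll_create, e.g. adds a socket to the set it monitors (EPOLL_CTL_ADD), changes the monitored events (EPOLL_CTL_MOD), or removes a monitored socket (EPOLL_CTL_DEL).
  • int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout): waits up to timeout milliseconds (-1 means indefinitely) and returns to the user-space process as soon as events occur on any monitored descriptor, filling in at most maxevents entries of events. It returns the number of ready descriptors, or 0 on timeout.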

A rough look at how epoll is implemented inside the kernel:

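  1. When epoll is initialized, it registers a file system with the kernel to hold the monitored descriptors, and each epoll_create call creates a file node in that file system. epoll also sets up its own kernel cache memory: the monitored descriptors are kept in a red-black tree for fast lookup, insertion and deletion, and a linked list (the ready list) holds the descriptors that have pending events.
  2. epoll_ctl inserts the socket into the red-black tree of the epoll file and also registers a callback with the kernel (hooked onto the socket's wait queue) that puts the socket on the ready list when it becomes ready. So when data arrives on a socket, the NIC interrupt leads the kernel to copy the data from the NIC into kernel memory, after which the callback inserts the socket into the ready list.
  3. epoll_wait only checks whether the ready list has entries: if it does, it returns them; otherwise it sleeps until an entry arrives, returning as soon as the timeout expires.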
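The three steps can be mimicked with a toy, single-threaded simulation. This is not how the kernel is written: ToyEpoll and its methods are hypothetical, java.util.TreeMap (itself a red-black tree) stands in for epoll's tree, and an ArrayDeque plays the ready list. The toy also drains the ready list on each wait, whereas a real level-triggered epoll puts a still-ready descriptor back on the list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

class ToyEpoll {
    // Monitored fd -> requested events (TreeMap is a red-black tree).
    private final TreeMap<Integer, Integer> interest = new TreeMap<>();
    // fds with pending events, in arrival order.
    private final Deque<Integer> readyList = new ArrayDeque<>();

    // Like epoll_ctl(EPOLL_CTL_ADD): insert into the tree (the real call also installs the callback).
    void ctlAdd(int fd, int events) {
        interest.put(fd, events);
    }

    // Like epoll_ctl(EPOLL_CTL_DEL): remove from the tree and the ready list.
    void ctlDel(int fd) {
        interest.remove(fd);
        readyList.remove(Integer.valueOf(fd));
    }

    // Stand-in for the callback run when a socket becomes ready.
    void onSocketReady(int fd) {
        if (interest.containsKey(fd) && !readyList.contains(fd)) {
            readyList.addLast(fd);
        }
    }

    // Like epoll_wait: looks only at the ready list, never scans every monitored fd.
    // An empty result stands for "slept until the timeout expired".
    List<Integer> waitReady(int maxEvents) {
        List<Integer> events = new ArrayList<>();
        while (!readyList.isEmpty() && events.size() < maxEvents) {
            events.add(readyList.pollFirst());
        }
        return events;
    }

    int monitoredCount() {
        return interest.size();
    }

    public static void main(String[] args) {
        ToyEpoll ep = new ToyEpoll();
        for (int fd = 3; fd < 10_003; fd++) {
            ep.ctlAdd(fd, 1);
        }
        ep.onSocketReady(42);
        System.out.println(ep.monitoredCount() + " monitored, ready: " + ep.waitReady(64));
    }
}
```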

epoll's two trigger modes:

  • LT (level-triggered): as long as a socket is readable/writable, every epoll_wait call reports it.
  • ET (edge-triggered): epoll_wait reports a socket only when it changes from unreadable to readable or from unwritable to writable.
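The difference can be simulated directly from the two definitions above: given whether a socket is readable at each epoll_wait call, which calls report it? TriggerModes.reportedWaits is a hypothetical helper. It models only the state-transition rule; a real ET epoll can also report an already-readable socket again when new data arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TriggerModes {
    // Indices of the epoll_wait calls that report the socket.
    // LT: report whenever readable. ET: report only on a not-readable -> readable change.
    static List<Integer> reportedWaits(boolean[] readableAtWait, boolean edgeTriggered) {
        List<Integer> reported = new ArrayList<>();
        boolean wasReadable = false;
        for (int i = 0; i < readableAtWait.length; i++) {
            boolean readable = readableAtWait[i];
            boolean report = edgeTriggered ? (readable && !wasReadable) : readable;
            if (report) {
                reported.add(i);
            }
            wasReadable = readable;
        }
        return reported;
    }

    public static void main(String[] args) {
        boolean[] pattern = {true, true, false, true, true};
        System.out.println("LT: " + reportedWaits(pattern, false));
        System.out.println("ET: " + reportedWaits(pattern, true));
    }
}
```

For the pattern readable, readable, not readable, readable, readable, LT reports calls 0, 1, 3 and 4 while ET reports only 0 and 3. That is why ET code must drain a socket (read until EAGAIN) each time it is reported, whereas under LT a socket left partly unread is simply reported again.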

[Figure: reading data from a socket]

[Figure: writing data to a socket]

Finally, note that on Linux JDK NIO uses LT, whereas Netty's native epoll transport uses ET by default.

