java io以及unix io模型

IO分兩個階段:
1.通知內核準備數據。2.數據從內核緩衝區拷貝到應用緩衝區

根據這2點IO類型能夠分紅:
1.阻塞IO,在兩個階段上面都是阻塞的。
2.非阻塞IO,在第1階段,程序不斷的輪詢直到數據準備好,第2階段仍是阻塞的
3.IO複用,在第1階段,當一個或者多個IO準備就緒時,通知程序,第2階段仍是阻塞的,在第1階段仍是輪詢實現的,只是
全部的IO都集中在一個地方,這個地方進行輪詢
4.信號IO,當數據準備完畢的時候,信號通知程序數據準備完畢,第2階段阻塞
5.異步IO,1,2都不阻塞,windows的iocp是真正的異步IO


Java NIO 
java NIO在linux上面是用epoll實現的,屬於IO複用類型。
Selector:IO的多路複用器,通道須要向其註冊,在數據準備階段由他進行狀態的輪詢
SelectionKey:通道向selector註冊後會建立一個SelectionKey,SelectionKey維繫通道和selector的關係.SelectionKey包含兩個整數集一個爲interest集合,一個爲ready集合.interest集合指定Selector須要監聽的事件.ready集合爲Selector爲SelectorKey監聽後已經準備就緒的能夠進行操做的事件.ready集合特別須要注意,這個裏面可能有阻塞的行爲,如OP_READ事件,只是暗示可讀,可是真正的數據此時尚未到來,此時就會阻塞了。

epoll有三個方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
        建立epoll文件,用於存放epoll_ctl註冊的event。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
        註冊fd到epfd中,op爲操做數(add,mod,delete) ,epoll_event爲註冊感興趣的事件,這個裏面也註冊了回調函數,當對應的fd的設備ready時,就調用回調函數,將這個fd加入epfd的ready set當中,epoll_wait()一直就在那裏等待ready set。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
        等待ready set,當準備好有數據的時候返回數據的個數,epoll_event爲感興趣的事件集合,maxevents爲事件集合的個數。

JDK中向Selector註冊事件流程(以linux epoll爲例):
關鍵代碼以下:
AbstractSelectableChannel.register():
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        if (!isOpen()) //channel關閉拋異常
            throw new ClosedChannelException();
        if ((ops & ~validOps()) != 0)//不合理的註冊值,拋異常
            throw new IllegalArgumentException();
        synchronized (regLock) {//有鎖就有可能有線程的阻塞和切換  關鍵點1
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel); //查看原來有沒有註冊過
            if (k != null) {//註冊過直接設置後返回
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);//沒有註冊的話執行selector的register
                addKey(k);
            }
            return k;
        }
    }


SelectionImpl.register():
    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object p_w_upload)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(p_w_upload);
        synchronized (publicKeys) {//又是鎖,可能阻塞或者線程切換
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }


EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
        SelChImpl ch = ski.channel;
        fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);//關鍵點2 fd和Selectionkey對應的map
        pollWrapper.add(ch);
        keys.add(ski);
}

EPollArrayWrapper.add():
void add(SelChImpl channel) {
        synchronized (updateList) {
            updateList.add(new Updator(channel, EPOLL_CTL_ADD));//關鍵點3
        }
}

關鍵點1.register()方法中有同步語句,可能當前線程就阻塞了,線程切換會有性能的損耗。
關鍵點2.fdToKey是個key爲fd,value爲selectionKey的map
關鍵點3.Updator是個內部類:
    private static class Updator {
        SelChImpl channel;
        int opcode;
        int events;
        Updator(SelChImpl channel, int opcode, int events) {
            this.channel = channel;
            this.opcode = opcode;
            this.events = events;
        }
        Updator(SelChImpl channel, int opcode) {
            this(channel, opcode, 0);
        }
    }
表明channel以及對channel的操做類型以及操做的事件,新註冊的操做類型爲add,操做事件爲0,表明沒有事件。
在SelectionImpl.register()中最後還要執行SelectionKeyImpl.interestOps()方法註冊操做事件(前面添加的時候操做事件是爲0的)
SelectionKeyImpl:
  public SelectionKey interestOps(int ops) {
        ensureValid();//檢測是否可用
        return nioInterestOps(ops);
    }


    SelectionKey nioInterestOps(int ops) {      // package-private
        if ((ops & ~channel().validOps()) != 0)
            throw new IllegalArgumentException();
        //真正的進行epoll感興趣事件的註冊
        channel.translateAndSetInterestOps(ops, this);
        interestOps = ops;
        return this;
    }


    public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        int newOps = 0;
        //進行java的ops和epoll的ops轉換,每一個具體的channel是有區別的
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= PollArrayWrapper.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= PollArrayWrapper.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= PollArrayWrapper.POLLCONN;
        sk.selector.putEventOps(sk, newOps);
    }

   void setInterest(SelChImpl channel, int mask) {

        synchronized (updateList) {
            // if the previous pending operation is to add this file descriptor
            // to epoll then update its event set
            if (updateList.size() > 0) {//關鍵點1
                Updator last = updateList.getLast();
                //這個確定是剛纔那個註冊的channel,直接進行事件的更新
                if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
                    last.events = mask;
                    return;
                }
            }

            // update existing registration 程序運行到這裏的話,說明前面已經有更新的updator加入了,這裏只好新加入一個
            updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
        }
    }
關鍵點1.這裏是考慮到併發的問題了,可是我有個疑問爲何註冊要分兩個步驟執行,爲何不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?
至此register()動做已經完成。


select()

代碼以下:
    public int select(long timeout)
        throws IOException
    {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

    private int lockAndDoSelect(long timeout) throws IOException {
        synchronized (this) {
            if (!isOpen())
                throw new ClosedSelectorException();
            synchronized (publicKeys) {
                synchronized (publicSelectedKeys) {
                    return doSelect(timeout);
                }
            }
        }
    }
   protected int doSelect(long timeout)
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();

        //反註冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);//關鍵點1
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }


      void processDeregisterQueue() throws IOException {
        // Precondition: Synchronized on this, keys, and selectedKeys
        //遍歷取消的key
        Set cks = cancelledKeys();
        synchronized (cks) {
            Iterator i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                try {
                     //底層的取消實現
                    implDereg(ski);
                } catch (SocketException se) {
                    IOException ioe = new IOException(
                        "Error deregistering key");
                    ioe.initCause(se);
                    throw ioe;
                } finally {
                    //刪除取消的key
                    i.remove();
                }
            }
        }
    }

       protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert (ski.getIndex() >= 0);
        SelChImpl ch = ski.channel;
        int fd = ch.getFDVal();
        fdToKey.remove(new Integer(fd));//hashmap中去除
        pollWrapper.release(ch);//這步很關鍵
        ski.setIndex(-1);
        keys.remove(ski);//總的key set去除
        selectedKeys.remove(ski);//已經準備好的set去除
        deregister((AbstractSelectionKey)ski);//移除channel的集合
        SelectableChannel selch = ski.channel();
        if (!selch.isOpen() && !selch.isRegistered())
            ((SelChImpl)selch).kill();
    }



    void release(SelChImpl channel) {

        //空閒隊列刪除了
        synchronized (updateList) {
            // flush any pending updates
            for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
                if (it.next().channel == channel) {
                    it.remove();
                }
            }

            // remove from the idle set (if present)
            idleSet.remove(channel);

             //調用native 通知本channel對應的fd被被刪除
            // remove from epoll (if registered)
            epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
        }
    }
看關鍵點1的代碼:
    int poll(long timeout) throws IOException {
        updateRegistrations();//在poll前 先把update裏面不須要的條目處理掉
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }


    void updateRegistrations() {
        synchronized (updateList) {
            Updator u = null;
            while ((u = updateList.poll()) != null) {
                SelChImpl ch = u.channel;
                if (!ch.isOpen())
                    continue;

                // if the events are 0 then file descriptor is put into "idle
                // set" to prevent it being polled
                if (u.events == 0) {//這個表示interest事件爲0
                    boolean added = idleSet.add(u.channel); //關鍵點1
                    //先加入到idleSet裏面 若是是此次加入的 並且操做行爲是mod,那麼就是個刪除這個channel對應的fd
                    // if added to idle set then remove from epoll if registered
                    if (added && (u.opcode == EPOLL_CTL_MOD))
                        epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
                } else {
                    //關鍵點 2
                    // events are specified. If file descriptor was in idle set
                    // it must be re-registered (by converting opcode to ADD)
                    boolean idle = false;
                    //若是idleSet不爲空並且有這個Updator  說明關鍵點1處代碼返回true,操做行爲爲add,mod的話在epollCtl會被刪除掉
                    if (!idleSet.isEmpty())
                        idle = idleSet.remove(u.channel);
                    int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
                    epollCtl(epfd, opcode, ch.getFDVal(), u.events);
                }
            }
        }
    }

updateList是在EPollArrayWrapper.setInterest()和add()方法中添加的,當有多個線程的時候,經過channel register()有可能剛加入的Updator會被updateRegistrations()獲得,獲得的就是Channel第一次register的updatot,這個時候events爲0,被加入到idleSet接着setInterest()被調用(channel register()最後一步),多了一個updator,這個時候再執行Selector.select(),顯然會到關鍵點2操做行爲add,是沒有問題的。執行完updateRegistrations()方法,而後就epollWait()方法的調用,這個就是epoll的native方法了

總結:
1.Channel的register方法最後加入channel的感興趣的事件到updatorList中
2.Selector的select的方法主要是對updatorList進行運做,首先去除全部cancelkey(),也就刪除了對應的底層的updatorList的條目,而後迭代updatorList根據updator的event事件進行處理,也就是執行epoll的epollCtl方法,以後就是執行epollWait等待epollCtl的channel對應的callback函數的執行了。
相關文章
相關標籤/搜索