BIO到NIO源碼的一些事兒之NIO 上

前言

此篇文章會詳細解讀NIO的功能逐步豐滿的路程,爲Reactor-Netty 庫的講解鋪平道路。java

關於Java編程方法論-Reactor與Webflux的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:git

Rxjava源碼解讀與分享:www.bilibili.com/video/av345…github

Reactor源碼解讀與分享:www.bilibili.com/video/av353…編程

場景代入

接上一篇 BIO到NIO源碼的一些事兒之BIO,咱們來接觸NIO的一些事兒。安全

在上一篇中,咱們能夠看到,咱們要作到異步非阻塞,咱們本身進行的是建立線程池同時對部分代碼作timeout的修改來對接客戶端,可是弊端也很清晰,咱們轉換下思惟,這裏舉個場景例子,A班同窗要和B班同窗一塊兒一對一完成任務,每對人拿到的任務是不同的,消耗的時間有長有短,任務由於有獎勵因此同窗們會搶,傳統模式下,A班同窗和B班同窗不經管理話,即使只是一個心跳檢測的任務都得一塊兒,在這種狀況下,客戶端根本不會有數據要發送,只是想告訴服務器本身還活着,這種狀況下,假如B班再來一個同窗作對接的話,就頗有問題了,B班的每個同窗均可以當作服務器端的一個線程。因此,咱們須要一個管理者,因而Selector就出現了,做爲管理者,這裏,咱們每每須要管理同窗們的狀態,是否在等待任務,是否在接收信息,是否在輸出信息等等,Selector更側重於動做,針對於這些狀態標籤來作事情就能夠了,那這些狀態標籤其實也是須要管理的,因而SelectionKey也就應運而生。接着咱們須要對這些同窗進行包裝加強,使之攜帶這樣的標籤。一樣,對於同窗咱們應該進一步解放雙手的,好比給其配臺電腦,這樣,同窗是否是能夠作更多的事情了,那這個電腦在此處就是Buffer的存在了。 因而在NIO中最主要是有三種角色的,Buffer緩衝區,Channel通道,Selector選擇器,咱們都涉及到了,接下來,咱們對其源碼一步步分析解讀。服務器

Channel解讀

賦予Channel可異步可中斷的能力

有上可知,同窗其實都是表明着一個個的Socket的存在,那麼這裏Channel就是對其進行的加強包裝,也就是Channel的具體實現裏應該有Socket這個字段才行,而後具體實現類裏面也是牢牢圍繞着Socket具有的功能來作文章的。那麼,咱們首先來看java.nio.channels.Channel接口的設定:併發

public interface Channel extends Closeable {

    /** * Tells whether or not this channel is open. * * @return {@code true} if, and only if, this channel is open */
    public boolean isOpen();

    /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */
    public void close() throws IOException;

}
複製代碼

此處就是很直接的設定,判斷Channel是不是open狀態,關閉Channel的動做,咱們在接下來會講到ClosedChannelException是如何具體在代碼中發生的。 有時候,一個Channel可能會被異步關閉和中斷,這也是咱們所需求的。那麼要實現這個效果咱們須得設定一個能夠進行此操做效果的接口。達到的具體的效果應該是若是線程在實現這個接口的的Channel中進行IO操做的時候,另外一個線程能夠調用該Channel的close方法。致使的結果就是,進行IO操做的那個阻塞線程會收到一個AsynchronousCloseException異常。異步

一樣,咱們應該考慮到另外一種狀況,若是線程在實現這個接口的的Channel中進行IO操做的時候,另外一個線程可能會調用被阻塞線程的interrupt方法(Thread#interrupt()),從而致使Channel關閉,那麼這個阻塞的線程應該要收到ClosedByInterruptException異常,同時將中斷狀態設定到該阻塞線程之上。socket

這時候,若是中斷狀態已經在該線程設定完畢,此時在其之上的有Channel又調用了IO阻塞操做,那麼,這個Channel會被關閉,同時,該線程會當即受到一個ClosedByInterruptException異常,它的interrupt狀態仍然保持不變。 這個接口定義以下:ide

public interface InterruptibleChannel extends Channel {

    /** * Closes this channel. * * <p> Any thread currently blocked in an I/O operation upon this channel * will receive an {@link AsynchronousCloseException}. * * <p> This method otherwise behaves exactly as specified by the {@link * Channel#close Channel} interface. </p> * * @throws IOException If an I/O error occurs */
    public void close() throws IOException;

}
複製代碼

其針對上面所提到邏輯的具體實現是在java.nio.channels.spi.AbstractInterruptibleChannel進行的,關於這個類的解析,咱們來參考這篇文章InterruptibleChannel 與可中斷 IO

賦予Channel可被多路複用的能力

咱們在前面有說到,Channel能夠被Selector進行使用,而Selector是根據Channel的狀態來分配任務的,那麼Channel應該提供一個註冊到Selector上的方法,來和Selector進行綁定。也就是說Channel的實例要調用register(Selector,int,Object)。注意,由於Selector是要根據狀態值進行管理的,因此此方法會返回一個SelectionKey對象來表示這個channelselector上的狀態。關於SelectionKey,它是包含不少東西的,這裏暫不提。

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#addKey
    private void addKey(SelectionKey k) {
        assert Thread.holdsLock(keyLock);
        int i = 0;
        if ((keys != null) && (keyCount < keys.length)) {
            // Find empty element of key array
            for (i = 0; i < keys.length; i++)
                if (keys[i] == null)
                    break;
        } else if (keys == null) {
            keys = new SelectionKey[2];
        } else {
            // Grow key array
            int n = keys.length * 2;
            SelectionKey[] ks =  new SelectionKey[n];
            for (i = 0; i < keys.length; i++)
                ks[i] = keys[i];
            keys = ks;
            i = keyCount;
        }
        keys[i] = k;
        keyCount++;
    }
複製代碼

一旦註冊到Selector上,Channel將一直保持註冊直到其被解除註冊。在解除註冊的時候會解除Selector分配給Channel的全部資源。 也就是Channel並無直接提供解除註冊的方法,那咱們換一個思路,咱們將Selector上表明其註冊的Key取消不就能夠了。這裏能夠經過調用SelectionKey#cancel()方法來顯式的取消key。而後在Selector下一次選擇操做期間進行對Channel的取消註冊。

//java.nio.channels.spi.AbstractSelectionKey#cancel
    /** * Cancels this key. * * <p> If this key has not yet been cancelled then it is added to its * selector's cancelled-key set while synchronized on that set. </p> */
    public final void cancel() {
        // Synchronizing "this" to prevent this key from getting canceled
        // multiple times by different threads, which might cause race
        // condition between selector's select() and channel's close().
        synchronized (this) {
            if (valid) {
                valid = false;
                //仍是調用Selector的cancel方法
                ((AbstractSelector)selector()).cancel(this);
            }
        }
    }


//java.nio.channels.spi.AbstractSelector#cancel
    void cancel(SelectionKey k) {                       
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }


//在下一次select操做的時候來解除那些要求cancel的key,即解除Channel註冊
//sun.nio.ch.SelectorImpl#select(long)
    @Override
    public final int select(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
            //重點關注此方法
        return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
    }
//sun.nio.ch.SelectorImpl#lockAndDoSelect
    private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    //重點關注此方法
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }
//sun.nio.ch.WindowsSelectorImpl#doSelect
    protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();
        //重點關注此方法
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        ...
    }

     /** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set */
    protected final void processDeregisterQueue() throws IOException {
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(publicSelectedKeys);

        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                    i.remove();

                    // remove the key from the selector
                    implDereg(ski);

                    selectedKeys.remove(ski);
                    keys.remove(ski);

                    // remove from channel's key set
                    deregister(ski);

                    SelectableChannel ch = ski.channel();
                    if (!ch.isOpen() && !ch.isRegistered())
                        ((SelChImpl)ch).kill();
                }
            }
        }
    }
複製代碼

這裏,當Channel關閉時,不管是經過調用Channel#close仍是經過打斷線程的方式來對Channel進行關閉,其都會隱式的取消關於這個Channel的全部的keys,其內部也是調用了k.cancel()

//java.nio.channels.spi.AbstractInterruptibleChannel#close
    /** * Closes this channel. * * <p> If the channel has already been closed then this method returns * immediately. Otherwise it marks the channel as closed and then invokes * the {@link #implCloseChannel implCloseChannel} method in order to * complete the close operation. </p> * * @throws IOException * If an I/O error occurs */
    public final void close() throws IOException {
        synchronized (closeLock) {
            if (closed)
                return;
            closed = true;
            implCloseChannel();
        }
    }
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
     protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if (keys != null) {
                copyOfKeys = keys.clone();
            }
        }

        if (copyOfKeys != null) {
            for (SelectionKey k : copyOfKeys) {
                if (k != null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set
                }
            }
        }
    }
複製代碼

若是Selector自身關閉掉,那麼Channel也會被解除註冊,同時表明Channel註冊的key也將變得無效:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
        boolean open = selectorOpen.getAndSet(false);
        if (!open)
            return;
        implCloseSelector();
    }
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
複製代碼

一個channel所支持的Ops中,假如支持多個Ops,在特定的selector註冊一次以後便沒法在該selector上重複註冊,也就是在二次調用java.nio.channels.spi.AbstractSelectableChannel#register方法獲得時候,只會進行Ops的改變,並不會從新註冊,由於註冊會產生一個全新的SelectionKey對象。咱們能夠經過調用java.nio.channels.SelectableChannel#isRegistered的方法來肯定是否向一個或多個Selector註冊了channel

//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
 // -- Registration --

    public final boolean isRegistered() {
        synchronized (keyLock) {
            //咱們在以前往Selector上註冊的時候調用了addKey方法,即每次往//一個Selector註冊一次,keyCount就要自增一次。
            return keyCount != 0;
        }
    }
複製代碼

至此,繼承了SelectableChannel這個類以後,這個channel就能夠安全的由多個併發線程來使用。 這裏,要注意的是,繼承了AbstractSelectableChannel這個類以後,新建立的channel始終處於阻塞模式。然而與Selector的多路複用有關的操做必須基於非阻塞模式,因此在註冊到Selector以前,必須將channel置於非阻塞模式,而且在取消註冊以前,channel可能不會返回到阻塞模式。 這裏,咱們涉及了Channel的阻塞模式與非阻塞模式。在阻塞模式下,在Channel上調用的每一個I/O操做都將阻塞,直到完成爲止。 在非阻塞模式下,I/O操做永遠不會阻塞,而且能夠傳輸比請求的字節更少的字節,或者根本不傳輸任何字節。 咱們能夠經過調用channel的isBlocking方法來肯定其是否爲阻塞模式。

//java.nio.channels.spi.AbstractSelectableChannel#register
 public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (!isOpen())
            throw new ClosedChannelException();
        synchronized (regLock) {
     //此處會作判斷,假如是阻塞模式,則會返回true,而後就會拋出異常
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                return k;
            }
        }
    }
複製代碼

因此,咱們在使用的時候能夠基於如下的例子做爲參考:

public NIOServerSelectorThread(int port) {
		try {
			//打開ServerSocketChannel,用於監聽客戶端的鏈接,他是全部客戶端鏈接的父管道
			serverSocketChannel = ServerSocketChannel.open();
			//將管道設置爲非阻塞模式
			serverSocketChannel.configureBlocking(false);
			//利用ServerSocketChannel建立一個服務端Socket對象,即ServerSocket
			serverSocket = serverSocketChannel.socket();
			//爲服務端Socket綁定監聽端口
			serverSocket.bind(new InetSocketAddress(port));
			//建立多路複用器
			selector = Selector.open();
			//將ServerSocketChannel註冊到Selector多路複用器上,而且監聽ACCEPT事件
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("The server is start in port: "+port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
複製代碼

因時間關係,本篇暫時到這裏,剩下的會在下一篇中進行講解。

相關文章
相關標籤/搜索