NIO-Channel接口分析



NIO-Channel接口分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析java

前言

原本是想學習Netty的,可是Netty是一個NIO框架,所以在學習netty以前,仍是先梳理一下NIO的知識。經過剖析源碼理解NIO的設計原理。微信

本系列文章針對的是JDK1.8.0.161的源碼。網絡

上一篇介紹了Channel的基本使用,下面對Channel的接口進行分析。併發

接口

SCTP協議

SCTP(Stream Control Transmission Protocol)是一種傳輸協議,在TCP/IP協議棧中所處的位置和TCP、UDP相似,兼有TCP/UDP二者特徵。框架

對於SCTP協議這裏不詳細描述,想了解的同窗能夠看下這篇文章
SCTP協議平時用的很少,這裏不作具體討論。異步

UDP協議

NIO使用DatagrmChannel實現了UDP協議的網絡通信。socket

20191207210432.png

下面咱們對各個接口進行分析。tcp

AutoCloseableCloseable分別是自動關閉和主動關閉接口。當資源(如句柄或文件等)須要釋放時,則須要調用close方法釋放資源。高併發

public interface AutoCloseable {
    void close() throws Exception;
}

public interface Closeable extends AutoCloseable {
    void close() throws IOException;
}

Channel是通道接口,針對於I/O相關的操做,須要打開和關閉操做。

public interface Channel extends Closeable {
    boolean isOpen();

    void close() throws IOException;
}

InterruptibleChannel是支持異步關閉和中斷的通道接口。爲了支持Thead的interrupt模型,當線程中斷時,能夠執行中斷處理對象的回調,從而關閉釋放Channel。

public interface InterruptibleChannel extends Channel {
    void close() throws IOException;
}

關於InterruptibleChannel可中斷I/O詳細解析能夠看一下《JDK源碼閱讀-InterruptibleChannel與可中斷IO》

Interruptible是線程中斷接口,即上面提的Thead的interrupt模型。當線程中斷時,則會調用中斷操做。

public abstract interface Interruptible {
    public abstract void interrupt(java.lang.Thread t);
}
public class Thread implements Runnable {
    ...
    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    ...
}

AbstractInterruptibleChannel實現了ChannelInterruptibleChannel接口。

20191208013708.png

  • closeLock是關閉時的鎖
  • open表示channle是否打開
  • interuptorInterruptible中斷回調
  • interrupted爲I/O執行時的線程
public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel {
    ...
    public final void close() throws IOException {
        synchronized(this.closeLock) {
            if (this.open) {
                this.open = false;
                this.implCloseChannel();
            }
        }
    }
    //具體的Channel實現關閉
    protected abstract void implCloseChannel() throws IOException;

    protected final void begin() {
        if (this.interruptor == null) {
            this.interruptor = new Interruptible() {
                //線程中斷時,則會調用該接口關閉Channel
                public void interrupt(Thread target) {
                    synchronized(AbstractInterruptibleChannel.this.closeLock) {
                        if (AbstractInterruptibleChannel.this.open) {
                            AbstractInterruptibleChannel.this.open = false;
                            AbstractInterruptibleChannel.this.interrupted = target;

                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) {
                            }

                        }
                    }
                }
            };
        }
        //將線程的blockOn設置爲當前interruptor,從而使得線程關閉時能關閉channel
        blockedOn(this.interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted()) {
            this.interruptor.interrupt(me);
        }

    }

    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        //I/O結束,清除線程blocker
        blockedOn(null);
        Thread interrupted = this.interrupted;
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }

    static void blockedOn(Interruptible intr) {
        SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

AbstractInterruptibleChannel添加了beginend方法。 在I/O操做開始時會調用begin,在I/O操做結束時會調用end。在begin方法內將中斷操做加入到當前線程中。最終會調用到線程的blockOn方法,它會將該中斷接口注入到線程中,使得線程中斷時能夠調用到Channel並釋放相關資源。

public void blockedOn(Thread t, Interruptible b) {
    t.blockedOn(b);
}

SelectableChannel接口聲明瞭Channel是能夠被選擇的,在Windows平臺經過WindowsSelectorImpl實現,Linux經過EPollSelectorImpl實現。此外還有KQueue等實現,關於Selector具體細節在《NIO-Selector》一文中會介紹。

AbstractSelectableChannel實現了SelectableChannel接口。

NetworkChannel適用於網絡傳輸的接口。

public interface NetworkChannel extends Channel {
    //綁定地址
    NetworkChannel bind(SocketAddress var1) throws IOException;

    //獲取本地地址
    SocketAddress getLocalAddress() throws IOException;

    //設置socket選項
    <T> NetworkChannel setOption(SocketOption<T> var1, T var2) throws IOException;
    //獲取socket選項
    <T> T getOption(SocketOption<T> var1) throws IOException;
    //當前通道支持的socket選項
    Set<SocketOption<?>> supportedOptions();
}

MulticastChannel是支持組播接口。

public interface MulticastChannel extends NetworkChannel {
    void close() throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf) throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf, InetAddress source) throws IOException;
}

SelChImpl接口用於將底層的I/O就緒狀態更新爲就緒事件。

public interface SelChImpl extends Channel {

    FileDescriptor getFD();
    int getFDVal();
    //更新就緒事件
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk);
    //設置就緒事件
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk);
    //將底層的輪詢操做轉換爲事件
    void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);
    //返回channle支持的操做,好比讀操做、寫操做等
    int validOps();
    void kill() throws IOException;
}

因爲UDP支持讀寫數據,所以還實現了ReadableByteChannelWritableByteChannel接口

public interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}   

public interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}

ByteChannel是支持讀寫的通道。

public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

ScatteringByteChannel則支持根據傳入偏移量讀,支持根據傳入偏移量寫GatheringByteChannel

public interface ScatteringByteChannel extends ReadableByteChannel {
    long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;}

public interface GatheringByteChannel extends WritableByteChannel {
    long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}

TCP協議

客戶端

20191209111550.png

TCP協議除了不支持組播,其餘和UDP是同樣的,再也不重複介紹。

服務端

20191209112800.png

服務端無需數據讀寫,僅須要接收鏈接,數據讀寫是SocketChannel乾的事。所以沒有ReadableByteChannelWriteableByteChannel等讀寫接口

文件

20191209112005.png

文件比網絡協議少了NetworkChannelSelChImplSelectableChannelSelChImplSelectableChannel主要是用於支持選擇器的,因爲網絡傳輸大多數鏈接時空閒的,並且數據什麼時候會到來並不知曉,同時須要支持高併發來鏈接,所以支持多路複用技術能夠顯著的提升性能,而磁盤讀寫則沒有該需求,所以無需選擇器。

SeekableByteChannel能夠經過修改position支持從指定位置讀寫數據。

public interface SeekableByteChannel extends ByteChannel {
    int read(ByteBuffer dst) throws IOException;

    int write(ByteBuffer src) throws IOException;
    
    long position() throws IOException;
    //設置偏移量
    SeekableByteChannel position(long newPosition) throws IOException;

    long size() throws IOException;
    //截取指定大小
    SeekableByteChannel truncate(long size) throws IOException;
}

總結

因爲文章篇幅比較長,所以仍是將接口分析和實現分析分開。本篇文章對Channel的接口進行說明,下一篇將對具體的實現進行分析。

相關文獻

  1. SCTP協議詳解
  2. 史上最強Java NIO入門:擔憂從入門到放棄的,請讀這篇!
  3. Java NIO系列教程
  4. 爲何SCTP沒有被大量使用/知道
  5. JDK源碼閱讀-InterruptibleChannel與可中斷IO
  6. 廣播和組播
  7. 關於AccessController.doPrivileged
  8. ServiceLoader源碼分析
  9. 基於Java的RDMA高性能通訊庫(六):SDP - Java Socket Direct Protocol

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:http://www.javashuo.com/article/p-rvmsctkv-gv.html 做者:傑哥很忙 本文使用「CC BY 4.0」創做共享協議。歡迎轉載,請在明顯位置給出出處及連接。

相關文章
相關標籤/搜索