NIO(3、Channel)

目錄

NIO(1、概述)
NIO(2、Buffer)
NIO(3、Channel)
NIO(4、Selector)html

Channel

上文說了描述了Buffer的實現機制,那麼這個章節就主要描述數據是如何進入緩衝區的,而且又是如何從緩衝區流出的。java

類圖縱覽及核心類概述

Channel

  這張圖只是簡單歸納了Channel的類圖,固然,Channel的設計遠比這個更復雜:例如SelectableChannel還有SocketChannel和ServerSocketChannel的實現,NetworkChannel繼承Channel並抽象了更多的方法;例如FileChannel,除了繼承AbstractInterruptibleChannel以外,還實現了GatheringByteChannel和ScatteringByteChannel接口。api

  • Channel
      咱們能夠看到,Channel接口自己定義了 close() 和 isOpen() 方法,在繼承Channel的接口中,又分別抽象了讀通道(ReadableByteChannel)、寫通道(WritableByteChannel)及可中斷的異步通道(InterruptibleChannel)接口。讀寫通道天然沒必要說,下文也會有介紹。數組

  • InterruptibleChannel
      這裏說下InterruptibleChannel,這是一個能夠被中斷的異步通道,繼承了 close() 方法。當一個線程在I/O被阻塞時,另外一個線程執行了close()方法,那麼阻塞的線程會拋出 AsynchronousCloseException異常。當一個線程在I/O被阻塞時,另外一個線程調用阻塞的線程中斷(interrupt())方法,那麼將會拋出ClosedByInterruptException異常。瀏覽器

  • AbstractInterruptibleChannel
      AbstractInterruptibleChannel抽象類,這是全部可中斷通道實現的基類,咱們能夠看到,FileChannel正是直接繼承自它,後文會介紹的SocketChannel和ServerSocketChannel繼承自AbstractSelectableChannel抽象類,而AbstractSelectableChannel又繼承自SelectableChannel抽象類,SelectableChannel上圖就能夠看出,一樣繼承AbstractInterruptibleChannel。咱們不得不想一下,爲何FileChannel直接繼承AbstractInterruptibleChannel抽象類,而另外一個經常使用的好比SocketChannel卻須要繼承自SelectableChannel抽象類?此時咱們須要只瞭解SelectableChannel是什麼問題就迎刃而解了。安全

  • SelectableChannel
      第一個章節在概述中提過一個名詞「多路複用」,當時也做了簡單描述:一個線程經過選擇器處理和管理多個通道,利用一個線程的資源去處理多個鏈接。SelectableChannel正是扮演着其中的一個重要角色。而相似SocketChannel同樣實現NetworkChannel接口的通道,這種網絡數據的傳輸尤爲在高併發的壓力下,讓CPU利用率真正體如今處理數據上,而不是頻繁上下文切換的開銷,使用這種機制能明顯提高處理性能,而FileChannel這種對文件操做是絕對不會使用到這種機制的。另外,SelectableChannel在阻塞模型中,每個I/O操做都會阻塞到它完成爲止,在非阻塞模型中,是不會出現阻塞的狀況,一樣返回的字節數可能少於要求或者乾脆沒有,是否阻塞咱們能經過 isBlocking() 方法查看。
      SelectableChannel是經過選擇器(Selector)複用的通道,爲了配合與選擇器一塊兒使用,可以使用 register() 方法,這個方法會返回一個SelectionKey對象,表示已經在選擇器裏註冊了,這個通道會一直保持到註銷爲止,同時也包括已經分配在這個選擇器上的這個通道的資源。通常狀況下,通道本身不能直接在通道上註銷,咱們能夠調用以前註冊時返回的SelectionKey對象的 cancel() 方法,能夠顯式註銷。另外,閱讀代碼咱們發現,幾乎全部的實現方法都有同步控制,因此,在多個併發線程下使用是安全的。服務器

Scatter/Gather

  Channel,它既是緩衝區數據的入口,也是其數據的出口。
  在上面的類圖咱們能夠看到,Channel分別抽象了ReadableByteChannel和WritableByteChannel接口,兩個接口各自定義了 read() 和 write() 方法,讀取和寫入ByteBuffer對象。並在這個基礎上又進一步定義了ScatteringByteChannel和GatheringByteChannel接口,一樣,這兩個接口一樣存在繼承自ReadableByteChannel和WritableByteChannel接口的read() 和 write() 方法,此外,還分別重載了 read() 和 write() 方法,增長了讀取或寫入ByteBuffer數組。
  網絡

  • Scattering Reads
      讀取數據從一個通道到多個緩衝區,這是簡單示意圖:

Scatter

ByteBuffer   buffer1 = ByteBuffer.allocate(1024);
  ByteBuffer   buffer2 = ByteBuffer.allocate(1024);
  ByteBuffer[] buffers = {buffer1, buffer2};
  channel.read(buffers);
  • Gathering Reads
      寫數據從多個緩衝區到一個通道,這是簡單示意圖:

Gather

ByteBuffer   buffer1 = ByteBuffer.allocate(1024);
  ByteBuffer   buffer2 = ByteBuffer.allocate(1024);
  ByteBuffer[] buffers = {buffer1, buffer2};
  channel.write(buffers);

Channel之間數據傳輸

  FileChannel的 transferFrom() 和 transferTo() 兩個方法實現了通道之間的數據傳輸,當有一方是FileChannel時,另外一方實現了 ReadableByteChannel和WritableByteChannel接口的通道就可以與FileChannel傳輸數據。目前爲止,只有文件通道(FileChannel)可以雙向傳輸。併發

  • transferFrom() & transferTo()
FileChannel fromChannel = fromFile.getChannel();
FileChannel toChannel = toFile.getChannel();
// transferFrom
toChannel.transferFrom(fromChannel, position, count);
// transferTo
fromChannel.transferTo(position,count,toChannel);

  咱們在講Buffer的時候就已經說了position,意指讀寫位置,count指的是數據大小。oracle

Channel重要實現

  • FileChannel:操做文件的讀寫
  • SocketChannel:經過TCP讀寫網絡數據
  • ServerSocketChannel:監聽TCP鏈接,你能利用它建立一個最簡單的Web服務器
  • DatagramChannel:經過UDP讀寫網絡數據

SocketChannel & ServerSocketChannel

  建立SocketChannel僅須要調用它的 open() 方法:

SocketChannel channel = SocketChannel.open();

  事實上,可能在實際使用的時候,咱們也會在的句柄對象的處理方法裏,使用SelectionKey的 channel() 方法來獲取,或者ServerSocketChannel對象的 accept() 方法來獲取。

SelectionKey key = ...
  //接受消息處理
  SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
  //或者
  SocketChannel channel = (SocketChannel) key.channel();

  在咱們手動 open() 打開這個socket以後,其實還未鏈接,這時候咱們對這個通道進行I/O操做會拋出NotYetConnectedException。咱們可使用 connect() 方法來鏈接通道的socket,與此同時,也可使用 isConnected() 判斷是否已鏈接。若是這個Socket在非阻塞模型中,就須要 finishConnect() 來肯定鏈接已完成,也能夠經過 isConnectionPending() 來測定該通道的鏈接是否正在進行中。

// 設置非阻塞
  channel.configureBlocking(false);
  if(channel.isConnected()) {
    channel.connect(new InetSocketAddress(8080));
        while (channel.finishConnect()){
            //connected -  do something
        }
  }

  固然,關於channel的讀寫操做,在講Scatter/Gather時已經講過,這裏再也不贅述。
  每次使用完成以後,都會調用 close() 方法關閉。須要說明的是,這裏的 close() 方法支持異步關閉,當一個線程執行 close() 方法同時,另外一個線程的在同一個通道上執行讀操做時會被阻塞,而後這個讀取操做不會返回任何數據,只會返回-1標識。固然,另外一個線程若是是寫操做的話也一樣會被阻塞,不一樣的是會拋出 AsynchronousCloseException 異常。咱們在上文中描述 InterruptibleChannel 這個可中斷通道接口的時候也提到這個問題。

channel.close();

  建立ServerSocketChannel與SocketChannel相似,不一樣的是ServerSocketChannel是監聽套接字鏈接。因此在建立它的時候,須要將它綁定,若是沒有綁定就執行 accept() 方法,那麼會拋出 NotYetBoundException  異常。

ServerSocketChannel channel = ServerSocketChannel.open();
  channel.socket().bind(new InetSocketAddress(8080));

  監聽到的鏈接可使用 accept() 方法來返回該鏈接的 SocketChannel。執行 accept() 方法會一直阻塞直到有鏈接到達。

SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();

Simple Server

  下面是一段簡單的ServerSocketChannel和SocketChannel應用,能夠看到是如何使用這兩個類的。當瀏覽器輸入localhost:8080的時候,會在控制檯打印出這個鏈接的請求信息。

public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.socket().bind(new InetSocketAddress(8080));
        //設置非阻塞
        channel.configureBlocking(false);
        //註冊
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT);
        //處理器
        Handler handler = new Handler(1024);
        while (true) {
            //等待請求,每次等待阻塞5s,5s後線程繼續向下執行,若是傳入0或者不傳參數將一直阻塞
            if (selector.select(5000) == 0) {
                continue;
            }
            //獲取待處理的SelectionKey
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                //當接受到請求
                if (key.isAcceptable()) {
                    handler.handleAccept(key);
                }
                try {
                    //讀數據
                    if (key.isReadable()) {
                        handler.handleRead(key);
                    }
                } catch (IOException e) {
                    keyIterator.remove();
                    e.printStackTrace();
                }
                keyIterator.remove();
            }
        }
    }

    private static class Handler {
        private int    bufferSize   = 1024;
        private String localCharset = "UTF-8";

        public Handler() {
        }

        public Handler(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        public Handler(String localCharset) {
            this.localCharset = localCharset;
        }

        public Handler(int bufferSize, String localCharset) {
            this.bufferSize = bufferSize;
            this.localCharset = localCharset;
        }

        public void handleAccept(SelectionKey key) {
            try {
                SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                channel.configureBlocking(false);
                channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void handleRead(SelectionKey key) throws IOException {
            //獲取channel
            SocketChannel channel = (SocketChannel) key.channel();
            //獲取buffer 重置
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            if (channel.read(buffer) == -1) {
                channel.close();
            } else {
                //轉爲讀狀態
                buffer.flip();
                String receivedString = Charset.forName(localCharset)
                        .newDecoder().decode(buffer).toString();

                System.out.printf("接受到客戶端數據" + receivedString);

                //返回數據給客戶端
                String sendString = "接受數據:" + receivedString;
                buffer = ByteBuffer.wrap(sendString.getBytes(localCharset));
                channel.write(buffer);

                channel.close();
            }
        }
    }
相關文章
相關標籤/搜索