Socket網絡通訊之NIO

Socket網絡通訊之NIOjava

NIO:new io ,java1.4開始推出的可非阻塞IO。數據庫

java.nio 包,可解決BIO阻塞的不足 但比BIO學習、使用複雜。網絡

能夠以阻塞、非阻塞兩種方式工做。app

能夠在非阻塞模式下,能夠用少許(甚至一個)線程處理大量IO鏈接。框架

Java7推出了 Nio.2  (又稱AIO,異步IO)。異步

1、NIO工做流程以下圖:socket

流程圖如上所示,要理解NioSocket的使用必須先理解三個概念:Selector,Channel和Buffer。舉個例子。大學時有人賣電話卡,提供送貨上門服務,只要有人打電話,他就送過去、收錢在回去,而後等下一個電話,這就至關於普通的Socket處理請求的模式。若是請求不是不少,這是沒有問題的。而像如今的電商配送模式——送快遞就相似於NioSocket。快遞並不會一件一件的送,而是將不少件貨一塊兒拿去送,並且在中轉站都有專門的分揀員負責按配送範圍把貨物分給不一樣的送貨員,這樣效率就提升了不少。Selector就是中轉戰的分揀員,Channel就是送貨員(或者開往某個區域的配貨車),Buffer就是所要送的貨物。
NioSocket使用中首先要建立ServerSocketChannel,而後註冊Selector,接下來就能夠用Selector接收請求並處理了。 
ServerSocketChannel可使用本身的靜態工程方法open建立。每一個ServerSocketChannel對應一個ServerSocket,能夠調用其socket方法來獲取,不過若是直接使用獲取到ServerSocket來監聽請求,那仍是原來的處理模式,通常使用獲取到的ServerSocket來綁定端口。ServerSocketChannel能夠經過configureBlocking方法來設置是否採用阻塞模式,若是要採用非阻塞模式能夠用configureBlocking(false)來設置,設置了非阻塞模式以後就能夠調用register方法註冊Selector來使用了(阻塞模式不可使用Selector)。 
Selector可使用本身的靜態工程方法open建立,建立後經過Channel的register方法註冊到ServerSocketChannel或者SocketChannel上,註冊完以後Selector就能夠經過select方法來等等請求,select方法有一個long類型的參數,表明最長等待時間,若是在這段時間裏接收到了相應操做的請求則返回能夠處理的請求的數量,不然在超時後返回0,程序繼續往下走,若是傳入的參數爲0或者調用無參數的重載方法,select方法會採用阻塞模式直到有相應操做的請求出現。當接收到請求後Selector調用selectedKeys方法返回SelectedKey的集合。 
selectedKey保存了處理當前請求的Channel和Selector,而且提供了不一樣的操做類型。Channel在註冊Selector的時候能夠經過register的第二個參數選擇特定的操做,這裏的操做就是在selectedKey中定義的,一共有4種: 
SelectionKey.OP_ACCEPT 
SelectionKey.OP_CONNECT 
SelectionKey.OP_READ 
SelectionKey.OP_WRITE 
分別表示接收請求操做、鏈接操做、讀操做和寫操做,只有在register方法中註冊了相應的操做Selector纔會關心相應類型操做的請求。 
Channel和Selector並無誰屬於誰的關係,就像數據庫裏的多對多的關係,不過Selecor這個分揀員分揀的更細,它能夠按不一樣類型來分揀,分揀後的結果保存在SelectionKey中,能夠分別經過SelectionKey的channel方法和selector方法來獲取對應的Channel和Selector,並且還能夠經過isAcceptable、isConnectable、isReadable和isWritable方法來判斷是什麼類型的操做。 
NioSocket中服務端的處理過程能夠分爲5步: 
一、建立ServerSocketChannel並設置相應參數 。
二、建立Selector並註冊到ServerSocketChannel上 。
三、調用Selector的select方法等待請求 。
四、Selector接收到請求後使用selectionKeys返回SelectionKey集合 。
五、使用SelectionKey獲取到Channel、Selector和操做類型並進行具體操做。ide

下面具體說說Selector,Channel和Buffer的用法。學習

 2、Selector 選擇器 :this

Selector 選擇器 :非阻塞模式下,一個選擇器可檢測多個SelectableChannel,得到爲讀寫等操做準備好的通道。就不須要咱們用循環去判斷了。經過Selector,一個線程就能夠處理多個Channel,可極大減小線程數。 用cpu核心數量的線程,充分利用cpu資源,又減小線程切換。

Selector 用法:1,建立Selector。Selector selector = Selector.open();

       2,將要交給Selector檢測的SelectableChannel註冊進來。

          (1)channel.configureBlocking(false);   // 注意:必定要設爲非阻塞模式

          (2)SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

          channel.register方法的第二個參數指定要selector幫忙監聽的就緒操做:SelectionKey.OP_CONNECT(可鏈接);SelectionKey.OP_ACCEPT(可接受);SelectionKey.OP_READ(可讀);SelectionKey.OP_WRITE(可寫)。

       3,經過Selector來選擇就緒的Channel,有三個select方法。int n = selector.select();

        (1) int select() //阻塞直到有就緒的Channel。
        (2)int select(long timeout) //阻塞最長多久。
        (3)int selectNow() //不阻塞。 

          三個方法返回值:就緒的Channel數量。 
          注意:select()方法返回當前的就緒數量。
          例:第一次select返回1;第二次select,又一個channel就緒,若是第一個就緒的channel還未被處理,則此時就緒的channel是2個,會返回2。在用線程池異步處理任務時需特別當心,重複選擇!

       4,得到就緒的SelectionKey集合(當有就緒的Channel時)。

        Set<SelectionKey> selectedKeys = selector.selectedKeys();

       5,處理selectedKeys set。  

        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
        // a connection was established with a remote server.
        } else if (key.isReadable()) {
        // a channel is ready for reading
        } else if (key.isWritable()) {
        // a channel is ready for writing
        }
        keyIterator.remove(); //處理了,必定要從selectedKey集中移除
        }

 3、Channel 通道:數據的來源或去向目標

Java NIO: Channels read data into Buffers, and Buffers write data into Channels。

一、Channel的實現

  FileChannel 文件通道
  DatagramChannel UDP協議的通道
  SocketChannel 一般通道
  ServerSocketChannel 服務通道

二、各Channel的API方法

  open():建立通道
  read(Buffer):從通道中讀數據放入到buffer
  write(Buffer):將buffer中的數據寫給通道

3、Buffer   緩衝區:數據的臨時存放區

Buffer類型:ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer

Buffer的基本使用步驟:

(1)調用xxxBuffer.allocate(int)建立Buffer
(2)調用put方法往Buffer中寫數據
(3)調用buffer.flip()將buffer轉爲讀模式
(4)讀取buffer中的數據
(5)Call buffer.clear() or buffer.compact()

一、Buffer的操做API 

(1)調用xxxBuffer.allocate(int)建立Buffer
  ByteBuffer buf = ByteBuffer.allocate(48);
  CharBuffer buf = CharBuffer.allocate(1024);
(2)往Buffer中寫數據
  int bytesRead = inChannel.read(buf); //read into buffer.
  buf.put(127);
(3)調用buffer.flip()將buffer轉爲讀模式
  buf.flip(); // 轉爲讀模式,position變爲0
(4)讀取buffer中的數據
  //read from buffer into channel.
  int bytesWritten = inChannel.write(buf);
  byte aByte = buf.get();
(5)讀完後,調用clear()或compact()爲下次寫作好準備
  buf.clear(); //position=0 limit = capacity
  buf.compact(); //整理,將未讀的數據移動到頭部

以下所示爲NioServer和NioClient的代碼

public class NioServer {
    private static Charset charset = Charset.forName("UTF-8");
    private static CharsetDecoder decoder = charset.newDecoder();
    public static void main(String[] args) throws IOException {
        int port = 1104;
        // 極少的線程
        int threads = 3;
        ExecutorService tpool = Executors.newFixedThreadPool(threads);
        // 一、獲得一個selector
        Selector selector = Selector.open();
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(port));
            // 2 註冊到selector
            // 要非阻塞
            ssc.configureBlocking(false);
            // ssc向selector 註冊,監聽鏈接到來。
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            // 鏈接計數
            int connectionCount = 0;
            // 三、循環選擇就緒的通道
            while (true) {
                // 阻塞等待就緒的事件
                int readyChannels = selector.select();
                // 由於select()阻塞能夠被中斷
                if (readyChannels == 0) {
                    continue;
                }

                // 取到就緒的key集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isAcceptable()) {
                        // a connection was accepted by a ServerSocketChannel.
                        ServerSocketChannel sssc = (ServerSocketChannel) key.channel();
                        // 接受鏈接
                        SocketChannel cc = sssc.accept();
                        // 請selector 幫忙檢測數據到了
                        // 設置非阻塞
                        cc.configureBlocking(false);
                        // 向selector 註冊
                        cc.register(selector, SelectionKey.OP_READ, ++connectionCount);
                    } else if (key.isConnectable()) {
                        // a connection was established with a remote server.
                    } else if (key.isReadable()) {
                        // a channel is ready for reading
                        // 四、讀取數據進行處理
                        // 交各線程池去處理
                        tpool.execute(new SocketReadProcess(key));
                        // 取消一下注冊,防止線程池處理不及時,沒有註銷掉
                        key.cancel();
                    } else if (key.isWritable()) {
                        // a channel is ready for writing
                    }
                    keyIterator.remove(); // 處理了,必定要從selectedKey集中移除
                }
            }
        }
    }
    
    static class SocketReadProcess implements Runnable {
        SelectionKey key;
        public SocketReadProcess(SelectionKey key) {
            super();
            this.key = key;
        }
        @Override
        public void run() {
            try {
                System.out.println("鏈接" + key.attachment() + "發來:" + readFromChannel());
                // 若是鏈接不須要了,就關閉
                key.channel().close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        private String readFromChannel() throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            int bfsize = 1024;
            ByteBuffer rbf = ByteBuffer.allocateDirect(bfsize);
            // 定義一個更大的buffer
            ByteBuffer bigBf = null;
            // 讀的次數計數
            int count = 0;
            while ((sc.read(rbf)) != -1) {
                count++;
                ByteBuffer temp = ByteBuffer.allocateDirect(bfsize * (count + 1));
                if (bigBf != null) {
                    // 將buffer有寫轉爲讀模式
                    bigBf.flip();
                    temp.put(bigBf);
                }
                bigBf = temp;
                // 將此次讀到的數據放入大buffer
                rbf.flip();
                bigBf.put(rbf);
                // 爲下次讀,清理。
                rbf.clear();
                // 讀出的是字節,要轉爲字符串
            }
            if (bigBf != null) {
                // 轉爲讀模式
                bigBf.flip();
                // 轉成CharBuffer,再轉爲字符串。
                return decoder.decode(bigBf).toString();
            }
            return null;
        }
    }
}
NioServer
public class NioClient {
    static Charset charset = Charset.forName("UTF-8");
    public static void main(String[] args) {
        try (SocketChannel sc = SocketChannel.open();) {
            // 鏈接 會阻塞
            boolean connected = sc.connect(new InetSocketAddress("localhost", 1104));
            System.out.println("connected=" + connected);
            //
            Scanner scanner = new Scanner(System.in);
            System.out.println("請輸入:");
            String mess = scanner.nextLine();
            ByteBuffer bf = ByteBuffer.wrap(mess.getBytes(charset));
            while (bf.hasRemaining()) {
                int writedCount = sc.write(bf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
NioClient

如上代碼所示,先執行NioServer再執行NioClient,用debug模式分步執行,多個客戶端鏈接的時候,會發現不會阻塞。

 固然對於NIO通訊,還可使用non-blockin模式和更加穩定的java開源框架Netty和MINA。

相關文章
相關標籤/搜索