Java中的NIO基礎知識

上一篇介紹了五種NIO模型,本篇將介紹Java中的NIO類庫,爲學習netty作好鋪墊java

Java NIO 由3個核心組成,分別是Channels,Buffers,Selectors。本文主要介紹着三個部分。git

Channel

全部的I/O都從一個Channel開始。通道與流不一樣,通道是雙向的,流是單向的。github

便可以從通道中讀取數據,也能夠寫數據到通道里 。編程

讀的話,是從通道讀取數據到緩衝區,寫的話是從緩衝區寫入數據到通道。緩存

四種通道:服務器

  • FileChannel.從文件中讀寫數據
  • DatagramChannel.經過UDP協議,讀寫網絡中的數據
  • SocketChannel,能經過TCP協議來讀寫網絡中數據,經常使用於客戶端
  • ServerSocketChannel。監聽TCP鏈接,對每一個新進來的鏈接會建立一個SocketChannel。

Buffer

Java NIO中的Buffer用於NIO通道進行交互。網絡

緩衝區本質上一塊能夠寫入數據,也能夠從中讀取數據的內存。也就是堆外內存,也叫直接內存。多線程

當向Buffer寫入數據時,Buffer會記錄下寫了多少數據,一旦要讀取數據,須要經過flip()方法將Buffer從寫模式切換到度模式。併發

在讀模式下,能夠讀取以前寫入到Buffer的全部數據。socket

 

一旦讀完了全部數據,就須要狀況緩存區,讓它能夠再次被寫入。有兩種方式能清空緩衝區,調用clear()或者compact()方法。

clear()方法會清空整個緩衝區。compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面。

任何未讀的數據將被移到緩衝區的起始處,新寫入的數據將放大緩衝區未讀數據的後面。

 

 

Buffer的capacity,position和limit

capacity

capacity做爲一個內存塊,buffer有一個固定的大小值,也叫capacity,只能向內存中寫入byte,long,char等類型。一旦Buffer滿了,須要將其清空。

 

position

當寫數據到Buffer中是,position表示當前的位置。初始的position值爲0,當一個byte,long等數據寫到buffer後,position會向前移動到下一個可插入數據的單元。positon最大可謂capacity-1.

當讀取數據時,也是從特定位置讀。將Buffer從寫模式切換到讀模式,positon會被重置0,當從Buffer的position處讀取數據時,position向前移動到想一個能夠讀的位置。

 

limit

在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。寫模式下,limit等於buffer的capacity

 

Buffer的分配

要想得到一個Buffer對象首先要進行分配。 每個Buffer類都有一個allocate方法。下面是一個分配48字節capacity的ByteBuffer的例子。

ByteBuffer buf = ByteBuffer.allocate(48);

 

Selector

 Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以檢測到通道是否爲讀寫事件準備好的的組件。因此Selector能夠單個線程處理多個Channel。

爲何使用Selector

Selector可以使用一個線程來處理全部通道。可是對於現在的操做系統和CPU來講,多線程已經較過去效率高了不少。

Selector的建立

1.經過調用Selector.open()方法建立一個Selector

2.將Channel註冊到Selector上配合使用,可以使用Channel.register方法來實現,以下

            servChannel.configureBlocking(false);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);

 

 與Selector一塊兒使用時,Channel必須處於非阻塞模式下。這意味着FileChannel與Selector不能一塊兒使用,由於FileChannel不能切換到非阻塞模式。

3.通道觸發意味着該事件已經就緒。Java中有以下常量對應着通道事件。

  • SelectionKey.OP_CONNECT(鏈接就緒):Channel成功鏈接到另外一個服務器
  • SelectionKey.OP_ACCEPT(接收就緒):Channel準備好接收進入新的鏈接
  • SelectionKey.OP_READ(讀就緒):Channel有數據能夠讀
  • SelectionKey.OP_WRITE(寫就緒):Chanel有數據能夠寫

4.SelectionKey

當向Selector註冊Channel時,register()方法會返回一個SelectionKey對象。這個對象包含interest集合,ready集合,Channel,Selector,附加的對象(可選)。

interest集合是你所選擇的感興趣的事件集合。能夠經過SelectionKey讀寫interest集合。

ready 集合是通道已經準備就緒的操做的集合。在一次選擇(Selection)以後,你會首先訪問這個ready set。

 

用NIO建立的客戶端與服務端:

 

服務端:

package com.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MutipleexerTimeServer implements Runnable{

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop = false;

    /**
     * 建立多路複用器,綁定NIO端口
     *
     * @param port
     */
    public MutipleexerTimeServer(int port){
        try{
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            servChannel.socket().bind(new InetSocketAddress(port),1024);
            System.out.println("the time server start at port: "+port );
        }catch (Exception e){
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop(){
        this.stop = stop;
    }

    @Override
    public void run() {
        while (!stop){
            try {
                // selector每隔一秒喚醒一次
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handlerInput(key);
                    }catch (Exception e){
                        if (key !=null){
                            key.cancel();
                            if (key.channel() !=null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        // 多路複用器關閉後,全部註冊到上面的channel和pipe等資源都不被自動去註冊並關閉,全部不須要重複釋放資源
        if (selector!=null){
            try {
                selector.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private void handlerInput(SelectionKey key) throws Exception{
        if (key.isValid()){
            // 處理新接入的請求消息
            if (key.isAcceptable()){
                // Accept the new Connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                // 已完成TCP三次握手
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector,SelectionKey.OP_READ);
            }

            if (key.isReadable()){
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes>0){
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("server receive order: "+body);
                    String correntTime = "QUERY".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
                    doWrite(sc,correntTime);
                }else if (readBytes<0){
                     // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                }else {
                    ;
                }
            }
        }
    }

    private void doWrite(SocketChannel channel,String response) throws Exception{
        if (response!=null && response.trim().length()>0){
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}


package com.nio;
/**
 * 啓動類
 */
public class TimeServer {

    public static void main(String args[]){
        int port = 9816;
        MutipleexerTimeServer timeServer = new MutipleexerTimeServer(port);
        new Thread(timeServer,"nit").start();
    }
}

 

客戶端:

package com.nio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author tangj
 * @date 2018/6/14 23:13
 */
public class TimeClientHandle implements Runnable{

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host,int port){
        this.host = host;
        this.port = port;
        try{
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        }catch (Exception e){
            e.printStackTrace();
            System.exit(-1);
        }
    }

    @Override
    public void run() {
        try{
            doConnect();
        }catch (Exception e){
            e.printStackTrace();
            System.exit(1);
        }

        while (!stop){
            try{
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handlerInput(key);
                    }catch (Exception e){
                        if (key!=null){
                            key.cancel();
                            if (key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路複用器關閉後,全部註冊到上面的channel和pipe等資源都不被自動去註冊並關閉,全部不須要重複釋放資源
        if (selector!=null){
            try {
                selector.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private void handlerInput(SelectionKey key) throws Exception{
        if (key.isValid()){
            // 判斷是否鏈接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()){
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else {
                    // 鏈接失敗
                    System.exit(1);
                }

            }
            if (key.isReadable()){
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes >0){
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("NOW IS: "+body);
                    this.stop = true;
                }else if (readBytes < 0){
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                }else {
                    ; //讀到0字節,忽略
                }
            }
        }
    }

    private void doConnect() throws Exception{
        // 若是直接鏈接成功,則註冊到多路複用器上,發送請求信息,讀應答
        if (socketChannel.connect(new InetSocketAddress(host,port))){
            socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
        }else {
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException{
        byte[] req = "QUERY".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()){
            System.out.println("send order 2 server secceed");
        }
    }
}



package com.nio.client;

/**
 * @author tangj
 * @date 2018/6/14 22:56
 */
public class TimeClient {
    public static void main(String args[]){
        new Thread(new TimeClientHandle("127.0.0.1",9816)).start();
    }
}

 

參考:

併發編程網

《Netty權威指南》

 

 

代碼地址:

github

相關文章
相關標籤/搜索