NIO原理及實例

知識點:java

  1. 阻塞的概念,同步異步的區別
  2. Bio 及多路複用
  3. NIO概要
  4. NIO之Buffer(緩衝區)
  5. NIO之Channel(通路)
  6. NIO之Selector(選擇器)
  7. NIO之Reactor(反應堆)
  8. 基於NIO的聊天室實例

學習NIO咱們先了解前置概念:
1)阻塞和非阻塞
阻塞和非阻塞是進程在訪問數據的時候,數據是否準備就緒的一種處理方式,當數據沒有準備的時候
阻塞:每每須要等待緩衝區中的數據準備好事後才處理其餘事情,不然就一直等待。 非阻塞:當咱們的進程範文咱們的數據緩衝區的編程

2)同步 異步區別
基於應用程序和操做系統處理IO事件採起的方式來區分:
異步:同一時刻能夠處理多個io讀寫,應用程序等待操做系統通知
同步:同一時間只能處理一條io讀寫,應用程序直接參與io讀寫數組

咱們接着看下圖bash

簡單的說,必須等待數據接受完畢以後才能處理,不然一直阻塞,形象地說就比如一我的去買奶茶,可是奶茶店前排了不少人的隊,你就在隊伍後面排隊等待,期間你啥都作不了,這就是bio。 而後咱們看下nio的多路複用

多路複用要跟bio進行對比才能理解,首先bio(同步阻塞),用戶控件應用執行一個系統調用,會一直阻塞,知道系統調用完成爲止。 就以讀寫爲例,首先咱們發起調用read發起io讀的操做,由用戶空間轉到內核空間,內核等待數據包到達,而後把接受到的數據複製到用戶空間,完成read。在等待讀動做把socket中的數據讀取到buffer後,才能接受數據,期間是一直阻塞的。【以後會有關於netty的博文,那邊會講到netty的零拷貝】

怕你們不理解,另外找了一張圖,io多路複用至關於經過多個io的阻塞複用到了同一個select的阻塞上,從而使單線程狀況下,能夠處理多個客戶端請求。與傳統的多線程/多進程的模型比起來。多路複用最大的優點就是系統開銷小,系統不須要建立額外的進程或者線程,也不須要維護這些進程和線程的運行,下降了系統的維護量,節省系統開銷。 理解多路複用就要理解select函數:此函數容許進程指示內核等待多個事件的任何一個發送,只有一個或者多個事件發生或者經歷一段指定時間才喚醒。至關於咱們把多個socket都註冊到select上,任何一個socket的數據準備好,select就返回,此時用戶進程再調用read,把數據拷貝到用戶進程。這個過程是不斷輪詢的,只要監聽到某個文件句柄被激活(可讀/可寫),select就返回。因此它可以在一個通路中放置多個io,實現了多路複用。

咱們正式開始學習NIO服務器

一 JAVA NIO之概念

Java NIO 是 java 1.4, 以後新出的一套IO接口NIO中的N能夠理解爲Non-blocking,有些人會認爲是new,其實也沒錯。 BIO(Block IO)和Nio(Non-Block IO)的對比網絡

Nio主要用到的是塊,因此nio效率比io高。
JavaAPI中有倆套nio:
1)針對標準輸入輸出nio
2)網絡編程nio
Io以流的形式處理數據,nio以塊的形式處理數據。面向流的io一次處理一個字節,一個輸入流產生了一個字節,一個輸出流就消費一個字節。
面向塊的io,每一個操做都在一步中產生或者消費一個數據塊。
它讀取數據方式和寫數據的方式否必需要經過通道來操做緩衝區實現。
核心組件包括 Channels Buffers Selectorssession

二 Java NIO之Buffer(緩衝區)

1) Buffer介紹: 緩衝區,本質就是一個數組,可是它是特殊的數組,緩衝區對象內置了一些機制,可以追蹤和記錄緩衝區的狀態變化狀況,若是咱們使用get方法從緩衝區中獲取數據或者用put方法吧數據寫入緩衝區,都會引發緩衝區的狀態變化
在緩衝區中,最重要的屬性是以下三個,他們一塊兒合做完成了對緩衝區內容狀態的變化跟蹤
1)position:指定了下一個將要被寫入或者讀取的元素索引,它的值由get()/put() 方法自動更新,在新建立一個Buffer對象時,position被初始化爲0
2)limit:操做緩衝區的可操做空間和可操做範圍,指定還有多少數據須要去除,或者還有多少空間能夠放入數據
3)capacity:指定了能夠存儲在緩衝區中的最大數據容量,實際上,它指定了底層數組的大小,或者至少是指定了准許咱們使用的底層數組的容量。多線程

以上三個屬性值之間有一些相對的大小的關係:0<=position<=limit<=capacity
若是咱們建立了一個新的容量爲10的bytebuffer對象,在初始化的時候。position設置爲0,limit和capacity被設置爲10,在之後使用bytebuffer 對象過程當中,capacity的值不會再發生變化,而其餘倆個值會順着使用而變化 以下圖:app

如今咱們能夠從通道中讀取一些數據到緩衝區,注意從通道讀取數據,至關於往緩衝區中寫入數據。若是讀取四個本身的數據,則此時的position爲4,即下一個將要被寫入的字節索引爲4,而limit依舊是10異步

下一步把讀取的數據寫入到輸出通道,至關於從緩衝區讀取數據,在此以前,必須調用flip()方法,該方法將完成倆件事: 1)把limit設置成position值
2)把position值設置爲0
【flip】 須要將緩衝區數據取出來解析,固定住

取出以後調用clear方法 迴歸到最初的狀態。

package com.Allen.buffer;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class testBufferDemo01 {
	public static void main(String[] args) throws IOException {
		String fileURL="F://a.txt";
		FileInputStream fis=new FileInputStream(fileURL);
		//獲取通路
		FileChannel channel=fis.getChannel();
		//定義緩衝區大小
		ByteBuffer buffer=ByteBuffer.allocate(10);
		output("init", buffer);
		//先讀
		channel.read(buffer);
		output("read", buffer);
		buffer.flip();
		output("flip", buffer);		
		while (buffer.hasRemaining()) {
			byte b=buffer.get();
		}
		output("get", buffer);
		buffer.clear();
		output("clear", buffer);
		fis.close();
	}
	
	public static void output(String string,ByteBuffer buffer){
		System.out.println(string);
		System.out.println(buffer.capacity()+":"+buffer.position()+":"+buffer.limit());
	}
}
複製代碼

結果

三 Java NIO之Channel(通路)

通道是個對象,經過它能夠讀取和寫入數據,全部的數據都是經過buffer對象來處理。咱們永遠不會把字節直接寫入通道,相反是吧數據寫入包含一個或者多個字節的緩衝區。一樣不會直接讀取字節,而是把數據從通道讀入緩衝區,再從緩衝區獲取這個字節,nio中提供了多種通道對象,而全部的通道對象都實現了channel接口。

使用nIo讀取數據】
任什麼時候候讀取數據,都不是直接從通道中讀取,而是從通道讀取到緩衝區,因此使用NIO讀取數據能夠分紅下面三個步驟
1)從FileInputStream獲取Channel
2)建立Buffer
3)將數據從Channel 讀取到Buffer中
下面就是一個nio讀複製文件的實例

package com.allen.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class testNio {
	public static void main(String[] args) throws IOException {
		String oldFileUrl="E://1.txt";
		String newFileUrl="E://2.txt";
		FileInputStream fis=new FileInputStream(oldFileUrl);
		FileChannel inChannel=fis.getChannel();
		ByteBuffer bf=ByteBuffer.allocate(1024);
		FileOutputStream fos=new FileOutputStream(newFileUrl);
		FileChannel outChannel=fos.getChannel();
		while(true){
			int eof=inChannel.read(bf);
			if(eof==-1){
				break;
			}else{
				bf.flip();
				outChannel.write(bf);
				bf.clear();
			}
		}
		inChannel.close();
		fis.close();
		outChannel.close();
		fos.close();	
	}
}
複製代碼

四 JAVA NIO之Selector(選擇器)

Selector 通常稱 爲選擇器 ,固然你也能夠翻譯爲 多路複用器 。它是Java NIO核心組件中的一個,用於檢查一個或多個NIO Channel(通道)的狀態是否處於可讀、可寫。如此能夠實現單線程管理多個channels,也就是能夠管理多個網絡連接。 使用Selector的好處在於: 使用更少的線程來就能夠來處理通道了, 相比使用多個線程,避免了線程上下文切換帶來的開銷。

有了selector,能夠用一個線程處理全部的channel。線程之間的切換對操做系統來講,代建是很高的,而且每一個線程也會佔用必定的系統資源,因此對於系統而言,線程越少越好(可是也不是絕對的,若cpu有多個內核,不使用多任務是在浪費CPU能力)

Selector selector=Selector.open();
註冊channel到selector上
Channel.configureBlocking(false)
SelectionKey key=channel.register(selector,SelectionKey.OP_READ)

註冊到server上的channel必須設置成異步模式,不然異步io沒法工做,這就意味着咱們不能夠把一個Filechannel註冊到selector,由於filechannel沒有異步模式,可是socketchannel有異步模式

Register方法的第二個參數,它是一個interst set ,意思是註冊的selector對channel中的那些事務感興趣。事件分紅四種:read write connect accept,通道觸發一個時間指該事件已經Read,全部某個channel成功鏈接到另外一個服務器稱之爲connect ready。一個serversocketchanel準備好接受新的鏈接稱爲connect ready。一個數據可讀的通道能夠說read ready。等待寫數據的通道write ready。
Wirte:SelectionKey.OP_WRITE
Read:SelectionKey.OP_READ
Accept:SelectionKey.OP_ACCEPT
Connect:SelectionKey.OP_CONNECT
如果對多個事件感情求,能夠寫爲(用or)
Int interest=SelectionKey.OP_READ|SelectionKey.OP_ACCEPT
SelectionKey表示通道在selector上這個註冊,經過SelectionKey能夠獲得selector和註冊的channel.selector感興趣的事。 一旦向selector註冊了一個或者多個通道,能夠調用重載的select方法返回你所感興趣的事件已經準備就緒的通道。

五 JAVA NIO 之 Reactor(反應堆)

阻塞/IO通訊模型

java 在上圖客戶端增多的狀況下右邊的線程會出現不可控的狀況。
引入了pool的概念,
因此Nio 是jdk1.4開始使用的,能夠說是想新io,也能夠說是非阻塞io
如下是nio工做原理:
1)由一個專門的線程去處理全部的io事件而且負責分發
2)事件驅動機制,時間到的時候觸發,而不是同步地去監聽事件
3)線程通訊,線程之間經過wait,notify等方式通訊,保證每次上下文切換都是有意義的,減小無畏的線程切換。

六實例

服務器

package com.allen.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * 網絡多客戶端聊天室
 * 功能1: 客戶端經過Java NIO鏈接到服務端,支持多客戶端的鏈接
 * 功能2:客戶端初次鏈接時,服務端提示輸入暱稱,若是暱稱已經有人使用,提示從新輸入,若是暱稱惟一,則登陸成功,以後發送消息都須要按照規定格式帶着暱稱發送消息
 * 功能3:客戶端登陸後,發送已經設置好的歡迎信息和在線人數給客戶端,而且通知其餘客戶端該客戶端上線
 * 功能4:服務器收到已登陸客戶端輸入內容,轉發至其餘登陸客戶端。
 * 
 * TODO 客戶端下線檢測
 */
public class NIOServer {

    private int port = 8080;
    private Charset charset = Charset.forName("UTF-8");
    //用來記錄在線人數,以及暱稱
    private static HashSet<String> users = new HashSet<String>();
    
    private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
    //至關於自定義協議格式,與客戶端協商好
    private static String USER_CONTENT_SPILIT = "#@#";
    
    private Selector selector = null;
    
    
    public NIOServer(int port) throws IOException{
		
		this.port = port;
		//要想富,先修路
		//先把通道打開
		ServerSocketChannel server = ServerSocketChannel.open();
		
		//設置高速公路的關卡
		server.bind(new InetSocketAddress(this.port));
		server.configureBlocking(false);
		
		
		//開門迎客,排隊叫號大廳開始工做
		selector = Selector.open();
		
		//告訴服務叫號大廳的工做人員,你能夠接待了(事件)
		server.register(selector, SelectionKey.OP_ACCEPT);
		
		System.out.println("服務已啓動,監聽端口是:" + this.port);
	}
    
    
    public void listener() throws IOException{
    	
    	//死循環,這裏不會阻塞
    	//CPU工做頻率可控了,是可控的固定值
    	while(true) {
    		
    		//在輪詢,咱們服務大廳中,到底有多少我的正在排隊
            int wait = selector.select();
            if(wait == 0) continue; //若是沒有人排隊,進入下一次輪詢
            
            //取號,默認給他分配個號碼(排隊號碼)
            Set<SelectionKey> keys = selector.selectedKeys();  //能夠經過這個方法,知道可用通道的集合
            Iterator<SelectionKey> iterator = keys.iterator();
            while(iterator.hasNext()) {
				SelectionKey key = (SelectionKey) iterator.next();
				//處理一個,號碼就要被消除,打發他走人(別在服務大廳佔着茅坑不拉屎了)
				//過號不候
				iterator.remove();
				//處理邏輯
				process(key);
            }
        }
		
	}
    
    
    public void process(SelectionKey key) throws IOException {
    	//判斷客戶端肯定已經進入服務大廳而且已經能夠實現交互了
        if(key.isAcceptable()){
        	ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel client = server.accept();
            //非阻塞模式
            client.configureBlocking(false);
            //註冊選擇器,並設置爲讀取模式,收到一個鏈接請求,而後起一個SocketChannel,並註冊到selector上,以後這個鏈接的數據,就由這個SocketChannel處理
            client.register(selector, SelectionKey.OP_READ);
            
            //將此對應的channel設置爲準備接受其餘客戶端請求
            key.interestOps(SelectionKey.OP_ACCEPT);
//            System.out.println("有客戶端鏈接,IP地址爲 :" + sc.getRemoteAddress());
            client.write(charset.encode("請輸入你的暱稱"));
        }
        //處理來自客戶端的數據讀取請求
        if(key.isReadable()){
            //返回該SelectionKey對應的 Channel,其中有數據須要讀取
            SocketChannel client = (SocketChannel)key.channel(); 
            
            //往緩衝區讀數據
            ByteBuffer buff = ByteBuffer.allocate(1024);
            StringBuilder content = new StringBuilder();
            try{
                while(client.read(buff) > 0)
                {
                    buff.flip();
                    content.append(charset.decode(buff));
                    
                }
//                System.out.println("從IP地址爲:" + sc.getRemoteAddress() + "的獲取到消息: " + content);
                //將此對應的channel設置爲準備下一次接受數據
                key.interestOps(SelectionKey.OP_READ);
            }catch (IOException io){
            	key.cancel();
                if(key.channel() != null)
                {
                	key.channel().close();
                }
            }
            if(content.length() > 0) {
                String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
                //註冊用戶
                if(arrayContent != null && arrayContent.length == 1) {
                    String nickName = arrayContent[0];
                    if(users.contains(nickName)) {
                    	client.write(charset.encode(USER_EXIST));
                    } else {
                        users.add(nickName);
                        int onlineCount = onlineCount();
                        String message = "歡迎 " + nickName + " 進入聊天室! 當前在線人數:" + onlineCount;
                        broadCast(null, message);
                    }
                } 
                //註冊完了,發送消息
                else if(arrayContent != null && arrayContent.length > 1) {
                    String nickName = arrayContent[0];
                    String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
                    message = nickName + "說 : " + message;
                    if(users.contains(nickName)) {
                        //不回發給發送此內容的客戶端
                    	broadCast(client, message);
                    }
                }
            }
            
        }
    }
    
    //TODO 要是能檢測下線,就不用這麼統計了
    public int onlineCount() {
        int res = 0;
        for(SelectionKey key : selector.keys()){
            Channel target = key.channel();
            
            if(target instanceof SocketChannel){
                res++;
            }
        }
        return res;
    }
    
    
    public void broadCast(SocketChannel client, String content) throws IOException {
        //廣播數據到全部的SocketChannel中
        for(SelectionKey key : selector.keys()) {
            Channel targetchannel = key.channel();
            //若是client不爲空,不回發給發送此內容的客戶端
            if(targetchannel instanceof SocketChannel && targetchannel != client) {
                SocketChannel target = (SocketChannel)targetchannel;
                target.write(charset.encode(content));
            }
        }
    }
    
    
    public static void main(String[] args) throws IOException {
        new NIOServer(8080).listener();
    }
}
複製代碼

客戶端

package com.allen.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class NIOClient {

	private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
    private Selector selector = null;
    private SocketChannel client = null;
    
    private String nickName = "";
    private Charset charset = Charset.forName("UTF-8");
    private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
    private static String USER_CONTENT_SPILIT = "#@#";
    
    
    public NIOClient() throws IOException{
    	
    	//無論三七二十一,先把路修好,把關卡開放
        //鏈接遠程主機的IP和端口
        client = SocketChannel.open(serverAdrress);
        client.configureBlocking(false);
        
        //開門接客
        selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);
    }
    
    public void session(){
    	//開闢一個新線程從服務器端讀數據
        new Reader().start();
        //開闢一個新線程往服務器端寫數據
        new Writer().start();
	}
    
    private class Writer extends Thread{

		@Override
		public void run() {
			try{
				//在主線程中 從鍵盤讀取數據輸入到服務器端
		        Scanner scan = new Scanner(System.in);
		        while(scan.hasNextLine()){
		            String line = scan.nextLine();
		            if("".equals(line)) continue; //不容許發空消息
		            if("".equals(nickName)) {
		            	nickName = line;
		                line = nickName + USER_CONTENT_SPILIT;
		            } else {
		                line = nickName + USER_CONTENT_SPILIT + line;
		            }
//		            client.register(selector, SelectionKey.OP_WRITE);
		            client.write(charset.encode(line));//client既能寫也能讀,這邊是寫
		        }
		        scan.close();
			}catch(Exception e){
				
			}
		}
    	
    }
    
    
    private class Reader extends Thread {
        public void run() {
            try {
            	
            	//輪詢
                while(true) {
                    int readyChannels = selector.select();
                    if(readyChannels == 0) continue;
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();  //能夠經過這個方法,知道可用通道的集合
                    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                    while(keyIterator.hasNext()) {
                         SelectionKey key = (SelectionKey) keyIterator.next();
                         keyIterator.remove();
                         process(key);
                    }
                }
            }
            catch (IOException io){
            	
            }
        }

        private void process(SelectionKey key) throws IOException {
            if(key.isReadable()){
                //使用 NIO 讀取 Channel中的數據,這個和全局變量client是同樣的,由於只註冊了一個SocketChannel
                //client既能寫也能讀,這邊是讀
                SocketChannel sc = (SocketChannel)key.channel();
                
                ByteBuffer buff = ByteBuffer.allocate(1024);
                String content = "";
                while(sc.read(buff) > 0)
                {
                    buff.flip();
                    content += charset.decode(buff);
                }
                //若系統發送通知名字已經存在,則須要換個暱稱
                if(USER_EXIST.equals(content)) {
                	nickName = "";
                }
                System.out.println(content);
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
    
    
    
    public static void main(String[] args) throws IOException
    {
        new NIOClient().session();
    }
}
複製代碼
相關文章
相關標籤/搜索