Java NIO

簡介

引入NIO的緣由

  1. 由於BIO都是阻塞的IO,爲了使Java能支持非阻塞I/O,JDK引入了NIO,能夠將NIO理解成是Non-block I/O.(也有書說是new IO)
  2. BIO編程中,每當有一個新的客戶端請求過來時,服務器端必須建立一個新的線程處理新接入的客戶端鏈路,一個線程只能處理一個客戶端鏈接,在併發量大的鏈接場景下,使用BIO的性能會很是低。

基本概念

BIO是基於字節流和字符流進行操做的,而NIO是基於通道Channel和緩衝區Buffer進行操做的,數據從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。java

NIO的類庫位於java.lang.nio下,其中有以下一些基本的概念:編程

  • 通道Channel:通道的做用與BIO中的流相似,主要不一樣的地方在於:緩存

    • Channel是雙向的,支持同時讀寫操做,而Stream只能是單向的。
    • 通道能夠異步地讀寫。
    • 通道中的數據老是要先讀到一個Buffer,或者從一個Buffer中寫入。 Channel又能夠分爲兩大類——用於網絡讀寫的SelectableChannel和用於文件操做的FileChannel。
  • 緩衝區Buffer:在BIO中,能夠直接將數據寫入或者直接讀取到流中,也能夠經過裝飾類添加緩衝的功能;而在NIO中,全部的數據都是用緩衝區處理的,任什麼時候候使用NIO讀取或者寫入數據都是經過緩衝區進行的。緩衝區本質上是一塊能夠讀寫數據的內存,這塊內存被包裝成NIO Buffer對象,並提供了一組方法來訪問該塊內存。服務器

  • 分散/彙集scatter/gather:分散和彙集是用來描述從通道中讀取或者寫入通道的操做。分散從通道中讀取是指讀操做時將讀取的數據寫入多個緩衝區中;彙集寫入通道是指寫入操做時將多個緩衝區的數據寫入到同一個通道中。分散/彙集一般用於須要將傳輸數據分開處理的場合,如傳輸一個消息能夠將消息頭和消息體分散到不一樣的buffer中。網絡

  • 選擇器Selector:selector模型是NIO編程的基礎,多路複用器Selector經過不斷地輪詢已經註冊過的通道,檢測出就緒的通道集合,從而能夠實現一個線程管理多個通道,管理多個網絡鏈接。併發

經常使用Channel實現與基本示例

java.nio包中經常使用的Channel實現類有:dom

  • FileChannel:從文件中讀寫數據
  • DatagramChannel:經過UDP讀寫網絡中的數據
  • SocketChannel:經過TCP讀寫網絡中的數據
  • ServerSocketChannel:監聽新進來的TCP鏈接,對每個新進來的鏈接都會建立一個SocketChannel

FileChannel示例

FileChannel沒法設置爲非阻塞模式,只能運行在阻塞模式下。使用FileChannel的幾個基本步驟包括:打開FileChannel、從FileChannel讀寫數據、關閉FileChannel。異步

  • 寫入數據到文件
private static final int BUF_SIZE = 1024;

public static void main(String[] args) {
	// 打開FileChannel須要經過FileIntputStream或FileOutputStream或RandomAccessFile
	try (FileOutputStream out = new FileOutputStream(new File("d:\\test.txt"));
		FileChannel channel = out.getChannel();) {
		ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
		buffer.put("NIO學習:NIO FileChannel Demo".getBytes());// 先往buffer中寫入數據
		buffer.flip();// 調轉buffer中讀寫指針position的位置
		printBuffer(buffer); //打印buffer中內容
		channel.write(buffer); // 將buffer中數據寫入channel
	} catch (FileNotFoundException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}
}

private static void printBuffer(ByteBuffer buffer) {
	try {
		Charset charset = Charset.forName("UTF-8");
		CharsetDecoder decoder = charset.newDecoder();
		CharBuffer cBuf = decoder.decode(buffer);
		buffer.flip();
		System.out.println(cBuf.toString());
	} catch (CharacterCodingException e) {
		e.printStackTrace();
	}
}
  • 從文件中讀數據
private static final int BUF_SIZE = 1024;
	
public static void main(String[] args) {
	//channel關聯文件——>經過channel讀取數據到buffer
	try (FileInputStream in = new FileInputStream(new File("d:\\test.txt"));
			FileChannel channel = in.getChannel();) {
			ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
			channel.read(buffer);
			buffer.flip();
			printBuffer(buffer);
		} catch (FileNotFoundException e) {
		e.printStackTrace();
		} catch (IOException e) {
		e.printStackTrace();
	}
}

private static void printBuffer(ByteBuffer buffer) {
	Charset charset = Charset.forName("UTF-8");
	CharsetDecoder decoder = charset.newDecoder();
	try {
		CharBuffer buf = decoder.decode(buffer);
		buffer.flip();
		System.out.println(buf.toString());
	} catch (CharacterCodingException e) {
		e.printStackTrace();
	}
}
  • close():使用完FileChannel須要關閉channel,爲簡潔代碼可使用try-with-resources語法。socket

  • position():position()方法能夠獲取FileChannel當前的位置,而position(long pos)能夠設置FileChannel當前位置,可是若是將位置設置在文件結束符以後,調用position()將返回-1;調用position(pos)寫入數據,則會把文件撐大到當前位置並寫入數據,這樣會致使磁盤上物理文件中寫入的數據間有空隙。性能

long pos = channle.position();
channel.position(100L);
  • size():channel.size()方法返回的是channel關聯文件的大小。

  • truncate(long size):truncate()方法用來截取文件,此時文件中指定長度後面的部分將會被刪除,如:

channel.truncate(100);// 將會保留前100個字節
  • force(boolean flag):通常狀況下出於性能考慮,操做系統會將數據緩存在內存中,因此沒法保證寫入到FileChannel中的數據必定會當即寫到磁盤上,此時,若是調用force()方法則能強制將channel中的數據當即寫入磁盤。

DatagramChannel示例

DatagramChannel是用來收發UDP包的通道,由於UDP是無鏈接的網絡協議,因此DatagramChannel收發的是UDP數據包。

public static void main(String[] args) throws InterruptedException {
	new Thread(() -> {
		// 打開DatagramChannel,監聽UDP9999端口
		try (DatagramChannel channel = DatagramChannel.open()) {
			channel.socket().bind(new InetSocketAddress(9999));
			ByteBuffer buffer = ByteBuffer.allocate(100);
			// 經過channel的recevice方法接收UDP數據包
			channel.receive(buffer);
			buffer.flip();
			printBuffer(buffer);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}).start();

	Thread.sleep(1000); // 服務端先啓動

	new Thread(() -> {
		try (DatagramChannel channel = DatagramChannel.open();) {
			ByteBuffer buf = ByteBuffer.allocate(100);
			buf.clear();
			buf.put("DatagramChannel Demo".getBytes());
			// 發送數據前注意要把buffer的position置爲0
			buf.flip(); 
			// 調用send方法發送到指定IP地址的指定端口
			channel.send(buf, new InetSocketAddress("localhost", 9999));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}).start();
}

因爲UDP是無鏈接的,當指定鏈接的IP地址或域名時並不會建立一個真正的鏈接,而是鎖住了DatagramChannel,只能從鎖定的地址收發數據,而且數據傳送沒有可靠性保證。

SocketChannel&ServerSocketChannel示例

public static void main(String[] args) throws InterruptedException {
	Thread server = new Thread(() -> {
		try (ServerSocketChannel channel = ServerSocketChannel.open()) {
			channel.socket().bind(new InetSocketAddress(44593));
			SocketChannel socket = channel.accept();
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			socket.read(buffer);
			buffer.flip();
			printBuffer(buffer);
		} catch (IOException e) {
			e.printStackTrace();
		}
	});

	Thread client = new Thread(() -> {
		try (SocketChannel channel = SocketChannel.open()) {
			channel.socket().connect(new InetSocketAddress("127.0.0.1", 44593));
			ByteBuffer buffer = ByteBuffer.allocate(1034);
			buffer.put("socket channle demo".getBytes());
			buffer.flip();
			channel.write(buffer);
		} catch (IOException e) {
			e.printStackTrace();
		}
	});
	//啓動順序不影響結果
	server.start();
	client.start();
}

Buffer

Buffer做爲數據的讀寫緩衝區,具有讀和寫兩種模式。

public abstract class Buffer {
  // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    ......
    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }
    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }
    ......
}

capacity、position、limit、mark、flip()、clear()

  • capacity:buffer的容量,在申請buffer時指定大小,是固定不變的。

  • limit:buffer可使用的上限——寫模式下表示最多能往buffer中寫入數據的邊界,初始化時limit等於capacity;調用flip()切換爲讀模式後limit會等於當前的position;表示能讀到的數據邊界。當一次讀寫操做完成後,limit的值可能不會等於capacity,存在內存泄露的狀況(這個不知道算不算設計不夠友好),避免這種狀況要在每一次讀寫操做完成後執行clear()方法清空buffer。

  • position:position能夠當作是一個讀寫指針,指示當前讀或寫的位置,隨put/get方法自動更新,當buffer中的數據準備好了,須要從寫模式切換爲讀模式時,須要調用buffer.flip()方法,能夠看到flip()方法會將當前寫的最後一個位置賦值給limit,而後將position切換爲0,即變成從0位置開始讀,能夠讀到limit位置,反之從讀模式切換爲寫模式也是如此。

  • mark: mark用來標記某個時刻的一個position,經過調用buffer.mark()方法能夠記錄當前的position,以後能經過buffer.reset()恢復到這個position。mark默認值是-1,而且其值必須小於position,若是調用buffer.position(index)時傳入的index比mark小,則會將mark設置爲-1使暫存的位置失效。

這4個屬性的大小關係是$mark <= position <= limit <= capacity $

  • flip():flip方法用來切換讀寫模式:當buffer處於寫模式時,每往buffer中寫入一個數據position加1,調用flip()切換爲讀取模式時,會將當前的position賦值給limit,再把position賦值爲0,這樣就能夠從索引爲0的位置讀取到limit處。當buffer處於讀模式時,每往buffer中讀出一個數據position-1,直到position等於limit時數據消費完。
  • clear():每完成一次讀寫後必須調用clear()方法才能再次使用buffer,不然可能形成內存泄露,由於當讀取完數據調用flip()方法時limit不必定等於capacity,會使buffer的可用內存小於申請的內存大小。clear()方法會將position重置爲0,將limit重置爲capacity,mark重置爲-1。
public static void main(String[] args) {
	ByteBuffer buffer = ByteBuffer.allocate(10);
	buffer.put("abcde".getBytes());
	buffer.flip();
	while (buffer.hasRemaining()) {
		System.out.print(buffer.get() + " ");
	}
	buffer.flip();
	try {
		buffer.put("abcdef".getBytes()); //沒有clear()將拋出BufferOverflowException
	} catch (Exception e) {
		e.printStackTrace();
	}
}

Selector

register

一個selector能夠註冊多個channel,而且seletor要求channel必須工做在非阻塞模式下,所以FileChannel不能結合selector使用,同時註冊的channel須要調用 channel.configureBlocking(flase); 設置爲非阻塞通道。

channel聲明瞭一個抽象方法用來註冊channel到selector:

public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;
  • SelectionKey:SelectionKey包含了許多有用的屬性,如interest集合、ready集合、channel對象、selector對象等;經過SelectionKey返回值,能夠進行各類操做。

  • ops:ops是一個int類型數,其實質表示的是事件類型的集合,即channel註冊selector時告訴selector其對哪些事件感興趣。SelectionKey中只定義了四種事件類型,分別用四個常量表示:

    • SelectionKey.OP_CONNECT:接受請求操做
    • SelectionKey.OP_ACCEPT :鏈接操做
    • SelectionKey.OP_READ:讀取操做
    • SelectionKey.OP_WRITE:寫操做

而且channel也不是四種事件都能註冊,不一樣的channel只能註冊validOps()方法中有定義的事件。

  • SocketChannel
public final int validOps() {
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
  • DataGramchannel
public final int validOps() {
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}
  • ServerSocketChannel
public final int validOps() {
    return SelectionKey.OP_ACCEPT;
}
  • att:att是一個附加的對象,能夠不指定,也可讓咱們更靈活的將更多的信息附加到SelectionKey上,好比attach一個buffer、attach一個惟一標識等等。

select

註冊了selector的channel便能將本來由本身調用accept的工做交由selector來代替。selector經過select()方法根據channel註冊時所關聯的感興趣的事件返回準備就緒的channel。此時,本來阻塞在channel.accept()上的操做變成了阻塞在selector.select()上。

當select()返回值大於0時,說明有channel準備就緒了,進一步處理能夠按如下步驟進行:

  1. 調用selectKeys()方法得到就緒通道的鍵集 Set keys = selector.selectedKeys();
  2. 遍歷鍵集並檢測每一個鍵對應的channel所屬的就緒事件;
  3. 使用SelectionKey.channel()方法得到具體的channel類型對數據進行處理;
  4. 處理完畢以後將已處理的鍵值從鍵集中移除。

selectNow() & wakeUp()

selectNow()和select()不一樣之處在於前者不會阻塞當前線程,而是直接返回。

wakeUp()是用來喚醒被select()阻塞的線程的,有的時候select()阻塞的線程,咱們不想其一直被阻塞,而是一段時間內若是沒有通道就緒就繼續執行,那麼這個時候能夠在另一個線程裏調用selector.wakeUp(),可是這裏有個「坑」就是若是當前的selector沒有被阻塞在select上,那麼下一次調用該selector對象的select方法會被當即喚醒。

簡單示例

public class Server {

	public static void main(String[] args) {
		try {
			Selector selector = Selector.open();
			ServerSocketChannel channel = ServerSocketChannel.open();
			channel.configureBlocking(false);
			channel.socket().bind(new InetSocketAddress(44593));
			channel.register(selector, channel.validOps());
			while (true) {
				while (selector.select() == 0) {
					continue;
				}
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = keys.iterator();
				while (iterator.hasNext()) {
					SelectionKey key = iterator.next();
					if (key.isAcceptable()) {
						ServerSocketChannel server = (ServerSocketChannel) key.channel();
						SocketChannel client = server.accept();
						client.configureBlocking(false);
						client.register(selector, SelectionKey.OP_READ);
					}
					if(key.isReadable()) {
						SocketChannel client = (SocketChannel) key.channel();
						ByteBuffer buffer = ByteBuffer.allocate(100);
						client.read(buffer);
						buffer.flip();
						BufferUtil.printBuffer(buffer);
						client = (SocketChannel) key.channel();
						client.register(selector, SelectionKey.OP_READ);
					}
					keys.clear();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
public class Client {

	public static void main(String[] args) {
		try (Scanner sc = new Scanner(System.in); 
				Socket socket = new Socket("127.0.0.1", 44593);) {
			String input = sc.nextLine();
			while (input != null && !"".equals(input.trim())) {
				socket.getOutputStream().write(input.getBytes());
				input = sc.nextLine();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

IO模型圖解

阻塞IO模型

非阻塞IO模型

多路複用IO模型

異步IO模型

四種模型比較

相關文章
相關標籤/搜索