[高併發Java 八] NIO和AIO

IO感受上和多線程並無多大關係,可是NIO改變了線程在應用層面使用的方式,也解決了一些實際的困難。而AIO是異步IO和前面的系列也有點關係。在此,爲了學習和記錄,也寫一篇文章來介紹NIO和AIO。 java

1. 什麼是NIO

NIO是New I/O的簡稱,與舊式的基於流的I/O方法相對,從名字看,它表示新的一套Java I/O標 準。它是在Java 1.4中被歸入到JDK中的,並具備如下特性:  編程

  • NIO是基於塊(Block)的,它以塊爲基本單位處理數據 (硬盤上存儲的單位也是按Block來存儲,這樣性能上比基於流的方式要好一些)
  • 爲全部的原始類型提供(Buffer)緩存支持 
  • 增長通道(Channel)對象,做爲新的原始 I/O 抽象
  • 支持鎖(咱們在平時使用時常常能看到會出現一些.lock的文件,這說明有線程正在使用這把鎖,當線程釋放鎖時,會把這個文件刪除掉,這樣其餘線程才能繼續拿到這把鎖)和內存映射文件的文件訪問接口 
  • 提供了基於Selector的異步網絡I/O 

全部的從通道中的讀寫操做,都要通過Buffer,而通道就是io的抽象,通道的另外一端就是操縱的文件。 設計模式

2. Buffer

Java中Buffer的實現。基本的數據類型都有它對應的Buffer 緩存

Buffer的簡單使用例子: 安全

package test;

import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Test {
	public static void main(String[] args) throws Exception {
		FileInputStream fin = new FileInputStream(new File(
				"d:\\temp_buffer.tmp"));
		FileChannel fc = fin.getChannel();
		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
		fc.read(byteBuffer);
		fc.close();
		byteBuffer.flip();//讀寫轉換
	}
}
總結下使用的步驟是:

1. 獲得Channel 服務器

2. 申請Buffer 網絡

3. 創建Channel和Buffer的讀/寫關係 多線程

4. 關閉 併發

下面的例子是使用NIO來複制文件: app

public static void nioCopyFile(String resource, String destination)
			throws IOException {
		FileInputStream fis = new FileInputStream(resource);
		FileOutputStream fos = new FileOutputStream(destination);
		FileChannel readChannel = fis.getChannel(); // 讀文件通道
		FileChannel writeChannel = fos.getChannel(); // 寫文件通道
		ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數據緩存
		while (true) {
			buffer.clear();
			int len = readChannel.read(buffer); // 讀入數據
			if (len == -1) {
				break; // 讀取完畢
			}
			buffer.flip();
			writeChannel.write(buffer); // 寫入文件
		}
		readChannel.close();
		writeChannel.close();
	}
 Buffer中有3個重要的參數:位置(position)、容量(capactiy)和上限(limit) 

這裏要區別下容量和上限,好比一個Buffer有10KB,那麼10KB就是容量,我將5KB的文件讀到Buffer中,那麼上限就是5KB。

下面舉個例子來理解下這3個重要的參數:

public static void main(String[] args) throws Exception {
		ByteBuffer b = ByteBuffer.allocate(15); // 15個字節大小的緩衝區
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		for (int i = 0; i < 10; i++) {
			// 存入10個字節數據
			b.put((byte) i);
		}
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		b.flip(); // 重置position
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		for (int i = 0; i < 5; i++) {
			System.out.print(b.get());
		}
		System.out.println();
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());
		b.flip();
		System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
				+ " position=" + b.position());

	}
整個過程如圖:

此時position從0到10,capactiy和limit不變。

該操做會重置position,一般,將buffer從寫模式轉換爲讀 模式時須要執行此方法 flip()操做不只重置了當前的position爲0,還將limit設置到當前position的位置 。

limit的意義在於,來肯定哪些數據是有意義的,換句話說,從position到limit之間的數據纔是有意義的數據,由於是上次操做的數據。因此flip操做每每是讀寫轉換的意思。


意義同上。

而Buffer中大多數的方法都是去改變這3個參數來達到某些功能的:

public final Buffer rewind()
將position置零,並清除標誌位(mark) 
public final Buffer clear()
將position置零,同時將limit設置爲capacity的大小,並清除了標誌mark
public final Buffer flip()
先將limit設置到position所在位置,而後將position置零,並清除標誌位mark,一般在讀寫轉換時使用 

文件映射到內存 

public static void main(String[] args) throws Exception {
		RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
		FileChannel fc = raf.getChannel();
		// 將文件映射到內存中
		MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,
				raf.length());
		while (mbb.hasRemaining()) {
			System.out.print((char) mbb.get());
		}
		mbb.put(0, (byte) 98); // 修改文件
		raf.close();
	}
對MappedByteBuffer的修改就至關於修改文件自己,這樣操做的速度是很快的。

3. Channel

多線程網絡服務器的通常結構:

簡單的多線程服務器:

public static void main(String[] args) throws Exception {
		ServerSocket echoServer = null;
		Socket clientSocket = null;
		try {
			echoServer = new ServerSocket(8000);
		} catch (IOException e) {
			System.out.println(e);
		}
		while (true) {
			try {
				clientSocket = echoServer.accept();
				System.out.println(clientSocket.getRemoteSocketAddress()
						+ " connect!");
				tp.execute(new HandleMsg(clientSocket));
			} catch (IOException e) {
				System.out.println(e);
			}
		}
	}
功能就是服務器端讀到什麼數據,就向客戶端回寫什麼數據。

這裏的tp是一個線程池,HandleMsg是處理消息的類。

static class HandleMsg implements Runnable{  
		 省略部分信息                 
		 public void run(){         
			 try {         
				 is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 
				 os = new PrintWriter(clientSocket.getOutputStream(), true); 
				 // 從InputStream當中讀取客戶端所發送的數據              
				 String inputLine = null;                 
				 long b=System. currentTimeMillis ();                 
				 while ((inputLine = is.readLine()) != null)
				 {           
					 os.println(inputLine);                 
				 }                 
				 long e=System. currentTimeMillis ();                 
				 System. out.println ("spend:"+(e - b)+" ms ");             
			} catch (IOException e) {                 
				e.printStackTrace();             
			}finally
			{  
				關閉資源 
			}     
		} 
	 }
客戶端:
public static void main(String[] args) throws Exception {
		Socket client = null;
		PrintWriter writer = null;
		BufferedReader reader = null;
		try {
			client = new Socket();
			client.connect(new InetSocketAddress("localhost", 8000));
			writer = new PrintWriter(client.getOutputStream(), true);
			writer.println("Hello!");
			writer.flush();
			reader = new BufferedReader(new InputStreamReader(
					client.getInputStream()));
			System.out.println("from server: " + reader.readLine());
		} catch (Exception e) {
		} finally {
			// 省略資源關閉
		}
	}
以上的網絡編程是很基本的,使用這種方式,會有一些問題:

爲每個客戶端使用一個線程,若是客戶端出現延時等異常,線程可能會被佔用很長時間。由於數據的準備和讀取都在這個線程中。此時,若是客戶端數量衆多,可能會消耗大量的系統資源。

解決方案:

使用非阻塞的NIO (讀取數據不等待,數據準備好了再工做)

爲了體現NIO使用的高效。

這裏先模擬一個低效的客戶端來模擬因網絡而延時的狀況:

private static ExecutorService tp= Executors.newCachedThreadPool();  
		private static final int sleep_time=1000*1000*1000;  
		public static class EchoClient implements Runnable{   
			public void run(){          
				try {              
					client = new Socket();              
					client.connect(new InetSocketAddress("localhost", 8000)); 
					writer = new PrintWriter(client.getOutputStream(), true); 
					writer.print("H");              
					LockSupport.parkNanos(sleep_time);       
					writer.print("e");           
					LockSupport.parkNanos(sleep_time);      
					writer.print("l");       
					LockSupport.parkNanos(sleep_time);  
					writer.print("l");       
					LockSupport.parkNanos(sleep_time);  
					writer.print("o");     
					LockSupport.parkNanos(sleep_time);  
					writer.print("!");         
					LockSupport.parkNanos(sleep_time);    
					writer.println();      
					writer.flush(); 
				}catch(Exception e)
				{
				}
			}
		}
服務器端輸出:
spend:6000ms 
spend:6000ms 
spend:6000ms 
spend:6001ms 
spend:6002ms 
spend:6002ms 
spend:6002ms 
spend:6002ms 
spend:6003ms 
spend:6003ms
由於
while ((inputLine = is.readLine()) != null)
是阻塞的,因此時間都花在等待中。

若是用NIO來處理這個問題會怎麼作呢?

NIO有一個很大的特色就是:把數據準備好了再通知我 

而Channel有點相似於流,一個Channel能夠和文件或者網絡Socket對應 。

selector是一個選擇器,它能夠選擇某一個Channel,而後作些事情。

一個線程能夠對應一個selector,而一個selector能夠輪詢多個Channel,而每一個Channel對應了一個Socket。

與上面一個線程對應一個Socket相比,使用NIO後,一個線程能夠輪詢多個Socket。

selector調用select()時,會查看是否有客戶端準備好了數據。當沒有數據被準備好時,select()會阻塞。平時都說NIO是非阻塞的,可是若是沒有數據被準備好仍是會有阻塞現象。

當有數據被準備好時,調用完select()後,會返回一個SelectionKey,SelectionKey表示在某個selector上的某個Channel的數據已經被準備好了。

只有在數據準備好時,這個Channel纔會被選擇。

這樣NIO實現了一個線程來監控多個客戶端。

而剛剛模擬的網絡延遲的客戶端將不會影響NIO下的線程,由於某個Socket網絡延遲時,數據還未被準備好,selector是不會選擇它的,而會選擇其餘準備好的客戶端。

selectNow()與select()的區別在於,selectNow()是不阻塞的,當沒有客戶端準備好數據時,selectNow()不會阻塞,將返回0,有客戶端準備好數據時,selectNow()返回準備好的客戶端的個數。

主要代碼:

package test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadNIOEchoServer {
	public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();

	class EchoClient {
		private LinkedList<ByteBuffer> outq;

		EchoClient() {
			outq = new LinkedList<ByteBuffer>();
		}

		public LinkedList<ByteBuffer> getOutputQueue() {
			return outq;
		}

		public void enqueue(ByteBuffer bb) {
			outq.addFirst(bb);
		}
	}

	class HandleMsg implements Runnable {
		SelectionKey sk;
		ByteBuffer bb;

		public HandleMsg(SelectionKey sk, ByteBuffer bb) {
			super();
			this.sk = sk;
			this.bb = bb;
		}

		@Override
		public void run() {
			// TODO Auto-generated method stub
			EchoClient echoClient = (EchoClient) sk.attachment();
			echoClient.enqueue(bb);
			sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
			selector.wakeup();
		}

	}

	private Selector selector;
	private ExecutorService tp = Executors.newCachedThreadPool();

	private void startServer() throws Exception {
		selector = SelectorProvider.provider().openSelector();
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		InetSocketAddress isa = new InetSocketAddress(8000);
		ssc.socket().bind(isa);
		// 註冊感興趣的事件,此處對accpet事件感興趣
		SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
		for (;;) {
			selector.select();
			Set readyKeys = selector.selectedKeys();
			Iterator i = readyKeys.iterator();
			long e = 0;
			while (i.hasNext()) {
				SelectionKey sk = (SelectionKey) i.next();
				i.remove();
				if (sk.isAcceptable()) {
					doAccept(sk);
				} else if (sk.isValid() && sk.isReadable()) {
					if (!geym_time_stat.containsKey(((SocketChannel) sk
							.channel()).socket())) {
						geym_time_stat.put(
								((SocketChannel) sk.channel()).socket(),
								System.currentTimeMillis());
					}
					doRead(sk);
				} else if (sk.isValid() && sk.isWritable()) {
					doWrite(sk);
					e = System.currentTimeMillis();
					long b = geym_time_stat.remove(((SocketChannel) sk
							.channel()).socket());
					System.out.println("spend:" + (e - b) + "ms");
				}
			}
		}
	}

	private void doWrite(SelectionKey sk) {
		// TODO Auto-generated method stub
		SocketChannel channel = (SocketChannel) sk.channel();
		EchoClient echoClient = (EchoClient) sk.attachment();
		LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
		ByteBuffer bb = outq.getLast();
		try {
			int len = channel.write(bb);
			if (len == -1) {
				disconnect(sk);
				return;
			}
			if (bb.remaining() == 0) {
				outq.removeLast();
			}
		} catch (Exception e) {
			// TODO: handle exception
			disconnect(sk);
		}
		if (outq.size() == 0) {
			sk.interestOps(SelectionKey.OP_READ);
		}
	}

	private void doRead(SelectionKey sk) {
		// TODO Auto-generated method stub
		SocketChannel channel = (SocketChannel) sk.channel();
		ByteBuffer bb = ByteBuffer.allocate(8192);
		int len;
		try {
			len = channel.read(bb);
			if (len < 0) {
				disconnect(sk);
				return;
			}
		} catch (Exception e) {
			// TODO: handle exception
			disconnect(sk);
			return;
		}
		bb.flip();
		tp.execute(new HandleMsg(sk, bb));
	}

	private void disconnect(SelectionKey sk) {
		// TODO Auto-generated method stub
		//省略略幹關閉操做
	}

	private void doAccept(SelectionKey sk) {
		// TODO Auto-generated method stub
		ServerSocketChannel server = (ServerSocketChannel) sk.channel();
		SocketChannel clientChannel;
		try {
			clientChannel = server.accept();
			clientChannel.configureBlocking(false);
			SelectionKey clientKey = clientChannel.register(selector,
					SelectionKey.OP_READ);
			EchoClient echoClinet = new EchoClient();
			clientKey.attach(echoClinet);
			InetAddress clientAddress = clientChannel.socket().getInetAddress();
			System.out.println("Accepted connection from "
					+ clientAddress.getHostAddress());
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
		try {
			echoServer.startServer();
		} catch (Exception e) {
			// TODO: handle exception
		}

	}

}
代碼僅做參考,主要的特色是,對不一樣事件的感興趣來作不一樣的事。

當用以前模擬的那個延遲的客戶端時,此次的時間消耗就在2ms到11ms之間了。性能提高是很明顯的。

總結:

1. NIO會將數據準備好後,再交由應用進行處理,數據的讀取/寫入過程依然在應用線程中完成,只是將等待的時間剝離到單獨的線程中去。

2. 節省數據準備時間(由於Selector能夠複用) 

5. AIO

AIO的特色:

1. 讀完了再通知我 

2. 不會加快IO,只是在讀完後進行通知 

3. 使用回調函數,進行業務處理 

AIO的相關代碼:

AsynchronousServerSocketChannel

server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));
使用server上的accept方法
public abstract <A> void accept(A attachment,                                     CompletionHandler<AsynchronousSocketChannel,? super A> handler);
 CompletionHandler爲回調接口,當有客戶端accept以後,就作handler中的事情。

示例代碼:

server.accept(null,
				new CompletionHandler<AsynchronousSocketChannel, Object>() {
					final ByteBuffer buffer = ByteBuffer.allocate(1024);

					public void completed(AsynchronousSocketChannel result,
							Object attachment) {
						System.out.println(Thread.currentThread().getName());
						Future<Integer> writeResult = null;
						try {
							buffer.clear();
							result.read(buffer).get(100, TimeUnit.SECONDS);
							buffer.flip();
							writeResult = result.write(buffer);
						} catch (InterruptedException | ExecutionException e) {
							e.printStackTrace();
						} catch (TimeoutException e) {
							e.printStackTrace();
						} finally {
							try {
								server.accept(null, this);
								writeResult.get();
								result.close();
							} catch (Exception e) {
								System.out.println(e.toString());
							}
						}
					}

					@Override
					public void failed(Throwable exc, Object attachment) {
						System.out.println("failed: " + exc);
					}
				});

這裏使用了Future來實現即時返回,關於Future請參考上一篇

在理解了NIO的基礎上,看AIO,區別在於AIO是等讀寫過程完成後再去調用回調函數。

NIO是同步非阻塞的

AIO是異步非阻塞的

因爲NIO的讀寫過程依然在應用線程裏完成,因此對於那些讀寫過程時間長的,NIO就不太適合。

而AIO的讀寫過程完成後才被通知,因此AIO可以勝任那些重量級,讀寫過程長的任務。



系列:

[高併發Java 一] 前言

[高併發Java 二] 多線程基礎

[高併發Java 三] Java內存模型和線程安全

[高併發Java 四] 無鎖

[高併發Java 五] JDK併發包1

[高併發Java 六] JDK併發包2

[高併發Java 七] 併發設計模式

[高併發Java 八] NIO和AIO

[高併發Java 九] 鎖的優化和注意事項

[高併發Java 十] JDK8對併發的新支持