IO感受上和多線程並無多大關係,可是NIO改變了線程在應用層面使用的方式,也解決了一些實際的困難。而AIO是異步IO和前面的系列也有點關係。在此,爲了學習和記錄,也寫一篇文章來介紹NIO和AIO。 java
NIO是New I/O的簡稱,與舊式的基於流的I/O方法相對,從名字看,它表示新的一套Java I/O標 準。它是在Java 1.4中被歸入到JDK中的,並具備如下特性: 編程
全部的從通道中的讀寫操做,都要通過Buffer,而通道就是io的抽象,通道的另外一端就是操縱的文件。 設計模式
Java中Buffer的實現。基本的數據類型都有它對應的Buffer 緩存
Buffer的簡單使用例子: 安全
package test; import java.io.File; import java.io.FileInputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class Test { public static void main(String[] args) throws Exception { FileInputStream fin = new FileInputStream(new File( "d:\\temp_buffer.tmp")); FileChannel fc = fin.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fc.read(byteBuffer); fc.close(); byteBuffer.flip();//讀寫轉換 } }總結下使用的步驟是:
1. 獲得Channel 服務器
2. 申請Buffer 網絡
3. 創建Channel和Buffer的讀/寫關係 多線程
4. 關閉 併發
下面的例子是使用NIO來複制文件: app
public static void nioCopyFile(String resource, String destination) throws IOException { FileInputStream fis = new FileInputStream(resource); FileOutputStream fos = new FileOutputStream(destination); FileChannel readChannel = fis.getChannel(); // 讀文件通道 FileChannel writeChannel = fos.getChannel(); // 寫文件通道 ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數據緩存 while (true) { buffer.clear(); int len = readChannel.read(buffer); // 讀入數據 if (len == -1) { break; // 讀取完畢 } buffer.flip(); writeChannel.write(buffer); // 寫入文件 } readChannel.close(); writeChannel.close(); }Buffer中有3個重要的參數:位置(position)、容量(capactiy)和上限(limit)
這裏要區別下容量和上限,好比一個Buffer有10KB,那麼10KB就是容量,我將5KB的文件讀到Buffer中,那麼上限就是5KB。
下面舉個例子來理解下這3個重要的參數:
public static void main(String[] args) throws Exception { ByteBuffer b = ByteBuffer.allocate(15); // 15個字節大小的緩衝區 System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 10; i++) { // 存入10個字節數據 b.put((byte) i); } System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); // 重置position System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 5; i++) { System.out.print(b.get()); } System.out.println(); System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); }整個過程如圖:
此時position從0到10,capactiy和limit不變。
該操做會重置position,一般,將buffer從寫模式轉換爲讀 模式時須要執行此方法 flip()操做不只重置了當前的position爲0,還將limit設置到當前position的位置 。
limit的意義在於,來肯定哪些數據是有意義的,換句話說,從position到limit之間的數據纔是有意義的數據,由於是上次操做的數據。因此flip操做每每是讀寫轉換的意思。
意義同上。
而Buffer中大多數的方法都是去改變這3個參數來達到某些功能的:
public final Buffer rewind()將position置零,並清除標誌位(mark)
public final Buffer clear()將position置零,同時將limit設置爲capacity的大小,並清除了標誌mark
public final Buffer flip()先將limit設置到position所在位置,而後將position置零,並清除標誌位mark,一般在讀寫轉換時使用
public static void main(String[] args) throws Exception { RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw"); FileChannel fc = raf.getChannel(); // 將文件映射到內存中 MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); while (mbb.hasRemaining()) { System.out.print((char) mbb.get()); } mbb.put(0, (byte) 98); // 修改文件 raf.close(); }對MappedByteBuffer的修改就至關於修改文件自己,這樣操做的速度是很快的。
多線程網絡服務器的通常結構:
簡單的多線程服務器:
public static void main(String[] args) throws Exception { ServerSocket echoServer = null; Socket clientSocket = null; try { echoServer = new ServerSocket(8000); } catch (IOException e) { System.out.println(e); } while (true) { try { clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connect!"); tp.execute(new HandleMsg(clientSocket)); } catch (IOException e) { System.out.println(e); } } }功能就是服務器端讀到什麼數據,就向客戶端回寫什麼數據。
這裏的tp是一個線程池,HandleMsg是處理消息的類。
static class HandleMsg implements Runnable{ 省略部分信息 public void run(){ try { is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); os = new PrintWriter(clientSocket.getOutputStream(), true); // 從InputStream當中讀取客戶端所發送的數據 String inputLine = null; long b=System. currentTimeMillis (); while ((inputLine = is.readLine()) != null) { os.println(inputLine); } long e=System. currentTimeMillis (); System. out.println ("spend:"+(e - b)+" ms "); } catch (IOException e) { e.printStackTrace(); }finally { 關閉資源 } } }客戶端:
public static void main(String[] args) throws Exception { Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.println("Hello!"); writer.flush(); reader = new BufferedReader(new InputStreamReader( client.getInputStream())); System.out.println("from server: " + reader.readLine()); } catch (Exception e) { } finally { // 省略資源關閉 } }以上的網絡編程是很基本的,使用這種方式,會有一些問題:
爲每個客戶端使用一個線程,若是客戶端出現延時等異常,線程可能會被佔用很長時間。由於數據的準備和讀取都在這個線程中。此時,若是客戶端數量衆多,可能會消耗大量的系統資源。
解決方案:
使用非阻塞的NIO (讀取數據不等待,數據準備好了再工做)
爲了體現NIO使用的高效。
這裏先模擬一個低效的客戶端來模擬因網絡而延時的狀況:
private static ExecutorService tp= Executors.newCachedThreadPool(); private static final int sleep_time=1000*1000*1000; public static class EchoClient implements Runnable{ public void run(){ try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.print("H"); LockSupport.parkNanos(sleep_time); writer.print("e"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("o"); LockSupport.parkNanos(sleep_time); writer.print("!"); LockSupport.parkNanos(sleep_time); writer.println(); writer.flush(); }catch(Exception e) { } } }服務器端輸出:
spend:6000ms spend:6000ms spend:6000ms spend:6001ms spend:6002ms spend:6002ms spend:6002ms spend:6002ms spend:6003ms spend:6003ms由於
while ((inputLine = is.readLine()) != null)是阻塞的,因此時間都花在等待中。
若是用NIO來處理這個問題會怎麼作呢?
NIO有一個很大的特色就是:把數據準備好了再通知我
而Channel有點相似於流,一個Channel能夠和文件或者網絡Socket對應 。
selector是一個選擇器,它能夠選擇某一個Channel,而後作些事情。
一個線程能夠對應一個selector,而一個selector能夠輪詢多個Channel,而每一個Channel對應了一個Socket。
與上面一個線程對應一個Socket相比,使用NIO後,一個線程能夠輪詢多個Socket。
當selector調用select()時,會查看是否有客戶端準備好了數據。當沒有數據被準備好時,select()會阻塞。平時都說NIO是非阻塞的,可是若是沒有數據被準備好仍是會有阻塞現象。
當有數據被準備好時,調用完select()後,會返回一個SelectionKey,SelectionKey表示在某個selector上的某個Channel的數據已經被準備好了。
只有在數據準備好時,這個Channel纔會被選擇。
這樣NIO實現了一個線程來監控多個客戶端。
而剛剛模擬的網絡延遲的客戶端將不會影響NIO下的線程,由於某個Socket網絡延遲時,數據還未被準備好,selector是不會選擇它的,而會選擇其餘準備好的客戶端。
selectNow()與select()的區別在於,selectNow()是不阻塞的,當沒有客戶端準備好數據時,selectNow()不會阻塞,將返回0,有客戶端準備好數據時,selectNow()返回準備好的客戶端的個數。
主要代碼:
package test; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiThreadNIOEchoServer { public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>(); class EchoClient { private LinkedList<ByteBuffer> outq; EchoClient() { outq = new LinkedList<ByteBuffer>(); } public LinkedList<ByteBuffer> getOutputQueue() { return outq; } public void enqueue(ByteBuffer bb) { outq.addFirst(bb); } } class HandleMsg implements Runnable { SelectionKey sk; ByteBuffer bb; public HandleMsg(SelectionKey sk, ByteBuffer bb) { super(); this.sk = sk; this.bb = bb; } @Override public void run() { // TODO Auto-generated method stub EchoClient echoClient = (EchoClient) sk.attachment(); echoClient.enqueue(bb); sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); selector.wakeup(); } } private Selector selector; private ExecutorService tp = Executors.newCachedThreadPool(); private void startServer() throws Exception { selector = SelectorProvider.provider().openSelector(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(8000); ssc.socket().bind(isa); // 註冊感興趣的事件,此處對accpet事件感興趣 SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); for (;;) { selector.select(); Set readyKeys = selector.selectedKeys(); Iterator i = readyKeys.iterator(); long e = 0; while (i.hasNext()) { SelectionKey sk = (SelectionKey) i.next(); i.remove(); if (sk.isAcceptable()) { doAccept(sk); } else if (sk.isValid() && sk.isReadable()) { if (!geym_time_stat.containsKey(((SocketChannel) sk .channel()).socket())) { geym_time_stat.put( ((SocketChannel) sk.channel()).socket(), System.currentTimeMillis()); } doRead(sk); } else if (sk.isValid() && sk.isWritable()) { doWrite(sk); e = System.currentTimeMillis(); long b = geym_time_stat.remove(((SocketChannel) sk .channel()).socket()); System.out.println("spend:" + (e - b) + "ms"); } } } } private void doWrite(SelectionKey sk) { // TODO Auto-generated method stub SocketChannel channel = (SocketChannel) sk.channel(); EchoClient echoClient = (EchoClient) sk.attachment(); LinkedList<ByteBuffer> outq = echoClient.getOutputQueue(); ByteBuffer bb = outq.getLast(); try { int len = channel.write(bb); if (len == -1) { disconnect(sk); return; } if (bb.remaining() == 0) { outq.removeLast(); } } catch (Exception e) { // TODO: handle exception disconnect(sk); } if (outq.size() == 0) { sk.interestOps(SelectionKey.OP_READ); } } private void doRead(SelectionKey sk) { // TODO Auto-generated method stub SocketChannel channel = (SocketChannel) sk.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try { len = channel.read(bb); if (len < 0) { disconnect(sk); return; } } catch (Exception e) { // TODO: handle exception disconnect(sk); return; } bb.flip(); tp.execute(new HandleMsg(sk, bb)); } private void disconnect(SelectionKey sk) { // TODO Auto-generated method stub //省略略幹關閉操做 } private void doAccept(SelectionKey sk) { // TODO Auto-generated method stub ServerSocketChannel server = (ServerSocketChannel) sk.channel(); SocketChannel clientChannel; try { clientChannel = server.accept(); clientChannel.configureBlocking(false); SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); EchoClient echoClinet = new EchoClient(); clientKey.attach(echoClinet); InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("Accepted connection from " + clientAddress.getHostAddress()); } catch (Exception e) { // TODO: handle exception } } public static void main(String[] args) { // TODO Auto-generated method stub MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer(); try { echoServer.startServer(); } catch (Exception e) { // TODO: handle exception } } }代碼僅做參考,主要的特色是,對不一樣事件的感興趣來作不一樣的事。
當用以前模擬的那個延遲的客戶端時,此次的時間消耗就在2ms到11ms之間了。性能提高是很明顯的。
總結:
1. NIO會將數據準備好後,再交由應用進行處理,數據的讀取/寫入過程依然在應用線程中完成,只是將等待的時間剝離到單獨的線程中去。
2. 節省數據準備時間(由於Selector能夠複用)
AIO的特色:
1. 讀完了再通知我
2. 不會加快IO,只是在讀完後進行通知
3. 使用回調函數,進行業務處理
AIO的相關代碼:
AsynchronousServerSocketChannel
server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));使用server上的accept方法
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);CompletionHandler爲回調接口,當有客戶端accept以後,就作handler中的事情。
示例代碼:
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { final ByteBuffer buffer = ByteBuffer.allocate(1024); public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println(Thread.currentThread().getName()); Future<Integer> writeResult = null; try { buffer.clear(); result.read(buffer).get(100, TimeUnit.SECONDS); buffer.flip(); writeResult = result.write(buffer); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { server.accept(null, this); writeResult.get(); result.close(); } catch (Exception e) { System.out.println(e.toString()); } } } @Override public void failed(Throwable exc, Object attachment) { System.out.println("failed: " + exc); } });
這裏使用了Future來實現即時返回,關於Future請參考上一篇
在理解了NIO的基礎上,看AIO,區別在於AIO是等讀寫過程完成後再去調用回調函數。
NIO是同步非阻塞的
AIO是異步非阻塞的
因爲NIO的讀寫過程依然在應用線程裏完成,因此對於那些讀寫過程時間長的,NIO就不太適合。
而AIO的讀寫過程完成後才被通知,因此AIO可以勝任那些重量級,讀寫過程長的任務。
系列: