上一篇介紹了五種NIO模型,本篇將介紹Java中的NIO類庫,爲學習netty作好鋪墊java
Java NIO 由3個核心組成,分別是Channels,Buffers,Selectors。本文主要介紹着三個部分。git
全部的I/O都從一個Channel開始。通道與流不一樣,通道是雙向的,流是單向的。github
便可以從通道中讀取數據,也能夠寫數據到通道里 。編程
讀的話,是從通道讀取數據到緩衝區,寫的話是從緩衝區寫入數據到通道。緩存
四種通道:服務器
Java NIO中的Buffer用於NIO通道進行交互。網絡
緩衝區本質上一塊能夠寫入數據,也能夠從中讀取數據的內存。也就是堆外內存,也叫直接內存。多線程
當向Buffer寫入數據時,Buffer會記錄下寫了多少數據,一旦要讀取數據,須要經過flip()方法將Buffer從寫模式切換到度模式。併發
在讀模式下,能夠讀取以前寫入到Buffer的全部數據。socket
一旦讀完了全部數據,就須要狀況緩存區,讓它能夠再次被寫入。有兩種方式能清空緩衝區,調用clear()或者compact()方法。
clear()方法會清空整個緩衝區。compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面。
任何未讀的數據將被移到緩衝區的起始處,新寫入的數據將放大緩衝區未讀數據的後面。
Buffer的capacity,position和limit
capacity
capacity做爲一個內存塊,buffer有一個固定的大小值,也叫capacity,只能向內存中寫入byte,long,char等類型。一旦Buffer滿了,須要將其清空。
position
當寫數據到Buffer中是,position表示當前的位置。初始的position值爲0,當一個byte,long等數據寫到buffer後,position會向前移動到下一個可插入數據的單元。positon最大可謂capacity-1.
當讀取數據時,也是從特定位置讀。將Buffer從寫模式切換到讀模式,positon會被重置0,當從Buffer的position處讀取數據時,position向前移動到想一個能夠讀的位置。
limit
在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。寫模式下,limit等於buffer的capacity
Buffer的分配
要想得到一個Buffer對象首先要進行分配。 每個Buffer類都有一個allocate方法。下面是一個分配48字節capacity的ByteBuffer的例子。
ByteBuffer buf = ByteBuffer.allocate(48);
Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以檢測到通道是否爲讀寫事件準備好的的組件。因此Selector能夠單個線程處理多個Channel。
Selector可以使用一個線程來處理全部通道。可是對於現在的操做系統和CPU來講,多線程已經較過去效率高了不少。
1.經過調用Selector.open()方法建立一個Selector
2.將Channel註冊到Selector上配合使用,可以使用Channel.register方法來實現,以下
servChannel.configureBlocking(false); servChannel.register(selector, SelectionKey.OP_ACCEPT);
與Selector一塊兒使用時,Channel必須處於非阻塞模式下。這意味着FileChannel與Selector不能一塊兒使用,由於FileChannel不能切換到非阻塞模式。
3.通道觸發意味着該事件已經就緒。Java中有以下常量對應着通道事件。
4.SelectionKey
當向Selector註冊Channel時,register()方法會返回一個SelectionKey對象。這個對象包含interest集合,ready集合,Channel,Selector,附加的對象(可選)。
interest集合是你所選擇的感興趣的事件集合。能夠經過SelectionKey讀寫interest集合。
ready 集合是通道已經準備就緒的操做的集合。在一次選擇(Selection)以後,你會首先訪問這個ready set。
用NIO建立的客戶端與服務端:
服務端:
package com.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class MutipleexerTimeServer implements Runnable{ private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop = false; /** * 建立多路複用器,綁定NIO端口 * * @param port */ public MutipleexerTimeServer(int port){ try{ selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.register(selector, SelectionKey.OP_ACCEPT); servChannel.socket().bind(new InetSocketAddress(port),1024); System.out.println("the time server start at port: "+port ); }catch (Exception e){ e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop = stop; } @Override public void run() { while (!stop){ try { // selector每隔一秒喚醒一次 selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()){ key = it.next(); it.remove(); try { handlerInput(key); }catch (Exception e){ if (key !=null){ key.cancel(); if (key.channel() !=null){ key.channel().close(); } } } } }catch (Exception e){ e.printStackTrace(); } } // 多路複用器關閉後,全部註冊到上面的channel和pipe等資源都不被自動去註冊並關閉,全部不須要重複釋放資源 if (selector!=null){ try { selector.close(); }catch (Exception e){ e.printStackTrace(); } } } private void handlerInput(SelectionKey key) throws Exception{ if (key.isValid()){ // 處理新接入的請求消息 if (key.isAcceptable()){ // Accept the new Connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 已完成TCP三次握手 SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector,SelectionKey.OP_READ); } if (key.isReadable()){ // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes>0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("server receive order: "+body); String correntTime = "QUERY".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; doWrite(sc,correntTime); }else if (readBytes<0){ // 對端鏈路關閉 key.cancel(); sc.close(); }else { ; } } } } private void doWrite(SocketChannel channel,String response) throws Exception{ if (response!=null && response.trim().length()>0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } } package com.nio; /** * 啓動類 */ public class TimeServer { public static void main(String args[]){ int port = 9816; MutipleexerTimeServer timeServer = new MutipleexerTimeServer(port); new Thread(timeServer,"nit").start(); } }
客戶端:
package com.nio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author tangj * @date 2018/6/14 23:13 */ public class TimeClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host,int port){ this.host = host; this.port = port; try{ selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); }catch (Exception e){ e.printStackTrace(); System.exit(-1); } } @Override public void run() { try{ doConnect(); }catch (Exception e){ e.printStackTrace(); System.exit(1); } while (!stop){ try{ selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()){ key = it.next(); it.remove(); try { handlerInput(key); }catch (Exception e){ if (key!=null){ key.cancel(); if (key.channel() != null){ key.channel().close(); } } } } }catch (Exception e){ e.printStackTrace(); System.exit(1); } } // 多路複用器關閉後,全部註冊到上面的channel和pipe等資源都不被自動去註冊並關閉,全部不須要重複釋放資源 if (selector!=null){ try { selector.close(); }catch (Exception e){ e.printStackTrace(); } } } private void handlerInput(SelectionKey key) throws Exception{ if (key.isValid()){ // 判斷是否鏈接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()){ if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); }else { // 鏈接失敗 System.exit(1); } } if (key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes >0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("NOW IS: "+body); this.stop = true; }else if (readBytes < 0){ // 對端鏈路關閉 key.cancel(); sc.close(); }else { ; //讀到0字節,忽略 } } } } private void doConnect() throws Exception{ // 若是直接鏈接成功,則註冊到多路複用器上,發送請求信息,讀應答 if (socketChannel.connect(new InetSocketAddress(host,port))){ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else { socketChannel.register(selector,SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException{ byte[] req = "QUERY".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()){ System.out.println("send order 2 server secceed"); } } } package com.nio.client; /** * @author tangj * @date 2018/6/14 22:56 */ public class TimeClient { public static void main(String args[]){ new Thread(new TimeClientHandle("127.0.0.1",9816)).start(); } }
參考:
併發編程網
《Netty權威指南》
代碼地址: