異步IO在IO操做的兩個階段,都不會使線程阻塞。 Java 的 I/O 依賴於操做系統的實現。html
Java NIO 的通道相似流,但又有些不一樣:java
關鍵的Buffer實現 ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer服務器
Buffer兩種模式、三個屬性: 網絡
capacity
做爲一個內存塊,Buffer有一個固定的大小值,也叫「capacity」.你只能往裏寫capacity個byte、long,char等類型。一旦Buffer滿了,須要將其清空(經過讀數據或者清除數據)才能繼續寫數據往裏寫數據。多線程
position
當你寫數據到Buffer中時,position表示當前的位置。初始的position值爲0.當一個byte、long等數據寫到Buffer後, position會向前移動到下一個可插入數據的Buffer單元。position最大可爲capacity – 1. 當讀取數據時,也是從某個特定位置讀。當將Buffer從寫模式切換到讀模式,position會被重置爲0. 當從Buffer的position處讀取數據時,position向前移動到下一個可讀的位置。併發
limit
在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。 寫模式下,limit等於Buffer的capacity。 當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。所以,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到以前寫入的全部數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)app
參考連接:Buffer原理 www.cnblogs.com/chenpi/p/64…異步
Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以知曉通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個channel,從而管理多個網絡鏈接。socket
監聽四種事件ide
select()方法
select()阻塞到至少有一個通道在你註冊的事件上就緒了。 select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。
selectedKeys()方法
調用selector的selectedKeys()方法,訪問「已選擇鍵集(selected key set)」中的就緒通道。
參考連接:操做系統層面分析Selector原理 zhhphappy.iteye.com/blog/203289…
服務端
public class NIOServerSocket {
//存儲SelectionKey的隊列
private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
private static Selector selector = null;
//添加SelectionKey到隊列
public static void addWriteQueue(SelectionKey key){
synchronized (writeQueue) {
writeQueue.add(key);
//喚醒主線程
selector.wakeup();
}
}
public static void main(String[] args) throws IOException {
// 1.建立ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.綁定端口
serverSocketChannel.bind(new InetSocketAddress(60000));
// 3.設置爲非阻塞
serverSocketChannel.configureBlocking(false);
// 4.建立通道選擇器
selector = Selector.open();
/* * 5.註冊事件類型 * * sel:通道選擇器 * ops:事件類型 ==>SelectionKey:包裝類,包含事件類型和通道自己。四個常量類型表示四種事件類型 * SelectionKey.OP_ACCEPT 獲取報文 SelectionKey.OP_CONNECT 鏈接 * SelectionKey.OP_READ 讀 SelectionKey.OP_WRITE 寫 */
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("服務器端:正在監聽60000端口");
// 6.獲取可用I/O通道,得到有多少可用的通道
int num = selector.select();
if (num > 0) { // 判斷是否存在可用的通道
// 得到全部的keys
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// 使用iterator遍歷全部的keys
Iterator<SelectionKey> iterator = selectedKeys.iterator();
// 迭代遍歷當前I/O通道
while (iterator.hasNext()) {
// 得到當前key
SelectionKey key = iterator.next();
// 調用iterator的remove()方法,並非移除當前I/O通道,標識當前I/O通道已經處理。
iterator.remove();
// 判斷事件類型,作對應的處理
if (key.isAcceptable()) {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssChannel.accept();
System.out.println("處理請求:"+ socketChannel.getRemoteAddress());
// 獲取客戶端的數據
// 設置非阻塞狀態
socketChannel.configureBlocking(false);
// 註冊到selector(通道選擇器)
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
System.out.println("讀事件");
//取消讀事件的監控
key.cancel();
//調用讀操做工具類
NIOHandler.read(key);
} else if (key.isWritable()) {
System.out.println("寫事件");
//取消讀事件的監控
key.cancel();
//調用寫操做工具類
NIOHandler.write(key);
}
}
}else{
synchronized (writeQueue) {
while(writeQueue.size() > 0){
SelectionKey key = writeQueue.remove(0);
//註冊寫事件
SocketChannel channel = (SocketChannel) key.channel();
Object attachment = key.attachment();
channel.register(selector, SelectionKey.OP_WRITE,attachment);
}
}
}
}
}
}
複製代碼
消息處理
public class NIOHandler {
//構造線程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void read(final SelectionKey key){
//得到線程並執行
executorService.submit(new Runnable() {
@Override
public void run() {
try {
SocketChannel readChannel = (SocketChannel) key.channel();
// I/O讀數據操做
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int len = 0;
while (true) {
buffer.clear();
len = readChannel.read(buffer);
if (len == -1) break;
buffer.flip();
while (buffer.hasRemaining()) {
baos.write(buffer.get());
}
}
System.out.println("服務器端接收到的數據:"+ new String(baos.toByteArray()));
//將數據添加到key中
key.attach(baos);
//將註冊寫操做添加到隊列中
NIOServerSocket.addWriteQueue(key);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void write(final SelectionKey key) {
//拿到線程並執行
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 寫操做
SocketChannel writeChannel = (SocketChannel) key.channel();
//拿到客戶端傳遞的數據
ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
System.out.println("客戶端發送來的數據:"+new String(attachment.toByteArray()));
ByteBuffer buffer = ByteBuffer.allocate(1024);
String message = "你好,我是服務器!!";
buffer.put(message.getBytes());
buffer.flip();
writeChannel.write(buffer);
writeChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
複製代碼
客戶端
public class NIOClientSocket {
public static void main(String[] args) throws IOException {
//使用線程模擬用戶 併發訪問
for (int i = 0; i < 1; i++) {
new Thread(){
public void run() {
try {
//1.建立SocketChannel
SocketChannel socketChannel=SocketChannel.open();
//2.鏈接服務器
socketChannel.connect(new InetSocketAddress("localhost",60000));
//寫數據
String msg="我是客戶端"+Thread.currentThread().getId();
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put(msg.getBytes());
buffer.flip();
socketChannel.write(buffer);
socketChannel.shutdownOutput();
//讀數據
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len = 0;
while (true) {
buffer.clear();
len = socketChannel.read(buffer);
if (len == -1)
break;
buffer.flip();
while (buffer.hasRemaining()) {
bos.write(buffer.get());
}
}
System.out.println("客戶端收到:"+new String(bos.toByteArray()));
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
};
}.start();
}
}
}
複製代碼
參考連接:SocketChannel.read blog.csdn.net/cao47820824…
參考連接:NIO坑 www.jianshu.com/p/1af407c04…