AIO是在NIO基礎上實現的異步非阻塞通訊java
Windows下提供了IOCP技術,I/O Completion Port,稱爲I/O完成端口。IOCP是一個消息隊列。當監聽到客戶請求的時候就把請求加到消息隊列中。而後已有的線程去逐一處理,處理完成後須要獲得反饋的工做線程就會收到通知,而後前去處理。當沒有請求加入到消息隊列的時候,相應的線程也就處理掛起的狀態進行等待。異步
因此Windows下算是有實際意義上的異步非阻塞async
同步異步是消息通訊的機制
阻塞非阻塞是事件處理ide
阻塞:死等着被調用方回信,中間什麼不幹
非阻塞:沒收到被調用方回信時,中間乾點別的
同步:一下子一趟,沒有結果就反覆問
異步:問完等對方通知反饋測試
同步阻塞:到服務檯反覆問訊,死等服務生反饋this
同步非阻塞:到服務檯反覆問詢,在等服務生反饋期間玩手機.net
異步阻塞:到服務檯問訊,死等服務生反饋,服務生確認後通知我線程
異步非阻塞:到服務檯問詢,問完就玩手機去了,服務生確認後通知我unix
Linux用epoll進行相關實現netty
AIO服務端實現
package netty.aio;
/**
*/
public class AIOSocketServerMain {
public static void main(String[] args) {
int port = 9999; AIOSocketServer selector = new AIOSocketServer(port); new Thread(selector).start();
}
}
package netty.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
*/
public class AIOSocketServer implements Runnable{
private int port;
//用於限制一個線程等待其餘線程各自執行完畢後再執行。
CountDownLatch countDownLatch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
//傳入端口參數
public AIOSocketServer(int port) {
this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }
}
@Override
public void run() {
//這裏使用countdownlatch是爲了把線程阻止住 countDownLatch = new CountDownLatch(1); accept(); try { //多個線程在開始執行任務前首先 coundownlatch.await(), //當主線程調用 countDown() 時,計數器變爲0,多個線程同時被喚醒。 countDownLatch.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}
private void accept() {
asynchronousServerSocketChannel.accept(this,new AIOHandler());
}
}
package netty.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOSocketHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel asynchronousSocketChannel; public AIOSocketHandler(AsynchronousSocketChannel asynchronousSocketChannel) { if(this.asynchronousSocketChannel==null) this.asynchronousSocketChannel = asynchronousSocketChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); try { String info = new String(bytes,"UTF-8"); System.out.println("收到信息:"+info); sendMessage(new Long(System.currentTimeMillis()).toString()); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void sendMessage(String message) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(message.getBytes()); buffer.flip(); asynchronousSocketChannel.write(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()) { asynchronousSocketChannel.write(buffer,buffer,this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
package netty.aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOHandler implements CompletionHandler<AsynchronousSocketChannel,AIOSocketServer>{
@Override public void completed(AsynchronousSocketChannel channel, AIOSocketServer handler) { // TODO Auto-generated method stub handler.asynchronousServerSocketChannel.accept(handler,this); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer, buffer, new AIOSocketHandler(channel)); } @Override public void failed(Throwable exc, AIOSocketServer handler) { // TODO Auto-generated method stub handler.countDownLatch.countDown(); }
}
客戶端測試代碼
package netty.aio;
public class ClientThreadMain {
public static void main(String[] args) { String ip = "127.0.0.1"; int port = 9999; new Thread(new AIOSocketClient(ip,port)).start(); }
}
package netty.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import io.netty.channel.unix.Buffer;
/**
*/
public class AIOSocketClient implements CompletionHandler<Void,AIOSocketClient>, Runnable{
private AsynchronousSocketChannel asynchronousSocketChannel;
private String ip;
private int port;
private CountDownLatch countDownLatch;
public AIOSocketClient(String ip,int port) {
this.ip = ip; this.port = port; //創建通道 try { asynchronousSocketChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}br/>@Override
public void run() {
// TODO Auto-generated method stub
}
@Override
public void completed(Void result, AIOSocketClient attachment) {
byte[] bytes = "Hello JAVA AIO WORLD".getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes); byteBuffer.flip(); asynchronousSocketChannel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()) { asynchronousSocketChannel.write(buffer,buffer,this); }else { ByteBuffer reader = ByteBuffer.allocate(1024); asynchronousSocketChannel.read(reader, reader, new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String info = ""; try { info = new String(bytes,"UTF-8"); System.out.println("讀入信息:"+info); countDownLatch.countDown(); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer buffer) { // TODO Auto-generated method stub try { asynchronousSocketChannel.close(); countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { asynchronousSocketChannel.close(); countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } });
}
@Override
public void failed(Throwable exc, AIOSocketClient attachment) {
// TODO Auto-generated method stub
try {
asynchronousSocketChannel.close();
countDownLatch.countDown(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}
}
這裏用到了不少回調的地方,會稍後補一個回調的小例子進行說明
BIO,簡單但不堪重負
NIO,華麗但不完美
因此後邊要用Netty