Netty-JAVA基礎實現,AIO基礎

AIO是在NIO基礎上實現的異步非阻塞通訊java

Windows下提供了IOCP技術,I/O Completion Port,稱爲I/O完成端口。IOCP是一個消息隊列。當監聽到客戶請求的時候就把請求加到消息隊列中。而後已有的線程去逐一處理,處理完成後須要獲得反饋的工做線程就會收到通知,而後前去處理。當沒有請求加入到消息隊列的時候,相應的線程也就處理掛起的狀態進行等待。異步

因此Windows下算是有實際意義上的異步非阻塞async

同步異步是消息通訊的機制
阻塞非阻塞是事件處理ide

阻塞:死等着被調用方回信,中間什麼不幹
非阻塞:沒收到被調用方回信時,中間乾點別的
同步:一下子一趟,沒有結果就反覆問
異步:問完等對方通知反饋測試

同步阻塞:到服務檯反覆問訊,死等服務生反饋this

同步非阻塞:到服務檯反覆問詢,在等服務生反饋期間玩手機.net

異步阻塞:到服務檯問訊,死等服務生反饋,服務生確認後通知我線程

異步非阻塞:到服務檯問詢,問完就玩手機去了,服務生確認後通知我unix

Linux用epoll進行相關實現netty

AIO服務端實現
package netty.aio;

/**

  • AIO模式服務端實現
  • @author zhousjmas@hotmail.com
  • */
    public class AIOSocketServerMain {

    public static void main(String[] args) {

    int port = 9999;
    
    AIOSocketServer selector = new AIOSocketServer(port);
    
    new Thread(selector).start();

    }

}

package netty.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

/**

  • NIO的多路複用器的實現
  • @author zhousjmas@hotmail.com
  • */
    public class AIOSocketServer implements Runnable{

    private int port;
    //用於限制一個線程等待其餘線程各自執行完畢後再執行。
    CountDownLatch countDownLatch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    //傳入端口參數
    public AIOSocketServer(int port) {

    this.port = port;
    try {
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
        asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    }

    @Override
    public void run() {

    //這裏使用countdownlatch是爲了把線程阻止住
    countDownLatch = new CountDownLatch(1);
    
    accept();
    
    try {
        //多個線程在開始執行任務前首先 coundownlatch.await(),
        //當主線程調用 countDown() 時,計數器變爲0,多個線程同時被喚醒。
        countDownLatch.await();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    }

    private void accept() {

    asynchronousServerSocketChannel.accept(this,new AIOHandler());

    }

}

package netty.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOSocketHandler implements CompletionHandler<Integer,ByteBuffer>{

private AsynchronousSocketChannel asynchronousSocketChannel;

public AIOSocketHandler(AsynchronousSocketChannel asynchronousSocketChannel) {
    if(this.asynchronousSocketChannel==null)
        this.asynchronousSocketChannel = asynchronousSocketChannel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {

    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);

    try {
        String info = new String(bytes,"UTF-8");
        System.out.println("收到信息:"+info);
        sendMessage(new Long(System.currentTimeMillis()).toString());
    } catch (UnsupportedEncodingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

private void sendMessage(String message) {

    ByteBuffer buffer = ByteBuffer.allocate(1024);

    buffer.put(message.getBytes());

    buffer.flip();

    asynchronousSocketChannel.write(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){

        @Override
        public void completed(Integer result, ByteBuffer buffer) {

            if(buffer.hasRemaining()) {
                asynchronousSocketChannel.write(buffer,buffer,this);
            }

        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {

            try {
                asynchronousSocketChannel.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    });

}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
    try {
        asynchronousSocketChannel.close();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

}

package netty.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOHandler implements CompletionHandler<AsynchronousSocketChannel,AIOSocketServer>{

@Override
public void completed(AsynchronousSocketChannel channel, AIOSocketServer handler) {
    // TODO Auto-generated method stub
    handler.asynchronousServerSocketChannel.accept(handler,this);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer, buffer, new AIOSocketHandler(channel));

}

@Override
public void failed(Throwable exc, AIOSocketServer handler) {
    // TODO Auto-generated method stub
    handler.countDownLatch.countDown();
}

}

客戶端測試代碼

package netty.aio;

public class ClientThreadMain {

public static void main(String[] args) {

    String ip = "127.0.0.1";
    int port = 9999;

    new Thread(new AIOSocketClient(ip,port)).start();

}

}

package netty.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

import io.netty.channel.unix.Buffer;

/**

  • AIO模式客戶端實現
  • @author zhousjmas@hotmail.com
  • */
    public class AIOSocketClient implements CompletionHandler<Void,AIOSocketClient>, Runnable{

    private AsynchronousSocketChannel asynchronousSocketChannel;
    private String ip;
    private int port;
    private CountDownLatch countDownLatch;

    public AIOSocketClient(String ip,int port) {

    this.ip = ip;
    this.port = port;
    
    //創建通道
    try {
        asynchronousSocketChannel = AsynchronousSocketChannel.open();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    }br/>@Override
    public void run() {
    // TODO Auto-generated method stub

    }

    @Override
    public void completed(Void result, AIOSocketClient attachment) {

    byte[] bytes = "Hello JAVA AIO WORLD".getBytes();
    ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
    byteBuffer.put(bytes);
    byteBuffer.flip();
    asynchronousSocketChannel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer,ByteBuffer>(){
    
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if(buffer.hasRemaining()) {
                asynchronousSocketChannel.write(buffer,buffer,this);
            }else {
                ByteBuffer reader = ByteBuffer.allocate(1024);
                asynchronousSocketChannel.read(reader, reader, new CompletionHandler<Integer,ByteBuffer>(){
    
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
    
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        String info  = "";
                        try {
                            info = new String(bytes,"UTF-8");
                            System.out.println("讀入信息:"+info);
                            countDownLatch.countDown();
    
                        } catch (UnsupportedEncodingException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
    
                    }
    
                    @Override
                    public void failed(Throwable exc, ByteBuffer buffer) {
                        // TODO Auto-generated method stub
                        try {
                            asynchronousSocketChannel.close();
                            countDownLatch.countDown();
                        } catch (IOException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
    
                    }
    
                });
            }
    
        }
    
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                asynchronousSocketChannel.close();
                countDownLatch.countDown();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
    });

    }

    @Override
    public void failed(Throwable exc, AIOSocketClient attachment) {
    // TODO Auto-generated method stub
    try {
    asynchronousSocketChannel.close();

    countDownLatch.countDown();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    }

}

這裏用到了不少回調的地方,會稍後補一個回調的小例子進行說明

BIO,簡單但不堪重負
NIO,華麗但不完美

因此後邊要用Netty

相關文章
相關標籤/搜索