阻塞IO,在accept和IO讀寫時當前線程阻塞。 java
Socket clientScoket=server.accept(); //阻塞等待客戶端Socket連接服務器
取得鏈接後,把clientScoket封裝到Runable中交給ThreadPool線程池中線程去處理讀寫。socket
clientScoket.getInputStream();//取得客戶端Socket輸入流this
阻塞的進行IO讀寫操做.net
可是當線程池佔滿時,其餘連接必須等待有連接釋放線程。線程
只用一個線程經過Selector,就能夠控制客戶端Channel連接註冊,狀態監控,以及讀寫操做,實現多路複用。rest
此時多個客戶端能夠同時鏈接,和服務器進行IO讀寫操做。code
由於如今是一個線程關注多個Channel,並且每一個Channel的數據發送都是不連貫的。因此增長了Buffer做爲緩衝區。server
此線程無線循環,Selector只會監控Channel註冊時綁定的事件。通常ServerSocketChannel 綁定OP_ACCEPT事件,事件
客戶端SocketChannel綁定OP_READ。
public class NioServer { //通道管理器 private Selector selector; //獲取一個ServerSocket通道,並初始化通道 public NioServer init(int port) throws IOException{ //獲取一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); //獲取通道管理器 selector=Selector.open(); //將通道管理器與ServerSocketChannel綁定,併爲該通道註冊SelectionKey.OP_ACCEPT事件, //只有當該事件到達時,Selector.select()會返回,不然一直阻塞。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); return this; } public void listen() throws IOException{ System.out.println("服務器端啓動成功"); //使用輪詢訪問selector while(true){ //當有註冊的事件觸發時,方法返回,不然阻塞。 selector.select(); //獲取selector中的迭代器,選中項爲註冊的事件 Iterator<SelectionKey> ite=selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = ite.next(); //刪除已選key,防止重複處理 ite.remove(); //服務端接收客戶端請求鏈接事件 if(key.isAcceptable()){ //服務端 ServerSocketChannel server = (ServerSocketChannel)key.channel(); //得到客戶端鏈接通道 SocketChannel channel = server.accept(); channel.configureBlocking(false); //向客戶端發消息 channel.write(ByteBuffer.wrap(new String("send message to client").getBytes())); //在與客戶端鏈接成功後,爲客戶端通道註冊SelectionKey.OP_READ事件。 channel.register(selector, SelectionKey.OP_READ); System.out.println("客戶端請求鏈接事件"); }else if(key.isReadable()){//客戶端有可讀數據事件 //獲取客戶端傳輸數據可讀取消息通道。 SocketChannel channel = (SocketChannel)key.channel(); //建立讀取數據緩衝器 ByteBuffer buffer = ByteBuffer.allocate(10); int read = channel.read(buffer); byte[] data = buffer.array(); String message = new String(data); System.out.println("receive message from client, size:" + buffer.position() + " msg: " + message); // ByteBuffer outbuffer = ByteBuffer.wrap(("server.".concat(msg)).getBytes()); // channel.write(outbuffer); } } } } public static void main(String[] args) throws IOException { new NioServer().init(9981).listen(); } }
雖然NIO是非阻塞的,一個線程就能夠經過多路複用器完成對多個客戶端的讀寫操做。可是當業務複雜時,推薦用ThreadPool來解決業務問題,從而解放IO線程。因此區分BossThreadPool和WorkerThreadPool。
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; public class ServerSocketThreadPool{ private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors(); private ThreadPool pool = new ThreadPool(MAX_THREAD); private static int PORT_NUMBER = 1234; public static void main(String[] args) throws Exception { new ServerSocketThreadPool().go(); } public void go() throws Exception { int port = PORT_NUMBER; System.out.println("Listenning on port:" + port); // 建立通道 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 綁定監聽端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 設置爲非阻塞方式 serverSocketChannel.configureBlocking(false); // 建立選擇器 Selector selector = Selector.open(); // 通道註冊到選擇器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 一直阻塞,直到有數據請求 int n = selector.select(); if (n == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel socket = server.accept(); registerChannel(selector,socket, SelectionKey.OP_READ); sayHello(socket); } if (key.isReadable()) { readDataFromSocket(key); } it.remove(); } } } public void registerChannel(Selector selector,SelectableChannel channel,int ops)throws Exception{ if(channel==null){ return; } channel.configureBlocking(false); channel.register(selector, ops); } public void sayHello(SocketChannel socket) throws Exception{ ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.clear(); buffer.put("hello client".getBytes()); buffer.flip(); socket.write(buffer); } public void readDataFromSocket(SelectionKey key) throws Exception { WorkThread thread=pool.getWork(); if(thread==null){ return; } thread.serviceChannel(key); } private class ThreadPool { List idle=new LinkedList(); public ThreadPool(int poolSize) { for(int i=0;i<poolSize;i++){ WorkThread thread=new WorkThread(this); thread.setName("worker"+(i+1)); thread.start(); idle.add(thread); } } public WorkThread getWork(){ WorkThread thread=null; synchronized (idle) { if(idle.size()>0){ thread=(WorkThread) idle.remove(0); } } return thread; } public void returnWorker(WorkThread workThread) { synchronized (idle) { idle.add(workThread); } } } private class WorkThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; public WorkThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupt(); } if (key == null) { continue; } System.out.println(this.getName() + " has been awaken"); try{ drainChannel(key); }catch(Exception e){ System.out.println("caught '"+e+"' closing channel"); try{ key.channel().close(); }catch(IOException ioe){ ioe.printStackTrace(); } key.selector().wakeup(); } key=null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key){ this.key=key; key.interestOps(key.interestOps()&(~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key)throws Exception{ SocketChannel channel=(SocketChannel) key.channel(); buffer.clear(); int count; while((count=channel.read(buffer))>0){ buffer.flip(); /*while(buffer.hasRemaining()){ channel.write(buffer); }*/ byte[] bytes; bytes=new byte[count]; buffer.get(bytes); System.out.println(new String(bytes)); buffer.clear(); } if(count<0){ channel.close(); return; } key.interestOps(key.interestOps()|SelectionKey.OP_READ); key.selector().wakeup(); } } }