Java自1.4之後,加入了新IO特性,NIO. 號稱new IO. NIO帶來了non-blocking特性. 這篇文章主要講的是如何使用NIO的網絡新特性,來構建高性能非阻塞併發服務器.java
文章基於我的理解,我也來搞搞NIO.,求指正.數據庫
服務器仍是在使用阻塞式的java socket. 以Tomcat最新版本沒有開啓NIO模式的源碼爲例, tomcat會accept出來一個socket鏈接,而後調用processSocket方法來處理socket.設計模式
while(true) { .... Socket socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSocketFactory.acceptSocket(serverSocket); } ... ... // Configure the socket if (running && !paused && setSocketOptions(socket)) { // Hand this socket off to an appropriate processor if (!processSocket(socket)) { countDownConnection(); // Close socket right away(socket); closeSocket(socket); } } .... }
使用ServerSocket.accept()方法來建立一個鏈接. accept方法是阻塞方法,在下一個connection進來以前,accept會阻塞.tomcat
在一個socket進來以後,Tomcat會在thread pool裏面拿出一個thread來處理鏈接的socket. 而後本身快速的脫身去接受下一個socket鏈接. 代碼以下:服務器
protected boolean processSocket(Socket socket) { // Process the request from this socket try { SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); // During shutdown, executor may be null - avoid NPE if (!running) { return false; } getExecutor().execute(new SocketProcessor(wrapper)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
而每一個處理socket的線程,也老是會阻塞在while(true) sockek.getInputStream().read() 方法上. 網絡
總結就是, 一個socket必須使用一個線程來處理. 導致服務器須要維護比較多的線程. 線程自己就是一個消耗資源的東西,而且每一個處理socket的線程都會阻塞在read方法上,使得系統大量資源被浪費.多線程
以上這種socket的服務方式適用於HTTP服務器,每一個http請求都是短時間的,無狀態的,而且http後臺的業務邏輯也通常比較複雜. 使用多線程和阻塞方式是合適的.架構
假若是作遊戲服務器,尤爲是CS架構的遊戲.這種傳統模式服務器毫無勝算.遊戲有如下幾個特色是傳統服務器不能勝任的:
1, 持久TCP鏈接. 每個client和server之間都存在一個持久的鏈接.當CCU(併發用戶數量)上升,阻塞式服務器沒法爲每個鏈接運行一個線程.
2, 本身開發的二進制流傳輸協議. 遊戲服務器講究響應快.那網絡傳輸也要節省時間. HTTP協議的冗餘內容太多,一個好的遊戲服務器傳輸協議,可使得message壓縮到3-6倍甚至以上.這就使得遊戲服務器要開發本身的協議解析器.
3, 傳輸雙向,且消息傳輸頻率高.假設一個遊戲服務器instance鏈接了2000個client,每一個client平均每秒鐘傳輸1-10個message,一個message大約幾百字節或者幾千字節.而server也須要向client廣播其餘玩家的當前信息.這使得服務器須要有高速處理消息的能力.
4, CS架構的遊戲服務器端的邏輯並不像APP服務器端的邏輯那麼複雜. 網絡遊戲在client端處理了大部分邏輯,server端負責簡單邏輯,甚至只是傳遞消息.併發
出現了使用NIO寫的非阻塞網絡引擎,好比Apache Mina, JBoss Netty, Smartfoxserver BitSwarm. 比較起來, Mina的性能不如後二者.Tomcat也存在NIO模式,不過須要人工開啓.app
首先要說明一下, 與App Server的servlet開發模式不同, 在Mina, Netty和BitSwarm上開發應用程序都是Event Driven的設計模式.Server端會收到Client端的event,Client也會收到Server端的event,Server端與Client端的都要註冊各類event的EventHandler來handle event.
用大白話來解釋NIO:
1, Buffers, 網絡傳輸字節存放的地方.不管是從channel中取,仍是向channel中寫,都必須以Buffers做爲中間存貯格式.
2, Socket Channels. Channel是網絡鏈接和buffer之間的數據通道.每一個鏈接一個channel.就像以前的socket的stream同樣.
3, Selector. 像一個巡警,在一個片區裏面不停的巡邏. 一旦發現事件發生,馬上將事件select出來.不過這些事件必須是提早註冊在selector上的. select出來的事件打包成SelectionKey.裏面包含了事件的發生事件,地點,人物. 若是警察不巡邏,每一個街道(socket)分配一個警察(thread),那麼一個片區有幾條街道,就須要幾個警察.但如今警察巡邏了,一個巡警(selector)能夠管理全部的片區裏面的街道(socketchannel).
以上把警察比做線程,街道比做socket或socketchannel,街道上發生的一切比做stream.把巡警比做selector,引發巡警注意的事件比做selectionKey.
從上能夠看出,使用NIO可使用一個線程,就能維護多個持久TCP鏈接.
下面給出NIO編寫的EchoServer和Client. Client鏈接server之後,將發送一條消息給server. Server會原封不懂的把消息發送回來.Client再把消息發送回去.Server再發回來.用不休止. 在性能的容許下,Client能夠啓動任意多.
如下Code涵蓋了NIO裏面最經常使用的方法和鏈接斷開診斷.註釋也全.
首先是Server的實現. Server端啓動了2個線程,connectionBell線程用於巡邏新的鏈接事件. readBell線程用於讀取全部channel的數據. 註解: Mina採起了一樣的作法,只是readBell線程啓動的個數等於處理器個數+1. 因而可知,NIO只須要少許的幾個線程就能夠維持很是多的併發持久鏈接.
每當事件發生,會調用dispatch方法去處理event. 通常狀況,會使用一個ThreadPool來處理event. ThreadPool的大小能夠自定義.但不是越大越好.若是處理event的邏輯比較複雜,好比須要額外網絡鏈接或者複雜數據庫查詢,那ThreadPool就須要稍微大些.(猜想)Smartfoxserver處理上萬的併發,也只用到了3-4個線程來dispatch event.
EchoServer
public class EchoServer { public static SelectorLoop connectionBell; public static SelectorLoop readBell; public boolean isReadBellRunning=false; public static void main(String[] args) throws IOException { new EchoServer().startServer(); } // 啓動服務器 public void startServer() throws IOException { // 準備好一個鬧鐘.當有連接進來的時候響. connectionBell = new SelectorLoop(); // 準備好一個鬧裝,當有read事件進來的時候響. readBell = new SelectorLoop(); // 開啓一個server channel來監聽 ServerSocketChannel ssc = ServerSocketChannel.open(); // 開啓非阻塞模式 ssc.configureBlocking(false); ServerSocket socket = ssc.socket(); socket.bind(new InetSocketAddress("localhost",7878)); // 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件. ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT); new Thread(connectionBell).start(); } // Selector輪詢線程類 public class SelectorLoop implements Runnable { private Selector selector; private ByteBuffer temp = ByteBuffer.allocate(1024); public SelectorLoop() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return this.selector; } @Override public void run() { while(true) { try { // 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. this.selector.select(); Set<SelectionKey> selectKeys = this.selector.selectedKeys(); Iterator<SelectionKey> it = selectKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // 處理事件. 能夠用多線程來處理. this.dispatch(key); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public void dispatch(SelectionKey key) throws IOException, InterruptedException { if (key.isAcceptable()) { // 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的. ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受一個鏈接. SocketChannel sc = ssc.accept(); // 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘. sc.configureBlocking(false); sc.register(readBell.getSelector(), SelectionKey.OP_READ); // 若是讀取線程尚未啓動,那就啓動一個讀取線程. synchronized(EchoServer.this) { if (!EchoServer.this.isReadBellRunning) { EchoServer.this.isReadBellRunning = true; new Thread(readBell).start(); } } } else if (key.isReadable()) { // 這是一個read事件,而且這個事件是註冊在socketchannel上的. SocketChannel sc = (SocketChannel) key.channel(); // 寫數據到buffer int count = sc.read(temp); if (count < 0) { // 客戶端已經斷開鏈接. key.cancel(); sc.close(); return; } // 切換buffer到讀狀態,內部指針歸位. temp.flip(); String msg = Charset.forName("UTF-8").decode(temp).toString(); System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress()); Thread.sleep(1000); // echo back. sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); // 清空buffer temp.clear(); } } } }
接下來就是Client的實現.Client能夠用傳統IO,也可使用NIO.這個例子使用的NIO,單線程.
public class Client implements Runnable { // 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接. private static int idleCounter = 0; private Selector selector; private SocketChannel socketChannel; private ByteBuffer temp = ByteBuffer.allocate(1024); public static void main(String[] args) throws IOException { Client client= new Client(); new Thread(client).start(); //client.sendFirstMsg(); } public Client() throws IOException { // 一樣的,註冊鬧鐘. this.selector = Selector.open(); // 鏈接遠程server socketChannel = SocketChannel.open(); // 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件. Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878)); socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); if (isConnected) { this.sendFirstMsg(); } else { // 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件. key.interestOps(SelectionKey.OP_CONNECT); } } public void sendFirstMsg() throws IOException { String msg = "Hello NIO."; socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); } @Override public void run() { while (true) { try { // 阻塞,等待事件發生,或者1秒超時. num爲發生事件的數量. int num = this.selector.select(1000); if (num ==0) { idleCounter ++; if(idleCounter >10) { // 若是server斷開了鏈接,發送消息將失敗. try { this.sendFirstMsg(); } catch(ClosedChannelException e) { e.printStackTrace(); this.socketChannel.close(); return; } } continue; } else { idleCounter = 0; } Set<SelectionKey> keys = this.selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isConnectable()) { // socket connected SocketChannel sc = (SocketChannel)key.channel(); if (sc.isConnectionPending()) { sc.finishConnect(); } // send first message; this.sendFirstMsg(); } if (key.isReadable()) { // msg received. SocketChannel sc = (SocketChannel)key.channel(); this.temp = ByteBuffer.allocate(1024); int count = sc.read(temp); if (count<0) { sc.close(); continue; } // 切換buffer到讀狀態,內部指針歸位. temp.flip(); String msg = Charset.forName("UTF-8").decode(temp).toString(); System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()); Thread.sleep(1000); // echo back. sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); // 清空buffer temp.clear(); } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
下載之後黏貼到eclipse中, 先運行EchoServer,而後能夠運行任意多的Client. 中止Server和client的方式就是直接terminate server.