Java自1.4之後,加入了新IO特性,NIO. 號稱new IO. NIO帶來了non-blocking特性. 這篇文章主要講的是如何使用NIO的網絡新特性,來構建高性能非阻塞併發服務器.數據庫
文章基於我的理解,我也來搞搞NIO.,求指正.設計模式
服務器仍是在使用阻塞式的java socket. 以Tomcat最新版本沒有開啓NIO模式的源碼爲例, tomcat會accept出來一個socket鏈接,而後調用processSocket方法來處理socket.tomcat
01 |
while ( true ) { |
02 |
.... |
03 |
Socket socket = null ; |
04 |
try { |
05 |
// Accept the next incoming connection from the server |
06 |
// socket |
07 |
socket = serverSocketFactory.acceptSocket(serverSocket); |
08 |
} |
09 |
... |
10 |
... |
11 |
// Configure the socket |
12 |
if (running && !paused && setSocketOptions(socket)) { |
13 |
// Hand this socket off to an appropriate processor |
14 |
if (!processSocket(socket)) { |
15 |
countDownConnection(); |
16 |
// Close socket right away(socket); |
17 |
closeSocket(socket); |
18 |
} |
19 |
} |
20 |
.... |
21 |
} |
使用ServerSocket.accept()方法來建立一個鏈接. accept方法是阻塞方法,在下一個connection進來以前,accept會阻塞.服務器
在一個socket進來以後,Tomcat會在thread pool裏面拿出一個thread來處理鏈接的socket. 而後本身快速的脫身去接受下一個socket鏈接. 代碼以下:網絡
01 |
protected boolean processSocket(Socket socket) { |
02 |
// Process the request from this socket |
03 |
try { |
04 |
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket); |
05 |
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); |
06 |
// During shutdown, executor may be null - avoid NPE |
07 |
if (!running) { |
08 |
return false ; |
09 |
} |
10 |
getExecutor().execute( new SocketProcessor(wrapper)); |
11 |
} catch (RejectedExecutionException x) { |
12 |
log.warn( "Socket processing request was rejected for:" +socket,x); |
13 |
return false ; |
14 |
} catch (Throwable t) { |
15 |
ExceptionUtils.handleThrowable(t); |
16 |
// This means we got an OOM or similar creating a thread, or that |
17 |
// the pool and its queue are full |
18 |
log.error(sm.getString( "endpoint.process.fail" ), t); |
19 |
return false ; |
20 |
} |
21 |
return true ; |
22 |
} |
而每一個處理socket的線程,也老是會阻塞在while(true) sockek.getInputStream().read() 方法上. 多線程
總結就是, 一個socket必須使用一個線程來處理. 導致服務器須要維護比較多的線程. 線程自己就是一個消耗資源的東西,而且每一個處理socket的線程都會阻塞在read方法上,使得系統大量資源被浪費.架構
以上這種socket的服務方式適用於HTTP服務器,每一個http請求都是短時間的,無狀態的,而且http後臺的業務邏輯也通常比較複雜. 使用多線程和阻塞方式是合適的.併發
假若是作遊戲服務器,尤爲是CS架構的遊戲.這種傳統模式服務器毫無勝算.遊戲有如下幾個特色是傳統服務器不能勝任的:
1, 持久TCP鏈接. 每個client和server之間都存在一個持久的鏈接.當CCU(併發用戶數量)上升,阻塞式服務器沒法爲每個鏈接運行一個線程.
2, 本身開發的二進制流傳輸協議. 遊戲服務器講究響應快.那網絡傳輸也要節省時間. HTTP協議的冗餘內容太多,一個好的遊戲服務器傳輸協議,可使得message壓縮到3-6倍甚至以上.這就使得遊戲服務器要開發本身的協議解析器.
3, 傳輸雙向,且消息傳輸頻率高.假設一個遊戲服務器instance鏈接了2000個client,每一個client平均每秒鐘傳輸1-10個message,一個message大約幾百字節或者幾千字節.而server也須要向client廣播其餘玩家的當前信息.這使得服務器須要有高速處理消息的能力.
4, CS架構的遊戲服務器端的邏輯並不像APP服務器端的邏輯那麼複雜. 網絡遊戲在client端處理了大部分邏輯,server端負責簡單邏輯,甚至只是傳遞消息.app
出現了使用NIO寫的非阻塞網絡引擎,好比Apache Mina, JBoss Netty, Smartfoxserver BitSwarm. 比較起來, Mina的性能不如後二者.Tomcat也存在NIO模式,不過須要人工開啓.
首先要說明一下, 與App Server的servlet開發模式不同, 在Mina, Netty和BitSwarm上開發應用程序都是Event Driven的設計模式.Server端會收到Client端的event,Client也會收到Server端的event,Server端與Client端的都要註冊各類event的EventHandler來handle event.
用大白話來解釋NIO:
1, Buffers, 網絡傳輸字節存放的地方.不管是從channel中取,仍是向channel中寫,都必須以Buffers做爲中間存貯格式.
2, Socket Channels. Channel是網絡鏈接和buffer之間的數據通道.每一個鏈接一個channel.就像以前的socket的stream同樣.
3, Selector. 像一個巡警,在一個片區裏面不停的巡邏. 一旦發現事件發生,馬上將事件select出來.不過這些事件必須是提早註冊在selector上的. select出來的事件打包成SelectionKey.裏面包含了事件的發生事件,地點,人物. 若是警察不巡邏,每一個街道(socket)分配一個警察(thread),那麼一個片區有幾條街道,就須要幾個警察.但如今警察巡邏了,一個巡警(selector)能夠管理全部的片區裏面的街道(socketchannel).
以上把警察比做線程,街道比做socket或socketchannel,街道上發生的一切比做stream.把巡警比做selector,引發巡警注意的事件比做selectionKey.
從上能夠看出,使用NIO可使用一個線程,就能維護多個持久TCP鏈接.
下面給出NIO編寫的EchoServer和Client. Client鏈接server之後,將發送一條消息給server. Server會原封不懂的把消息發送回來.Client再把消息發送回去.Server再發回來.用不休止. 在性能的容許下,Client能夠啓動任意多.
如下Code涵蓋了NIO裏面最經常使用的方法和鏈接斷開診斷.註釋也全.
首先是Server的實現. Server端啓動了2個線程,connectionBell線程用於巡邏新的鏈接事件. readBell線程用於讀取全部channel的數據. 註解: Mina採起了一樣的作法,只是readBell線程啓動的個數等於處理器個數+1. 因而可知,NIO只須要少許的幾個線程就能夠維持很是多的併發持久鏈接.
每當事件發生,會調用dispatch方法去處理event. 通常狀況,會使用一個ThreadPool來處理event. ThreadPool的大小能夠自定義.但不是越大越好.若是處理event的邏輯比較複雜,好比須要額外網絡鏈接或者複雜數據庫查詢,那ThreadPool就須要稍微大些.(猜想)Smartfoxserver處理上萬的併發,也只用到了3-4個線程來dispatch event.
EchoServer
001 |
public class EchoServer { |
002 |
public static SelectorLoop connectionBell; |
003 |
public static SelectorLoop readBell; |
004 |
public boolean isReadBellRunning= false ; |
005 |
006 |
public static void main(String[] args) throws IOException { |
007 |
new EchoServer().startServer(); |
008 |
} |
009 |
|
010 |
// 啓動服務器 |
011 |
public void startServer() throws IOException { |
012 |
// 準備好一個鬧鐘.當有連接進來的時候響. |
013 |
connectionBell = new SelectorLoop(); |
014 |
|
015 |
// 準備好一個鬧裝,當有read事件進來的時候響. |
016 |
readBell = new SelectorLoop(); |
017 |
|
018 |
// 開啓一個server channel來監聽 |
019 |
ServerSocketChannel ssc = ServerSocketChannel.open(); |
020 |
// 開啓非阻塞模式 |
021 |
ssc.configureBlocking( false ); |
022 |
|
023 |
ServerSocket socket = ssc.socket(); |
024 |
socket.bind( new InetSocketAddress( "localhost" , 7878 )); |
025 |
|
026 |
// 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件. |
027 |
ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT); |
028 |
new Thread(connectionBell).start(); |
029 |
} |
030 |
|
031 |
// Selector輪詢線程類 |
032 |
public class SelectorLoop implements Runnable { |
033 |
private Selector selector; |
034 |
private ByteBuffer temp = ByteBuffer.allocate( 1024 ); |
035 |
|
036 |
public SelectorLoop() throws IOException { |
037 |
this .selector = Selector.open(); |
038 |
} |
039 |
|
040 |
public Selector getSelector() { |
041 |
return this .selector; |
042 |
} |
043 |
044 |
@Override |
045 |
public void run() { |
046 |
while ( true ) { |
047 |
try { |
048 |
// 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. |
049 |
this .selector.select(); |
050 |
|
051 |
Set<SelectionKey> selectKeys = this .selector.selectedKeys(); |
052 |
Iterator<SelectionKey> it = selectKeys.iterator(); |
053 |
while (it.hasNext()) { |
054 |
SelectionKey key = it.next(); |
055 |
it.remove(); |
056 |
// 處理事件. 能夠用多線程來處理. |
057 |
this .dispatch(key); |
058 |
} |
059 |
} catch (IOException e) { |
060 |
e.printStackTrace(); |
061 |
} catch (InterruptedException e) { |
062 |
e.printStackTrace(); |
063 |
} |
064 |
} |
065 |
} |
066 |
|
067 |
public void dispatch(SelectionKey key) throws IOException, InterruptedException { |
068 |
if (key.isAcceptable()) { |
069 |
// 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的. |
070 |
ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); |
071 |
// 接受一個鏈接. |
072 |
SocketChannel sc = ssc.accept(); |
073 |
|
074 |
// 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘. |
075 |
sc.configureBlocking( false ); |
076 |
sc.register(readBell.getSelector(), SelectionKey.OP_READ); |
077 |
|
078 |
// 若是讀取線程尚未啓動,那就啓動一個讀取線程. |
079 |
synchronized (EchoServer. this ) { |
080 |
if (!EchoServer. this .isReadBellRunning) { |
081 |
EchoServer. this .isReadBellRunning = true ; |
082 |
new Thread(readBell).start(); |
083 |
} |
084 |
} |
085 |
|
086 |
} else if (key.isReadable()) { |
087 |
// 這是一個read事件,而且這個事件是註冊在socketchannel上的. |
088 |
SocketChannel sc = (SocketChannel) key.channel(); |
089 |
// 寫數據到buffer |
090 |
int count = sc.read(temp); |
091 |
if (count < 0 ) { |
092 |
// 客戶端已經斷開鏈接. |
093 |
key.cancel(); |
094 |
sc.close(); |
095 |
return ; |
096 |
} |
097 |
// 切換buffer到讀狀態,內部指針歸位. |
098 |
temp.flip(); |
099 |
String msg = Charset.forName( "UTF-8" ).decode(temp).toString(); |
100 |
System.out.println( "Server received [" +msg+ "] from client address:" + sc.getRemoteAddress()); |
101 |
|
102 |
Thread.sleep( 1000 ); |
103 |
// echo back. |
104 |
sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName( "UTF-8" )))); |
105 |
|
106 |
// 清空buffer |
107 |
temp.clear(); |
108 |
} |
109 |
} |
110 |
|
111 |
} |
112 |
113 |
} |
接下來就是Client的實現.Client能夠用傳統IO,也可使用NIO.這個例子使用的NIO,單線程.
001 |
public class Client implements Runnable { |
002 |
// 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接. |
003 |
private static int idleCounter = 0 ; |
004 |
private Selector selector; |
005 |
private SocketChannel socketChannel; |
006 |
private ByteBuffer temp = ByteBuffer.allocate( 1024 ); |
007 |
008 |
public static void main(String[] args) throws IOException { |
009 |
Client client= new Client(); |
010 |
new Thread(client).start(); |
011 |
//client.sendFirstMsg(); |
012 |
} |
013 |
|
014 |
public Client() throws IOException { |
015 |
// 一樣的,註冊鬧鐘. |
016 |
this .selector = Selector.open(); |
017 |
|
018 |
// 鏈接遠程server |
019 |
socketChannel = SocketChannel.open(); |
020 |
// 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件. |
021 |
Boolean isConnected = socketChannel.connect( new InetSocketAddress( "localhost" , 7878 )); |
022 |
socketChannel.configureBlocking( false ); |
023 |
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); |
024 |
|
025 |
if (isConnected) { |
026 |
this .sendFirstMsg(); |
027 |
} else { |
028 |
// 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件. |
029 |
key.interestOps(SelectionKey.OP_CONNECT); |
030 |
} |
031 |
} |
032 |
|
033 |
public void sendFirstMsg() throws IOException { |
034 |
String msg = "Hello NIO." ; |
035 |
socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName( "UTF-8" )))); |
036 |
} |
037 |