JAVA NIO non-blocking模式實現高併發服務器

JAVA NIO non-blocking模式實現高併發服務器

分類: JAVA NIO   1912人閱讀  評論(0)  收藏  舉報

目錄(?)[+]java

Java自1.4之後,加入了新IO特性,NIO. 號稱new IO. NIO帶來了non-blocking特性. 這篇文章主要講的是如何使用NIO的網絡新特性,來構建高性能非阻塞併發服務器.數據庫

文章基於我的理解,我也來搞搞NIO.,求指正.設計模式

在NIO以前

服務器仍是在使用阻塞式的java socket. 以Tomcat最新版本沒有開啓NIO模式的源碼爲例, tomcat會accept出來一個socket鏈接,而後調用processSocket方法來處理socket.tomcat

01 while(true) {
02 ....
03     Socket socket = null;
04     try {
05         // Accept the next incoming connection from the server
06         // socket
07         socket = serverSocketFactory.acceptSocket(serverSocket);
08     }
09 ...
10 ...
11     // Configure the socket
12     if (running && !paused && setSocketOptions(socket)) {
13         // Hand this socket off to an appropriate processor
14         if (!processSocket(socket)) {
15             countDownConnection();
16             // Close socket right away(socket);
17             closeSocket(socket);
18         }
19     }
20 ....
21 }


使用ServerSocket.accept()方法來建立一個鏈接. accept方法是阻塞方法,在下一個connection進來以前,accept會阻塞.服務器

在一個socket進來以後,Tomcat會在thread pool裏面拿出一個thread來處理鏈接的socket. 而後本身快速的脫身去接受下一個socket鏈接. 代碼以下:網絡

01     protected boolean processSocket(Socket socket) {
02         // Process the request from this socket
03         try {
04             SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
05             wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
06             // During shutdown, executor may be null - avoid NPE
07             if (!running) {
08                 return false;
09             }
10             getExecutor().execute(new SocketProcessor(wrapper));
11         catch (RejectedExecutionException x) {
12             log.warn("Socket processing request was rejected for:"+socket,x);
13             return false;
14         catch (Throwable t) {
15             ExceptionUtils.handleThrowable(t);
16             // This means we got an OOM or similar creating a thread, or that
17             // the pool and its queue are full
18             log.error(sm.getString("endpoint.process.fail"), t);
19             return false;
20         }
21         return true;
22     }

而每一個處理socket的線程,也老是會阻塞在while(true) sockek.getInputStream().read() 方法上. 多線程

總結就是, 一個socket必須使用一個線程來處理. 導致服務器須要維護比較多的線程. 線程自己就是一個消耗資源的東西,而且每一個處理socket的線程都會阻塞在read方法上,使得系統大量資源被浪費.架構

以上這種socket的服務方式適用於HTTP服務器,每一個http請求都是短時間的,無狀態的,而且http後臺的業務邏輯也通常比較複雜. 使用多線程和阻塞方式是合適的.併發

假若是作遊戲服務器,尤爲是CS架構的遊戲.這種傳統模式服務器毫無勝算.遊戲有如下幾個特色是傳統服務器不能勝任的:
1, 持久TCP鏈接. 每個client和server之間都存在一個持久的鏈接.當CCU(併發用戶數量)上升,阻塞式服務器沒法爲每個鏈接運行一個線程.
2, 本身開發的二進制流傳輸協議. 遊戲服務器講究響應快.那網絡傳輸也要節省時間. HTTP協議的冗餘內容太多,一個好的遊戲服務器傳輸協議,可使得message壓縮到3-6倍甚至以上.這就使得遊戲服務器要開發本身的協議解析器.
3, 傳輸雙向,且消息傳輸頻率高.假設一個遊戲服務器instance鏈接了2000個client,每一個client平均每秒鐘傳輸1-10個message,一個message大約幾百字節或者幾千字節.而server也須要向client廣播其餘玩家的當前信息.這使得服務器須要有高速處理消息的能力.
4, CS架構的遊戲服務器端的邏輯並不像APP服務器端的邏輯那麼複雜. 網絡遊戲在client端處理了大部分邏輯,server端負責簡單邏輯,甚至只是傳遞消息.app

在Java NIO出現之後

出現了使用NIO寫的非阻塞網絡引擎,好比Apache Mina, JBoss Netty, Smartfoxserver BitSwarm. 比較起來, Mina的性能不如後二者.Tomcat也存在NIO模式,不過須要人工開啓.

首先要說明一下, 與App Server的servlet開發模式不同, 在Mina, Netty和BitSwarm上開發應用程序都是Event Driven的設計模式.Server端會收到Client端的event,Client也會收到Server端的event,Server端與Client端的都要註冊各類event的EventHandler來handle event.

用大白話來解釋NIO:
1, Buffers, 網絡傳輸字節存放的地方.不管是從channel中取,仍是向channel中寫,都必須以Buffers做爲中間存貯格式.
2, Socket Channels. Channel是網絡鏈接和buffer之間的數據通道.每一個鏈接一個channel.就像以前的socket的stream同樣.
3, Selector. 像一個巡警,在一個片區裏面不停的巡邏. 一旦發現事件發生,馬上將事件select出來.不過這些事件必須是提早註冊在selector上的. select出來的事件打包成SelectionKey.裏面包含了事件的發生事件,地點,人物. 若是警察不巡邏,每一個街道(socket)分配一個警察(thread),那麼一個片區有幾條街道,就須要幾個警察.但如今警察巡邏了,一個巡警(selector)能夠管理全部的片區裏面的街道(socketchannel).

以上把警察比做線程,街道比做socket或socketchannel,街道上發生的一切比做stream.把巡警比做selector,引發巡警注意的事件比做selectionKey.

從上能夠看出,使用NIO可使用一個線程,就能維護多個持久TCP鏈接.

NIO實例

下面給出NIO編寫的EchoServer和Client. Client鏈接server之後,將發送一條消息給server. Server會原封不懂的把消息發送回來.Client再把消息發送回去.Server再發回來.用不休止. 在性能的容許下,Client能夠啓動任意多.

如下Code涵蓋了NIO裏面最經常使用的方法和鏈接斷開診斷.註釋也全.

首先是Server的實現. Server端啓動了2個線程,connectionBell線程用於巡邏新的鏈接事件. readBell線程用於讀取全部channel的數據. 註解: Mina採起了一樣的作法,只是readBell線程啓動的個數等於處理器個數+1. 因而可知,NIO只須要少許的幾個線程就能夠維持很是多的併發持久鏈接.

每當事件發生,會調用dispatch方法去處理event. 通常狀況,會使用一個ThreadPool來處理event. ThreadPool的大小能夠自定義.但不是越大越好.若是處理event的邏輯比較複雜,好比須要額外網絡鏈接或者複雜數據庫查詢,那ThreadPool就須要稍微大些.(猜想)Smartfoxserver處理上萬的併發,也只用到了3-4個線程來dispatch event.

EchoServer

001 public class EchoServer {
002     public static SelectorLoop connectionBell;
003     public static SelectorLoop readBell;
004     public boolean isReadBellRunning=false;
005  
006     public static void main(String[] args) throws IOException {
007         new EchoServer().startServer();
008     }
009      
010     // 啓動服務器
011     public void startServer() throws IOException {
012         // 準備好一個鬧鐘.當有連接進來的時候響.
013         connectionBell = new SelectorLoop();
014          
015         // 準備好一個鬧裝,當有read事件進來的時候響.
016         readBell = new SelectorLoop();
017          
018         // 開啓一個server channel來監聽
019         ServerSocketChannel ssc = ServerSocketChannel.open();
020         // 開啓非阻塞模式
021         ssc.configureBlocking(false);
022          
023         ServerSocket socket = ssc.socket();
024         socket.bind(new InetSocketAddress("localhost",7878));
025          
026         // 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件.
027         ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
028         new Thread(connectionBell).start();
029     }
030      
031     // Selector輪詢線程類
032     public class SelectorLoop implements Runnable {
033         private Selector selector;
034         private ByteBuffer temp = ByteBuffer.allocate(1024);
035          
036         public SelectorLoop() throws IOException {
037             this.selector = Selector.open();
038         }
039          
040         public Selector getSelector() {
041             return this.selector;
042         }
043  
044         @Override
045         public void run() {
046             while(true) {
047                 try {
048                     // 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續.
049                     this.selector.select();
050                      
051                     Set<SelectionKey> selectKeys = this.selector.selectedKeys();
052                     Iterator<SelectionKey> it = selectKeys.iterator();
053                     while (it.hasNext()) {
054                         SelectionKey key = it.next();
055                         it.remove();
056                         // 處理事件. 能夠用多線程來處理.
057                         this.dispatch(key);
058                     }
059                 catch (IOException e) {
060                     e.printStackTrace();
061                 catch (InterruptedException e) {
062                     e.printStackTrace();
063                 }
064             }
065         }
066          
067         public void dispatch(SelectionKey key) throws IOException, InterruptedException {
068             if (key.isAcceptable()) {
069                 // 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的.
070                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
071                 // 接受一個鏈接.
072                 SocketChannel sc = ssc.accept();
073                  
074                 // 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘.
075                 sc.configureBlocking(false);
076                 sc.register(readBell.getSelector(), SelectionKey.OP_READ);
077                  
078                 // 若是讀取線程尚未啓動,那就啓動一個讀取線程.
079                 synchronized(EchoServer.this) {
080                     if (!EchoServer.this.isReadBellRunning) {
081                         EchoServer.this.isReadBellRunning = true;
082                         new Thread(readBell).start();
083                     }
084                 }
085                  
086             else if (key.isReadable()) {
087                 // 這是一個read事件,而且這個事件是註冊在socketchannel上的.
088                 SocketChannel sc = (SocketChannel) key.channel();
089                 // 寫數據到buffer
090                 int count = sc.read(temp);
091                 if (count < 0) {
092                     // 客戶端已經斷開鏈接.
093                     key.cancel();
094                     sc.close();
095                     return;
096                 }
097                 // 切換buffer到讀狀態,內部指針歸位.
098                 temp.flip();
099                 String msg = Charset.forName("UTF-8").decode(temp).toString();
100                 System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
101                  
102                 Thread.sleep(1000);
103                 // echo back.
104                 sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
105                  
106                 // 清空buffer
107                 temp.clear();
108             }
109         }
110          
111     }
112  
113 }

接下來就是Client的實現.Client能夠用傳統IO,也可使用NIO.這個例子使用的NIO,單線程.

001 public class Client implements Runnable {
002     // 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接.
003     private static int idleCounter = 0;
004     private Selector selector;
005     private SocketChannel socketChannel;
006     private ByteBuffer temp = ByteBuffer.allocate(1024);
007  
008     public static void main(String[] args) throws IOException {
009         Client client= new Client();
010         new Thread(client).start();
011         //client.sendFirstMsg();
012     }
013      
014     public Client() throws IOException {
015         // 一樣的,註冊鬧鐘.
016         this.selector = Selector.open();
017          
018         // 鏈接遠程server
019         socketChannel = SocketChannel.open();
020         // 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件.
021         Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost"7878));
022         socketChannel.configureBlocking(false);
023         SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
024          
025         if (isConnected) {
026             this.sendFirstMsg();
027         else {
028             // 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件.
029             key.interestOps(SelectionKey.OP_CONNECT);
030         }
031     }
032      
033     public void sendFirstMsg() throws IOException {
034         String msg = "Hello NIO.";
035         socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
036     }
037
相關文章
相關標籤/搜索