JAVA NIO non-blocking模式實現高併發服務器

Java自1.4之後,加入了新IO特性,NIO. 號稱new IO. NIO帶來了non-blocking特性. 這篇文章主要講的是如何使用NIO的網絡新特性,來構建高性能非阻塞併發服務器.java

文章基於我的理解,我也來搞搞NIO.,求指正.數據庫

在NIO以前

服務器仍是在使用阻塞式的java socket. 以Tomcat最新版本沒有開啓NIO模式的源碼爲例, tomcat會accept出來一個socket鏈接,而後調用processSocket方法來處理socket.設計模式

while(true) {
....
    Socket socket = null;
    try {
        // Accept the next incoming connection from the server
        // socket
        socket = serverSocketFactory.acceptSocket(serverSocket);
    }
...
...
    // Configure the socket
    if (running && !paused && setSocketOptions(socket)) {
        // Hand this socket off to an appropriate processor
        if (!processSocket(socket)) {
            countDownConnection();
            // Close socket right away(socket);
            closeSocket(socket);
        }
    }
....
}


使用ServerSocket.accept()方法來建立一個鏈接. accept方法是阻塞方法,在下一個connection進來以前,accept會阻塞.tomcat

在一個socket進來以後,Tomcat會在thread pool裏面拿出一個thread來處理鏈接的socket. 而後本身快速的脫身去接受下一個socket鏈接. 代碼以下:服務器

    protected boolean processSocket(Socket socket) {
        // Process the request from this socket
        try {
            SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
            wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
            // During shutdown, executor may be null - avoid NPE
            if (!running) {
                return false;
            }
            getExecutor().execute(new SocketProcessor(wrapper));
        } catch (RejectedExecutionException x) {
            log.warn("Socket processing request was rejected for:"+socket,x);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

而每一個處理socket的線程,也老是會阻塞在while(true) sockek.getInputStream().read() 方法上. 網絡

總結就是, 一個socket必須使用一個線程來處理. 導致服務器須要維護比較多的線程. 線程自己就是一個消耗資源的東西,而且每一個處理socket的線程都會阻塞在read方法上,使得系統大量資源被浪費.多線程

以上這種socket的服務方式適用於HTTP服務器,每一個http請求都是短時間的,無狀態的,而且http後臺的業務邏輯也通常比較複雜. 使用多線程和阻塞方式是合適的.架構

假若是作遊戲服務器,尤爲是CS架構的遊戲.這種傳統模式服務器毫無勝算.遊戲有如下幾個特色是傳統服務器不能勝任的:
1, 持久TCP鏈接. 每個client和server之間都存在一個持久的鏈接.當CCU(併發用戶數量)上升,阻塞式服務器沒法爲每個鏈接運行一個線程.
2, 本身開發的二進制流傳輸協議. 遊戲服務器講究響應快.那網絡傳輸也要節省時間. HTTP協議的冗餘內容太多,一個好的遊戲服務器傳輸協議,可使得message壓縮到3-6倍甚至以上.這就使得遊戲服務器要開發本身的協議解析器.
3, 傳輸雙向,且消息傳輸頻率高.假設一個遊戲服務器instance鏈接了2000個client,每一個client平均每秒鐘傳輸1-10個message,一個message大約幾百字節或者幾千字節.而server也須要向client廣播其餘玩家的當前信息.這使得服務器須要有高速處理消息的能力.
4, CS架構的遊戲服務器端的邏輯並不像APP服務器端的邏輯那麼複雜. 網絡遊戲在client端處理了大部分邏輯,server端負責簡單邏輯,甚至只是傳遞消息.併發

在Java NIO出現之後

出現了使用NIO寫的非阻塞網絡引擎,好比Apache Mina, JBoss Netty, Smartfoxserver BitSwarm. 比較起來, Mina的性能不如後二者.Tomcat也存在NIO模式,不過須要人工開啓.app

首先要說明一下, 與App Server的servlet開發模式不同, 在Mina, Netty和BitSwarm上開發應用程序都是Event Driven的設計模式.Server端會收到Client端的event,Client也會收到Server端的event,Server端與Client端的都要註冊各類event的EventHandler來handle event.

用大白話來解釋NIO:
1, Buffers, 網絡傳輸字節存放的地方.不管是從channel中取,仍是向channel中寫,都必須以Buffers做爲中間存貯格式.
2, Socket Channels. Channel是網絡鏈接和buffer之間的數據通道.每一個鏈接一個channel.就像以前的socket的stream同樣.
3, Selector. 像一個巡警,在一個片區裏面不停的巡邏. 一旦發現事件發生,馬上將事件select出來.不過這些事件必須是提早註冊在selector上的. select出來的事件打包成SelectionKey.裏面包含了事件的發生事件,地點,人物. 若是警察不巡邏,每一個街道(socket)分配一個警察(thread),那麼一個片區有幾條街道,就須要幾個警察.但如今警察巡邏了,一個巡警(selector)能夠管理全部的片區裏面的街道(socketchannel).

以上把警察比做線程,街道比做socket或socketchannel,街道上發生的一切比做stream.把巡警比做selector,引發巡警注意的事件比做selectionKey.

從上能夠看出,使用NIO可使用一個線程,就能維護多個持久TCP鏈接.

NIO實例

下面給出NIO編寫的EchoServer和Client. Client鏈接server之後,將發送一條消息給server. Server會原封不懂的把消息發送回來.Client再把消息發送回去.Server再發回來.用不休止. 在性能的容許下,Client能夠啓動任意多.

如下Code涵蓋了NIO裏面最經常使用的方法和鏈接斷開診斷.註釋也全.

首先是Server的實現. Server端啓動了2個線程,connectionBell線程用於巡邏新的鏈接事件. readBell線程用於讀取全部channel的數據. 註解: Mina採起了一樣的作法,只是readBell線程啓動的個數等於處理器個數+1. 因而可知,NIO只須要少許的幾個線程就能夠維持很是多的併發持久鏈接.

每當事件發生,會調用dispatch方法去處理event. 通常狀況,會使用一個ThreadPool來處理event. ThreadPool的大小能夠自定義.但不是越大越好.若是處理event的邏輯比較複雜,好比須要額外網絡鏈接或者複雜數據庫查詢,那ThreadPool就須要稍微大些.(猜想)Smartfoxserver處理上萬的併發,也只用到了3-4個線程來dispatch event.

EchoServer

public class EchoServer {
	public static SelectorLoop connectionBell;
	public static SelectorLoop readBell;
	public boolean isReadBellRunning=false;

	public static void main(String[] args) throws IOException {
		new EchoServer().startServer();
	}
	
	// 啓動服務器
	public void startServer() throws IOException {
		// 準備好一個鬧鐘.當有連接進來的時候響.
		connectionBell = new SelectorLoop();
		
		// 準備好一個鬧裝,當有read事件進來的時候響.
		readBell = new SelectorLoop();
		
		// 開啓一個server channel來監聽
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 開啓非阻塞模式
		ssc.configureBlocking(false);
		
		ServerSocket socket = ssc.socket();
		socket.bind(new InetSocketAddress("localhost",7878));
		
		// 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件.
		ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
		new Thread(connectionBell).start();
	}
	
	// Selector輪詢線程類
	public class SelectorLoop implements Runnable {
		private Selector selector;
		private ByteBuffer temp = ByteBuffer.allocate(1024);
		
		public SelectorLoop() throws IOException {
			this.selector = Selector.open();
		}
		
		public Selector getSelector() {
			return this.selector;
		}

		@Override
		public void run() {
			while(true) {
				try {
				    // 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續.
					this.selector.select();
					
					Set<SelectionKey> selectKeys = this.selector.selectedKeys();
					Iterator<SelectionKey> it = selectKeys.iterator();
					while (it.hasNext()) {
						SelectionKey key = it.next();
						it.remove();
						// 處理事件. 能夠用多線程來處理.
						this.dispatch(key);
					}
				} catch (IOException e) {
					e.printStackTrace();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
		public void dispatch(SelectionKey key) throws IOException, InterruptedException {
			if (key.isAcceptable()) {
				// 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的.
				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
				// 接受一個鏈接.
				SocketChannel sc = ssc.accept();
				
				// 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘.
				sc.configureBlocking(false);
				sc.register(readBell.getSelector(), SelectionKey.OP_READ);
				
				// 若是讀取線程尚未啓動,那就啓動一個讀取線程.
				synchronized(EchoServer.this) {
					if (!EchoServer.this.isReadBellRunning) {
						EchoServer.this.isReadBellRunning = true;
						new Thread(readBell).start();
					}
				}
				
			} else if (key.isReadable()) {
				// 這是一個read事件,而且這個事件是註冊在socketchannel上的.
				SocketChannel sc = (SocketChannel) key.channel();
				// 寫數據到buffer
				int count = sc.read(temp);
				if (count < 0) {
					// 客戶端已經斷開鏈接.
					key.cancel();
					sc.close();
					return;
				}
				// 切換buffer到讀狀態,內部指針歸位.
				temp.flip();
				String msg = Charset.forName("UTF-8").decode(temp).toString();
				System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
				
				Thread.sleep(1000);
				// echo back.
				sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
				
				// 清空buffer
				temp.clear();
			}
		}
		
	}

}

接下來就是Client的實現.Client能夠用傳統IO,也可使用NIO.這個例子使用的NIO,單線程.

public class Client implements Runnable {
	// 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接.
	private static int idleCounter = 0;
	private Selector selector;
	private SocketChannel socketChannel;
	private ByteBuffer temp = ByteBuffer.allocate(1024);

	public static void main(String[] args) throws IOException {
		Client client= new Client();
		new Thread(client).start();
		//client.sendFirstMsg();
	}
	
	public Client() throws IOException {
		// 一樣的,註冊鬧鐘.
		this.selector = Selector.open();
		
		// 鏈接遠程server
		socketChannel = SocketChannel.open();
		// 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件.
		Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878));
		socketChannel.configureBlocking(false);
		SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
		
		if (isConnected) {
			this.sendFirstMsg();
		} else {
			// 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件.
		    key.interestOps(SelectionKey.OP_CONNECT);
		}
	}
	
	public void sendFirstMsg() throws IOException {
		String msg = "Hello NIO.";
		socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
	}

	@Override
	public void run() {
        while (true) {
			try {
				// 阻塞,等待事件發生,或者1秒超時. num爲發生事件的數量.
				int num = this.selector.select(1000);
				if (num ==0) {
					idleCounter ++;
					if(idleCounter >10) {
						// 若是server斷開了鏈接,發送消息將失敗.
						try {
						    this.sendFirstMsg();
						} catch(ClosedChannelException e) {
							e.printStackTrace();
							this.socketChannel.close();
							return;
						}
					}
					continue;
				} else {
					idleCounter = 0;
				}
				Set<SelectionKey> keys = this.selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (key.isConnectable()) {
						// socket connected
						SocketChannel sc = (SocketChannel)key.channel();
						if (sc.isConnectionPending()) {
						    sc.finishConnect();
						}
						// send first message;
						this.sendFirstMsg();
					}
					if (key.isReadable()) {
						// msg received.
						SocketChannel sc = (SocketChannel)key.channel();
						this.temp = ByteBuffer.allocate(1024);
						int count = sc.read(temp);
						if (count<0) {
							sc.close();
							continue;
						}
						// 切換buffer到讀狀態,內部指針歸位.
						temp.flip();
						String msg = Charset.forName("UTF-8").decode(temp).toString();
						System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress());
						
						Thread.sleep(1000);
						// echo back.
						sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
						
						// 清空buffer
						temp.clear();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

下載之後黏貼到eclipse中, 先運行EchoServer,而後能夠運行任意多的Client. 中止Server和client的方式就是直接terminate server.

相關文章
相關標籤/搜索