使用異步處理IO的方式,使用一個線程,處理大量的連接。java
先對NIO原理和通訊模型作一些瞭解。ios
1. 由一個專門的線程來處理全部的 IO 事件,並負責分發。
2. 事件驅動機制:事件到的時候觸發,而不是同步的去監視事件。
3. 線程通信:線程之間經過 wait,notify 等方式通信。保證每次上下文切換都是有意義的。減小無謂的線程切換。 服務器
Java NIO的服務端只需啓動一個專門的線程來處理全部的 IO 事件,這種通訊模型是怎麼實現的呢?呵呵,咱們一塊兒來探究它的奧祕吧。java NIO採用了雙向通道(channel)進行數據傳輸,而不是單向的流(stream),在通道上能夠註冊咱們感興趣的事件。一共有如下四種事件:異步
事件名 | 對應值 |
服務端接收客戶端鏈接事件 | SelectionKey.OP_ACCEPT(16) |
客戶端鏈接服務端事件 | SelectionKey.OP_CONNECT(8) |
讀事件 | SelectionKey.OP_READ(1) |
寫事件 | SelectionKey.OP_WRITE(4) |
服務端和客戶端各自維護一個管理通道的對象,咱們稱之爲selector,該對象能檢測一個或多個通道 (channel) 上的事件。咱們以服務端爲例,若是服務端的selector上註冊了讀事件,某時刻客戶端給服務端發送了一些數據,阻塞I/O這時會調用read()方法阻塞地讀取數據,而NIO的服務端會在selector中添加一個讀事件。服務端的處理線程會輪詢地訪問selector,若是訪問selector時發現有感興趣的事件到達,則處理這些事件,若是沒有感興趣的事件到達,則處理線程會一直阻塞直到感興趣的事件到達爲止。下面是我理解的java NIO的通訊模型示意圖:socket
package priv.lee.paradise.nioserver.listen; import priv.lee.paradise.nioserver.distribute.DoGetOperater; import priv.lee.paradise.nioserver.distribute.DoPostOperater; import priv.lee.paradise.nioserver.util.SocketUtil; import java.nio.channels.SocketChannel; /** * Created by li on 2016/12/12. * 當所監聽的端口有鏈接進來的時候,可經過端口監聽器通知觀察者進行下一步操做。 */ class ListenerObserver { /** * 當有新鏈接進來的時候,會通知該類調用靜態方法。 * 每傳入一個新的socketChannel會新建一個線程對其進行處理。 * * @param socketChannel 客戶端和服務器的嵌套字。 */ static void acceptNotify(SocketChannel socketChannel) { String requestInfo = SocketUtil.getRequest(socketChannel); System.out.println(requestInfo); if (requestInfo.contains("GET")) { DoGetOperater doGetOperater = new DoGetOperater(socketChannel, requestInfo); doGetOperater.doGet(); } if (requestInfo.contains("POST")) { DoPostOperater doPostOperater = new DoPostOperater(requestInfo, socketChannel); doPostOperater.doPost(); } } }
package priv.lee.paradise.nioserver.listen; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * Created by li on 2016/12/11. * 使用nio建立的端口監聽器。本類再也不使用靜態方法。 */ public class PortListener { private int port; private Selector selector; /** * 初始化監聽器,要建立監聽器必須傳入端口號 * param:port 端口號。 * return:null */ public PortListener(int port) { this.port = port; init(); } /** * 初始化端口監聽器,獲取通道、註冊事件。 * param:null; * return:null */ private void init() { ServerSocketChannel serverSocketChannel = getServerSocketChannel(); selector = getSelector(); registerEvent(serverSocketChannel, selector, SelectionKey.OP_ACCEPT); } /** * 開始監聽事件。 */ public void startListening() { System.out.println("開始監聽" + port + "號端口"); while (!Thread.interrupted()) { handleEvent(); } } /** * 但監聽器監聽到註冊過的事件到來的時候, * selector.select();爲線程阻塞方法,當感興趣的事件到來的時候纔會觸發這個方法。 */ private void handleEvent() { try { selector.select(); Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); if (key.isAcceptable()) { handleAcceptableEvent(key); } else if (key.isReadable()) { handleReadableEvent(key); } iterator.remove(); } } catch (Exception e) { e.printStackTrace(); } } /** * 處理一個可讀的事件。 * * @param key 可讀事件 */ private void handleReadableEvent(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); ListenerObserver.acceptNotify(socketChannel); } /** * 處理Acceptable事件 * * @param key 以註冊是的事件觸發。並向選擇器註冊讀取事件。 */ private void handleAcceptableEvent(SelectionKey key) { ServerSocketChannel serverSocketChannel; try { serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } catch (Exception e) { e.printStackTrace(); } } /** * 獲取一個ServerSocketChannel對象,並對其進行一些配置。 * param:null。 * return:ServerSocketChannel */ private ServerSocketChannel getServerSocketChannel() { ServerSocketChannel serverSocketChannel = null; try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); } catch (Exception e) { e.printStackTrace(); } return serverSocketChannel; } /** * 獲取一個選擇器。 * param:null * return Selector. */ private Selector getSelector() { Selector selector = null; try { selector = Selector.open(); } catch (Exception e) { e.printStackTrace(); } return selector; } /** * 爲通道註冊選擇器要監聽的事件。 * * @param serverSocketChannel 待監測的通道。 * @param selector 選擇器 * @param opAccept 待監聽的事件選項(int)。 */ private void registerEvent(ServerSocketChannel serverSocketChannel, Selector selector, int opAccept) { try { serverSocketChannel.register(selector, opAccept); } catch (Exception e) { e.printStackTrace(); } } }