JDK1.4 的 NIO 有效解決了原有流式 IO 存在的線程開銷的問題,在 NIO 中使用多線程,主要目的已不是爲了應對每一個客戶端請求而分配獨立的服務線程,而是經過多線程充分使用用多個 CPU 的處理能力和處理中的等待時間,達到提升服務能力的目的。
多線程的引入,容易爲原本就略顯複雜的 NIO 代碼進一步下降可讀性和可維護性。引入良好的設計模型,將不只帶來高性能、高可靠的代碼,也將帶來一個愜意的開發過程。java
NIO 的選擇器採用了多路複用(Multiplexing)技術,可在一個選擇器上處理多個套接字, 經過獲取讀寫通道來進行 IO 操做。因爲網絡帶寬等緣由,在通道的讀、寫操做中是容易出現等待的, 因此在讀、寫操做中引入多線程,對性能提升明顯,並且能夠提升客戶端的感知服務質量。因此本文的模型將主要經過使用讀、寫線程池 來提升與客戶端的數據交換能力。ios
以下圖所示,服務端接受客戶端請求後,控制線程將該請求的讀通道交給讀線程池,由讀線程池分配線程完成對客戶端數據的讀取操做;當讀線程完成讀操做後,將數據返回控制線程,進行服務端的業務處理;完成 業務處理後,將需迴應給客戶端的數據和寫通道提交給寫線程池,由寫線程完成向客戶端發送迴應數據的操做。服務器
同時整個服務端的流程處理,創建於事件機制上。在 [接受鏈接->讀->業務處理->寫 ->關閉鏈接 ]這個 過程當中,觸發器將觸發相應事件,由事件處理器對相應事件分別響應,完成服務器端的業務處理。
下面咱們就來詳細看一下這個模型的各個組成部分。網絡
回頁首多線程
(1)onAccept:當服務端收到客戶端鏈接請求時,觸發該事件。經過該事件咱們能夠知道有新的客戶端呼入。該事件可用來控制服務端的負載。例如,服務器可設定同時只爲必定數量客戶端提供服務,當同時請求數超出數量時,可在響應該事件時直接拋出異常,以拒絕新的鏈接。
(2)onAccepted:當客戶端請求被服務器接受後觸發該事件。該事件代表一個新的客戶端與服務器正式創建鏈接。
(3)onRead:當客戶端發來數據,並已被服務器控制線程正確讀取時,觸發該事件 。該事件通知各事件處理器能夠對客戶端發來的數據進行實際處理了。須要注意的是,在本模型中,客戶端的數據讀取是由控制線程交由讀線程完成的,事件處理器不須要在該事件中進行專門的讀操做,而只需將控制線程傳來的數據進行直接處理便可。
(4)onWrite:當客戶端能夠開始接受服務端發送數據時觸發該事件,經過該事件,咱們能夠向客戶端發送迴應數據。 在本模型中,事件處理器只須要在該事件中設置
(5)onClosed:當客戶端與服務器斷開鏈接時觸發該事件。
(6)onError:當客戶端與服務器從鏈接開始到最後斷開鏈接期間發生錯誤時觸發該事件。經過該事件咱們能夠知道有什麼錯誤發生。socket
回頁首性能
在這個模型中,事件採用廣播方式,也就是全部在冊的事件處理器都能得到事件通知。這樣能夠將不一樣性質的業務處理,分別用不一樣的處理器實現,使每一個處理器的業務功能儘量單一。
以下圖:整個事件模型由監聽器、事件適配器、事件觸發器、事件處理器組成。線程
public interface Serverlistener { public void onError(String error); public void onAccept() throws Exception; public void onAccepted(Request request) throws Exception; public void onRead(Request request) throws Exception; public void onWrite(Request request, Response response) throws Exception; public void onClosed(Request request) throws Exception; }
public abstract class EventAdapter implements Serverlistener { public EventAdapter() { } public void onError(String error) {} public void onAccept() throws Exception {} public void onAccepted(Request request) throws Exception {} public void onRead(Request request) throws Exception {} public void onWrite(Request request, Response response) throws Exception {} public void onClosed(Request request) throws Exception {} }
public class Notifier { private static Arraylist listeners = null; private static Notifier instance = null; private Notifier() { listeners = new Arraylist(); } /** * 獲取事件觸發器 * @return 返回事件觸發器 */ public static synchronized Notifier getNotifier() { if (instance == null) { instance = new Notifier(); return instance; } else return instance; } /** * 添加事件監聽器 * @param l 監聽器 */ public void addlistener(Serverlistener l) { synchronized (listeners) { if (!listeners.contains(l)) listeners.add(l); } } public void fireOnAccept() throws Exception { for (int i = listeners.size() - 1; i >= 0; i--) ( (Serverlistener) listeners.get(i)).onAccept(); } ....// other fire method }
public class ServerHandler extends EventAdapter { public ServerHandler() { } public void onRead(Request request) throws Exception { System.out.println("Received: " + new String(data)); } }
ServerHandler handler = new ServerHandler(); Notifier.addlistener(handler);
回頁首設計
NIO 多線程服務器主要由主控服務線程、讀線程和寫線程組成。日誌
public class Server implements Runnable { .... private static int MAX_THREADS = 4; public Server(int port) throws Exception { .... // 建立無阻塞網絡套接 selector = Selector.open(); sschannel = ServerSocketChannel.open(); sschannel.configureBlocking(false); address = new InetSocketAddress(port); ServerSocket ss = sschannel.socket(); ss.bind(address); sschannel.register(selector, SelectionKey.OP_ACCEPT); } public void run() { System.out.println("Server started ..."); System.out.println("Server listening on port: " + port); // 監聽 while (true) { try { int num = 0; num = selector.select(); if (num > 0) { Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); // 處理 IO 事件 if ( (key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); notifier.fireOnAccept(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // 觸發接受鏈接事件 Request request = new Request(sc); notifier.fireOnAccepted(request); // 註冊讀操做 , 以進行下一步的讀操做 sc.register(selector, SelectionKey.OP_READ, request); } else if ( (key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ) { // 提交讀服務線程讀取客戶端數據 Reader.processRequest(key); key.cancel(); } else if ( (key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE ) { // 提交寫服務線程向客戶端發送迴應數據 Writer.processRequest(key); key.cancel(); } } } else { addRegister(); // 在 Selector 中註冊新的寫通道 } } catch (Exception e) { notifier.fireOnError("Error occured in Server: " + e.getMessage()); continue; } } } .... }
public class Reader extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 讀取客戶端數據,並觸發 onRead 事件 read(key); } catch (Exception e) { continue; } } } .... }
public final class Writer extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 向客戶端發送數據,而後關閉鏈接,並分別觸發 onWrite,onClosed 事件 write(key); } catch (Exception e) { continue; } } } .... }
NIO 多線程模型的實現告一段落,如今咱們能夠暫且將 NIO 的各個 API 和煩瑣的調用方法拋於腦後,專心於咱們的實際應用中。
咱們用一個簡單的 TimeServer(時間查詢服務器)來看看該模型能帶來多麼簡潔的開發方式。
在這個 TimeServer 中,將提供兩種語言(中文、英文)的時間查詢服務。咱們將讀取客戶端的查詢命令(GB/EN),並回應相應語言格式的當前時間。在應答客戶的請求的同時,服務器將進行日誌記錄。作爲示例,對日誌記錄,咱們只是簡單地將客戶端的訪問時間和 IP 地址輸出到服務器的終端上。
public class TimeHandler extends EventAdapter { public TimeHandler() { } public void onWrite(Request request, Response response) throws Exception { String command = new String(request.getDataInput()); String time = null; Date date = new Date(); // 判斷查詢命令 if (command.equals("GB")) { // 中文格式 DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.CHINA); time = cnDate.format(date); } else { // 英文格式 DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.US); time = enDate.format(date); } response.send(time.getBytes()); } }
public class LogHandler extends EventAdapter { public LogHandler() { } public void onClosed(Request request) throws Exception { String log = new Date().toString() + " from " + request.getAddress() .toString(); System.out.println(log); } public void onError(String error) { System.out.println("Error: " + error); } }
public class Start { public static void main(String[] args) { try { LogHandler loger = new LogHandler(); TimeHandler timer = new TimeHandler(); Notifier notifier = Notifier.getNotifier(); notifier.addlistener(loger); notifier.addlistener(timer); System.out.println("Server starting ..."); Server server = new Server(5100); Thread tServer = new Thread(server); tServer.start(); } catch (Exception e) { System.out.println("Server error: " + e.getMessage()); System.exit(-1); } } }
經過例子咱們能夠看到,基於事件回調的 NIO 多線程服務器模型,提供了清晰直觀的實現方式,可以讓開發者從 NIO 及多線程的技術細節中擺脫出來,集中精力關注具體的業務實現。
原文:http://www.ibm.com/developerworks/cn/java/l-niosvr/