本篇文章主要使用IO和NIO的形式來實現一個簡單的聊天室,而且說明IO方法存在的問題,而NIO又是如何解決的。java
大概的框架爲,先提供思路和大概框架圖——代碼——問題及解決方式,這樣會容易看一點。服務器
下面編寫一個簡單的聊天室,大概須要的功能就是服務端維護一個聊天室,裏邊的客戶端發送消息以後服務將其消息轉發給其餘客戶端,達到一個聊天室的效果。多線程
大體的思路:服務端區分職責,分紅兩部分,主線程負責接收鏈接並把鏈接放入到線程池中處理,維護一個線程池,全部對於socket的處理都交給線程池中的線程來處理。以下圖。架構
下面貼上demo代碼(代碼中有幾處爲了方便並無採用最規範的定義方式,如線程池的建立和Map初始化的時候未設置初始容量等)併發
代碼分五個類,服務端(ChatServer,監聽做用,爲服務端主線程)、客戶端(ChatClient)、服務端處理器(ServerHandler,能夠理解爲線程池中要執行的事情)、客戶端處理器(ClientHandler,客戶端讀寫服務器消息的處理),工具類(SocketUtils,只有一個發送消息方法)。框架
服務端:socket
/** * 服務端啓動類 * 主要負責監聽客戶端鏈接 */ public class ChatServer { public static void main(String[] args) { ServerSocket serverSocket = null; /*----------爲了方便使用Executors建立線程-------------*/ ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100); try { serverSocket = new ServerSocket(8888); while (true) { System.out.println("-----------阻塞等待鏈接------------"); Socket socket = serverSocket.accept(); String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); System.err.println(key + "已鏈接"); // 主線程只接收,處理直接交給處理線程池 handlerThreadPool.execute(new ServerHandler(socket)); } } catch (IOException e) { e.printStackTrace(); if (Objects.nonNull(serverSocket)) { try { serverSocket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } } }
服務端處理類:ide
/** * 服務端socket事件處理類 * 負責處理對應socket中的讀寫操做 */ public class ServerHandler implements Runnable { /** * 鏈接到服務端的全部鏈接 socket的地址端口->socket */ private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>(); /** * 維護名稱和地址的map */ private static final Map<String, String> nameMap = new ConcurrentHashMap<>(); private Socket socket; /** * 每一個socket的標識,使用地址+端口構成 */ private String key; public ServerHandler() { } public ServerHandler(Socket socket) { this.socket = socket; this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); } @Override public void run() { Socket s = socket; // 根據消息執行不一樣操做 InputStream inputStream; // debug查看數據用 // Map<String, Socket> tmpMap = socketMap; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (line.startsWith("register")) { // 登記 String[] split = line.split(":"); String name = split[1]; String msg; // 校驗是否存在 if (socketMap.containsKey(key)) { msg = "請勿重複登記"; sendMsg(s, msg); return; } if (nameMap.containsValue(name)) { msg = "名稱已被登記,請換一個名稱"; sendMsg(s, msg); return; } // 通知本身已鏈接 sendMsg(s, "已鏈接到服務器"); msg = name + "進入聊天室"; // 將消息轉發給其餘客戶端 sendMsgToClients(msg); // 放入socket池 socketMap.put(key, s); nameMap.put(key, name); System.err.println(name + "已登記"); } else if (line.trim().equalsIgnoreCase("end")) { if (notPassRegisterValidate()) { continue; } // 斷開鏈接 socketMap.remove(key); String name = nameMap.get(key); String msg = name + "離開聊天室"; System.err.println(msg); // 將消息轉發給其餘客戶端 sendMsgToClients(msg); msg = "已斷開鏈接"; // 發送給對應的鏈接斷開信息 sendMsg(s, msg); inputStream.close(); break; } else { if (notPassRegisterValidate()) { continue; } // 正常通訊 String name = nameMap.get(key); String msg = name + ":" + line; // 將消息轉發給其餘客戶端 sendMsgToClients(msg); } } } catch (IOException e) { e.printStackTrace(); } } /** * 是否已登陸校驗 * * @return 是否已登陸 */ private boolean notPassRegisterValidate() { boolean hasRegister = nameMap.containsKey(key); if (hasRegister) { return false; } String msg = "您還未登陸,請先登陸"; sendMsg(socket, msg); return true; } /** * 往鏈接發送消息 * * @param socket 客戶端鏈接 * @param msg 消息 */ private void sendMsg(Socket socket, String msg) { SocketUtils.sendMsg(socket, msg); if (socket.isClosed()) { socketMap.remove(key); } } /** * 發送給其餘客戶端信息 * * @param msg 信息 */ private void sendMsgToClients(String msg) { for (Map.Entry<String, Socket> entry : socketMap.entrySet()) { if (this.key.equals(entry.getKey())) { continue; } sendMsg(entry.getValue(), msg); } } }
工具類(一個發送消息的方法):函數
public class SocketUtils { private SocketUtils() { } public static void sendMsg(Socket socket, String msg) { Socket s = socket; OutputStream outputStream = null; msg += "\r\n"; try { outputStream = s.getOutputStream(); outputStream.write(msg.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); } catch (IOException e) { System.err.println("發送消息失敗, 鏈接已斷開"); try { if (Objects.nonNull(outputStream)) { outputStream.close(); } socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
客戶端:高併發
/** * 客戶端讀和寫各自使用一個線程 */ public class ChatClient { public static void main(String[] args) { Socket socket; ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2); try { socket = new Socket("localhost", 8888); // 寫線程 clientHandlerPool.execute(new ClientHandler(socket, 1)); // 讀線程 clientHandlerPool.execute(new ClientHandler(socket, 0)); } catch (IOException e) { e.printStackTrace(); } } }
客戶端處理器:
/** * 客戶端處理器 * 根據type來區分是作讀工做仍是寫工做 */ public class ClientHandler implements Runnable { private Socket socket; /** * 處理類型,0-讀、1-寫 */ private int type; public ClientHandler() { throw new IllegalArgumentException("不能使用沒有參數的構造函數"); } public ClientHandler(Socket socket, int type) { this.socket = socket; this.type = type; } @Override public void run() { if (type == 1) { // 進行寫操做 doWriteJob(); return; } // 默認讀操做 doReadJob(); } /** * 讀操做 */ private void doReadJob() { Socket s = socket; InputStream inputStream; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { System.err.println(line); } // 若是已退出了,那麼關閉鏈接 if ("已斷開鏈接".equals(line)) { socket.close(); break; } } } catch (IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } /** * 寫線程 */ private void doWriteJob() { Socket s = socket; try { Scanner scanner = new Scanner(System.in); while (true) { String output = scanner.nextLine(); if (Objects.nonNull(output) && !"".equals(output)) { SocketUtils.sendMsg(s, output); } } } catch (Exception e) { e.printStackTrace(); System.err.println("錯誤發生了:" + e.getMessage()); } } }
結果:
思考:當前這樣實現有什麼瓶頸,可能會出現什麼問題?
存在問題:
- 服務端使用accept阻塞接收線程,鏈接一個一個處理,在高併發下處理性能緩慢。
- 沒有鏈接的時候線程一直處於阻塞狀態形成資源的浪費(若是使用多線程接收處理併發,那麼沒鏈接的時候形成多個線程的資源浪費)。
那咱們來看下NIO是怎麼解決上方的問題的,首先上這個demo總體的架構圖。
大概的邏輯爲
- 服務端將ServerSocketChannel註冊到Selector中,客戶端鏈接進來的時候事件觸發,將客戶端的鏈接註冊到selector中。
- 主線程負責selector的輪詢工做,發現有事件能夠處理就將其交給線程池。
- 客戶端同理分紅兩個部分,寫操做和讀操做,每一個操做由一個線程單獨完成;可是若是讀操做處理使用while循環不斷輪詢等待接收的話,CPU會飆升,因此須要客戶端新建一個selector來解決這個問題,注意這個selector跟服務端不是同一個,沒有啥關係。
代碼分類大體跟IO寫法同樣,分紅服務端、服務端處理器、客戶端、客戶端處理器,下面爲demo。
服務端:
public class ChatServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100); public ChatServer() throws IOException { this.selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(9999)); // 將服務端的socket註冊到selector中,接收客戶端,並將其註冊到selector中,其自己也是selector中的一個I/O事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.err.println("聊天室服務端初始化結束"); } /** * 啓動方法 * 1.監聽,拿到以後進行處理 */ public void start() throws IOException { int count; while (true) { // 可能出現select方法沒阻塞,空輪詢致使死循環的狀況 count = selector.select(); if (count > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 交給線程池處理 handlerPool.execute(new ServerHandler(key, selector)); // 處理完成後移除 iterator.remove(); } } } } public static void main(String[] args) throws IOException { new ChatServer().start(); } }
服務端處理器:
public class ServerHandler implements Runnable { private SelectionKey key; private Selector selector; public ServerHandler() { } /** * 原本能夠經過key拿到selector,這裏爲了圖方便就這樣寫了 */ public ServerHandler(SelectionKey key, Selector selector) { this.key = key; this.selector = selector; } @Override public void run() { try { if (key.isAcceptable()) { // 說明是服務端的事件,注意這裏強轉換爲的是ServerSocketChannel ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // 接收鏈接 SocketChannel socket = channel.accept(); if (Objects.isNull(socket)) { return; } socket.configureBlocking(false); // 接收客戶端的socket而且將其註冊到服務端這邊的selector中,注意客戶端在此時跟服務端selector產生關聯 socket.register(selector, SelectionKey.OP_READ); System.err.println("服務端已接收鏈接"); } else if (key.isReadable()) { // 客戶端發送信息過來了 doReadJob(); } } catch (IOException e) { e.printStackTrace(); // 錯誤處理 } } /** * 讀取操做 */ private void doReadJob() throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int readCount = socketChannel.read(buffer); if (readCount > 0) { String msg = new String(buffer.array(), StandardCharsets.UTF_8); System.err.println(socketChannel.getRemoteAddress().toString() + "的信息爲:" + msg); // 轉發給其餘客戶端 sendMsgToOtherClients(msg); } } /** * 轉發消息給其餘客戶端 * * @param msg 消息 */ private void sendMsgToOtherClients(String msg) throws IOException { SocketChannel self = (SocketChannel) key.channel(); Set<SelectionKey> keys = selector.keys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); SelectableChannel channel = selectionKey.channel(); // 若是是自己或者不是socketChannel類型則跳過 if (self.equals(channel) || channel instanceof ServerSocketChannel) { continue; } SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); socketChannel.write(byteBuffer); } } }
客戶端:
public class ChatClient { private Selector selector; private SocketChannel socketChannel; private static ExecutorService dealPool = Executors.newFixedThreadPool(2); public ChatClient() throws IOException { /* * 說明一下: * 客戶端這邊的selector跟剛纔在服務端定義的selector是不一樣的兩個selector * 客戶端這邊不須要selector也能實現功能,可是讀取的時候必須不斷的循環,會致使CPU飆升, * 因此使用selector是爲了解決這個問題的,別跟服務端的selector搞混就好 */ selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } public void start() throws IOException, InterruptedException { // 鏈接 // socketChannel.connect(new InetSocketAddress("localhost", 9999)); while (!socketChannel.finishConnect()) { System.err.println("正在鏈接..."); TimeUnit.MILLISECONDS.sleep(200); } System.err.println("鏈接成功"); // 使用兩個線程來分別處理讀取和寫操做 // 寫數據 dealPool.execute(new ClientHandler(selector, socketChannel, 1)); // 讀取數據 dealPool.execute(new ClientHandler(selector, socketChannel, 0)); } public static void main(String[] args) throws IOException, InterruptedException { new ChatClient().start(); } }
客戶端處理器:
public class ClientHandler implements Runnable { private Selector selector; private SocketChannel socketChannel; /** * 0-讀,1-寫 */ private int type; public ClientHandler() { } public ClientHandler(Selector selector, SocketChannel socketChannel, int type) { // selector是爲了解決讀時候CPU飆升的問題,具體見客戶端的啓動類代碼註釋 this.selector = selector; this.socketChannel = socketChannel; this.type = type; } @Override public void run() { try { if (type == 0) { doClientReadJob(); return; } doClientWriteJob(); } catch (IOException e) { e.printStackTrace(); } } /** * 寫操做 */ private void doClientWriteJob() throws IOException { SocketChannel sc = socketChannel; Scanner scanner = new Scanner(System.in); while (true) { if (scanner.hasNextLine()) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)); sc.write(buffer); } } } } /** * 讀操做 */ private void doClientReadJob() throws IOException { SocketChannel sc = socketChannel; ByteBuffer buf = ByteBuffer.allocate(1024); while (true) { int select = selector.select(); if (select > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 這是必須的,否則下方的remove會出錯 SelectionKey next = iterator.next(); // 這裏由於只有自己這個客戶端註冊到客戶端的selector中,因此有事件必定是它的,也就不用從key拿了,直接操做就行 buf.clear(); int read = sc.read(buf); if (read > 0) { String msg = new String(buf.array(), StandardCharsets.UTF_8); System.err.println(msg); } // 事件處理完以後要移除這個key,不然的話selector.select()方法不會再讀到這個key,即使有新的時間到這個channel來 iterator.remove(); } } } } }
結果圖:
在編寫的過程當中發現瞭如下兩點:
- select方法以後若是存在key,而且接下來的操做未對這個selectionKey作remove操做,那麼下次的select不會再將其選入,即使有事件發生,也就是說,select方法不會選擇以前已經選過的key。
- selector.select()方法中偶爾會出現不阻塞的狀況。這就是NIO中的空輪詢bug,也就是說,沒有鏈接又不阻塞的話,while(true) ... 的寫法就是一個死循環,會致使CPU飆升。
第二點問題在NIO框架(如netty)中都採用了比較好的解決方法,能夠去查下如何解決的。接下來看下NIO的寫法是否解決了IO寫法中存在的問題:
服務端使用accept阻塞接收線程,鏈接一個一個處理,在高併發下處理性能緩慢。
答:上述寫法中仍是使用一個ServerSocketChannel來接收客戶端,沒有解決這個問題;可是能夠經過使用線程池的方式來解決。也就是說將服務端的事件分紅兩個部分,第一個部分爲接收客戶端,使用一個線程池來維護;第二個部分爲客戶端的事件處理操做,也維護一個線程池來執行這些事件。
這樣性能上去了,因爲selector的存在也不會出現資源浪費的事情,netty就是這麼作的哦。
沒有鏈接的時候線程一直處於阻塞狀態形成資源的浪費(若是使用多線程接收處理併發,那麼沒鏈接的時候形成多個線程的資源浪費)。
答:解決。NIO寫法主要有selector不斷輪詢,不會出現沒鏈接不做爲的狀況,並且多個鏈接的話也沒有問題(參考1的回答)。
兩種寫法都有Reactor模式的影子,可是IO寫法有明顯的缺點就是若是沒有鏈接會形成資源浪費的問題(採用多個接收鏈接的話更甚),而NIO中selector輪詢機制就很好的解決了無鏈接時無做爲的狀況,而且在性能方面能夠經過職責分類和線程池來獲得改善,因此,NIO,永遠滴神。
須要壓力,須要努力。