經過上篇socke基礎,咱們回顧了下socket的用法。上篇內容很簡單,服務端也只是接收了一個客戶端的鏈接,接下來咱們就升級下咱們的demo,使其像一個真正的服務器。java
首先咱們容許服務端接收多個客戶端的鏈接。修改OioServer
以下服務器
代碼2-1併發
public class OioServer { private ServerSocket serverSocket; public void start() { Socket socket = null; try { openServer(8081); if (Objects.isNull(serverSocket)) { return; } while (true) { socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } } private void handleSocket(Socket socket) { new Thread(() -> { while (!socket.isClosed()) { String msg = SocketUtils.read(socket); SocketUtils.write(socket, " I get you" + msg); } }).start(); } public void openServer(int port) throws IOException { // 1 建立ServerSocket serverSocket = new ServerSocket(); // 2 綁定端口 SocketAddress socketAddress = new InetSocketAddress(port); serverSocket.bind(socketAddress); // 3 accept客戶端 } public Socket listenAccept() throws IOException { return serverSocket.accept(); } }
當調用start()方法後,咱們服務器就開始監聽8081接口了。而後每次一個客戶端鏈接進來,咱們就會獲得一個socket,而後咱們建立一個線程去處理這個socket。app
爲何要建立新的線程?由於socket讀寫都是阻塞的,若是不啓動新線程,那主線程就會被阻塞。這個時候,有新的客戶端鏈接進來將不會被處理。可是,咱們爲每一個socket建立一個線程,這樣是有代價的,而且咱們服務器是不可能建立無數個線程的。固咱們使用爲每一個socket建立一個線程這種方法在高併發的狀況下顯然是不可行的。那麼有什麼方法改進嗎?答案是確定的。如今java有了nio,可是我如今不急於把這個王炸展現出來,讓咱們一步步靠近它,並揭開它的神祕面紗。socket
如今咱們知道了爲每一個socket建立一個線程是由於,socket的操做(讀或寫)是阻塞的,那咱們不讓它阻塞不就能夠了?有辦法嗎?有。對於讀,咱們可使用inputStream.available()
;來判斷一下,是否可讀,不可讀咱們就不調用阻塞方法 inputStream.read(bytes)
。因而咱們再SocketUtils
中天加一個方法高併發
代碼2-2學習
/** * 從socket中讀數據 */ public static ReadResult readWithNoBlocking(Socket socket) { try { InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); if (inputStream.available() <= 0) { return ReadResult.unReadableResult(); } while ((len = inputStream.read(bytes)) != -1) { sb.append(new String(bytes, 0, len, "UTF-8")); if (inputStream.available() <= 0) { return ReadResult.readableResult(sb.toString()); } } return ReadResult.readableResult(sb.toString()); } catch (IOException e) { e.printStackTrace(); return ReadResult.unReadableResult(); } }
而後修改OioServer,ui
代碼2-4this
public class OioServer { private ServerSocket serverSocket; private volatile List<Socket> socketList = new ArrayList<>(); ... public void start() { Socket socket = null; try { openServer(8081); // 開啓處理socket鏈接的線程 startChildHandler(); // 主線程監聽鏈接 while (true) { Socket socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } } // 添加socket到socketList中 private void handleSocket(Socket socket) { socketList.add(socket); } // 處理全部socket private void startChildHandler() { new Thread(() -> { while (true) { for (Socket socketToDeal : socketList) { ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal); if (readResult.readable()) { System.out.println("收到客戶端消息" + socketToDeal.getInetAddress().toString() + " " + readResult.result()); SocketUtils.write(socketToDeal, "Get u:" + readResult.result()); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
首先咱們修改了handleSocket方法,是新建的socket添加到socketList中,由於咱們有了SocketUtils.readWithNoBlocking
方法,讀操做不再會阻塞住線程了,這樣咱們就能夠在循環中不斷堅持全部的socket是否有消息發過來,並處理。線程
雖然上述代碼健壯性有待考證,可是咱們確實獲得了一個只要一個線程就能夠處理全部socket的服務器模型。也能夠說,這是簡易版的nio服務器。
如今咱們已經有一個nio 的server了,可是,徹底是沒有章法的編寫的,若是要增長功能,或者定製化一些東西,那必需要修改OioServer
,這違反了開閉原則。所以咱們須要提取一些通用邏輯,將邏輯的處理交給使用方,下面是以可讀爲例。
代碼2-5
public class NioServer { private ServerSocket serverSocket; private volatile List<SocketContext> socketList = new ArrayList<>(); private volatile List<SocketContext> statusChangedContext = new ArrayList<>(); public void start(int port) { // 監聽端口線程 new Thread(() ->{ Socket socket = null; try { openServer(port); startChildHandler(); while (true) { socket = listenAccept(); handleSocket(socket); } } catch (Exception e) { e.printStackTrace(); SocketUtils.closeServerSocketSafely(serverSocket); SocketUtils.closeSocketSafely(socket); } }).start(); } // 監聽全部socket private void startChildHandler() { new Thread(() -> { while (true) { for (SocketContext socketToDeal : socketList) { ReadResult readResult = SocketUtils.readWithNoBlocking(socketToDeal.getSocket()); if (readResult.readable()) { // 若是socket可讀,將其加入到statusChangedContext中,並喚醒調用線程 socketToDeal.setStatus(SocketContext.STATUS_READABLE); socketToDeal.setMsg(readResult.result()); statusChangedContext.add(socketToDeal); synchronized (statusChangedContext) { statusChangedContext.notifyAll(); } } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } private void handleSocket(Socket socket) { SocketContext socketContext = new SocketContext(); socketContext.setSocket(socket); socketList.add(socketContext); } private void openServer(int port) throws IOException { // 1 建立ServerSocket serverSocket = new ServerSocket(); // 2 綁定端口 SocketAddress socketAddress = new InetSocketAddress(port); serverSocket.bind(socketAddress); // 3 accept客戶端 } private Socket listenAccept() throws IOException { return serverSocket.accept(); } public List<SocketContext> getStatusChangedContext() { if (statusChangedContext.size() == 0) { try { // 當statusChangedContext爲空,也就是沒有事件要處理的時候,咱們掛起調用方線程,這樣能夠節約資源 synchronized (statusChangedContext) { statusChangedContext.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } return statusChangedContext; } public static class SocketContext { public static final int STATUS_READABLE = 1; private Socket socket; private int status; private String msg; public Socket getSocket() { return socket; } public void setSocket(Socket socket) { this.socket = socket; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public String read() { return msg; } public void setMsg(String msg) { this.msg = msg; } public void write(String msg) { SocketUtils.write(this.socket, msg); } } }
而後咱們就能夠這樣使用它了
代碼2-6
public class NioServerTest { @Test public void test() { NioSocket server = new NioSocket(); server.start(8081); while (true) { Iterator<SocketContext> socketContexts = server.getStatusChangedContext().iterator(); while (socketContexts.hasNext()) { SocketContext context = socketContexts.next(); socketContexts.remove(); if (context.getStatus() == SocketContext.STATUS_READABLE) { // 處理讀 System.out.println(context.read()); context.write("Ok"); } } } } }
從代碼2-4
到代碼2-5
邏輯跨越應該不大,這裏解釋下2-5
的一些細節.
爲了讓NioSocket
在後臺持續監聽咱們設定的端口,咱們將 socket = listenAccept(); handleSocket(socket);
這兩個步驟放入一個單獨的線程。每次有客戶端接入,便會獲得一個新的socket,將這個新的socket加入到socketList
中,而後在startChildHandler
啓動的線程中遍歷全部socket,並判斷其狀態改變(可讀)。
爲了把業務控制權交於調用方,在本例中也就是NioSocketTest.test
。我定義看一個變量statusChangedContext
,若是有socket可讀,則將其包裝成SocketContext
加入到statusChangedContext
中取。這樣,調用方直接拿到statusChangedContext
去遍歷,就能夠處理全部的socket的讀事件。
當調用方調用getStatusChangedContext()
方法時,若是此時statusChangedContext
爲空,則調用線程會被掛起,知道有可讀事件出現,調用線程被喚醒(statusChangedContext.notifyAll()
)
若是看官老爺讀了上面兩部分,那麼至少對nio的使用已經有所領悟了。上面咱們自制了一個nio 的socket,雖然只能對read事件做出反應,可是其餘的事件,好比,可寫、socket斷開等事件也是能夠按照這個思路去作的。那麼咱們就能夠無縫切入java nio了。
代碼2-7
public class NioServer { private Selector selector; private Selector chiledSelector; public void start(int port) throws IOException { // 經過open()方法找到Selector selector = Selector.open(); chiledSelector = Selector.open(); // 打開服務器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服務器配置爲非阻塞 ssc.configureBlocking(false); // 進行服務的綁定 ssc.bind(new InetSocketAddress("localhost", port)); // 註冊到selector,等待鏈接 SelectionKey selectionKey = ssc.register(selector, 0); selectionKey.interestOps(SelectionKey.OP_ACCEPT); while (!Thread.currentThread().isInterrupted()) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { SocketChannel clientChannel = ssc.accept(); handleSocket(clientChannel); } keyIterator.remove(); //該事件已經處理,能夠丟棄 } } } public Set<SelectionKey> getStatusChangedContext() throws IOException { chiledSelector.select(); return chiledSelector.selectedKeys(); } private void handleSocket(SocketChannel clientChannel) throws IOException { clientChannel.configureBlocking(false); clientChannel.register(chiledSelector, SelectionKey.OP_READ); System.out.println("a new client connected " + clientChannel.getRemoteAddress()); } public void write(SelectionKey key, String msg) throws IOException, ClosedChannelException { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("write:" + msg); ByteBuffer sendBuffer = ByteBuffer.allocate(1024); sendBuffer.clear(); sendBuffer.put(msg.getBytes()); sendBuffer.flip(); channel.write(sendBuffer); channel.register(chiledSelector, SelectionKey.OP_READ); } public String read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); readBuffer.clear(); int numRead; try { numRead = socketChannel.read(readBuffer); } catch (IOException e) { key.cancel(); socketChannel.close(); return null; } return new String(readBuffer.array(), 0, numRead); } }
代碼2-8
public class NioServerTest { @Test public void test() throws IOException { NioServer server = new NioServer(); server.start(8081); while (true) { Iterator<SelectionKey> socketContexts = server.getStatusChangedContext().iterator(); while (socketContexts.hasNext()) { SelectionKey key = socketContexts.next(); socketContexts.remove(); if ((key.readyOps() & SelectionKey.OP_READ) != 0) { System.out.println(server.read(key)); server.write(key, "Ok"); } } } } }
上面利用java nio寫的server跟咱們本身實現的nio寫的server效果是同樣的。咱們本身建立監聽客戶端線程,還有處理socket線程的工做,交給了java nio內部(固然不是簡單的起了兩個線程而已,我只是簡化了這個模型)。
在java nio中,socket不在是socket,而是SocketChannel,這裏你們暫時理解他倆等價吧。而後一個Selector就至關於一個線程,而後咱們將channel與selector經過register
方法關聯起來,並指定咱們感興趣的事。注意:這裏跟咱們本身實現的nio有區別,咱們沒有提供註冊興趣事件,而是默認對可讀事件感興趣。而後咱們調selector.select()方法,一樣,這個方法沒有事件發生會阻塞。而後獲得事件集合去遍歷處理。
這篇文章,咱們經過bio的socket本身經過線程和循環實現了服務端,並有了事件的概念。而後咱們又用Nio的方式去實現了相同的功能。經過兩種方式,咱們很天然的理解了Nio的使用及基本原理,下一章咱們將會更加細緻的學習Java NIO.