計算機網絡能夠說是每一個學計算機的都繞不過去的一道坎。計算機網絡到底有多麼重要,你走到大學圖書館的計算機部分,翻開那些什麼《從零開始:黑客XXX》,《黑客攻防從入門到放棄》等書籍,基本第一部分都是在談論網絡。你去一些X客論壇,上面的教程帖也基本都是從網絡部分開始的。html
相信每一位科班出身的,都學習過《計算機網絡》這樣書籍, 上過這樣的課程。固然教師資源如何,我這裏就不談論,那必定又會引出一頓苦水。可是學習完這樣的課程,咱們仍是對計算機網絡感到十分迷茫。這時候的咱們能夠背下網絡七層模型,網絡五層模型等,瞭解局域網,IP等基本地概念,可是計算機網絡對於咱們來講,仍是一個十分空蕩蕩的名詞。java
爲了更好地瞭解網絡(絕對不是由於那時候很迷黑客的緣故),我決定參加高級網絡工程師的考試。經過網絡工程師的我對計算機網絡有了更爲深刻的理解,開始將本身的計算機網絡體系從概念上勾連起來。也許我能夠看懂其中的一些路由規則,甚至看懂一些路由分發的論文。可是我依舊只是站在理論的角度,去理解計算機網絡。react
到了工做的時候,開始瞭解Socket編程,開始參與各類實際生產環境的編程。這個時候的我開始對網絡有了雖然簡單,可是十分真實的接觸。計算機網絡再也不只是停留在書本中的概念,而是我用以實現業務目標的切實手段。linux
隨着工做中開始負責物聯網項目的建設,我對網絡中的各類協議開始有了本身的認識,甚至能夠本身實現網絡協議規範的代碼落地。於此同時,因爲對網絡交互的性能要求,我再也不只是會使用BIO網絡編程,我開始使用NIO網絡編程。web
爲了本身的知識儲備,也是爲了知足本身的好奇心,我查找了許多的資料,也報了許多課程,去學習網絡編程。而我正好週六完成了軟考的又一次考試,因此接下來有必定空閒時間的我,接下來會繼續整理個人知識,並將它寫成博客。算法
這篇博客的主要內容就是按照演變的順序,寫下BIO->NIO->Reactor->Netty這樣的四個里程碑。這也是大佬們推薦的計算機網絡編程的學習路線。不過此次只是給個總體的認識以及demo,更爲深刻的原理探究,會放在後面。數據庫
幾乎每一個人都是BIO開始的計算機網絡編程,而其中大部分也永遠地留在了這個計算機網絡編程的模型。編程
這裏給出一些簡單的demo,供你們認識。bootstrap
package tech.jarry.learning.netease; import java.io.IOException; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.Charset; import java.util.Scanner; /** * @Description: * @Author: jarry */ public class BIOClient { private static final Charset charset = Charset.forName("utf-8"); public static void main(String[] args) throws IOException { Socket socket = new Socket(); // Socket socket = new Socket("localhost", 8080); // 我還覺得能夠的。可是貌似上面的8080表示目標端口,而下面的8080表示源端口(發送端口) // socket.bind(new InetSocketAddress("localhost", 8080)); // 後來纔去肯定,.bind是用於綁定源信息,而.connect是用於綁定目標信息 socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080)); OutputStream outputStream = socket.getOutputStream(); Scanner scanner = new Scanner(System.in); System.out.println("please input: "); String msg = scanner.nextLine(); outputStream.write(msg.getBytes(charset)); scanner.close(); outputStream.close(); socket.close(); } }
package tech.jarry.learning.netease; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * @Description: BIO模型中Server端的簡單實現 * @Author: jarry */ public class BIOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); InputStream requestInputStream = requestClient.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg; while ((msg = reader.readLine()) != null) { if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); requestInputStream.close(); requestClient.close(); } serverSocket.close(); } }
package tech.jarry.learning.netease; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Description: 直接對原有代碼BIOServer進行暴力修改,將其阻塞部分,經過多線程實現異步處理 * @Author: jarry */ public class BIOServer1 { private static ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); executorService.submit(new Runnable() { @Override public void run() { InputStream requestInputStream = null; try { requestInputStream = requestClient.getInputStream(); } catch (IOException e) { e.printStackTrace(); } BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg = null; while (true) { try { if (!((msg = reader.readLine()) != null)) { break; } } catch (IOException e) { e.printStackTrace(); } if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); try { requestInputStream.close(); requestClient.close(); } catch (IOException e) { e.printStackTrace(); } } }); } serverSocket.close(); } /** * 運行結果分析: * server has started * server get a connection: Socket[addr=/10.0.75.1,port=64042,localport=8080] * server get a connection: Socket[addr=/10.0.75.1,port=64052,localport=8080] * server get a connection: Socket[addr=/10.0.75.1,port=64061,localport=8080] * 123 * server has receive a message from: Socket[addr=/10.0.75.1,port=64042,localport=8080] * 456 * server has receive a message from: Socket[addr=/10.0.75.1,port=64052,localport=8080] * 789 * server has receive a message from: Socket[addr=/10.0.75.1,port=64061,localport=8080] */ }
package tech.jarry.learning.netease; import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * @Description: 直接對原有代碼BIOServer進行暴力修改,增長了其http格式的返回,確保瀏覽器能夠正常訪問 * @Author: jarry */ public class BIOServer2 { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("server has started"); while (!serverSocket.isClosed()) { Socket requestClient = serverSocket.accept(); System.out.println("server get a connection: " + requestClient.toString()); InputStream requestInputStream = requestClient.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream)); String msg; while ((msg = reader.readLine()) != null) { if (msg.length() == 0) { break; } System.out.println(msg); } System.out.println("server has receive a message from: " + requestClient.toString()); // 返回數據,並確保能夠被http協議理解 OutputStream outputStream = requestClient.getOutputStream(); outputStream.write("HTTP/1.1 200 OK\r\r".getBytes("utf-8")); outputStream.write("Content-Length: 11\r\n\r\n".getBytes("utf-8")); outputStream.write("Hello World".getBytes("utf-8")); outputStream.flush(); requestInputStream.close(); outputStream.close(); requestClient.close(); } serverSocket.close(); } /** * 運行結果分析: */ // server has started // server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080] // GET / HTTP/1.1 // Host: localhost:8080 // Connection: keep-alive // Cache-Control: max-age=0 // Upgrade-Insecure-Requests: 1 // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36 // Sec-Fetch-Mode: navigate // Sec-Fetch-User: ?1 // Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3 // Sec-Fetch-Site: none // Accept-Encoding: gzip, deflate, br // Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7 // Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4 // server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080] // server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080] // GET /favicon.ico HTTP/1.1 // Host: localhost:8080 // Connection: keep-alive // Pragma: no-cache // Cache-Control: no-cache // Sec-Fetch-Mode: no-cors // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36 // Accept: image/webp,image/apng,image/*,*/*;q=0.8 // Sec-Fetch-Site: same-origin // Referer: http://localhost:8080/ // Accept-Encoding: gzip, deflate, br // Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7 // Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4 // server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080] }
上面的代碼是一套的,能夠進行Server與Client的通訊,功能較爲簡單。設計模式
因此這裏再給一個,能夠進行通訊的版本。簡單的業務場景能夠直接修改,應用。
package self; import java.io.*; import java.net.*; /** * @Description: * @Author: jarry */ public class Client { public static void main(String[] args) throws IOException { Socket socket = new Socket(); socket.setSoTimeout(2000); socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(),2000),2000); System.out.println("client startup"); dealMsg(socket); socket.close(); } private static void dealMsg(Socket clientSocket) throws IOException { // 1.獲取鍵盤輸入流 InputStream systemInputStream = System.in; // 2.將systemInputStream轉化爲具備緩存功能的字符輸入流BufferedReader BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream)); // 3.獲取Socket輸入流 InputStream socketInputStream = clientSocket.getInputStream(); // 4.將socketInputStream轉換爲具備緩存能力的字符輸入流 BufferedReader socketBufferedReader = new BufferedReader(new InputStreamReader(socketInputStream)); // 5.獲取Socket輸出流 OutputStream socketOutputStream = clientSocket.getOutputStream(); // 6.將socketOutputStream轉換爲打印流(用於發送String) PrintStream socketPrintStream = new PrintStream(socketOutputStream); // 用於確立鏈接狀態的標識符 boolean flag = true; // 7.利用循環,client與server進行交互 do { // 從鍵盤等系統輸入流獲取輸入字符串 String str = systemBufferedReader.readLine(); // 將str寫入到socketClient的打印流(本質是輸出流)。socketClient的輸出流是鏈接Server的,用於向Server發送數據的 socketPrintStream.println(str); // 從Server得到回寫(Server的回寫,必定會發送到socketClient的輸入流中(輸入的「入」是指入socketClient) String echo = socketBufferedReader.readLine(); // 創建一個用於關閉的方式 if ("bye".equalsIgnoreCase(echo)){ flag = false; }else{ // 在控制檯打印server的echo System.out.println("server echo:"+echo); } }while (flag); // 8.退出交互,須要關閉與Server鏈接的兩個資源(輸入與輸出) 考慮一下lombok的@Cleanup socketBufferedReader.close(); socketPrintStream.close(); } }
package self; import java.io.*; import java.net.ServerSocket; import java.net.Socket; /** * @Description: * @Author: jarry */ public class Server { public static void main(String[] args) throws IOException { // 創建Server的Socket,服務端不須要設置IP,以及Port // IP採用本地IP ServerSocket serverSocket = new ServerSocket(2000); System.out.println("server startup"); // 經過循環,對client的請求進行監聽 while (true){ // 得到client的請求 Socket clientRequest = serverSocket.accept(); // 異步處理client的請求 ClientHandler clientHandler = new ClientHandler(clientRequest); clientHandler.start(); } } private static class ClientHandler extends Thread { Socket socketClient = null; boolean flag = true; ClientHandler(Socket socketClient){ this.socketClient = socketClient; } @Override public void run() { super.run(); // 構建系統輸入流 InputStream systemInputStream = System.in; // 將系統輸入流轉換爲字符輸入流 BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream)); try { // 構建socketClient的輸入流(即客戶端中,寫入client輸出流的數據) InputStream clientInputStream = socketClient.getInputStream(); // 將client的輸入流轉爲具備存儲能力的BufferedReader BufferedReader clientBufferedReader = new BufferedReader(new InputStreamReader(clientInputStream)); // 構建socketClient的輸出流(用於發送數據,即客戶端中,從client輸入流讀取的數據) OutputStream clientOutputStream = socketClient.getOutputStream(); // 將client的輸出流轉換爲打印流,便於輸出數據 PrintStream clientPrintStream = new PrintStream(clientOutputStream); // 經過循環,與客戶端進行交互 do { // 讀取從客戶端發送來的數據,即讀取socketClient的輸入流轉化的BufferedReader String str = clientBufferedReader.readLine(); if ("bye".equalsIgnoreCase(str)){ flag = false; clientPrintStream.println("connect interrupt"); }else{ System.out.println(str); // 發送回寫數據,即將回寫數據寫入socketClient的輸出流(客戶端的輸入流會獲取相關數據) clientPrintStream.println(str.length()); } // 從系統輸入中獲取想要發送的數據 String servStr = systemBufferedReader.readLine(); // 發送到客戶端 clientPrintStream.println(servStr); }while (flag); // 一樣的,關閉鏈接資源 clientBufferedReader.close(); clientPrintStream.close(); } catch (IOException e) { e.printStackTrace(); }finally { // 不管發生什麼,最後都要關閉socket鏈接 try { socketClient.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
爲了使得代碼結構更有優雅,而且爲了更好地處理字符編碼問題(demo中保留了各類數據類型的處理方式)。咱們將上述版本更新一下。
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; public class Client { // 鏈接到遠程服務器的遠程端口 private static final int PORT = 20000; // 本地端口 private static final int LOCAL_PORT = 20001; public static void main(String[] args) throws IOException { // 建立Socket的操做,能夠選擇不一樣的建立方式 Socket socket = createSocket(); // Socket初始化操做 initSocket(socket); // 連接到本地20000端口,超時時間3秒,超過則拋出超時異常 socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 3000); System.out.println("已發起服務器鏈接,並進入後續流程~"); System.out.println("客戶端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort()); System.out.println("服務器信息:" + socket.getInetAddress() + " P:" + socket.getPort()); try { // 發送接收數據 todo(socket); } catch (Exception e) { System.out.println("異常關閉"); } // 釋放資源 socket.close(); System.out.println("客戶端已退出~"); } /** * 建立Socket * @return * @throws IOException */ private static Socket createSocket() throws IOException { /* // 無代理模式,等效於空構造函數 Socket socket = new Socket(Proxy.NO_PROXY); // 新建一份具備HTTP代理的套接字,傳輸數據將經過www.baidu.com:8080端口轉發 Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(Inet4Address.getByName("www.baidu.com"), 8800)); socket = new Socket(proxy); // 新建一個套接字,而且直接連接到本地20000的服務器上 socket = new Socket("localhost", PORT); // 新建一個套接字,而且直接連接到本地20000的服務器上 socket = new Socket(Inet4Address.getLocalHost(), PORT); // 新建一個套接字,而且直接連接到本地20000的服務器上,而且綁定到本地20001端口上 socket = new Socket("localhost", PORT, Inet4Address.getLocalHost(), LOCAL_PORT); socket = new Socket(Inet4Address.getLocalHost(), PORT, Inet4Address.getLocalHost(), LOCAL_PORT); */ // 推薦無參構造,由於其它(上面)的構造方法都是包含構造,設參,以及connect操做。而socket一旦connect後,設置參數的操做就無效了。不便於靈活使用 Socket socket = new Socket(); // 綁定到本地20001端口 socket.bind(new InetSocketAddress(Inet4Address.getLocalHost(), LOCAL_PORT)); return socket; } private static void initSocket(Socket socket) throws SocketException { // 設置讀取超時時間爲2秒 socket.setSoTimeout(2000); // 是否複用未徹底關閉的Socket地址,對於指定bind操做後的套接字有效(正常Socket關閉後,對應端口在兩分鐘內將再也不復用。而這個設置將能夠直接使用對應空置端口) socket.setReuseAddress(true); // 是否開啓Nagle算法(開啓後,兩點:第一,會對收到的每次數據進行ACK,另外一端只有在接收到對應ACK,纔會繼續發送數據。第二,若是有數據堆積,會一次將全部堆積數據發出去(畢竟這種模式有數據堆積是正常的) // 開啓後,更爲嚴謹,嚴格,安全(默認開啓) socket.setTcpNoDelay(true); // 是否須要在長時無數據響應時發送確認數據(相似心跳包),時間大約爲2小時 socket.setKeepAlive(true); // 對於close關閉操做行爲進行怎樣的處理;默認爲false,0 // false、0:默認狀況,關閉時當即返回,底層系統接管輸出流,將緩衝區內的數據發送完成 // true、0:關閉時當即返回,緩衝區數據拋棄,直接發送RST結束命令到對方,並沒有需通過2MSL等待 // true、200:關閉時最長阻塞200毫秒,隨後按第二狀況處理 socket.setSoLinger(true, 20); // 是否讓緊急數據內斂,默認false;緊急數據經過 socket.sendUrgentData(1);發送 // 只有設置爲true,纔會暴露到上層(邏輯層) socket.setOOBInline(true); // 設置接收發送緩衝器大小 socket.setReceiveBufferSize(64 * 1024 * 1024); socket.setSendBufferSize(64 * 1024 * 1024); // 設置性能參數:短連接,延遲,帶寬的相對重要性(權重) socket.setPerformancePreferences(1, 1, 0); } private static void todo(Socket client) throws IOException { // 獲得Socket輸出流 OutputStream outputStream = client.getOutputStream(); // 獲得Socket輸入流 InputStream inputStream = client.getInputStream(); byte[] buffer = new byte[256]; ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); // 等同於上兩行代碼(ByteBuffer是NIO提供的一個工具,allocate就是分配內存地址,ByteBuffer處理的是byte) // ByteBuffer byteBuffer = ByteBuffer.allocate(256); // 嘗試各類數據傳輸,發出 // byte byteBuffer.put((byte) 126); // char char c = 'a'; byteBuffer.putChar(c); // int int i = 2323123; byteBuffer.putInt(i); // bool boolean b = true; byteBuffer.put(b ? (byte) 1 : (byte) 0); // Long long l = 298789739; byteBuffer.putLong(l); // float float f = 12.345f; byteBuffer.putFloat(f); // double double d = 13.31241248782973; byteBuffer.putDouble(d); // String String str = "Hello你好!"; byteBuffer.put(str.getBytes()); // 發送到服務器(長度等於index+1) outputStream.write(buffer, 0, byteBuffer.position() + 1); // 接收服務器返回 int read = inputStream.read(buffer); System.out.println("收到數量:" + read); // 資源釋放 outputStream.close(); inputStream.close(); } /** * 擴展-MSL * MSL是Maximum Segment Lifetime的英文縮寫,可譯爲「最長報文段壽命」, * 它是任何報文在網絡上存在的最長的最長時間,超過這個時間報文將被丟棄。 * 咱們都知道IP頭部中有個TTL字段,TTL是time to live的縮寫,可譯爲「生存時間」, * 這個生存時間是由源主機設置設置初始值但不是但不是存在的具體時間,而是一個IP數據報能夠通過的最大路由數,每通過一個路由器,它的值就減1, * 當此值爲0則數據報被丟棄,同時發送ICMP報文通知源主機。 * RFC793中規定MSL爲2分鐘,但這徹底是從工程上來考慮,對於如今的網絡,MSL=2分鐘可能太長了一些。 * 所以TCP容許不一樣的實現可根據具體狀況使用更小的MSL值。TTL與MSL是有關係的但不是簡單的相等關係,MSL要大於TTL。 */ }
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; public class Server { private static final int PORT = 20000; public static void main(String[] args) throws IOException { ServerSocket server = createServerSocket(); initServerSocket(server); // 綁定到本地端口上 backlog標識等待隊列中等待數量(超出,則在對應的客戶端觸發異常) server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50); System.out.println("服務器準備就緒~"); System.out.println("服務器信息:" + server.getInetAddress() + " P:" + server.getLocalPort()); // 等待客戶端鏈接 for (; ; ) { // 獲得客戶端 Socket client = server.accept(); // 客戶端構建異步線程 ClientHandler clientHandler = new ClientHandler(client); // 啓動線程 clientHandler.start(); } } private static ServerSocket createServerSocket() throws IOException { // 建立基礎的ServerSocket ServerSocket serverSocket = new ServerSocket(); // 綁定到本地端口20000上,而且設置當前可容許等待連接的隊列爲50個 //server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50); //serverSocket = new ServerSocket(PORT); // 等效於上面的方案,隊列設置爲50個 //serverSocket = new ServerSocket(PORT, 50); // 與上面等同 // serverSocket = new ServerSocket(PORT, 50, Inet4Address.getLocalHost()); return serverSocket; } private static void initServerSocket(ServerSocket serverSocket) throws IOException { // 是否複用未徹底關閉的地址端口 serverSocket.setReuseAddress(true); // 等效Socket#setReceiveBufferSize(針對的是accept()接收到的clientSocket。畢竟在accept時就已經接收到了必定的數據了) serverSocket.setReceiveBufferSize(64 * 1024 * 1024); // 設置serverSocket#accept超時時間 // serverSocket.setSoTimeout(2000); // 設置性能參數:短連接,延遲,帶寬的相對重要性(針對的是accept()接收到的clientSocket) serverSocket.setPerformancePreferences(1, 1, 1); } /** * 客戶端消息處理 */ private static class ClientHandler extends Thread { private Socket socket; ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { super.run(); System.out.println("新客戶端鏈接:" + socket.getInetAddress() + " P:" + socket.getPort()); try { // 獲得套接字流 OutputStream outputStream = socket.getOutputStream(); InputStream inputStream = socket.getInputStream(); byte[] buffer = new byte[256]; int readCount = inputStream.read(buffer); ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, readCount); // 按客戶端發送的順序讀取 // byte byte be = byteBuffer.get(); // char char c = byteBuffer.getChar(); // int int i = byteBuffer.getInt(); // bool boolean b = byteBuffer.get() == 1; // Long long l = byteBuffer.getLong(); // float float f = byteBuffer.getFloat(); // double double d = byteBuffer.getDouble(); // String int pos = byteBuffer.position(); String str = new String(buffer, pos, readCount - pos - 1); System.out.println("收到數量:" + readCount + " 數據:" + be + "\n" + c + "\n" + i + "\n" + b + "\n" + l + "\n" + f + "\n" + d + "\n" + str + "\n"); outputStream.write(buffer, 0, readCount); outputStream.close(); inputStream.close(); } catch (Exception e) { System.out.println("鏈接異常斷開"); } finally { // 鏈接關閉 try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("客戶端已退出:" + socket.getInetAddress() + " P:" + socket.getPort()); } } }
這裏的tool,代表了兩點:如何實現int與byte之間的轉換,能夠自定義實現數據的轉換
package example; /** * 過渡一下,簡述int與byte之間的轉換。 * 進而明確各類數據類型與byte之間的轉化。 * 最終引伸出NIO包下的ByteBuffer工具,實現不一樣數據類型與byte類型的相互轉換 */ public class Tools { public static int byteArrayToInt(byte[] b) { return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16 | (b[0] & 0xFF) << 24; } public static byte[] intToByteArray(int a) { return new byte[]{ (byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF), (byte) ((a >> 8) & 0xFF), (byte) (a & 0xFF) }; } }
因爲實際工做中UDP使用得比較少,因此這裏只給出了BIO中UDP的使用。不過也基本知足了UDP的使用入門了,能夠實現局域網搜索(起碼對我目前的工做來講是夠用了)。至於UDP用於音視頻數據傳輸,得讀者本身尋找,或者等我瞭解以後,更新。
package self; import java.io.IOException; import java.net.*; /** * @Description: * @Author: jarry */ public class UDPSearcher { public static void main(String[] args) throws IOException { System.out.println("UDPSearcher started."); // 構建UDP的Socket(因爲是searcher,即數據的率先發送者,因此能夠不用指定port,用於監聽) DatagramSocket datagramSocket = new DatagramSocket(); // 構建請求消息的實體(包含目標ip及port) String requestMsg = "just a joke."; byte[] requestBytes = requestMsg.getBytes(); DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length); requestPacket.setAddress(Inet4Address.getLocalHost()); requestPacket.setPort(20000); // 發送請求數據 System.out.println("UDPSearcher has send msg."); datagramSocket.send(requestPacket); // 接收回送數據 byte[] buf = new byte[512]; DatagramPacket receivePacket = new DatagramPacket(buf,buf.length); datagramSocket.receive(receivePacket); String sourceIp = receivePacket.getAddress().getHostAddress(); int sourcePort = receivePacket.getPort(); int dataLength = receivePacket.getLength(); String receiveData = new String(receivePacket.getData(),0,receivePacket.getData().length); // 顯示接收到的數據 System.out.println("UDPSearcher has received data with source:"+sourceIp+":"+sourcePort+" with length "+dataLength+". data:"+receiveData); // 因爲是demo,因此不用循環,就此結束 System.out.println("UDPSearcher finished."); datagramSocket.close(); } }
package self; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; /** * @Description: * @Author: jarry */ public class UDPProvider { public static void main(String[] args) throws IOException { System.out.println("UDPProvider started."); // 新建DatagramSocekt,並設定在本機20000端口監聽,並接收消息 DatagramSocket datagramSocket = new DatagramSocket(20000); // 新建DatagramPacket實體 byte[] buf = new byte[512]; DatagramPacket datagramPacket = new DatagramPacket(buf,buf.length); // 接收數據 datagramSocket.receive(datagramPacket); // 處理接受到的數據 String sourceIp = datagramPacket.getAddress().getHostAddress(); int sourcePort = datagramPacket.getPort(); String data = new String(datagramPacket.getData(),0,datagramPacket.getLength()); // 顯示接收到的數據 System.out.println("UDPProvider has received data with source:"+sourceIp+":"+sourcePort+" with length "+data.length()+". data:"+data); // 準備發送回送數據 String responseData = "UDPProvider has received data with length:"+data.length(); byte[] responseBytes = responseData.getBytes(); // 構建回送數據實體(別玩了,設置目標ip與port) DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length ,datagramPacket.getAddress(),datagramPacket.getPort()); // 發送回送數據 System.out.println("UDPProvider has sended data."); datagramSocket.send(responsePacket); // 因爲是demo,因此不用循環,就此結束 System.out.println("UDPProvider finished."); datagramSocket.close(); } }
爲了網絡監聽的clear,以及權限問題,須要對上述代碼進行一次升級。
package self.v2; /** * @Description: 自定義通訊數據格式(這多是最簡單的應用層協議了) * @Author: jarry */ public class MessageCreator { private static final String SN_HEADER = "收到暗號,我是(SN):"; private static final String PORT_HEADER = "發送暗號,請回電端口(PORT):"; public static String buildWithPort(int port){ return PORT_HEADER + port; } public static int parsePort(String data){ if (data.startsWith(PORT_HEADER)){ return Integer.parseInt(data.substring(PORT_HEADER.length())); } return -1; } public static String buildWithSN(String sn){ return SN_HEADER + sn; } public static String parseSN(String data){ if (data.startsWith(SN_HEADER)){ return data.substring(SN_HEADER.length()); } return null; } }
package self.v2; import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Description: * @Author: jarry */ public class UDPSearcher { // 監聽端口號 private static final int LISTEN_PORT = 30000; public static void main(String[] args) throws IOException, InterruptedException { System.out.println("UDPSearcher Started"); Listener listener = listen(); sendBroadcast(); // 讀取任意鍵盤信息後退出 System.in.read(); List<Device> devices = listener.getDevicesAndClose(); for (Device device : devices) { System.out.println("Device:"+device.toString()); } // 完成 System.out.println("UDPSearcher Finished"); } private static Listener listen() throws InterruptedException { System.out.println("UDPSearcher start listen."); CountDownLatch countDownLatch = new CountDownLatch(1); Listener listener = new Listener(LISTEN_PORT, countDownLatch); listener.start(); countDownLatch.await(); return listener; } /** * 用於發送廣播消息 * @throws IOException */ private static void sendBroadcast() throws IOException { System.out.println("UDPSearcher sendBroadcast started."); // 做爲一個搜索者(發送請求),無需指定一個端口,由系統自動分配 DatagramSocket datagramSocket = new DatagramSocket(); // 構建一份請求數據 String requestData = MessageCreator.buildWithPort(LISTEN_PORT); byte[] requestDataBytes = requestData.getBytes(); // 構建發送數據實體 DatagramPacket requestPacket = new DatagramPacket(requestDataBytes, requestDataBytes.length); // 設置目標地址(採用廣播地址) requestPacket.setAddress(Inet4Address.getByName("255.255.255.255")); requestPacket.setPort(20000); // 發送構建好的消息 datagramSocket.send(requestPacket); System.out.println("start send data."); // 發送結束 System.out.println("UDPSearcher sendBroadcast finished."); datagramSocket.close(); } private static class Device { final int port; final String ip; final String sn; public Device(int port, String ip, String sn) { this.port = port; this.ip = ip; this.sn = sn; } @Override public String toString() { return "Device{" + "port=" + port + ", ip='" + ip + '\'' + ", sn='" + sn + '\'' + '}'; } } private static class Listener extends Thread{ private final int listenPort; private final CountDownLatch countDownLatch; private final List<Device> devices = new ArrayList<Device>(); private boolean done = false; private DatagramSocket ds = null; public Listener(int listenPort, CountDownLatch countDownLatch){ super(); this.listenPort = listenPort; this.countDownLatch = countDownLatch; } @Override public void run() { super.run(); // 通知已啓動 countDownLatch.countDown(); // 開始實際數據監聽部分 try { // 監聽回送端口 ds = new DatagramSocket(listenPort); while (!done){ // 接收消息的實體 final byte[] buf = new byte[512]; DatagramPacket receivePack = new DatagramPacket(buf, buf.length); // 開始接收數據 ds.receive(receivePack); // 打印接收到的信息 String ip = receivePack.getAddress().getHostAddress(); int port = receivePack.getPort(); int dataLength = receivePack.getLength(); String data = new String(receivePack.getData(),0,dataLength); System.out.println("UDPSearcher receive form ip:" + ip + "\tport:" + port + "\tdata:" + data); String sn = MessageCreator.parseSN(data); if (sn != null){ Device device = new Device(port, ip ,sn); devices.add(device); } } }catch (Exception e){ }finally { close(); } System.out.println("UDPSearcher listner finished"); } private void close(){ if (ds != null){ ds.close(); ds = null; } } List<Device> getDevicesAndClose(){ done = true; close(); return devices; } } }
package self.v2; /** * @Description: * @Author: jarry */ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.util.UUID; /** * UDP 提供者, 用於提供UDP服務 */ public class UDPProvider { public static void main(String[] args) throws IOException { String sn = UUID.randomUUID().toString(); Provider provider = new Provider(sn); provider.start(); // 讀取任意字符,退出 System.in.read(); provider.exit(); } private static class Provider extends Thread { private final String sn; private boolean done = false; private DatagramSocket datagramSocket = null; public Provider(String sn){ super(); this.sn = sn; } @Override public void run() { super.run(); System.out.println("UDPProvider started."); try { // 做爲一個接收者(接受請求),須要指定一個端口用來接收消息 datagramSocket = new DatagramSocket(20000); // 經過一個循環,不斷監聽,接收數據 while (true) { // 接收消息的實體 final byte[] buf = new byte[512]; DatagramPacket receivePack = new DatagramPacket(buf, buf.length); // 開始接收數據 datagramSocket.receive(receivePack); // 打印接收到的信息 String ip = receivePack.getAddress().getHostAddress(); int port = receivePack.getPort(); int dataLength = receivePack.getLength(); String data = new String(receivePack.getData(), 0, dataLength); System.out.println("UDPProvider receive form ip:" + ip + "\tport:" + port + "\tdata:" + data); // 得到目標端口 int responsePort = MessageCreator.parsePort(data); if (responsePort != -1){ // 構建一份回送數據 String responseData = MessageCreator.buildWithSN(sn); byte[] reponseDataBytes = responseData.getBytes(); // 直接根據發送者,構建回送數據實體 DatagramPacket responsePacket = new DatagramPacket(reponseDataBytes, reponseDataBytes.length, receivePack.getAddress(), // 採用指定的端口,而不是解析得到的來源端口(來源端口不必定就是監聽端口,這是有些時候爲了簡化而已) responsePort); // 發送構建好的回送消息 datagramSocket.send(responsePacket); System.out.println("start send data."); } } }catch (Exception ignore){ }finally { close(); } // 發送結束 System.out.println("UDPProvider finished."); } /** * 對外提供結束方法 */ void exit(){ done = true; close(); } /** * 本地關閉DatagramSocket的方法 */ private void close(){ if (datagramSocket != null){ datagramSocket.close(); datagramSocket = null; } } } }
在瞭解BIO以後,咱們能夠很明顯地發現其中的問題,那就是BIO模型中,每個Client的請求發送到Server端後,Server端經過accept接收請求後,必須建立一個clientSocket來進行通訊。而且這個通訊是阻塞的,一方面,新的clientSocket沒法進入(單線程嘛),另外一方面,clientSocket是經過流的方式進行通訊,而流的通訊方式是阻塞的(即沒有得到數據是,必須在那裏等待)。這兩個問題,前者能夠如demo中那樣,建立一個線程池來解決,然後者是無法解決的。而這樣一個多線程+BIO也是不少開發人員的選擇,由於這樣的實現也十分簡單,而且能夠知足必定的需求了。
可是,回過頭想想,上述的解決方案,存在一個問題。那就是系統併發量受限於線程池的線程數。若是請求只有幾百的併發,那麼上述的解決方案沒有任何問題。可是任何一個稍有規模的業務場景都不會只有幾百的併發。那麼若是不對技術進行升級,只有兩個辦法。一個升級硬件配置,尤爲是內存(由於線程是很是消耗內存的),另外一方面將鏈接按照必定的邏輯維度進行拆分(好比按照業務場景)。
我曾經和個人Boss談話時,提到這麼一句話:技術的很是熟練,不如技術升級帶來的價值高(由於咱們公司有一個去年畢業的開發,很是抗拒學習新技術。雖然基礎的CRUD挺熟練的,可是效率真的過低了。一個簡單的條件查詢就說要十五個工做日。若是他會使用函數式編程,配合MP,也許就一個小時吧。有空能夠出個效率編程的專題,感受頗有價值)。
因此,在BIO越加疲軟的時候(固然有的業務場景BIO性能貌似並不比NIO低多少,可是投入差異有些大),終於NIO面世。
NIO藉助事件監聽機制,提供非阻塞式的高伸縮性網絡。固然,有興趣的能夠深挖,相關概念仍是不少的,好比它與linux的IO模型的關係,這些均可以很好地幫助你們擴展視野(畢竟視野決定了高度)。
NIO有三大支柱,分別是:ByteBuffer,Channel,Selector(詳見:Java NIO:Buffer、Channel 和 Selector)。
ByteBuffer:就是一個數據實體,其中提供了許多數據轉換的方法。如在BIO的demo中就用到了
Channel:參考網絡通訊的channel,全部的 NIO 操做始於通道,通道是數據來源或數據寫入的目的地。這下降了BIO入門時對流認識的痛苦(一會輸入流,一會輸出流,流還須要進行轉換),而且也有利於提升開發效率。
Selector:多路複用器(雖然有人稱之爲選擇器,可是更爲精準的說法時多路複用器),實現了一個線程管理多個Channel,也是NIO事件驅動機制的基礎。
固然上述的這些,也不是必須的,我能夠只有Channel,ByteBuffer的數據轉換能夠本身實現,而Selector,能夠經過多線程的方式去達到相似的功能效果(性能固然時無法比的了)。可是隻有三者齊聚,才能最大發揮NIO的性能。
這裏給出一些簡單的demo,供你們認識。
package tech.jarry.learning.netease; import java.io.IOException; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner; /** * @Description: NIO模型下的TCP客戶端實現 * @Author: jarry */ public class NIOClient { public static void main(String[] args) throws IOException { // 得到一個SocektChannel SocketChannel socketChannel = SocketChannel.open(); // 設置SocketChannel爲非阻塞模式 socketChannel.configureBlocking(false); // 設置SocketChannel的鏈接配置 socketChannel.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080)); // 經過循環,不斷鏈接。跳出循環,表示鏈接創建成功 while (!socketChannel.finishConnect()){ // 若是沒有成功創建鏈接,就一直阻塞當前線程(.yield()會令當前線程「謙讓」出CPU資源) Thread.yield(); } // 發送外部輸入的數據 Scanner scanner = new Scanner(System.in); System.out.println("please input:"); String msg = scanner.nextLine(); // ByteBuffer.wrap()會直接調用HeapByteBuffer。故一方面其會本身完成內存分配。另外一方面,其分配的內存是非直接內存(非heap堆) ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes()); // ByteBuffer.hasRemaining()用於判斷對應ByteBuffer是否還有剩餘數據(實現:return position < limit;) while (byteBuffer.hasRemaining()){ socketChannel.write(byteBuffer); } // 讀取響應 System.out.println("receive echoResponse from server"); // 設置緩衝區大小爲1024 ByteBuffer requestBuffer = ByteBuffer.allocate(1024); // 判斷條件:是否開啓,是否讀取到數據 //TODO 我認爲這裏的實現十分粗糙,是不能夠置於生產環境的,具體還須要後面再看看(即便是過渡demo,也能夠思考一下嘛) while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){ // 長鏈接狀況下,須要手動判斷數據有沒有讀取結束 (此處作一個簡單的判斷: 超過0字節就認爲請求結束了) if (requestBuffer.position() > 0) { break; } } requestBuffer.flip(); // byte[] content = new byte[requestBuffer.limit()]; // // .get()方法只會返回byte類型(猜想是當前標記位的數據) // requestBuffer.get(content); // System.out.println(new String(content)); // ByteBuffer提供了大量的基本類型轉換的方法,能夠直接拿來使用 System.out.println(new String(requestBuffer.array())); scanner.close(); socketChannel.close(); } }
package tech.jarry.learning.netease; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** * @Description: 直接根據BIOServer進行轉變的。因此總體的邏輯與BIOServer相似 * @Author: jarry */ public class NIOServer { public static void main(String[] args) throws IOException { // 建立網絡服務端 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //TODO .socket().bind()與.bind()的區別不清楚 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); System.out.println("server has started"); // 經過循環,不斷獲取監聽不一樣客戶端發來的鏈接請求 while (true){ // 因爲NIO是非阻塞,故返回值是徹底多是null的 SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null){ System.out.println("server has connect a new client: "+socketChannel.getRemoteAddress().toString()); socketChannel.configureBlocking(false); ByteBuffer requestBuffer = ByteBuffer.allocate(1024); while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){ if (requestBuffer.position() > 0){ break; } } if (requestBuffer.position() == 0){ // 若是沒有數據,就再也不進行後續處理,而是進入下一個循環 continue; } requestBuffer.flip(); System.out.println("server receive a message: "+new String(requestBuffer.array())); System.out.println("server receive a message from: "+socketChannel.getRemoteAddress()); // 響應結果 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 12\r\n\r\n" + "Hello World!"; ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes()); while (responseBuffer.hasRemaining()){ socketChannel.write(responseBuffer); } } } } }
package tech.jarry.learning.netease; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * @Description: 與BIOServer一樣的,NIOServer也沒法同時鏈接多個客戶端 * V1版本這裏,依舊如BIOServer1那樣,經過輪詢實現多個客戶端處理(不過BIO因爲是阻塞的,因此採用多線程。而NIO是非阻塞的,因此採用一個全局列表來進行處理) * @Author: jarry */ public class NIOServerV1 { private static List<SocketChannel> socketChannelList = new ArrayList<>(); public static void main(String[] args) throws IOException { // 建立網絡服務端 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //TODO .socket().bind()與.bind()的區別不清楚 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); System.out.println("server has started"); // 經過循環,不斷獲取監聽不一樣客戶端發來的鏈接請求 while (true) { // 因爲NIO是非阻塞,故返回值是徹底多是null的 SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { // 若是有新的鏈接接入,就打印日誌,並將對應的SocektChannel置入全局隊列中 System.out.println("server has connect a new client: " + socketChannel.getRemoteAddress().toString()); socketChannel.configureBlocking(false); socketChannelList.add(socketChannel); } else { // 若是沒有新的鏈接接入,就對現有鏈接的數據進行處理,若是處理完了就從列表中刪除對應SocketChannel Iterator<SocketChannel> socketChannelIterator = socketChannelList.iterator(); while (socketChannelIterator.hasNext()){ SocketChannel clientSocketChannel = socketChannelIterator.next(); ByteBuffer requestBuffer = ByteBuffer.allocate(1024); // 新增:若是當前channel的數據長度爲0,表示這個通道沒有數據須要處理,那就過會兒處理 if (clientSocketChannel.read(requestBuffer) == 0){ // 進入下一個循環,即處理下一個channel continue; } while (clientSocketChannel.isOpen() && clientSocketChannel.read(requestBuffer) != -1) { if (requestBuffer.position() > 0) { break; } } if (requestBuffer.position() == 0) { // 若是沒有數據,就再也不進行後續處理,而是進入下一個循環 continue; } requestBuffer.flip(); System.out.println("server receive a message: " + new String(requestBuffer.array())); System.out.println("server receive a message from: " + clientSocketChannel.getRemoteAddress()); // 響應結果 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 12\r\n\r\n" + "Hello World!"; ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes()); while (responseBuffer.hasRemaining()) { clientSocketChannel.write(responseBuffer); } // 新增:若是運行到這裏,說明返回的數據已經返回了 // 我認爲,若是是長鏈接的話,這裏的處理應當更加嚴密(固然這只是一個過渡demo版本) socketChannelIterator.remove(); // 我認爲,應當進行close等資源釋放操做。而且應該先remove(),再close clientSocketChannel.close(); } } } } }
package tech.jarry.learning.netease.again; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @Description: 這個版本,充分利用了NIO的第三個支柱-Selector,完成事件驅動的轉型 * 注意,上個版本使用循環,就相似自旋(自旋相對比較底層,小),雖然解決了BIO的每一個client佔據一個線程的資源消耗(主要是內存),可是加大了CPU的消耗(CPU要不斷進行循環,判斷,即便是無效的操做) * NIO經過Selector,創建事件驅動模型,來解決這一問題。即只有當特定的事件(如鏈接創建完成)發生,纔會進行對應的事件處理(從而避免了CPU的無效消耗,提升效率) * 私語:不少Javaer一直停留在初級層次(網絡編程只能百度,使用BIO),就是沒法突破事件驅動模型這種抽象層次更高的高層思想 * @Description: 爲了更好地學習與理解Netty,基礎的NIO再過一遍 * @Author: jarry */ public class NIOServerV2 { public static void main(String[] args) throws IOException { // 1.建立並配置ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 2.建立Selector,並完成SelectionKey的註冊,並完成初始化監聽 // Selector在非阻塞的基礎上,實現了一個線程管理多個Channel(也就常說的「多路複用」) // 那可不能夠理解爲一個selector管理多個channel,監聽多個channel(後續代碼中,除了server外,還有client們都註冊到了這個selector中) Selector selector = Selector.open(); SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel); selectionKey.interestOps(SelectionKey.OP_ACCEPT); System.out.println("server start success "); // 3.開始循環處理各個事件 while (true) { // 1.經過.select()阻塞當前線程,直到有註冊的selectionKey觸發(觸發是,會將對應的selectionKey複製到selected set中) selector.select(); // 2.獲取觸發的selectionKey集合 Set<SelectionKey> selectionKeySet = selector.selectedKeys(); // 3.遍歷處理觸發的selectionKey集合 Iterator<SelectionKey> iterator = selectionKeySet.iterator(); while (iterator.hasNext()){ // 1.得到觸發的selectionKey SelectionKey selectedKey = iterator.next(); // 2.從集合中移除正在處理的selectionKey(單線程也能夠在處理完後移除,但多線程中就可能出現同一selectionKey被多個線程處理) iterator.remove(); // 3.根據iteration觸發的事件類型,進行對應處理(這裏demo爲了簡單一些,就只處理accept與read事件類型) if (selectedKey.isAcceptable()){ // 若是selectedKey觸發的是accept事件類型,即serverSocketChannel經過accept得到了一個客戶端鏈接 // 1.得到服務端ServerSocketChannel(即以前監聽accept事件時,放入attachment的可選對象,便於後續處理) ServerSocketChannel server = (ServerSocketChannel)selectedKey.attachment(); // 2.得到客戶端SocketChannel(利用剛剛得到的server,與觸發的.accept()方法),便於後續操做 SocketChannel client = server.accept(); // 3.配置客戶端SocketChannel(畢竟SocketChannel也是默認配置阻塞的) client.configureBlocking(false); // 4.註冊新的事件(既然已經鏈接成功,那麼開始註冊如read等新事件,便於後續監聽) // 也能夠採起相似初始化階段那樣兩句代碼完成,可是這裏不須要(也能夠說時表現一個新的處理方法) client.register(selector, SelectionKey.OP_READ, client); // 5.日誌打印 System.out.println("server has connect a new client: "+ client.getRemoteAddress()); } if (selectedKey.isReadable()){ // 若是selectedKey觸發的是可讀事件類型,即當前selectionKey對應的channel能夠進行讀操做(但不表明就必定有數據能夠讀) // 1.得到客戶端SocketChannel(即以前監聽事件處理時,註冊read事件時置入的attachment對象) SocketChannel client = (SocketChannel)selectedKey.attachment(); // 2.新建一個ByteBuffer用於緩衝數據(或者說,用來盛放數據) ByteBuffer requestBuffer = ByteBuffer.allocate(1024); // 3.判斷對應client是否處於open狀態,對應channel內是否有可讀數據(若是不知足就跳過該循環) // 本來我在想我都已經移除了對應的key,這裏又沒有處理數據,那下一次不就沒有對應key了。 // 但實際是我移除的是.selectedKeys()選出來的key(是複製體),下次觸發read事件,還會有對應key被selectedKeys選出來的。 while (client.isOpen() && client.read(requestBuffer) != -1){ // 達到這裏,說明對應channel中是有對應數據的 // 開始讀取數據 if (requestBuffer.position() > 0){ // 這裏爲了簡化處理,就設定爲一旦讀取了數據就算讀取完畢 // 注意:讀取的操做在loop的判斷條件中,client.read(requestBuffer) //TODO_FINISH 疑問:既然這裏設置的是>0就break,那爲何實際操做中,數據(字符串)是讀完了呢 // 答案:while循環條件的read就是完成了當前緩衝區數據的讀取。 //而循環體中的if在生產環境可能更可能是進行(編解碼的沾包拆包處理等)。 break; } } // 4.若是requestBuffer爲空,即沒有讀取到數據,那就跳出本次selectionKey的處理 if (requestBuffer.position() == 0){ continue; } // 5.到達這裏說明requestBuffer.position()不爲0,即bytebBuffer不爲空,即讀取到了數據,那麼就處理數據 // 5.1 將requestBuffer從寫模式轉爲讀模式 requestBuffer.flip(); // 5.2 業務處理:將brequestBuffer中的數據打印出來(切記,只有.allocate()分配的非直接內存的ByteBuffer才能夠.array()) System.out.println(new String(requestBuffer.array())); System.out.println("server has receive a message from: "+client.getRemoteAddress()); // 6.返回響應 // 6.1 模擬一下http協議的響應,便於瀏覽器解析(響應結果 200) String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; // 6.2 經過ByteBuffer.wrap()將數據置入響應的ByteBuffer ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes()); // 6.2 將響應的ByteBuffer寫入到客戶端Socket中(底層會自動將該數據發送過去,額,好吧。實際是交由操做系統底層處理) while (responseBuffer.hasRemaining()) { client.write(responseBuffer); } } } //TODO_FINISHED 不理解這句的目的是什麼,這是一個相似.select()的非阻塞式方法。 // epoll空論的一種解決方案,可是沒法根本解決問題,最好仍是如Netty那樣refresh解決 selector.selectNow(); } } }
在瞭解NIO以後,估計不少人都太歎服於它的設計,它的效率,它的性能。NIO因爲其模型,充分發揮了單線程的性能,可是單線程每每就意味着性能瓶頸(如單線程是沒法發揮多核CPU的性能的)。
因此,如何實現NIO的多線程成爲了網絡編程的發展方向。
固然若是隻是爲了將NIO與多線程結合起來,其實並非十分困難。可是如何有機結合(既充分發揮多線程性能,又不產生過多的性能浪費),並確保可擴展性纔是真正的關鍵。
這個時候Doug Lea這個Java大神(真的佩服),發表了一篇文章Scalable IO in Java,提出瞭解決問題,甚至都給出了demo。
首先是基本的Reactor線程模型設計:
在Reactor基本線程模型中,Doug Lea將NIO進行accept操做的部分提取出來,經過一個單一線程acceptor(也就是當前線程)實現client的accept信號的監聽,並進行分發(進行後續事件的註冊)。 而當監聽到read等事件後,經過dispatch將相關事件處理分發到線程池TreadPool中,交由worker thread進行具體業務處理。
固然這樣的線程模型,其擴展性依舊沒法知足需求,其性能瓶頸,會卡在acceptor線程上。因此Doug Lea進而提出了multiple Reactors
其設計是將原先的基本Reactor線程模型的Reactor拆分爲mainReactor與subReactor,中間經過acceptor鏈接,從而下降原先基本Reactor線程模型中acceptor的壓力。
這裏給出一些簡單的demo,供你們認識。
package tech.jarry.learning.netease; import java.io.IOException; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner; /** * @Description: NIO模型下的TCP客戶端實現 * @Author: jarry */ public class NIOClient { public static void main(String[] args) throws IOException { // 得到一個SocektChannel SocketChannel socketChannel = SocketChannel.open(); // 設置SocketChannel爲非阻塞模式 socketChannel.configureBlocking(false); // 設置SocketChannel的鏈接配置 socketChannel.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080)); // 經過循環,不斷鏈接。跳出循環,表示鏈接創建成功 while (!socketChannel.finishConnect()){ // 若是沒有成功創建鏈接,就一直阻塞當前線程(.yield()會令當前線程「謙讓」出CPU資源) Thread.yield(); } // 發送外部輸入的數據 Scanner scanner = new Scanner(System.in); System.out.println("please input:"); String msg = scanner.nextLine(); // ByteBuffer.wrap()會直接調用HeapByteBuffer。故一方面其會本身完成內存分配。另外一方面,其分配的內存是非直接內存(非heap堆) ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes()); // ByteBuffer.hasRemaining()用於判斷對應ByteBuffer是否還有剩餘數據(實現:return position < limit;) while (byteBuffer.hasRemaining()){ socketChannel.write(byteBuffer); } // 讀取響應 System.out.println("receive echoResponse from server"); // 設置緩衝區大小爲1024 ByteBuffer requestBuffer = ByteBuffer.allocate(1024); // 判斷條件:是否開啓,是否讀取到數據 //TODO 我認爲這裏的實現十分粗糙,是不能夠置於生產環境的,具體還須要後面再看看(即便是過渡demo,也能夠思考一下嘛) while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){ // 長鏈接狀況下,須要手動判斷數據有沒有讀取結束 (此處作一個簡單的判斷: 超過0字節就認爲請求結束了) if (requestBuffer.position() > 0) { break; } } requestBuffer.flip(); // byte[] content = new byte[requestBuffer.limit()]; // // .get()方法只會返回byte類型(猜想是當前標記位的數據) // requestBuffer.get(content); // System.out.println(new String(content)); // ByteBuffer提供了大量的基本類型轉換的方法,能夠直接拿來使用 System.out.println(new String(requestBuffer.array())); scanner.close(); socketChannel.close(); } }
package tech.jarry.learning.netease; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @Description: 根據Doug Lea大神的多路複用Reactor線程模型,進行編碼,學習Reactor設計模式在網絡編程的重要體現 * 注意:NIOServerV2做爲一個demo已經不錯了。可是仍然存在致命的性能瓶頸(其實很明顯,整個網絡編程就靠一個線程實現所有工做,確定不行,起碼無法充分發揮多核CPU的能力) * 故將服務端經常使用的部分分爲accept,read,bussinessDeal三個部分(第三部分,本demo就不深刻了) * @Author: jarry */ public class NIOServerV3 { // 處理業務操做的線程 private static ExecutorService workPool = Executors.newCachedThreadPool(); /** * 封裝了Selector.select()等事件的輪詢的共用代碼 */ abstract class ReactorThread extends Thread { Selector selector; LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); /** * Selector監聽到有事件後,調用這個方法(不過具體實現,須要基類本身實現) * @param channel */ public abstract void handler(SelectableChannel channel) throws Exception; private ReactorThread() throws IOException { selector = Selector.open(); } // 用於判斷線程運行狀態 volatile boolean running = false; @Override public void run() { // 輪詢Selector事件 while (running) { try { // 執行隊列中的任務 Runnable task; while ((task = taskQueue.poll()) != null) { task.run(); } selector.select(1000); // 獲取查詢結果 Set<SelectionKey> selected = selector.selectedKeys(); // 遍歷查詢結果 Iterator<SelectionKey> iter = selected.iterator(); while (iter.hasNext()) { // 被封裝的查詢結果 SelectionKey key = iter.next(); iter.remove(); int readyOps = key.readyOps(); // 關注 Read 和 Accept兩個事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { try { SelectableChannel channel = (SelectableChannel) key.attachment(); channel.configureBlocking(false); handler(channel); if (!channel.isOpen()) { key.cancel(); // 若是關閉了,就取消這個KEY的訂閱 } } catch (Exception ex) { key.cancel(); // 若是有異常,就取消這個KEY的訂閱 } } } selector.selectNow(); } catch (IOException e) { e.printStackTrace(); } } } private SelectionKey register(SelectableChannel channel) throws Exception { // 爲何register要以任務提交的形式,讓reactor線程去處理? // 由於線程在執行channel註冊到selector的過程當中,會和調用selector.select()方法的線程爭用同一把鎖 // 而select()方法是在eventLoop中經過while循環調用的,爭搶的可能性很高,爲了讓register能更快的執行,就放到同一個線程來處理 FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel)); taskQueue.add(futureTask); return futureTask.get(); } private void doStart() { if (!running) { running = true; start(); } } } // 0. 建立ServerSocketChannel private ServerSocketChannel serverSocketChannel; // 一、建立多個線程 - accept處理reactor線程 (accept線程) private ReactorThread[] mainReactorThreads = new ReactorThread[1]; // 二、建立多個線程 - io處理reactor線程 (I/O線程) private ReactorThread[] subReactorThreads = new ReactorThread[8]; /** * 初始化線程組 */ private void initGroup() throws IOException { // 建立IO線程,負責處理客戶端鏈接之後socketChannel的IO讀寫 for (int i = 0; i < subReactorThreads.length; i++) { subReactorThreads[i] = new ReactorThread() { @Override public void handler(SelectableChannel channel) throws IOException { // work線程只負責處理IO處理,不處理accept事件 SocketChannel ch = (SocketChannel) channel; ByteBuffer requestBuffer = ByteBuffer.allocate(1024); while (ch.isOpen() && ch.read(requestBuffer) != -1) { // 長鏈接狀況下,須要手動判斷數據有沒有讀取結束 (此處作一個簡單的判斷: 超過0字節就認爲請求結束了) if (requestBuffer.position() > 0) { break; } } if (requestBuffer.position() == 0) { return; // 若是沒數據了, 則不繼續後面的處理 } requestBuffer.flip(); byte[] content = new byte[requestBuffer.limit()]; requestBuffer.get(content); System.out.println(new String(content)); System.out.println(Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress()); // TODO 業務操做 數據庫、接口... workPool.submit(() -> { }); // 響應結果 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; ByteBuffer buffer = ByteBuffer.wrap(response.getBytes()); while (buffer.hasRemaining()) { ch.write(buffer); } } }; } // 建立mainReactor線程, 只負責處理serverSocketChannel for (int i = 0; i < mainReactorThreads.length; i++) { mainReactorThreads[i] = new ReactorThread() { AtomicInteger incr = new AtomicInteger(0); @Override public void handler(SelectableChannel channel) throws Exception { // 只作請求分發,不作具體的數據讀取 ServerSocketChannel ch = (ServerSocketChannel) channel; SocketChannel socketChannel = ch.accept(); socketChannel.configureBlocking(false); // 收到鏈接創建的通知以後,分發給I/O線程繼續去讀取數據 int index = incr.getAndIncrement() % subReactorThreads.length; ReactorThread workEventLoop = subReactorThreads[index]; workEventLoop.doStart(); SelectionKey selectionKey = workEventLoop.register(socketChannel); selectionKey.interestOps(SelectionKey.OP_READ); System.out.println(Thread.currentThread().getName() + "收到新鏈接 : " + socketChannel.getRemoteAddress()); } }; } } /** * 初始化channel,而且綁定一個eventLoop線程 * * @throws IOException IO異常 */ private void initAndRegister() throws Exception { // 一、 建立ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 二、 將serverSocketChannel註冊到selector int index = new Random().nextInt(mainReactorThreads.length); mainReactorThreads[index].doStart(); SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel); selectionKey.interestOps(SelectionKey.OP_ACCEPT); } /** * 綁定端口 * * @throws IOException IO異常 */ private void bind() throws IOException { // 一、 正式綁定端口,對外服務 serverSocketChannel.bind(new InetSocketAddress(8080)); System.out.println("啓動完成,端口8080"); } public static void main(String[] args) throws Exception { NIOServerV3 nioServerV3 = new NIOServerV3(); nioServerV3.initGroup(); // 一、 建立main和sub兩組線程 nioServerV3.initAndRegister(); // 二、 建立serverSocketChannel,註冊到mainReactor線程上的selector上 nioServerV3.bind(); // 三、 爲serverSocketChannel綁定端口 } }
爲了更好的理解Reactor線程模型,我將以前的Reactor代碼,按照個人代碼習慣,作了一些調整。
這個部分理解是有一些困難的,推薦多看幾遍,如我這樣手擼兩邊,第二遍能夠根據本身的習慣,進行代碼結構的調整。
package tech.jarry.learning.netease.again; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @Description: 爲了更好地學習與理解Netty,結合Reactor線程模型的NIO再過一遍,並作出必定的結構調整 * @Author: jarry */ public class NIOServerV3 { // 工做線程池,其中工做線程用於完成實際工做(如計算,編解碼等工做) private static ExecutorService workerPool = Executors.newCachedThreadPool(); // 全局變量ServerSocketChannel,記錄服務端的Channel private ServerSocketChannel serverSocketChannel; // 建立mainReactors線程組 private MainReactorThread[] mainReactorThreads = new MainReactorThread[1]; // 建立subReactors線程組 private SubReactorThread[] subReactorThreads = new SubReactorThread[8]; private abstract class AbstractReactorThread extends Thread { // 建立Selector,用於創建Channel事件監聽 protected Selector selector; // 用於標記線程運行狀態 private volatile boolean running = false; // 建立任務隊列,用於多線程處理工做 private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); /** * 經過懶加載方式,實例化Selector */ public AbstractReactorThread() throws IOException { selector = Selector.open(); } @Override /** * 重寫run方法,完成ReactorThread的公共代碼邏輯 */ public void run() { while (running){ // 1.經過一個巧妙的方式,遍歷處理taskQueue中的全部task Runnable task; while ((task = taskQueue.poll()) != null){ task.run(); } // 2.經過.select()阻塞當前線程,直到有註冊的selectionKey觸發(之因此等待1000ms,應該是爲了令上面的task執行完成) try { selector.select(1000L); } catch (IOException e) { e.printStackTrace(); } // 3.接下來的操做相似,遍歷處理各類監聽到的事件 Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeySet.iterator(); while (iterator.hasNext()){ SelectionKey selectedKey = iterator.next(); iterator.remove(); // 得到事件類型的編號 int readyOps = selectedKey.readyOps(); // 經過位運算等方式,快速判斷readyOps是否與對應事件類型編號符合(這裏做爲demo只關注accept與read事件) if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 根據V2的編程瞭解,.attachment返回的很可能是服務端的ServerSocketChannel,也多是客戶端的SocketChannel,故採用他們共同的父類SelectableChannel SelectableChannel channel = (SelectableChannel)selectedKey.attachment(); try { // 老規矩,將channel設置爲非阻塞式的 channel.configureBlocking(false); // 將channel交給抽象方法reactorHandler解決,(具體實現由各自子類去實現) //TODO_FINISH 話說,如何肯定哪一個子類解決哪一個問題 // 答案:抽象類不會實例化成對象 // 這裏的reactorHandler都是由對應子類調用的。MainReactorThread只有在註冊時調用,而且是直接置入taskQueue,第二次不會到這裏 reactorHandler(channel); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 判斷channel是否關閉 if (!channel.isOpen()){ // 若是channel已經關閉,那麼其上的SelectionKey就能夠取消訂閱了 selectedKey.cancel(); } } } //TODO 這個仍是看不懂 try { selector.selectNow(); } catch (IOException e) { e.printStackTrace(); } } } /** * 根據提交的channel,進行註冊處理(畢竟調用這個方法的,也只有此類,與衍生類了) * @param channel * @return */ protected SelectionKey register(SelectableChannel channel) throws ExecutionException, InterruptedException { // 爲何register要以任務提交的形式,讓reactor線程去處理? // 由於線程在執行channel註冊到selector的過程當中,會和調用selector.select()方法的線程爭用同一把鎖 // 而select()方法是在eventLoop中經過while循環調用的,爭搶的可能性很高,爲了讓register能更快的執行,就放到同一個線程來處理 // 這裏不管是解決方案,仍是register與select通用一把synchronized鎖,都蠻使人驚歎的(雖然我不大理解爲何register要與select公用一邊鎖) // select -> SelectorImpl.lockAndDoSelect 該方法的執行內容採用了synchronized(this)鎖 // register -> SelectorImpl.register 該方法的執行內容採用了synchronized(this.publicKeys)鎖 (果真這個比較複雜,主要synchronized鎖太多了) FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel) ); taskQueue.add(futureTask); return futureTask.get(); } /** * 執行啓動操做(其實外部能夠判斷線程狀態的,可是這裏running表示的線程狀態,與規定的線程狀態不一樣) */ protected void doStart(){ if (!running){ running = true; start(); } } /** * mainReactor與subReactor的handler處理邏輯是不一樣的,交由子類實現 */ protected abstract void reactorHandler(SelectableChannel channel) throws IOException, ExecutionException, InterruptedException; } /** * mainReactor的實現類,實現了父類的reactorHandler方法。主要完成accept的監聽與處理,並進行事件分發操做 */ public class MainReactorThread extends AbstractReactorThread { AtomicInteger atomicInteger = new AtomicInteger(0); /** * 經過懶加載方式,實例化Selector */ public MainReactorThread() throws IOException { } @Override protected void reactorHandler(SelectableChannel channel) throws IOException, ExecutionException, InterruptedException { // 得到服務端ServerSocketChannel ServerSocketChannel server = (ServerSocketChannel) channel; // 得到客戶端SocketChannel SocketChannel client = server.accept(); // 設置客戶端SocketChannel爲非阻塞模式 client.configureBlocking(false); // // 設置新的事件監聽 // client.register(selector, SelectionKey.OP_READ, client); // 再也不由當前線程完成read事件的註冊,畢竟當前線程只完成accept事件處理,與事件分發 // 故調用專門寫的一個私有方法,進行註冊 doRegister(client); // 打印日誌 System.out.println("server has connect a new client: "+client.getRemoteAddress()); } /** * Reactor線程模型下,MainReactor將read事件的註冊下放到SubReactor * @param client 須要進行事件(這裏只處理read事件)註冊的client */ private void doRegister(SocketChannel client) throws ExecutionException, InterruptedException { // 經過輪詢的方式(也能夠自定義,或擴展開),將事件(非Accept事件,如read事件)交給subReactor線程池中的線程處理 int index = atomicInteger.getAndIncrement() % subReactorThreads.length; // 獲取subReactorThread對象,又稱workEventLoop對象(爲了更好地對接Netty中的EventLoop SubReactorThread workEventLoop = subReactorThreads[index]; // 調用workEventLoop的doStart()方法,啓動工做線程(若是以前已有事件啓動了,就不會再啓動了) workEventLoop.doStart(); // 完成事件的註冊工做(AbstractReactorThread中的註冊,默認監聽事件編碼爲0。 SelectionKey selectionKey = workEventLoop.register(client); // 設置監聽事件的編碼(這樣的分離,有助於不一樣子類的實現,更加友好) selectionKey.interestOps(SelectionKey.OP_READ); } } /** * subReactor的實現類,實現了父類的reactorHandler方法。主要完成非accept事件(這裏demo特指read)的監聽與處理,包括打印,計算,響應等 */ public class SubReactorThread extends AbstractReactorThread { /** * 經過懶加載方式,實例化Selector */ public SubReactorThread() throws IOException { } @Override /** * 完成非accept事件(這裏特指read)事件的處理(打印與響應) */ protected void reactorHandler(SelectableChannel channel) throws IOException { // 得到客戶端SocketChannel SocketChannel client = (SocketChannel) channel; // 建立ByteBuffer做爲緩衝區 ByteBuffer requestBuffer = ByteBuffer.allocate(1024); // 嘗試讀取數據 while (client.isOpen() && (client.read(requestBuffer)) != -1){ // 這裏進行簡單判斷與處理 if (requestBuffer.position() > 0){ break; } } // 判斷requestBuffer大小 if (requestBuffer.position() == 0){ // 若是沒有數據,就不須要進行接下來的處理了 return; } // 將requestBuffer由寫模式轉爲讀模式 requestBuffer.flip(); // TODO 業務操做 數據庫、接口... workerPool.submit(() -> { // 如:打印請求數據 System.out.println("server get a message: "+new String(requestBuffer.array())); }); // 打印日誌 System.out.println("server get a mesage from client: "+client.getRemoteAddress()); // 發送響應 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes()); while (responseBuffer.hasRemaining()){ client.write(responseBuffer); } } } /** * Reactor線程模型的初始化 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ public void init() throws IOException, ExecutionException, InterruptedException { initGroup(); initMain(); } /** * 進行服務端,端口綁定 * @param port * @throws IOException */ public void bind(int port) throws IOException { serverSocketChannel.socket().bind(new InetSocketAddress(8080)); System.out.println("server bind success"); System.out.println("server start"); } /** * 實例化兩個Reactor線程組 * @throws IOException */ private void initGroup() throws IOException { for (int i = 0; i < mainReactorThreads.length; i++) { mainReactorThreads[i] = new MainReactorThread(); } for (int i = 0; i < subReactorThreads.length; i++) { subReactorThreads[i] = new SubReactorThread(); } } /** * 初始化一個MainReactorThread,來進行工做 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ private void initMain() throws IOException, ExecutionException, InterruptedException { //TODO_FINISHED 話說,這裏的mainReactorThreads只有一個線程,MainReactorThread可能多個線程嘛?仍是說一個端口-》一個ServerSocketChannel-》一個MainReactorThread? // 參照Netty的bossGroup的NioEventLoopGroup // 初始化並配置serverSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 從mainReactorThreads中挑選一個MainReactorThread int index = new Random().nextInt(mainReactorThreads.length); // 啓動挑選出來的mainReactorThread mainReactorThreads[index].doStart(); // 經過挑選出來的mainReactorThread線程對服務端serverSocketChannel進行註冊 SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel); // 設定監聽的事件編碼(Accept事件監聽) selectionKey.interestOps(SelectionKey.OP_ACCEPT); } public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { NIOServerV3 nioServerV3 = new NIOServerV3(); nioServerV3.init(); nioServerV3.bind(8080); } }
說實話,NIO優秀的網絡編程模型,結合Doug Lea提出的Reactor線程模型已經爲現有的網絡編程提出了一個幾乎無可挑剔的解決方案。
從技術上來講,這個方案已經沒有什麼問題了。惟一的問題就是以前提到的缺點:
那麼,有沒有更好的解決方案呢?有的,那就Netty框架。
前面提到200行代碼只是實現了基本結構支持,那麼這200行的代碼就能夠提取成爲一個公共的通用模塊。
Netty框架作出了優秀的封裝(如NioEventLoopGroup,ByteBuf,ServerBootstrap等等),並且解決了NIO的epoll空輪詢等問題,更是提供了諸多的工具類,提供便利。
Netty的架構分爲三個部分:
從官方的結構圖,能夠看出其中包含三大模塊:
這裏給出一個簡單的demo(根據官方提供的echoServerDemo,增長了一些註釋),供你們認識。
package tech.jarry.learning.netease.example; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * Echoes back any received data from a client. */ public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); public static void main(String[] args) throws Exception { // Configure the server. // 建立EventLoopGroup accept線程組 NioEventLoop EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 建立EventLoopGroup I/O線程組 EventLoopGroup workerGroup2 = new NioEventLoopGroup(1); try { // 服務端啓動引導工具類 ServerBootstrap b = new ServerBootstrap(); // 配置服務端處理的reactor線程組以及服務端的其餘配置 b // 設置兩個線程組(Reactor線程模型中的mainReactorThreads與subReactorThreads)。說白了就是兩個線程池 .group(bossGroup, workerGroup2) // 設置採用的channel類型(NioServerSocketChannel對應ServerSocketChannel,其它相似),底層實現用的反射 /** * ChannelFactory 的 newChannel() 方法何時會被調用就能夠了。 * 對於 NioSocketChannel,因爲它充當客戶端的功能,它的建立時機在 connect(…) 的時候; * 對於 NioServerSocketChannel 來講,它充當服務端功能,它的建立時機在綁定端口 bind(…) 的時候。 */ .channel(NioServerSocketChannel.class) //TODO 只看到是用於配置,詳細還不瞭解 .option(ChannelOption.SO_BACKLOG, 100) // 設置handler,這裏設置了Netty提供的日誌ChannelHandler(並採用了Debug級別) .handler(new LoggingHandler(LogLevel.DEBUG)) // 設置childHandler 這裏能夠經過ChannelInitializer實例,來放入多個ChannelHandler(須要重寫其中的.initChannel()方法) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 經過SocketChannel得到ChannelPipeline ChannelPipeline p = ch.pipeline(); // 在ChannelPipeline後面添加新的ChannelHandler /** * 每一個 Channel 內部都有一個 pipeline,pipeline 由多個 handler 組成, * handler 之間的順序是很重要的,由於 IO 事件將按照順序順次通過 pipeline 上的 handler, * 這樣每一個 handler 能夠專一於作一點點小事,由多個 handler 組合來完成一些複雜的邏輯。 * * Inbound 和 Outbound。在 Netty 中,IO 事件被分爲 Inbound 事件和 Outbound 事件。 * 例如: * 1. pipeline.addLast(new StringDecoder()); * 2. pipeline.addLast(new StringEncoder()); * 3. pipeline.addLast(new BizHandler()); */ p.addLast(new EchoServerHandler()); } }); // 經過bind啓動服務(Netty的ChannelFuture繼承自JDK的Future,只不過多了一些方法 ChannelFuture f = b // 服務端用於綁定端口(其中還涉及Java的channel等操做) .bind(PORT) // ChannelFuture新增的.sync()區別於新增的.await() /** * sync() 和 await() 用於阻塞等待 * sync() 內部會先調用 await() 方法,等 await() 方法返回後,會檢查下這個任務是否失敗,若是失敗,從新將致使失敗的異常拋出來。 * 也就是說,若是使用 await(),任務拋出異常後,await() 方法會返回,可是不會拋出異常,而 sync() 方法返回的同時會拋出異常。 */ .sync(); // 阻塞主線程,知道網絡服務被關閉 f // 用於得到封裝在Netty的ChannelFuture內的Channel(Java的),便於進行下一步操做 .channel() // 當Netty的ChannelFuture被關閉時,返回ChannelFuture .closeFuture() // 同上 .sync(); } finally { // 關閉線程組 bossGroup.shutdownGracefully(); workerGroup2.shutdownGracefully(); } } }
從上面的代碼,能夠看到,利用建造者模式,經過一個固定的模板,Netty就能夠實現一個簡單的EchoServer了。
使用的時候,通常業務的處理只是在修改其中pipeline的handler。
經過Netty,咱們幾乎只須要關注業務方面的,而且利用Netty的Pipeline,能夠很輕鬆的編排handler。
網絡模型實際上是開發人員思惟的體現。而網絡模型的演變其實就是開發人員思惟的演變。
這裏我經過一個例子,來展示網絡模型演變中思惟的演變。
客戶(Client)到公司(Server)辦理業務,會找到前臺的小姐姐(acceptor),由小姐姐引導着,處理各種業務。
可是,當存在複數位客戶時,就須要在前臺排隊,等待前臺小姐姐招待好最前面的客戶。
爲了解決這個排隊問題,Boss打算招聘多位小姐姐(線程池)。可是客戶的到來有時多,有時少,因此前臺小姐姐也是有時候很空閒,有時候很繁忙。
因爲小姐姐引導客戶處理業務時,客戶常常因爲填表(業務處理中數據庫操做等)等操做,致使小姐姐在一旁等待。而這時候,前臺還有客戶在等待。
爲了解決這個問題,Boss在前臺放置了一個鈴鐺(accept註冊)。
當有新的客戶到來時,小姐姐就會給他一個新的鈴鐺(read等事件註冊),用於在處理業務時呼喚她。
隨着客戶處理業務的上升,Boss招收了多位工做人員(worker thread),用於引導客戶處理業務。
而小姐姐(Acceptor)只負責招待客戶(只處理accept事件),交給客戶新的鈴鐺(read等事件註冊)。客戶經過新的鈴鐺找到工做人員,處理業務。
隨着業務規模的進一步提高,一位前臺小姐姐已經忙不過來了。
Boss就在前臺安排了多位小姐姐,招待與引導客戶。
看到Boss的成功,許多人都爭相模仿,可是因爲了解不深入,老是出現各類問題。
Boss但願經過這一成功經驗,幫助到其它人,因此製做了一款工做流軟件(Netty)。
其餘人只須要下載這個軟件,而後按照其中的簡單引導,進行設置,就能夠輕鬆使用。
解決問題通常都是三個步驟:
就像我在leetcode上,通常第一步,都是先按照本身最直觀的想法,先嚐試解決問題,而後再考慮優化,最後思考是否能夠通用化,工具化等。
此次只是簡單提一些網絡編程的演變過程,也算是幫助你們入門吧。
後續會進行深刻的原理分析與總結,並會寫成博客,發佈出來的。
原文出處:https://www.cnblogs.com/Tiancheng-Duan/p/11834590.html