1、Unix IO 與 IPC
Unix IO:Open-Read or Write-Closejava
IPC:open socket - receive and send to socket - close socket數據庫
IPC 全稱是 InterProcess Communication。編程
當消息發出後,消息進入 SendQ隊列 一直等待 sending socket 處理,才真正發出(一直等待是阻塞的)。當消息到達時,消息進入RecvQ隊列 一直等待 receiving socket 處理(同前)。segmentfault
底層的 TCP 協議關聯的 RecvQ 或者 SendQ 隊列就是一個操做系統緩衝區。數組
public static void simpleIo() throws IOException { InputStream input = new BufferedInputStream(new FileInputStream("/home/lg/Pictures/wallpapers/2047.png")); OutputStream out = new BufferedOutputStream(new FileOutputStream("dem.png")); byte[] buf = new byte[1024]; while (input.read(buf) > 0) { out.write(buf); } out.flush(); }
public static void simpleIo() throws IOException { BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); PrintWriter out = new PrintWriter(System.out, true); String myLine; while ((myLine = input.readLine()) != null) { out.println(myLine); } } // new String(char[] chars, "utf-8") 字節序列到字符串的轉換
* PrintWriter 與 BufferedWriter 區別
1. PrintWriter的print、println方法能夠接受任意類型的參數,而BufferedWriter的write方法只能接受字符、字符數組和字符串;
2. PrintWriter的println方法自動添加換行,BufferedWriter須要顯示調用newLine方法;
3. PrintWriter的方法不會拋異常,若關心異常,須要調用checkError方法看是否有異常發生;
4. PrintWriter構造方法可指定參數,實現自動刷新緩存(autoflush);
5. PrintWriter的構造方法更廣。
3、Java 中的 socket 編程
public class Server { public static void main(String[] args) { { ServerSocket myServer = null; Socket acceptSocket = null; BufferedReader input = null; PrintWriter output = null; try { myServer = new ServerSocket(4000); while (true) { acceptSocket = myServer.accept(); input = new BufferedReader(new InputStreamReader(acceptSocket.getInputStream())); output = new PrintWriter(acceptSocket.getOutputStream(), true); String rLine; while ((rLine = input.readLine()) != null) { System.out.println("服務器接受到一條消息:\n" + rLine + "\n---------------\n\n"); if (rLine.equals("hello server")) { System.out.println("而後打了個招呼\n---------------\n\n"); output.println("hello client"); } else { System.out.println("而後嘲諷了一波\n---------------\n\n"); output.println("233333"); } } } } catch (IOException e) { e.printStackTrace(); } } } }
public class Client { private static Socket myClient; //靜態初始化時處理異常 static { try { myClient = new Socket("localhost", 4000); } catch (IOException e) { e.printStackTrace(); } } private static void sendMessage() { try { PrintWriter output = new PrintWriter(myClient.getOutputStream(), true); BufferedReader readConsole = new BufferedReader(new InputStreamReader(System.in)); String cline; while ((cline = readConsole.readLine()) != null) { output.println(cline); System.out.println("消息發送成功!\n---------------\n\n"); } } catch (IOException e) { e.printStackTrace(); } } private static void receiveMessage() { try { BufferedReader input = new BufferedReader(new InputStreamReader(myClient.getInputStream())); // 這裏省略掉了 OutputStream(System.out), 直接使用打印替代 String rLine; while ((rLine = input.readLine()) != null) { System.out.println("客戶端收到來自服務器的消息:\n" + rLine + "\n---------------\n\n"); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(Client::sendMessage).start(); new Thread(Client::receiveMessage).start(); } }
客戶端使用了兩個線程:一個用於發送消息,另外一個用於接受消息。服務端只考慮了一個 socket 鏈接,所以並無用多線程。實際上,服務端一般須要同時處理多個 socket 鏈接,所以可考慮多線程。創建一個線程池,對於每一個 socket 鏈接,分派一個線程去處理。
4、NIO 與 socket 編程
1. 處理流程對比
普通的 BIO,消息要等待處理,須要先放入操做系統的 Socket 緩衝區(阻塞的SendQ與RecvQ隊列),這樣一直等待,直到 sending socket 或 receiving socket 就位後,才進行處理。
而 NIO,是 消息進入到操做系統的 Socket 緩衝區,直接複製到 Buffer (ByteBuffer.allocate)中,抑或直接進入與系統底層關聯的緩衝區(ByteBuffer.allocateDirector)。不用一直等待 sending socket 或 receiving socket 就續就進行處理,是非阻塞的。
須要說明的是等待就緒的阻塞是不使用CPU的,是在「空等」;而真正的讀寫操做的阻塞是使用CPU的,真正在"幹活",並且這個過程很是快,屬於memory copy,帶寬一般在1GB/s級別以上,能夠理解爲基本不耗時。
下圖是幾種常見I/O模型的對比:(圖片來自:UNIX網絡編程 -- I/O複用:select和poll函數)
(摘自 Linux IO模式及 select、poll、epoll詳解 )
,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。注意與 nio 的關係:在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
NIO的主要事件有幾個:讀就緒、寫就緒、有新鏈接到來。Selector 主要圍繞這幾個事件作分發。註冊全部感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。
public class Server { private static Selector selector; static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { sscRegister(); } private static void sscRegister() { try { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress("localhost", 8000)); ssc.register(selector, SelectionKey.OP_ACCEPT); dispatcher(selector); } catch (IOException e) { e.printStackTrace(); } } private static void dispatcher(Selector selector) { try { while (true) { int select = selector.select(); if (select == 0) { System.out.println("返回 0 表示 SelectedKeys 未更新"); continue; } Iterator<SelectionKey> selectedIterator = selector.selectedKeys().iterator(); while (selectedIterator.hasNext()) { SelectionKey key = selectedIterator.next(); selectedIterator.remove(); // 只是從 SelectedKeys 中移除,而並非從整個選擇器的鍵集中移除。 if (!key.isValid()) { continue; } if (key.isConnectable()) { System.out.println(233); } if (key.isAcceptable()) { accept(key); } if (key.isReadable()) { read(key); } } } } catch (IOException e) { e.printStackTrace(); } } private static void accept(SelectionKey key) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); try { SocketChannel socketChannel; if ((socketChannel = serverSocketChannel.accept()) != null) { // 服務端非阻塞時,直接返回 null socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); } } private static void read(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { if (channel.read(buffer) > 0) { System.out.println("客戶端:" + new String(buffer.array())); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { buffer.clear(); } } }
public class Client { private static void clientSendMsg() { try { SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", 8000)); System.out.println("已向服務器發出請求!"); String msg = "hello" + Thread.currentThread().getName(); ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); client.write(buffer); buffer.flip(); Thread.sleep(5); client.close(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(Client::clientSendMsg, "Thread-A").start(); } }
