從BIO到Netty

想要進行節點與節點間、客戶端與服務器端、進程與進程間的通訊,須要經過網絡IO進行。JAVA經過Socket封裝TCP/IP網絡協議,進行網絡之間的數據傳輸。java

1.首先來回顧一下ISO七層模型和TCP/IP四層協議bootstrap

 

2.瞭解一下JAVA經過Socket進行網絡通訊的流程瀏覽器

本地的進程經過PID標識惟一進程號,網絡中經過協議+ip地址+端口號標識一個進程號,如:http://10.10.10.10:8080,網絡間的進程被惟一標識後,能夠進行網絡通訊。Socket將TCP/IP複雜的操做,封裝爲簡單的接口供應用層使用,實現網絡通訊。就像文件IO的「打開—讀寫—關閉」同樣,網絡間的通訊也變成客戶端、服務器端能夠給本身的」文件「寫入內容,供對方讀取,通訊結束時關閉「文件」。服務器

Socket通訊流程:網絡

服務端accept()方法會產生阻塞,等待客戶端鏈接。併發

3.JAVA中的ServerSocket、Socket類框架

ServerSocket類:                                                        Socket類:socket

   

4.簡單的傳統BIO通訊例子(同步阻塞一問一答式)maven

服務端代碼ide

package service; import java.io.*; import java.net.ServerSocket; import java.net.Socket; public class BIOServerTest { public static void main(String[] args) throws IOException { new BIOServerTest().bind(); } public void bind() throws IOException { ServerSocket server = null; try { server = new ServerSocket(8000); Socket socket = null; while (true) { socket = server.accept(); new Thread(new BIOServerHandler(socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { if (server != null) { server.close(); } } } public class BIOServerHandler implements Runnable { private Socket socket; public BIOServerHandler(Socket socket) { this.socket = socket; } public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) { break; } System.out.println("client request " + body); out.println("ok"); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); } if (socket!=null){ try { socket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } } } }
View Code

客戶端代碼

package service; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; public class BIOClientTest { public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket("127.0.0.1", 8000); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("hello"); String resp = in.readLine(); System.out.println("server response " + resp); } catch (Exception e) { } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
View Code

傳統的同步阻塞BIO(Blocking IO)的處理過程當中,每有一個客戶端的請求,服務端就須要啓動一個線程與之對接,監聽數據傳輸。成千上百的客戶端訪問,在服務端就須要啓動成千上百個線程,每個線程在監聽不到數據的時候,又不能獲得充分的利用,這種模型顯然沒法知足高性能、高併發的場景。

5.NIO

JDK1.4引入一個新的庫NIO,有人稱之爲New IO,有人稱之爲Non-blocking IO,它出現的目的就是要解決阻塞問題,實現非阻塞網絡通訊,咱們就暫且稱之爲非阻塞IO吧。

在上面的BIO中,數據是面向字節傳輸,NIO則是面向塊傳輸的,就比如吃米飯,一粒一粒吃,確定沒有一勺一勺吃有感受,又快又爽,此時它即是在java中提供的高速的、面向塊的IO。

下面來了解NIO傳輸的相關概念:

緩衝區buffer

在面向流的IO中,是將數據寫入或讀取至Stream對象中(InputStream/OutputStream)。在NIO中,服務端和客戶端都是經過訪問緩衝區讀取、寫入來操做數據。每一種java基本類型都對應一種緩衝區如:ByteBuffer、IntBuffer等。每個Buffer類都是Buffer接口的子類。

通道Channel

Channel是一個通道,網絡數據經過Channel讀取和寫入。Channel不像流,寫入要用InputStream,讀取要用OutputStream,Channel是雙向的,因此既能夠用來讀,也能夠用來寫。

多路複用器Selector

多路複用器提供選擇已經就緒的任務的能力,至關於僱傭一個服務員來對接多個用餐的客戶,Selector會不斷輪詢註冊在其上的Channel,若是某個Channel發生讀寫事件,這個Channel便處於就緒狀態,會被Selector輪詢處來,經過SelectorKey獲取就緒的Channel後,進行後續的IO操做。在BIO中客戶端每有一個客戶請求,服務端就須要一個服務員來對接,一個顧客,一個服務員。而NIO用多路複用器以後,就能夠一個服務端的線程,服務多個客戶端請求,實現高可用。

關鍵步驟:

(1)建立ServerSocketChannel,配置爲非阻塞模式 (2)綁定監聽,配置TCP參數 (3)建立獨立IO線程,用於輪詢多路複用器Selector (4)建立Selector,將(1)中的serverSocketChannel註冊在selector上,並監聽selectorKey.ACCEPT (5)啓動IO線程,循環執行Selector.select()方法,輪詢就緒的Channel (6)輪詢到就緒的channel時,若是是新的客戶端接入,調用ServerSocketChannel.accept()方法,接受新的客戶端。 (7)將新接入的客戶端SocketChannel設爲非阻塞模式,配置TCP參數 (8)SocketChannel註冊到Selector,監聽OP_READ操做位 (9)若是輪詢到Channel爲OP_READ,則讀取數據 (10)若是爲OP_WRITE,說明數據還在發送。
View Code

 NIO原生接口複雜,不易編寫,很差維護,感興趣的同窗能夠去了解下。

6.Netty

Netty是NIO框架之一,接口簡潔,入門簡單,有好的穩定性、健壯性、可擴展性,已經獲得了較爲普遍的使用。

Netty 提供基於NIO的Server/Client網絡應用框架,服務端啓動後,就開始等待瀏覽器(httpRequest請求)或者客戶端的請求(服務器綁定端口監聽)。瀏覽器輸入ip:port後,server獲得請求,通過serverHandler處理後,返回給瀏覽器或者客戶端。
7.Netty的使用

a.maven工程添加netty依賴

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.33.Final</version>
        </dependency>
View Code

b.基於netty的服務端開發:啓動,添加線程組,添加過濾器,綁定端口,監聽,釋放資源

package service; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServerTest { public static void main(String[] args) { new NettyServerTest().bind(); } public void bind() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup). channel(NioServerSocketChannel.class). option(ChannelOption.SO_BACKLOG, 1024). childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ServerHandler()); } }); try { ChannelFuture f = bootstrap.bind(8000).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuffer = (ByteBuf) msg; byte[] req = new byte[byteBuffer.readableBytes()]; byteBuffer.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("request body:" + body); ByteBuf resp = Unpooled.copiedBuffer(new String("ok").getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } } }
View Code

c.基於netty的客戶端開發:啓動,添加線程組,添加過濾器,創建鏈接,監聽,釋放資源

package service; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClientTest { public static void main(String[] args) { new NettyClientTest().connect(); } public void connect() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); try { ChannelFuture f = bootstrap.connect("127.0.0.0", 8000).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } private class ClientHandler extends ChannelHandlerAdapter { ByteBuf message; public ClientHandler() { byte[] req = "hello".getBytes(); message = Unpooled.buffer(req.length); message.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] resp = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(resp); String body = new String(resp, "UTF-8"); System.out.println("server response " + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } } }
View Code
相關文章
相關標籤/搜索