1、阻塞IO與非阻塞IOjava
Linux網絡IO模型(5種)spring
(1)阻塞IO模型sql
全部文件操做都是阻塞的,以套接字接口爲例,在進程空間中調用recvfrom,系統調用直到數據包到達且被複制到應用進程緩衝區或發生錯誤時才返回,期間會一直等待(阻塞)。模型如圖:apache
(2)非阻塞IO模型bootstrap
recvfrom從應用層到內核時,若是該緩衝區沒數據,直接返回一個EWOULDBLOCK錯誤,反覆輪詢檢查這個狀態,看是否有數據到來。如圖:api
(3)IO複用模型數組
Linux提升select/poll,進程經過將一個或多個fd(file descriptor)傳遞給select或poll系統調用,阻塞在select操做上,偵測多個fd是否處於就緒狀態。select/poll順序掃描fd是否就緒,並且支持的fd數量有限。Linux還提供了一個epoll系統調用,使用基於事件驅動的方式代替順序掃描,性能更高。當有fd就緒時,當即回調函數rollback。如圖:安全
(4)信號驅動IO模型服務器
首先開啓套接口信號驅動IO功能,經過系統調用sigaction執行一個信號處理函數,該函數當即返回,進程繼續工做,它是非阻塞的。當數據準備就緒時,就爲該進程生成一個SIGIO信號,經過信號回調通知應用程序調用recfrom來讀取數據,通知主循環函數處理數據。如圖:網絡
(5)異步IO模型
告知內核啓動某個操做,讓內核在整個操做完成後(包括將數據從內核複製到用戶本身的緩衝區)通知咱們。它與信號驅動的主要區別是:信號驅動IO由內核告知咱們什麼時候開始一個IO操做,異步IO模型由內核通知咱們IO操做什麼時候已經完成。如圖所示:
IO多路複用的應用:
經過把多個IO的阻塞複用到一個select的阻塞上,使系統在單線程下可處理多個客戶端請求。與傳統多線程模型相比,最大優點是系統開銷小,不須要建立額外進程或線程。主要應用場景以下:
(1)服務器須要同時處理多個處於監聽狀態或鏈接狀態的套接字
(2)服務器須要同時處理多種網絡協議的套接字
Linux最終選擇epoll支持IO多路複用的系統調用,優勢以下:
(1)支持一個進程打開的socket描述符(FD)不受限制(select單線程默認1024太少,epoll僅受限操做系統最大文件句柄數,1GB內存機器大約10萬句柄)
(2)IO效率不會隨FD數目增長而線性降低(只對「活躍」的socke進行t操做,活躍socket纔會去主動調用callback函數)
(3)使用mmap加速內核與用戶空間消息傳遞(同一塊內存,避免沒必要要複製)
(4)API簡單:建立epoll描述符,添加監聽事件,阻塞等待監聽事件發生,關閉epoll描述符等
2、阻塞IO的例子(結合線程池)
//1.服務端 package com.xbai.io; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import com.xbai.executor.TimeServerHandlerExecutePool; import com.xbai.handler.TimeServerHandler; public class TimeServerExecutor { public static void main(String[] args)throws IOException { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } ServerSocket server =null; try { server =new ServerSocket(port); System.out.println("The time server is started in port : " + port); TimeServerHandlerExecutePool singleExecutor =new TimeServerHandlerExecutePool(50,10000); while(true){ Socket socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } }finally { if(server !=null){ System.out.println("The time server closed"); server.close(); server =null; } } } }
//2.服務端線程池 package com.xbai.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){ executor =new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));//線程池要執行的任務阻塞成一個隊列,其內部的機制是等待喚醒生產者和消費者線程,有一個生產就可喚醒一個消費,去看源碼的線程池原理 } public void execute(Runnable task){ executor.execute(task); } }
//3.服務端處理器 package com.xbai.handler; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.sql.Date; public class TimeServerHandler implements Runnable{ private Socketsocket; public TimeServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { // TODO Auto-generated method stub BufferedReader br =null; PrintWriter pw =null; try { br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); String curTime =null; String msg =null; while(true){ msg = br.readLine(); if(msg ==null){ break; } System.out.println("The time server received order:" + msg); curTime ="query time order".equalsIgnoreCase(msg) ?new Date( System.currentTimeMillis()).toString() :"bad order"; pw.println(curTime);//這裏不寫println,就沒法插入換行符,那邊就不能readLine,一直阻塞,沒法獲取數據 } }catch (IOException e) { if(br !=null){ try { br.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if(pw !=null){ pw.close(); pw =null; } if(socket !=null){ try { socket.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } socket =null; } } } }
//4.客戶端代碼 package com.xbai.io; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class TimeClient { public static void main(String[] args) { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } Socket socket =null; BufferedReader br =null; PrintWriter pw =null; try { socket =new Socket("localhost",port); br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); pw.println("query time order"); System.out.println("send order succeed"); String resp = br.readLine(); System.out.println("Now is :" + resp); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(pw !=null){ pw.close(); pw =null; } if(br !=null){ try { br.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } br =null; } if(socket !=null){ try { socket.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } socket =null; } } } }
執行結果
服務端啓動及收發:
客戶端發送和接收:
3、非阻塞IO的例子(原生Java NIO,目前有寫半包等問題,懷疑服務端沒有寫出去致使的客戶端Selector的關閉狀態異常)
//1.服務端主程序 package com.xiaobai.nio; public class NIOServer { public static void main(String[] args) { MultiplexerTimeServer timeServer = new MultiplexerTimeServer(); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
//2.服務端timeServer package com.xiaobai.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MultiplexerTimeServer() { try { selector = Selector.open();//創建Selector servChannel = ServerSocketChannel.open();//創建Channel servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(2048), 1024);//ServerSocket綁定 servChannel.register(selector, SelectionKey.OP_ACCEPT);//向Selector註冊ACCEPT事件 System.out.println("The time server is started in port 2048"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { while(!stop){ try { selector.select(1000);//輪詢Channel Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove();//移除它 try { handleInput(key); } catch (Exception e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求 if(key.isAcceptable()){//此前已向Selector註冊,並已open //獲取server channel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲取client channel SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //第一次捕捉到的客戶端向Selector註冊READ事件 sc.register(selector, SelectionKey.OP_READ); } //處理已註冊的讀事件 if(key.isReadable()){ //獲取客戶端Channel SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);//讀到緩衝 if(readBytes > 0){ readBuffer.flip(); // Buffer java.nio.Buffer.flip() // // // Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded. // // After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example: // // buf.put(magic); // Prepend header // in.read(buf); // Read data into rest of buffer // buf.flip(); // Flip buffer // out.write(buf); // Write header + data to channel byte[] bytes = new byte[readBuffer.remaining()];//緩衝中有多少個字節數據 readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(// System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){ //貴在堅持! //對端鏈路關閉 // key.cancel(); // sc.close(); }else{ ;//讀到0字節,忽略 } } } } private void doWrite(SocketChannel channel, String response) throws IOException{ if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//根據字節數組容量建立ByteBuffer writeBuffer.put(bytes);//字節數組複製到緩衝區 writeBuffer.flip(); channel.write(writeBuffer);//SocketChannel是異步非阻塞的,不保證一次發送完,出現「寫半包」問題, //這裏缺乏註冊寫操做,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢 //TODO 這裏有問題,沒有寫出去,致使客戶端沒法收到消息,顯示Selector關閉狀態異常 } } }
//3.客戶端主程序 package com.xiaobai.nio; public class NIOClient { public static void main(String[] args) { TimeClientHandle timeClientHandle = new TimeClientHandle("127.0.0.1",2048); new Thread(timeClientHandle,"NIO-TimeClient-001").start(); } }
//4.客戶端timeClient package com.xiaobai.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host,int port) { this.host = host==null?"127.0.0.1":host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (Exception e) { // TODO: handle exception } } @Override public void run() { try { doConnect(); } catch (Exception e) { // TODO: handle exception } while(!stop){ try { selector.select(3000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(selector != null){ try { selector.close(); } catch (Exception e) { // TODO: handle exception } } } } private void handleInput(SelectionKey key) throws Exception{ if(key.isValid()){ //判斷是否鏈接成功 //鏈接方法中已有鏈接不成功註冊鏈接事件的邏輯,反覆嘗試鏈接,這裏判斷,若是成功,註冊該客戶鏈接的read事件準備接收數據 SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector, SelectionKey.OP_READ); doWrite(sc);//本客戶向外寫東西 } } //下面是從服務器接收數據 if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);//讀到緩衝 if(readBytes > 0){ readBuffer.flip(); // Buffer java.nio.Buffer.flip() // // // Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded. // // After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example: // // buf.put(magic); // Prepend header // in.read(buf); // Read data into rest of buffer // buf.flip(); // Flip buffer // out.write(buf); // Write header + data to channel byte[] bytes = new byte[readBuffer.remaining()];//緩衝中有多少個字節數據 readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("Now is : " + body); this.stop = true; }else if(readBytes < 0){ //貴在堅持! //對端鏈路關閉 key.cancel(); sc.close(); }else{ ;//讀到0字節,忽略 } } } } private void doConnect() throws IOException { //若是鏈接成功,則直接註冊到多路複用器上,發送請求消息,讀應答 if(socketChannel.connect(new InetSocketAddress(host, port))){//異步鏈接,直至成功 socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{//註冊鏈接事件,輪詢直至鏈接成功 //異步,究竟是什麼概念?底層是什麼原理?TCP/IP層面 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { //本客戶向外寫東西 byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){ System.out.println("Send order 2 server succeed."); } } }
4、TCP與UDP
5、網絡傳輸粘包與拆包問題
6、Netty入門案例與原理分析、Reactor模式
第一個例子:
//1.NettyServer package com.xiaobai.server.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class NettyServer { private final int port; public NettyServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { if(args.length != 1) { System.err.println("Usage:" + NettyServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]); new NettyServer(port).start(); } private void start() throws InterruptedException { final NettyServerHandler serverHandler = new NettyServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully().sync(); } } }
//2.NettyServerHandler package com.xiaobai.server.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE);//關閉該Channel } }
//3.NettyClient package com.xiaobai.server.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class NettyClient { private final String host; private final int port; public NettyClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws InterruptedException { if(args.length != 2) { System.err.println("Usage:" + NettyServer.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new NettyClient(host,port).start(); } private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully().sync(); } } }
//4.NettyClientHandler package com.xiaobai.server.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutorGroup; public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8)); } }
執行結果:
服務端:
客戶端:
//服務端啓動spring配置文件applicationContext.xml <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="nettyServer" class="com.xiaobai.netty.server.NettyServer" init-method="start"> <constructor-arg index="0" type="int"> <value>8888</value> </constructor-arg> </bean> </beans>
//服務端啓動類 package com.xiaobai.netty.spring; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringStart { public static void main(String[] args) { ApplicationContext application = new ClassPathXmlApplicationContext("com/xiaobai/netty/spring/applicationContext.xml"); } }
//NettyServer package com.xiaobai.netty.server; import com.xiaobai.netty.handlers.ChildChannelHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.log4j.Logger; public class NettyServer { private static final Logger logger = Logger.getLogger(NettyServer.class); //無參 public NettyServer() { } //用於spring管理構造函數初始化bean public NettyServer(int port) { this.port = port; } private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap bootstrap; private int port; public void start() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); try { bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); //同步等待綁定端口成功 ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("NettyServer Successfully Started in port:" + port); logger.info("NettyServer Successfully Started in port:" + port); //同步等待服務器端口關閉 future.channel().closeFuture().sync();//經實驗,這是阻塞方法,一直阻塞 }catch (Exception e) { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
//ChildChannelHandler package com.xiaobai.netty.handlers; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; //注意參數化類型不要引錯包 public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ //原理來自平時讀書實踐(nio,aio,同步/異步原理,底層) //接收到客戶端鏈接(可鏈接)後的初始化動做,使用責任鏈模式綁定一系列數據讀寫操做,用於可讀可寫時的操做 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new TimeServerHandler());//由簡入難,不斷調試、琢磨的框架 } }
//TimeServerHandler package com.xiaobai.netty.handlers; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf buf = (ByteBuf) msg; // byte[] req = new byte[buf.readableBytes()]; // buf.readBytes(req); // String body = new String(req,"UTF-8").substring(0,req.length - System.getProperty("line.separator").length()); String body = (String)msg; System.out.println("The time server receive order:" + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() :"BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } //這些都是主程序代碼中出現這些狀況後調用的接口代碼 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
//客戶端啓動類 package com.xiaobai.netty.client; import org.apache.log4j.Logger; public class NettyTest { private static final Logger logger = Logger.getLogger(NettyTest.class); public static void main(String[] args) { try { logger.info("Netty communication start!"); NettyClient client = new NettyClient("127.0.0.1",8888); client.send(); }catch (Exception e) { logger.error("Client connected failure!"); } } }
//NettyClient package com.xiaobai.netty.client; import com.xiaobai.netty.handlers.ChildChannelHandler; import com.xiaobai.netty.handlers.TimeClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import org.apache.log4j.Logger; public class NettyClient { private static final Logger logger = Logger.getLogger(NettyClient.class); //無參 public NettyClient() { } //用於spring管理構造函數初始化bean public NettyClient(String host,int port) { this.host = host; this.port = port; } private EventLoopGroup group; private Bootstrap bootstrap; private int port; private String host; public void send() { try { group = new NioEventLoopGroup(); if(host != null && host.trim() != "") { bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new TimeClientHandler()); } }); //發起異步鏈接操做 System.out.println("NettyClient connecting " + host +":" + port); ChannelFuture future = bootstrap.connect(host,port).sync(); logger.info("NettyClient connected " + host +":" + port); //等待客戶端關閉 future.channel().closeFuture().sync(); }else{ logger.info("accessing nowhere!"); } }catch (Exception e) { //優雅退出,釋放線程池資源 group.shutdownGracefully(); } } }
//TimeClientHandler package com.xiaobai.netty.handlers; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.apache.log4j.Logger; public class TimeClientHandler extends ChannelHandlerAdapter { private static final Logger logger = Logger.getLogger(TimeClientHandler.class); private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warn("Unexpected exception from downstream:" + cause.getMessage()); ctx.close(); } //已鏈接後發送消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //不要反覆用一個ByteBuf,經測試會出現沒法發佈的問題--上網多查,研究其中原理!! ByteBuf message = null; for(int i = 0;i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf buf = (ByteBuf) msg; // byte[] req = new byte[buf.readableBytes()]; // buf.readBytes(req); // String body = new String(req,"UTF-8"); String body = (String)msg; System.out.println("Now is:" + body + " ; the counter is : " + ++counter); } }
執行結果:
服務端:
客戶端:
重要服務端組件類:
NioEventLoopGroup:一個線程組,包含了一組NIO線程,專門用於網絡事件處理,實際上它們就是Reactor線程組。這裏建立兩個,一個用於服務端接受客戶端的鏈接,另外一個用於SocketChannel的網絡讀寫。
ServerBootstrap:Netty用於啓動NIO服務端的輔助啓動類,下降開發複雜度。它的group方法將兩個NIO線程組看成入參傳遞到ServerBootstrap中。backlog:TCP參數,這裏設置爲1024.
NioServerSocketChannel:功能對應於JDK NIO類庫中的ServerSocketChannel類。
ChildChannelHandler:綁定I/O事件的處理類,做用相似於Reactor模式中的Handler類,主要用於處理網絡I/O事件,例如記錄日誌、對消息進行編解碼等。
服務端啓動輔助類配置完成後,調用它的bind方法綁定監聽端口,隨後調用它的同步阻塞方法sync等待綁定操做完成。完成以後Netty會返回一個ChannelFuture,它的功能相似於JDK的java.util.concurrent.Future,主要用於異步操做的通知回調。
future.channel().closeFuture.sync()是阻塞方法(一直阻塞,直到服務關閉),等待服務端鏈路關閉後才退出。
shutdownGracefully方法:優雅退出,釋放關聯資源。
ByteBuf:相似於JDK中的java.nio.ByteBuffer對象,不過它提供了更增強大和靈活的功能。經過ByteBuf的readableBytes方法可獲取緩衝區可讀字節數,根據可讀字節數建立byte數組,經過ByteBuf的readBytes方法將緩衝區字節數組複製到新建byte數組中,經過ChannelHandlerContext的write方法異步發送應答消息給客戶端。
ChannelHandlerContext的flush方法:將消息發送隊列中的消息寫入到SocketChannel中發送給對方。從性能角度考慮,爲了防止頻繁喚醒Selector進行消息發送(Writable事件),Netty的write方法並不直接將消息寫入SocketChannel中,只是把待發送的消息放到發送緩衝數組中,再經過調用flush方法,將發送緩衝區中的消息所有寫到SocketChannel中。
服務端建立、客戶端接入源碼與流程
1)建立ServerBootstrap實例。ServerBootstrap是Netty服務端啓動輔助類,提供了一系列方法用於設置服務端啓動相關參數,底層經過門面模式對各類能力進行抽象和封裝,下降用戶開發難度。ServerBootstrap只有一個無參構造函數,由於須要設置的參數太多了,且可能發生變化,故採用的是Builder模式。
2)設置並綁定Reactor線程池。ServerBootstrap的線程池是EventLoopGroup,實際就是EventLoop數組。EventLoop的職責是處理全部註冊到本線程多路複用器Selector上的Channel,Selector輪詢操做由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行。EventLoop不只僅處理網絡I/O事件,用戶自定義Task和定時任務Task也統一由EventLoop負責處理,實現了線程模型統一。從調度層面看,也不存在從EventLoop線程中再啓動其餘類型線程用於異步執行其餘任務。避免了多線程併發操做和鎖競爭,提高了I/O線程的處理和調度性能。
建立兩個EventLoopGroup(不是必需兩個,可只建立一個共享),前一個是父,後一個是子。父被傳入父類構造函數。
3)設置並綁定服務端Channel.做爲NIO服務端,需建立ServerSocketChannel.Netty對原生NIO類庫進行了封裝,對應實現是NioServerSocketChannel.Netty的ServerBootstrap提供了channel方法用於指定服務端Channel類型,經過工廠類,利用反射建立NioServerSocketChannel對象。
指定NioServerSocketChannel後,設置TCP的backlog參數,底層C對應接口:
int listen(int fd,int backlog);
backlog指定內核爲此套接口排隊的最大鏈接個數。對於給定監聽套接口,內核要維護兩個隊列:未連接隊列和已鏈接隊列,根據TCP三路握手過程當中的三個分節來分隔這兩個隊列。服務器處於listen狀態時,收到客戶端syn分節(connect)時在未完成隊列中建立 一個新條目,而後後三路握手的第二個分節即服務器syn響應客戶端,此條目在第三分節到達前(客戶端對服務器syn的ack)一直保留在未完成 鏈接隊列中。三路握手完成,條目從未完成鏈接隊列搬到已完成鏈接隊列尾部。當進程調用accept時,從已完成隊列中的頭部取出一個條目給進程。當已完成隊列未空時進程將 睡眠,直到有條目才喚醒。backlog被規定爲兩個隊列總和最大值。大多數實現默認爲5,高併發下不夠,未完成鏈接隊列長度可能因客戶端syn的到達及等待三路握手 第三分節的到達延時而增大。Netty默認backlog爲100,用戶可根據實際場景和網絡情況 靈活設置。
4)鏈路創建時建立並初始化ChannelPipeline.ChannelPipeline不是NIO服務端必需,本質是一個負責處理網絡事件的職責鏈,管理和執行ChannelHandler.網絡事件以事件流形式流轉,根據ChannelHandler執行策略調度執行。典型網絡事件:
a.鏈路註冊 b.鏈路激活 c.鏈路斷開 d.接收到請求消息 e.請求消息接收並處理完畢 f.發送應答消息 g.鏈路發生異常 h.發生用戶自定義事件
5)添加並設置ChannelHandler.ChannelHandler是Netty提供給用戶定製和擴展的關鍵接口,例如消息編解碼,心跳,安全認證,TSL/SSL認證,流量控制,流量整形等。Netty提供了大量系統ChannelHandler,比較實用的以下:
a.系統編解碼框架:ByteToMessageCodec b.通用基於長度的半包解碼器:LengthFieldBasedFrameDecoder c.碼流日誌打印:LoggingHandler d.SSL安全認證:SslHandler e.鏈路空閒檢測:IdleStateHandler f.流量整形:ChannelTrafficShapingHandler g.Base64編解碼:Base64Decoder和Base64Encoder
建立和添加ChannelHandler示例代碼:
bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); try { bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new TimeServerHandler());//由簡入難,不斷調試、琢磨的框架 } }); //同步等待綁定端口成功 ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("NettyServer Successfully Started in port:" + port); logger.info("NettyServer Successfully Started in port:" + port); //同步等待服務器端口關閉 future.channel().closeFuture().sync();//經實驗,這是阻塞方法,一直阻塞 }catch (Exception e) { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
用戶可爲啓動輔助類和其父類分別指定Handler.兩類Handler的用途不一樣:子類Handler是NioServerSocketChannel對應的ChannelPipeline的Handler,父類中的Handler是客戶端新接入的鏈接SocketChannel對應的ChannelPipeline的Handler.
本質區別就是:ServerBootstrap中的Handler是NioServerSocketChannel使用的,全部鏈接該監聽端口的客戶端都會執行它;父類AbstractBootstrap中的Handler是個工廠類,爲每一個新接入的客戶端都建立一個新的Handler.
6)綁定並啓動監聽端口。將ServerSocketChannel註冊到Selector上監聽客戶端鏈接。
建立新的NioServerSocketChannel,兩個參數:第一個參數是從父類NIO線程池中順序獲取的一個NioEventLoop,它就是服務端用於監聽和接收客戶端鏈接的Reactor線程,第二個參數是所謂workerGroup線程池,是處理I/O讀寫的Reactor線程組。
NioServerSocketChannel建立成功後的初始化:
a.設置Socket參數和NioServerSocketChannel附加屬性 b.將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline c.將用於服務端註冊的Handler(ServerBootstrapAcceptor)添加到ChannelPipeline
NioServerSocketChannel註冊:封裝成Task投遞到NioEventLoop,將NioServerSocketChannel註冊到NioEventLoop的Selector上(此時註冊0,不監放任何網絡操做)
註冊成功後,觸發ChannelRegistered事件,判斷監聽是否成功,成功則觸發ChannelActive事件,根據配置決定是否自動觸發Channel的讀事件,最終觸發ChannelPipeline讀操做,調用到HeadHandler的讀方法(業務處理)。不一樣Channel(不管客戶端仍是服務端)對讀操做準備工做不一樣,所以doBeginRead是個多態方法,這裏都要修改網絡監聽操做位爲自身感興趣的,NioServerSocketChannel感興趣的爲OP_ACCEPT(16)
用4 bit表示全部4種網絡操做類型:OP_READ 0001 OP_WRITE 0010 OP_CONNECT 0100 OP_ACCEPT 1000
好處是方便經過位操做進行操做位判斷和狀態修改,提高操做性能。
7)Selector輪詢。由Reactor線程NioEventLoop負責調度和執行Selector輪詢操做,選擇就緒的Channel集合。
根據就緒的操做位,執行不一樣操做。NioServerSocketChannel監聽的是鏈接操做,執行的是NioUnsafe(接口)的read方法,這裏使用的是NioMessageUnsafe(實現類,還有一個NioByteUnsafe)
doReadMessages方法:實際就是接收新的客戶端鏈接並建立NioSocketChannel.
接收到新的客戶端鏈接後,觸發ChannelPipeline的ChannelRead方法,執行headChannelHandlerContext的fireChannelRead方法(觸發事件),事件在ChannelPipeline中傳遞,執行ServerBootstrapAcceptor的channelRead方法,該方法分三個步驟:
a.將啓動時傳入的childHandler加入到客戶端SocketChannel的ChannelPipeline中 b.設置客戶端SocketChannel的TCP參數 c.註冊SocketChannel到多路複用器
NioSocketChannel註冊:仍註冊操做位爲0,觸發ChannelReadComplete事件,執行ChannelPipeline的read方法,執行HeadHandler的read方法,將網絡操做位改成OP_READ.
到此,新接入客戶端鏈接處理完成,可進行網絡讀寫I/O操做
8)輪詢到準備就緒的Channel後,由Reactor線程NioEventLoop執行ChannelPipeline相應方法(fire各類事件,觸發各類ChannelHandler的事件回調,觀察者模式),最終調度並執行ChannelHandler.
9)根據網絡事件類型,調度執行Netty系統ChannelHandler和用戶定製ChannelHandler.
7、Netty對粘包拆包的解決方案
8、編碼與解碼
9、序列化與反序列化
10、網絡傳輸私有協議制定與聊天室業務實現