· Linux網絡IO模型javascript
· 文件描述符html
· 阻塞IO模型java
· 非阻塞IO模型web
· IO複用模型數據庫
· 信號驅動IO模型編程
· 異步IO模型bootstrap
· BIO編程數組
· 僞異步IO編程瀏覽器
· NIO編程緩存
· 深刻Buffer
· Selector
· AIO編程
· Netty入門
· 開發與部署
· 粘包/拆包問題
· 問題及其解決
· 問題描述及其解決
· HTTP協議開發
· 文件服務器
· 問題及其解決
· 原理(過程)
· 開發
· Netty架構
· 邏輯架構
· 高性能
· 可靠性
· 可定製性
· 可擴展性
· 私有協議棧開發
1. Linux內核將全部外部設備視爲文件來操做。
2. 對一個文件的讀寫操做會調用內核提供的系統命令,返回一個file descripter(fd,文件描述符)。
3. 對一個socket的讀寫也會有相應的描述符,稱爲socketfd(socket描述符)。
1. 最經常使用的IO模型。
2. 默認的IO模型。
3. 以socket接口爲例說明阻塞IO模型。
1. 通常輪訓檢查內核數據是否就緒。
2. 若是內核數據未就緒,則直接返回一個EWOULDBLOCK錯誤。
1. Linux提供select/poll,進程傳遞一個或多個fd給select或poll系統調用,阻塞在select操做上,這樣select/poll能夠幫助進程同時檢測多個fd是否就緒。
2. select/poll存在支持fd數量有限、線性輪訓等問題,應採用基於事件驅動方式的epoll代替(當有fd就緒時,當即回調函數)。
進程先系統調用sigaction執行一個非阻塞的信號處理函數,進程繼續運行。當數據就緒時,爲該進程生成一個SIGIO信號,通知進程調用recvfrom讀取數據。
1. 進程告知內核啓動某個操做,並在內核完成整個操做後再通知進程。
2. 與信號驅動IO模型區別:信號驅動IO模型只通知數據就緒;異步IO模型通知操做已完成。
1. 有一個獨立的Acceptor線程負責監聽客戶端鏈接,接收到鏈接後爲每一個客戶端建立一個新的線程進行鏈路處理,處理完以後,經過輸出流返回給客戶端,線程銷燬。
2. 問題:服務端線程個數與客戶端併發訪問數1:1關係。當客戶端併發訪問量愈來愈大時,系統會發生線程堆棧溢出、建立新線程失敗等問題,最終致使進程宕機或僵死。
1. 當新客戶端接入時,將客戶端Socket封裝成一個Task(實現Runnable接口)投遞到線程池中進行處理。
2. 好處:因爲能夠設置線程池隊列的大小和最大線程數,因此資源佔用是可控的,客戶端併發數量增長不會致使資源耗盡、宕機。
3. 問題:底層通訊依然採用同步阻塞模型,沒法從根本上解決應答消息緩慢或網絡傳輸較慢時,長時間阻塞線程的問題。
1. BIO是面向流的,一次處理一個字節;NIO是面向塊的,以塊的形式處理數據。
2. BIO的java.io.*已經使用NIO從新實現過。
3. Buffer緩衝區存放着準備要寫入或讀出的數據。一般是一個字節數組,但也能夠是其餘類型的數組或不是數組。
4. Buffer類型:
a) ByteBuffer(經常使用)
b) CharBuffer
c) ShortBuffer
d) IntBuffer
e) LongBuffer
f) FloatBuffer
g) DoubleBuffer
5. Channel通道是雙向的,可經過它讀取或寫入數據。全部的數據都要經過Buffer來處理,永遠不會將數據直接寫入Channel。
6. 寫文件示例。
1 import java.io.FileOutputStream; 2 import java.io.IOException; 3 import java.io.UnsupportedEncodingException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.FileChannel; 6 import java.util.Random; 7 import java.util.UUID; 8 9 public class Test { 10 11 private static byte[] getRandomData() { 12 int randomLength = new Random().nextInt(100); 13 StringBuilder data = new StringBuilder(); 14 for (int index = 0; index < randomLength; index++) { 15 data.append(UUID.randomUUID().toString()); 16 } 17 return data.toString().getBytes(); 18 } 19 20 public static void main(String[] args) { 21 FileOutputStream fileOutputStream = null; 22 try { 23 fileOutputStream = new FileOutputStream("D:/test.txt"); 24 FileChannel fileChannel = fileOutputStream.getChannel(); 25 ByteBuffer byteBuffer = null; 26 for (int index = 0; index < 1000; index++) { 27 byte[] data = getRandomData(); 28 if (byteBuffer == null) { 29 byteBuffer = ByteBuffer.wrap(data); 30 } else if (data.length > byteBuffer.capacity()) { 31 if (byteBuffer.position() > 0) { 32 byteBuffer.flip(); 33 fileChannel.write(byteBuffer); 34 byteBuffer.clear(); 35 } 36 byteBuffer = ByteBuffer.wrap(data); 37 } else if (data.length > byteBuffer.remaining()) { 38 byteBuffer.flip(); 39 fileChannel.write(byteBuffer); 40 byteBuffer.clear(); 41 } 42 43 byteBuffer.put(data); 44 } 45 byteBuffer.flip(); 46 fileChannel.write(byteBuffer); 47 byteBuffer.clear(); 48 49 } catch (IOException e) { 50 e.printStackTrace(); 51 } finally { 52 if (fileOutputStream != null) { 53 try { 54 fileOutputStream.close(); 55 } catch (IOException e) { 56 e.printStackTrace(); 57 } 58 } 59 } 60 } 61 62 }
7. 讀文件示例。
1 import java.io.FileInputStream; 2 import java.io.IOException; 3 import java.nio.ByteBuffer; 4 import java.nio.channels.FileChannel; 5 6 public class Test { 7 8 public static void main(String[] args) { 9 FileInputStream fileInputStream = null; 10 try { 11 fileInputStream = new FileInputStream("D:/test.txt"); 12 FileChannel fileChannel = fileInputStream.getChannel(); 13 ByteBuffer byteBuffer = ByteBuffer.allocate(64); 14 while (fileChannel.read(byteBuffer) > 0) { 15 byteBuffer.flip(); 16 while (byteBuffer.hasRemaining()) { 17 System.out.print((char) byteBuffer.get()); 18 } 19 byteBuffer.clear(); 20 } 21 22 } catch (IOException e) { 23 e.printStackTrace(); 24 } finally { 25 if (fileInputStream != null) { 26 try { 27 fileInputStream.close(); 28 } catch (IOException e) { 29 e.printStackTrace(); 30 } 31 } 32 } 33 } 34 35 }
8. 複製文件示例。
1 import java.io.IOException; 2 import java.io.RandomAccessFile; 3 import java.nio.ByteBuffer; 4 import java.nio.channels.FileChannel; 5 6 public class Test { 7 8 public static void main(String[] args) { 9 RandomAccessFile sourceFile = null; 10 RandomAccessFile targetFile = null; 11 try { 12 sourceFile = new RandomAccessFile("D:/test.txt", "r"); 13 targetFile = new RandomAccessFile("D:/test.txt.bak", "rw"); 14 FileChannel sourceFileChannel = sourceFile.getChannel(); 15 FileChannel targetFileChannel = targetFile.getChannel(); 16 ByteBuffer byteBuffer = ByteBuffer.allocate(64); 17 while (sourceFileChannel.read(byteBuffer) > 0) { 18 byteBuffer.flip(); 19 targetFileChannel.write(byteBuffer); 20 byteBuffer.clear(); 21 } 22 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 }
1. Buffer能夠理解成數組,它經過如下3個值描述狀態:
a) position:下一個元素的位置;
b) limit:可讀取或寫入的元素總數,position老是小於或者等於limit;
c) capacity:Buffer最大容量,limit老是小於或者等於capacity。
2. 以讀、寫舉例說明Buffer。
a) 建立一個8字節的ByteBuffer。position=0,limit=8,capacity=8。
b) 讀取3個字節。position=3,limit=8,capacity=8。
c) 讀取2個字節。position=5,limit=8,capacity=8。
d) 執行flip()。position=0,limit=5,capacity=8。
e) 寫入4個字節。position=4,limit=5,capacity=8。
f) 寫入1個字節。position=5,limit=5,capacity=8。
g) 執行clear()。position=0,limit=8,capacity=8。
3. 建立ByteBuffer的兩種方法:
a) 建立固定大小的Buffer。
ByteBuffer.allocate(capacity)
b) 將數組及其內容包裝成Buffer。
byte array[] = new byte[1024]; ByteBuffer buffer = ByteBuffer.wrap(array);
1. Selector即IO複用模型中的多路複用器。
2. JDK使用了epoll。
1. AIO也稱NIO2.0,是異步IO模型。
2. JDK 7時在java.nio.channels包下新增了4個異步Channel。
a) AsynchronousSocketChannel
b) AsynchronousServerSocketChannel
c) AsynchronousFileChannel
d) AsynchronousDatagramChannel
3. 使用Future寫文件:異步執行,阻塞Future.get(),直到取得結果。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.file.Path; 5 import java.nio.file.Paths; 6 import java.nio.file.StandardOpenOption; 7 import java.util.ArrayList; 8 import java.util.List; 9 import java.util.Random; 10 import java.util.UUID; 11 import java.util.concurrent.ExecutionException; 12 import java.util.concurrent.Future; 13 14 public class Test { 15 16 private static byte[] getRandomData() { 17 int randomLength = new Random().nextInt(100); 18 StringBuilder data = new StringBuilder(); 19 for (int index = 0; index < randomLength; index++) { 20 data.append(UUID.randomUUID().toString()); 21 } 22 return data.append('\n').toString().getBytes(); 23 } 24 25 public static void main (String [] args) { 26 Path file = Paths.get("D:/test.txt"); 27 AsynchronousFileChannel asynchronousFileChannel = null; 28 try { 29 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE); 30 List<Future<Integer>> futures = new ArrayList<>(); 31 for (int index = 0; index < 10; index++) { 32 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData()); 33 Future<Integer> future = asynchronousFileChannel.write(byteBuffer, 0); 34 futures.add(future); 35 } 36 for (Future<Integer> future : futures) { 37 Integer length = null; 38 try { 39 length = future.get(); 40 } catch (InterruptedException | ExecutionException e) { 41 e.printStackTrace(); 42 } 43 System.out.println("Bytes written: " + length); 44 } 45 46 } catch (IOException e) { 47 e.printStackTrace(); 48 } finally { 49 if (asynchronousFileChannel != null) { 50 try { 51 asynchronousFileChannel.close(); 52 } catch (IOException e) { 53 e.printStackTrace(); 54 } 55 } 56 } 57 } 58 }
4. 使用CompletionHandler寫文件:異步執行,回調CompletionHandler。注意:示例中,因爲不阻塞主線程,即異步任務是否結果主線程都會結束,有時會看不到結果,因此sleep 5秒。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.channels.CompletionHandler; 5 import java.nio.file.Path; 6 import java.nio.file.Paths; 7 import java.nio.file.StandardOpenOption; 8 import java.util.Random; 9 import java.util.UUID; 10 11 public class Test { 12 13 private static byte[] getRandomData() { 14 int randomLength = new Random().nextInt(100); 15 StringBuilder data = new StringBuilder(); 16 for (int index = 0; index < randomLength; index++) { 17 data.append(UUID.randomUUID().toString()); 18 } 19 return data.append('\n').toString().getBytes(); 20 } 21 22 public static void main (String [] args) { 23 Path file = Paths.get("D:/test.txt"); 24 AsynchronousFileChannel asynchronousFileChannel = null; 25 try { 26 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE); 27 CompletionHandler<Integer, Object> completionHandler = new CompletionHandler<Integer, Object>() { 28 @Override 29 public void completed(Integer result, Object attachment) { 30 System.out.println("Bytes written: " + result); 31 } 32 @Override 33 public void failed(Throwable exc, Object attachment) { 34 } 35 }; 36 for (int index = 0; index < 10; index ++) { 37 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData()); 38 asynchronousFileChannel.write(byteBuffer, 0, null, completionHandler); 39 } 40 41 } catch (IOException e) { 42 e.printStackTrace(); 43 } finally { 44 if (asynchronousFileChannel != null) { 45 try { 46 asynchronousFileChannel.close(); 47 } catch (IOException e) { 48 e.printStackTrace(); 49 } 50 } 51 } 52 try { 53 Thread.sleep(5000); 54 } catch (InterruptedException e) { 55 e.printStackTrace(); 56 } 57 } 58 }
5. 使用Future讀文件:異步執行,阻塞Future.get(),直到取得結果。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.file.Path; 5 import java.nio.file.Paths; 6 import java.nio.file.StandardOpenOption; 7 import java.util.concurrent.ExecutionException; 8 import java.util.concurrent.Future; 9 10 public class Test { 11 12 public static void main (String [] args) { 13 Path file = Paths.get("D:/test.txt"); 14 AsynchronousFileChannel asynchronousFileChannel = null; 15 try { 16 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ); 17 ByteBuffer byteBuffer = ByteBuffer.allocate(64); 18 int position = 0; 19 int length = 0; 20 do { 21 Future<Integer> future = asynchronousFileChannel.read(byteBuffer, position); 22 length = future.get(); 23 if (length > 0) { 24 byteBuffer.flip(); 25 System.out.print(new String(byteBuffer.array())); 26 byteBuffer.clear(); 27 } 28 position += length; 29 } while (length > 0); 30 31 } catch (IOException e) { 32 e.printStackTrace(); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } catch (ExecutionException e) { 36 e.printStackTrace(); 37 } finally { 38 if (asynchronousFileChannel != null) { 39 try { 40 asynchronousFileChannel.close(); 41 } catch (IOException e) { 42 e.printStackTrace(); 43 } 44 } 45 } 46 } 47 }
6. 使用CompletionHandler讀文件:異步執行,回調CompletionHandler。注意:示例中,因爲不阻塞主線程,即異步任務是否結果主線程都會結束,有時會看不到結果,因此sleep 5秒。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.channels.CompletionHandler; 5 import java.nio.file.Path; 6 import java.nio.file.Paths; 7 import java.nio.file.StandardOpenOption; 8 9 public class Test { 10 11 public static void main (String [] args) { 12 Path file = Paths.get("D:/test.txt"); 13 AsynchronousFileChannel asynchronousFileChannel = null; 14 try { 15 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ); 16 // 10個異步任務分別讀取文件頭64個字節,5秒後分別輸出。 17 CompletionHandler<Integer, ByteBuffer> completionHandler = new CompletionHandler<Integer, ByteBuffer>() { 18 @Override 19 public void completed(Integer result, ByteBuffer byteBuffer) { 20 byteBuffer.flip(); 21 System.out.print(new String(byteBuffer.array())); 22 byteBuffer.clear(); 23 } 24 @Override 25 public void failed(Throwable exc, ByteBuffer byteBuffer) { 26 } 27 }; 28 for (int index = 0; index < 10; index++) { 29 ByteBuffer byteBuffer = ByteBuffer.allocate(64); 30 asynchronousFileChannel.read(byteBuffer, byteBuffer.limit() * index, byteBuffer, completionHandler); 31 } 32 33 } catch (IOException e) { 34 e.printStackTrace(); 35 } finally { 36 if (asynchronousFileChannel != null) { 37 try { 38 asynchronousFileChannel.close(); 39 } catch (IOException e) { 40 e.printStackTrace(); 41 } 42 } 43 } 44 try { 45 Thread.sleep(5000); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 } 50 }
1. 對比。
2. 選擇NIO框架Netty,而不選擇JDK的NIO類庫的理由。
a) NIO類庫和API繁雜。
b) 需另具有Java多線程編程等技能。
c) 可靠性不高,工做量和難度很是大。
d) 臭名昭著的epoll Bug致使Selector空輪訓。
1. 開發環境:CLASSPATH中導入「netty-all-x.y.z.jar」便可。
2. 打包部署:因爲是非Web應用,構建成jar包部署便可。
1. 配置Maven的pom.xml文件。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha1</version> </dependency>
2. 時間服務器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 10 public class TimeServer { 11 12 public void bind(int port) throws Exception { 13 // 服務器NIO線程組線 14 EventLoopGroup bossGroup = new NioEventLoopGroup(); 15 EventLoopGroup workerGroup = new NioEventLoopGroup(); 16 try { 17 ServerBootstrap serverBootstrap = new ServerBootstrap(); 18 serverBootstrap.group(bossGroup, workerGroup) 19 .channel(NioServerSocketChannel.class) 20 .option(ChannelOption.SO_BACKLOG, 1024) 21 .childHandler(new ChildChannelHandler()); 22 // 綁定端口,同步等待成功 23 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 24 // 等待服務器監聽端口關閉 25 channelFuture.channel().closeFuture().sync(); 26 } finally { 27 // 優雅退出,釋放線程池資源 28 workerGroup.shutdownGracefully(); 29 bossGroup.shutdownGracefully(); 30 } 31 } 32 33 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 34 35 @Override 36 protected void initChannel(SocketChannel socketChannel) throws Exception { 37 socketChannel.pipeline().addLast(new TimeServerHandler()); 38 } 39 40 } 41 42 public static void main(String[] args) throws Exception { 43 new TimeServer().bind(8080); 44 } 45 46 }
3. 時間服務器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override 11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 12 ByteBuf reqBuf = (ByteBuf) msg; 13 byte[] req = new byte[reqBuf.readableBytes()]; 14 reqBuf.readBytes(req); 15 String reqString = new String(req, "UTF-8"); 16 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER"; 17 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes()); 18 ctx.write(respBuf); 19 } 20 21 @Override 22 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 23 ctx.flush(); 24 } 25 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 28 ctx.close(); 29 } 30 31 }
4. 時間客戶端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 public class TimeClient { 11 12 public void connect(String host, int port) throws Exception { 13 EventLoopGroup group = new NioEventLoopGroup(); 14 try { 15 // 客戶端NIO線程組 16 Bootstrap bootstrap = new Bootstrap(); 17 bootstrap.group(group).channel(NioSocketChannel.class) 18 .option(ChannelOption.TCP_NODELAY, true) 19 .handler(new ChildChannelHandler()); 20 // 發起異步鏈接操做 21 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); 22 // 等待客戶端鏈路關閉 23 channelFuture.channel().closeFuture().sync(); 24 25 } finally { 26 // 優雅退出,釋放NIO線程組 27 group.shutdownGracefully(); 28 } 29 } 30 31 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 32 33 @Override 34 protected void initChannel(SocketChannel socketChannel) throws Exception { 35 socketChannel.pipeline().addLast(new TimeClientHandler()); 36 } 37 38 } 39 40 public static void main(String[] args) throws Exception { 41 new TimeClient().connect("127.0.0.1", 8080); 42 } 43 44 }
5. 時間客戶端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() { 11 byte[] req = "QUERY TIME ORDER".getBytes(); 12 reqBuf = Unpooled.buffer(req.length); 13 reqBuf.writeBytes(req); 14 } 15 16 @Override 17 public void channelActive(ChannelHandlerContext ctx) throws Exception { 18 ctx.writeAndFlush(reqBuf); 19 } 20 21 @Override 22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 23 ByteBuf respBuf = (ByteBuf) msg; 24 byte[] resp = new byte[respBuf.readableBytes()]; 25 respBuf.readBytes(resp); 26 String respString = new String(resp, "UTF-8"); 27 System.out.println(respString); 28 } 29 30 @Override 31 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 32 ctx.close(); 33 } 34 35 }
1. TCP是一個「流協議」,是沒有界限的一串數據。
2. TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分。因此在業務上認爲,一個完整的包可能會被TCP拆包發送,也可能封裝多個
小包成大包發送。
3. 業界主流協議的解決方案概括:
a) 消息定長。如每一個報文的大小固定長度200字節,不足時空位補空格。
b) 在包尾增長回車換行符進行分割。如FTP協議。
c) 將消息分爲消息頭、消息體,消息頭中包含消息總長度(或消息體長度)的字段。
d) 更復雜的應用層協議。
4. Netty提供了多種編碼器用於解決粘包/拆包問題。
1. 原理:遍歷ByteBuf中的可讀字節,發現「\n」或「\r\n」時就結束。
2. 支持攜帶結束符或不攜帶結束符兩種編碼方式;支持配置單行的最大長度(超過最大長度未發現換行符則拋出異常,同時忽略掉以前讀到的異常碼流)。
3. StringDecoder功能:將接受到的對象轉成字符串,而後繼續調用後面的Handler。
4. 使用LineBasedFrameDecoder優化後的時間服務器。
a) 時間服務器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 public class TimeServer { 13 14 public void bind(int port) throws Exception { 15 // 服務器NIO線程組線 16 EventLoopGroup bossGroup = new NioEventLoopGroup(); 17 EventLoopGroup workerGroup = new NioEventLoopGroup(); 18 try { 19 ServerBootstrap serverBootstrap = new ServerBootstrap(); 20 serverBootstrap.group(bossGroup, workerGroup) 21 .channel(NioServerSocketChannel.class) 22 .option(ChannelOption.SO_BACKLOG, 1024) 23 .childHandler(new ChildChannelHandler()); 24 // 綁定端口,同步等待成功 25 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 26 // 等待服務器監聽端口關閉 27 channelFuture.channel().closeFuture().sync(); 28 } finally { 29 // 優雅退出,釋放線程池資源 30 workerGroup.shutdownGracefully(); 31 bossGroup.shutdownGracefully(); 32 } 33 } 34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 36 37 @Override 38 protected void initChannel(SocketChannel socketChannel) throws Exception { 39 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); 40 socketChannel.pipeline().addLast(new StringDecoder()); 41 socketChannel.pipeline().addLast(new TimeServerHandler()); 42 } 43 44 } 45 46 public static void main(String[] args) throws Exception { 47 new TimeServer().bind(8080); 48 } 49 50 }
b) 時間服務器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override 11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 12 String reqString = (String) msg; 13 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER"; 14 respString += "\n"; 15 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes()); 16 ctx.write(respBuf); 17 } 18 19 @Override 20 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 21 ctx.flush(); 22 } 23 24 @Override 25 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 26 ctx.close(); 27 } 28 29 }
c) 時間客戶端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 public class TimeClient { 13 14 public void connect(String host, int port) throws Exception { 15 EventLoopGroup group = new NioEventLoopGroup(); 16 try { 17 // 客戶端NIO線程組 18 Bootstrap bootstrap = new Bootstrap(); 19 bootstrap.group(group).channel(NioSocketChannel.class) 20 .option(ChannelOption.TCP_NODELAY, true) 21 .handler(new ChildChannelHandler()); 22 // 發起異步鏈接操做 23 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); 24 // 等待客戶端鏈路關閉 25 channelFuture.channel().closeFuture().sync(); 26 27 } finally { 28 // 優雅退出,釋放NIO線程組 29 group.shutdownGracefully(); 30 } 31 } 32 33 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 34 35 @Override 36 protected void initChannel(SocketChannel socketChannel) throws Exception { 37 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); 38 socketChannel.pipeline().addLast(new StringDecoder()); 39 socketChannel.pipeline().addLast(new TimeClientHandler()); 40 } 41 42 } 43 44 public static void main(String[] args) throws Exception { 45 new TimeClient().connect("127.0.0.1", 8080); 46 } 47 48 }
d) 時間客戶端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() { 11 byte[] req = "QUERY TIME ORDER\n".getBytes(); 12 reqBuf = Unpooled.buffer(req.length); 13 reqBuf.writeBytes(req); 14 } 15 16 @Override 17 public void channelActive(ChannelHandlerContext ctx) throws Exception { 18 ctx.writeAndFlush(reqBuf); 19 } 20 21 @Override 22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 23 String respString = (String) msg; 24 System.out.println(respString); 25 } 26 27 @Override 28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 29 ctx.close(); 30 } 31 32 }
1. 功能:以分隔符做爲碼流結束標識符的消息解碼。
2. 時間服務器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 public class TimeServer { 15 16 public void bind(int port) throws Exception { 17 // 服務器NIO線程組線 18 EventLoopGroup bossGroup = new NioEventLoopGroup(); 19 EventLoopGroup workerGroup = new NioEventLoopGroup(); 20 try { 21 ServerBootstrap serverBootstrap = new ServerBootstrap(); 22 serverBootstrap.group(bossGroup, workerGroup) 23 .channel(NioServerSocketChannel.class) 24 .option(ChannelOption.SO_BACKLOG, 1024) 25 .childHandler(new ChildChannelHandler()); 26 // 綁定端口,同步等待成功 27 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 28 // 等待服務器監聽端口關閉 29 channelFuture.channel().closeFuture().sync(); 30 } finally { 31 // 優雅退出,釋放線程池資源 32 workerGroup.shutdownGracefully(); 33 bossGroup.shutdownGracefully(); 34 } 35 } 36 37 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 38 39 @Override 40 protected void initChannel(SocketChannel socketChannel) throws Exception { 41 ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes()); 42 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 43 socketChannel.pipeline().addLast(new StringDecoder()); 44 socketChannel.pipeline().addLast(new TimeServerHandler()); 45 } 46 47 } 48 49 public static void main(String[] args) throws Exception { 50 new TimeServer().bind(8080); 51 } 52 53 }
3. 時間服務器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override 11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 12 String reqString = (String) msg; 13 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER"; 14 respString += "*&*"; 15 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes()); 16 ctx.write(respBuf); 17 } 18 19 @Override 20 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 21 ctx.flush(); 22 } 23 24 @Override 25 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 26 ctx.close(); 27 } 28 29 }
4. 時間客戶端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 public class TimeClient { 15 16 public void connect(String host, int port) throws Exception { 17 EventLoopGroup group = new NioEventLoopGroup(); 18 try { 19 // 客戶端NIO線程組 20 Bootstrap bootstrap = new Bootstrap(); 21 bootstrap.group(group).channel(NioSocketChannel.class) 22 .option(ChannelOption.TCP_NODELAY, true) 23 .handler(new ChildChannelHandler()); 24 // 發起異步鏈接操做 25 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); 26 // 等待客戶端鏈路關閉 27 channelFuture.channel().closeFuture().sync(); 28 29 } finally { 30 // 優雅退出,釋放NIO線程組 31 group.shutdownGracefully(); 32 } 33 } 34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 36 37 @Override 38 protected void initChannel(SocketChannel socketChannel) throws Exception { 39 ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes()); 40 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 41 socketChannel.pipeline().addLast(new StringDecoder()); 42 socketChannel.pipeline().addLast(new TimeClientHandler()); 43 } 44 45 } 46 47 public static void main(String[] args) throws Exception { 48 new TimeClient().connect("127.0.0.1", 8080); 49 } 50 51 }
5. 時間客戶端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() { 11 byte[] req = "QUERY TIME ORDER*&*".getBytes(); 12 reqBuf = Unpooled.buffer(req.length); 13 reqBuf.writeBytes(req); 14 } 15 16 @Override 17 public void channelActive(ChannelHandlerContext ctx) throws Exception { 18 ctx.writeAndFlush(reqBuf); 19 } 20 21 @Override 22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 23 String respString = (String) msg; 24 System.out.println(respString); 25 } 26 27 @Override 28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 29 ctx.close(); 30 } 31 32 }
1. 原理:不管一次接受到多少數據包,它都會按照設置的固定長度解碼,若是是半包消息,則緩存半包消息並等待下個包到達後進行拼包,直到讀取到一個完整的包。
2. 回顯服務器EchoServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.FixedLengthFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 public class EchoServer { 13 14 public void bind(int port) throws Exception { 15 // 服務器NIO線程組線 16 EventLoopGroup bossGroup = new NioEventLoopGroup(); 17 EventLoopGroup workerGroup = new NioEventLoopGroup(); 18 try { 19 ServerBootstrap serverBootstrap = new ServerBootstrap(); 20 serverBootstrap.group(bossGroup, workerGroup) 21 .channel(NioServerSocketChannel.class) 22 .option(ChannelOption.SO_BACKLOG, 1024) 23 .childHandler(new ChildChannelHandler()); 24 // 綁定端口,同步等待成功 25 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 26 // 等待服務器監聽端口關閉 27 channelFuture.channel().closeFuture().sync(); 28 } finally { 29 // 優雅退出,釋放線程池資源 30 workerGroup.shutdownGracefully(); 31 bossGroup.shutdownGracefully(); 32 } 33 } 34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 36 37 @Override 38 protected void initChannel(SocketChannel socketChannel) throws Exception { 39 socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20)); 40 socketChannel.pipeline().addLast(new StringDecoder()); 41 socketChannel.pipeline().addLast(new EchoServerHandler()); 42 } 43 44 } 45 46 public static void main(String[] args) throws Exception { 47 new EchoServer().bind(8080); 48 } 49 50 }
3. 回顯服務器EchoServerHandler
1 import io.netty.channel.ChannelHandlerAdapter; 2 import io.netty.channel.ChannelHandlerContext; 3 4 public class EchoServerHandler extends ChannelHandlerAdapter { 5 6 @Override 7 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 8 System.out.println(msg); 9 } 10 11 @Override 12 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 13 ctx.close(); 14 } 15 16 }
4. 使用telnet命令測試,當長度達到20個字符時,服務器打印。
1. 沒法跨語言。Java序列化是Java語言內部的私有協議,其餘語言並不支持。
2. 序列化後的碼流太大。編碼後的字節數組越大,存儲的時候就越佔空間,存儲的硬件成本就越高,網絡傳輸時更佔帶寬,致使系統的吞吐量下降。
3. 序列化性能過低。編解碼耗時長。
4. 解決:編解碼框架,如Google Protobuf、MessagePack。此處不深刻展開。
1. 因爲HTTP協議的通用性,不少異構系統間的通訊交互採用HTTP協議,如很是流行的HTTP + XML或者RESTful + JSON。
2. 與Web容器相比,Netty開發HTTP的優點:輕量級;安全。
3. 這裏以文件服務器舉例,至於HTTP + XML,此處不深刻展開。
1. 文件服務器HttpFileServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioServerSocketChannel; 8 import io.netty.handler.codec.http.HttpObjectAggregator; 9 import io.netty.handler.codec.http.HttpRequestDecoder; 10 import io.netty.handler.codec.http.HttpResponseEncoder; 11 import io.netty.handler.stream.ChunkedWriteHandler; 12 13 public class HttpFileServer { 14 15 public void run(int port, String folderPath) throws Exception { 16 EventLoopGroup bossGroup = new NioEventLoopGroup(); 17 EventLoopGroup workerGroup = new NioEventLoopGroup(); 18 try { 19 ServerBootstrap serverBootstrap = new ServerBootstrap(); 20 serverBootstrap.group(bossGroup, workerGroup) 21 .channel(NioServerSocketChannel.class) 22 .childHandler(new ChannelInitializer<SocketChannel>() { 23 24 @Override 25 protected void initChannel(SocketChannel socketChannel) throws Exception { 26 socketChannel.pipeline().addLast(new HttpRequestDecoder()); 27 socketChannel.pipeline().addLast(new HttpObjectAggregator(65536)); 28 socketChannel.pipeline().addLast(new HttpResponseEncoder()); 29 socketChannel.pipeline().addLast(new ChunkedWriteHandler()); 30 socketChannel.pipeline().addLast(new HttpFileServerHandler(folderPath)); 31 } 32 33 }); 34 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 35 channelFuture.channel().closeFuture().sync(); 36 } finally { 37 workerGroup.shutdownGracefully(); 38 bossGroup.shutdownGracefully(); 39 } 40 } 41 42 public static void main(String[] args) throws Exception { 43 int port = 8080; 44 String folderPath = "E:/workspace"; 45 new HttpFileServer().run(port, folderPath); 46 } 47 48 }
2. 文件服務器HttpFileServerHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelFutureListener; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.SimpleChannelInboundHandler; 6 import io.netty.handler.codec.http.DefaultFullHttpResponse; 7 import io.netty.handler.codec.http.DefaultHttpResponse; 8 import io.netty.handler.codec.http.FullHttpRequest; 9 import io.netty.handler.codec.http.FullHttpResponse; 10 import io.netty.handler.codec.http.HttpHeaders; 11 import io.netty.handler.codec.http.HttpMethod; 12 import io.netty.handler.codec.http.HttpResponse; 13 import io.netty.handler.codec.http.HttpResponseStatus; 14 import io.netty.handler.codec.http.HttpVersion; 15 import io.netty.handler.stream.ChunkedFile; 16 import io.netty.util.CharsetUtil; 17 18 import java.io.File; 19 import java.io.FileNotFoundException; 20 import java.io.RandomAccessFile; 21 import java.net.URLDecoder; 22 23 public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { 24 25 private String folderPath; 26 27 public HttpFileServerHandler(String folderPath) { 28 this.folderPath = folderPath; 29 } 30 31 @Override 32 protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { 33 if (!req.getDecoderResult().isSuccess()) { 34 sendStatus(ctx, HttpResponseStatus.BAD_REQUEST); 35 return; 36 } 37 if (!HttpMethod.GET.equals(req.getMethod())) { 38 sendStatus(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); 39 return; 40 } 41 String uri = req.getUri(); 42 File file = getFile(uri); 43 if (file == null || file.isHidden() || !file.exists()) { 44 sendStatus(ctx, HttpResponseStatus.NOT_FOUND); 45 return; 46 } 47 try { 48 if (file.isDirectory()) { 49 listFiles(ctx, file, uri); 50 } else { 51 returnFile(ctx, req, file); 52 } 53 } catch (Exception e) { 54 sendStatus(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); 55 } 56 } 57 58 private File getFile(String uri) throws Exception { 59 uri = URLDecoder.decode(uri, "UTF-8"); 60 return new File(folderPath + uri); 61 } 62 63 private void listFiles(ChannelHandlerContext ctx, File folder, String uri) throws Exception { 64 uri = uri.endsWith("/") ? uri : uri + "/"; 65 StringBuilder html = new StringBuilder("<h1>Index of ").append(URLDecoder.decode(uri, "UTF-8")).append("</h1><hr/><pre><a href=\"").append(uri).append("../\">../</a>\n"); 66 File[] subfiles = folder.listFiles(); 67 if (subfiles != null && subfiles.length > 0) { 68 for (File subfile : subfiles) { 69 String name = subfile.getName(); 70 html.append("<a href=\"").append(uri).append(name).append("\">").append(name).append("</a>\n"); 71 } 72 } 73 html.append("</pre><hr/>"); 74 FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 75 resp.headers().add(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8"); 76 ByteBuf content = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8); 77 resp.content().writeBytes(content); 78 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); 79 } 80 81 private void returnFile(ChannelHandlerContext ctx, FullHttpRequest req, File file) throws Exception { 82 83 RandomAccessFile randomAccessFile = null; 84 try { 85 randomAccessFile = new RandomAccessFile(file, "r"); 86 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 87 resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, randomAccessFile.length()) 88 .set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream"); 89 if (HttpHeaders.Values.KEEP_ALIVE.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.CONNECTION))) { 90 resp.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); 91 } 92 ctx.write(resp); 93 ctx.writeAndFlush(new ChunkedFile(randomAccessFile, 0, randomAccessFile.length(), 8192)).addListener(ChannelFutureListener.CLOSE); 94 95 } catch (FileNotFoundException e) { 96 sendStatus(ctx, HttpResponseStatus.NOT_FOUND); 97 } finally { 98 if (randomAccessFile != null) { 99 randomAccessFile.close(); 100 } 101 } 102 } 103 104 private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception { 105 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); 106 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); 107 } 108 109 }
1. 輪訓、Comet等服務器推送技術效率低下,大量消耗服務器帶寬和資源。
2. WebSocket的特色:
a) 單一的TCP鏈接,全雙工模式。
b) 對代理、防火牆和路由器透明。
c) 無頭部信息、Cookie和身份驗證。
d) 無安全開銷。
e) 經過「ping/pong」幀保持鏈路激活。
f) 服務器能夠主動傳遞消息給客戶端,客戶端再也不輪訓。
1. 瀏覽器向服務器發起一個HTTP請求(特別的頭信息,Sec-WebSocket-Key是隨機的),準備創建WebSocket鏈接。
GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw== Sec-WebSocket-Protocol: chat, superchat Sec-WebSocket-Version: 13 Origin: http://example.com
2. 服務器用Sec-WebSocket-Key加上魔幻字符串「258EAFA5-E914-47DA-95CA-C5AB0DC85B11」,先SHA-1加密,再BASE-64編碼,做爲Sec-WebSocket-Accept返回瀏覽器。握手完成。
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk= Sec-WebSocket-Protocol: chat
3. 服務器和瀏覽器可經過message方式進行通訊。
4. 關閉消息帶有一個狀態碼和一個可選的關閉緣由,按協議要求發送一個Close控制幀,當對端接受到關閉控制幀指令時,主動關閉WebSocket鏈接。
1. 服務器WebSocketServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioServerSocketChannel; 8 import io.netty.handler.codec.http.HttpObjectAggregator; 9 import io.netty.handler.codec.http.HttpRequestDecoder; 10 import io.netty.handler.codec.http.HttpResponseEncoder; 11 import io.netty.handler.stream.ChunkedWriteHandler; 12 13 public class WebSocketServer { 14 15 public void run(int port) throws Exception { 16 EventLoopGroup bossGroup = new NioEventLoopGroup(); 17 EventLoopGroup workerGroup = new NioEventLoopGroup(); 18 try { 19 ServerBootstrap serverBootstrap = new ServerBootstrap(); 20 serverBootstrap.group(bossGroup, workerGroup) 21 .channel(NioServerSocketChannel.class) 22 .childHandler(new ChannelInitializer<SocketChannel>() { 23 24 @Override 25 protected void initChannel(SocketChannel socketChannel) throws Exception { 26 socketChannel.pipeline().addLast(new HttpRequestDecoder()); 27 socketChannel.pipeline().addLast(new HttpObjectAggregator(65536)); 28 socketChannel.pipeline().addLast(new HttpResponseEncoder()); 29 socketChannel.pipeline().addLast(new ChunkedWriteHandler()); 30 socketChannel.pipeline().addLast(new WebSocketServerHandler()); 31 } 32 33 }); 34 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 35 channelFuture.channel().closeFuture().sync(); 36 } finally { 37 workerGroup.shutdownGracefully(); 38 bossGroup.shutdownGracefully(); 39 } 40 } 41 42 public static void main(String[] args) throws Exception { 43 int port = 8080; 44 new WebSocketServer().run(port); 45 } 46 47 }
2. 服務器WebSocketServerHandler
1 import io.netty.channel.ChannelFutureListener; 2 import io.netty.channel.ChannelHandlerContext; 3 import io.netty.channel.SimpleChannelInboundHandler; 4 import io.netty.handler.codec.http.DefaultHttpResponse; 5 import io.netty.handler.codec.http.FullHttpRequest; 6 import io.netty.handler.codec.http.HttpHeaders; 7 import io.netty.handler.codec.http.HttpResponse; 8 import io.netty.handler.codec.http.HttpResponseStatus; 9 import io.netty.handler.codec.http.HttpVersion; 10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; 11 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; 12 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; 13 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; 14 import io.netty.handler.codec.http.websocketx.WebSocketFrame; 15 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; 16 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; 17 18 import java.util.Date; 19 20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { 21 22 private WebSocketServerHandshaker handshaker; 23 24 @Override 25 protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { 26 // 傳統HTTP 27 if (msg instanceof FullHttpRequest) { 28 handleHttpRequest(ctx, (FullHttpRequest) msg); 29 } else if (msg instanceof WebSocketFrame) { 30 handleWebSocketFrame(ctx, (WebSocketFrame) msg); 31 } 32 } 33 34 @Override 35 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 36 ctx.flush(); 37 } 38 39 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { 40 if (!req.getDecoderResult().isSuccess() 41 || !HttpHeaders.Values.WEBSOCKET.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.UPGRADE))) { 42 sendStatus(ctx, HttpResponseStatus.BAD_REQUEST); 43 return; 44 } 45 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/testws", null, false); 46 handshaker = wsFactory.newHandshaker(req); 47 if (handshaker == null) { 48 WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); 49 } else { 50 handshaker.handshake(ctx.channel(), req); 51 } 52 } 53 54 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { 55 if (frame instanceof CloseWebSocketFrame) { 56 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); 57 return; 58 } 59 if (frame instanceof PingWebSocketFrame) { 60 ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); 61 return; 62 } 63 if (!(frame instanceof TextWebSocketFrame)) { 64 throw new UnsupportedOperationException(); 65 } 66 String req = ((TextWebSocketFrame) frame).text(); 67 ctx.channel().write(new TextWebSocketFrame("歡迎" + req + ",如今時刻" + new Date())); 68 } 69 70 private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception { 71 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); 72 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); 73 } 74 75 }
3. 瀏覽器websocketclient.html
1 <script type="text/javascript"> 2 var socket; 3 function initSocket() { 4 if (socket) return; 5 if (!window.WebSocket) window.WebSocket = window.MozWebSocket; 6 if (!window.WebSocket) { 7 alert('瀏覽器不支持WebSocket'); 8 return; 9 } 10 socket = new WebSocket('ws://localhost:8080/testws'); 11 socket.onmessage = function(event) { 12 alert(event.data); 13 }; 14 socket.onopen = function(event) { 15 alert('WebSocket鏈接創建成功'); 16 }; 17 socket.onclose = function(event) { 18 alert('WebSocket鏈接已關閉'); 19 }; 20 } 21 22 function sendMsg() { 23 initSocket(); 24 if (socket && WebSocket && socket.readyState == WebSocket.OPEN) { 25 var msg = document.getElementById('msg').value; 26 socket.send(msg); 27 } 28 } 29 </script> 30 <input type="text" id="msg"/> 31 <input type="button" value="Send" onclick="sendMsg()"/>
1. Netty採用三層網絡架構設計和開發。
2. Reactor通訊調度層(第1層)。負責監聽網絡的讀寫和鏈接操做。將網絡層的數據讀取到內存緩存區,而後觸發各類網絡事件,如鏈接建立、鏈接激活、讀事件、寫事件等,將這些事件觸發到Pipeline中,有Pipeline管理的責任鏈進行後續處理。
3. 責任鏈ChannelPipleline(第2層)。負責事件在責任鏈中的有序傳播,同時動態地編排責任鏈。一般,由編解碼Handler將外部協議消息轉換成內部POJO對象,這樣上層業務只需關心業務邏輯處理。
4. 業務邏輯編排層Service ChannelHandler(第3層)。一般有兩類:存儲的業務邏輯編排和其餘應用層協議插件,用於特定協議相關的會話和鏈路管理。
5. 一般,開發者值需關係責任鏈和業務邏輯編排層。
Netty的高性能是如何實現的?
1. 採用異步非阻塞IO類庫,基於Reactor模式實現,解決了傳統同步阻塞IO模式下一個服務端沒法平滑處理線性增加的客戶端的問題。
2. TCP接收和發送緩衝區使用直接內存代替堆內存,避免內存複製,提高了IO讀寫性能。俗稱「零拷貝」(Zero-Copy)。
3. 經過內存池方式循環利用ByteBuf,避免了頻繁建立和銷燬ByteBuf帶來的性能損耗。
4. 可配置IO線程數、TCP參數等,爲不一樣場景提供定製化的調優參數,知足不一樣的性能場景。
5. 採用環形數組緩衝區實現無鎖化併發編程,代替傳統的線程安全容器和鎖。
6. 合理使用線程安全容器、原子類等,提高系統的併發處理能力。
7. 關鍵資源的處理使用單線程串行化方式,避免了多線程併發訪問帶來的鎖競爭和額外的CPU資源消耗問題。
8. 經過引用計數器及時申請釋放再也不被引用的對象,細粒度的內存管理下降了GC頻繁,減小了頻繁GC帶來的延時和CPU損耗。
Netty的可靠性是如何實現的?
1. 鏈路有效性檢測。
a) 長鏈接無需每次發送消息時建立鏈路,也無需在消息交互完成後關閉鏈路,所以相對短連接更高。
b) 爲保證長鏈接有效性,須要週期性心跳檢測。一旦發現問題,能夠及時關閉鏈路,重建TCP連接。
2. 內存保護機制。
a) 經過對象引用計數器對ByteBuf等內置對象進行細粒度的內存申請和釋放,對非法對象引用進行檢測和保護。
b) 經過內存池方式循環利用ByteBuf,節省內存。
c) 可設置內存容量上限,包括ByteBuf、線程池線程數等。
3. 優雅停機。
a) 當系統退出時,JVM經過註冊的Shutdown Hook攔截到退出信號量,而後執行退出操做,釋放相關模塊的資源,將緩衝區的消息處理完成或清空,將待刷新的數據持久化到磁盤或數據庫,完成後再退出。
b) 需設置超時時間T,若是達到T後仍然沒有退出,則經過「kill -9 pid」強殺進程。
Netty的可定製性是如何實現的?
1. 責任鏈模式:ChannelPipeline基於責任鏈模式,便於業務邏輯的攔截、定製和擴展。
2. 基於接口開發:關鍵類庫都提供了接口或抽象類。
3. 提供大量工廠類,重載工廠類可建立用戶實現的對象。
4. 提供大量系統參數供用戶設置。
可定義私有協議棧。
1. 開發時編寫的代碼。
a) 數據結構NettyMessage;
b) 消息編解碼器NettyMessageEncoder和NettyMessageDecoder;
c) 握手認證Handler LoginAuthReqHanlder和LoginAuthRespHanlder;
d) 心跳檢測Handler HearBeatReqHanlder和HearBeatRespHanlder。
2. 私有協議棧細節待補充。
做者:netoxi
出處:http://www.cnblogs.com/netoxi本文版權歸做者和博客園共有,歡迎轉載,未經贊成須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。歡迎指正與交流。