Netty筆記——技術點彙總

目錄

· Linux網絡IO模型javascript

    · 文件描述符html

    · 阻塞IO模型java

    · 非阻塞IO模型web

    · IO複用模型數據庫

    · 信號驅動IO模型編程

    · 異步IO模型bootstrap

· BIO編程數組

· 僞異步IO編程瀏覽器

· NIO編程緩存

    · Buffer和Channel

    · 深刻Buffer

    · Selector

· AIO編程

· 四種IO編程對比及選擇Netty的緣由

· Netty入門

    · 開發與部署

    · Hello World

· 粘包/拆包問題

    · 問題及其解決

    · LineBasedFrameDecoder

    · DelimiterBasedFrameDecoder

    · FixedLengthFrameDecoder

· Java序列化問題

    · 問題描述及其解決

· HTTP協議開發

    · Netty HTTP

    · 文件服務器

· WebSocket協議開發

    · 問題及其解決

    · 原理(過程)

    · 開發

· Netty架構

    · 邏輯架構

    · 高性能

    · 可靠性

    · 可定製性

    · 可擴展性

· 私有協議棧開發


 

Linux網絡IO模型

文件描述符

1. Linux內核將全部外部設備視爲文件來操做。

2. 對一個文件的讀寫操做會調用內核提供的系統命令,返回一個file descripter(fd,文件描述符)。

3. 對一個socket的讀寫也會有相應的描述符,稱爲socketfd(socket描述符)。

阻塞IO模型

1. 最經常使用的IO模型。

2. 默認的IO模型。

3. 以socket接口爲例說明阻塞IO模型。

非阻塞IO模型

1. 通常輪訓檢查內核數據是否就緒。

2. 若是內核數據未就緒,則直接返回一個EWOULDBLOCK錯誤。

IO複用模型

1. Linux提供select/poll,進程傳遞一個或多個fd給select或poll系統調用,阻塞在select操做上,這樣select/poll能夠幫助進程同時檢測多個fd是否就緒。

2. select/poll存在支持fd數量有限、線性輪訓等問題,應採用基於事件驅動方式的epoll代替(當有fd就緒時,當即回調函數)。

信號驅動IO模型

進程先系統調用sigaction執行一個非阻塞的信號處理函數,進程繼續運行。當數據就緒時,爲該進程生成一個SIGIO信號,通知進程調用recvfrom讀取數據。

異步IO模型

1. 進程告知內核啓動某個操做,並在內核完成整個操做後再通知進程。

2. 與信號驅動IO模型區別:信號驅動IO模型只通知數據就緒;異步IO模型通知操做已完成。

BIO編程

1. 有一個獨立的Acceptor線程負責監聽客戶端鏈接,接收到鏈接後爲每一個客戶端建立一個新的線程進行鏈路處理,處理完以後,經過輸出流返回給客戶端,線程銷燬。

2. 問題:服務端線程個數與客戶端併發訪問數1:1關係。當客戶端併發訪問量愈來愈大時,系統會發生線程堆棧溢出、建立新線程失敗等問題,最終致使進程宕機或僵死。

僞異步IO編程

1. 當新客戶端接入時,將客戶端Socket封裝成一個Task(實現Runnable接口)投遞到線程池中進行處理。

2. 好處:因爲能夠設置線程池隊列的大小和最大線程數,因此資源佔用是可控的,客戶端併發數量增長不會致使資源耗盡、宕機。

3. 問題:底層通訊依然採用同步阻塞模型,沒法從根本上解決應答消息緩慢或網絡傳輸較慢時,長時間阻塞線程的問題。

NIO編程

Buffer和Channel

1. BIO是面向流的,一次處理一個字節;NIO是面向塊的,以塊的形式處理數據。

2. BIO的java.io.*已經使用NIO從新實現過。

3. Buffer緩衝區存放着準備要寫入或讀出的數據。一般是一個字節數組,但也能夠是其餘類型的數組或不是數組。

4. Buffer類型:

    a) ByteBuffer(經常使用)

    b) CharBuffer

    c) ShortBuffer

    d) IntBuffer

    e) LongBuffer

    f) FloatBuffer

    g) DoubleBuffer

5. Channel通道是雙向的,可經過它讀取或寫入數據。全部的數據都要經過Buffer來處理,永遠不會將數據直接寫入Channel。

 6. 寫文件示例。

 1 import java.io.FileOutputStream;
 2 import java.io.IOException;
 3 import java.io.UnsupportedEncodingException;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 import java.util.Random;
 7 import java.util.UUID;
 8 
 9 public class Test {
10     
11     private static byte[] getRandomData() {
12         int randomLength = new Random().nextInt(100);
13         StringBuilder data = new StringBuilder();
14         for (int index = 0; index < randomLength; index++) {
15             data.append(UUID.randomUUID().toString());
16         }
17         return data.toString().getBytes();
18     }
19     
20     public static void main(String[] args) {
21         FileOutputStream fileOutputStream = null;
22         try {
23             fileOutputStream = new FileOutputStream("D:/test.txt");
24             FileChannel fileChannel = fileOutputStream.getChannel();
25             ByteBuffer byteBuffer = null;
26             for (int index = 0; index < 1000; index++) {
27                 byte[] data = getRandomData();
28                 if (byteBuffer == null) {
29                     byteBuffer = ByteBuffer.wrap(data);
30                 } else if (data.length > byteBuffer.capacity()) {
31                     if (byteBuffer.position() > 0) {
32                         byteBuffer.flip();
33                         fileChannel.write(byteBuffer);
34                         byteBuffer.clear();
35                     }
36                     byteBuffer = ByteBuffer.wrap(data);
37                 } else if (data.length > byteBuffer.remaining()) {
38                     byteBuffer.flip();
39                     fileChannel.write(byteBuffer);
40                     byteBuffer.clear();
41                 }
42                 
43                 byteBuffer.put(data);
44             }
45             byteBuffer.flip();
46             fileChannel.write(byteBuffer);
47             byteBuffer.clear();
48             
49         } catch (IOException e) {
50             e.printStackTrace();
51         } finally {
52             if (fileOutputStream != null) {
53                 try {
54                     fileOutputStream.close();
55                 } catch (IOException e) {
56                     e.printStackTrace();
57                 }
58             }
59         }
60     }
61     
62 }
View Code

7. 讀文件示例。

 1 import java.io.FileInputStream;
 2 import java.io.IOException;
 3 import java.nio.ByteBuffer;
 4 import java.nio.channels.FileChannel;
 5 
 6 public class Test {
 7 
 8     public static void main(String[] args) {
 9         FileInputStream fileInputStream = null;
10         try {
11             fileInputStream = new FileInputStream("D:/test.txt");
12             FileChannel fileChannel = fileInputStream.getChannel();
13             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
14             while (fileChannel.read(byteBuffer) > 0) {
15                 byteBuffer.flip();
16                 while (byteBuffer.hasRemaining()) {
17                     System.out.print((char) byteBuffer.get());
18                 }
19                 byteBuffer.clear();
20             }
21             
22         } catch (IOException e) {
23             e.printStackTrace();
24         } finally {
25             if (fileInputStream != null) {
26                 try {
27                     fileInputStream.close();
28                 } catch (IOException e) {
29                     e.printStackTrace();
30                 }
31             }
32         }
33     }
34 
35 }
View Code

8. 複製文件示例。

 1 import java.io.IOException;
 2 import java.io.RandomAccessFile;
 3 import java.nio.ByteBuffer;
 4 import java.nio.channels.FileChannel;
 5 
 6 public class Test {
 7 
 8     public static void main(String[] args) {
 9         RandomAccessFile sourceFile = null;
10         RandomAccessFile targetFile = null;
11         try {
12             sourceFile = new RandomAccessFile("D:/test.txt", "r");
13             targetFile = new RandomAccessFile("D:/test.txt.bak", "rw");
14             FileChannel sourceFileChannel = sourceFile.getChannel();
15             FileChannel targetFileChannel = targetFile.getChannel();
16             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
17             while (sourceFileChannel.read(byteBuffer) > 0) {
18                 byteBuffer.flip();
19                 targetFileChannel.write(byteBuffer);
20                 byteBuffer.clear();
21             }
22             
23         } catch (IOException e) {
24             e.printStackTrace();
25         }
26     }
27 
28 }
View Code

深刻Buffer

1. Buffer能夠理解成數組,它經過如下3個值描述狀態:

    a) position:下一個元素的位置;

    b) limit:可讀取或寫入的元素總數,position老是小於或者等於limit;

    c) capacity:Buffer最大容量,limit老是小於或者等於capacity。

2. 以讀、寫舉例說明Buffer。

    a) 建立一個8字節的ByteBuffer。position=0,limit=8,capacity=8。

     b) 讀取3個字節。position=3,limit=8,capacity=8。

     c) 讀取2個字節。position=5,limit=8,capacity=8。

     d) 執行flip()。position=0,limit=5,capacity=8。

    e) 寫入4個字節。position=4,limit=5,capacity=8。

     f) 寫入1個字節。position=5,limit=5,capacity=8。

    g) 執行clear()。position=0,limit=8,capacity=8。

 3. 建立ByteBuffer的兩種方法:

    a) 建立固定大小的Buffer。

ByteBuffer.allocate(capacity)

    b) 將數組及其內容包裝成Buffer。

byte array[] = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(array);

Selector

1. Selector即IO複用模型中的多路複用器。

2. JDK使用了epoll。

AIO編程

1. AIO也稱NIO2.0,是異步IO模型。

2. JDK 7時在java.nio.channels包下新增了4個異步Channel。

    a) AsynchronousSocketChannel

    b) AsynchronousServerSocketChannel

    c) AsynchronousFileChannel

    d) AsynchronousDatagramChannel

3. 使用Future寫文件:異步執行,阻塞Future.get(),直到取得結果。

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.file.Path;
 5 import java.nio.file.Paths;
 6 import java.nio.file.StandardOpenOption;
 7 import java.util.ArrayList;
 8 import java.util.List;
 9 import java.util.Random;
10 import java.util.UUID;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.Future;
13 
14 public class Test {
15     
16     private static byte[] getRandomData() {
17         int randomLength = new Random().nextInt(100);
18         StringBuilder data = new StringBuilder();
19         for (int index = 0; index < randomLength; index++) {
20             data.append(UUID.randomUUID().toString());
21         }
22         return data.append('\n').toString().getBytes();
23     }
24 
25     public static void main (String [] args) {
26         Path file = Paths.get("D:/test.txt");
27         AsynchronousFileChannel asynchronousFileChannel = null;
28         try {
29             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);
30             List<Future<Integer>> futures = new ArrayList<>();
31             for (int index = 0; index < 10; index++) {
32                 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());
33                 Future<Integer> future = asynchronousFileChannel.write(byteBuffer, 0);
34                 futures.add(future);
35             }
36             for (Future<Integer> future : futures) {
37                 Integer length = null;
38                 try {
39                     length = future.get();
40                 } catch (InterruptedException | ExecutionException e) {
41                     e.printStackTrace();
42                 }
43                 System.out.println("Bytes written: " + length);
44             }
45             
46         } catch (IOException e) {
47             e.printStackTrace();
48         } finally {
49             if (asynchronousFileChannel != null) {
50                 try {
51                     asynchronousFileChannel.close();
52                 } catch (IOException e) {
53                     e.printStackTrace();
54                 }
55             }
56         }
57     }
58 }
View Code

4. 使用CompletionHandler寫文件:異步執行,回調CompletionHandler。注意:示例中,因爲不阻塞主線程,即異步任務是否結果主線程都會結束,有時會看不到結果,因此sleep 5秒。

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.channels.CompletionHandler;
 5 import java.nio.file.Path;
 6 import java.nio.file.Paths;
 7 import java.nio.file.StandardOpenOption;
 8 import java.util.Random;
 9 import java.util.UUID;
10 
11 public class Test {
12     
13     private static byte[] getRandomData() {
14         int randomLength = new Random().nextInt(100);
15         StringBuilder data = new StringBuilder();
16         for (int index = 0; index < randomLength; index++) {
17             data.append(UUID.randomUUID().toString());
18         }
19         return data.append('\n').toString().getBytes();
20     }
21 
22     public static void main (String [] args) {
23         Path file = Paths.get("D:/test.txt");
24         AsynchronousFileChannel asynchronousFileChannel = null;
25         try {
26             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);
27             CompletionHandler<Integer, Object> completionHandler = new CompletionHandler<Integer, Object>() {
28                 @Override
29                 public void completed(Integer result, Object attachment) {
30                     System.out.println("Bytes written: " + result);
31                 }
32                 @Override
33                 public void failed(Throwable exc, Object attachment) {
34                 }
35             };
36             for (int index = 0; index < 10; index ++) {
37                 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());
38                 asynchronousFileChannel.write(byteBuffer, 0, null, completionHandler);
39             }
40             
41         } catch (IOException e) {
42             e.printStackTrace();
43         } finally {
44             if (asynchronousFileChannel != null) {
45                 try {
46                     asynchronousFileChannel.close();
47                 } catch (IOException e) {
48                     e.printStackTrace();
49                 }
50             }
51         }
52         try {
53             Thread.sleep(5000);
54         } catch (InterruptedException e) {
55             e.printStackTrace();
56         }
57     }
58 }
View Code

5. 使用Future讀文件:異步執行,阻塞Future.get(),直到取得結果。

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.file.Path;
 5 import java.nio.file.Paths;
 6 import java.nio.file.StandardOpenOption;
 7 import java.util.concurrent.ExecutionException;
 8 import java.util.concurrent.Future;
 9 
10 public class Test {
11 
12     public static void main (String [] args) {
13         Path file = Paths.get("D:/test.txt");
14         AsynchronousFileChannel asynchronousFileChannel = null;
15         try {
16             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
17             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
18             int position = 0;
19             int length = 0;
20             do {
21                 Future<Integer> future = asynchronousFileChannel.read(byteBuffer, position);
22                 length = future.get();
23                 if (length > 0) {
24                     byteBuffer.flip();
25                     System.out.print(new String(byteBuffer.array()));
26                     byteBuffer.clear();
27                 }
28                 position += length;
29             } while (length > 0);
30             
31         } catch (IOException e) {
32             e.printStackTrace();
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         } catch (ExecutionException e) {
36             e.printStackTrace();
37         } finally {
38             if (asynchronousFileChannel != null) {
39                 try {
40                     asynchronousFileChannel.close();
41                 } catch (IOException e) {
42                     e.printStackTrace();
43                 }
44             }
45         }
46     }
47 }
View Code

6. 使用CompletionHandler讀文件:異步執行,回調CompletionHandler。注意:示例中,因爲不阻塞主線程,即異步任務是否結果主線程都會結束,有時會看不到結果,因此sleep 5秒。

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.channels.CompletionHandler;
 5 import java.nio.file.Path;
 6 import java.nio.file.Paths;
 7 import java.nio.file.StandardOpenOption;
 8 
 9 public class Test {
10 
11     public static void main (String [] args) {
12         Path file = Paths.get("D:/test.txt");
13         AsynchronousFileChannel asynchronousFileChannel = null;
14         try {
15             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
16             // 10個異步任務分別讀取文件頭64個字節,5秒後分別輸出。
17             CompletionHandler<Integer, ByteBuffer> completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
18                 @Override
19                 public void completed(Integer result, ByteBuffer byteBuffer) {
20                     byteBuffer.flip();
21                     System.out.print(new String(byteBuffer.array()));
22                     byteBuffer.clear();
23                 }
24                 @Override
25                 public void failed(Throwable exc, ByteBuffer byteBuffer) {
26                 }
27             };
28             for (int index = 0; index < 10; index++) {
29                 ByteBuffer byteBuffer = ByteBuffer.allocate(64);
30                 asynchronousFileChannel.read(byteBuffer, byteBuffer.limit() * index, byteBuffer, completionHandler);
31             }
32             
33         } catch (IOException e) {
34             e.printStackTrace();
35         } finally {
36             if (asynchronousFileChannel != null) {
37                 try {
38                     asynchronousFileChannel.close();
39                 } catch (IOException e) {
40                     e.printStackTrace();
41                 }
42             }
43         }
44         try {
45             Thread.sleep(5000);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49     }
50 }
View Code

四種IO編程對比及選擇Netty的緣由

1. 對比。

2. 選擇NIO框架Netty,而不選擇JDK的NIO類庫的理由。

    a) NIO類庫和API繁雜。

    b) 需另具有Java多線程編程等技能。

    c) 可靠性不高,工做量和難度很是大。

    d) 臭名昭著的epoll Bug致使Selector空輪訓。

Netty入門

開發與部署

1. 開發環境:CLASSPATH中導入「netty-all-x.y.z.jar」便可。

2. 打包部署:因爲是非Web應用,構建成jar包部署便可。

Hello World

1. 配置Maven的pom.xml文件。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
</dependency>
View Code

2. 時間服務器TimeServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 
10 public class TimeServer {
11     
12     public void bind(int port) throws Exception {
13         // 服務器NIO線程組線
14         EventLoopGroup bossGroup = new NioEventLoopGroup();
15         EventLoopGroup workerGroup = new NioEventLoopGroup();
16         try {
17             ServerBootstrap serverBootstrap = new ServerBootstrap();
18             serverBootstrap.group(bossGroup, workerGroup)
19                     .channel(NioServerSocketChannel.class)
20                     .option(ChannelOption.SO_BACKLOG, 1024)
21                     .childHandler(new ChildChannelHandler());
22             // 綁定端口,同步等待成功
23             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
24             // 等待服務器監聽端口關閉
25             channelFuture.channel().closeFuture().sync();
26         } finally {
27             // 優雅退出,釋放線程池資源
28             workerGroup.shutdownGracefully();
29             bossGroup.shutdownGracefully();
30         }
31     }
32     
33     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
34 
35         @Override
36         protected void initChannel(SocketChannel socketChannel) throws Exception {
37             socketChannel.pipeline().addLast(new TimeServerHandler());
38         }
39         
40     }
41     
42     public static void main(String[] args) throws Exception {
43         new TimeServer().bind(8080);
44     }
45     
46 }
View Code

3. 時間服務器TimeServerHandler

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         ByteBuf reqBuf = (ByteBuf) msg;
13         byte[] req = new byte[reqBuf.readableBytes()];
14         reqBuf.readBytes(req);
15         String reqString = new String(req, "UTF-8");
16         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
17         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
18         ctx.write(respBuf);
19     }
20     
21     @Override
22     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
23         ctx.flush();
24     }
25 
26     @Override
27     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
28         ctx.close();
29     }
30 
31 }
View Code

4. 時間客戶端TimeClient

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 public class TimeClient {
11     
12     public void connect(String host, int port) throws Exception {
13         EventLoopGroup group = new NioEventLoopGroup();
14         try {
15             // 客戶端NIO線程組
16             Bootstrap bootstrap = new Bootstrap();
17             bootstrap.group(group).channel(NioSocketChannel.class)
18                     .option(ChannelOption.TCP_NODELAY, true)
19                     .handler(new ChildChannelHandler());
20             // 發起異步鏈接操做
21             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
22             // 等待客戶端鏈路關閉
23             channelFuture.channel().closeFuture().sync();
24             
25         } finally {
26             // 優雅退出,釋放NIO線程組
27             group.shutdownGracefully();
28         }
29     }
30     
31     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
32 
33         @Override
34         protected void initChannel(SocketChannel socketChannel) throws Exception {
35             socketChannel.pipeline().addLast(new TimeClientHandler());
36         }
37         
38     }
39     
40     public static void main(String[] args) throws Exception {
41         new TimeClient().connect("127.0.0.1", 8080);
42     }
43     
44 }
View Code

5. 時間客戶端TimeClientHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDER".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         ByteBuf respBuf = (ByteBuf) msg;
24         byte[] resp = new byte[respBuf.readableBytes()];
25         respBuf.readBytes(resp);
26         String respString = new String(resp, "UTF-8");
27         System.out.println(respString);
28     }
29     
30     @Override
31     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
32         ctx.close();
33     }
34     
35 }
View Code

粘包/拆包問題

問題及其解決

1. TCP是一個「流協議」,是沒有界限的一串數據。

2. TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分。因此在業務上認爲,一個完整的包可能會被TCP拆包發送,也可能封裝多個

小包成大包發送。

3. 業界主流協議的解決方案概括:

    a) 消息定長。如每一個報文的大小固定長度200字節,不足時空位補空格。

    b) 在包尾增長回車換行符進行分割。如FTP協議。

    c) 將消息分爲消息頭、消息體,消息頭中包含消息總長度(或消息體長度)的字段。

    d) 更復雜的應用層協議。

4. Netty提供了多種編碼器用於解決粘包/拆包問題。

LineBasedFrameDecoder

1. 原理:遍歷ByteBuf中的可讀字節,發現「\n」或「\r\n」時就結束。

2. 支持攜帶結束符或不攜帶結束符兩種編碼方式;支持配置單行的最大長度(超過最大長度未發現換行符則拋出異常,同時忽略掉以前讀到的異常碼流)。

3. StringDecoder功能:將接受到的對象轉成字符串,而後繼續調用後面的Handler。

4. 使用LineBasedFrameDecoder優化後的時間服務器。

    a) 時間服務器TimeServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class TimeServer {
13     
14     public void bind(int port) throws Exception {
15         // 服務器NIO線程組線
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .option(ChannelOption.SO_BACKLOG, 1024)
23                     .childHandler(new ChildChannelHandler());
24             // 綁定端口,同步等待成功
25             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
26             // 等待服務器監聽端口關閉
27             channelFuture.channel().closeFuture().sync();
28         } finally {
29             // 優雅退出,釋放線程池資源
30             workerGroup.shutdownGracefully();
31             bossGroup.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
40             socketChannel.pipeline().addLast(new StringDecoder());
41             socketChannel.pipeline().addLast(new TimeServerHandler());
42         }
43         
44     }
45     
46     public static void main(String[] args) throws Exception {
47         new TimeServer().bind(8080);
48     }
49     
50 }
View Code

    b) 時間服務器TimeServerHandler

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         String reqString = (String) msg;
13         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
14         respString += "\n";
15         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
16         ctx.write(respBuf);
17     }
18     
19     @Override
20     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
21         ctx.flush();
22     }
23 
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
26         ctx.close();
27     }
28 
29 }
View Code

    c) 時間客戶端TimeClient

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class TimeClient {
13     
14     public void connect(String host, int port) throws Exception {
15         EventLoopGroup group = new NioEventLoopGroup();
16         try {
17             // 客戶端NIO線程組
18             Bootstrap bootstrap = new Bootstrap();
19             bootstrap.group(group).channel(NioSocketChannel.class)
20                     .option(ChannelOption.TCP_NODELAY, true)
21                     .handler(new ChildChannelHandler());
22             // 發起異步鏈接操做
23             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
24             // 等待客戶端鏈路關閉
25             channelFuture.channel().closeFuture().sync();
26             
27         } finally {
28             // 優雅退出,釋放NIO線程組
29             group.shutdownGracefully();
30         }
31     }
32     
33     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
34 
35         @Override
36         protected void initChannel(SocketChannel socketChannel) throws Exception {
37             socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
38             socketChannel.pipeline().addLast(new StringDecoder());
39             socketChannel.pipeline().addLast(new TimeClientHandler());
40         }
41         
42     }
43     
44     public static void main(String[] args) throws Exception {
45         new TimeClient().connect("127.0.0.1", 8080);
46     }
47     
48 }
View Code

    d) 時間客戶端TimeClientHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDER\n".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         String respString = (String) msg;
24         System.out.println(respString);
25     }
26     
27     @Override
28     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
29         ctx.close();
30     }
31     
32 }
View Code

DelimiterBasedFrameDecoder

1. 功能:以分隔符做爲碼流結束標識符的消息解碼。

2. 時間服務器TimeServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeServer {
15     
16     public void bind(int port) throws Exception {
17         // 服務器NIO線程組線
18         EventLoopGroup bossGroup = new NioEventLoopGroup();
19         EventLoopGroup workerGroup = new NioEventLoopGroup();
20         try {
21             ServerBootstrap serverBootstrap = new ServerBootstrap();
22             serverBootstrap.group(bossGroup, workerGroup)
23                     .channel(NioServerSocketChannel.class)
24                     .option(ChannelOption.SO_BACKLOG, 1024)
25                     .childHandler(new ChildChannelHandler());
26             // 綁定端口,同步等待成功
27             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
28             // 等待服務器監聽端口關閉
29             channelFuture.channel().closeFuture().sync();
30         } finally {
31             // 優雅退出,釋放線程池資源
32             workerGroup.shutdownGracefully();
33             bossGroup.shutdownGracefully();
34         }
35     }
36     
37     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
38 
39         @Override
40         protected void initChannel(SocketChannel socketChannel) throws Exception {
41             ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());
42             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
43             socketChannel.pipeline().addLast(new StringDecoder());
44             socketChannel.pipeline().addLast(new TimeServerHandler());
45         }
46         
47     }
48     
49     public static void main(String[] args) throws Exception {
50         new TimeServer().bind(8080);
51     }
52     
53 }
View Code

3. 時間服務器TimeServerHandler

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         String reqString = (String) msg;
13         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
14         respString += "*&*";
15         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
16         ctx.write(respBuf);
17     }
18     
19     @Override
20     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
21         ctx.flush();
22     }
23 
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
26         ctx.close();
27     }
28 
29 }
View Code

4. 時間客戶端TimeClient

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeClient {
15     
16     public void connect(String host, int port) throws Exception {
17         EventLoopGroup group = new NioEventLoopGroup();
18         try {
19             // 客戶端NIO線程組
20             Bootstrap bootstrap = new Bootstrap();
21             bootstrap.group(group).channel(NioSocketChannel.class)
22                     .option(ChannelOption.TCP_NODELAY, true)
23                     .handler(new ChildChannelHandler());
24             // 發起異步鏈接操做
25             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
26             // 等待客戶端鏈路關閉
27             channelFuture.channel().closeFuture().sync();
28             
29         } finally {
30             // 優雅退出,釋放NIO線程組
31             group.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());
40             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
41             socketChannel.pipeline().addLast(new StringDecoder());
42             socketChannel.pipeline().addLast(new TimeClientHandler());
43         }
44         
45     }
46     
47     public static void main(String[] args) throws Exception {
48         new TimeClient().connect("127.0.0.1", 8080);
49     }
50     
51 }
View Code

5. 時間客戶端TimeClientHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDER*&*".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         String respString = (String) msg;
24         System.out.println(respString);
25     }
26     
27     @Override
28     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
29         ctx.close();
30     }
31     
32 }
View Code

FixedLengthFrameDecoder

1. 原理:不管一次接受到多少數據包,它都會按照設置的固定長度解碼,若是是半包消息,則緩存半包消息並等待下個包到達後進行拼包,直到讀取到一個完整的包。

2. 回顯服務器EchoServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.FixedLengthFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class EchoServer {
13     
14     public void bind(int port) throws Exception {
15         // 服務器NIO線程組線
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .option(ChannelOption.SO_BACKLOG, 1024)
23                     .childHandler(new ChildChannelHandler());
24             // 綁定端口,同步等待成功
25             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
26             // 等待服務器監聽端口關閉
27             channelFuture.channel().closeFuture().sync();
28         } finally {
29             // 優雅退出,釋放線程池資源
30             workerGroup.shutdownGracefully();
31             bossGroup.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
40             socketChannel.pipeline().addLast(new StringDecoder());
41             socketChannel.pipeline().addLast(new EchoServerHandler());
42         }
43         
44     }
45     
46     public static void main(String[] args) throws Exception {
47         new EchoServer().bind(8080);
48     }
49     
50 }
View Code

3. 回顯服務器EchoServerHandler

 1 import io.netty.channel.ChannelHandlerAdapter;
 2 import io.netty.channel.ChannelHandlerContext;
 3 
 4 public class EchoServerHandler extends ChannelHandlerAdapter {
 5 
 6     @Override
 7     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 8         System.out.println(msg);
 9     }
10     
11     @Override
12     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
13         ctx.close();
14     }
15 
16 }
View Code

4. 使用telnet命令測試,當長度達到20個字符時,服務器打印。

Java序列化問題

問題描述及其解決

1. 沒法跨語言。Java序列化是Java語言內部的私有協議,其餘語言並不支持。

2. 序列化後的碼流太大。編碼後的字節數組越大,存儲的時候就越佔空間,存儲的硬件成本就越高,網絡傳輸時更佔帶寬,致使系統的吞吐量下降。

3. 序列化性能過低。編解碼耗時長。

4. 解決:編解碼框架,如Google Protobuf、MessagePack。此處不深刻展開。

HTTP協議開發

Netty HTTP

1. 因爲HTTP協議的通用性,不少異構系統間的通訊交互採用HTTP協議,如很是流行的HTTP + XML或者RESTful + JSON。

2. 與Web容器相比,Netty開發HTTP的優點:輕量級;安全。

3. 這裏以文件服務器舉例,至於HTTP + XML,此處不深刻展開。

文件服務器

1. 文件服務器HttpFileServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.EventLoopGroup;
 5 import io.netty.channel.nio.NioEventLoopGroup;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import io.netty.handler.codec.http.HttpObjectAggregator;
 9 import io.netty.handler.codec.http.HttpRequestDecoder;
10 import io.netty.handler.codec.http.HttpResponseEncoder;
11 import io.netty.handler.stream.ChunkedWriteHandler;
12 
13 public class HttpFileServer {
14     
15     public void run(int port, String folderPath) throws Exception {
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .childHandler(new ChannelInitializer<SocketChannel>() {
23                         
24                         @Override
25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
26                             socketChannel.pipeline().addLast(new HttpRequestDecoder());
27                             socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
28                             socketChannel.pipeline().addLast(new HttpResponseEncoder());
29                             socketChannel.pipeline().addLast(new ChunkedWriteHandler());
30                             socketChannel.pipeline().addLast(new HttpFileServerHandler(folderPath));
31                         }
32                         
33                     });
34             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
35             channelFuture.channel().closeFuture().sync();
36         } finally {
37             workerGroup.shutdownGracefully();
38             bossGroup.shutdownGracefully();
39         }
40     }
41     
42     public static void main(String[] args) throws Exception {
43         int port = 8080;
44         String folderPath = "E:/workspace";
45         new HttpFileServer().run(port, folderPath);
46     }
47 
48 }
View Code

2. 文件服務器HttpFileServerHandler

  1 import io.netty.buffer.ByteBuf;
  2 import io.netty.buffer.Unpooled;
  3 import io.netty.channel.ChannelFutureListener;
  4 import io.netty.channel.ChannelHandlerContext;
  5 import io.netty.channel.SimpleChannelInboundHandler;
  6 import io.netty.handler.codec.http.DefaultFullHttpResponse;
  7 import io.netty.handler.codec.http.DefaultHttpResponse;
  8 import io.netty.handler.codec.http.FullHttpRequest;
  9 import io.netty.handler.codec.http.FullHttpResponse;
 10 import io.netty.handler.codec.http.HttpHeaders;
 11 import io.netty.handler.codec.http.HttpMethod;
 12 import io.netty.handler.codec.http.HttpResponse;
 13 import io.netty.handler.codec.http.HttpResponseStatus;
 14 import io.netty.handler.codec.http.HttpVersion;
 15 import io.netty.handler.stream.ChunkedFile;
 16 import io.netty.util.CharsetUtil;
 17 
 18 import java.io.File;
 19 import java.io.FileNotFoundException;
 20 import java.io.RandomAccessFile;
 21 import java.net.URLDecoder;
 22 
 23 public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 24     
 25     private String folderPath;
 26     
 27     public HttpFileServerHandler(String folderPath) {
 28         this.folderPath = folderPath;
 29     }
 30 
 31     @Override
 32     protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
 33         if (!req.getDecoderResult().isSuccess()) {
 34             sendStatus(ctx, HttpResponseStatus.BAD_REQUEST);
 35             return;
 36         }
 37         if (!HttpMethod.GET.equals(req.getMethod())) {
 38             sendStatus(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
 39             return;
 40         }
 41         String uri = req.getUri();
 42         File file = getFile(uri);
 43         if (file == null || file.isHidden() || !file.exists()) {
 44             sendStatus(ctx, HttpResponseStatus.NOT_FOUND);
 45             return;
 46         }
 47         try {
 48             if (file.isDirectory()) {
 49                 listFiles(ctx, file, uri);
 50             } else {
 51                 returnFile(ctx, req, file);
 52             }
 53         } catch (Exception e) {
 54             sendStatus(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
 55         }
 56     }
 57     
 58     private File getFile(String uri) throws Exception {
 59         uri = URLDecoder.decode(uri, "UTF-8");
 60         return new File(folderPath + uri);
 61     }
 62     
 63     private void listFiles(ChannelHandlerContext ctx, File folder, String uri) throws Exception {
 64         uri = uri.endsWith("/") ? uri : uri + "/";
 65         StringBuilder html = new StringBuilder("<h1>Index of ").append(URLDecoder.decode(uri, "UTF-8")).append("</h1><hr/><pre><a href=\"").append(uri).append("../\">../</a>\n");
 66         File[] subfiles = folder.listFiles();
 67         if (subfiles != null && subfiles.length > 0) {
 68             for (File subfile : subfiles) {
 69                 String name = subfile.getName();
 70                 html.append("<a href=\"").append(uri).append(name).append("\">").append(name).append("</a>\n");
 71             }
 72         }
 73         html.append("</pre><hr/>");
 74         FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
 75         resp.headers().add(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8");
 76         ByteBuf content = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);
 77         resp.content().writeBytes(content);
 78         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
 79     }
 80     
 81     private void returnFile(ChannelHandlerContext ctx, FullHttpRequest req, File file) throws Exception {
 82         
 83         RandomAccessFile randomAccessFile = null;
 84         try {
 85             randomAccessFile = new RandomAccessFile(file, "r");
 86             HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
 87             resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, randomAccessFile.length())
 88                     .set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
 89             if (HttpHeaders.Values.KEEP_ALIVE.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.CONNECTION))) {
 90                 resp.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 91             }
 92             ctx.write(resp);
 93             ctx.writeAndFlush(new ChunkedFile(randomAccessFile, 0, randomAccessFile.length(), 8192)).addListener(ChannelFutureListener.CLOSE);
 94             
 95         } catch (FileNotFoundException e) {
 96             sendStatus(ctx, HttpResponseStatus.NOT_FOUND);
 97         } finally {
 98             if (randomAccessFile != null) {
 99                 randomAccessFile.close();
100             }
101         }
102     }
103     
104     private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {
105         HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
106         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
107     }
108 
109 }
View Code

WebSocket協議開發

問題及其解決

1. 輪訓、Comet等服務器推送技術效率低下,大量消耗服務器帶寬和資源。

2. WebSocket的特色:

    a) 單一的TCP鏈接,全雙工模式。

    b) 對代理、防火牆和路由器透明。

    c) 無頭部信息、Cookie和身份驗證。

    d) 無安全開銷。

    e) 經過「ping/pong」幀保持鏈路激活。

    f) 服務器能夠主動傳遞消息給客戶端,客戶端再也不輪訓。

原理(過程)

1. 瀏覽器向服務器發起一個HTTP請求(特別的頭信息,Sec-WebSocket-Key是隨機的),準備創建WebSocket鏈接。

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com

2. 服務器用Sec-WebSocket-Key加上魔幻字符串「258EAFA5-E914-47DA-95CA-C5AB0DC85B11」,先SHA-1加密,再BASE-64編碼,做爲Sec-WebSocket-Accept返回瀏覽器。握手完成。

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat

3. 服務器和瀏覽器可經過message方式進行通訊。

4. 關閉消息帶有一個狀態碼和一個可選的關閉緣由,按協議要求發送一個Close控制幀,當對端接受到關閉控制幀指令時,主動關閉WebSocket鏈接。

開發

1. 服務器WebSocketServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.EventLoopGroup;
 5 import io.netty.channel.nio.NioEventLoopGroup;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import io.netty.handler.codec.http.HttpObjectAggregator;
 9 import io.netty.handler.codec.http.HttpRequestDecoder;
10 import io.netty.handler.codec.http.HttpResponseEncoder;
11 import io.netty.handler.stream.ChunkedWriteHandler;
12 
13 public class WebSocketServer {
14     
15     public void run(int port) throws Exception {
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .childHandler(new ChannelInitializer<SocketChannel>() {
23                         
24                         @Override
25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
26                             socketChannel.pipeline().addLast(new HttpRequestDecoder());
27                             socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
28                             socketChannel.pipeline().addLast(new HttpResponseEncoder());
29                             socketChannel.pipeline().addLast(new ChunkedWriteHandler());
30                             socketChannel.pipeline().addLast(new WebSocketServerHandler());
31                         }
32                         
33                     });
34             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
35             channelFuture.channel().closeFuture().sync();
36         } finally {
37             workerGroup.shutdownGracefully();
38             bossGroup.shutdownGracefully();
39         }
40     }
41     
42     public static void main(String[] args) throws Exception {
43         int port = 8080;
44         new WebSocketServer().run(port);
45     }
46     
47 }
View Code

2. 服務器WebSocketServerHandler

 1 import io.netty.channel.ChannelFutureListener;
 2 import io.netty.channel.ChannelHandlerContext;
 3 import io.netty.channel.SimpleChannelInboundHandler;
 4 import io.netty.handler.codec.http.DefaultHttpResponse;
 5 import io.netty.handler.codec.http.FullHttpRequest;
 6 import io.netty.handler.codec.http.HttpHeaders;
 7 import io.netty.handler.codec.http.HttpResponse;
 8 import io.netty.handler.codec.http.HttpResponseStatus;
 9 import io.netty.handler.codec.http.HttpVersion;
10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
11 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
12 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
13 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
14 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
15 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
16 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
17 
18 import java.util.Date;
19 
20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
21     
22     private WebSocketServerHandshaker handshaker;
23     
24     @Override
25     protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
26         // 傳統HTTP
27         if (msg instanceof FullHttpRequest) {
28             handleHttpRequest(ctx, (FullHttpRequest) msg);
29         } else if (msg instanceof WebSocketFrame) {
30             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
31         }
32     }
33     
34     @Override
35     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
36         ctx.flush();
37     }
38     
39     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
40         if (!req.getDecoderResult().isSuccess()
41                 || !HttpHeaders.Values.WEBSOCKET.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.UPGRADE))) {
42             sendStatus(ctx, HttpResponseStatus.BAD_REQUEST);
43             return;
44         }
45         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/testws", null, false);
46         handshaker = wsFactory.newHandshaker(req);
47         if (handshaker == null) {
48             WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
49         } else {
50             handshaker.handshake(ctx.channel(), req);
51         }
52     }
53     
54     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
55         if (frame instanceof CloseWebSocketFrame) {
56             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
57             return;
58         }
59         if (frame instanceof PingWebSocketFrame) {
60             ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
61             return;
62         }
63         if (!(frame instanceof TextWebSocketFrame)) {
64             throw new UnsupportedOperationException();
65         }
66         String req = ((TextWebSocketFrame) frame).text();
67         ctx.channel().write(new TextWebSocketFrame("歡迎" + req + ",如今時刻" + new Date()));
68     }
69     
70     private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {
71         HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
72         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
73     }
74     
75 }
View Code

3. 瀏覽器websocketclient.html

 1 <script type="text/javascript">
 2 var socket;
 3 function initSocket() {
 4     if (socket) return;
 5     if (!window.WebSocket) window.WebSocket = window.MozWebSocket;
 6     if (!window.WebSocket) {
 7         alert('瀏覽器不支持WebSocket');
 8         return;
 9     }
10     socket = new WebSocket('ws://localhost:8080/testws');
11     socket.onmessage = function(event) {
12         alert(event.data);
13     };
14     socket.onopen = function(event) {
15         alert('WebSocket鏈接創建成功');
16     };
17     socket.onclose = function(event) {
18         alert('WebSocket鏈接已關閉');
19     };
20 }
21 
22 function sendMsg() {
23     initSocket();
24     if (socket && WebSocket && socket.readyState == WebSocket.OPEN) {
25         var msg = document.getElementById('msg').value;
26         socket.send(msg);
27     }
28 }
29 </script>
30 <input type="text" id="msg"/>
31 <input type="button" value="Send" onclick="sendMsg()"/>
View Code

Netty架構

邏輯架構

1. Netty採用三層網絡架構設計和開發。

2. Reactor通訊調度層(第1層)。負責監聽網絡的讀寫和鏈接操做。將網絡層的數據讀取到內存緩存區,而後觸發各類網絡事件,如鏈接建立、鏈接激活、讀事件、寫事件等,將這些事件觸發到Pipeline中,有Pipeline管理的責任鏈進行後續處理。

3. 責任鏈ChannelPipleline(第2層)。負責事件在責任鏈中的有序傳播,同時動態地編排責任鏈。一般,由編解碼Handler將外部協議消息轉換成內部POJO對象,這樣上層業務只需關心業務邏輯處理。

4. 業務邏輯編排層Service ChannelHandler(第3層)。一般有兩類:存儲的業務邏輯編排和其餘應用層協議插件,用於特定協議相關的會話和鏈路管理。

5. 一般,開發者值需關係責任鏈和業務邏輯編排層。

高性能

Netty的高性能是如何實現的?

1. 採用異步非阻塞IO類庫,基於Reactor模式實現,解決了傳統同步阻塞IO模式下一個服務端沒法平滑處理線性增加的客戶端的問題。

2. TCP接收和發送緩衝區使用直接內存代替堆內存,避免內存複製,提高了IO讀寫性能。俗稱「零拷貝」(Zero-Copy)。

3. 經過內存池方式循環利用ByteBuf,避免了頻繁建立和銷燬ByteBuf帶來的性能損耗。

4. 可配置IO線程數、TCP參數等,爲不一樣場景提供定製化的調優參數,知足不一樣的性能場景。

5. 採用環形數組緩衝區實現無鎖化併發編程,代替傳統的線程安全容器和鎖。

6. 合理使用線程安全容器、原子類等,提高系統的併發處理能力。

7. 關鍵資源的處理使用單線程串行化方式,避免了多線程併發訪問帶來的鎖競爭和額外的CPU資源消耗問題。

8. 經過引用計數器及時申請釋放再也不被引用的對象,細粒度的內存管理下降了GC頻繁,減小了頻繁GC帶來的延時和CPU損耗。

可靠性

Netty的可靠性是如何實現的?

1. 鏈路有效性檢測。

    a) 長鏈接無需每次發送消息時建立鏈路,也無需在消息交互完成後關閉鏈路,所以相對短連接更高。

    b) 爲保證長鏈接有效性,須要週期性心跳檢測。一旦發現問題,能夠及時關閉鏈路,重建TCP連接。

2. 內存保護機制。

    a) 經過對象引用計數器對ByteBuf等內置對象進行細粒度的內存申請和釋放,對非法對象引用進行檢測和保護。

    b) 經過內存池方式循環利用ByteBuf,節省內存。

    c) 可設置內存容量上限,包括ByteBuf、線程池線程數等。

3. 優雅停機。

    a) 當系統退出時,JVM經過註冊的Shutdown Hook攔截到退出信號量,而後執行退出操做,釋放相關模塊的資源,將緩衝區的消息處理完成或清空,將待刷新的數據持久化到磁盤或數據庫,完成後再退出。

    b) 需設置超時時間T,若是達到T後仍然沒有退出,則經過「kill -9 pid」強殺進程。

可定製性

Netty的可定製性是如何實現的?

1. 責任鏈模式:ChannelPipeline基於責任鏈模式,便於業務邏輯的攔截、定製和擴展。

2. 基於接口開發:關鍵類庫都提供了接口或抽象類。

3. 提供大量工廠類,重載工廠類可建立用戶實現的對象。

4. 提供大量系統參數供用戶設置。

可擴展性

可定義私有協議棧。

私有協議棧開發

1. 開發時編寫的代碼。

    a) 數據結構NettyMessage;

    b) 消息編解碼器NettyMessageEncoder和NettyMessageDecoder;

    c) 握手認證Handler LoginAuthReqHanlder和LoginAuthRespHanlder;

    d) 心跳檢測Handler HearBeatReqHanlder和HearBeatRespHanlder。

2. 私有協議棧細節待補充。

 

做者:netoxi
出處:http://www.cnblogs.com/netoxi本文版權歸做者和博客園共有,歡迎轉載,未經贊成須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。歡迎指正與交流。

相關文章
相關標籤/搜索