上一篇咱們講了《Socket粘包問題的3種解決方案》,但沒想到評論區居然炸了。介於你們的熱情討論,以及不一樣的反饋意見,本文就來作一個擴展和延伸,試圖找到問題的最優解,以及消息通信的最優解決方案。java
在正式開始以前,咱們先對上篇評論中的幾個典型問題作一個簡單的回覆,不感興趣的朋友可直接劃過。git
先說答案:TCP 自己並無粘包和半包一說,由於 TCP 本質上只是一個傳輸控制協議(Transmission Control Protocol,TCP),它是一種面向鏈接的、可靠的、基於字節流的傳輸層通訊協議,由 IETF 的 RFC 793 定義。編程
所謂的協議本質上是一個約定,就比如 Java 編程約定使用駝峯命名法同樣,約定的意義是爲了讓通信雙方,可以正常的進行消息互換的,那粘包和半包問題又是如何產生的呢?bootstrap
這是由於在 TCP 的交互中,數據是以字節流的形式進行傳輸的,而「流」的傳輸是沒有邊界的,由於沒有邊界因此就不能區分消息的歸屬,從而就會產生粘包和半包問題(粘包和半包的定義,詳見上一篇)。因此說 TCP 協議自己並不存在粘包和半包問題,只是在使用中若是不能有效的肯定流的邊界就會產生粘包和半包問題。數組
坦白的說,通過評論區你們的耐心「開導」,我也意識到了以結束符做爲最終的解決方案存在必定的侷限性,好比當一條消息中間若是出現告終束符就會形成半包的問題,因此若是是複雜的字符串要對內容進行編碼和解碼處理,這樣才能保證結束符的正確性。安全
這個問題的答案是否認的,其實上文在開頭已經描述了應用場景:「傳統的 Socket 編程」,學習它的意義就在於理解更早期更底層的一些知識,固然做爲補充本文會提供更加高效的消息通信方案——Netty 通信。服務器
聊完了以上問題,接下來我們先來補充一下上篇文章中提到的,將消息分爲消息頭和消息體的代碼實現。網絡
在開始寫服務器端和客戶端以前,我們先來編寫一個消息的封裝類,使用它能夠將消息封裝成消息頭和消息體,以下圖所示:
消息頭中存儲消息體的長度,從而肯定了消息的邊界,便解決粘包和半包問題。框架
消息的封裝類中提供了兩個方法:一個是將消息轉換成消息頭 + 消息體的方法,另外一個是讀取消息頭的方法,具體實現代碼以下:dom
/** * 消息封裝類 */ class SocketPacket { // 消息頭存儲的長度(佔 8 字節) static final int HEAD_SIZE = 8; /** * 將協議封裝爲:協議頭 + 協議體 * @param context 消息體(String 類型) * @return byte[] */ public byte[] toBytes(String context) { // 協議體 byte 數組 byte[] bodyByte = context.getBytes(); int bodyByteLength = bodyByte.length; // 最終封裝對象 byte[] result = new byte[HEAD_SIZE + bodyByteLength]; // 藉助 NumberFormat 將 int 轉換爲 byte[] NumberFormat numberFormat = NumberFormat.getNumberInstance(); numberFormat.setMinimumIntegerDigits(HEAD_SIZE); numberFormat.setGroupingUsed(false); // 協議頭 byte 數組 byte[] headByte = numberFormat.format(bodyByteLength).getBytes(); // 封裝協議頭 System.arraycopy(headByte, 0, result, 0, HEAD_SIZE); // 封裝協議體 System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength); return result; } /** * 獲取消息頭的內容(也就是消息體的長度) * @param inputStream * @return */ public int getHeader(InputStream inputStream) throws IOException { int result = 0; byte[] bytes = new byte[HEAD_SIZE]; inputStream.read(bytes, 0, HEAD_SIZE); // 獲得消息體的字節長度 result = Integer.valueOf(new String(bytes)); return result; } }
接下來咱們來定義客戶端,在客戶端中咱們添加一組待發送的消息,隨機給服務器端發送一個消息,實現代碼以下:
/** * 客戶端 */ class MySocketClient { public static void main(String[] args) throws IOException { // 啓動 Socket 並嘗試鏈接服務器 Socket socket = new Socket("127.0.0.1", 9093); // 發送消息合集(隨機發送一條消息) final String[] message = {"Hi,Java.", "Hi,SQL~", "關注公衆號|Java中文社羣."}; // 建立協議封裝對象 SocketPacket socketPacket = new SocketPacket(); try (OutputStream outputStream = socket.getOutputStream()) { // 給服務器端發送 10 次消息 for (int i = 0; i < 10; i++) { // 隨機發送一條消息 String msg = message[new Random().nextInt(message.length)]; // 將內容封裝爲:協議頭+協議體 byte[] bytes = socketPacket.toBytes(msg); // 發送消息 outputStream.write(bytes, 0, bytes.length); outputStream.flush(); } } } }
服務器端咱們使用線程池來處理每一個客戶端的業務請求,實現代碼以下:
/** * 服務器端 */ class MySocketServer { public static void main(String[] args) throws IOException { // 建立 Socket 服務器端 ServerSocket serverSocket = new ServerSocket(9093); // 獲取客戶端鏈接 Socket clientSocket = serverSocket.accept(); // 使用線程池處理更多的客戶端 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); threadPool.submit(() -> { // 客戶端消息處理 processMessage(clientSocket); }); } /** * 客戶端消息處理 * @param clientSocket */ private static void processMessage(Socket clientSocket) { // Socket 封裝對象 SocketPacket socketPacket = new SocketPacket(); // 獲取客戶端發送的消息對象 try (InputStream inputStream = clientSocket.getInputStream()) { while (true) { // 獲取消息頭(也就是消息體的長度) int bodyLength = socketPacket.getHeader(inputStream); // 消息體 byte 數組 byte[] bodyByte = new byte[bodyLength]; // 每次實際讀取字節數 int readCount = 0; // 消息體賦值下標 int bodyIndex = 0; // 循環接收消息頭中定義的長度 while (bodyIndex <= (bodyLength - 1) && (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) { bodyIndex += readCount; } bodyIndex = 0; // 成功接收到客戶端的消息並打印 System.out.println("接收到客戶端的信息:" + new String(bodyByte)); } } catch (IOException ioException) { System.out.println(ioException.getMessage()); } } }
以上程序的執行結果以下:
從上述結果能夠看出,消息通信正常,客戶端和服務器端的交互中並無出現粘包和半包的問題。
以上的內容都是針對傳統 Socket 編程的,但要實現更加高效的通信和鏈接對象的複用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,異步非阻塞 IO)了。
傳統的 Socket 編程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的區別以下:
PS:AIO 能夠看做是 NIO 的升級,它也叫 NIO 2。
傳統 Socket 的通信流程:
NIO 的通信流程:
NIO 的設計思路雖然很好,但它的代碼編寫比較麻煩,好比 Buffer 的使用和 Selector 的編寫等。而且在面對斷線重連、包丟失和粘包等複雜問題時手動處理的成本都很大,所以咱們一般會使用 Netty 框架來替代傳統的 NIO。
Netty 是一個異步、事件驅動的用來作高性能、高可靠性的網絡應用框架,使用它能夠快速輕鬆地開發網絡應用程序,極大的簡化了網絡編程的複雜度。
Netty 主要優勢有如下幾個:
Netty 主要包含如下 3 個部分,以下圖所示:
這 3 個部分的功能介紹以下。
Core 核心層是 Netty 最精華的內容,它提供了底層網絡通訊的通用抽象和實現,包括可擴展的事件模型、通用的通訊 API、支持零拷貝的 ByteBuf 等。
協議支持層基本上覆蓋了主流協議的編解碼實現,如 HTTP、SSL、Protobuf、壓縮、大文件傳輸、WebSocket、文本、二進制等主流協議,此外 Netty 還支持自定義應用層協議。Netty 豐富的協議支持下降了用戶的開發成本,基於 Netty 咱們能夠快速開發 HTTP、WebSocket 等服務。
傳輸服務層提供了網絡傳輸能力的定義和實現方法。它支持 Socket、HTTP 隧道、虛擬機管道等傳輸方式。Netty 對 TCP、UDP 等數據傳輸作了抽象和封裝,用戶能夠更聚焦在業務邏輯實現上,而沒必要關係底層數據傳輸的細節。
對 Netty 有了大概的認識以後,接下來咱們用 Netty 來編寫一個基礎的通信服務器,它包含兩個端:服務器端和客戶端,客戶端負責發送消息,服務器端負責接收並打印消息,具體的實現步驟以下。
首先咱們須要先添加 Netty 框架的支持,若是是 Maven 項目添加以下配置便可:
<!-- 添加 Netty 框架 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.56.Final</version> </dependency>
Netty 的 3.x 和 4.x 爲主流的穩定版本,而最新的 5.x 已是放棄的測試版了,所以推薦使用 Netty 4.x 的最新穩定版。
按照官方的推薦,這裏將服務器端的代碼分爲如下 3 個部分:
PS:Channel 字面意思爲「通道」,它是網絡通訊的載體。Channel 提供了基本的 API 用於網絡 I/O 操做,如 register、bind、connect、read、write、flush 等。Netty 本身實現的 Channel 是以 JDK NIO Channel 爲基礎的,相比較於 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時屏蔽了底層 Socket 的複雜性,賦予了 Channel 更增強大的功能,你在使用 Netty 時基本不須要再與 Java Socket 類直接打交道。
服務器端的實現代碼以下:
// 定義服務器的端口號 static final int PORT = 8007; /** * 服務器端 */ static class MyNettyServer { public static void main(String[] args) { // 建立一個線程組,用來負責接收客戶端鏈接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 建立另外一個線程組,用來負責 I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立一個 Server 實例(可理解爲 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設置到 Server 實例 b.group(bossGroup, workerGroup) // 設置 Netty 通道的類型爲 NioServerSocket(非阻塞 I/O Socket 服務器) .channel(NioServerSocketChannel.class) // 設置創建鏈接以後的執行器(ServerInitializer 是我建立的一個自定義類) .childHandler(new ServerInitializer()); // 綁定端口而且進行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關閉通道進行監聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務器端鏈接以後的執行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設置 ChannelPipeline pipeline = ch.pipeline(); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務器端鏈接以後的執行器,接收到消息以後的業務處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務器端接收到消息以後的業務處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數據讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客戶端的代碼實現也是分爲如下 3 個部分:
客戶端的實現代碼以下:
/** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 建立事件循環線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啓動對象 Bootstrap b = new Bootstrap(); // 設置啓動參數 b.group(group) // 設置通道類型 .channel(NioSocketChannel.class) // 設置啓動執行器(負責啓動事件的業務執行,ClientInitializer 爲自定義的類) .handler(new ClientInitializer()); // 鏈接服務器端並同步通道 Channel ch = b.connect("127.0.0.1", 8007).sync().channel(); // 發送消息 ChannelFuture lastWriteFuture = null; // 給服務器端發送 10 條消息 for (int i = 0; i < 10; i++) { // 發送給服務器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java."); } // 在關閉通道以前,同步刷新全部的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端鏈接成功以後業務處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端鏈接成功以後的業務處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端鏈接成功以後的業務處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務器的消息:" + msg); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
從以上代碼能夠看出,咱們代碼實現的功能是,客戶端給服務器端發送 10 條消息。
編寫完上述代碼以後,咱們就能夠啓動服務器端和客戶端了,啓動以後,它們的執行結果以下:
從上述結果中能夠看出,雖然客戶端和服務器端實現了通訊,但在 Netty 的使用中依然存在粘包的問題,服務器端一次收到了 10 條消息,而不是每次只收到一條消息,所以接下來咱們要解決掉 Netty 中的粘包問題。
在 Netty 中,解決粘包問題的經常使用方案有如下 3 種:
接下來咱們分別來看後兩種推薦的解決方案。
在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號做爲消息的結束符,從而解決粘包和半包的問題。
它的核心實現代碼是在初始化通道(Channel)時,經過設置 DelimiterBasedFrameDecoder 來分隔消息,須要在客戶端和服務器端都進行設置,具體實現代碼以下。
服務器端核心實現代碼以下:
/** * 服務端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務器端鏈接以後的執行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設置 ChannelPipeline pipeline = ch.pipeline(); // 19 行:設置結尾分隔符【核心代碼】(參數1:爲消息的最大長度,可自定義;參數2:分隔符[此處以換行符爲分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務器端鏈接以後的執行器,接收到消息以後的業務處理 pipeline.addLast(SERVER_HANDLER); } }
核心代碼爲第 19 行,代碼中已經備註了方法的含義,這裏就再也不贅述。
客戶端的核心實現代碼以下:
/** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端鏈接成功以後業務處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 17 行:設置結尾分隔符【核心代碼】(參數1:爲消息的最大長度,可自定義;參數2:分隔符[此處以換行符爲分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端鏈接成功以後的業務處理 pipeline.addLast(CLIENT_HANDLER); } }
完整的服務器端和客戶端的實現代碼以下:
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyExample { // 定義服務器的端口號 static final int PORT = 8007; /** * 服務器端 */ static class MyNettyServer { public static void main(String[] args) { // 建立一個線程組,用來負責接收客戶端鏈接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 建立另外一個線程組,用來負責 I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立一個 Server 實例(可理解爲 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設置到 Server 實例 b.group(bossGroup, workerGroup) // 設置 Netty 通道的類型爲 NioServerSocket(非阻塞 I/O Socket 服務器) .channel(NioServerSocketChannel.class) // 設置創建鏈接以後的執行器(ServerInitializer 是我建立的一個自定義類) .childHandler(new ServerInitializer()); // 綁定端口而且進行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關閉通道進行監聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務器端鏈接以後的執行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設置 ChannelPipeline pipeline = ch.pipeline(); // 設置結尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務器端鏈接以後的執行器,接收到消息以後的業務處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務器端接收到消息以後的業務處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數據讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } /** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 建立事件循環線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啓動對象 Bootstrap b = new Bootstrap(); // 設置啓動參數 b.group(group) // 設置通道類型 .channel(NioSocketChannel.class) // 設置啓動執行器(負責啓動事件的業務執行,ClientInitializer 爲自定義的類) .handler(new ClientInitializer()); // 鏈接服務器端並同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 發送消息 ChannelFuture lastWriteFuture = null; // 給服務器端發送 10 條消息 for (int i = 0; i < 10; i++) { // 發送給服務器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在關閉通道以前,同步刷新全部的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端鏈接成功以後業務處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 設置結尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端鏈接成功以後的業務處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端鏈接成功以後的業務處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務器的消息:" + msg); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
最終的執行結果以下圖所示:
從上述結果中能夠看出,Netty 能夠正常使用了,它已經不存在粘包和半包問題了。
此解決方案的核心是將消息分爲消息頭 + 消息體,在消息頭中保存消息體的長度,從而肯定一條消息的邊界,這樣就避免了粘包和半包問題了,它的實現過程以下圖所示:
在 Netty 中能夠經過 LengthFieldPrepender(編碼)和 LengthFieldBasedFrameDecoder(解碼)兩個類實現消息的封裝。和上一個解決方案相似,咱們須要分別在服務器端和客戶端經過設置通道(Channel)來解決粘包問題。
服務器端的核心代碼以下:
/** * 服務端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務器端鏈接以後的執行器(自定義的類) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); /** * 初始化通道的具體執行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設置 ChannelPipeline pipeline = ch.pipeline(); // 18 行:消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 20 行:消息編碼:將消息封裝爲消息頭和消息體,在消息前添加消息體的長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務器端鏈接以後的執行器,接收到消息以後的業務處理 pipeline.addLast(SERVER_HANDLER); } }
其中核心代碼是 18 行和 20 行,經過 LengthFieldPrepender 實現編碼(將消息打包成消息頭 + 消息體),經過 LengthFieldBasedFrameDecoder 實現解碼(從封裝的消息中取出消息的內容)。
LengthFieldBasedFrameDecoder 的參數說明以下:
LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:數據包最大長度爲 1024,長度域佔首部的四個字節,在讀數據的時候去掉首部四個字節(即長度域)。
客戶端的核心實現代碼以下:
/** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端鏈接成功以後業務處理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝爲消息頭和消息體,在響應字節數據前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端鏈接成功以後的業務處理 pipeline.addLast(CLIENT_HANDLER); } }
完整的服務器端和客戶端的實現代碼以下:
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 經過封裝 Netty 來解決粘包 */ public class NettyExample { // 定義服務器的端口號 static final int PORT = 8007; /** * 服務器端 */ static class MyNettyServer { public static void main(String[] args) { // 建立一個線程組,用來負責接收客戶端鏈接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 建立另外一個線程組,用來負責 I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立一個 Server 實例(可理解爲 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設置到 Server 實例 b.group(bossGroup, workerGroup) // 設置 Netty 通道的類型爲 NioServerSocket(非阻塞 I/O Socket 服務器) .channel(NioServerSocketChannel.class) // 設置創建鏈接以後的執行器(ServerInitializer 是我建立的一個自定義類) .childHandler(new NettyExample.ServerInitializer()); // 綁定端口而且進行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關閉通道進行監聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務器端鏈接以後的執行器(自定義的類) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); /** * 初始化通道的具體執行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設置 ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝爲消息頭和消息體,在響應字節數據前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務器端鏈接以後的執行器,接收到消息以後的業務處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務器端接收到消息以後的業務處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數據讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } /** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 建立事件循環線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啓動對象 Bootstrap b = new Bootstrap(); // 設置啓動參數 b.group(group) // 設置通道類型 .channel(NioSocketChannel.class) // 設置啓動執行器(負責啓動事件的業務執行,ClientInitializer 爲自定義的類) .handler(new NettyExample.ClientInitializer()); // 鏈接服務器端並同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 發送消息 ChannelFuture lastWriteFuture = null; // 給服務器端發送 10 條消息 for (int i = 0; i < 10; i++) { // 發送給服務器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在關閉通道以前,同步刷新全部的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端鏈接成功以後業務處理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝爲消息頭和消息體,在響應字節數據前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端鏈接成功以後的業務處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端鏈接成功以後的業務處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務器的消息:" + msg); } /** * 異常處理,打印異常並關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
以上程序的執行結果爲:
本文提供了傳統 Socket 通信將消息分爲消息頭和消息體的具體代碼實現,然而傳統的 Socket 在性能和複用性上表現通常,爲了更加高效的實現通信,咱們可使用 Netty 框架來替代傳統的 Socket 和 NIO 編程,但 Netty 在使用時依然會出現粘包的問題,因而咱們提供了兩種最多見的解決方案:經過分隔符或將封裝消息的解決方案,其中最後一種解決方案的使用更加普遍。
《Netty 核心原理剖析與 RPC 實踐》