Netty學習筆記(一) - 簡介和組件設計

在互聯網發達的今天,網絡已經深刻到生活的方方面面,一個高效、性能可靠的網絡通訊已經成爲一個重要的訴求,在Java方面須要尋求一種高性能網絡編程的實踐。java

1、簡介

當前JDK(本文使用的JDK 1.8)中已經有網絡編程相關的API,使用過程當中或多或少會存在如下幾個問題:編程

  • 阻塞:早期JDK裏的API是用阻塞式的實現方式,在讀寫數據調用時數據尚未準備好,或者目前不可寫,操做就會被阻塞直到數據準備好或目標可寫爲止。雖然能夠採用每個鏈接建立一個線程進行處理,可是可能會形成大量線程得不到釋放,消耗資源。從JDK 1.4開始提供非阻塞的實現。
  • 處理和調度IO煩瑣:偏底層的API實現暴露了更多的與業務無關的操做細節,使得在高負載下實現一個可靠和高效的邏輯就變得複雜和煩瑣。

Netty是一款異步的事件驅動的網絡應用程序框架,支持快速地開發可維護的高性能的面向協議的服務器和客戶端。它擁有簡單而強大的設計模型,易於使用,擁有比Java API更高的性能等特色,它屏蔽了底層實現的細節,使開發人員更關注業務邏輯的實現。bootstrap

2、組件和設計

  • Channel:屏蔽底層網絡傳輸細節,提供簡單易用的諸如bind、connect、read、write方法。
  • EventLoop:線程模型。處理鏈接生命週期過程當中發生的事件,以及其餘一些任務。
  • ChannelFuture:異步接口,用於註冊Listener以便在某個操做完成時獲得通知。
  • ChannelHandler:處理入站和出站數據的的一系列接口和抽象類,開發人員擴展這些類型來完成業務邏輯。
  • ChannelPipline:管理ChannelHandler的容器,將多個ChannelHandler以鏈式的方式管理,數據將在這個鏈上依次流動並被ChannelHandler逐個處理。
  • 引導(Bootstrap、ServerBootstrap):初始化客戶端和服務端的入口類。

3、一個簡單的Demo

建立一個maven工程,引入Netty。爲了方便調試,Demo中引入了日誌和junit5。api

 1 <!-- pom.xml -->
 2 
 3 <dependencies>
 4     <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
 5     <dependency>
 6         <groupId>io.netty</groupId>
 7         <artifactId>netty-all</artifactId>
 8         <version>4.1.50.Final</version>
 9     </dependency>
10 
11     <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
12     <dependency>
13         <groupId>org.slf4j</groupId>
14         <artifactId>slf4j-api</artifactId>
15         <version>1.7.30</version>
16     </dependency>
17 
18     <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
19     <dependency>
20         <groupId>ch.qos.logback</groupId>
21         <artifactId>logback-classic</artifactId>
22         <version>1.2.3</version>
23     </dependency>
24 
25     <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->
26     <dependency>
27         <groupId>ch.qos.logback</groupId>
28         <artifactId>logback-core</artifactId>
29         <version>1.2.3</version>
30     </dependency>
31 
32     <dependency>
33         <groupId>org.junit.jupiter</groupId>
34         <artifactId>junit-jupiter</artifactId>
35         <version>5.5.2</version>
36         <scope>test</scope>
37     </dependency>
38 </dependencies>

建立Client和Server服務器

 1 package com.niklai.demo;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelFuture;
 7 import io.netty.channel.ChannelHandlerContext;
 8 import io.netty.channel.ChannelInboundHandlerAdapter;
 9 import io.netty.channel.ChannelInitializer;
10 import io.netty.channel.nio.NioEventLoopGroup;
11 import io.netty.channel.socket.SocketChannel;
12 import io.netty.channel.socket.nio.NioSocketChannel;
13 import io.netty.util.CharsetUtil;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 
17 import java.net.InetSocketAddress;
18 
19 public class Client {
20     private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());
21 
22     public static void init() {
23         try {
24             Bootstrap bootstrap = new Bootstrap();              // 初始化客戶端引導
25             NioEventLoopGroup group = new NioEventLoopGroup();
26             bootstrap.group(group)                              // 指定適用於NIO的EventLoop
27                     .channel(NioSocketChannel.class)            // 適用於NIO的Channel
28                     .remoteAddress(new InetSocketAddress("localhost", 9999))    // 指定要綁定的IP和端口
29                     .handler(new ChannelInitializer<SocketChannel>() {
30                         protected void initChannel(SocketChannel socketChannel) throws Exception {
31                             socketChannel.pipeline().addLast(new ClientHandler());      // 添加ChannelHandler到ChannelPipline
32                         }
33                     });
34             ChannelFuture future = bootstrap.connect().sync();      // 阻塞直到鏈接到遠程節點
35             future.channel().closeFuture().sync();                  // 阻塞直到關閉Channel
36             group.shutdownGracefully().sync();                      // 釋放資源
37         } catch (InterruptedException e) {
38             logger.error(e.getMessage(), e);
39         }
40     }
41 
42     static class ClientHandler extends ChannelInboundHandlerAdapter {
43         @Override
44         public void channelActive(ChannelHandlerContext ctx) throws Exception {
45             logger.info("channel active....");
46 
47             String msg = "Client message!";
48             logger.info("send message: {}....", msg);
49             ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
50         }
51 
52         @Override
53         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
54             ByteBuf buf = (ByteBuf) msg;
55             logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
56         }
57     }
58 }
 1 package com.niklai.demo;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.*;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioServerSocketChannel;
10 import io.netty.util.CharsetUtil;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 
14 import java.net.InetSocketAddress;
15 
16 public class Server {
17     private static final Logger logger = LoggerFactory.getLogger(Server.class.getSimpleName());
18 
19     public static void init() {
20         try {
21             ServerBootstrap serverBootstrap = new ServerBootstrap();        // 初始化客戶端引導
22             NioEventLoopGroup group = new NioEventLoopGroup();
23             serverBootstrap.group(group)                                    // 指定適用於NIO的EventLoop
24                     .channel(NioServerSocketChannel.class)                  // 適用於NIO的Channel
25                     .localAddress(new InetSocketAddress("localhost", 9999))     // 指定要綁定的IP和端口
26                     .childHandler(new ChannelInitializer<SocketChannel>() {
27                         protected void initChannel(SocketChannel socketChannel) throws Exception {
28                             socketChannel.pipeline().addLast(new ServerHandler());      // 添加ChannelHandler到ChannelPipline
29                         }
30                     });
31 
32             ChannelFuture future = serverBootstrap.bind().sync();           // 異步綁定阻塞直到完成
33             future.channel().closeFuture().sync();                          // 阻塞直到關閉Channel
34             group.shutdownGracefully().sync();                              // 釋放資源
35         } catch (InterruptedException e) {
36             logger.error(e.getMessage(), e);
37         }
38     }
39 
40     static class ServerHandler extends ChannelInboundHandlerAdapter {
41         @Override
42         public void channelActive(ChannelHandlerContext ctx) throws Exception {
43             logger.info("channel active.....");
44         }
45 
46         @Override
47         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
48             ByteBuf buf = (ByteBuf) msg;
49             logger.info("read message: {}.....", buf.toString(CharsetUtil.UTF_8));
50         }
51 
52         @Override
53         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
54             logger.info("read complete.....");
55             ctx.writeAndFlush(Unpooled.copiedBuffer("receive message!", CharsetUtil.UTF_8))
56                     .addListener(ChannelFutureListener.CLOSE);
57         }
58     }
59 }

日誌配置文件網絡

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 
 3 <configuration>
 4     <!-- 定義控制檯輸出 -->
 5     <appender name="consoleOut" class="ch.qos.logback.core.ConsoleAppender">
 6         <encoder>
 7             <pattern>%date %level [%thread] %class#%method [%file:%line] %msg%n</pattern>
 8         </encoder>
 9     </appender>
10 
11     <root level="info">
12         <appender-ref ref="consoleOut" />
13     </root>
14 </configuration>
logback.xml

單元測試代碼app

 1 package com.niklai.demo;
 2 
 3 import org.junit.jupiter.api.Test;
 4 
 5 public class NettyTest {
 6 
 7     @Test
 8     public void test1() throws InterruptedException {
 9         new Thread(() -> {
10             // 服務端
11             Server.init();
12         }).start();
13         Thread.sleep(1000);
14 
15         // 客戶端
16         Client.init();
17 
18         Thread.sleep(5000);
19     }
20 }

 運行結果以下圖框架

從控制檯日誌中能夠看到當Client鏈接到Server後, ServerHandler 和 ClientHandler 的 channerActive 方法都會被調用, ClientHandler 會調用 ctx.writeAndFlush() 方法給Server發送一條消息, ServerHandler 的 channelRead 方法被調用讀取到消息,消息讀取完畢後 channelReadComplete 方法被調用,發送應答消息給Client, ClientHandler 的 channelRead 方法被調用獲取到應答消息。到此一個完整的發送--應答流程就結束了。異步

相關文章
相關標籤/搜索