netty入門篇(1)

上一篇 nio簡介   下一篇 netty中級篇(2)

1、爲何選擇Netty

Netty是最流行的框架之1、健壯性、功能、性能、可定製性和可擴展性在同類框架中數一數二,所以被大規模使用,例如ROCKETMQ的NameSRV,例如Hadoop的Avro,例如Dubbo中的RPC通訊等等。。html

爲何選擇Netty?java

  • API簡單;
  • 功能強大,預置了選多的編碼功能,支持多種主流協議;
  • 定製能力強,經過ChannelHandler對通訊框架進行靈活的擴展;
  • 性能強;
  • 成熟,修改已發現的JDK nio BUG
  • 社區活躍
  • 通過大規模的商業應用考驗,質量獲得驗證。

2、使用Netty開發TimeServer

環境準備: pom.xmlshell

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>demo</groupId>
    <artifactId>netty</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.5.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

1. Netty TimeServer

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeServer {

    public void bind(int port) throws Exception {
        // 配置服務端的NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Netty啓動Nio服務端的輔助類
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024) //設置服務端tcp參數
                    .childHandler(new ChildChannelHandler());
            // 綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 進行阻塞,等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放線程池資源
            System.out.println("服務器關閉...");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new TimeServerHandler());
        }

    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new TimeServer().bind(port);
    }
}
  • EventLoopGroup Reactor線程組 2個
  • ServerBootstrap:Server端輔助工具
  • 設置channel: NioServerSocketChannel
  • option: 服務端tcp option設置,這裏以backlog 1024爲例..
  • 增長childHandler
  • f.channel().closeFuture().sync()表示進行阻塞,等待服務器端鏈路關閉以後main函數才退出

2. TimeServerHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelInboundHandlerAdapter;
 6 
 7 /**
 8  * @author lilinfeng
 9  * @version 1.0
10  * @date 2014年2月14日
11  */
12 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
13 
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx, Object msg)
17             throws Exception {
18         ByteBuf buf = (ByteBuf) msg;
19         byte[] req = new byte[buf.readableBytes()];
20         buf.readBytes(req);
21         String body = new String(req, "UTF-8");
22         System.out.println("The time server receive order : " + body);
23         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
24                 System.currentTimeMillis()).toString() : "BAD ORDER";
25         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
26         ctx.write(resp);
27     }
28 
29     @Override
30     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
31         ctx.flush();
32     }
33 
34     @Override
35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
36         ctx.close();
37     }
38 }

(1) 18行作類型轉換,將msg轉換爲Netty的ByteBuf對象,這個對象比ByteBuffer更增強大和靈活。apache

(2) 19行到20行經過ByteBuf的readableBytes獲取緩衝區可讀的字節數,根據可讀的字節數建立byte數組。將緩衝區的內容讀取到byte數組中。bootstrap

(3) 31行發現了flush方法,其做用是將消息發送隊列中的消息寫入到SocketChannel中發送給對方。從性能上考慮,爲了防止頻繁喚醒Selector進行消息發送,Netty的write方法不直接寫入到SocketChannel中,調用write方法只會寫入到緩衝數組中,調用flush方法,纔會寫入到SocketChannel中。數組

(4) 36行的close()是在發生異常後釋放資源服務器

總結: 就是比NIO舒服太多了.框架

3. Time Client

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 /**
11  * @author lilinfeng
12  * @version 1.0
13  * @date 2014年2月14日
14  */
15 public class TimeClient {
16 
17     public void connect(int port, String host) throws Exception {
18         // 配置客戶端NIO線程組
19         EventLoopGroup group = new NioEventLoopGroup();
20         try {
21             Bootstrap b = new Bootstrap();
22             b.group(group).channel(NioSocketChannel.class)
23                     .option(ChannelOption.TCP_NODELAY, true)
24                     .handler(new ChannelInitializer<SocketChannel>() {
25                         @Override
26                         public void initChannel(SocketChannel ch)
27                                 throws Exception {
28                             ch.pipeline().addLast(new TimeClientHandler());
29                         }
30                     });
31 
32             // 發起異步鏈接操做
33             ChannelFuture f = b.connect(host, port).sync();
34 
35             // 當代客戶端鏈路關閉
36             f.channel().closeFuture().sync();
37         } finally {
38             // 優雅退出,釋放NIO線程組
39             group.shutdownGracefully();
40         }
41     }
42 
43     /**
44      * @param args
45      * @throws Exception
46      */
47     public static void main(String[] args) throws Exception {
48         int port = 8080;
49         if (args != null && args.length > 0) {
50             try {
51                 port = Integer.valueOf(args[0]);
52             } catch (NumberFormatException e) {
53                 // 採用默認值
54             }
55         }
56         new TimeClient().connect(port, "127.0.0.1");
57     }
58 }

(1) 19行建立客戶端處理I/O讀寫的NioEventLoopGroup線程組,而後繼續建立輔助類Bootstrap,而且對其配置,此處配置爲 NioSocketChannel,而後爲其添加Handler。異步

(2) 這裏Handler直接使用匿名內部類socket

(3) 33行的connect發送異步鏈接請求,而後阻塞直到關閉。

4. TimeClientHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelInboundHandlerAdapter;
 6 
 7 import java.util.logging.Logger;
 8 
 9 /**
10  * @author lilinfeng
11  * @version 1.0
12  * @date 2014年2月14日
13  */
14 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
15 
16     private static final Logger logger = Logger
17             .getLogger(TimeClientHandler.class.getName());
18 
19     private final ByteBuf firstMessage;
20 
21     /**
22      * Creates a client-side handler.
23      */
24     public TimeClientHandler() {
25         byte[] req = "QUERY TIME ORDER".getBytes();
26         firstMessage = Unpooled.buffer(req.length);
27         firstMessage.writeBytes(req);
28 
29     }
30 
31     @Override
32     public void channelActive(ChannelHandlerContext ctx) {
33         ctx.writeAndFlush(firstMessage);
34     }
35 
36     @Override
37     public void channelRead(ChannelHandlerContext ctx, Object msg)
38             throws Exception {
39         ByteBuf buf = (ByteBuf) msg;
40         byte[] req = new byte[buf.readableBytes()];
41         buf.readBytes(req);
42         String body = new String(req, "UTF-8");
43         System.out.println("Now is : " + body);
44     }
45 
46     @Override
47     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
48         // 釋放資源
49         logger.warning("Unexpected exception from downstream : "
50                 + cause.getMessage());
51         ctx.close();
52     }
53 }

 

這裏重點關注3個方法: channelActive channelRead和exceptionCaught

(1) 當客戶端和服務器端成功建立鏈路,調用channelActive方法,發送查詢時間的指令給服務端,調用writeAndFlush方法發送數據。

(2) 39行開始調用channelRead,讀取數據,49行處理異常時釋放資源便可。

3、TCP 粘包/拆包問題的解決之道

一、TCP得粘包和拆包問題

  • TCP是一個流協議
  • TCP底層不瞭解業務數據含義,即不知道多少個字節算是業務上的一總體數據
  • 所以業務上認爲,一個完整的包會被TCP拆分爲多個包進行發送,也有可能將多個小的包封裝成一個大包進行發送、

用下圖進行描述,假設client發送了2個包,D1和D2,服務器端讀到的數據是不肯定的

存在4種可能:

server 分2次,分別讀到D1,D2,完美巧合,沒有粘包和拆包

server一次讀到D1,D2,D1和D2粘在一塊兒,稱爲粘包

server分2次,第一次讀到D1和D2的部份內容,第二次讀到了D2的剩餘內容,拆包

server分2次,第一次讀到D1的部份內容D1_1,第二次讀到D1剩下的內容D1_2和完整的D2。

若是此時服務器端TCP接收的滑窗很是的小、並且數據包D1和D2都比較大,頗有可能發生第5種可能性,服務器端屢次才能將D1和D2接收徹底,期間發生屢次拆包...即上4種狀況的屢次組合...

下面咱們來分析一下緣由:

3個緣由:

(1) 應用程序write寫入的字節大於套接口(scoket)發送緩衝的大小;

(2) 進行MSS大小的TCP分段;

(3) 以太網幀的payload大於MTU進行IP分片

總結就是:不可避免...

解決思路:

(1) 定長數據,例如每一個報文200bytes,不夠空格補充...

(2) 在包圍增長回車換行符或者其餘的特殊字符進行分割,例如FTP協議

(3) 將消息分爲消息頭和消息體,消息頭中包含消息總長度(或者消息體長度)content-length,一般的設計思路爲消息頭的第一個字段用int32來表示消息的總長度;

(4) 更復雜的應用層協議

2. 下面咱們來模擬未考慮TCP粘包問題致使功能異常

修改上面的代碼:

修改TimeServerHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 /**
 7  * @author lilinfeng
 8  * @version 1.0
 9  * @date 2014年2月14日
10  */
11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
12 
13     private int counter;
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx, Object msg)
17             throws Exception {
18         ByteBuf buf = (ByteBuf) msg;
19         byte[] req = new byte[buf.readableBytes()];
20         buf.readBytes(req);
21         String body = new String(req, "UTF-8").substring(0, req.length
22                 - System.getProperty("line.separator").length());
23         System.out.println("The time server receive order : " + body
24                 + " ; the counter is : " + ++counter);
25         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
26                 System.currentTimeMillis()).toString() : "BAD ORDER";
27         currentTime = currentTime + System.getProperty("line.separator");
28         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
29         ctx.writeAndFlush(resp);
30     }
31 
32     @Override
33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
34         ctx.close();
35     }
36 }

主要是增長了一個counter進行計數..

修改TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Logger;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = Logger
            .getLogger(TimeClientHandler.class.getName());

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is : " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 釋放資源
        logger.warning("Unexpected exception from downstream : "
                + cause.getMessage());
        ctx.close();
    }
}

主要是進行100次連續的發送數據...

因爲tcp粘包拆包有必定的隨機性,因此每次的結果可能不一樣,其中一次結果大體上是:

Server端打印:

QUERY TIME ORDER
....
the counter is : 2

Client端打印:

Now is : Thu Dec 15 15:11:22 CST 2016
BAD ORDER
BAD ORDER
 ; the counter is : 1

結果代表:client發送了100條消息,可是server是按照2次接收,只返回2條應答,可是client上的counter爲1代表只client也接收了一次,說明這2條也進行了粘包。

3. 解決TCP粘包的TimeServer

 

TimeServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 /**
13  * @author lilinfeng
14  * @version 1.0
15  * @date 2014年2月14日
16  */
17 public class TimeServer {
18 
19     public void bind(int port) throws Exception {
20         // 配置服務端的NIO線程組
21         EventLoopGroup bossGroup = new NioEventLoopGroup();
22         EventLoopGroup workerGroup = new NioEventLoopGroup();
23         try {
24             ServerBootstrap b = new ServerBootstrap();
25             b.group(bossGroup, workerGroup)
26                     .channel(NioServerSocketChannel.class)
27                     .option(ChannelOption.SO_BACKLOG, 1024)
28                     .childHandler(new ChildChannelHandler());
29             // 綁定端口,同步等待成功
30             ChannelFuture f = b.bind(port).sync();
31 
32             // 等待服務端監聽端口關閉
33             f.channel().closeFuture().sync();
34         } finally {
35             // 優雅退出,釋放線程池資源
36             bossGroup.shutdownGracefully();
37             workerGroup.shutdownGracefully();
38         }
39     }
40 
41     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
42         @Override
43         protected void initChannel(SocketChannel arg0) throws Exception {
44             arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
45             arg0.pipeline().addLast(new StringDecoder());
46             arg0.pipeline().addLast(new TimeServerHandler());
47         }
48     }
49 
50     /**
51      * @param args
52      * @throws Exception
53      */
54     public static void main(String[] args) throws Exception {
55         int port = 8080;
56         if (args != null && args.length > 0) {
57             try {
58                 port = Integer.valueOf(args[0]);
59             } catch (NumberFormatException e) {
60                 // 採用默認值
61             }
62         }
63         new TimeServer().bind(port);
64     }
65 }

重點看44行,增長了2個解碼器: LineBasedFrameDecoder和StringDecoder。

繼續看TimeServerHandler

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 /**
 7  * @author lilinfeng
 8  * @version 1.0
 9  * @date 2014年2月14日
10  */
11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
12 
13     private int counter;
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx, Object msg)
17             throws Exception {
18         String body = (String) msg;
19         System.out.println("The time server receive order : " + body
20                 + " ; the counter is : " + ++counter);
21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
22                 System.currentTimeMillis()).toString() : "BAD ORDER";
23         currentTime = currentTime + System.getProperty("line.separator");
24         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
25         ctx.writeAndFlush(resp);
26     }
27 
28     @Override
29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
30         ctx.close();
31     }
32 }

看18行,直接獲取以後不是ByteBuf,而直接是一個String對象,代碼很是簡潔。

TimeClient

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 /**
13  * @author lilinfeng
14  * @version 1.0
15  * @date 2014年2月14日
16  */
17 public class TimeClient {
18 
19     public void connect(int port, String host) throws Exception {
20         // 配置客戶端NIO線程組
21         EventLoopGroup group = new NioEventLoopGroup();
22         try {
23             Bootstrap b = new Bootstrap();
24             b.group(group).channel(NioSocketChannel.class)
25                     .option(ChannelOption.TCP_NODELAY, true)
26                     .handler(new ChannelInitializer<SocketChannel>() {
27                         @Override
28                         public void initChannel(SocketChannel ch)
29                                 throws Exception {
30                             ch.pipeline().addLast(
31                                     new LineBasedFrameDecoder(1024));
32                             ch.pipeline().addLast(new StringDecoder());
33                             ch.pipeline().addLast(new TimeClientHandler());
34                         }
35                     });
36 
37             // 發起異步鏈接操做
38             ChannelFuture f = b.connect(host, port).sync();
39 
40             // 當代客戶端鏈路關閉
41             f.channel().closeFuture().sync();
42         } finally {
43             // 優雅退出,釋放NIO線程組
44             group.shutdownGracefully();
45         }
46     }
47 
48     /**
49      * @param args
50      * @throws Exception
51      */
52     public static void main(String[] args) throws Exception {
53         int port = 8080;
54         if (args != null && args.length > 0) {
55             try {
56                 port = Integer.valueOf(args[0]);
57             } catch (NumberFormatException e) {
58                 // 採用默認值
59             }
60         }
61         new TimeClient().connect(port, "127.0.0.1");
62     }
63 }

相似TimeServer增長了2個解碼器

再看TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Logger;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = Logger
            .getLogger(TimeClientHandler.class.getName());

    private int counter;

    private byte[] req;

    /**
     * Creates a client-side handler.
     */
    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
                .getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("Now is : " + body + " ; the counter is : "
                + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 釋放資源
        logger.warning("Unexpected exception from downstream : "
                + cause.getMessage());
        ctx.close();
    }
}

直接運行發現徹底符合咱們需求

4. 分析LineBaseFrameDecoder和StringDecoder

LineBasedFrameDecoder的工做原理很是簡單:

(1) 遍歷ByteBuf中的可讀字節,判斷看是否有"\n"或者"\r\n",若是有,就以此爲結束位置,從可讀索引到結束位置區間的字節就組成了一行

(2) 是一個以換行符爲結束標誌的解碼器,支持攜帶結束符或者不攜帶結束符2種方式,同時支持配置單行的最大長度。

(3) 超過單行最大長度直接拋異常

StringDecoder的很是簡單:

(1) 將接收的對象轉換爲字符串

(2) 繼續調用後面的Handler

所以:

LineBasedFrameDecoder和StringDecoder組合在一塊兒就是行切換文件解碼器。

4、分割符解碼器的應用

使用DelimiterBasedFrameDecoder便可...

1. EohoServer

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 import io.netty.handler.logging.LogLevel;
14 import io.netty.handler.logging.LoggingHandler;
15 
16 /**
17  * @author lilinfeng
18  * @version 1.0
19  * @date 2014年2月14日
20  */
21 public class EchoServer {
22     public void bind(int port) throws Exception {
23         // 配置服務端的NIO線程組
24         EventLoopGroup bossGroup = new NioEventLoopGroup();
25         EventLoopGroup workerGroup = new NioEventLoopGroup();
26         try {
27             ServerBootstrap b = new ServerBootstrap();
28             b.group(bossGroup, workerGroup)
29                     .channel(NioServerSocketChannel.class)
30                     .option(ChannelOption.SO_BACKLOG, 100)
31                     .handler(new LoggingHandler(LogLevel.INFO))
32                     .childHandler(new ChannelInitializer<SocketChannel>() {
33                         @Override
34                         public void initChannel(SocketChannel ch)
35                                 throws Exception {
36                             ByteBuf delimiter = Unpooled.copiedBuffer("$_"
37                                     .getBytes());
38                             ch.pipeline().addLast(
39                                     new DelimiterBasedFrameDecoder(1024,
40                                             delimiter));
41                             ch.pipeline().addLast(new StringDecoder());
42                             ch.pipeline().addLast(new EchoServerHandler());
43                         }
44                     });
45 
46             // 綁定端口,同步等待成功
47             ChannelFuture f = b.bind(port).sync();
48 
49             // 等待服務端監聽端口關閉
50             f.channel().closeFuture().sync();
51         } finally {
52             // 優雅退出,釋放線程池資源
53             bossGroup.shutdownGracefully();
54             workerGroup.shutdownGracefully();
55         }
56     }
57 
58     public static void main(String[] args) throws Exception {
59         int port = 8080;
60         if (args != null && args.length > 0) {
61             try {
62                 port = Integer.valueOf(args[0]);
63             } catch (NumberFormatException e) {
64                 // 採用默認值
65             }
66         }
67         new EchoServer().bind(port);
68     }
69 }

(1) 重點在於38行的DelimiterBasedFrameDecoder, 與上面的換行分割符相似,只是能夠自定義特殊符號

(2) DelimiterBasedFrameDecoder有2個參數,一個爲單行最大長度,一個爲自定義符號對象

(3) 若是到達長度仍然沒有查詢到,就拋出TooLongFrameException異常

2. EchoServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("This is " + ++counter + " times receive client : ["
                + body + "]");
        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();// 發生異常,關閉鏈路
    }
}

很是簡單直接打印再write便可... 由此也能夠看出netty框架比較乾淨的分離出來了業務邏輯代碼。

3. Client端和ClientHandler基本相似

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class EchoClient {

    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_"
                                    .getBytes());
                            ch.pipeline().addLast(
                                    new DelimiterBasedFrameDecoder(1024,
                                            delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            // 發起異步鏈接操做
            ChannelFuture f = b.connect(host, port).sync();

            // 當代客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new EchoClient().connect(port, "127.0.0.1");
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    static final String ECHO_REQ = "Hi, Lilinfeng. Welcome to Netty.$_";

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(ECHO_REQ
        // .getBytes().length);
        // buf.writeBytes(ECHO_REQ.getBytes());
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("This is " + ++counter + " times receive server : ["
                + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

運行代碼,符合預期..

5、定長解碼器

1. 開發服務端

很是簡單,直接上代碼:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class EchoServer {
    public void bind(int port) throws Exception {
        // 配置服務端的NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new FixedLengthFrameDecoder(20));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            // 綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new EchoServer().bind(port);
    }
}
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("Receive client : [" + msg + "]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();// 發生異常,關閉鏈路
    }
}

2. 使用telnet進行訪問

(1) 我使用的是Xshell,直接命令

(2) telnet 127.0.0.1 8080

(3) 再隨便輸入字符,發現每20個字符,服務端顯示一次,符合預期

相關文章
相關標籤/搜索