netty 入門

文檔 使用手冊html

近期公司經過TCP鏈接的的方式接了一個硬件設備,用了最基礎的ServerSocket類,參考的oracle的文檔 。 實現的比較簡單,放在github 上,不過這裏應該用Netty纔是正解。因此,過一下Netty的入門文檔。java

本文demogit

序言

問題

咱們通常會用Http客戶端庫來調用web服務,獲取數據。若是一個東西是出於通常性目的設計出來的,那麼他在某些方面可能就不是最合適的。好比獲取大文件,收發郵件,展現實時的金融數據,遊戲數據傳輸等。爲了實現這些需求,須要一個爲其高度優化的特定協議。還有一個沒法避免的問題是你可能須要調用老系統的數據,可是他的協議又是特定。重點來了,如何在不犧牲可靠性性能的前提下快速實現這麼一個系統。github

解決方案

用Netty。用Netty。用Netty。重要的事情說3遍。web

Netty是一個異步 事件驅動 網絡框架 ,能夠用來快速開發易維護,高性能,可擴展的服務端/客戶端。換句話說他簡化了TCP和UDP 等服務的網絡開發。bootstrap

容易開發或者快速開發並不意味着他會犧牲可維護性或者是面臨性能問題。Netty吸收了大量用於實現FTP,SMTP,HTTP協議的經驗,而且仔細當心謹慎的設計。因此,他在易於開發,追求性能,確保穩定性和靈活性上並無對任何一點有所妥協。api

有人可能會說別的框架他們也這麼說本身,那Netty到底或者爲何和他們不同。答案是他的設計理念。Netty提供的API用起來就很是舒服。如今可能不是那麼直觀,可是當你使用的時候就會體會到。數組

開始使用

這節會圍繞Netty的核心構建過程,用幾個例子來讓你快速上手。學完這節你會能夠在Netty框架的基礎上學會寫client和server。promise

若是你想學的深刻一點,瞭解一下他的底層實現,第二節,架構概覽是個不錯的起點。瀏覽器

開始以前

這節須要兩個東西,新版的Netty和jdk1.6+。Netty下載地址

<dependencies>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.49.Final</version>
    </dependency>
</dependencies>

隨着你不斷往下看,你能會對這節引入的類有疑惑,你能夠隨時經過API文檔來了解更多。類名都是帶連接的,能夠直接點過去。

編寫一個Discard Server

前半部分

世界上最簡單的協議並非輸出Hello world,而是Discard,就是過來什麼都直接丟棄,而且不給任何回覆。下面讓咱們直接從Netty提供的handler實現來處理IO事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {//1
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//2
//        super.channelRead(ctx, msg);
        ((ByteBuf) msg).release();//3
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//4
//        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

有如下幾點:

  1. 咱們寫一個DiscardServerHandler類繼承自ChannelInboundHandlerAdapter,這個ChannelInboundHandlerAdapter繼承自抽象類ChannelHandlerAdapter而且實現了接口ChannelInboundHandlerChannelInboundHandler提供了各類各樣的可重寫的事件handler方法。這裏只要使用ChannelInboundHandlerAdapterChannelInboundHandler的默認實現就好,不須要本身去實現全部的ChannelInboundHandler方法。

  2. channelRead方法咱們重寫掉了,這個方法會在收到客戶端消息的時候調用。例子中,消息msg的類型爲ByteBufByteBuf是對byte[]的一種抽象,可讓咱們訪問數組內容。

  3. 咱們這裏須要實現的是Discard協議,就是丟棄協議,因此須要忽略收到的全部消息。ByteBuf是一種reference-counted的對象(能夠簡單理解指針之類的東西),必須經過顯式調用其release方法來釋放。一般,咱們的channelRead方法是下面這樣的

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            //對msg作一些處理
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }
  4. Netty在處理IO的遇到exception就會進入exceptionCaught方法。一般,須要作一下日誌記錄,而後把相關的channel(通道)關閉。這裏作法也不是固定的,你能夠先發一個帶code的Response而後再關閉。

後半部分

到這一步,咱們已經實現了Discard服務的前半部分,剩下的就是寫一個main方法而後來啓動這個DiscardServerHandler服務。

package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();//1
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();//2
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)//3
                    .childHandler(new ChannelInitializer<SocketChannel>() {//4
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)//5 最大鏈接數128
                    .childOption(ChannelOption.SO_KEEPALIVE, true);//6

            //綁定端口啓動服務
            ChannelFuture f = b.bind(port).sync();//7
            //server關閉的時候調用。由於這裏是Discard 服務,因此永遠不會調用。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0)
            port = Integer.parseInt(args[0]);

        new DiscardServer(port).run();
    }
}

有如下幾點:

  1. NioEventLoopGroup是一個多線程的event loop(事件環?)。Netty針對不一樣的狀況提供了多(18)種EventLoopGroup的實現,由於這裏是一個服務端應用,因此使用NioEventLoopGroup。new出來兩個對象,一般第一個叫boss,接收進來的鏈接。第二個,一般叫worker,由於當boss接收了鏈接以後會把連接註冊給worker,讓worker來處理後面的通訊。每一個EventLoopGroup使用線程數以及他們如何被映射到ChannelEventLoopGroup的實現決定,而且可能能夠經過構造函數來指定。

    1. 下面是NioEventGroup的部分構造函數。

      pic

    2. 什麼是Netty的Channel。按照文檔的介紹,能夠簡單理解爲socket的一個抽象,或者是IO的操做,包括IO的讀寫,鏈接,綁定等。Channel會給使用者提供如下功能:

      • 當前的狀態(鏈接是否已經打開或者連上)
      • channel的配置參數。(接收的緩衝區大小)
      • socket,io相關的操做(讀寫等)
      • 處理io事件的管道
      • 詳細的之後再說
  2. ServerBootstarp是一個配置server的幫助類,你可使用Channel本身來配置,可是會比較枯燥,因此,大多數狀況下直接使用這個ServerBootstrap就好。

  3. NioServerSocketChannel是一個Channel的實例,用來處理進來的鏈接(上面說的channel的功能)。

  4. ChannelInitializer是一個特殊的Handler,做用是幫助用戶配置Channel。一般的做用是把ChannelHandler放到ChannelPipeline(管道)裏面,請求會進入到Pipeline,處理就按照這個Pipeline配置的Handler來。DiscardServerHandler就是一種Handler。

  5. 用來配置Channel的參數。順道看一下ServerBootstrap的定義,這個ServerBootstrap是用來啓動ServerChannelServerChannel實際上就是一個Channel。咱們這裏實現的是一個TCP/IP server,因此,能夠設置tcpNoDelaykeepAlive等參數。具體設置看文檔。

    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{}
  6. option的爲接收鏈接的配置,也就是給boss用,後面的childOption爲worker配置選項。

  7. 萬事俱備,只欠把綁定端口配置上去而後啓動服務。main方法裏面。

恭喜,搞定。用個tcp 客戶端鏈接試試~~能夠看到鏈接成功,發送了3字節,而後由於是Discard,因此沒有返回。

pic

收到的數據

讓咱們稍微修改一下代碼,以便看看咱們收到的數據。按照以前的例子,須要再channelRead方法裏面作修改。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final ByteBuf in = (ByteBuf) msg;
    try {
        System.out.println(in.toString(CharsetUtil.US_ASCII));
    } finally {
        in.release();
    }
}

msg能夠直接轉換成ByteBuf對象,而後用ByteBuf的toString方法,設置ASCII參數裝成string打印出來。

運行起來而後能夠直接在瀏覽器輸入localhost:8080訪問,就能看到傳過來的數據。

pic

寫一個Echo Server

咱們寫一個Echo服務,客戶端輸入什麼,咱們就回復什麼。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);//1
        ctx.flush();//2
    }
}
  1. 經過ChannelHandlerContext對象,咱們能夠觸發一些IO事件或者執行一些操做。這裏咱們不須要手動release msg,由於當咱們執行了wirte方法,Netty會幫咱們釋放。
  2. ctx.write(Object)會把內容寫到緩衝區,在調用flush後再輸出出去。能夠用writeAndFlush方法代替。

測試一下,發送3個字節,收到3個字節的回覆。

pic

寫一個TIME Server

這個例子用來實現一個Time協議。經過實現這個協議,咱們能夠了解Netty如何構造發送數據。根據RFC868協議,Time協議有這麼幾步

  1. 服務器監聽37端口
  2. 客戶端鏈接
  3. 服務端返回一個4字節的int時間數據
  4. 客戶端接收到這個數據
  5. 客戶端關閉鏈接
  6. 服務端關閉鏈接。

這裏服務端忽略收到的任何客戶端數據,而是當客戶端一創建鏈接就返回數據,因此這裏不使用channelRead方法,而是channelActive方法。

package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {//1
        final ByteBuf timeBuf = ctx.alloc().buffer(4);//2
        timeBuf.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);//3
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                assert channelFuture == future;
                ctx.close();
            }
        });//4

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 重寫的是channelActive方法,這個方法會在鏈接進來的時候調用。

  2. 由於要返回一個int值,因此須要4個字節,經過ChannelHandlerContext分配,而後writeAndFlush方法寫入併發送。

  3. 把數據發送給非阻塞IO流的時候不須要調用java.nio.ByteBuffer.flip()方法,Netty的ByteBuf沒有提供這個方法,由於他不須要。ByteBuf內部有兩個指針,一個用於讀,一個用於寫。write的時候讀指針移動,寫指針不動,反之同理。在使用ByteBuffer的時候若是沒有flip,數據就會亂。

    Netty裏面全部的IO操做都是異步的,這樣可能會致使write沒有開始(或者沒有完成)以前就鏈接就close掉了。好比下面的代碼:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();//這也不是立馬關閉,也是一個ChannelFuture對象

    write(writeAndFlush)返回的是一個ChannelFuture對象,來大體看下這個對象的解釋。

    pic

    繼承自Future,表示一個Channel的IO操做的結果,不過他還沒完成,只是表示已經建立。【詳細的之後再講。】

  4. 如何能知道這個IO操做的結果呢?咱們能夠給這個ChannelFuture增長一個ChannelFutureListener的實例(接口),而後實現它的operationComplete方法。這裏面的方法比較簡單,就是close掉這個ChannelHandlerContext,因此,可使用定義好的ChannelFutureListener.CLOSE方法。像下面這樣

    channelFuture.addListener(ChannelFutureListener.CLOSE);
  5. 用rdate 測試一下。測試經過。

    pic

寫一個TIME Client

寫完server以後就要寫client了。client程序和server程序最大的不一樣在於選擇的BootstrapChannel的實現類的差別。

package io.netty.example.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws InterruptedException {
        int port = 37;
        String host = "192.168.1.181";
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            final Bootstrap bootstrap = new Bootstrap();//1
            bootstrap.group(workerGroup);//2
            bootstrap.channel(NioSocketChannel.class);//3
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);//4
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());//6
                }
            });

            final ChannelFuture connectFuture = bootstrap.connect(host, port).sync();//5

            connectFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap和前面的ServerBootstrap相似,不過這個不是給non-server非服務器用,而是給客戶端或者connectionless非鏈接的用。

  2. 客戶端就不須要boss EventLoopGroup了。其實前面的Server中group能用同一個。

    serverBootstrap.group(workGroup, workGroup);//同一個group
  3. channel也須要換成NioSocketChannel,而不是NioServerSocketChannel

  4. 這裏直接用option方法,而不是childOption和option,由於對應client,沒有childOption的概念。

  5. client須要去connect,而不是bind來監聽。

看一下TimeClientHander.java

package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final ByteBuf m = (ByteBuf) msg;//1
        try {
            final long currentTimeMills = (m.readUnsignedInt() - 2208988800L) * 1000L;//2
            System.out.println(new Date(currentTimeMills));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 對象轉換成ByteBuf
  2. 直接經過readUnsignedInt()方法讀取數值。

代碼看起來比較簡單,可是必定可能性(小几率)會有IndexOutOfBoundsException異常,咱們下節討論。

處理基於stream的傳輸

Socket Buffer的一個問題

TCP/IP是一個典型的stream-based協議,接收數據而後放到socket buffer裏面。可是這個buffer隊列存的是byte,而不是packet數據包。因此,就算髮了兩個packet,在系統看來,他就是一堆byte。因此,沒有辦法保證你讀取到的東西和發過來的必定同樣。

舉個例子,假設收到了3個數據包,ABC,DEF,GHI

pic

有可能收到的是下面這樣的

pic

因此,server和client須要一種規則來劃分數據包,而後對方就知道每一個包究竟是啥樣的。

第一種解決方案

其實道理上來講由於int數據包也就4個字節,因此不太會被分片,不太容易出現IndexOutOfBoundsException異常。可是,隨着數據包變大,分片的可能性就會增長,到時候異常出現的機率就會增大。

由於咱們知道收到的數據是4個字節,因此,咱們能夠分配一個4本身的空間,等到一滿,咱們就知道已經收到該有的數據包了,就直接處理就好。來改一下咱們的TimeClientHandler

package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(4);//1
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();//1
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m);//2
        m.release();

        if (buf.readableBytes() >= 4) {//3
            final long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 重寫了handlerAddedhandlerRemoved方法,在這兩個方法裏面初始化或者銷燬buf對象。只要這兩個方法不會阻塞太長時間,是沒有關係的。
  2. 把收到的內容寫到buf對象裏面。
  3. 每次有數據過來的時候會進入channelRead方法(不一樣的鏈接不會串),作一個業務邏輯判斷。

第二種解決方案

雖然上面的問題是解決了,可是由於咱們曉得發過來的數據是4個字節的(就一個字段),因此比較好處理。可是,若是這個對象是一個比較複雜的業務對象,那麼要維護這個類就會比較麻煩。

咱們能夠對這個TimeClientHandler2的功能拆解成2部分。

  1. TimeDecoder專門處理數據包分片的問題。
  2. TimeClientHandler2仍是保持簡單。
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder  extends ByteToMessageDecoder {//1
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//2
        if (in.readableBytes()<4)
            return;//3
        out.add(in.readBytes(4));//4
    }
}
  1. ByteToMessageDecoder繼承自ChannelInboundHandlerAdapter,實現了ChannelInboundHandler接口,因此這個Decoder對象也是一個ChannelHandler對象。他專門用來處理分片問題。
  2. ByteToMessageDecoder會在有新的數據進來的時候調用decode方法,內部維護一個buffer。
  3. ByteToMessageDecoder能夠根據本身的業務邏輯來執行。
  4. 假設進來的字節數據大於4,那麼他就會調用這個decode屢次,每次處理4個字節。

使用POJO而不是ByteBuf

前面的例子讀寫數據的核心都是ByteBuf類,在ChannelHandler裏面直接把object msg 轉成ByteBuf,而後操做。若是咱們能經過POJO來操做,那麼,代碼的可維護性明顯會高一些。讓咱們來改造一下咱們的代碼。

第一步,定義一個UnixTime類,來表示咱們要處理的對象。

package io.netty.example.time2;

import java.util.Date;

public class UnixTime {
    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "轉換出來的時間是:"+ new Date((getValue() - 2208988800L) * 1000L).toString();
    }
}

第二步,咱們改一下咱們的TimeDecoder來產生一個UnixTime對象。

package io.netty.example.time2.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes()<4)
            return;
        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

第三步,在ChannelHandler裏面咱們直接按照UnixTime對象操做。

package io.netty.example.time2.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime time = (UnixTime) msg;
        System.out.println(time);
        ctx.close();
    }
}

第四步,同理,server端也能夠相似的修改。

package io.netty.example.time2.server;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture channelFuture = ctx.writeAndFlush(new UnixTime());
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }
}

相較於之前的分配空間的操做,明顯簡單了許多。

第五步,如今,還缺一個東西,一個encoder,用來把UnixTime轉成ByteBuf,這個是逃不開的,哈哈。

package io.netty.example.time2.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        UnixTime m = (UnixTime) msg;
        ByteBuf buffer = ctx.alloc().buffer(4);
        buffer.writeInt((int)m.getValue());
        ctx.write(buffer,promise);//1
    }
}
  1. 這一行裏面有一些比較重要的事情。
    1. 這裏有個ChannelPromise對象,來標記write成功與否。
    2. 咱們沒有手動調用flush方法,由於ChannelOutboundHandlerAdapter有個flush會自動調用。

其實這個用MessageToByteEncoder 泛型還能更簡單一點

package io.netty.example.time2.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder2 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.getValue());
    }
}

MessageToByteEncoder是一個ChannelOutboundHandlerAdapter的實現抽象類,專門負責把POJO對象轉成ByteBuf。

最後一步,把Encoder像以前Decoder同樣加到ChannelPipeline裏面。你懂的。

client.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());//客戶端解碼 加進去
    }
});
server.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new TimeEncoder2(), new TimeServerHandler());//服務端直接操做UnixTime,須要編碼,加進去
            }
        })
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

代碼結構看起來是這樣的

pic

關閉程序

關閉比較簡單,調用shutdownGracefully()便可,而後會返回一個Future對象。

小結

強烈建議看看官方的例子

相關文章
相關標籤/搜索