編解碼器

  編解碼器的做用是將原始字節數據與自定義的消息對象進行互轉。編碼器負責處理「出站」數據。java

  解碼器web

    解碼器負責解碼「入站」數據從一種格式到另外一種格式,解碼器處理入站數據是抽象ChannelInboundHandler的實現。安全

    解碼器有三種類型:解碼字節到消息;解碼消息到消息以及解碼消息到字節。服務器

    ByteToMessageDecoderwebsocket

      decode(ChannelHandlerContext, ByteBuf, List<Object>):這個方法須要本身實現的抽象方法,做用是將ByteBuf數據解碼成其餘形式的數據併發

      decodeLast(ChannelHandlerContext, ByteBuf, List<Object>)app

    ReplayingDecodersocket

      ReplayingDecoder是ByteToMessageDecoder的一種特殊的抽象基類,讀取緩衝區的數據以前須要檢查緩衝區是否有足夠的字節,使用ReplayingDecoder就無需本身檢查,若ByteBuf中有足夠的字節,則正常讀取,不然就是中止解碼。不是全部的操做都被ByteBuf支持,如有一個不支持的就會拋出DecoderException;ByteBuf.readableBytes()大部分不會返回指望值ide

  

    MessageToMessageDecoder大數據

      decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Excetopn;

    

  編碼器

    編碼器有兩種類型:消息對象編碼成消息對象和消息對象編碼成字節碼

    Netty也提供了兩個抽象類:MessageToByteEncoder和MessageToMessageEncoder。須要重寫encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception;

  

 

  byte-to-byte編解碼器

    Netty提供了ByteArrayEncoder和ByteArrayDecoder兩個類。

public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf>{
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception{
    byte[] array = new byte[msg.readableBytes()];
    msg.getBytes(0, array);
   
    out.add(array);
  }
}

@Sharable
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]>{
  protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception{
    out.add(Unpooled.wrappedBuffer(msg));
  }
}

 

  ByteToMessageCodec

    ByteToMessageCodec用來處理byte-to-message和message-to-byte。ByteToMessageCodec是一種組合,等同於ByteToMessageDecoder和MessageToByteEncoder的組合。MessageToByteEncoder有兩個抽象方法:

      encode(ChannelHandlerContext, I, ByteBuf) //編碼

      decode(ChannelHandlerContext, ByteBuf,  List<Object>) //解碼

 

  MessageToMessageCodec

    MessageToMessageCodec用於message-to-message的編碼和解碼。能夠當作是MessageToMessageDecoder和MessageToMessageEncoder的組合體。MessageToMessageCodec有兩個抽象方法:

      encode(ChannelHandlerContext, OUTBOUNG_IN, List<Object>)

      decode(ChannelHandlerContext, INBOUND_IN, List<Object>)

  CombinedChannelDuplexHandler

    自定義編碼器和解碼器

public class CharCodec extends CombinedChannelDuplexHandler<Decoder, Encoder>{
  public CharCodec(){
    super(new Decoder(), new Encoder());
  }
}

 

  使用SSL/TLS建立安全的netty程序

    java提供了SslContext和SslEngine支持SSL/TLS。Netty提供了SslHandler,它擴展了Java的SslEngine。

public class SslChannelInitializer extends ChannelInitializer<Channel>{
  private final SSLContext context;
  private final boolean client;
  private final boolean startTls;

  public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
    this.context = context;
    this.client = client;
    this.startTls = startTls;
  }

  protected void initChannel(Channel ch) throws Exception{
    SSLEngine engine = context.createSSLEngine();
    engine.setUserClientMode(client);
    ch.pipeline().addFirst("ssl", new SslHander(engine, startTls));
  }
}

  SslHandler必需要添加到ChannelPipeline的第一個位置。

    SSL/TLS的方法:

      setHandshakeTimeout(long handshakeTimeout, TimeUnit unit):設置握手超時時間,ChannelFuture將獲得通知

      setHandshakeTimeoutMillis(long handshakeTimeoutMillis):設置握手超時時間,ChannelFuture將會獲得通知

      getHandshakeTimeoutMillis():獲取握手超時時間值

      setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit):設置關閉通知超時事件,若超時,ChannelFuture會關閉失敗

      setHandshakeTimeoutMillis(long handshakeTimeoutMillis):設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗

      getCloseNotifyTimeoutMillis():獲取關閉通知超時時間

      handshakeFuture():返回完成握手後的ChannelFuture

      close():發送關閉通知請求關閉和銷燬

    

    一個HTTP請求/響應消息可能不止一個,但最終都會包含LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的兩個接口,分別用來完成http請求和響應。

    Netty提供了HTTP請求和響應的編碼器和解碼器:

      HttpRequestEncoder:將HttpRequest或HttpContent編碼成ByteBuf

      HttpRequestDecoder:將ByteBuf解碼成HttpRequest和HttpContent

      HttpResponseEncoder:將HttpResponse或HttpContent編碼成ByteBuf

      HttpResponseDecoder:將ByteBuf解碼成HttpResponse和HttpContent

    

    HTTP消息聚合

      處理HTTP時可能接受HTTP消息片斷,Netty須要緩衝直到接受完整個消息。要處理HTTP消息,Netty提供了HttpObjectAggregator。經過HttpObjectAggregator,Netty能夠聚合HTPP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler,這樣就消除了斷裂消息,保證了消息的完整性。

    HTTP壓縮

      Netty支持「gzip」和「deflate」。

protected void intiChannel(Channel ch) throws Exception{
  ChannelPipeline pipeline = ch.pipeline();
  if(client){
    pipeline.addLast("codec", new HttpClientCodec());
    pipeline.addLast("decompressor", new HttpContentDecompressor());
  }else{
    pipeline.addLast("codec", new HttpServerCodec());
    pipeline.addLast("decompressor", new HttpContentDecompressor());
  }
  pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
}

 

  使用HTTPS

    

public class HttpsCodecInitializer extends ChannelInitializer<Channel>{
  private final SSLContext context;
  private final boolean client;

  public HttpsCodecInitializer(SSLContext context, boolean client){
    this.context = context;
    this.client = client;
  }

  protected void initChannel(Channel ch) throws Exception{
    SSLEngine engine = context.createSSLEngine();
    engine.setUseClientMode(client);
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addFirst("ssl", new SslHandler(engine));
    if(client)
      pipeline.addLast("codec", new HttpClientCodec());
   else
      pipeline.addLast("codec", new HttpServerCodec());
  }
}

 

  WebSocket

    Netty經過ChannelHandler對WebSocket進行支持。Netty支持以下WebSocket:

      BinaryWebSocketFrame:包含二進制數據

      TextWebSocketFrame:包含文本數據

      ContinuationWebSocketFrame:包含二進制數據或文本數據

      CloseWebSocketFrame:表明一個關閉請求,包含關閉狀態碼和短語

      PingWebSocketFrame:WebSocketFrame要求PongWebSocketFrame發送數據

      PongWebSocketFrame:WebSocketFrame要求PingWebSocketFrame響應

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler());
  }

  public static final class TextFrameHanlder extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
    }
  }
 
  public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{
    protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception{
    }
  }

  public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{
   protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception{
   }
  }
}

 

  SPDY

    SPDY是Google開發的基於TCP的應用層協議。SPDY的定位:將頁面加載時間減小50%;最大限度減小部署的複雜性;避免網站開發者改動內容

    SPDY技術實現:單個TCP鏈接支持併發的HTTP請求;壓縮包頭和去掉沒必要要的頭部來減小當前HTTP使用的帶寬;定義一個易實現,在服務器端高效的協議。經過減小邊緣狀況,定義易解析的消息格式來減小HTTP的複雜性;強制使用SSL;容許服務器在須要時發起對客戶端的鏈接並推送數據。

    

  Netty用三種不一樣的ChannelHanlder處理閒置和超時鏈接:

    IdeStateHandler:當一個通道沒有進行讀寫或運行了一段時間後發出IdleStateEvent

    ReadTimeoutHandler:在指定時間內沒有接收到任何數據將拋出ReadTimeoutException

    WriteTimeoutHandler:在指定時間內寫入數據將拋出WriteTimeoutException

public class IdleStateHandlerInitializer extends ChannelInitialzier<Channel>{
  protected void initChannel(Channel ch) throws Exception{
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));

    pipeline.addLast(new HeartbeatHandler());
  }

  public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{
    private static final ByteBuf HEARTBEAT_SEQUEUE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
      if(evt instanceof IdleStateEvent)
        ctx.writeAndFlush(HEARTBEAT_SEQUEUE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      else
        super.userEventTriggered(ctx, evt);
    }
  }
}

 

  解碼分隔符和基於長度的協議

    Netty提供兩個類用於提取序列分割:

      DelimiterBasedFrameDecoder:接收ByteBuf由一個或多個分隔符拆分

      LineBasedFrameDecoder:接收ByteBuf以分隔線結束,如\n, \r\n

 

public class LineBaseHandlerInitializer extends ChannelInitializer<Channel>{
  protected void initChannel(Channel ch) throw Exception{
    ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1024), new FrameHandler());
  }

  public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{
    }
  }
}

 

  Netty提供了兩個以長度爲單位的解碼器:FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder。

   

public class LengthBasedInitializer extends ChannelInitializer<Channel>{
  protected void intiChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new LengthFileBasedFrameDecoder(65 * 1024, 0, 8)).addLast(new FrameHandler());
  }

  public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{
    }
  }
}

 

  寫大數據

    Netty使用零拷貝寫文件內容時經過DefaultFileRegion,ChannelHandlerContext和ChannelPipeline。

    

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
  File file = new File("test.txt");
  FileInputStream fis = new FileInputStream(file);
  FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
  Channel channel = ctx.channel();
  channel.writeAndFlush(region).addListener(new ChannelFutureListener(){
    protected void operationComplelte(ChannelFuture future) throws Exception{
      if(!future.isSuccess())
        Throwable cause = future.cause();
    }
  });
}

  Netty提供了ChunkedWriteHandler,容許經過處理ChunkedInput來寫大的數據塊。ChunkedFile,ChunkedNioFile,ChunkedStream和ChunkedNioStream實現了ChunkedInput。

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{
  private final File file;

  public ChunkedWriteHandlerInitializer(File file){
    this.file = file;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(new WriteStreamHandler());
  }

  public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
      super.channelActive(ctx);
      ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
    }
  }
}

 

  序列化 

    Java提供了ObjectInputStream和ObjectOutputStream等序列化接口。

    io.netty.handler.codec.serialization提供了以下接口:CompatibleObjectEncoder,CompactObjectInputStream,CompactObjectOutputStream,ObjectEncoder,ObjectDecoder,ObjectEncoderOutputStream和ObjectDecoderInputStream。

    JBoss Marshalling序列化的速度是JDK的3倍且序列化的結構更緊湊。

    io.netty.handler.codec.marshalling提供以下接口:CompatibleMarshallingEncoder,CompatibleMarshallingDecoder,MarshallingEncoder和MarshallingDecoder。

pubilc class MarshallingInitializer extends ChannelInitializer<Channel>{
  private final MarshallerProvider marshallerProvider;
  private final UnmarshallerProvider unmarshallerProvider;

  public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider){
    this.marshallerProvider = marshallerProvider;
    this.unmarshallerProvider = unmarshallerProvider;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)).addLast(new MarshallingEncoder(marshallerProvider)).addLast(new ObjectHandler());
  }

  public final class ObjectHandler extends SimpleChannelInboundHander<Serializable>{
    protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{
    }
  }
}

 

    ProtoBuf是Google開源的一種編碼和解碼技術,做用是使序列化數據更高效。

    io.netty.handler.codec.protobuf提供了:ProtoBufDecoder,ProtobufEncoder,ProtobufVarint32FrameDecoder和ProtibufVarint32LengthFieldPrepender

public class ProtoBufInitializer extends ChannelInitializer<Channel>{
  private final MessageLite lite;

  public ProtoBufInitializer(MessageLite lite){
    this.lite = lite;
  }

  protected void initChannel(Channel ch) throws Exception{
    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufEncoder()).addLast(new ProtobufDecoder(lite)).addLast(new ObjectHandler());
  }

  public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{
    protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{
    }
  }
}
相關文章
相關標籤/搜索