netty用戶指南

Netty用戶指南

1、前言

1.問題

當今世界咱們須要使用通用的軟件或庫與其餘組件進行通訊,例如使用HTTP客戶端從服務器中獲取信息,或經過網絡服務調用一個遠程的方法。然而通用的協議及其實現一般不具有較好的伸縮性。因此問題看起來是咱們怎麼不使用通用的HTTP服務器去傳輸大文件、e-mail、實事數據、多媒體數據等。咱們須要的是針對特定問題而進行優化的協議實現。例如咱們可能須要從新實現一個HTTP服務器來與AJAX的客戶端進行通訊。另一種狀況是須要處理歷史遺留的協議保證與舊的系統兼容。這些例子的關鍵在於怎樣快速的實現協議而不損失目標系統的穩定性和性能。java

2.解決方案

Netty是一個異步事件驅動的網絡應用框架,能夠用來快速開發可維護的、高性能、可擴展的協議服務器和客戶端。編程

換句話說,Netty是一個基於NIO的客戶端和服務器框架,能夠簡單快速的開發網絡應用程序,如協議的客戶端和服務器。它極大的簡化了TCP、UDP服務器之類的網絡編程。bootstrap

2、開始

1.編寫DiscardServer

最簡單的協議並非「hello world」,而是丟棄。丟棄協議會丟棄任何接受到的數據不作任何的響應。服務器

要實現丟棄協議,須要作的就是丟棄任何接收到的數據。首先從handler的實現開始,handler會處理由Netty產生的I/O事件。網絡

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler繼承了ChannelInboundHandlerAdapter,而他又實現了ChannelInboundHandlerChannelInboundHandler提供了不一樣的事件處理方法,你能夠根據須要去覆寫相應的方法。ChannelInboundHandlerAdapter提供了一些默認的實現,因此在這個例子中只須要去繼承它就能夠了。
  2. 覆寫了channelRead方法,Netty從客戶端收到數據時就會調用該方法。消息的類型是ByteBuf
  3. ByteBuf是一個引用計數對象,須要進行手動的釋放。須要注意的是,handler須要釋聽任何傳遞給他的引用計數對象。一般狀況下channelRead()方法一般的實現方式以下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 因爲IO錯誤Netty拋出異常或handle處理事件拋出異常,都會使exceptionCaught()方法被調用。在大多數狀況下,都須要對異常記日誌,而且關閉相關連的channel

到目前爲止實現了DISCARD服務的通常,接下來須要實現main()方法來啓動服務。數據結構

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個多線程的事件循環,用來處理I/O操做。Netty爲不一樣的通訊方式提供了多種EventLoopGroup實現。在本例中,咱們只須要實現服務器端的應用,因此須要兩個NioEventLoopGroup 。第一個一般稱爲boss,用來接收客戶端的連接請求。第二個稱爲worker,用來處理boss已接收鏈接的I/O請求和把接收的鏈接註冊到worker
  2. ServerBootstrap是用來建立服務器的輔助類。
  3. 使用NioServerSocketChannel類來實例化channel,用來接收鏈接請求。
  4. 在這裏設置的handler會被每個新channel調用,ChannelInitializer是一個特殊的handler用來配置一個新的channel。在本例中,咱們將DiscardServerHandler添加到新channel 的管道中。隨着應用程序的複雜度增長,可能會向管道中加入更多的handler。
  5. 能夠經過option()方法給channel設置一些參數。
  6. option()方法是用來設置NioServerSocketChannel參數的,而childOption()是給接收的鏈接設置參數的。
  7. 剩下的就是綁定端口而後啓動服務了。

2. 測試DiscardServer是否成功

最簡單的方法是使用telnet命令。例如輸入telnet localhost 8080。DiscarServer丟棄了任何接受的數據,咱們能夠把DiscardServer的接收的數據打印出來。多線程

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 循環能夠等價於System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 等價於in.release()

3.寫一個Echo Server

一個服務器一般須要對請求做出響應,而一個Echo服務僅僅須要作的是把請求的內容返回給客戶端。app

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
  1. ChannelHandlerContext對象提供了各類出發IO時間的操做。經過調用write(Object)方法把數據發給客戶端。在這裏沒有手動的釋放msg,這是由於當把msg寫入時Netty會自動的釋放它。
  2. ctx.write(Object)並不會把數據寫到外部,而是在內部的緩衝區中,經過調用ctx.flush()把數據刷出到外部。能夠簡潔的調用ctx.wirteAndFlush(msg)達到一樣的效果。

4. 寫一個Timer Server

TIME協議與前面的例子不一樣之處在於,它發送一個32位的整數,不接收任何請求,而且只要消息發送了就馬上關閉鏈接。框架

由於咱們不須要接收任何數據,並且在鏈接創建時就發送數據,因此不能使用channelRead()方法。須要覆寫channelActive()方法異步

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 當一個鏈接創建時,activeChannel()方法會被調用,而後寫一個32位的整數。

  2. 爲了發送一個新的信息,須要分配一個緩衝區。經過調用ctx.alloc()獲取ByteBufAllocator來分配緩衝區。

  3. 在Netty中的Buffer不須要像Java NIO同樣調用flip(),這是由於Netty中的Buffer具備兩個指針,分別用於讀寫操做。當進行寫操做時寫指針在移動而讀指針不移動,讀寫指針分別表明數據的開始和結束。

    另外須要指出的是,ctx.write()返回一個ChannelFuture對象,該對象表明着一個還未發生的IO操做。這意味着,任何一個請求操做可能都未發生,這是由於在Netty中,全部操做都是異步的。例以下面的代碼可能在發送信息前關閉鏈接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此要在ChannelFuture完成前調用close(),當操做完成時,ChannelFuture會通知他的監聽器。close()可能也不會當即關閉鏈接。

  4. 本例中添加一個匿名內部類做爲監聽器,來關閉鏈接。也可使用預約義的監聽器:

    f.addListener(ChannelFutureListener.CLOSE);

5.Time Client

不一樣於DISCARD和ECHO,TIME協議須要一個客戶端將32位的整數轉爲一個日期。Netty中的客戶端和服務器最大的不一樣在於使用了不一樣的BootStrapChannel現實。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootStapServerBootStrap很類似,但它是用於客戶端的。
  2. 只需指定一個EventLoopGroup,在客戶端中不須要boss。
  3. 使用NioSocketChannel而不是NioServerSocketChannel
  4. 不須要childOption()
  5. 使用connect()方法而不是bind()

TimeClientHandler中,將整數翻譯成日期格式的類型。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

6.處理基於流的傳輸問題。

TCP/IP協議接收數據並儲存到Socket緩衝區中,可是緩衝區不是數據包的隊列,而是字節的隊列,這意味着你發送了兩條消息,但操做系統會並不認爲是兩條消息而是一組字節。因此在讀數據時並不能肯定讀到了對方發過來的數據。

在TIME協議中,在調用m.readUnsignedInt()時緩衝區中須要有四個字節,若是緩衝區中還未接收到四個字節時就會拋出異常。

解決方法是,再加一個ChannelHandleChannelPipeline。該handler專門處理編碼問題。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一個實現,專門用於編碼問題。
  2. 當新的數據到達時,Netty會調用decode方法,而且其內部維護着一個累加Buffer。
  3. 當累加Buffer中沒有足夠的數據時,能夠不在out中添加任何數據。當新數據到達後Netty又會調用decode方法。
  4. 若是decode()添加一個對象到out中,意味着編碼信息成功了。Netty會丟棄Buffer中已讀取的部分數據。

TimeDecoder添加到ChannelPipeline中:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

另一種更簡單的方式是使用ReplayingDecoder

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

當調用in.readBytes(4)拋出異常時,ReplayingDecoder會捕捉異常並重復執行decode()

7.使用POJO代替ByteBuf

在以前的TIME服務中,都是直接使用ByteBuf做爲協議的數據結構。在Handler中使用POJO對象,能夠把從ByteBuf抽取POJO的代碼分離開。

首先定義UnixTime類:

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeDecoder中解碼產生UnixTime對象

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

TimeClientHandler中再也不須要使用ByteBuf了。

在服務器端,首先更改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

還須要建立一個編碼器,將UnixTime轉爲ByteBuf以便網絡傳輸

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
相關文章
相關標籤/搜索