Netty 4.0 中文文檔

一 處理基於流的傳輸(Scoket Buffer的一點附加說明)html

對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩衝區內。不幸的是,基於流的傳輸不是一個包隊列而是一個字節隊列。在一個開放的系統中,這意味着即便咱們發送了兩條消息分別包含在兩個數據包裏,接收方不會看成兩條消息來對待,而是將其放在同一個字節獨列中。所以,傳輸不能保證收到的消息與發送的消息一致。java

對於時間客戶端的例子,一個32位的int數據量很是小,通常不會被分片(鏈路層限制一個package大小通常爲1500字節),可是問題是它確實有可能被分紅多片,分片的機率隨着網絡的繁忙而增長。最簡單的解決辦法就是增長一個內部的累加緩衝,等累計滿4個字節時再向上提交數據。promise

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(1)ChannelHandler有兩個存活期方法:handlerAdded()和handlerRemoved(),這兩個方法容許咱們本身構造一個初始化任務或結束任務。緩存

(2)首先,全部的接受的數據先放到累計緩存裏。服務器

(3)而後,handler必須檢查是否有了足夠的data,如在本例中須足夠4個字節,而後執行實際的業務邏輯。若數據不足,當更多的數據到達時,netty會再次執行channelReade()方法直到累計到4個字節。網絡

 

第二個解決辦法數據結構

當字段便多時,第一種解決方案會變得很是複雜且不可維護,能夠經過向ChannelPipeline中增長多個ChannelHandler的方法,將一個大的ChannelHandler分解成多個模塊來下降應用的複雜性。例如,socket

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}

(1)ByteToMessageDecoder是一ChannelInboundHandler的實現類,能夠很是容易的處理分片問題。tcp

(2)當新的數據到達時,ByteToMessageDecoder講數據存儲在一個內在的累積buffer中,調用decode()方法進行處理ide

(3)decode()根據接收到的字節大小進行斷定,若滿4個字節則增長一個對象到list。

(4)若是decode()增長了個一個out的對象,意味着decoder編碼成功。ByteToMessageDecoder會丟棄累積buffer中已經讀過的部分。若out.add(null),decoder即中止。

因爲ChannlePipeline中增長了一個handler,所以咱們必須修改ChannelInitializer爲:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

若是你是個冒險者,你可能會嘗試ReplayingDecoder。該handler進一步簡化了decoder。

public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out, VoidEnum state) {
        out.add(in.readBytes(4));
    }
}

另外關於decode的例子能夠參考一下兩個包

利用POJO代替ByteBuf

  目前咱們看到的例子均是使用ByteBuf做爲協議消息的數據結構。在這一節,咱們將使用POJO來代替ByteBuf來改善Time協議的客戶端和服務器。

  使用POJO的有點是十分明顯的,經過分離出解析ByteBuf中數據的代碼,handler會變得更加可維護和重用。在Time的客戶端和服務器的例子中,咱們只讀32字節的integer且這並非一個主要的直接應用ByteBuf的案例。然而,你會發現當你實現一個真正的協議時,作這樣的分離是十分必要的。

首先,咱們先定義一個新類型,UnixTime

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime() {
        this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    }

    public UnixTime(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

咱們從新編寫TimeDecoder來禪師一個UnixTIme。

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readInt()));
}

而後咱們更新decoder,TimeClientHandler就再也不使用ByteBuf了

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

是否是很是的簡潔優雅。Server端一樣如此。首先更新ServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

如今須要編寫解碼部分,encoder是ChannelOutbountHandler的實現類,將UnixTIme轉化成下層的ByteBuf,編寫encoder要比編寫decoder簡單的多,由於此時沒必要考慮tcp包分片的問題。

 

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt(m.value());
        ctx.write(encoded, promise); // (1)
    }
}

(1)這一行中有些十分重要的內容

  首先,咱們將原始的ChannelPromis找原來的樣子傳輸,以保證Netty在寫入鏈路時可以正確的標記成功或失敗。

  其次,並不調用ctx.flush().handler有一個默認的flush方法,若想每次寫都flush則須:ctx.write(encoded,false,promise);或ctx.writeAndFlush(encode,promise);

相關文章
相關標籤/搜索