一 處理基於流的傳輸(Scoket Buffer的一點附加說明)html
對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩衝區內。不幸的是,基於流的傳輸不是一個包隊列而是一個字節隊列。在一個開放的系統中,這意味着即便咱們發送了兩條消息分別包含在兩個數據包裏,接收方不會看成兩條消息來對待,而是將其放在同一個字節獨列中。所以,傳輸不能保證收到的消息與發送的消息一致。java
對於時間客戶端的例子,一個32位的int數據量很是小,通常不會被分片(鏈路層限制一個package大小通常爲1500字節),可是問題是它確實有可能被分紅多片,分片的機率隨着網絡的繁忙而增長。最簡單的解決辦法就是增長一個內部的累加緩衝,等累計滿4個字節時再向上提交數據。promise
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
(1)ChannelHandler有兩個存活期方法:handlerAdded()和handlerRemoved(),這兩個方法容許咱們本身構造一個初始化任務或結束任務。緩存
(2)首先,全部的接受的數據先放到累計緩存裏。服務器
(3)而後,handler必須檢查是否有了足夠的data,如在本例中須足夠4個字節,而後執行實際的業務邏輯。若數據不足,當更多的數據到達時,netty會再次執行channelReade()方法直到累計到4個字節。網絡
第二個解決辦法數據結構
當字段便多時,第一種解決方案會變得很是複雜且不可維護,能夠經過向ChannelPipeline中增長多個ChannelHandler的方法,將一個大的ChannelHandler分解成多個模塊來下降應用的複雜性。例如,socket
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
(1)ByteToMessageDecoder是一ChannelInboundHandler的實現類,能夠很是容易的處理分片問題。tcp
(2)當新的數據到達時,ByteToMessageDecoder講數據存儲在一個內在的累積buffer中,調用decode()方法進行處理ide
(3)decode()根據接收到的字節大小進行斷定,若滿4個字節則增長一個對象到list。
(4)若是decode()增長了個一個out的對象,意味着decoder編碼成功。ByteToMessageDecoder會丟棄累積buffer中已經讀過的部分。若out.add(null),decoder即中止。
因爲ChannlePipeline中增長了一個handler,所以咱們必須修改ChannelInitializer爲:
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
若是你是個冒險者,你可能會嘗試ReplayingDecoder。該handler進一步簡化了decoder。
public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out, VoidEnum state) { out.add(in.readBytes(4)); } }
另外關於decode的例子能夠參考一下兩個包
io.netty.example.factorial
for a binary protocol, andio.netty.example.telnet
for a text line-based protocol.利用POJO代替ByteBuf
目前咱們看到的例子均是使用ByteBuf做爲協議消息的數據結構。在這一節,咱們將使用POJO來代替ByteBuf來改善Time協議的客戶端和服務器。
使用POJO的有點是十分明顯的,經過分離出解析ByteBuf中數據的代碼,handler會變得更加可維護和重用。在Time的客戶端和服務器的例子中,咱們只讀32字節的integer且這並非一個主要的直接應用ByteBuf的案例。然而,你會發現當你實現一個真正的協議時,作這樣的分離是十分必要的。
首先,咱們先定義一個新類型,UnixTime
package io.netty.example.time; import java.util.Date; public class UnixTime { private final int value; public UnixTime() { this((int) (System.currentTimeMillis() / 1000L + 2208988800L)); } public UnixTime(int value) { this.value = value; } public int value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
咱們從新編寫TimeDecoder來禪師一個UnixTIme。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readInt())); }
而後咱們更新decoder,TimeClientHandler就再也不使用ByteBuf了
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
是否是很是的簡潔優雅。Server端一樣如此。首先更新ServerHandler
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
如今須要編寫解碼部分,encoder是ChannelOutbountHandler的實現類,將UnixTIme轉化成下層的ByteBuf,編寫encoder要比編寫decoder簡單的多,由於此時沒必要考慮tcp包分片的問題。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt(m.value()); ctx.write(encoded, promise); // (1) } }
(1)這一行中有些十分重要的內容
首先,咱們將原始的ChannelPromis找原來的樣子傳輸,以保證Netty在寫入鏈路時可以正確的標記成功或失敗。
其次,並不調用ctx.flush().handler有一個默認的flush方法,若想每次寫都flush則須:ctx.write(encoded,false,promise);或ctx.writeAndFlush(encode,promise);