引言html
DISCARD
。它是一種協議,在沒有任何響應的狀況下丟棄任何接收到的數據。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently.丟棄接收到的數據 ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.丟掉全部進來的消息
*/
public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 該對象至關於Socket中使用一個線程專門用戶監聽一個socket端口,而後將監聽到的socket對象傳入另外一對象 EventLoopGroup workerGroup = new NioEventLoopGroup();// 該對象至關於Socket中對於每一個socket鏈接都都單獨開闢了一個線程進行數據解析出處理 try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
注:一、NioEventLoopGroup是一個處理I/O操做的多線程事件循環。Netty爲不一樣類型的傳輸提供了各類EventLoopGroup實現。在本例中,咱們正在實現一個服務器端應用程序,所以將使用兩個NioEventLoopGroup。第一個,一般被稱爲「老闆」,接受進入的鏈接。第二個一般稱爲「worker」,在boss接受鏈接並將接受的鏈接註冊給worker時,它將處理已接受鏈接的流量。使用多少線程以及如何將它們映射到建立的通道取決於EventLoopGroup實現,甚至能夠經過構造函數進行配置。java
Channel
設置服務器。可是,請注意,這是一個冗長的過程,在大多數狀況下不須要這樣作。@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
若是您再次運行telnet命令,您將看到服務器返回您發送給它的任何內容。git
echo服務器的完整源代碼位於發行版的io.net .example.echo包中。github
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
三、像往常同樣,咱們編寫構造好的消息。web
可是等等,拋硬幣在哪裏?在使用NIO發送消息以前,咱們不是曾經調用java.nio.ByteBuffer.flip()嗎?ByteBuf沒有這樣的方法,由於它有兩個指針;一個用於讀操做,另外一個用於寫操做。當您向ByteBuf寫入內容時,寫入器索引會增長,而讀取器索引不會改變。閱讀器索引和寫入器索引分別表示消息開始和結束的位置。ajax
相反,NIO緩衝區沒有提供一種乾淨的方法來肯定消息內容在哪裏開始和結束,而不調用flip方法。當您忘記翻轉緩衝區時,您將遇到麻煩,由於不會發送任何或不正確的數據。在Netty中不會發生這樣的錯誤,由於對於不一樣的操做類型,咱們有不一樣的指針。當你習慣了它,你會發現它讓你的生活變得更容易——一個沒有翻轉的生活!
編程
要注意的另外一點是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示還沒有發生的I/O操做。這意味着,因爲Netty中的全部操做都是異步的,所以可能尚未執行任何請求的操做。例如,如下代碼可能會在發送消息以前關閉鏈接:bootstrap
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
f.addListener(ChannelFutureListener.CLOSE);
要測試咱們的時間服務器是否按預期工做,您可使用UNIX rdate命令:api
$ rdate -o <port> -p <host>
package io.netty.example.time;
public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
package io.netty.example.time;
import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
TIME
客戶端示例。咱們在這裏遇到一樣的問題。32位整數是很是少許的數據,而且不太可能常常被分段。然而,問題在於它多是碎片化的,而且隨着流量的增長,碎片化的可能性將增長。
TimeClientHandler
修復此問題的修改實現:
package io.netty.example.time;
import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
TIME
客戶端的問題,但修改後的處理程序看起來並不乾淨。想象一個更復雜的協議,它由多個字段組成,例如可變長度字段。您的
ChannelInboundHandler
實施將很快變得沒法維護。
ChannelHandler
爲a 添加多個
ChannelPipeline
,所以,您能夠將一個單片拆分
ChannelHandler
爲多個模塊化,以下降應用程序的複雜性。例如,您能夠拆分
TimeClientHandler
爲兩個處理程序:
TimeDecoder
它涉及碎片問題,以及TimeClientHandler
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
若是你是一個喜歡冒險的人,你可能想試試ReplayingDecoder,這將解碼器變得更加簡單。不過,您須要參考API參考以得到更多信息。promise
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); } }
io.netty.example.factorial
基於二進制協議io.netty.example.telnet
基於文本行的協議.ChannelHandler
中使用POJO的優點是顯而易見的; 經過分離
ByteBuf
從處理程序中提取信息的代碼,您的處理程序變得更易於維護和重用。在
TIME
客戶端和服務器示例中,咱們只讀取一個32位整數,這不是
ByteBuf
直接使用的主要問題。可是,您會發如今實現真實世界協議時必須進行分離。
UnixTime
。
package io.netty.example.time;
import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
咱們如今能夠修改它TimeDecoder
來產生一個UnixTime
而不是一個ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readUnsignedInt())); }
使用更新的解碼器,TimeClientHandler
再也不使用ByteBuf
:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
更簡單,更優雅,對吧?能夠在服務器端應用相同的技術。咱們TimeServerHandler
此次更新第一次:
@Override
public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
如今,惟一缺乏的部分是一個編碼器,它的實現ChannelOutboundHandler
將一個UnixTime
轉換爲一個ByteBuf
。它比編寫解碼器簡單得多,由於編碼消息時無需處理數據包碎片和彙編。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
Future
。
io.netty.example
包中的Netty示例。