簡單找了下發現網上沒有關於Netty3比較完整的源碼解析的文章,因而我就去讀官方文檔,爲了增強記憶,翻譯成了中文,有適當的簡化。bootstrap
原文檔地址:Netty3文檔緩存
運行demo的前提有兩個:最新版本的Netty3和JDK1.5以上安全
最簡單的協議就是Discard協議——忽略全部接收到的數據而且不做任何響應。咱們從Netty處理I/O事件的handler實現開始:服務器
public class DiscardServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
接下來寫一個main方法來開啓使用DiscardServerHandler的服務:網絡
public class DiscardServer { public static void main(String[] args) throws Exception { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new DiscardServerHandler()); } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8080)); } }
咱們能夠經過"telnet localhost 8080"命令去測試服務,但由於是Discard服務,咱們都不知道服務是否正常工做。因此咱們修改下服務,讓它打印出接收到的數據。數據結構
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); while(buf.readable()) { System.out.println((char) buf.readByte()); System.out.flush(); } }
一個服務一般對請求是有響應的。接下來咱們嘗試寫一個實現Echo協議——將接收的數據原路返回給客戶端的服務:異步
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Channel ch = e.getChannel(); ch.write(e.getMessage()); }
此次咱們實現一個時間協議——在不須要任何請求數據的狀況下返回一個32位整型數字而且在發送以後關閉鏈接。由於咱們忽略請求數據,只須要在鏈接創建的發送消息,因此此次不能使用messageReceived方法而是重寫channelConnected方法:socket
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { Channel ch = e.getChannel(); ChannelBuffer time = ChannelBuffers.buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ChannelFuture f = ch.write(time); f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { Channel ch = future.getChannel(); ch.close(); } }); }
咱們還須要一個遵照時間協議,即能把整型數字翻譯成日期的客戶端。Netty服務端和客戶端惟一的區別就是要求不一樣的Bootstrap和ChannelFactory:tcp
public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new TimeClientHandler()); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.connect(new InetSocketAddress(host, port)); }
另外咱們須要一個ChannelHandler實現,負責把接收到服務端返回的32位整型數字翻譯成日期並打印出來,而後斷開鏈接:ide
public class TimeClientHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); e.getChannel().close(); } }
看上去很簡單是吧?可是實際運行過程當中這個handler有時會拋出一個IndexOutOfBoundsException。下一節咱們會討論爲何會這樣。
在像TCP/IP那樣基於流的傳輸中,接收數據保存在一個socket接收緩存中。可是這個緩存不是一個以包爲單位的隊列,而是一個以字節爲單位的隊列。這就意味着,即便發送兩個獨立的消息,操做系統會把他們視爲一個字節串。所以,不能保證你讀到的和另外一端寫入的同樣。因此,不論是客戶端仍是服務端,對於接收到的數據都須要整理成符合應用程序邏輯的結構。
回到前面的時間客戶端的問題,32位整型數字很小,可是它也是能夠拆分的,特別是當流量上升的時候,被拆分的可能性也隨之上升。
一個簡單的處理方式就是內部建立一個累計的緩存,直到接收滿4個字節才進行處理。
private final ChannelBuffer buf = dynamicBuffer(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer m = (ChannelBuffer) e.getMessage(); buf.writeBytes(m); if (buf.readableBytes() >= 4) { long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } }
第一種方案有不少問題,好比一個複雜的協議,由多個可變長度的域組成,這種狀況下第一種方案的handler就沒法支持了。
你會發現你能夠添加多個ChannelHandler到ChannelPipeline中,利用這個特性,你能夠把一個臃腫的ChannelHandler拆分到多個模塊化的ChannelHandler中,這樣能夠下降應用程序的複雜度。好比,你能夠把TimeClientHandler拆分紅兩個handler:
Netty提供了可擴展的類幫助你實現TimeDecoder:
public class TimeDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return buffer.readBytes(4); } }
拆分以後,咱們須要修改TimeClient的ChannelPipelineFactory實現:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeDecoder(), new TimeClientHandler()); } });
Netty還提供了進一步簡化解碼的ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) { return buffer.readBytes(4); } }
此外,Netty提供了一批開箱即用的解碼器,讓你能夠簡單得實現大多數協議:
上面的demo咱們都是用ChannelBuffer做爲協議化消息的基本數據結構,這一節咱們用POJO替代ChannelBuffer。將從ChannelBuffer提取信息的代碼跟handler分離開,會使handler變得更加可維護的和可重用的。從上面的demo裏不容易看出這個優點,可是實際應用中分離頗有必要。
首先,咱們定義一個類型UnixTime:
public class UnixTime { private final int value; public UnixTime(int value) { this.value = value; } public int getValue() { return value; } @Override public String toString() { return new Date(value * 1000L).toString(); } }
如今咱們能夠修改TimeDecoder讓它返回一個UnixTime而不是ChannelBuffer:
@Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return new UnixTime(buffer.readInt()); }
編碼器改了,那麼相應的TimeClientHandler就不會繼續使用ChannelBuffer:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { UnixTime m = (UnixTime) e.getMessage(); System.out.println(m); e.getChannel().close(); }
一樣的技術也能夠應用到服務端的TimeServerHandler上:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000)); ChannelFuture f = e.getChannel().write(time); f.addListener(ChannelFutureListener.CLOSE); }
能這樣運用的前提是有一個編碼器,能夠把UnixTime對象翻譯成ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler { public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { UnixTime time = (UnixTime) e.getMessage(); ChannelBuffer buf = buffer(4); buf.writeInt(time.getValue()); Channels.write(ctx, e.getFuture(), buf); } }
一樣,TimeEncoder也須要加入到服務端的ChannelPipeline中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeServerHandler(), new TimeEncoder()); } });
爲了關閉I/O線程讓應用程序優雅得退出,咱們須要釋放ChannelFactory分配的資源。
一個典型網絡應用程序的關閉過程分爲三步:
應用到TimeClient上:
ChannelFuture future = bootstrap.connect(...); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
關閉一個客戶端很簡單,那服務端呢?你須要從端口解綁並關閉全部接收到的鏈接。前提是你須要一個保持跟蹤活躍鏈接的數據結構,Netty提供了ChannelGroup。
ChannelGroup是Java集合API一個特殊的的擴展,它表明一組打開的Channel。若是一個Channel被添加到ChannelGroup,而後這個Channel被關閉了,它會從ChannelGroup中自動移除。你能夠對同一ChannelGroup中的Channel作批量操做,好比在關閉服務的時候關閉全部Channel。
要跟蹤打開的socket,你須要修改TimeServerHandler,把新打開的Channel添加到全局的ChannelGroup變量中。ChannelGroup是線程安全的。
@Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) { TimeServer.allChannels.add(e.getChannel()); }
如今咱們自動維持了一個包含全部活躍Channel的列表,關閉服務端就像關閉客戶端同樣容易了。
public class TimeServer { static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) throws Exception { ... ChannelFactory factory = ...; ... ServerBootstrap bootstrap = ...; ... Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); waitForShutdownCommand(); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); factory.releaseExternalResources(); } }
這一節咱們快速瀏覽了Netty,示範瞭如何用Netty寫一個能正常工做的網絡應用。下一節將介紹Netty的更多細節。