linux 網絡I/O模型介紹java
1)堵塞I/O模型linux
2)非堵塞I/O模型bootstrap
3)僞異步I/O模型數組
4)多路複用select / poll /epoll緩存
5)信號驅動I/O模型服務器
6) 異步I/O網絡
netty入門應用框架
package com.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { // 配置線程組 //用於網絡事件的處理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //用於socketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //netty用於啓動NIO服務端的輔助啓動類,目的是下降服務端的開發難度 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); //綁定網絡IO事件的處理類 // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (Exception e) { //採用默認值 } } new TimeServer().bind(port); } }
package com.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter{ public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()];//根據緩衝區可讀字節數建立byte數組 buf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("The time server receive order :"+body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ ctx.flush(); } /* (non-Javadoc) * @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); ctx.close(); } }
package com.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // 發起異步鏈接操做,等待鏈接成功 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅的退出 釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { // TODO: handle exception } } new TimeClient().connect(port, "127.0.0.1"); } }
package com.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private final ByteBuf firstMessage; public TimeClientHandler() { byte[] req = "QUERY TIME ORDER".getBytes(); firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } // 當客戶端與服務器TCP創建成功後,Netty的NIO線程會調用channelActive方法 public void channelActive(ChannelHandlerContext ctx) { // 將請求消息發送給服務端 ctx.writeAndFlush(firstMessage); } // 當服務端返回應答消息時,channelRead方法被調用 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is :" + body); } /* * (non-Javadoc) * * @see * io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel. * ChannelHandlerContext, java.lang.Throwable) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); ctx.close(); } }
1)消息定長異步
2)在包尾增長回車換行符進行分割,列如FTP協議socket
3)將消息分爲消息頭和消息體,消息頭包含消息的總長度
4)更復雜的應用層協議
使用LineBasedFrameDecoder和StringDecoder按行切換的文本編輯器
在client和server的pipeline中添加:
/**
* LineBasedFrameDecoder的工做原理依次遍歷ByteBuf中的可讀字節,判斷是否有「\n」和「\r\n」,若是有,以此結束。
* 當1024長度還沒發現結束符,則結束掉並拋棄以前讀到的異常流碼
*
* StringDecoder將接受到的對象轉化成字符串,而後繼續調用handler
*
* LineBasedFrameDecoder+StringDecoder組合其實就是按行切換的文本編輯器
*/
//判斷ByteBuf中的可讀字節,判斷是否有"\n","\r\n",若是有就以此位置爲結束位置。 //當最大長度1024字節仍然沒有發現換行符,就拋出異常。 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //將收到的對象轉化成字符串 ch.pipeline().addLast(new StringDecoder());
對消息區分:
1)消息固定長度
2)將回車換行符做爲消息結束符
3)將特殊分隔符做爲消息的結束標誌
4)經過在消息頭中定義長度字段來標識消息的總長度
DelimiterBasedFrameDecoder開發
在client與server的pipeline中添加以下解碼器
//建立分隔符緩衝對象 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
FixedLengthFrameDecoder應用開發
/** * 利用FixedLengthFrameDecoder解碼器,不管一次性接收多少數據報, * 他都會按照構造函數中設置的固定長度進行解碼,若是是半包消息, * FixedLengthFrameDecoder會緩存半包消息並等待下個包到達後進行拼包,直到讀取到一個完整的包。 */ ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); ch.pipeline().addLast(new StringDecoder());
java序列化的缺點:
1)沒法跨語言
2)碼流太大
3)序列化性能不高
編解碼框架
MessagePack編解碼
特色:
1)編解碼高效,性能高。
2)序列化以後的碼流小。
3)支持跨語言。
/** * MessagePack編碼器的開發 * 負責將Object類型的POJO對象編碼成byte數組,而後寫入到ByteBuf中 * @author Administrator * */ public class MsgpackEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception { MessagePack msgpack = new MessagePack(); // 對象arg1序列化 byte[] raw = msgpack.write(arg1); arg2.writeBytes(raw); } }
/** * MessagePack 解碼器開發 * @author Administrator * */ public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception { final byte[] array; final int length = arg1.readableBytes(); array = new byte[length]; //首先從數據報arg1中獲取解碼的byte數組 arg1.getBytes(arg1.readerIndex(), array,0,length); MessagePack msgpack = new MessagePack(); //調用read()將其反序列化爲object對象,並加入到解碼列表中 arg2.add(msgpack.read(array)); } }
在client和server的pipeline中添加編解碼處理器:
//LengthFieldBasedFrameDecoder LengthFieldPrepender 解決粘包問題 //MsgpackDecoder MsgpackEncoder 解決對象編碼問題 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
//加了2個字節的消息字段 ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
Google Protobuf編解碼
JBoss Marshalling編解碼 jboss-marshalling-1.3.0.jar 和 jboss-marshalling-serial-1.3.4.jar
/** * 建立Jboss Marshalling解碼器marshallingDecoder * @return */ public static MarshallingDecoder buildMarshallingDecoder(){ //獲取MarshallerFactory實例 ,參數serial表示建立的java序列化工廠對象, //由jboss-marshalling-serial-1.3.4.jar提供 final MarshallerFactory marshallerFactory = Marshalling.getMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //1024 單個消息序列化最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } public static MarshallingEncoder buildMarshallingEncoder(){ final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultMarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //將POJO對象序列化爲二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; }
在client和server的pipeline中添加編解碼器: (支持半包和拆包的處理)
ch.pipeline().addLast(MarshallingCodeCFactory
.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory
.buildMarshallingEncoder());