Netty入門

簡介

linux 網絡I/O模型介紹java

1)堵塞I/O模型linux

2)非堵塞I/O模型bootstrap

3)僞異步I/O模型數組

4)多路複用select / poll /epoll緩存

5)信號驅動I/O模型服務器

6) 異步I/O網絡

netty入門應用框架

package com.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
    public void bind(int port) throws Exception {
        // 配置線程組
        //用於網絡事件的處理
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //用於socketChannel的網絡讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //netty用於啓動NIO服務端的輔助啓動類,目的是下降服務端的開發難度
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());  //綁定網絡IO事件的處理類
            // 綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new TimeServerHandler());

        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                //採用默認值
            }
        }
        new TimeServer().bind(port);
    }
}
View Code
package com.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeServerHandler extends ChannelHandlerAdapter{

    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];//根據緩衝區可讀字節數建立byte數組
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("The time server receive order :"+body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }
    
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
        ctx.flush();
    }

    /* (non-Javadoc)
     * @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
    
}
View Code
package com.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());

                        }

                    });
            // 發起異步鏈接操做,等待鏈接成功
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅的退出 釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
        new TimeClient().connect(port, "127.0.0.1");
    }

}
View Code
package com.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    // 當客戶端與服務器TCP創建成功後,Netty的NIO線程會調用channelActive方法
    public void channelActive(ChannelHandlerContext ctx) {
        // 將請求消息發送給服務端
        ctx.writeAndFlush(firstMessage);
    }

    // 當服務端返回應答消息時,channelRead方法被調用
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body);
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.
     * ChannelHandlerContext, java.lang.Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}
View Code

TCP粘包/拆包問題解決之道

 1)消息定長異步

2)在包尾增長回車換行符進行分割,列如FTP協議socket

3)將消息分爲消息頭和消息體,消息頭包含消息的總長度

4)更復雜的應用層協議

 使用LineBasedFrameDecoder和StringDecoder按行切換的文本編輯器

在client和server的pipeline中添加:

/**
* LineBasedFrameDecoder的工做原理依次遍歷ByteBuf中的可讀字節,判斷是否有「\n」和「\r\n」,若是有,以此結束。
* 當1024長度還沒發現結束符,則結束掉並拋棄以前讀到的異常流碼
*
* StringDecoder將接受到的對象轉化成字符串,而後繼續調用handler
*
* LineBasedFrameDecoder+StringDecoder組合其實就是按行切換的文本編輯器
*/

//判斷ByteBuf中的可讀字節,判斷是否有"\n","\r\n",若是有就以此位置爲結束位置。
//當最大長度1024字節仍然沒有發現換行符,就拋出異常。
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//將收到的對象轉化成字符串
ch.pipeline().addLast(new StringDecoder());

分隔符和定長編碼器的應用

對消息區分:

1)消息固定長度

2)將回車換行符做爲消息結束符

3)將特殊分隔符做爲消息的結束標誌

4)經過在消息頭中定義長度字段來標識消息的總長度

 DelimiterBasedFrameDecoder開發

在client與server的pipeline中添加以下解碼器

//建立分隔符緩衝對象
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());

FixedLengthFrameDecoder應用開發

/**
* 利用FixedLengthFrameDecoder解碼器,不管一次性接收多少數據報,
* 他都會按照構造函數中設置的固定長度進行解碼,若是是半包消息,
* FixedLengthFrameDecoder會緩存半包消息並等待下個包到達後進行拼包,直到讀取到一個完整的包。
*/
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());

編解碼技術

java序列化的缺點:

1)沒法跨語言

2)碼流太大

3)序列化性能不高

編解碼框架

MessagePack編解碼  

特色:

1)編解碼高效,性能高。

2)序列化以後的碼流小。

3)支持跨語言。

/**
 * MessagePack編碼器的開發
 * 負責將Object類型的POJO對象編碼成byte數組,而後寫入到ByteBuf中
 * @author Administrator
 *
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    @Override
    protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception {
        MessagePack msgpack = new MessagePack();
        // 對象arg1序列化
        byte[] raw = msgpack.write(arg1);
        arg2.writeBytes(raw);
    }

}
/**
 * MessagePack 解碼器開發
 * @author Administrator
 *
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception {
        final byte[] array;
        final int length = arg1.readableBytes();
        array = new byte[length];
        //首先從數據報arg1中獲取解碼的byte數組
        arg1.getBytes(arg1.readerIndex(), array,0,length);
        MessagePack msgpack = new MessagePack();
        //調用read()將其反序列化爲object對象,並加入到解碼列表中
        arg2.add(msgpack.read(array));
    }

}

在client和server的pipeline中添加編解碼處理器:

//LengthFieldBasedFrameDecoder  LengthFieldPrepender  解決粘包問題  
                            //MsgpackDecoder MsgpackEncoder  解決對象編碼問題
                            ch.pipeline().addLast("frameDecoder",
                                    new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
//加了2個字節的消息字段 ch.pipeline().addLast(
"frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());

Google Protobuf編解碼

JBoss Marshalling編解碼    jboss-marshalling-1.3.0.jar 和 jboss-marshalling-serial-1.3.4.jar

/**
     * 建立Jboss Marshalling解碼器marshallingDecoder
     * @return
     */
    public static MarshallingDecoder buildMarshallingDecoder(){
        //獲取MarshallerFactory實例 ,參數serial表示建立的java序列化工廠對象,
        //由jboss-marshalling-serial-1.3.4.jar提供
        final MarshallerFactory marshallerFactory =
                Marshalling.getMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = 
                new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //1024 單個消息序列化最大長度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }
    
    public static MarshallingEncoder buildMarshallingEncoder(){
        final MarshallerFactory marshallerFactory = 
                Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider =
                new DefaultMarshallerProvider(marshallerFactory, configuration);
        //將POJO對象序列化爲二進制數組
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

在client和server的pipeline中添加編解碼器: (支持半包和拆包的處理)

ch.pipeline().addLast(MarshallingCodeCFactory
            .buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory
            .buildMarshallingEncoder());
相關文章
相關標籤/搜索