Netty5+Jboss(Marshalling)完成對象序列化傳輸

  TCP在網絡通信的時候,一般在解決TCP粘包、拆包問題的時候,通常會用如下幾種方式:java

  一、 消息定長 例如每一個報文的大小固定爲200個字節,若是不夠,空位補空格;bootstrap

  二、 在消息尾部添加特殊字符進行分割,如添加回車;服務器

  三、 將消息分爲消息體和消息頭,在消息頭裏麪包含表示消息長度的字段,而後進行業務邏輯的處理。網絡

  在Netty中咱們主要利用對象的序列化進行對象的傳輸,雖然Java自己的序列化也能完成,可是Java序列化有不少問題,如後字節碼流太大,以及序列化程度過低等。Jboss的序列化有程度較高、序列化後碼流較小。這裏利用Jboss的Marshalling測試一個簡單的對象序列化。異步

  新建Maven工程,引入Netty5和Jboss的Marshalling。socket

  注:這裏的Marshalling的版本,若是版本過低,可能會出現消息發送失敗的問題。我在測試的時候起先用的是1.3.9,結果就是消息發送失敗,打印異常信息發現是空指針的問題。ide

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>2.0.0.Beta2</version>
        </dependency>        

 

  一、服務端工具

package com.netty.parry.ende4;

import com.netty.parry.ende3.MarshallingCodeCFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

    public void start(int port) throws Exception {
        // 配置NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 服務器輔助啓動類配置
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChildChannelHandler());
            // 綁定端口 同步等待綁定成功
            ChannelFuture f = b.bind(port).sync(); 
            // 等到服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅釋放線程資源
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * 網絡事件處理器
     */
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 添加Jboss的序列化,編解碼工具
            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            // 處理網絡IO
            ch.pipeline().addLast(new ServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        new Server().start(8765);
    }
}

 

  二、服務端IO處理類oop

package com.netty.parry.ende4;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {
    
    // 用於獲取客戶端發送的信息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 用於獲取客戶端發來的數據信息
        Message body = (Message) msg;
        System.out.println("Server接受的客戶端的信息 :" + body.toString());

        // 寫數據給客戶端
        Message response = new Message("歡迎您,與服務端鏈接成功");
        // 當服務端完成寫操做後,關閉與客戶端的鏈接
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 

  三、客戶端測試

package com.netty.parry.ende4;

import com.netty.parry.ende3.MarshallingCodeCFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    /**
     * 鏈接服務器
     * 
     * @param port
     * @param host
     * @throws Exception
     */
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客戶端輔助啓動類 對客戶端配置
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new MyChannelHandler());
            // 異步連接服務器 同步等待連接成功
            ChannelFuture f = b.connect(host, port).sync();
            // 等待連接關閉
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
            System.out.println("客戶端優雅的釋放了線程資源...");
        }

    }

    /**
     * 網絡事件處理器
     */
    private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("MyChannelHandler");
            // 添加Jboss的序列化,編解碼工具
            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            // 處理網絡IO
            ch.pipeline().addLast(new ClientHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        new Client().connect(8765, "127.0.0.1");
    }
}

 

  四、客戶端IO處理類

package com.netty.parry.ende4;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

    // 客戶端與服務端,鏈接成功的售後
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 發送消息
        Message request1 = new Message("666");
        ctx.writeAndFlush(request1).addListener(new ChannelFutureListener() {
            
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("成功發送到服務端消息");
                } else {
                    System.out.println("失敗服務端消息失敗:"+future.cause().getMessage());
                    future.cause().printStackTrace();
                }
            }
        });
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message response = (Message) msg;
            System.out.println(response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 

  五、消息體

package com.netty.parry.ende4;

import java.io.Serializable;

public class Message implements Serializable{

    /**
     * 
     */
    private static final long serialVersionUID = -5296315429304117678L;
    
    private String body;

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public Message(String body) {
        super();
        this.body = body;
    }

    public Message() {
        super();
    }

    @Override
    public String toString() {
        return "Message [body=" + body + "]";
    }
}
相關文章
相關標籤/搜索