netty: 編解碼之jboss marshalling, 用marshalling進行對象傳輸

JBoss Marshalling是JBoss內部的一個序列化框架,速度十分快;Netty也提供了對它的支持,使用起來十分方便。

在網絡通信中解決TCP粘包、拆包問題時,通常會用如下幾種方式:

  1、 消息定長,例如每一個報文的大小固定爲200個字節,如果不夠,空位補空格;

  2、 在消息尾部添加特殊字符進行分割,如添加回車;

  3、 將消息分爲消息體和消息頭,在消息頭裏面包含表示消息長度的字段,然後進行業務邏輯的處理。

  在Netty中我們主要利用對象的序列化進行對象的傳輸。雖然Java本身的序列化也能完成,但是Java序列化有不少問題,如序列化後字節碼流太大,以及序列化效率太低等。JBoss的序列化效率較高、序列化後碼流較小。這裏利用JBoss的Marshalling測試一個簡單的對象序列化。

 

引入marshalling

<!-- Netty itself (io.netty:netty-all). -->
<!-- NOTE(review): the handlers in this article extend ChannelHandlerAdapter and override channelRead; -->
<!-- confirm that API is available in whichever Netty version you pin before changing this one. -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
	<dependency>
	    <groupId>io.netty</groupId>
	    <artifactId>netty-all</artifactId>
	    <version>5.0.0.Alpha2</version>
	</dependency>
	
	<!-- JBoss Marshalling core library. -->
	<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
	<dependency>
	    <groupId>org.jboss.marshalling</groupId>
	    <artifactId>jboss-marshalling</artifactId>
	    <version>2.0.0.CR1</version>
	</dependency>
    
    <!-- JBoss Marshalling "serial" module; presumably supplies the "serial" marshaller factory -->
    <!-- that MarshallingCodeCFactory looks up by name (verify). Kept at the same version as the core library. -->
    <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
	<dependency>
	    <groupId>org.jboss.marshalling</groupId>
	    <artifactId>jboss-marshalling-serial</artifactId>
	    <version>2.0.0.CR1</version>	    
	</dependency>

  

server:

public class Server {

	public static void main(String[] args) throws InterruptedException {
		//1.第一個線程組是用於接收Client端鏈接的  
        EventLoopGroup bossGroup = new NioEventLoopGroup();   
        //2.第二個線程組是用於實際的業務處理的  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        ServerBootstrap b = new ServerBootstrap();  
        b.group(bossGroup, workerGroup);//綁定兩個線程池  
        b.channel(NioServerSocketChannel.class);//指定NIO的模式,若是是客戶端就是NioSocketChannel  
//        b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的緩衝區設置  
//        b.option(ChannelOption.SO_SNDBUF, 32*1024);//設置發送緩衝的大小  
//        b.option(ChannelOption.SO_RCVBUF, 32*1024);//設置接收緩衝區大小  
//        b.option(ChannelOption.SO_KEEPALIVE, true);//保持連續  
        b.childHandler(new ChannelInitializer<SocketChannel>() {  
            protected void initChannel(SocketChannel ch) throws Exception {
            	//設置Marshalling的編碼和解碼
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                ch.pipeline().addLast(new ServerHandler());
            }
        });
        ChannelFuture future = b.bind(8765).sync();//綁定端口  
        future.channel().closeFuture().sync();//等待關閉(程序阻塞在這裏等待客戶端請求)  
        bossGroup.shutdownGracefully();//關閉線程  
        workerGroup.shutdownGracefully();//關閉線程 

	}
}

  

ServerHandler

/**
 * Server-side business handler: receives a decoded {@code Send} object,
 * logs it, and answers with a {@code Receive} carrying the same id, name and message.
 */
public class ServerHandler extends ChannelHandlerAdapter {

	/**
	 * Logs the failure and closes the channel, mirroring what {@code ClientHandler} does,
	 * so a broken connection is not left open.
	 *
	 * @param ctx   context of the channel on which the error occurred
	 * @param cause the error that was raised in the pipeline
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Handles one inbound message. The message is cast to {@code Send}; anything else
	 * raises a {@link ClassCastException}.
	 *
	 * @param ctx context used to write the reply
	 * @param msg the inbound object, expected to be a {@code Send}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		Send send = (Send) msg;
		System.out.println("client發送:" + send);

		// Echo the request fields back in a Receive object.
		Receive receive = new Receive();
		receive.setId(send.getId());
		receive.setMessage(send.getMessage());
		receive.setName(send.getName());
		ctx.writeAndFlush(receive);
	}

}

  

client

/**
 * Demo client: connects to the local server, sends five {@code Send} objects
 * through the JBoss Marshalling codec, and then waits for the channel to close.
 */
public class Client {

	/** Host of the demo server. */
	private static final String HOST = "127.0.0.1";

	/** Port of the demo server. */
	private static final int PORT = 8765;

	/**
	 * Connects, writes the five demo messages and blocks until the channel is closed.
	 *
	 * @param args unused
	 * @throws InterruptedException if the thread is interrupted while waiting
	 *         for the connect to complete or for the channel to close
	 */
	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(worker)
			.channel(NioSocketChannel.class)
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					// Same codec order as on the server: decoder, encoder, business handler.
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
					sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
					sc.pipeline().addLast(new ClientHandler());
				}
			});
			ChannelFuture f = b.connect(HOST, PORT).sync();
			for (int i = 1; i <= 5; i++) {
				Send send = new Send();
				send.setId(i);
				send.setMessage("message" + i);
				send.setName("name" + i);
				f.channel().writeAndFlush(send);
			}
			// Block until the channel is closed (e.g. by ClientHandler on error).
			f.channel().closeFuture().sync();
		} finally {
			// Always release the event loop threads, even if connect() or sync() failed.
			worker.shutdownGracefully();
		}
	}
}

  

clientHandler

/**
 * Client-side business handler: prints every {@code Receive} object sent back
 * by the server and closes the channel when an error reaches it.
 */
public class ClientHandler extends ChannelHandlerAdapter {

	/**
	 * Prints the server's reply. The inbound message is cast to {@code Receive}.
	 *
	 * @param ctx context of the channel the reply arrived on
	 * @param msg the inbound object, expected to be a {@code Receive}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		final Receive reply = (Receive) msg;
		System.out.println("server反饋:" + reply);
	}

	/**
	 * Dumps the stack trace of the failure and closes the channel.
	 *
	 * @param ctx   context of the channel on which the error occurred
	 * @param cause the error that was raised in the pipeline
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

}

  

 

send

/**
 * Request object sent from the client to the server.
 * Plain serializable bean with an id, a name and a message.
 */
public class Send implements Serializable {

	/** Serialization version of this class. */
	private static final long serialVersionUID = 1L;

	private Integer id;
	private String name;
	private String message;

	/** @return the id, or {@code null} if none was set */
	public Integer getId() {
		return this.id;
	}

	/** @param id the id to store */
	public void setId(Integer id) {
		this.id = id;
	}

	/** @return the name, or {@code null} if none was set */
	public String getName() {
		return this.name;
	}

	/** @param name the name to store */
	public void setName(String name) {
		this.name = name;
	}

	/** @return the message, or {@code null} if none was set */
	public String getMessage() {
		return this.message;
	}

	/** @param message the message to store */
	public void setMessage(String message) {
		this.message = message;
	}

	/** @return a text of the form {@code Send [id=..., name=..., message=...]} */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Send [id=");
		text.append(id);
		text.append(", name=").append(name);
		text.append(", message=").append(message);
		text.append("]");
		return text.toString();
	}

}

  

 

receive

/**
 * Response object sent from the server back to the client.
 * Plain serializable bean with an id, a name, a message and an optional byte payload.
 */
public class Receive implements Serializable {

	/** Serialization version of this class. */
	private static final long serialVersionUID = 1L;

	private Integer id;
	private String name;
	private String message;
	private byte[] sss;

	/** @return the stored byte array itself (not a copy), or {@code null} if none was set */
	public byte[] getSss() {
		return this.sss;
	}

	/** @param sss the byte array to store; the reference is kept as-is */
	public void setSss(byte[] sss) {
		this.sss = sss;
	}

	/** @return the id, or {@code null} if none was set */
	public Integer getId() {
		return this.id;
	}

	/** @param id the id to store */
	public void setId(Integer id) {
		this.id = id;
	}

	/** @return the name, or {@code null} if none was set */
	public String getName() {
		return this.name;
	}

	/** @param name the name to store */
	public void setName(String name) {
		this.name = name;
	}

	/** @return the message, or {@code null} if none was set */
	public String getMessage() {
		return this.message;
	}

	/** @param message the message to store */
	public void setMessage(String message) {
		this.message = message;
	}

	/** @return a text of the form {@code Receive [id=..., name=..., message=..., sss=...]} */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Receive [id=");
		text.append(id);
		text.append(", name=").append(name);
		text.append(", message=").append(message);
		text.append(", sss=").append(Arrays.toString(sss));
		text.append("]");
		return text.toString();
	}

}

  

 

marshalling工廠類

/**
 * Static factory for the Netty JBoss Marshalling decoder and encoder used by
 * both the server and the client pipelines.
 *
 * <p>Both sides are built from the same marshaller factory name and the same
 * configuration version so that they stay compatible with each other.
 */
public final class MarshallingCodeCFactory {

	/** Name of the marshaller factory to look up (the Java-serialization-compatible one). */
	private static final String FACTORY_NAME = "serial";

	/** Version set on every {@code MarshallingConfiguration} built here. */
	private static final int CONFIGURATION_VERSION = 5;

	/** Maximum length, in bytes, of a single serialized message accepted by the decoder. */
	private static final int MAX_OBJECT_SIZE = 1024;

	/** Utility class; not meant to be instantiated. */
	private MarshallingCodeCFactory() {
	}

	/**
	 * Creates the JBoss Marshalling decoder.
	 *
	 * @return a new {@code MarshallingDecoder} limited to {@value #MAX_OBJECT_SIZE} bytes per message
	 * @throws IllegalStateException if the {@value #FACTORY_NAME} marshaller factory cannot be found
	 */
	public static MarshallingDecoder buildMarshallingDecoder() {
		// The provider is built from the marshaller factory plus its configuration.
		UnmarshallerProvider provider =
				new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
		// Second argument: maximum length of a single serialized message.
		return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
	}

	/**
	 * Creates the JBoss Marshalling encoder, which serializes {@code Serializable}
	 * POJOs into bytes.
	 *
	 * @return a new {@code MarshallingEncoder}
	 * @throws IllegalStateException if the {@value #FACTORY_NAME} marshaller factory cannot be found
	 */
	public static MarshallingEncoder buildMarshallingEncoder() {
		MarshallerProvider provider =
				new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
		return new MarshallingEncoder(provider);
	}

	/** Looks up the marshaller factory by name and fails fast when it is not available. */
	private static MarshallerFactory lookupMarshallerFactory() {
		final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
		if (factory == null) {
			// Without this check the missing factory would only surface later as a
			// NullPointerException when the first message is encoded or decoded.
			throw new IllegalStateException(
					"No JBoss Marshalling factory named \"" + FACTORY_NAME
					+ "\" found; is jboss-marshalling-serial on the classpath?");
		}
		return factory;
	}

	/** Builds a fresh configuration carrying the shared version number. */
	private static MarshallingConfiguration newConfiguration() {
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(CONFIGURATION_VERSION);
		return configuration;
	}
}

  

運行結果

server反饋:Receive [id=1, name=name1, message=message1, sss=null]
server反饋:Receive [id=2, name=name2, message=message2, sss=null]
server反饋:Receive [id=3, name=name3, message=message3, sss=null]
server反饋:Receive [id=4, name=name4, message=message4, sss=null]
server反饋:Receive [id=5, name=name5, message=message5, sss=null]
相關文章
相關標籤/搜索