Netty(四)

       咱們知道,基於JDK默認的序列化機制能夠避免操做底層的字節數組,從而提高了開發效率。也即是java序列化機制的用武之地。java

       那麼這和編解碼有什麼關係呢?因爲java序列化的目的有兩個:一是對象持久化,另外一個是網絡傳輸。而編解碼技術是貫穿在網絡傳輸之中的。在進行遠程跨進程服務調用時,須要把被傳輸的Java對象編碼爲字節數組或者ByteBuffer對象。當讀取到ByteBuffer對象或字節數組時,須要將其編碼爲發送時的Java對象。這被稱爲Java對象編解碼技術。數組

     因爲Java序列化的種種缺陷,衍生出了多種編解碼技術和框架,其中MessagePack、JBoss Marshalling和Google Protobuf是比較流行的,本文只着重介紹Protobuf的原理和使用。緩存

1、Java序列化的缺點網絡

1  沒法跨語言框架

       這也是Java序列化最致命的問題。對於跨進程的服務調用,當咱們須要和異構語言進程交互時,Java序列化就難以勝任。而且經Java序列化後的字節數組,別的語言沒法進行反序列化,就嚴重阻礙了它的應用。異步

2  碼流太大ide

       看下列代碼工具

public class UserInfo implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private String userName;
	
	public final String getUserName() {
		return userName;
	}

	public final void setUserName(String userName) {
		this.userName = userName;
	}

	public final int getUserID() {
		return userID;
	}

	public final void setUserID(int userID) {
		this.userID = userID;
	}

	private int userID;
	
	public UserInfo buildUserName(String userName) {
		this.userName = userName;
		return this;
	}
	
	public UserInfo buildUserID(int userID) {
		this.userID = userID;
		return this;
	}

	public byte[] codeC() {
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		byte[] value = this.userName.getBytes();
		buffer.putInt(value.length);
		buffer.put(value);
		buffer.putInt(this.userID);
		buffer.flip();
		value = null;
		byte[] result = new byte[buffer.remaining()];
		buffer.get(result);
		return result;
	}
}

public class TestUserInfo {

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {

		UserInfo info = new UserInfo();
		info.buildUserID(100).buildUserName("Welcome to Netty");
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream os = new ObjectOutputStream(bos);
		os.writeObject(info);
		os.flush();
		os.close();
		byte[] b = bos.toByteArray();
		System.out.println("The jdk serializable length is : " + b.length);
		bos.close();
		System.out.println("-------------------------------------");
		System.out.println("The byte array serializable length is : " + info.codeC().length);
	}
}

執行TestUserInfo,結果以下:oop

這代表,採用JDK序列化機制編碼後的二進制數組大小居然是二進制編碼的4.75倍!性能

3  序列化性能過低

       將以前的代碼稍做修改,改形成性能測試版本

public byte[] codeC(ByteBuffer buffer) {

    buffer.clear();
    byte[] value = this.userName.getBytes();
    buffer.putInt(value.length);
    buffer.put(value);
    buffer.putInt(this.userID);
    buffer.flip();
    value = null;
    byte[] result = new byte[buffer.remaining()];
    buffer.get(result);
    return result;

}

對UserInfo進行改造,新增以上所示方法,並建立一個性能測試版本的UserInfo測試程序。以下:

public class PerformTestUserInfo {

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		
		UserInfo info = new UserInfo();
		info.buildUserID(100).buildUserName("Welcome to Netty");
		int loop  = 1000000;
		ByteArrayOutputStream bos = null;
		ObjectOutputStream os = null;
		long startTime = System.currentTimeMillis();
		for(int i=0;i<loop;i++) {
			bos = new ByteArrayOutputStream();
			os = new ObjectOutputStream(bos);
			os.writeObject(info);
			os.flush();
			os.close();
			byte[] b = bos.toByteArray();
			bos.close();
		}
		long endTime = System.currentTimeMillis();
		System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms");
		System.out.println("----------------------------------------------");
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		startTime = System.currentTimeMillis();
		for(int i=0;i<loop;i++) {
			byte[] b = info.codeC(buffer);
		}
		endTime = System.currentTimeMillis();
		System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms");
	}
}

結果爲:

可見,Java序列化的性能只有二進制編碼的51.4%左右。

2、業界主流的編解碼框架

Google Protobuf介紹

特色:

1  結構化數據存儲格式

2  高效的編解碼性能

3  語言無關、平臺無關、擴展性好

4  官方支持Java、C++和Python三種語言

Facebook Thrift介紹

Thrift適用於搭建大型數據交換及存儲的通用工具,對於大型系統中的內部數據傳輸,相對於JSON和XML在性能和傳輸大小上都有明顯的優點。

組件:語言系統及IDL編譯器,TProtocol,TTransport,TProcessor以及TServer

咱們重點關注的是編解碼框架,與之對應的就是TProtocol。因爲一般使用Thrift的時候都會採起RPC框架的方式。可是,它的TProtocol編解碼框架仍是能夠以類庫的方式獨立使用。

Thrift支持三種比較典型的編解碼方式

1  通用的二進制編解碼

2  壓縮二進制編解碼

3  優化的可選字段壓縮編解碼

JBoss Marshalling介紹

優勢:

1  可插拔的類解析器

2  可插拔的對象替換技術

3  可插拔的預約義類緩存表

4  無須實現java.io.Serializable接口,便可實現Java序列化

5  經過緩存技術提高對象的序列化性能

然而JBoss Marshalling更可能是在JBoss內部使用,應用範圍有限。

3、Google Protobuf編解碼

       這裏經過一個簡單的例程來學習介紹怎樣使用Protobuf對POJO對象進行編解碼,而後講解如何在Netty中對POJO對象進行Protobuf編解碼,並在兩個進程之間進行通訊和數據交換。

       此處略過Protobuf環境搭建過程,直接看經過.proto文件編譯後的java代碼來進行Protobuf的使用。

測試代碼以下:

public class TestSubscribeReqProto {

	private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
		return req.toByteArray();
	}
	
	private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
		return SubscribeReqProto.SubscribeReq.parseFrom(body);
	}
	
	private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
		SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
		builder.setSubReqID(1);
		builder.setUserName("Lilinfeng");
		builder.setProductName("Netty Book");
		List<String> address = new ArrayList<>();
		address.add("NanJing YuHuaTai");
		address.add("BeiJing LiuLiChang");
		address.add("ShenZhen HongShuLin");
		builder.setAddress(address.toString());
		return builder.build();
	}
	
	/**
	 * @param args
	 * @throws InvalidProtocolBufferException
	 */
	public static void main(String[] args) throws InvalidProtocolBufferException {
		SubscribeReqProto.SubscribeReq req = createSubscribeReq();
		System.out.println("Before encode : " + req.toString());
		SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
		System.out.println("After decode : " + req.toString());
		System.out.println("Assert equal : --> " + req2.equals(req));
	}
}

運行後,輸出結果:

運行結果代表,通過Protobuf編碼後,生成的SubscribeReqProto.SubscribeReq與編碼前原始的SubscribeReqProto.SubscribeReq等價。下面,使用Netty的Protobuf編解碼框架試試看。

服務端代碼:

public class SubReqServer {

	public void bind(int port) throws Exception {
		//配置服務端的NIO線程組
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 100)
				.handler(new LoggingHandler(LogLevel.INFO))
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) {
						ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
						ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
						ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
						ch.pipeline().addLast(new ProtobufEncoder());
						ch.pipeline().addLast(new SubReqServerHandler());
					}
				});
			
			//綁定端口,同步等待成功
			ChannelFuture f = b.bind(port).sync();
			
			//等待服務端監聽端口關閉
			f.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放線程池資源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) throws Exception {
		int port = 8080;
		if(args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			} catch (NumberFormatException e) {
				
			}
		}
		new SubReqServer().bind(port);
	}
}

@Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {

	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		SubscribeReqProto.SubscribeReq req = (SubscribeReq) msg;
		if("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
			System.out.println("Service accept client subscribe req : [" + req + "]");
			ctx.writeAndFlush(resp(req.getSubReqID()));
		}
	}
	
	private SubscribeRespProto.SubscribeResp resp(int subReqID) {
		SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
		builder.setSubReqID(subReqID);
		builder.setRespCode(0);
		builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
		return builder.build();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

       因爲使用了ProtobufEncoder,因此不須要對SubscribeRespProto.SubscribeResp進行手工編碼。另外,ProtobufDecoder已經對消息進行了自動解碼,所以接收到的請求消息能夠直接使用。

客戶端代碼以下:

public class SubReqClient {

	public void connect(int port, String host) throws Exception {
		//配置客戶端NIO線程組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
						ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
						ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
						ch.pipeline().addLast(new ProtobufEncoder());
						ch.pipeline().addLast(new SubReqClientHandler());
					}
				});
			
			//發起異步鏈接操做
			ChannelFuture f =b.connect(host, port).sync();
			
			//等待客戶端鏈路關閉
			f.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO線程組
			group.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) throws Exception {
		int port = 8080;
		if (args!=null && args.length > 0) {
			try{
				port = Integer.valueOf(args[0]);
 			} catch(NumberFormatException e) {
 				//use default value
 			}
		}
		new SubReqClient().connect(port, "127.0.0.1");
	}
}

public class SubReqClientHandler extends ChannelHandlerAdapter {

	
	public SubReqClientHandler() {
		super();
		// TODO Auto-generated constructor stub
	}

	public void channelActive(ChannelHandlerContext ctx) {
		for(int i=0;i<10;i++) {
			ctx.write(subReq(i));
		}
		ctx.flush();
	}
	
	private SubscribeReqProto.SubscribeReq subReq(int i) {
		SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
		builder.setSubReqID(i);
		builder.setUserName("Lilinfeng");
		builder.setProductName("Netty Book For Protobuf");
		List<String> address = new ArrayList<>();
		address.add("NanJing YuHuaTai");
		address.add("BeiJing LiuLiChang");
		address.add("ShenZhen HongShuLin");
		builder.setAddress(address.toString());
		return builder.build();
	}
	
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("Receive server response : [" + msg + "]");
	}
	
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

}

       客戶端接收到服務端的應答消息以後會直接打印,按照統計,應該打印10次。下面就測試下Protobuf的服務端和客戶端,看它是否能正常運行。

       能夠看出利用Netty提供的Protobuf編解碼能力,咱們再不須要了解Protobuf實現和使用細節的狀況下就能輕鬆支持Protobuf編解碼,能夠方便地實現跨語言的遠程服務調用和與周邊的異構系統進行通訊對接。

本節完結。

相關文章
相關標籤/搜索