netty 學習 (3)發送對象

Netty中,通信的雙方創建鏈接後,會把數據按照ByteBuf的方式進行傳輸,例如http協議中,就是經過HttpRequestDecoder對ByteBuf數據流進行處理,轉換成http的對象。基於這個思路,我自定義一種通信協議:Server和客戶端直接傳輸java對象。 java

實現的原理是經過Encoder把java對象轉換成ByteBuf流進行傳輸,經過Decoder把ByteBuf轉換成java對象進行處理,處理邏輯以下圖所示: apache

使用的jar包: bootstrap

使用的log4j.xml文件: app

<?xml version="1.0"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%-5p] [%d] [%t] [%c] %m%n"/>
        </layout>
    </appender>
    
    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
        <param name="File" value="./log/netty.log"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%-5p] [%d] [%t] [%c] %m%n"/>
        </layout>
    </appender>
    
    <appender name="FILE_ERR" class="org.apache.log4j.DailyRollingFileAppender">
        <param name="File" value="./log/netty_err.log"/>
        <param name="Threshold" value="ERROR" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%-5p] [%d] [%t] [%c] %m%n"/>
        </layout>
    </appender>	    
    
	<logger name="io.netty" additivity="false">
		<level value="INFO,DEBUG" />
		<appender-ref ref="FILE" />
		<appender-ref ref="FILE_ERR" />
		<appender-ref ref="CONSOLE" />
	</logger>
	<logger name="com.yao" additivity="false">
		<level value="INFO,DEBUG" />
		<appender-ref ref="FILE" />
		<appender-ref ref="FILE_ERR" />
		<appender-ref ref="CONSOLE" />
	</logger>
    
    <root>
    	
      <level value="debug"/>
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE_ERR" />
    </root>

</log4j:configuration>



傳輸的java bean爲Person: socket

package com.yao.nettyobject;

import java.io.Serializable;

// 必須實現Serializable接口
public class Person implements Serializable{
	private static final long	serialVersionUID	= 1L;
	private String	name;
	private String	sex;
	private int		age;

	public String toString() {
		return "name:" + name + " sex:" + sex + " age:" + age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getSex() {
		return sex;
	}

	public void setSex(String sex) {
		this.sex = sex;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}
}


Server端類:Server PersonDecoder BusinessHandler ide

一、Server:啓動netty服務 工具

package com.yao.nettyobject;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
	public void start(int port) throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup(); 
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap(); 
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 
					.childHandler(new ChannelInitializer<SocketChannel>() { 
								@Override
								public void initChannel(SocketChannel ch) throws Exception {
									//解碼
									ch.pipeline().addLast(new PersonDecoder());
									//業務處理
									ch.pipeline().addLast(new BusinessHandler());
								}
							}).option(ChannelOption.SO_BACKLOG, 128) 
					.childOption(ChannelOption.SO_KEEPALIVE, true); 

			ChannelFuture f = b.bind(port).sync(); 

			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) throws Exception {
		Server server = new Server();
		server.start(8000);
	}
}



二、PersonDecoder:把ByteBuf流轉換成Person對象,其中ByteBufToBytes是讀取ButeBuf的工具類,上一篇文章中提到過,在此不在詳述。ByteObjConverter是byte和obj的互相轉換的工具。 oop

package com.yao.nettyobject;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;


public class PersonDecoder extends ByteToMessageDecoder {
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		ByteBufToBytes read = new ByteBufToBytes();
		Object obj = ByteObjConverter.byteToObject(read.read(in));
		out.add(obj);
	}

}



三、BusinessHandler 讀取Person信息,並打印
package com.yao.nettyobject;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class BusinessHandler extends ChannelInboundHandlerAdapter {
	private Log	logger	= LogFactory.getLog(BusinessHandler.class);

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		Person person = (Person) msg;
		logger.info("BusinessHandler read msg from client :" + person);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		
	}
}



Client端的類:Client ClientInitHandler PersonEncoder 學習

一、Client 創建與Server的鏈接 this

package com.yao.nettyobject;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	public void connect(String host, int port) throws Exception {
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			Bootstrap b = new Bootstrap(); 
			b.group(workerGroup); 
			b.channel(NioSocketChannel.class); 
			b.option(ChannelOption.SO_KEEPALIVE, true); 
			b.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					//編碼
					ch.pipeline().addLast(new PersonEncoder());
					//
					ch.pipeline().addLast(new ClientInitHandler());
				}
			});

			ChannelFuture f = b.connect(host, port).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}

	}

	public static void main(String[] args) throws Exception {
		Client client = new Client();
		client.connect("127.0.0.1", 8000);
	}
}



二、ClientInitHandler 向Server發送Person對象

package com.yao.nettyobject;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ClientInitHandler extends ChannelInboundHandlerAdapter {
	private static Log	logger	= LogFactory.getLog(ClientInitHandler.class);
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		logger.info("HelloClientIntHandler.channelActive");
		Person person = new Person();
		person.setName("yaokj");
		person.setSex("man");
		person.setAge(30);
		ctx.write(person);
		ctx.flush();
	}
}



三、PersonEncoder 把Person對象轉換成ByteBuf進行傳送

package com.yao.nettyobject;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class PersonEncoder extends MessageToByteEncoder<Person> {

	@Override
	protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
		byte[] datas = ByteObjConverter.objectToByte(msg);
		out.writeBytes(datas);
		ctx.flush();
	}
}



工具類:ByteObjConverter

package com.yao.nettyobject;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ByteObjConverter {

	public static Object byteToObject(byte[] bytes) {
		Object obj = null;
		ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
		ObjectInputStream oi = null;
		try {
			oi = new ObjectInputStream(bi);
			obj = oi.readObject();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				bi.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			try {
				oi.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return obj;
	}

	public static byte[] objectToByte(Object obj) {
		byte[] bytes = null;
		ByteArrayOutputStream bo = new ByteArrayOutputStream();
		ObjectOutputStream oo = null;
		try {
			oo = new ObjectOutputStream(bo);
			oo.writeObject(obj);
			bytes = bo.toByteArray();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				bo.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			try {
				oo.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return bytes;
	}
}



工具類:ByteBufToBytes

package com.yao.nettyobject;

import io.netty.buffer.ByteBuf;

public class ByteBufToBytes {

	public byte[] read(ByteBuf datas) {
		byte[] bytes = new byte[datas.readableBytes()];
		datas.readBytes(bytes);
		return bytes;
	}
}



經過上述代碼,實現了Server端與Client端直接使用person對象進行通訊的目的。基於此,能夠構建更爲複雜的場景:Server端同時支撐多種協議,不一樣的協議採用不一樣的Decoder進行解析,解析結果保持統一,這樣業務處理類能夠保持接口一致。下一節將編寫這樣一個案例。

本例中須要注意的事項是:

一、Person對象必須實現Serializable接口,不然不能進行序列化。

二、PersonDecoder讀取ByteBuf數據的時候,並無對屢次流式數據進行處理,而是簡單的一次性接收,若是數據量大的狀況下,可能會出現數據不完整,這個問題會在後續的學習中解決。

相關文章
相關標籤/搜索