Netty 自定義協議01

1:客戶端對數據進行編碼後,發送到服務端。服務端接收請求後,解碼數據並展現數據。

2:服務端接收到數據後,再次編碼發送響應數據到客戶端,客戶端再次解碼,顯示數據。

協議約定1:

客戶端發送到服務端

| version | content-length | serviceID | content |

    4+4+36+14=58個字節(以下文示例內容 hello-------11 的 14 個字節爲例,content 長度可變)

協議約定2:

服務端發送到客戶端

|serviceID | StateCode |

壹:消息數據

1:客戶端發送服務端數據

/**
 * Header of a client-to-server request message.
 *
 * <p>Holds the protocol version, the length of the message content and the
 * service ID that travel in front of the content.
 */
public class PrivateHeader {

	// Protocol version
	private int version;

	// Length of the message content
	private int contentLength;

	// Service ID
	private String serviceID;

	/** Creates an empty header; populate it through the setters. */
	public PrivateHeader() {
		super();
	}

	/**
	 * Creates a fully populated header.
	 *
	 * @param version       protocol version
	 * @param contentLength length of the message content
	 * @param serviceID     service ID
	 */
	public PrivateHeader(int version, int contentLength, String serviceID) {
		super();
		this.version = version;
		this.contentLength = contentLength;
		this.serviceID = serviceID;
	}

	public int getVersion() {
		return version;
	}

	public void setVersion(int version) {
		this.version = version;
	}

	public int getContentLength() {
		return contentLength;
	}

	public void setContentLength(int contentLength) {
		this.contentLength = contentLength;
	}

	public String getServiceID() {
		return serviceID;
	}

	public void setServiceID(String serviceID) {
		this.serviceID = serviceID;
	}

	// Restored as a real annotation; the original text was link markup that does not compile.
	@Override
	public String toString() {
		return "PrivateHeader [version=" + version + ", contentLength=" + contentLength + ", serviceID=" + serviceID
				+ "]";
	}

}

	/**
 * 
 */
package com.xulei.netty.privateProtocol.message;

import java.io.Serializable;

/**
 * 說明:
 * [@author](https://my.oschina.net/arthor) 徐磊
 * [@version](https://my.oschina.net/u/931210) 1.0
 * [@date](https://my.oschina.net/u/2504391) 2018年3月29日
 */
public class PrivateMseeage implements Serializable{
	

	private static final long serialVersionUID = -2410007993462707636L;

	private PrivateHeader header;
	
	private String content;
	
	public PrivateMseeage() {
		super();
	}

	public PrivateMseeage(PrivateHeader header, String content) {
		super();
		this.header = header;
		this.content = content;
	}

	public PrivateHeader getHeader() {
		return header;
	}

	public void setHeader(PrivateHeader header) {
		this.header = header;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	[@Override](https://my.oschina.net/u/1162528)
	public String toString() {
		return "PrivateMseeage: [version=" + header.getVersion() + ", contentLength=" + header.getContentLength()+", serviceID=" + header.getServiceID() 
		+",content="+content+ "]";
	}
	
	
}

2:服務端發送客戶端數據

/**
 * Response data sent by the server.
 *
 * @author Xu Lei
 * @version 1.0
 * @date 2018-03-29
 */
public class ResponseMsg implements Serializable {

	private static final long serialVersionUID = -5386025614193196923L;

	/** Service ID. */
	private String serviceID;

	/** State code. */
	private int stateCode;

	/** Creates an empty response; fields keep their default values. */
	public ResponseMsg() {
	}

	/**
	 * Creates a response carrying the given service ID and state code.
	 *
	 * @param serviceID service ID
	 * @param stateCode state code
	 */
	public ResponseMsg(String serviceID, int stateCode) {
		this.serviceID = serviceID;
		this.stateCode = stateCode;
	}

	public String getServiceID() {
		return this.serviceID;
	}

	public void setServiceID(String serviceID) {
		this.serviceID = serviceID;
	}

	public int getStateCode() {
		return this.stateCode;
	}

	public void setStateCode(int stateCode) {
		this.stateCode = stateCode;
	}

	/** Renders both fields on one line. */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("ResponseMsg :[serviceID=");
		text.append(serviceID);
		text.append(", stateCode=");
		text.append(stateCode);
		text.append("]");
		return text.toString();
	}

}


/**
 * State codes carried in a response.
 *
 * @author Xu Lei
 * @version 1.0
 * @date 2018-03-29
 */
public interface StateCode {

	/** Success. */
	int SUCCESS = 0;

	/** Failure. */
	int FAIL = 1;

}

二:解碼編碼

1:請求解碼器

public class RequestDecoder extends  ByteToMessageDecoder{


	private Logger logger=LoggerFactory.getLogger(RequestDecoder.class);
	/**
	 * 數據包基本長度
	 */
	private static int count=0;
	
	private static int BASE_LENTH=4+4+36;
	
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		
		
		//可讀長度必需要大於數據包的基本長度
		if(in.readableBytes() < BASE_LENTH){
			return;
		}
		int beginReader=in.readerIndex();
		logger.info("讀指針,第一次讀取的標誌位是:"+beginReader);
		logger.info("服務端解碼器調用次數爲:"+ ++count);
		int version = in.readInt();
		
		int contentLength = in.readInt();
		
		byte[] service=new byte[36];
		
		in.readBytes(service);
		
		String serviceID=new String(service);
		
		//組裝協議頭
		PrivateHeader ph=new PrivateHeader(version, contentLength, serviceID);
		logger.info("可讀字節數目是:"+in.readableBytes());
		//查看剩下的可讀字節是否知足實際內容的長度,不知足,直接重置
		if(in.readableBytes() < contentLength){
			in.readerIndex(beginReader);
			
			return;
		}
		//開始讀取這個
		byte[] content= new byte[contentLength];
		in.readBytes(content);
		PrivateMseeage pm=new PrivateMseeage(ph, new String(content));
		out.add(pm);
	}

}

2:請求編碼器

/**
 * Request encoder: the client encodes a {@link PrivateMseeage} before
 * sending it to the server.
 *
 * @author Xu Lei
 * @version 1.0
 * @date 2018-03-29
 */
public class RequestEncoder extends MessageToByteEncoder<PrivateMseeage> {

	/** Size of the serviceID field in bytes; the request decoder reads exactly this many. */
	private static final int SERVICE_ID_LENGTH = 36;

	/**
	 * Writes {@code | version | content-length | serviceID | content |}.
	 *
	 * @param ctx channel context (not used)
	 * @param msg message to encode
	 * @param out buffer receiving the encoded frame
	 * @throws IllegalArgumentException if the header, serviceID or content is
	 *         null, or the serviceID does not encode to exactly 36 bytes
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, PrivateMseeage msg, ByteBuf out) throws Exception {

		PrivateHeader header = msg.getHeader();
		if (header == null || header.getServiceID() == null || msg.getContent() == null) {
			throw new IllegalArgumentException("header, serviceID and content must not be null");
		}

		// Explicit charset: getBytes() without one depends on the platform default.
		byte[] serviceId = header.getServiceID().getBytes(java.nio.charset.StandardCharsets.UTF_8);
		if (serviceId.length != SERVICE_ID_LENGTH) {
			// The field is fixed-width; any other size would shift every following byte.
			throw new IllegalArgumentException(
					"serviceID must encode to " + SERVICE_ID_LENGTH + " bytes but was " + serviceId.length);
		}
		byte[] content = msg.getContent().getBytes(java.nio.charset.StandardCharsets.UTF_8);

		// Validate first, write second, so a rejected message leaves 'out' untouched.
		out.writeInt(header.getVersion());

		// Write the real byte count, not header.getContentLength(): the decoder
		// reads exactly this many bytes, so the two must agree.
		out.writeInt(content.length);

		out.writeBytes(serviceId);

		out.writeBytes(content);

	}

}

3:響應解碼器

/**
 * Response decoder: decodes the response the client receives from the server.
 *
 * @author Xu Lei
 * @version 1.0
 * @date 2018-03-29
 */
public class ResponseDecoder extends ByteToMessageDecoder{

	private static final Logger logger=LoggerFactory.getLogger(ResponseDecoder.class);

	/**
	 * Number of decoded responses. The counter is static, so every decoder
	 * instance shares it; it is atomic so concurrent increments are not lost.
	 */
	private static final java.util.concurrent.atomic.AtomicInteger count=new java.util.concurrent.atomic.AtomicInteger();

	/** Size of the serviceID field in bytes. */
	private static final int SERVICE_ID_LENGTH=36;

	/** Size of a whole response frame: serviceID (36) + stateCode (4). */
	private static final int FRAME_LENGTH=SERVICE_ID_LENGTH+4;

	/**
	 * Decodes one {@code | serviceID | stateCode |} frame into a {@link ResponseMsg}.
	 *
	 * @param ctx channel context (not used)
	 * @param in  accumulated inbound bytes
	 * @param out receives the decoded {@link ResponseMsg}
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

		int beginIndex = in.readerIndex();

		logger.info("讀指針,第一次讀取的標誌位是:"+beginIndex);

		if(in.readableBytes() < FRAME_LENGTH){
			// Only part of the frame has arrived. Consume nothing and return, so
			// the decoder is called again once more bytes are buffered. Reading
			// on, as the original did, throws IndexOutOfBoundsException.
			return;
		}

		byte[] service=new byte[SERVICE_ID_LENGTH];

		in.readBytes(service);

		// Explicit charset: the no-charset constructor depends on the platform default.
		String serviceID = new String(service, java.nio.charset.StandardCharsets.UTF_8);

		int stateCode = in.readInt();
		ResponseMsg res=new ResponseMsg(serviceID, stateCode);

		logger.info("客戶端解碼器調用次數爲:"+ count.incrementAndGet());
		out.add(res);

	}

}

4:響應編碼器

/**
 * Response encoder: the server encodes a {@link ResponseMsg} before sending
 * it to the client.
 */
public class ResponseEncoder extends MessageToByteEncoder<ResponseMsg>{

	/** Size of the serviceID field in bytes; the response decoder reads exactly this many. */
	private static final int SERVICE_ID_LENGTH = 36;

	/**
	 * Writes {@code | serviceID | stateCode |}.
	 *
	 * @param ctx channel context (not used)
	 * @param msg response to encode
	 * @param out buffer receiving the encoded frame
	 * @throws IllegalArgumentException if the serviceID is null or does not
	 *         encode to exactly 36 bytes
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, ResponseMsg msg, ByteBuf out) throws Exception {

		if (msg.getServiceID() == null) {
			throw new IllegalArgumentException("serviceID must not be null");
		}

		// Explicit charset: getBytes() without one depends on the platform default.
		byte[] serviceId = msg.getServiceID().getBytes(java.nio.charset.StandardCharsets.UTF_8);
		if (serviceId.length != SERVICE_ID_LENGTH) {
			// The field is fixed-width; any other size would shift the state code.
			throw new IllegalArgumentException(
					"serviceID must encode to " + SERVICE_ID_LENGTH + " bytes but was " + serviceId.length);
		}

		out.writeBytes(serviceId);

		out.writeInt(msg.getStateCode());

	}

}

叄:主要程序

1:客戶端

/**
 * Sends one request message to the server when the channel becomes active.
 *
 * @param ctx channel context used to write the message
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

	int version=1;

	String serviceID=UUID.randomUUID().toString();

	String content="hello"+"-------11";

	// The content-length field is read back as a byte count, so measure the
	// encoded bytes; String.length() counts chars and differs for non-ASCII text.
	int contentLength=content.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;

	PrivateHeader header=new PrivateHeader(version, contentLength, serviceID);

	PrivateMseeage pm=new PrivateMseeage(header, content);

	ctx.writeAndFlush(pm);
}

/**
 * Logs the response message received from the server.
 *
 * @param ctx channel context (not used)
 * @param msg inbound object; cast to {@link ResponseMsg}
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	final ResponseMsg response = (ResponseMsg) msg;
	final String line = "服務端 --》 客戶端   消息內容是: " + response;
	logger.info(line);
}

/**
 * Channel initializer for the client: installs the request encoder, the
 * response decoder and the client handler, in that order.
 *
 * @author Xu Lei
 * @version 1.0
 * @date 2018-03-29
 */
public class PPclientInitialHandler extends ChannelInitializer<SocketChannel> {

	/**
	 * Builds the pipeline of a newly created channel.
	 *
	 * @param ch channel being initialized
	 */
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ch.pipeline()
				// Requests sent by the client are encoded here.
				.addLast("encoder", new RequestEncoder())
				// Responses received by the client are decoded here.
				.addLast("decoder", new ResponseDecoder())
				// The client's actual processing class.
				.addLast(new PPclientHandler());
	}

}

2:服務端主程序

/**
 * Channel initializer for the server: installs the request decoder, the
 * response encoder and the server handler, in that order.
 */
public class PPinitialServerHandler extends ChannelInitializer<SocketChannel> {

	/**
	 * Builds the pipeline of a newly accepted channel.
	 *
	 * @param ch channel being initialized
	 */
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		final ChannelPipeline handlers = ch.pipeline();

		handlers
				// Messages received by the server are decoded first.
				.addLast("decoder", new RequestDecoder())
				// Data returned by the server is encoded before going back to the client.
				.addLast("encoder", new ResponseEncoder())
				// The server processes the data here.
				.addLast(new PPservertHandler());
	}

}

業務處理類

/**
	 * 說明:
	 * @author 徐磊
	 * @version 1.0
	 * @date 2018年3月29日
	 */
	public class PPservertHandler extends ChannelInboundHandlerAdapter {
		
		private Logger logger=LoggerFactory.getLogger(PPservertHandler.class);
		
		
		//服務端接收到消息
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			
			PrivateMseeage ps=(PrivateMseeage)msg;
			
			logger.info("客戶端 --》 服務端  消息內容是: "+ps.toString());
			
			/**
			 *  |serviceID | StateCode |
			 */
			
			ResponseMsg remsg=new ResponseMsg();
			
			remsg.setServiceID(ps.getHeader().getServiceID());
			
			
			if(ps.getHeader().getVersion()==1){
				remsg.setStateCode(StateCode.SUCCESS);
			}else{
				remsg.setStateCode(StateCode.FAIL);
			}
			
			
			ctx.writeAndFlush(remsg);
			
		}

Ⅳ:結果

1:服務端

客戶端 --》 服務端  消息內容是: PrivateMseeage: [version=1, contentLength=14, serviceID=358dc080-4787-46cf-9dcc-881fba29c032,content=hello-------11]

2:客戶端

服務端 --》 客戶端   消息內容是: ResponseMsg :[serviceID=358dc080-4787-46cf-9dcc-881fba29c032, stateCode=0]
相關文章
相關標籤/搜索