Netty5入門學習筆記001

Netty官網:http://netty.io/
java

本例程使用最新的netty5.x版本編寫bootstrap

服務器端: 服務器

TimeServer 時間服務器 服務端接收客戶端的鏈接請求和查詢當前時間的指令,判斷指令正確後響應返回當前服務器的校準時間。網絡

package c1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * server 有粘包問題
 * @author xwalker
 */
public class TimeServer {
	public void bind(int port) throws Exception {
		// 服務器線程組 用於網絡事件的處理 一個用於服務器接收客戶端的鏈接
		// 另外一個線程組用於處理SocketChannel的網絡讀寫
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// NIO服務器端的輔助啓動類 下降服務器開發難度
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)// 相似NIO中serverSocketChannel
					.option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP參數
					.childHandler(new ChildChannelHandler());// 最後綁定I/O事件的處理類
																// 處理網絡IO事件

			// 服務器啓動後 綁定監聽端口 同步等待成功 主要用於異步操做的通知回調 回調處理用的ChildChannelHandler
			ChannelFuture f = serverBootstrap.bind(port).sync();
			System.out.println("timeServer啓動");
			// 等待服務端監聽端口關閉
			f.channel().closeFuture().sync();

		} finally {
			// 優雅退出 釋放線程池資源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
			System.out.println("服務器優雅的釋放了線程資源...");
		}

	}

	/**
	 * 網絡事件處理器
	 */
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ch.pipeline().addLast(new TimeServerHandler());
		}

	}

	public static void main(String[] args) throws Exception {
		int port = 8000;
		new TimeServer().bind(port);
	}

}

TimerServer接收到客戶端的鏈接和讀寫請求後交給處理器handler進行事件的響應處理,服務器定義兩組線程組,一組用來處理客戶端鏈接,一組用來處理網絡IO事件(SocketChannel)的響應,NioEventLoopGroup是Netty提供的NIO線程組,實際上就是Java NIO中的Reactor線程組。異步

ServerBootstrap是Netty提供的用於NIO服務端輔助啓動類,下降了NIO服務端的開發複雜度。socket

ServerBootstrap須要綁定服務器網絡IO事件的處理類ChildChannelHandler ,用於實際處理具體的IO事件,例如記錄日誌,對消息編解碼等。ide

TimeServerHandler須要繼承Netty提供的適配器ChannelhandlerAdapter重寫channelRead等方法完成消息的讀寫。oop

package c1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;
/**
 * server端網絡IO事件處理
 * @author xwalker
 *
 */
public class TimeServerHandler extends ChannelHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		System.out.println("服務器讀取到客戶端請求...");
		ByteBuf buf=(ByteBuf) msg;
		byte[] req=new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body=new String(req,"UTF-8");
		System.out.println("the time server receive order:"+body);
		String curentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
		ByteBuf resp=Unpooled.copiedBuffer(curentTime.getBytes());
		ctx.write(resp);
		System.out.println("服務器作出了響應");
	}
	
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
		System.out.println("服務器readComplete 響應完成");
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
		System.out.println("服務器異常退出"+cause.getMessage());
	}
}

服務器經過handler接收和處理消息請求,channelRead中的msg就是客戶端請求的消息,經過解碼獲取具體信息後根據消息格式和定義完成後續的響應。學習

ByteBuf是netty封裝和擴展的java NIO中的ByteBuffer類,功能更完善。經過ByteBuf接收和解碼msg 轉成String類型 而後判斷命令是都準確,根據結果作出響應。spa

客戶端:

客戶端的處理比較簡單,啓動客戶端,連接服務器成功後發送時間查詢的指令,等待服務器響應。

package c1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * client 存在TCP粘包問題
 * @author xwlaker
 *
 */
public class TimeClient {
	/**
	 * 鏈接服務器
	 * @param port
	 * @param host
	 * @throws Exception
	 */
	public void connect(int port, String host) throws Exception {
		//配置客戶端NIO線程組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			//客戶端輔助啓動類 對客戶端配置
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
					.option(ChannelOption.TCP_NODELAY, true)
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch)
								throws Exception {
							ch.pipeline().addLast(new TimeClientHandler());
						}
					});
			//異步連接服務器 同步等待連接成功
			ChannelFuture f = b.connect(host, port).sync();
			//等待連接關閉
			f.channel().closeFuture().sync();

		} finally {
			group.shutdownGracefully();
			System.out.println("客戶端優雅的釋放了線程資源...");
		}

	}

	public static void main(String[] args) throws Exception {
		new TimeClient().connect(8000, "127.0.0.1");
	}

}

客戶端定義一組線程組用於處理與服務器的網絡IO事件。經過客戶端輔助啓動類 Bootstrap來配置線程組、TCP參數以及IO事件處理的Handler。

package c1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;
/**
 * Client 網絡IO事件處理
 * @author xwalker
 *
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
	private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());
	private  ByteBuf firstMessage;
	public TimeClientHandler(){
		byte[] req ="QUERY TIME ORDER".getBytes();
		firstMessage=Unpooled.buffer(req.length);
		firstMessage.writeBytes(req);
	}
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ctx.writeAndFlush(firstMessage);
		System.out.println("客戶端active");
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		System.out.println("客戶端收到服務器響應數據");
		ByteBuf buf=(ByteBuf) msg;
		byte[] req=new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body=new String(req,"UTF-8");
		System.out.println("Now is:"+body);
		
	}
	
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
		System.out.println("客戶端收到服務器響應數據處理完成");
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		logger.warning("Unexpected exception from downstream:"+cause.getMessage());
		ctx.close();
		System.out.println("客戶端異常退出");
	}
}

TimeClienthandler繼承Netty提供的Handler適配器,重寫channelActive和channelRead方法 前者通道打開active狀態時 發送查詢指令,後者接收服務器響應的消息並解碼輸出。

運行結果:

客戶端啓動後首先處理器channelActive被調用發送查詢指令,服務器端接收到查詢指令後返回了當前時間,客戶端接收到服務器響應後解碼輸出當前時間。

JFinal經典入門到精通課程


Netty5入門學習筆記002-TCP粘包/拆包問題的解決之道(上)

Netty5入門學習筆記003-TCP粘包/拆包問題的解決之道(下)

相關文章
相關標籤/搜索