NIO、Netty(Netty基礎)

1、概述

Netty是一個Java的開源框架。提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。java

Netty是一個NIO客戶端,服務端框架。容許快速簡單的開發網絡應用程序。例如:服務端和客戶端之間的協議,它簡化了網絡編程規範。算法

 

2、NIO開發的問題

一、NIO類庫和API複雜,使用麻煩。編程

二、須要具有Java多線程編程能力(涉及到Reactor模式)。後端

三、客戶端斷線重連、網絡不穩定、半包讀寫、失敗緩存、網絡阻塞和異常碼流等問題處理難度很是大數組

四、存在部分BUG緩存

 

NIO進行服務器開發的步驟:安全

一、建立ServerSocketChannel,配置爲非阻塞模式;服務器

二、綁定監聽,配置TCP參數;網絡

三、建立一個獨立的IO線程,用於輪詢多路複用器Selector;多線程

四、建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽Accept事件;

五、啓動IO線程,在循環中執行Select.select()方法,輪詢就緒的Channel;

六、當輪詢處處於就緒狀態的Channel時,須要對其進行判斷,若是是OP_ACCEPT狀態,說明有新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端;

七、設置新接入的客戶端鏈路SocketChannel爲非阻塞模式,配置TCP參數;

八、將SocketChannel註冊到Selector上,監聽READ事件;

九、若是輪詢的Channel爲OP_READ,則說明SocketChannel中有新的準備就緒的數據包須要讀取,則構造ByteBuffer對象,讀取數據包;

十、若是輪詢的Channel爲OP_WRITE,則說明還有數據沒有發送完成,須要繼續發送。

3、Netty的優勢

一、API使用簡單,開發門檻低;

二、功能強大,預置了多種編解碼功能,支持多種主流協議;

三、定製功能強,能夠經過ChannelHandler對通訊框架進行靈活的擴展;

四、性能高,經過與其餘業界主流的NIO框架對比,Netty綜合性能最優;

五、成熟、穩定,Netty修復了已經發現的NIO全部BUG;

六、社區活躍;

七、經歷了不少商用項目的考驗。

/**
 * 服務端
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeServer().bind(port);
	}
	
	public void bind(int port) throws Exception{
		//1用於服務端接受客戶端的鏈接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網絡讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啓動NIO服務器的輔助啓動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO線程組傳入輔助啓動類中
			sb.group(acceptorGroup, workerGroup)
				//設置建立的Channel爲NioServerSocketChannel類型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP參數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設置綁定IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//綁定端口,同步等待成功(sync():同步阻塞方法,等待bind操做完成才繼續)
			//ChannelFuture主要用於異步操做的通知回調
			ChannelFuture cf = sb.bind(port).sync();
			System.out.println("服務端啓動在8080端口。");
			//等待服務端監聽端口關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放線程池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():獲取緩衝區中可讀的字節數;
		//根據可讀字節數建立數組
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的消息放到發送緩存數組中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO線程組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起異步鏈接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO線程組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	@Override
	//向服務器發送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 1; i++) {
			byte[] req = "QUERY TIME ORDER".getBytes();
			ByteBuf firstMessage = Unpooled.buffer(req.length);
			firstMessage.writeBytes(req);
			ctx.writeAndFlush(firstMessage);
		}
	}

	@Override
	//接收服務器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():獲取緩衝區中可讀的字節數;
		//根據可讀字節數建立數組
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("Now is : "+body);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

4、粘包/拆包問題

TCP是一個「流」協議,所謂流,就是沒有界限的一串數據。能夠想象爲河流中的水,並無分界線。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。

TCP粘包拆包問題示例圖:

假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,可能存在如下4種狀況。

一、服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;

二、服務端一次接收到了兩個數據包,D1和D2粘合在一塊兒,被稱爲TCP粘包;

三、服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部份內容,第二次讀取到了D2包的剩餘部份內容,這被稱爲TCP拆包;

四、服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部份內容D1_1,第二次讀取到了D1包的剩餘內容D1_1和D2包的完整內容;

若是此時服務器TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能發生第五種狀況,既服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包;

問題的解決策略

因爲底層的TCP沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可概括以下:

一、消息定長,例如每一個報文的大小爲固定長度200字節,若是不夠,空位補空格;

二、在包尾增長回車換行符進行分割,例如FTP協議;

三、將消息分爲消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,一般設計思路爲消息頭的第一個字段使用int32來表示消息的總程度;

四、更復雜的應用層協議;

LineBasedFrameDecoder

爲了解決TCP粘包/拆包致使的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包。

LinkeBasedFrameDecoder的工做原理是它一次遍歷ByteBuf中的可讀字節,判斷看是否有「\n」、「\r\n」,若是有,就一次位置爲結束位置,從可讀索引到結束位置區間的字節就組成一行。它是以換行符爲結束標誌的編解碼,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。若是連續讀取到最大長度後任然沒有發現換行符,就會拋出異常,同時忽略掉以前讀到的異常碼流。

/**
 * 服務端 
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeServer().bind(port);
	}
	public void bind(int port) throws Exception{
		//Reactor線程組
		//1用於服務端接受客戶端的鏈接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網絡讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啓動NIO服務器的輔助啓動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO線程組傳入輔助啓動類中
			sb.group(acceptorGroup, workerGroup)
				//設置建立的Channel爲NioServerSocketChannel類型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP參數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設置綁定IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//綁定端口,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用於異步操做的通知回調
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服務端監聽端口關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放線程池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():獲取緩衝區中可讀的字節數;
//		//根據可讀字節數建立數組
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime = currentTime + System.getProperty("line.separator");
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的消息放到發送緩存數組中
		ctx.writeAndFlush(resp);
	}

}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO線程組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起異步鏈接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO線程組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	private int counter;
	private byte[] req;
	
	@Override
	//向服務器發送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模擬一百次請求,發送重複內容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收服務器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():獲取緩衝區中可讀的字節數;
//		//根據可讀字節數建立數組
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

DelimiterBasedFrameDecoder

實現自定義分隔符做爲消息的結束標誌,完成解碼。

/**
 * 服務端
 */
public class TimeServer {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeServer().bind(port);
	}

	public void bind(int port) throws Exception{
		//Reactor線程組
		//1用於服務端接受客戶端的鏈接
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網絡讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啓動NIO服務器的輔助啓動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO線程組傳入輔助啓動類中
			sb.group(acceptorGroup, workerGroup)
				//設置建立的Channel爲NioServerSocketChannel類型
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP參數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設置綁定IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//綁定端口,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用於異步操做的通知回調
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服務端監聽端口關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放線程池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime += "$_";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的消息放到發送緩存數組中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端默認端口
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO線程組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起異步鏈接操做
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO線程組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
	
	private int counter;
	private byte[] req;
	
	@Override
	//向服務器發送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模擬一百次請求,發送重複內容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+"$_").getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收服務器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

FixedLengthFrameDecoder

是固定長度解碼器,可以按照指定的長度對消息進行自動解碼,開發者不須要考慮TCP的粘包/拆包問題。

 

5、Netty的高性能

一、異步非阻塞通訊

在IO編程過程當中,當須要同時處理多個客戶端接入請求時,能夠利用多線程或者IO多路複用技術進行處理。IO多路複用技術經過把多個IO的阻塞複用到同一個Selector的阻塞上,從而使得系統在單線程的狀況下能夠同時處理多個客戶端請求。與傳統的多線程/多進程模型相比,IO多路複用的最大優點是系統開銷小,系統不須要建立新的額外進程或者線程,也不須要維護這些進程和線程的運行,下降了系統的維護工做量,節省了系統資源。

Netty的IO線程NioEventLoop因爲聚合了多路複用器Selector,能夠同時併發處理成百上千個客戶端SocketChannel。因爲讀寫操做都是非阻塞的,這就能夠充分提高IO線程的運行效率,避免由頻繁的IO阻塞致使的線程掛起。另外,因爲Netty採用了異步通訊模式,一個IO線程能夠併發處理N個客戶端鏈接和讀寫操做,這從根本上解決了傳統同步阻塞IO中 一鏈接一線程模型,架構的性能、彈性伸縮能力和可靠性都獲得了極大的提高。

 

二、高效的Reactor線程模型

經常使用的Reactor線程模型有三種,分別以下:

1.Reactor單線程模型;

2.Reactor多線程模型;

3.主從Reactor多線程模型;

 

Reactor單線程模型,指的是全部的IO操做都在同一個NIO線程上面完成,NIO線程職責以下:

一、做爲NIO服務端,接收客戶端的TCP鏈接;

二、做爲NIO客戶端,向服務端發起TCP鏈接;

三、讀取通訊對端的請求或者應答消息;

四、向通訊對端發送請求消息或者應答消息;

 

因爲Reactor模式使用的是異步非阻塞IO,全部的IO操做都不會致使阻塞,理論上一個線程能夠獨立處理全部IO相關操做。從架構層面看,一個NIO線程確實能夠完成其承擔的職責。例如,經過Acceptor接收客戶端的TCP鏈接請求消息,鏈路創建成功以後,經過Dispatch將對應的ByteBuffer派發到指定的Handler上進行消息編碼。用戶Handler能夠經過NIO線程將消息發送給客戶端。

對於一些小容量應用場景,可使用單線程模型,可是對於高負載、大併發的應用卻不合適,主要緣由以下:

一、一個NIO線程同時處理成百上千的鏈路,性能上沒法支撐。即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送;

二、當NIO線程負載太重後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,這更加劇了NIO線程的負載,最終會致使大量消息積壓和處理超時,NIO線程會成爲系統的性能瓶頸;

三、可靠性問題。一旦NIO線程意外進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障。

爲了解決這些問題,從而演進出了Reactor多線程模型。

 

Reactor多線程模型與單線程模型最大的區別就是有一組NIO線程處理IO操做,特色以下:

一、有一個專門的NIO線程——Acceptor線程用於監聽服務端,接收客戶端TCP鏈接請求;

二、網絡IO操做——讀、寫等由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發送;

三、1個NIO線程能夠同時處理N條鏈路,可是1個鏈路只對應1個NIO線程,防止發生併發操做問題。

在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手消息進行安全認證,認證自己很是損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。

 

主從Reactor線程模型的特色是:服務端用於接收客戶端鏈接的再也不是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP鏈接請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工做。Acceptor線程池只用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端subReactor線程池的IO線程上,由IO線程負責後續的IO操做。

利用主從NIO線程模型,能夠解決1個服務端監聽線程沒法有效處理全部客戶端鏈接的性能不足問題。Netty官方推薦使用該線程模型。它的工做流程總結以下:

一、從主線程池中隨機選擇一個Reactor線程做爲Acceptor線程,用於綁定監聽端口,接收客戶端鏈接;

二、Acceptor線程接收客戶端鏈接請求以後,建立新的SocketChannel,將其註冊到主線程池的其餘Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操做;

三、而後也業務層的鏈路正式創建成功,將SocketChannel從主線程池的Reactor線程的多路複用器上摘除,從新註冊到Sub線程池的線程上,用於處理IO的讀寫操做。

 

三、無化的串行設計

在大多數場景下,並行多線程處理能夠提高系統的併發性能。可是,若是對於共享資源的併發訪問處理不當,會帶來嚴重的鎖競爭,這最終會致使性能的降低。爲了儘量地避免鎖競爭帶來的性能損耗,能夠經過串行化設計,既消息的處理儘量在同一個線程內完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。

爲了儘量提高性能,Netty採用了串行無鎖化設計,在IO線程內部進行串行操做,避免多線程競爭致使的性能降低。表面上看,串行化設計彷佛CPU利用率不高,併發程度不夠。可是,經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列——多個工做線程模型性能更優。

Netty串行化設計工做原理圖以下:

Netty的NioEventLoop讀取到消息後,直接調用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程致使的鎖競爭,從性能角度看是最優的。

 

四、高效的併發編程

Netty高效併發編程主要體現

一、volatile的大量、正確使用;

二、CAS和原子類的普遍使用;

三、線程安全容器的使用;

四、經過讀寫鎖提高併發性能。

 

五、高性能的序列化框架

    影響序列化性能的關鍵因素總結以下:

    一、序列化後的碼流大小(網絡寬帶的佔用);

    二、序列化與反序列化的性能(CPU資源佔用);

    三、是否支持跨語言(異構系統的對接和開發語言切換)。

    Netty默認提供了對GoogleProtobuf的支持,經過擴展Netty的編解碼接口,用戶能夠實現其餘的高性能序列化框架

 

六、零拷貝

    Netty的「零拷貝」主要體如今三個方面:

    1)、Netty的接收和發送ByteBuffer採用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不須要進行字節緩衝區的二次拷貝。若是使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,而後才寫入Socket中。相比於堆外直接內存,消息在發送過程當中多了一次緩衝區的內存拷貝。

    2)、第二種「零拷貝 」的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝後的ByteBuf接口。

    3)、第三種「零拷貝」就是文件傳輸,Netty文件傳輸類DefaultFileRegion經過transferTo方法將文件發送到目標Channel中。不少操做系統直接將文件緩衝區的內容發送到目標Channel中,而不須要經過循環拷貝的方式,這是一種更加高效的傳輸方式,提高了傳輸性能,下降了CPU和內存佔用,實現了文件傳輸的「零拷貝」。

        

七、內存池

    隨着JVM虛擬機和JIT即時編譯技術的發展,對象的分配和回收是個很是輕量級的工做。可是對於緩衝區Buffer,狀況卻稍有不一樣,特別是對於堆外直接內存的分配和回收,是一件耗時的操做。爲了儘可能重用緩衝區,Netty提供了基於內存池的緩衝區重用機制。

  

八、靈活的TCP參數配置能力

    Netty在啓動輔助類中能夠靈活的配置TCP參數,知足不一樣的用戶場景。合理設置TCP參數在某些場景下對於性能的提高能夠起到的顯著的效果,總結一下對性能影響比較大的幾個配置項:

    1)、SO_RCVBUF和SO_SNDBUF:一般建議值爲128KB或者256KB;

    2)、TCP_NODELAY:NAGLE算法經過將緩衝區內的小封包自動相連,組成較大的封包,阻止大量小封包的發送阻塞網絡,從而提升網絡應用效率。可是對於時延敏感的應用場景須要關閉該優化算法;

    3)、軟中斷:若是Linux內核版本支持RPS(2.6.35以上版本),開啓RPS後能夠實現軟中斷,提高網絡吞吐量。RPS根據數據包的源地址,目的地址以及目的和源端口,計算出一個hash值,而後根據這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每一個鏈接和CPU綁定,並經過這個hash值,來均衡軟中斷在多個CPU上,提高網絡並行處理性能。

相關文章
相關標籤/搜索