Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。html
作過NIO開發的人都遇到不少難以解決的問題,好比半、粘包,超時,安全性、性能等等,Netty成功的解決了原始NIO開發過程當中遇到的各類問題。那麼Netty的內部框架是如何解決這些問題呢?本文初步介紹Netty服務器端開發源碼,而且從源碼分析的角度來分析下各個步驟都作了什麼。java
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author lilinfeng * @date 2014年2月14日 * @version 1.0 */ public class TimeServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new TimeServer().bind(port); } }Netty框架的最大特色就是寫起來很是的簡單,這也是他牛叉的緣由。
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup是Nio線程組。Netty是基於Nio線程池模型實現的。Netty服務器端生成了兩個線程組。第一個bossGroup用於接收鏈接的線程組,第二個是用於處理鏈接請求的線程組,包括讀、寫、業務邏輯等。
第二個方法是啓動類實例化。看到boot我以爲window啓動是否是也用到這個詞兒了,呲牙!git
ServerBootstrap b = new ServerBootstrap();第三個方法就是將新生成的兩個線程組組成一組。說白了就是將其賦值給了group、childGroup兩個成員變量中。
b.group(bossGroup, workerGroup) public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
第四步:指定通道類型,服務器端的通道類型是NioServerSocketChannel .channel(NioServerSocketChannel.class)github
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
bootstrap
}promise
這塊實例化了一個反射工廠,後面會使用這個工廠去實例化channel安全
第五步:設置TCP的配置項。
.option(ChannelOption.SO_BACKLOG, 1024)
源碼來看將參數放置到了AbstractBootstrap中的變量options中去了。
第六步:添加事件處理句柄。
.childHandler(new ChildChannelHandler());*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
服務器
}網絡
到此,咱們也沒有發現Netty的神奇之處,其實Netty的重頭戲只簡簡單單的一行代碼:框架
// 綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();咱們來翻翻源碼看看這句話有多神奇.*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}第一層咱們看出他最終返回了一個ChannelFuture ,異步調用的神器。他是netty異步調用接口,實現了java的異步調用類java.util.concurrent.Future<V>
第二層:public ChannelFuture bind(SocketAddress localAddress) {
//還記得剛剛的初始化麼,若是有異常這個方法會拋出 validate();
//檢查ip地址是否配置
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
第三層:
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和註冊渠道 (1)final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
第三層開始就到了關鍵的步驟了:初始化和註冊通道final ChannelFuture initAndRegister() {
//還記得剛纔的渠道工廠麼,到這裏使用了這個工廠數理化了一個渠道。(工廠模式)
final Channel channel = channelFactory().newChannel();
try {//初始化渠道
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//註冊channel 註冊到了parent線程組中了
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
工廠模式使用了反射的方法去實例化了一個channel
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
初始化渠道代碼以下:void init(Channel channel) throws Exception {
//獲取你編碼設置的option配置參數
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
//將配置參數設置到了渠道參數中 SctpServerChannelConfig config;
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
//設置channle屬性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}//
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
//句柄放到管道
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
//將服務端註冊的handler放置到pipeline中。
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
註冊渠道。
ChannelFuture regFuture = group().register(channel);
// AbstractNioChannel
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
((NioEventLoop) eventLoop().unwrap()).selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
註冊成功後,執行doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//渠道綁定監聽事件
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
到此netty服務器端就實現了渠道初始化和註冊。若是渠道初始化和註冊都成功,接下來的任務就是創建監聽,等待客戶端的鏈接請求。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//打開渠道監聽事件,並註冊了一個關閉渠道的監聽事件。
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
參考文獻
https://github.com/code4craft/netty-learning
http://netty.io/wiki/index.html