Netty源碼分析 服務器端1


Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。html

作過NIO開發的人都遇到不少難以解決的問題,好比半、粘包,超時,安全性、性能等等,Netty成功的解決了原始NIO開發過程當中遇到的各類問題。那麼Netty的內部框架是如何解決這些問題呢?本文初步介紹Netty服務器端開發源碼,而且從源碼分析的角度來分析下各個步驟都作了什麼。java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author lilinfeng
 * @date 2014年2月14日
 * @version 1.0
 */
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服務端的NIO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
       ServerBootstrap b = new ServerBootstrap();
       b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .childHandler(new ChildChannelHandler());
// 綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服務端監聽端口關閉
f.channel().closeFuture().sync();
   } finally {
// 優雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
       workerGroup.shutdownGracefully();
   }
    }
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
       arg0.pipeline().addLast(new TimeServerHandler());
   }
    }
/**
     * @param args
* @throws Exception
     */
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
      port = Integer.valueOf(args[0]);
       } catch (NumberFormatException e) {
// 採用默認值
}
   }
new TimeServer().bind(port);
    }
}
Netty框架的最大特色就是寫起來很是的簡單,這也是他牛叉的緣由。
第一個方法就是一個簡單的對象實例化:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();


NioEventLoopGroup是Nio線程組。Netty是基於Nio線程池模型實現的。Netty服務器端生成了兩個線程組。第一個bossGroup用於接收鏈接的線程組,第二個是用於處理鏈接請求的線程組,包括讀、寫、業務邏輯等。
第二個方法是啓動類實例化。看到boot我以爲window啓動是否是也用到這個詞兒了,呲牙!
git

ServerBootstrap b = new ServerBootstrap();
第三個方法就是將新生成的兩個線程組組成一組。說白了就是將其賦值給了group、childGroup兩個成員變量中。

b.group(bossGroup, workerGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
    }
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
    }
this.childGroup = childGroup;
return this;
}

第四步:指定通道類型,服務器端的通道類型是NioServerSocketChannel  .channel(NioServerSocketChannel.class)github

 public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
   }
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
bootstrap

}promise

這塊實例化了一個反射工廠,後面會使用這個工廠去實例化channel安全

第五步:設置TCP的配置項。
       .option(ChannelOption.SO_BACKLOG, 1024)
源碼來看將參數放置到了AbstractBootstrap中的變量options中去了。
第六步:添加事件處理句柄。
         .childHandler(new ChildChannelHandler());*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
   }
this.childHandler = childHandler;
return this;
服務器

}網絡

到此,咱們也沒有發現Netty的神奇之處,其實Netty的重頭戲只簡簡單單的一行代碼:框架

// 綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();咱們來翻翻源碼看看這句話有多神奇.*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}第一層咱們看出他最終返回了一個ChannelFuture ,異步調用的神器。他是netty異步調用接口,實現了java的異步調用類java.util.concurrent.Future<V>
第二層:public ChannelFuture bind(SocketAddress localAddress) {
//還記得剛剛的初始化麼,若是有異常這個方法會拋出    validate();
//檢查ip地址是否配置
if (localAddress == null) {
throw new NullPointerException("localAddress");
   }
return doBind(localAddress);

}

第三層:

private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和註冊渠道  (1)final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
   }

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
   } else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
       regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
               Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                   // IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
               } else {
// Registration was successful, so set the correct executor to use.
                   // See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
               }
doBind0(regFuture, channel, localAddress, promise);
           }
       });
return promise;
   }

}

第三層開始就到了關鍵的步驟了:初始化和註冊通道final ChannelFuture initAndRegister() {

//還記得剛纔的渠道工廠麼,到這裏使用了這個工廠數理化了一個渠道。(工廠模式)

final Channel channel = channelFactory().newChannel();
try {//初始化渠道
       init(channel);
   } catch (Throwable t) {
       channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
   }
//註冊channel 註冊到了parent線程組中了
   ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
           channel.close();
       } else {
           channel.unsafe().closeForcibly();
       }
   }
工廠模式使用了反射的方法去實例化了一個channel
@Override
public T newChannel() {
try {
return clazz.newInstance();
   } catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
   }

}

初始化渠道代碼以下:void init(Channel channel) throws Exception {

//獲取你編碼設置的option配置參數
final Map<ChannelOption<?>, Object> options = options();

synchronized (options) {  

  //將配置參數設置到了渠道參數中 SctpServerChannelConfig config;

       channel.config().setOptions(options);
   }
final Map<AttributeKey<?>, Object> attrs = attrs();

synchronized (attrs) {

//設置channle屬性

for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
           AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
           channel.attr(key).set(e.getValue());
       }
   }//

   ChannelPipeline p = channel.pipeline();
if (handler() != null) {

//句柄放到管道      

p.addLast(handler());

   }

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
       currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
   }
synchronized (childAttrs) {
       currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
   }

   p.addLast(new ChannelInitializer<Channel>() {
@Override

public void initChannel(Channel ch) throws Exception {

//將服務端註冊的handler放置到pipeline中。    

  ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
       }
   });
}

註冊渠道。

ChannelFuture regFuture = group().register(channel);

// AbstractNioChannel

protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
       } catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
               // cached and not removed because no Select.select(..) operation was called yet.
((NioEventLoop) eventLoop().unwrap()).selectNow();
               selected = true;
           } else {
// We forced a select operation on the selector before but the SelectionKey is still cached
               // for whatever reason. JDK bug ?
throw e;
           }
       }
   }
}
註冊成功後,執行doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
   // the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

if (regFuture.isSuccess()) {

//渠道綁定監聽事件

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           } else {
promise.setFailure(regFuture.cause());
           }
       }
   });

}

到此netty服務器端就實現了渠道初始化和註冊。若是渠道初始化和註冊都成功,接下來的任務就是創建監聽,等待客戶端的鏈接請求。

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
   // the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {

if (regFuture.isSuccess()) {

//打開渠道監聽事件,並註冊了一個關閉渠道的監聽事件。

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           } else {
promise.setFailure(regFuture.cause());
           }
       }
   });
}
參考文獻

https://github.com/code4craft/netty-learning

http://netty.io/wiki/index.html

相關文章
相關標籤/搜索