Reactor pattern(反應器模式)是用於處理經過一個或多個輸入同時傳遞給服務器的服務請求的事件處理模式。服務處理程序複用傳入的請求,並將它們同步分派給關聯的handler。 關鍵幾點:java
負責響應事件,將事件分發綁定了該事件的Handler處理。對應netty 的NioEventLoop.run(),processSelectedKeys()。react
事件處理器,綁定了某類事件,負責執行對應事件的任務對事件進行處理。對應netty的IdleStateHandler等。promise
Acceptor屬於handler中的一種,由於更加特殊,獨立出來說,是reactor的事件接收類,負責初始化selector和接收緩衝隊列。對應netty的ServerBootstrapAcceptor。bash
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup(4);
複製代碼
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
children = new SingleThreadEventExecutor[nThreads];
...
for (int i = 0; i < nThreads; i ++) {
...
children[i] = newChild(threadFactory, args);
...
}
}
複製代碼
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
複製代碼
在這裏建立了mainReactor和subReactor線程池,而且建立了eventLoop線程服務器
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
複製代碼
每一個eventLoop線程都會有自身的selector,這裏的eventLoop線程還未啓動,後面啓動以後,會執行run()裏面的selector.select。socket
ChannelFuture regFuture = group().register(channel); 這裏的group()是bossGroupide
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
複製代碼
next()執行函數
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
複製代碼
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
複製代碼
從線程池中取出第一個eventLoopoop
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
複製代碼
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
複製代碼
這裏將mainReactor的eventLoop和服務器的NioServerSocketChannel綁定。 由於剛開始啓動是main線程,執行eventLoop.execute,在這裏mainReactor只啓動了一個線程。ui
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
複製代碼
在execute中執行startThread(),正式啓動mainReactor線程循環,而且將register0(promise)這個Task加入taskQueue中,讓mainReactor循環執行。
private void register0(ChannelPromise promise) {
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
複製代碼
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
複製代碼
這裏將mainReactor 中eventLoop的selector註冊一個爲0的操做監聽位,並將服務器的NioServerSocketChannel綁定到mainSubReactor線程上 在doBind()-->doBind0()-->channel.bind()-->…-->next.invokeBind()-->HeadContext. Bind()-->unsafe.bind()-->pipeline.fireChannelActive()-->channel.read()-->…-->doBeginRead()中修改成OP_ACCEPT(16)操做監聽位。
@Override
protected void doBeginRead() throws Exception {
…
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
複製代碼
自此,mainReactor的eventLoop從run開始循環執行selector.select。
注:readInterestOp的值來自於建立NioServerSocketChannel的構造函數
複製代碼
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製代碼
在收到客戶端鏈接後,將會在ServerBootstrapAcceptor中把客戶端的Channel註冊在subReactor線程上,並將這個channel綁定到subReactor線程的selector上,監聽客戶端channel的OP_READ事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
複製代碼
當監聽到客戶端鏈接,執行服務器AbstractNioUnsafe的read();
@Override
public void read() {
...
int localRead = doReadMessages(readBuf);
...
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
...
pipeline.fireChannelReadComplete();
...
}
複製代碼
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
...
buf.add(new NioSocketChannel(this, ch));
...
}
複製代碼
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
複製代碼
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
複製代碼
設置客戶端channel監聽位的值爲OP_READ(1)
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
複製代碼
ServerBootstrapAcceptor不只將subReactor綁定客戶端channel,還爲客戶端channel進行一些參數的初始化
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
複製代碼
和上面的register同樣,只是將mainReactor線程池改成了subReactor線程池。 在這裏將從subReactor線程池中取一個線程的selector與客戶端channel綁定,並監聽該客戶端0事件。
@Override
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
if (channel.config().isAutoRead()) {
read();
}
return this;
}
複製代碼
read()-->tail.read()-->next.invokeRead()-->HeadContext. read()-->…--> doBeginRead()
@Override
protected void doBeginRead() throws Exception {
...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
複製代碼
在這裏將監聽位改成OP_READ(1)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
...
}
複製代碼
進到NioByteUnsafe的read()方法
@Override
public final void read() {
...
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
...
byteBuf = allocHandle.allocate(allocator);
...
pipeline.fireChannelRead(byteBuf);
...
}
複製代碼
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
複製代碼
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
複製代碼
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
super.channelRead(ctx, msg);
}
}
複製代碼
在這裏將接收到客戶端消息處理
Reactor線程池有多少個,就會建立多少個selector, mainReactor的eventLoop會與服務器的channel綁定,並只關注服務器channel的ACCEPT事件,subReactor的eventLoop會與客戶端的channel綁定,並只關注客戶端channel的READ事件。
mainReactor和subReactor循環各自的selector,mainReactor會循環ACCEPT事件的selector,subReactor會循環READ事件的selector,mainReactor接受到客戶端鏈接後,會執行ServerBootstrapAcceptor的channelRead方法,將客戶端鏈接與subReactor綁定。