// 初始化netty服務端 private void init(String hostToBind, int portToBind) { // io模式,有兩種選項NIO, EPOLL IOMode ioMode = IOMode.valueOf(conf.ioMode()); // 建立bossGroup和workerGroup,即主線程組合子線程組 EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; // 緩衝分配器,分爲堆內存和直接內存 PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); // 建立一個netty服務端引導對象,並設置相關參數 bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); // 內存使用的度量對象 this.metrics = new NettyMemoryMetrics( allocator, conf.getModuleName() + "-server", conf); // 排隊的鏈接數 if (conf.backLog() > 0) { bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog()); } // socket接收緩衝區大小 if (conf.receiveBuf() > 0) { bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf()); } // socket發送緩衝區大小 if (conf.sendBuf() > 0) { bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf()); } // 子channel處理器 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { RpcHandler rpcHandler = appRpcHandler; for (TransportServerBootstrap bootstrap : bootstraps) { rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); } context.initializePipeline(ch, rpcHandler); } }); InetSocketAddress address = hostToBind == null ? new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind); // 綁定到ip地址和端口 channelFuture = bootstrap.bind(address); // 同步等待綁定成功 channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port: {}", port); }
public ChannelFuture bind(SocketAddress localAddress) {
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
private ChannelFuture doBind(final SocketAddress localAddress) { // 建立一個channel,並對這個channel作一些初始化工做 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 將這個channel綁定到指定的地址 doBind0(regFuture, channel, localAddress, promise); return promise; } else {// 對於還沒有註冊成功的狀況,採用異步的方式,即添加一個回調 // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
public NioServerSocketChannel() {
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); // 設置參數 synchronized (options) { setChannelOptions(channel, options, logger); } // 設置屬性 final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); // 子channel的group和handler參數 final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // 添加處理器 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 通常狀況下,對於ServerBootstrap用戶無需設置handler ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 這裏添加了一個關鍵的handler,而且順手啓動了對應的EventLoop的線程 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
// 處理read和accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
public void read() { // 確認當前代碼的執行是在EventLoop的線程中 assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 這裏讀取到的是創建的鏈接對應的channel, // jdk的socketChannel被包裝成了netty的NioSocketChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 把接收到的每個channel做爲消息,在channelPipeline中觸發一個讀事件 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); // 最後觸發一個讀完成的事件 pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
public void channelRead(ChannelHandlerContext ctx, Object msg) { // 類型轉換,這裏的強制轉換是安全的的, // 是由各類具體的AbstractNioMessageChannel子類型的實現保證的 // 各類具體的AbstractNioMessageChannel子類型的讀方法確保它們讀取並最終返回的是一個Channel類型 final Channel child = (Channel) msg; // 給子channel添加handler child.pipeline().addLast(childHandler); // 給子channel設置參數 setChannelOptions(child, childOptions, logger); // 給子channel設置屬性 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // 將子channel註冊到子group中 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 調用了channel.bind方法完成綁定的邏輯 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
由於後面右有幾個事件的觸發,每一個觸發事件都是經過channel的相關方法來觸發,而後又是經過channelpipeline的傳遞事件,這些事件最後基本都是由HeadContext處理了,因此這裏我只簡單地敘述一下後面的 大概邏輯,代碼比較繁瑣,並且不少都是相同的調用過程,因此就不貼代碼了。
從代碼中能夠看出來,最終調用了jdk的api,將感興趣的事件添加到selectionKey中。經過前面的 分析,咱們知道對於NioSocketChannel,它的感興趣的讀事件類型是SelectionKey.OP_READ,也就是讀事件;
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // 將讀事件類型加入到selectionKey的感興趣的事件中 // 這樣jdk底層的selector就會監聽相應類型的事件 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }