NioEventLoop的建立

NioEventLoop的建立

NioEventLoop是netty及其重要的組成部件,它的首要職責就是爲註冊在它上的channels服務,發現這些channels上發生的新鏈接、讀寫等I/O事件,而後將事件轉交 channel 流水線處理。使用netty時,咱們首先要作的就是建立NioEventLoopGroup,這是一組NioEventLoop的集合,相似線程與線程池。一般,服務端會建立2個group,一個叫作bossGroup,一個叫作workerGroup。bossGroup負責監聽綁定的端口,接受請求並建立新鏈接,初始化後交由workerGroup處理後續IO事件。java

NioEventLoop和NioEventLoopGroup的類圖

首先看看NioEventLoop和NioEventLoopGroup的類關係圖數組

NioEventLoop
NioEventLoopGroup
類多但不亂,能夠發現三個特色:app

  1. 二者都繼承了ExecutorService,從而與線程池創建了聯繫
  2. NioEventLoop繼承的都是SingleThread,NioEventLoop繼承的是MultiThread
  3. NioEventLoop還繼承了AbstractScheduledEventExecutor,不難猜出這是個和定時任務調度有關的線程池

NioEventLoopGroup的建立

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

咱們先看看bossGroup和workerGroup的構造方法。負載均衡

public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
除此以外,還有多達8種構造方法,這些構造方法能夠指定5種參數:
一、最大線程數量。若是指定爲0,那麼Netty會將線程數量設置爲CPU邏輯處理器數量的2倍
二、線程工廠。要求線程工廠類必須實現java.util.concurrent.ThreadFactory接口。若是沒有指定線程工廠,那麼默認DefaultThreadFactory。
三、SelectorProvider。若是沒有指定SelectorProvider,那麼默認的SelectorProvider爲SelectorProvider.provider()。
四、SelectStrategyFactory。若是沒有指定則默認爲DefaultSelectStrategyFactory.INSTANCE
五、RejectedExecutionHandler。拒絕策略處理類,若是這個EventLoopGroup已被關閉,那麼以後提交的Runnable任務會默認調用RejectedExecutionHandler的reject方法進行處理。若是沒有指定,則默認調用拒絕策略。

最終,NioEventLoopGroup會重載到父類MultiThreadEventExecutorGroup的構造方法上,這裏省略了一些健壯性代碼。ide

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
    // 步驟1
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    
    // 步驟2
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(executor, args);
    }    

    // 步驟3
    chooser = chooserFactory.newChooser(children);

    // 步驟4
    final FutureListener<Object> terminationListener = future -> {
        if (terminatedChildren.incrementAndGet() == children.length) {
            terminationFuture.setSuccess(null);
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 步驟5
    Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

這裏能夠分解爲5個步驟,下面一步步講解函數

步驟1

第一個步驟是建立線程池executor。從workerGroup構造方法可知,默認傳進來的executor爲null,因此首先建立executor。newDefaultThreadFactory的做用是設置線程的前綴名和線程優先級,默認狀況下,前綴名是nioEventLoopGroup-x-y這樣的命名規則,而線程優先級則是5,處於中間位置。
建立完newDefaultThreadFactory後,進入到ThreadPerTaskExecutor。它直接實現了juc包的線程池頂級接口,從構造方法能夠看到它只是簡單的把factory賦值給本身的成員變量。而它實現的接口方法調用了threadFactory的newThread方法。從名字能夠看出,它構造了一個thread,並當即啓動thread。oop

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

那麼咱們回過頭來看下DefaultThreadFactory的newThread方法,發現他建立了一個FastThreadLocalThread。這是netty自定義的一個線程類,顧名思義,netty認爲它的性能更快。關於它的解析留待之後。這裏步驟1建立線程池就完成了。總的來講他與咱們一般使用的線程池不太同樣,不設置線程池的線程數和任務隊列,而是來一個任務啓動一個線程。(問題:那任務一多豈不是直接線程爆炸?)性能

@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());        
    return t;
}
protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(threadGroup, r, name);
}

步驟2

步驟2是建立workerGroup中的NioEventLoop。在示例代碼中,傳進來的線程數是0,顯然不可能真的只建立0個nioEventLoop線程。在調用父類MultithreadEventLoopGroup構造函數時,對線程數進行了判斷,若爲0,則傳入默認線程數,該值默認爲2倍CPU核心數優化

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 靜態代碼塊初始化DEFAULT_EVENT_LOOP_THREADS
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",     NettyRuntime.availableProcessors() * 2));
}

接下來是經過newChild方法爲每個EventExecutor建立一個對應的NioEventLoop。這個方法傳入了一些args到NioEventLoop中,前三個是在NioEventLoopGroup建立時傳過來的參數。默認值見上文this

  1. SlectorProvider.provider, 用於建立 Java NIO Selector 對象;
  2. SelectStrategyFactory, 選擇策略工廠;
  3. RejectedExecutionHandlers, 拒絕執行處理器;
  4. EventLoopTaskQueueFactory,任務隊列工廠,默認爲null;

進入NioEventLoop的構造函數,以下:

NioEventLoop構造函數
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
// 父類構造函數    
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue");
    rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler");
}

首先調用一個newTaskQueue方法建立一個任務隊列。這是一個mpsc即多生產者單消費者的無鎖隊列。以後調用父類的構造函數,在父類的構造函數中,將NioEventLoopGroup設置爲本身的parent,並經過匿名內部類建立了這樣一個Executor————經過ThreadPerTaskExecutor執行傳進來的任務,而且在執行時將當前線程與NioEventLoop綁定。其餘屬性也一一設置。
在nioEventLoop構造函數中,咱們發現建立了一個selector,不妨看一看netty對它的包裝。

unwrappedSelector = provider.openSelector();
if (DISABLE_KEY_SET_OPTIMIZATION) {
    return new SelectorTuple(unwrappedSelector);
}

首先看到netty定義了一個常量DISABLE_KEY_SET_OPTIMIZATION,若是這個常量設置爲true,也即不對keyset進行優化,則直接返回未包裝的selector。那麼netty對selector進行了哪些優化?

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }
}

往下,咱們看到了一個叫作selectedSelectionKeySet的類,點進去能夠看到,它繼承了AbstractSet,然而它的成員變量卻讓咱們想到了ArrayList,再看看它定義的方法,除了不支持remove和contains外,活脫脫一個簡化版的ArrayList,甚至也支持擴容。
沒錯,netty確實經過反射的方式,將selectionKey從Set替換爲了ArrayList。仔細一想,卻又以爲此番作法有些道理。衆所周知,雖然HashSet和ArrayList隨機查找的時間複雜度都是o(1),但相比數組直接經過偏移量定位,HashSet因爲須要Hash運算,時間消耗上又稍稍遜色了些。再加上使用場景上,都是獲取selectionKey集合而後遍歷,Set去重的特性徹底用不上,也無怪乎追求性能的netty想要替換它了。

步驟3

建立完workerGroup的NioEventLoop後,如何挑選一個nioEventLoop進行工做是netty接下來想要作的事。通常來講輪詢是一個很容易想到的方案,爲此須要建立一個相似負載均衡做用的線程選擇器。固然追求性能到喪心病狂的netty是不會輕易知足的。咱們看看netty在這樣常見的方案裏又作了哪些操做。

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
// PowerOfTwo
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}
// Generic
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

能夠看到,netty根據workerGroup內線程的數量採起了2種不一樣的線程選擇器,當線程數x是2的冪次方時,能夠經過&(x-1)來達到對x取模的效果,其餘狀況則須要直接取模。這與hashmap強制設置容量爲2的冪次方有殊途同歸之妙。

步驟4

步驟4就是添加一些保證健壯性而添加的監聽器了,這些監聽器會在EventLoop被關閉後獲得通知。

步驟5

建立一個只讀的NioEventLoop線程組

到此NioEventLoopGroup及其包含的NioEventLoop組就建立完成了

相關文章
相關標籤/搜索