tomcat線程池策略

tomcat的線程池擴展了jdk的executor,並且隊列用的是本身的task queue,所以其策略與jdk的有所不一樣,須要注意一下,否則容易踩坑。html

tomcat線程池策略

  • 場景1:接受一個請求,此時tomcat啓動的線程數尚未達到corePoolSize(tomcat裏頭叫minSpareThreads),tomcat會啓動一個線程來處理該請求;java

  • 場景2:接受一個請求,此時tomcat啓動的線程數已經達到了corePoolSize,tomcat把該請求放入隊列(offer),若是放入隊列成功,則返回,放入隊列不成功,則嘗試增長工做線程,在當前線程個數<maxThreads的時候,能夠繼續增長線程來處理,超過maxThreads的時候,則繼續往等待隊列裏頭放,等待隊列放不進去,則拋出RejectedExecutionException;apache

值得注意的是,使用LinkedBlockingQueue的話,默認是使用Integer.MAX_VALUE,即無界隊列(這種狀況下若是沒有配置隊列的capacity的話,隊列始終不會滿,那麼始終沒法進入開啓新線程到達maxThreads個數的地步,則此時配置maxThreads實際上是沒有意義的)。tomcat

而TaskQueue的隊列capacity爲maxQueueSize,默認也是Integer.MAX_VALUE。可是,其重寫offer方法,當其線程池大小小於maximumPoolSize的時候,返回false,即在必定程度改寫了隊列滿的邏輯,修復了使用LinkedBlockingQueue默認的capacity爲Integer.MAX_VALUE的時候,maxThreads失效的"bug"。從而能夠繼續增加線程到maxThreads,超過以後,繼續放入隊列。性能優化

TaskQueue的offer操做服務器

@Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

StandardThreadExecutor

/**
     * Start the component and implement the requirements
     * of {@link org.apache.catalina.util.LifecycleBase#startInternal()}.
     *
     * @exception LifecycleException if this component detects a fatal error
     *  that prevents this component from being used
     */
    @Override
    protected void startInternal() throws LifecycleException {

        taskqueue = new TaskQueue(maxQueueSize);
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        taskqueue.setParent(executor);

        setState(LifecycleState.STARTING);
    }

值得注意的是,tomcat的線程池使用了本身擴展的taskQueue,而不是Executors工廠方法裏頭用的LinkedBlockingQueue。(主要是修改了offer的邏輯)併發

這裏的maxQueueSize默認爲less

/**
     * The maximum number of elements that can queue up before we reject them
     */
    protected int maxQueueSize = Integer.MAX_VALUE;

org/apache/tomcat/util/threads/ThreadPoolExecutor

/**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     * If no threads are available, it will be added to the work queue.
     * If the work queue is full, the system will wait for the specified
     * time and it throw a RejectedExecutionException if the queue is still
     * full after that.
     *
     * @param command the runnable task
     * @param timeout A timeout for the completion of the task
     * @param unit The timeout time unit
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution - the queue is full
     * @throws NullPointerException if command or unit is null
     */
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

注意看這裏改寫了jdk線程池默認的Rejected規則,即catch住了RejectedExecutionException。正常jdk的規則是core線程數+臨時線程數 >maxSize的時候,就拋出RejectedExecutionException。這裏catch住的話,繼續往taskQueue裏頭放。ide

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

注意的是這裏調用的super.offer(o,timeout,unit),即LinkedBlockingQueue,只有當列滿的時候,返回false,纔會拋出從新拋出RejectedExecutionException。(這裏改變了jdk的ThreadPoolExecutor的RejectedExecutionException拋出的邏輯,也就是超出了maxThreads不會拋出RejectedExecutionException,而是繼續往隊列丟任務,而taskQueue自己是無界的,所以能夠默認幾乎不會拋出RejectedExecutionException)性能

JDK線程池策略

  1. 每次提交任務時,若是線程數還沒達到coreSize就建立新線程並綁定該任務。因此第coreSize次提交任務後線程總數必達到coreSize,不會重用以前的空閒線程。

  2. 線程數達到coreSize後,新增的任務就放到工做隊列裏,而線程池裏的線程則努力的使用take()從工做隊列里拉活來幹。

  3. 若是隊列是個有界隊列,又若是線程池裏的線程不能及時將任務取走,工做隊列可能會滿掉,插入任務就會失敗,此時線程池就會緊急的再建立新的臨時線程來補救。

  4. 臨時線程使用poll(keepAliveTime,timeUnit)來從工做隊列拉活,若是時候到了仍然兩手空空沒拉到活,代表它太閒了,就會被解僱掉。

  5. 若是core線程數+臨時線程數 >maxSize,則不能再建立新的臨時線程了,轉頭執行RejectExecutionHanlder。默認的AbortPolicy拋RejectedExecutionException異常,其餘選擇包括靜默放棄當前任務(Discard),放棄工做隊列裏最老的任務(DisacardOldest),或由主線程來直接執行(CallerRuns).

源碼

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

小結

tomcat的線程池與jdk的使用無界LinkedBlockingQueue主要有以下兩點區別:

  • jdk的ThreadPoolExecutor的線程池增加策略是:若是隊列是個有界隊列,又若是線程池裏的線程不能及時將任務取走,工做隊列可能會滿掉,插入任務就會失敗,此時線程池就會緊急的再建立新的臨時線程來補救。而tomcat的ThreadPoolExecutor使用的taskQueue,是無界的LinkedBlockingQueue,可是經過taskQueue的offer方法覆蓋了LinkedBlockingQueue的offer方法,改寫了規則,使得它也走jdk的ThreadPoolExecutor的有界隊列的線程增加策略。

  • jdk的ThreadPoolExecutor的線程池,當core線程數+臨時線程數 > maxSize,則不能再建立新的臨時線程了,轉頭執行RejectExecutionHanlder。而tomcat的ThreadPoolExecutor則改寫了這個規則,即catch住了RejectExecutionHanlder,繼續往隊列裏頭放,直到隊列滿了才拋出RejectExecutionHanlder。而默認taskQueue是無界的。

疑問:既然taskQueue是無界的,那麼在哪裏控制tomcat服務器的接收請求限制,如何自我保護。另外acceptCount與maxConnections究竟是什麼關係。

doc

相關文章
相關標籤/搜索