ThreadPoolExecutor源碼分析及阻塞提交任務方法

ThreadPoolExecutor源碼

ThreadPoolExecutor 基本使用參考:ThreadPoolExecutor執行過程分析java

線程池狀態標誌

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 保存了線程池的運行狀態(runState)和線程池內有效線程數量(workerCount)。git

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

用 ctl 的高3位來表示線程池的運行狀態, 用低29位來表示線程池內有效線程的數量。ctlOf() 方法用於計算出ctl的值。runStateOf()和workerCountOf()方法分別經過CAPACITY來計算獲得其runState和workerCount,CAPACITY=29個1。github

線程池的運行狀態:less

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
//shutdown() -> SHUTDONW , 不加新任務,繼續執行阻塞隊列中的任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//shutdownNow() -> STOP, 中斷一切操做。
private static final int STOP       =  1 << COUNT_BITS;
//線程池沒有線程,阻塞隊列沒有任務 -> TIDYING
private static final int TIDYING    =  2 << COUNT_BITS;
//terminated() -> TERMINATED
private static final int TERMINATED =  3 << COUNT_BITS;

execute(Runnable command)

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //若是線程池中線程數沒有達到corePoolSize,則新增線程(worker)
        if (addWorker(command, true))
            return;
        //更新c值。
        c = ctl.get();
    }
    //線程池處於RUNNING狀態,而且阻塞隊列未滿
    //workQueue.offer(command)是非阻塞方法,當隊列滿時直接返回false(例如,SynchronousQueue若是沒有線程在阻塞take,則返回false)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次檢查狀態,若是發現不是RUNNING狀態,則remove掉剛纔offer的任務。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //若是有效線程數==0,添加一個線程,而不去啓動它。??
        //怎麼會==0?
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //若是不是RUNNING狀態,或者阻塞隊列已滿,則添加線程
    //若是不能添加,則reject。
    //false 表示添加的線程屬於maximumPoolSize,若是線程數已經達到maximumPoolSize,則reject
    else if (!addWorker(command, false))
        reject(command);
}

20160228204222307

BlockingQueue 的一些操做方法ide

拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用

addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            //1. 處於 STOP, TYDING 或 TERMINATD 狀態 而且 
            //2. 不是SUHTDOWN 或者 firsttask != null 或 queue不爲空
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //wc大於最大容量。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                //沒有空餘的線程了。
                return false;
            //有效線程數加一,加一成功後break
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //runState改變,從頭執行邏輯。
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            //else runState 沒變,從新去執行加一操做。
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //建立worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //添加成功,啓動線程
                //啓動後執行runWorker(this);
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker(Worker w)

運行worker,該線程不斷的getTask()從隊列中獲取任務,而後 task.run();運行。只要隊列中有值則不斷循環。oop

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //getTask()方法是個無限循環, 會從阻塞隊列 workQueue中不斷取出任務來執行.
        //addWorker(null, false);狀況,task==null,這樣就須要getTask從隊列中取任務執行(本身不帶任務)。直到getTask返回null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // STOP以上狀態,或者SHUTDOWN狀態下queue爲空,即都沒有任務要執行了。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //線程數減一
            decrementWorkerCount();
            //該線程退出。
            return null;
        }
        //下面都是RUNNING狀態,或SHUTDOWN狀態queue!=null

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        //設置了allowCoreThreadTimeOut,或者線程數大於core線程數。
        //是否剔除超時的線程?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		
        // 經過返回 null 結束線程。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
           	//線程已經準備好,正在take(),沒有什麼標誌位?
            
            //取出runnable 返回
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

ThreadPoolExecutor阻塞添加任務

使用semaphore限流ThreadPoolExecutor(失效及緣由)

考慮到當線程池滿時(任務數 > maximumPoolSize + Queue.size()),會執行飽和策略。默認AbortPolicy ,拋出RejectedExecutionException。源碼分析

怎麼能避免線程池拒絕提交的任務呢?首先想到經過信號量Semaphore來控制任務的添加。代碼以下:ui

注意:該代碼是無效的。this

Semaphore semaphore;

/**
 * 使用semaphore,控制提交任務速度
 * @throws InterruptedException
 * @throws ExecutionException
 */
@Test
public void test555() throws InterruptedException, ExecutionException {
   ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 7, 10, TimeUnit.SECONDS, new SynchronousQueue<>());
   //信號量設置爲線程池最大線程數
   semaphore = new Semaphore(threadPoolExecutor.getMaximumPoolSize());
   ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService(threadPoolExecutor);

   Runnable runnable = new Runnable() {
      @Override
      public void run() {
         for (int i = 0; i < 50; i++) {
            String name = "name_" + i;
            TestCallable testCallable = new TestCallable(name);
            try {
               //RetryUtil.createThreadPoolExecutor()
               semaphore.acquire();

               executorCompletionService.submit(testCallable);
               logger.info("+++添加任務 name: " + name + poolInfo(threadPoolExecutor));
               //threadPoolExecutor.submit(testCallable);
            } catch (RejectedExecutionException e) {
               logger.info("拒絕:" + name);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
            try {
               //添加任務間隔200ms
               Thread.sleep(200);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         finishState = 1;
      }
   };

   Thread addThread = new Thread(runnable);
   addThread.start();

   //logger.info(" taskCount: " + threadPoolExecutor.getTaskCount());

   //添加的任務有被拋棄的。taskCount不必定等於添加的任務。
   int completeCount = 0;
   while (!(completeCount == threadPoolExecutor.getTaskCount() && finishState == 1)) {
      Future<String> take = executorCompletionService.take();
      String taskName = null;

      try {
         taskName = take.get();
         //有可能線程池還沒準備好?
         semaphore.release();
         System.out.println("???" + take.isDone());

      } catch (InterruptedException e) {
         e.printStackTrace();
      } catch (ExecutionException e) {
         logger.info(e.getMessage());
      }

      logger.info("---完成任務 name: "
            + taskName + poolInfo(threadPoolExecutor)
            + " finishTask:" + (++completeCount));

   }

   addThread.join();


   while (threadPoolExecutor.getPoolSize() > 0) {
      Thread.sleep(1000);
      SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
      logger.info(simpleDateFormat.format(new Date()) + poolInfo(threadPoolExecutor));
   }

   // Tell threads to finish off.
   threadPoolExecutor.shutdown();
   // Wait for everything to finish.
   while (!threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
      logger.info("complete");
   }

}

public String poolInfo(ThreadPoolExecutor threadPoolExecutor) {
   return " ActiveCount: " + threadPoolExecutor.getActiveCount()
         + " poolSize: " + threadPoolExecutor.getPoolSize()
         + " queueSize: " + threadPoolExecutor.getQueue().size()
         + " taskCount: " + threadPoolExecutor.getTaskCount();
}

只是在submit以前添加semaphore.acquire(); 在獲取future後,添加semaphore.release();。atom

但這樣依然會產生RejectedExecutionException。

經過源碼分析緣由,

當線程池中線程已滿,而且都處於忙碌狀態。此時semaphore的值==線程池線程數,addThread被semaphore.acquire()阻塞,禁止submit新任務。當線程池中一個線程t1執行了runWorker(Worker w)中的task.run(),main線程就能夠執行Future<String> take = executorCompletionService.take()獲取結果並semaphore.release()釋放信號量。

釋放信號量semaphore後,addThread線程能夠submit新任務,假設此時t1線程尚未執行到getTask() 中的poll()和take()方法。此時workQueue隊列依然是滿的。

Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

而addThread已經執行到execute()的

if (isRunning(c) && workQueue.offer(command)) {

當workQueue已滿,offer() 直接返回false(正確的順序應該是等t1線程執行到workQueue.take()後addThread再開始執行workQueue.offer(command)。)。執行execute() 以下邏輯

else if (!addWorker(command, false))
    reject(command);

addWork()中,wc = maximumPoolSize 返回false。

if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                //沒有空餘的線程了。
                return false;

執行reject(),拋出RejectedExecutionException。

使用自定義隊列(不建議)

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

其思想就是替換BlockingQueue中的offer()方法爲put()方法,這樣execute() 中的workQueue.offer(command),就變成put(),阻塞添加任務,不會存在workQueue.offer() 返回false的狀況。

//void execute(Runnable command) 中代碼

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//一下代碼,沒法執行
else if (!addWorker(command, false))
    reject(command);

但這樣的問題是下面的else if (!addWorker(command, false)) 代碼邏輯將沒法執行,致使的結果就是,只針對corePoolSize==maxPoolSize 時有效。不建議這麼作。

自定義RejectedExecutionHandler

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

經過自定義RejectedExecutionHandler,在reject時調用Queue的put()方法,阻塞式添加任務。

使用CallerRunsPolicy

其實忙活一圈,發現最簡單的方式就是使用ThreadPoolExecutor.CallerRunsPolicy。

CallerRunsPolicy被拒絕的任務,誰submit的誰執行。想一想以前的各類阻塞也對,負責添加任務的線程由於線程池滿了就阻塞在那裏,還不如幫着執行一些任務..

Reference

相關文章
相關標籤/搜索