Jdk1.8--ThreadPoolExecute的Worker類

線程池的work對象---一個同步的工做隊列,實現了runable接口

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * [@param](https://my.oschina.net/u/2303379) firstTask the first task (null if none)
     * 初始化當前工做隊列,隊列的第一個任務是 FirstTask,爲任務的執行建立新的            * 線程,設置當前隊列的狀態,參數產科 AQS的參數詳情

     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
   //實現runable接口,執行runWorker方法
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    // 檢查排他模式下是否被佔用,0表示未上鎖,1表示上鎖。
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    //嘗試加鎖
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    //嘗試釋放鎖
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    //執行中斷
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}`

線程池的建立

ThreadPoolExecutor t = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

參數是:java

  • corePoolSize:核心線程池的大小,
  • runableTaskQueue:任務隊列:用於保存等待執行的任務的阻塞隊列
  • maximumPoolSize:線程池最大數量,線程池容許建立的最大線程數。
  • ThreadFactory:用於設置建立線程的工程,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  • RejectExecutionHandler(飽和策略),當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。
  • KeepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。
  • TimeUtil (線程活動保持時間的單位)

向線程池提交任務

public class MyRunnable1 implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" " + System.currentTimeMillis());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" end");
    }
}

public class ExecutorTest {
    public static void main(String[] args) throws InterruptedException {
        MyRunnable1 myRunnable1 = new MyRunnable1();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        Thread.sleep(1000);
        pool.shutdown();
        pool.execute(myRunnable1);
        System.out.println("main end!");
    }
}

execute( ) 方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。 submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值。ide

線程池的關閉

使用線程池的 shutdown 或shutdownNow 方法來關閉線程池。oop

線程池的監控

  • taskcount:線程池須要執行的任務數量。
  • completedTaskCount線程池在運行過程當中已完成的任務數量,小於或等於taskcount
  • largestPoolSize:線程池裏曾經建立過的最大線程數量。
  • getPoolSize:線程池的線程數量。
  • getActiveCount:獲取活動的線程數。
相關文章
相關標籤/搜索