/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * [@param](https://my.oschina.net/u/2303379) firstTask the first task (null if none) * 初始化當前工做隊列,隊列的第一個任務是 FirstTask,爲任務的執行建立新的 * 線程,設置當前隊列的狀態,參數產科 AQS的參數詳情 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //實現runable接口,執行runWorker方法 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. 
// 檢查排他模式下是否被佔用,0表示未上鎖,1表示上鎖。 protected boolean isHeldExclusively() { return getState() != 0; } //嘗試加鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } //執行中斷 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }`
// Pool configured with corePoolSize 0, maximumPoolSize Integer.MAX_VALUE,
// a 60-second keep-alive, and a SynchronousQueue as the work queue.
ThreadPoolExecutor t =
        new ThreadPoolExecutor(
                0,
                Integer.MAX_VALUE,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<>());
/**
 * Demo task: prints the current thread's name and the current time, sleeps
 * for three seconds, then prints the thread's name followed by " end".
 *
 * <p>Declared package-private because a source file may contain only one
 * public top-level class, and this file's public class is {@code ExecutorTest}.
 */
class MyRunnable1 implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep clears the interrupt status when it throws; restore it
            // so whoever runs this task can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + " end");
    }
}

/**
 * Demo driver: submits tasks to a thread pool, shuts the pool down, and then
 * attempts one more submission after the shutdown.
 */
public class ExecutorTest {
    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during its one-second sleep
     */
    public static void main(String[] args) throws InterruptedException {
        MyRunnable1 myRunnable1 = new MyRunnable1();
        // corePoolSize 2, maximumPoolSize 10, 10-second keep-alive, and a
        // LinkedBlockingDeque created without a capacity argument.
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        pool.execute(myRunnable1);
        Thread.sleep(1000);
        pool.shutdown();
        // NOTE(review): no RejectedExecutionHandler is passed to the constructor,
        // so per the ThreadPoolExecutor documentation this submission after
        // shutdown() should be rejected with a RejectedExecutionException, in
        // which case the println below is not reached. Kept as-is because it
        // appears to be the point of the demo.
        pool.execute(myRunnable1);
        System.out.println("main end!");
    }
}
execute() 方法用於提交不需要返回值的任務,因此無法判斷任務是否被線程池執行成功。submit() 方法用於提交需要返回值的任務。線程池會返回一個 Future 類型的對象,通過這個 Future 對象可以判斷任務是否執行成功,並且可以通過 Future 的 get() 方法來獲取返回值。
使用線程池的 shutdown 或 shutdownNow 方法來關閉線程池。