//Thread.java //調用start啓動線程,進入Runnable狀態,等待系統調度執行 public synchronized void start(){//synchronized同步執行 if (threadStatus != 0) //0 表明new狀態,非0則拋出錯誤 throw new IllegalThreadStateException(); ... start0(); //本地方法方法 private native void start0() ... } //Running狀態,新線程執行的代碼方法,可被子類重寫 public void run() { if (target != null) { //target是Runnable,new Thread(Runnable)時傳入 target.run(); } }
//Thread.java @Deprecated public final void stop(); //中斷線程 public void interrupt() //判斷的是當前線程是否處於中斷狀態 public static boolean interrupted()
interrupt函數中斷線程,但它不必定會讓線程退出的。它比stop函數優雅,可控制html
//Thread.java //阻塞等待其餘線程 public final synchronized void join(final long millis) //暫時讓出CPU執行 public static native void yield(); //休眠一段時間 public static native void sleep(long millis) throws InterruptedException;
start與run方法的區別java
Thread.sleep與Object.wait區別linux
//ThreadPoolExecutor.java public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
//ThreadPoolExecutor.java public void execute(Runnable command) { ... if (workerCountOf(c) < corePoolSize) { //plan A if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //plan B int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //addWorker(command, false) false表明可建立非核心線程來執行任務 else if (!addWorker(command, false)) //plan C reject(command); // //plan D }
操做方法 | 拋出異常 | 返回特殊值 | 阻塞線程 | 超時退出 |
---|---|---|---|---|
插入元素 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除元素 | remove() | poll() | take() | pull(timeout, unit) |
檢查 | element() | peek() | 無 | 無 |
ArrayBlockingQueue程序員
LinkedBlockingQueue算法
PriorityBlockingQueue數組
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
DelayQueue緩存
SynchronousQueue併發
LinkedTransferQueueless
LinkedBlockingDequeide
//Executors.java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
//Executors.java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
//Executors.java public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } //指定延遲執行時間 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
//Executors.java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); //無界隊列 }
public List<Runnable> shutdownNow() { ... final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加鎖 try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); //interrupt關閉線程 tasks = drainQueue(); //未執行任務 ...
先考慮下爲啥線程池的線程不會被釋放,它是怎麼管理線程的生命週期的呢
//ThreadPoolExecutor.Worker.class final void runWorker(Worker w) { ... //工做線程會進入一個循環獲取任務執行的邏輯 while (task != null || (task = getTask()) != null) ... } private Runnable getTask(){ ... Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //線程會阻塞掛起等待任務, ... }
能夠看出,無任務執行時,線程池實際上是利用阻塞隊列的take方法掛起,從而維持核心線程的存活
//Worker class,一個worker一個線程 Worker(Runnable firstTask) { //禁止新線程未開始就被中斷 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } final void runWorker(Worker w) { .... //對應構造Worker是的setState(-1) w.unlock(); // allow interrupts boolean completedAbruptly = true; .... w.lock(); //加鎖同步 .... try { ... task.run(); afterExecute(task, null); } finally { .... w.unlock(); //釋放鎖 }
worker繼承AQS的意義:A 禁止線程未開始就被中斷;B 同步runWorker方法的處理邏輯
A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.
若是任務被拒絕了,則由提交任務的線程執行此任務
當線程執行完本身deque的任務,且其餘線程deque還有多的任務,則會啓動竊取策略,從其餘線程deque隊尾獲取線程
//該demo代碼是引用他人的,若有侵權,請聯繫我 public class ForkJoinPoolTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); for (int i = 0; i < 10; i++) { ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i)); System.out.println(task.get()); } } static class Fibonacci extends RecursiveTask<Integer> { int n; public Fibonacci(int n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } Fibonacci fib1 = new Fibonacci(n - 1); fib1.fork(); //至關於開啓新線程執行 Fibonacci fib2 = new Fibonacci(n - 2); fib2.fork(); //至關於開啓新線程執行 return fib1.join() + fib2.join(); //合併返回結果 } } }
https://juejin.im/post/5f016b...