前一篇文章總結了對 java 併發中的內置鎖的理解,這篇文章來講說線程 ,併發與線程總有剪不斷理還亂的關係。關於 java 線程的基本概念、線程與進程的關係以及如何建立線程,想必你們都很清楚了。以前總結過,存疑新同窗的傳送門:Java 多線程html
咱們知道,java 線程的三種建立方式:java
new Thread(){ @Override public void run() { super.run(); } }.start()
new Thread(new Runnable() { @Override public void run() { } }).start();
new Thread(new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return null; } }) ).start();
先看 Runnable 接口源碼:編程
@FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
函數式接口,只有一個 run()
方法。
再看 Thread 類源碼,咱們發現,Thread 類實現了 Runnable 接口:設計模式
public class Thread implements Runnable { ... /* What will be run. */ private Runnable target; ... /* Java thread status for tools, * initialized to indicate thread 'not yet started' */ private volatile int threadStatus = 0; ... /** * Causes this thread to begin execution; the Java Virtual Machine * calls the <code>run</code> method of this thread. * <p> * The result is that two threads are running concurrently: the * current thread (which returns from the call to the * <code>start</code> method) and the other thread (which executes its * <code>run</code> method). * <p> * It is never legal to start a thread more than once. * In particular, a thread may not be restarted once it has completed * execution. * * @exception IllegalThreadStateException if the thread was already * started. * @see #run() * @see #stop() */ public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } @Override public void run() { if (target != null) { target.run(); } }
咱們看到,Thread 類有一個 Runnable 類型的 target 域。Thread 的 run 方法就是調用的 target 的 run 方法。而啓動線程則須要調用線程的 start 方法。在 Thread 類中,還有一個 volatile
修飾的 threadStatus
域,用來表示線程的狀態,初始值爲0,當咱們重複調用線程的 start 方法時,會拋出 java.lang.IllegalThreadStateException
的異常。緩存
當咱們須要獲取線程中方法執行的返回值時,使用 FutureTask 和 Callable 的方式建立。看 Thread 源碼可知,Thread 類構造方法可傳入 Runnable 對象,方式三,這裏傳入 FutureTask 對象,能夠猜測: FutureTask 必定是實現了 Runnable 接口。而 FutureTask 的構造方法又傳入了 Callable 對象,咱們重寫了 call 方法。咱們看看相關類的源碼,梳理一下。
java.lang.Runnable
安全
@FunctionalInterface public interface Runnable { public abstract void run(); }
java.util.concurrent.Future
多線程
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
java.util.concurrent.Callable
併發
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
java.util.concurrent.RunnableFuture
框架
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
java.util.concurrent.FutureTask
less
public class FutureTask<V> implements RunnableFuture<V> { ... /** The underlying callable; nulled out after running */ private Callable<V> callable; ... /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * Executes the computation without setting its result, and then * resets this future to initial state, failing to do so if the * computation encounters an exception or is cancelled. This is * designed for use with tasks that intrinsically execute more * than once. * * @return {@code true} if successfully run and reset */ protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; } }
java.util.concurrent.Executors
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } /** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
new Thread(new FutureTask(new Callable(){ @override public void call(){ xxx } })).start();
或者
new Thread(new FutureTask(new Runnable(){ @override public void run(){ xxx } },"hello,world")).start();
看過源碼以後,我感受這第二種方式通常不建議寫,除非返回的結果跟線程體執行無關。FutureTask 表示的計算若是是經過 Callable
來實現額,至關於一種可生成結果的 Runnable
,而且能夠處於 3 中狀態:運行等待、正在運行、運行完成。 其中運行完成包括可能的結束方式,正常運行,因爲取消而結束和因爲異常而結束等。當FutureTask處於運行完成狀態後,它會永遠中止在這個狀態。
須要注意的是: Future.get 這個方法取決於任務的狀態。若是任務已經完成,那麼 get 當即返回結果。不然 get 將阻塞知道任務進入完成狀態,而後返回結果或者拋出異常。 FutureTask 保證將計算結果從執行計算的線程安全地發佈到獲取這個結果的線程。
上面說的建立和啓動線程的本質幾乎同樣:new Thread(Runnable r).start()
,經過這種方式建立的線程稱之爲「野線程」,當線程體執行完以後線程就銷燬了,再加上線程的建立,銷燬和線程的調度,都是須要系統資源的開銷。想象一下,在高併發場景下,不對線程數量加以控制,無限制建立線程,當達到系統性能的閾值,系統必然崩潰。因此建立野線程的這種方式實際項目中通常不用,而是使用線程池來管理線程。
線程池的優勢:
從網上扒來一張 java 線程池的框架圖:
java 類庫中,任務執行的主要抽象,不是 Thread, 而是 Executor , 看看 Executor 接口:
package java.util.concurrent; public interface Executor { void execute(java.lang.Runnable runnable); }
ExecutorService
接口繼承了 Executor
接口,擴充了一些方法。線程池的核心實現類是 ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
,前者用來執行被提交的任務,而ScheduledThreadPoolExecutor 能夠在給定的延遲後運行任務,或者按期執行命令。
ThreadPoolExecutor 提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
參數最多的構造方法的參數說明:
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor
,一樣有四個相似的構造方法,就不列舉了。
Executors
是一個工廠類,提供了一些靜態的方法操做線程池。一般建立線程池,咱們不直接用調用 ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
的構造方法,而是經過 Executors
類的五個靜態工廠方法建立。
newFixedThreadPool(...) newSingleThreadExecutor(...) newCachedThreadPool(...) newScheduledThreadPool(...) newSingleThreadScheduledExecutor()
newSingleThreadExecutor
建立單線程的線程池 這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。
返回單線程的Executor,將多個任務交給此Exector時,這個線程處理完一個任務後接着處理下一個任務,若該線程出現異常,將會有一個新的線程來替代。此線程池保證全部任務的執行順序按照任務的提交順序執行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
說明:LinkedBlockingQueue會無限的添加須要執行的Runnable。
newFixedThreadPool
建立一個包含指定數目線程的線程池,若是任務數量多於線程數目,那麼沒有執行的任務必須等待,直到有任務完成爲止。每次提交一個任務就建立一個線程,直到線程達到線程池的最大小。任務線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newCachedThreadPool
建立一個可緩存的線程池,線程池能夠自動的擴展線程池的容量,核心線程數量爲0.若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
SynchronousQueue是個特殊的隊列。 SynchronousQueue隊列的容量爲0。當試圖爲SynchronousQueue添加Runnable,則執行會失敗。只有當一邊從SynchronousQueue取數據,一邊向SynchronousQueue添加數據才能夠成功。SynchronousQueue僅僅起到數據交換的做用,並不保存線程。但newCachedThreadPool()方法沒有線程上限。Runable添加到SynchronousQueue會被馬上取出。
根據用戶的任務數建立相應的線程來處理,該線程池不會對線程數目加以限制,徹底依賴於JVM能建立線程的數量,可能引發內存不足。
newScheduledThreadPool
建立一個指定大小的定時任務調度的線程池。此線程池支持定時以及週期性執行任務的需求。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
newSingleThreadScheduledExecutor
建立一個單線程的定時任務調度線程池,此線程池支持定時以及週期性執行任務的需求。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
想起了多線程下載,將整個文件分紅了多段,而後多個線程提供下載,每一個線程下載一段。當全部的下載線程都執行完以後,主線程通知用戶,文件下載完成了。這裏存在一個問題,主線程如何知道全部的下載線程都執行完了?
解決思路有不少種,好比咱們能夠定義一個計數的變量,初始值爲下載線程的數量,每一個線程執行完,計數變量值 -1,計數器的值爲 0 ,咱們就知道全部的下載線程都執行完了。這裏,咱們可能須要對計數器進行相應的同步操做,確保任什麼時候候讀取它的狀態都是正確的。
幸運的是,java 提供了一個相似計算器的工具類,能夠達到此目的。——CountDownLatch 類。
CountDownLatch 位於 java.util.concurrent 包下。是一個同步工具類,用來協調多個線程之間的同步,
CountDownLatch 可以使一個線程在等待另一些線程完成各自工做以後,再繼續執行。使用一個計數器進行實現。計數器初始值爲線程的數量。當每個線程完成本身任務後,計數器的值就會減一。當計數器的值爲0時,表示全部的線程都已經完成了任務,而後在CountDownLatch上等待的線程就能夠恢復執行任務。
CountDownLatch類只提供了一個構造器:
public CountDownLatch(int count) //參數count爲計數值
CountDownLatch 類中有 3 個重要方法:
CountDownLatch 用法舉例:
public class Test { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); new Thread(){ public void run() { try { System.out.println("子線程"+Thread.currentThread().getName()+"正在執行"); Thread.sleep(2000); System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); new Thread(){ public void run() { try { System.out.println("子線程"+Thread.currentThread().getName()+"正在執行"); Thread.sleep(3000); System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); try { System.out.println("等待2個子線程執行完畢..."); latch.await(); System.out.println("2個子線程已經執行完畢"); System.out.println("繼續執行主線程"); } catch (InterruptedException e) { e.printStackTrace(); } } }
執行結果:
線程Thread-0正在執行 線程Thread-1正在執行 等待2個子線程執行完畢... 線程Thread-0執行完畢 線程Thread-1執行完畢 2個子線程已經執行完畢 繼續執行主線程
CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,以後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後,它不能再次被使用。
柵欄(有的書籍中稱爲同步屏障)CyclicBarrier,字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。
CyclicBarrier類位於java.util.concurrent包下,CyclicBarrier提供2個構造器:
public CyclicBarrier(int parties, Runnable barrierAction) public CyclicBarrier(int parties)
參數parties指讓多少個線程或者任務等待至barrier狀態;參數barrierAction爲當這些線程都達到barrier狀態時會執行的內容。
CyclicBarrier 最重要的方法就是 await 方法,它有2個重載版本:
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException
第一個版本比較經常使用,用來掛起當前線程,直至全部線程都到達barrier狀態再同時執行後續任務;
第二個版本是讓這些線程等待至必定的時間,若是還有線程沒有到達barrier狀態就直接讓到達barrier的線程執行後續任務。
CyclicBarrier 用法舉例:
public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() { @Override public void run() { System.out.println("當前線程"+Thread.currentThread().getName()); } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據..."); try { Thread.sleep(5000); //以睡眠來模擬寫入數據操做 System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("全部線程寫入完畢,繼續處理其餘任務..."); } } }
執行了兩次結果分別以下:
從上面輸出結果能夠看出,每一個寫入線程執行完寫數據操做以後,就在等待其餘線程寫入操做完畢。
當全部線程線程寫入操做完畢以後,進行額外的其餘操做爲 CyclicBarrie 提供 Runnable 參數。當四個線程都到達barrier狀態後,會從四個線程中選擇一個線程去執行Runnable。而後全部線程就繼續進行後續的操做了。
另外須要注意的是:CyclicBarrier 是能夠重用的。
Semaphore翻譯成字面意思爲 信號量,Semaphore能夠控同時訪問的線程個數,經過 acquire() 獲取一個許可,若是沒有就等待,而 release() 釋放一個許可。
Semaphore類位於java.util.concurrent包下,它提供了2個構造器:
public Semaphore(int permits) { //參數permits表示許可數目,即同時能夠容許多少線程進行訪問 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //這個多了一個參數fair表示是不是公平的,即等待時間越久的越先獲取許可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
Semaphore 類中比較重要的幾個方法,首先是acquire()、release()方法:
public void acquire() throws InterruptedException { } //獲取一個許可 public void acquire(int permits) throws InterruptedException { } //獲取permits個許可 public void release() { } //釋放一個許可 public void release(int permits) { } //釋放permits個許可
acquire()用來獲取一個許可,若無許可可以得到,則會一直等待,直到得到許可。
release()用來釋放許可(調用一次增長一個許可)。
這4個方法都會被阻塞,若是想當即獲得執行結果,可使用下面幾個方法:
public boolean tryAcquire() //嘗試獲取一個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException //嘗試獲取一個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false public boolean tryAcquire(int permits) //嘗試獲取permits個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException //嘗試獲取permits個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false
另外還能夠經過 availablePermits() 方法獲得可用的許可數目:
使用舉例:
倘若一個工廠有5臺機器,可是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其餘工人才能繼續使用。那麼咱們就能夠經過Semaphore來實現:
public class Test { public static void main(String[] args) { int N = 8; //工人數 Semaphore semaphore = new Semaphore(5); //機器數目 for(int i=0;i<N;i++) new Worker(i,semaphore).start(); } static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人"+this.num+"佔用一個機器在生產..."); Thread.sleep(2000); System.out.println("工人"+this.num+"釋放出機器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運行結果: