📦 本文以及示例源碼已歸檔在 javacorehtml
Java 的 java.util.concurrent
包(簡稱 J.U.C)中提供了大量併發工具類,是 Java 併發能力的主要體現(注意,不是所有,有部分併發能力的支持在其餘包中)。從功能上,大體能夠分爲:java
AtomicInteger
、AtomicIntegerArray
、AtomicReference
、AtomicStampedReference
等。ReentrantLock
、ReentrantReadWriteLock
等。ConcurrentHashMap
、CopyOnWriteArrayList
、CopyOnWriteArraySet
等。ArrayBlockingQueue
、LinkedBlockingQueue
等。ConcurrentLinkedQueue
、LinkedTransferQueue
等。Executor
框架(線程池)- 如:ThreadPoolExecutor
、Executors
等。我我的理解,Java 併發框架能夠分爲如下層次。git
由 Java 併發框架圖不難看出,J.U.C 包中的工具類是基於 synchronized
、volatile
、CAS
、ThreadLocal
這樣的併發核心機制打造的。因此,要想深刻理解 J.U.C 工具類的特性、爲何具備這樣那樣的特性,就必須先理解這些核心機制。github
synchronized
是 Java 中的關鍵字,是 利用鎖的機制來實現互斥同步的。算法
synchronized
能夠保證在同一個時刻,只有一個線程能夠執行某個方法或者某個代碼塊。數據庫若是不須要
Lock
、ReadWriteLock
所提供的高級同步特性,應該優先考慮使用synchronized
,理由以下:編程
- Java 1.6 之後,
synchronized
作了大量的優化,其性能已經與Lock
、ReadWriteLock
基本上持平。從趨勢來看,Java 將來仍將繼續優化synchronized
,而不是ReentrantLock
。ReentrantLock
是 Oracle JDK 的 API,在其餘版本的 JDK 中不必定支持;而synchronized
是 JVM 的內置特性,全部 JDK 版本都提供支持。
synchronized
有 3 種應用方式:數組
Class
對象synchonized
括號裏配置的對象說明:緩存
相似
Vector
、Hashtable
這類同步類,就是使用synchonized
修飾其重要方法,來保證其線程安全。安全事實上,這類同步容器也非絕對的線程安全,當執行迭代器遍歷,根據條件刪除元素這種場景下,就可能出現線程不安全的狀況。此外,Java 1.6 針對
synchonized
進行優化前,因爲阻塞,其性能不高。綜上,這類同步容器,在現代 Java 程序中,已經漸漸不用了。
❌ 錯誤示例 - 未同步的示例
public class NoSynchronizedDemo implements Runnable { public static final int MAX = 100000; private static int count = 0; public static void main(String[] args) throws InterruptedException { NoSynchronizedDemo instance = new NoSynchronizedDemo(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } @Override public void run() { for (int i = 0; i < MAX; i++) { increase(); } } public void increase() { count++; } } // 輸出結果: 小於 200000 的隨機數字
Java 實例方法同步是同步在擁有該方法的對象上。這樣,每一個實例其方法同步都同步在不一樣的對象上,即該方法所屬的實例。只有一個線程可以在實例方法同步塊中運行。若是有多個實例存在,那麼一個線程一次能夠在一個實例同步塊中執行操做。一個實例一個線程。
public class SynchronizedDemo implements Runnable { private static final int MAX = 100000; private static int count = 0; public static void main(String[] args) throws InterruptedException { SynchronizedDemo instance = new SynchronizedDemo(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } @Override public void run() { for (int i = 0; i < MAX; i++) { increase(); } } /** * synchronized 修飾普通方法 */ public synchronized void increase() { count++; } }
靜態方法的同步是指同步在該方法所在的類對象上。由於在 JVM 中一個類只能對應一個類對象,因此同時只容許一個線程執行同一個類中的靜態同步方法。
對於不一樣類中的靜態同步方法,一個線程能夠執行每一個類中的靜態同步方法而無需等待。無論類中的那個靜態同步方法被調用,一個類只能由一個線程同時執行。
public class SynchronizedDemo2 implements Runnable { private static final int MAX = 100000; private static int count = 0; public static void main(String[] args) throws InterruptedException { SynchronizedDemo2 instance = new SynchronizedDemo2(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } @Override public void run() { for (int i = 0; i < MAX; i++) { increase(); } } /** * synchronized 修飾靜態方法 */ public synchronized static void increase() { count++; } }
有時你不須要同步整個方法,而是同步方法中的一部分。Java 能夠對方法的一部分進行同步。
注意 Java 同步塊構造器用括號將對象括起來。在上例中,使用了 this
,即爲調用 add 方法的實例自己。在同步構造器中用括號括起來的對象叫作監視器對象。上述代碼使用監視器對象同步,同步實例方法使用調用方法自己的實例做爲監視器對象。
一次只有一個線程可以在同步於同一個監視器對象的 Java 方法內執行。
public class SynchronizedDemo3 implements Runnable { private static final int MAX = 100000; private static int count = 0; public static void main(String[] args) throws InterruptedException { SynchronizedDemo3 instance = new SynchronizedDemo3(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(count); } @Override public void run() { for (int i = 0; i < MAX; i++) { increase(); } } /** * synchronized 修飾代碼塊 */ public static void increase() { synchronized (SynchronizedDemo3.class) { count++; } } }
synchronized
通過編譯後,會在同步塊的先後分別造成 monitorenter
和 monitorexit
這兩個字節碼指令,這兩個字節碼指令都須要一個引用類型的參數來指明要鎖定和解鎖的對象。若是 synchronized
明確制定了對象參數,那就是這個對象的引用;若是沒有明確指定,那就根據 synchronized
修飾的是實例方法仍是靜態方法,去對對應的對象實例或 Class 對象來做爲鎖對象。
synchronized
同步塊對同一線程來講是可重入的,不會出現鎖死問題。
synchronized
同步塊是互斥的,即已進入的線程執行完成前,會阻塞其餘試圖進入的線程。
鎖具有如下兩種特性:
monitor
對象,這個對象其實就是 Java 對象的鎖,一般會被稱爲「內置鎖」或「對象鎖」。類的對象能夠有多個,因此每一個對象有其獨立的對象鎖,互不干擾。Java 1.6 之後,
synchronized
作了大量的優化,其性能已經與Lock
、ReadWriteLock
基本上持平。
互斥同步進入阻塞狀態的開銷都很大,應該儘可能避免。在許多應用中,共享數據的鎖定狀態只會持續很短的一段時間。自旋鎖的思想是讓一個線程在請求一個共享數據的鎖時執行忙循環(自旋)一段時間,若是在這段時間內能得到鎖,就能夠避免進入阻塞狀態。
自旋鎖雖然能避免進入阻塞狀態從而減小開銷,可是它須要進行忙循環操做佔用 CPU 時間,它只適用於共享數據的鎖定狀態很短的場景。
在 Java 1.6 中引入了自適應的自旋鎖。自適應意味着自旋的次數再也不固定了,而是由前一次在同一個鎖上的自旋次數及鎖的擁有者的狀態來決定。
鎖消除是指對於被檢測出不可能存在競爭的共享數據的鎖進行消除。
鎖消除主要是經過逃逸分析來支持,若是堆上的共享數據不可能逃逸出去被其它線程訪問到,那麼就能夠把它們當成私有數據對待,也就能夠將它們的鎖進行消除。
對於一些看起來沒有加鎖的代碼,其實隱式的加了不少鎖。例以下面的字符串拼接代碼就隱式加了鎖:
public static String concatString(String s1, String s2, String s3) { return s1 + s2 + s3; }
String 是一個不可變的類,編譯器會對 String 的拼接自動優化。在 Java 1.5 以前,會轉化爲 StringBuffer 對象的連續 append() 操做:
public static String concatString(String s1, String s2, String s3) { StringBuffer sb = new StringBuffer(); sb.append(s1); sb.append(s2); sb.append(s3); return sb.toString(); }
每一個 append() 方法中都有一個同步塊。虛擬機觀察變量 sb,很快就會發現它的動態做用域被限制在 concatString() 方法內部。也就是說,sb 的全部引用永遠不會逃逸到 concatString() 方法以外,其餘線程沒法訪問到它,所以能夠進行消除。
若是一系列的連續操做都對同一個對象反覆加鎖和解鎖,頻繁的加鎖操做就會致使性能損耗。
上一節的示例代碼中連續的 append() 方法就屬於這類狀況。若是虛擬機探測到由這樣的一串零碎的操做都對同一個對象加鎖,將會把加鎖的範圍擴展(粗化)到整個操做序列的外部。對於上一節的示例代碼就是擴展到第一個 append() 操做以前直至最後一個 append() 操做以後,這樣只須要加鎖一次就能夠了。
Java 1.6 引入了偏向鎖和輕量級鎖,從而讓鎖擁有了四個狀態:
輕量級鎖是相對於傳統的重量級鎖而言,它 使用 CAS 操做來避免重量級鎖使用互斥量的開銷。對於絕大部分的鎖,在整個同步週期內都是不存在競爭的,所以也就不須要都使用互斥量進行同步,能夠先採用 CAS 操做進行同步,若是 CAS 失敗了再改用互斥量進行同步。
當嘗試獲取一個鎖對象時,若是鎖對象標記爲 0 01,說明鎖對象的鎖未鎖定(unlocked)狀態。此時虛擬機在當前線程的虛擬機棧中建立 Lock Record,而後使用 CAS 操做將對象的 Mark Word 更新爲 Lock Record 指針。若是 CAS 操做成功了,那麼線程就獲取了該對象上的鎖,而且對象的 Mark Word 的鎖標記變爲 00,表示該對象處於輕量級鎖狀態。
偏向鎖的思想是偏向於讓第一個獲取鎖對象的線程,這個線程在以後獲取該鎖就再也不須要進行同步操做,甚至連 CAS 操做也再也不須要。
volatile 是輕量級的 synchronized,它在多處理器開發中保證了共享變量的「可見性」。
可見性的意思是當一個線程修改一個共享變量時,另一個線程能讀到這個修改的值。
一旦一個共享變量(類的成員變量、類的靜態成員變量)被 volatile 修飾以後,那麼就具有了兩層語義:
若是一個字段被聲明成 volatile,Java 線程內存模型確保全部線程看到這個變量的值是一致的。
若是 volatile
變量修飾符使用恰當的話,它比 synchronized
的使用和執行成本更低,由於它不會引發線程上下文的切換和調度。可是,volatile
沒法替代 synchronized
,由於 volatile
沒法保證操做的原子性。
一般來講,使用 volatile
必須具有如下 2 個條件:
示例:狀態標記量
volatile boolean flag = false; while(!flag) { doSomething(); } public void setFlag() { flag = true; }
示例:雙重鎖實現線程安全的單例類
class Singleton { private volatile static Singleton instance = null; private Singleton() {} public static Singleton getInstance() { if(instance==null) { synchronized (Singleton.class) { if(instance==null) instance = new Singleton(); } } return instance; } }
觀察加入 volatile 關鍵字和沒有加入 volatile 關鍵字時所生成的彙編代碼發現,加入 volatile
關鍵字時,會多出一個 lock
前綴指令。
lock
前綴指令實際上至關於一個內存屏障(也成內存柵欄),內存屏障會提供 3 個功能:
互斥同步是最多見的併發正確性保障手段。
互斥同步最主要的問題是線程阻塞和喚醒所帶來的性能問題,所以互斥同步也被稱爲阻塞同步。互斥同步屬於一種悲觀的併發策略,老是認爲只要不去作正確的同步措施,那就確定會出現問題。不管共享數據是否真的會出現競爭,它都要進行加鎖(這裏討論的是概念模型,實際上虛擬機會優化掉很大一部分沒必要要的加鎖)、用戶態核心態轉換、維護鎖計數器和檢查是否有被阻塞的線程須要喚醒等操做。
隨着硬件指令集的發展,咱們可使用基於衝突檢測的樂觀併發策略:先進行操做,若是沒有其它線程爭用共享數據,那操做就成功了,不然採起補償措施(不斷地重試,直到成功爲止)。這種樂觀的併發策略的許多實現都不須要將線程阻塞,所以這種同步操做稱爲非阻塞同步。
爲何說樂觀鎖須要 硬件指令集的發展 才能進行?由於須要操做和衝突檢測這兩個步驟具有原子性。而這點是由硬件來完成,若是再使用互斥同步來保證就失去意義了。硬件支持的原子性操做最典型的是:CAS。
CAS(Compare and Swap),字面意思爲比較並交換。CAS 有 3 個操做數,分別是:內存值 V,舊的預期值 A,要修改的新值 B。當且僅當預期值 A 和內存值 V 相同時,將內存值 V 修改成 B,不然什麼都不作。
Java 是如何實現 CAS ?
Java 主要利用 Unsafe
這個類提供的 CAS 操做。
Unsafe
的 CAS 依賴的是 JV M 針對不一樣的操做系統實現的 Atomic::cmpxchg
指令。
Atomic::cmpxchg
的實現使用了彙編的 CAS 操做,並使用 CPU 提供的 lock
信號保證其原子性。
原子類是 CAS 在 Java 中最典型的應用。
咱們先來看一個常見的代碼片斷。
if(a==b) { a++; }
若是 a++
執行前, a 的值被修改了怎麼辦?還能獲得預期值嗎?出現該問題的緣由是在併發環境下,以上代碼片斷不是原子操做,隨時可能被其餘線程所篡改。
解決這種問題的最經典方式是應用原子類的 incrementAndGet
方法。
public class AtomicIntegerDemo { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); final AtomicInteger count = new AtomicInteger(0); for (int i = 0; i < 10; i++) { executorService.execute(new Runnable() { @Override public void run() { count.incrementAndGet(); } }); } executorService.shutdown(); executorService.awaitTermination(3, TimeUnit.SECONDS); System.out.println("Final Count is : " + count.get()); } }
J.U.C 包中提供了 AtomicBoolean
、AtomicInteger
、AtomicLong
分別針對 Boolean
、Integer
、Long
執行原子操做,操做和上面的示例大致類似,不作贅述。
利用原子類(本質上是 CAS),能夠實現自旋鎖。
所謂自旋鎖,是指線程反覆檢查鎖變量是否可用,直到成功爲止。因爲線程在這一過程當中保持執行,所以是一種忙等待。一旦獲取了自旋鎖,線程會一直保持該鎖,直至顯式釋放自旋鎖。
示例:非線程安全示例
public class AtomicReferenceDemo { private static int ticket = 10; public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { executorService.execute(new MyThread()); } executorService.shutdown(); } static class MyThread implements Runnable { @Override public void run() { while (ticket > 0) { System.out.println(Thread.currentThread().getName() + " 賣出了第 " + ticket + " 張票"); ticket--; } } } }
輸出結果:
pool-1-thread-2 賣出了第 10 張票 pool-1-thread-1 賣出了第 10 張票 pool-1-thread-3 賣出了第 10 張票 pool-1-thread-1 賣出了第 8 張票 pool-1-thread-2 賣出了第 9 張票 pool-1-thread-1 賣出了第 6 張票 pool-1-thread-3 賣出了第 7 張票 pool-1-thread-1 賣出了第 4 張票 pool-1-thread-2 賣出了第 5 張票 pool-1-thread-1 賣出了第 2 張票 pool-1-thread-3 賣出了第 3 張票 pool-1-thread-2 賣出了第 1 張票
很明顯,出現了重複售票的狀況。
示例:使用自旋鎖來保證線程安全
能夠經過自旋鎖這種非阻塞同步來保證線程安全,下面使用 AtomicReference
來實現一個自旋鎖。
public class AtomicReferenceDemo2 { private static int ticket = 10; public static void main(String[] args) { threadSafeDemo(); } private static void threadSafeDemo() { SpinLock lock = new SpinLock(); ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { executorService.execute(new MyThread(lock)); } executorService.shutdown(); } static class SpinLock { private AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void lock() { Thread current = Thread.currentThread(); while (!atomicReference.compareAndSet(null, current)) {} } public void unlock() { Thread current = Thread.currentThread(); atomicReference.compareAndSet(current, null); } } static class MyThread implements Runnable { private SpinLock lock; public MyThread(SpinLock lock) { this.lock = lock; } @Override public void run() { while (ticket > 0) { lock.lock(); if (ticket > 0) { System.out.println(Thread.currentThread().getName() + " 賣出了第 " + ticket + " 張票"); ticket--; } lock.unlock(); } } } }
輸出結果:
pool-1-thread-2 賣出了第 10 張票 pool-1-thread-1 賣出了第 9 張票 pool-1-thread-3 賣出了第 8 張票 pool-1-thread-2 賣出了第 7 張票 pool-1-thread-3 賣出了第 6 張票 pool-1-thread-1 賣出了第 5 張票 pool-1-thread-2 賣出了第 4 張票 pool-1-thread-1 賣出了第 3 張票 pool-1-thread-3 賣出了第 2 張票 pool-1-thread-1 賣出了第 1 張票
通常狀況下,CAS 比鎖性能更高。由於 CAS 是一種非阻塞算法,因此其避免了線程阻塞和喚醒的等待時間。
可是,CAS 也有一些問題。
若是一個變量初次讀取的時候是 A 值,它的值被改爲了 B,後來又被改回爲 A,那 CAS 操做就會誤認爲它歷來沒有被改變過。
J.U.C 包提供了一個帶有標記的原子引用類 AtomicStampedReference
來解決這個問題,它能夠經過控制變量值的版原本保證 CAS 的正確性。大部分狀況下 ABA 問題不會影響程序併發的正確性,若是須要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。
自旋 CAS (不斷嘗試,直到成功爲止)若是長時間不成功,會給 CPU 帶來很是大的執行開銷。
若是 JVM 能支持處理器提供的 pause
指令那麼效率會有必定的提高,pause
指令有兩個做用:
比較花費 CPU 資源,即便沒有任何用也會作一些無用功。
當對一個共享變量執行操做時,咱們可使用循環 CAS 的方式來保證原子操做,可是對多個共享變量操做時,循環 CAS 就沒法保證操做的原子性,這個時候就能夠用鎖。
或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比有兩個共享變量 i = 2, j = a
,合併一下 ij=2a
,而後用 CAS 來操做 ij
。從 Java 1.5 開始 JDK 提供了 AtomicReference
類來保證引用對象之間的原子性,你能夠把多個變量放在一個對象裏來進行 CAS 操做。
ThreadLocal
是一個存儲線程本地副本的工具類。要保證線程安全,不必定非要進行同步。同步只是保證共享數據爭用時的正確性,若是一個方法原本就不涉及共享數據,那麼天然無須同步。
Java 中的 無同步方案 有:
- 可重入代碼 - 也叫純代碼。若是一個方法,它的 返回結果是能夠預測的,即只要輸入了相同的數據,就能返回相同的結果,那它就知足可重入性,固然也是線程安全的。
- 線程本地存儲 - 使用
ThreadLocal
爲共享變量在每一個線程中都建立了一個本地副本,這個副本只能被當前線程訪問,其餘線程沒法訪問,那麼天然是線程安全的。
ThreadLocal
的方法:
public class ThreadLocal<T> { public T get() {} public void set(T value) {} public void remove() {} public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {} }
說明:
get
- 用於獲取ThreadLocal
在當前線程中保存的變量副本。set
- 用於設置當前線程中變量的副本。remove
- 用於刪除當前線程中變量的副本。若是此線程局部變量隨後被當前線程讀取,則其值將經過調用其initialValue
方法從新初始化,除非其值由中間線程中的當前線程設置。 這可能會致使當前線程中屢次調用initialValue
方法。initialValue
- 爲 ThreadLocal 設置默認的get
初始值,須要重寫initialValue
方法 。
ThreadLocal
經常使用於防止對可變的單例(Singleton)變量或全局變量進行共享。典型應用場景有:管理數據庫鏈接、Session。
示例 - 數據庫鏈接
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() { @Override public Connection initialValue() { return DriverManager.getConnection(DB_URL); } }; public static Connection getConnection() { return connectionHolder.get(); }
示例 - Session 管理
private static final ThreadLocal<Session> sessionHolder = new ThreadLocal<>(); public static Session getSession() { Session session = (Session) sessionHolder.get(); try { if (session == null) { session = createSession(); sessionHolder.set(session); } } catch (Exception e) { e.printStackTrace(); } return session; }
示例 - 完整使用示例
public class ThreadLocalDemo { private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 0; } }; public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(new MyThread()); } executorService.shutdown(); } static class MyThread implements Runnable { @Override public void run() { int count = threadLocal.get(); for (int i = 0; i < 10; i++) { try { count++; Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } threadLocal.set(count); threadLocal.remove(); System.out.println(Thread.currentThread().getName() + " : " + count); } } }
所有輸出 count = 10
Thread
類中維護着一個 ThreadLocal.ThreadLocalMap
類型的成員 threadLocals
。這個成員就是用來存儲線程獨佔的變量副本。
ThreadLocalMap
是 ThreadLocal
的內部類,它維護着一個 Entry
數組, Entry
用於保存鍵值對,其 key 是 ThreadLocal
對象,value 是傳遞進來的對象(變量副本)。
ThreadLocalMap
雖然是相似 Map
結構的數據結構,但它並無實現 Map
接口。它不支持 Map
接口中的 next
方法,這意味着 ThreadLocalMap
中解決 Hash 衝突的方式並不是 拉鍊表 方式。
實際上,ThreadLocalMap
採用線性探測的方式來解決 Hash 衝突。所謂線性探測,就是根據初始 key 的 hashcode 值肯定元素在 table 數組中的位置,若是發現這個位置上已經被其餘的 key 值佔用,則利用固定的算法尋找必定步長的下個位置,依次判斷,直至找到可以存放的位置。
ThreadLocalMap 的 Entry
繼承了 WeakReference
,因此它的 key (ThreadLocal
對象)是弱引用,而 value (變量副本)是強引用。
ThreadLocal
對象沒有外部強引用來引用它,那麼 ThreadLocal
對象會在下次 GC 時被回收。Entry
中的 key 已經被回收,可是 value 因爲是強引用不會被垃圾收集器回收。若是建立 ThreadLocal
的線程一直持續運行,那麼 value 就會一直得不到回收,產生內存泄露。那麼如何避免內存泄漏呢?方法就是:使用 ThreadLocal
的 set
方法後,顯示的調用 remove
方法 。
ThreadLocal<String> threadLocal = new ThreadLocal(); try { threadLocal.set("xxx"); // ... } finally { threadLocal.remove(); }