多線程併發問題,基本是面試必問的。java
大部分同窗應該都知道Synchronized
,Lock
,部分同窗能說到volatile
、併發包
,優秀的同窗則能在前面的基礎上,說出Synchronized、volatile的原理,以及併發包中經常使用的數據結構,例如ConcurrentHashMap的原理。面試
這篇文章將總結多線程併發的各類處理方式,但願對你們有所幫助。數組
爲何多線程同時訪問(讀寫)同個變量,會有併發問題?緩存
- Java 內存模型規定了全部的變量都存儲在主內存中,每條線程有本身的工做內存。
- 線程的工做內存中保存了該線程中用到的變量的主內存副本拷貝,線程對變量的全部操做都必須在工做內存中進行,而不能直接讀寫主內存。
- 線程訪問一個變量,首先將變量從主內存拷貝到工做內存,對變量的寫操做,不會立刻同步到主內存。
- 不一樣的線程之間也沒法直接訪問對方工做內存中的變量,線程間變量的傳遞均須要本身的工做內存和主存之間進行數據同步進行。
Java 內存模型(JMM) 做用於工做內存(本地內存)和主存之間數據同步過程,它規定了如何作數據同步以及何時作數據同步,以下圖。安全
原子性:在一個操做中,CPU 不能夠在中途暫停而後再調度,即不被中斷操做,要麼執行完成,要麼就不執行。bash
可見性:多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看獲得修改的值。markdown
有序性:程序執行的順序按照代碼的前後順序執行。數據結構
下面結合不一樣場景分析解決併發問題的處理方式。多線程
保證可見性,不保證原子性併發
- 當寫一個volatile變量時,JVM會把本地內存的變量強制刷新到主內存中
- 這個寫操做致使其餘線程中的緩存無效,其餘線程讀,會從主內存讀。volatile的寫操做對其它線程實時可見。
禁止指令重排序 指令重排序是指編譯器和處理器爲了優化程序性能對指令進行排序的一種手段,須要遵照必定規則:
- 不會對存在依賴關係的指令重排序,例如 a = 1;b = a; a 和b存在依賴關係,不會被重排序
- 不能影響單線程下的執行結果。好比:a=1;b=2;c=a+b這三個操做,前兩個操做能夠重排序,可是c=a+b不會被重排序,由於要保證結果是3
對於一個變量,只有一個線程執行寫操做,其它線程都是讀操做,這時候能夠用 volatile 修飾這個變量。
public class TestInstance {
private static volatile TestInstance mInstance;
public static TestInstance getInstance(){ //1
if (mInstance == null){ //2
synchronized (TestInstance.class){ //3
if (mInstance == null){ //4
mInstance = new TestInstance(); //5
}
}
}
return mInstance;
}
複製代碼
}
假如沒有用volatile,併發狀況下會出現問題,線程A執行到註釋5 new TestInstance()
的時候,分爲以下幾個幾步操做:
這時候若是發生指令重排,執行順序是132,執行到第3的時候,線程B恰好進來了,而且執行到註釋2,這時候判斷mInstance 不爲空,直接使用一個未初始化的對象。因此使用volatile關鍵字來禁止指令重排序。
在JVM底層volatile是採用內存屏障來實現的,內存屏障會提供3個功能:
- 它確保指令重排序時不會把其後面的指令排到內存屏障以前的位置,也不會把前面的指令排到內存屏障的後面;即在執行到內存屏障這句指令時,在它前面的操做已經所有完成;
- 它會強制將緩存的修改操做當即寫到主內存
- 寫操做會致使其它CPU中的緩存行失效,寫以後,其它線程的讀操做會從主內存讀。
volatile 只能保證可見性,不能保證原子性寫操做對其它線程可見,可是不能解決多個線程同時寫的問題。
多個線程同時寫一個變量。
例如售票,餘票是100張,窗口A和窗口B同時各賣出一張票, 假如餘票變量用 volatile 修飾,是有問題的。
A窗口獲取餘票是100,B窗口獲取餘票也是100,A賣出一張變成99,刷新回主內存,同時B賣出一張變成99,也刷新回主內存,會致使最終主內存餘票是99而不是98。
前面說到 volatile 的侷限性,就是多個線程同時寫的狀況,這種狀況通常可使用Synchronized。
Synchronized 能夠保證同一時刻,只有一個線程可執行某個方法或某個代碼塊。
public class SynchronizedTest {
public static void main(String[] args) {
synchronized (SynchronizedTest.class) {
System.out.println("123");
}
method();
}
private static void method() {
}
}
複製代碼
將這段代碼先用javac
命令編譯,再java p -v SynchronizedTest.class
命令查看字節碼,部分字節碼以下
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class com/lanshifu/opengldemo/test/SynchronizedTest
2: dup
3: astore_1
4: monitorenter
5: getstatic #3 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #4 // String 123
10: invokevirtual #5 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_1
14: monitorexit
15: goto 23
18: astore_2
19: aload_1
20: monitorexit
21: aload_2
22: athrow
23: invokestatic #6 // Method method:()V
26: return
複製代碼
能夠看到 4: monitorenter
和 14: monitorexit
,中間是打印的語句。
執行同步代碼塊,首先會執行monitorenter
指令,而後執行同步代碼塊中的代碼,退出同步代碼塊的時候會執行monitorexit
指令 。
使用Synchronized進行同步,其關鍵就是必需要對對象的監視器monitor進行獲取,當線程獲取monitor後才能繼續往下執行,不然就進入同步隊列,線程狀態變成BLOCK,同一時刻只有一個線程可以獲取到monitor,當監聽到monitorexit被調用,隊列裏就有一個線程出隊,獲取monitor。詳情參考:www.jianshu.com/p/d53bf830f…
每一個對象擁有一個計數器,當線程獲取該對象鎖後,計數器就會加一,釋放鎖後就會將計數器減一,因此只要這個鎖的計數器大於0,其它線程訪問就只能等待。
你們對Synchronized的理解可能就是重量級鎖,可是Java1.6對 Synchronized 進行了各類優化以後,有些狀況下它就並不那麼重,Java1.6 中爲了減小得到鎖和釋放鎖帶來的性能消耗而引入的偏向鎖和輕量級鎖。
偏向鎖: 大多數狀況下,鎖不只不存在多線程競爭,並且老是由同一線程屢次得到,爲了讓線程得到鎖的代價更低而引入了偏向鎖。
當一個線程A訪問加了同步鎖的代碼塊時,會在對象頭中存 儲當前線程的id,後續這個線程進入和退出這段加了同步鎖的代碼塊時,不須要再次加鎖和釋放鎖。
輕量級鎖: 在偏向鎖狀況下,若是線程B也訪問了同步代碼塊,比較對象頭的線程id不同,會升級爲輕量級鎖,而且經過自旋的方式來獲取輕量級鎖。
重量級鎖: 若是線程A和線程B同時訪問同步代碼塊,則輕量級鎖會升級爲重量級鎖,線程A獲取到重量級鎖的狀況下,線程B只能入隊等待,進入BLOCK狀態。
上面說到Synchronized
的缺點,不能設置鎖超時時間和不能經過代碼釋放鎖,ReentranLock
就能夠解決這個問題。
在多個條件變量和高度競爭鎖的地方,用ReentrantLock更合適,ReentrantLock還提供了Condition
,對線程的等待和喚醒等操做更加靈活,一個ReentrantLock能夠有多個Condition實例,因此更有擴展性。
lock 和 unlock
ReentrantLock reentrantLock = new ReentrantLock(); System.out.println("reentrantLock->lock"); reentrantLock.lock(); try { System.out.println("睡眠2秒..."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { reentrantLock.unlock(); System.out.println("reentrantLock->unlock"); } 複製代碼
實現可定時的鎖請求:tryLock
public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Thread thread1 = new Thread_tryLock(reentrantLock); thread1.setName("thread1"); thread1.start(); Thread thread2 = new Thread_tryLock(reentrantLock); thread2.setName("thread2"); thread2.start(); } static class Thread_tryLock extends Thread { ReentrantLock reentrantLock; public Thread_tryLock(ReentrantLock reentrantLock) { this.reentrantLock = reentrantLock; } @Override public void run() { try { System.out.println("try lock:" + Thread.currentThread().getName()); boolean tryLock = reentrantLock.tryLock(3, TimeUnit.SECONDS); if (tryLock) { System.out.println("try lock success :" + Thread.currentThread().getName()); System.out.println("睡眠一下:" + Thread.currentThread().getName()); Thread.sleep(5000); System.out.println("醒了:" + Thread.currentThread().getName()); } else { System.out.println("try lock 超時 :" + Thread.currentThread().getName()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("unlock:" + Thread.currentThread().getName()); reentrantLock.unlock(); } } } 複製代碼
打印的日誌:
try lock:thread1 try lock:thread2 try lock success :thread2 睡眠一下:thread2 try lock 超時 :thread1 unlock:thread1 Exception in thread "thread1" java.lang.IllegalMonitorStateException at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151) at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261) at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457) at com.lanshifu.demo_module.test.lock.ReentranLockTest$Thread_tryLock.run(ReentranLockTest.java:60) 醒了:thread2 unlock:thread2 複製代碼
上面演示了trtLock
的使用,trtLock
設置獲取鎖的等待時間,超過3秒直接返回失敗,能夠從日誌中看到結果。 有異常是由於thread1獲取鎖失敗,不該該調用unlock。
public static void main(String[] args) { Thread_Condition thread_condition = new Thread_Condition(); thread_condition.setName("測試Condition的線程"); thread_condition.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } thread_condition.singal(); } static class Thread_Condition extends Thread { @Override public void run() { await(); } private ReentrantLock lock = new ReentrantLock(); public Condition condition = lock.newCondition(); public void await() { try { System.out.println("lock"); lock.lock(); System.out.println(Thread.currentThread().getName() + ":我在等待通知的到來..."); condition.await();//await 和 signal 對應 //condition.await(2, TimeUnit.SECONDS); //設置等待超時時間 System.out.println(Thread.currentThread().getName() + ":等到通知了,我繼續執行>>>"); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("unlock"); lock.unlock(); } } public void singal() { try { System.out.println("lock"); lock.lock(); System.out.println("我要通知在等待的線程,condition.signal()"); condition.signal();//await 和 signal 對應 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("unlock"); lock.unlock(); } } } 複製代碼
運行打印日誌
lock
測試Condition的線程:我在等待通知的到來...
lock
我要通知在等待的線程,condition.signal()
unlock
測試Condition的線程:等到通知了,我繼續執行>>>
unlock
複製代碼
上面演示了Condition的 await 和 signal
使用,前提要先lock。
ReentrantLock 構造函數傳true表示公平鎖。
公平鎖表示線程獲取鎖的順序是按照線程加鎖的順序來分配的,即先來先得的順序。而非公平鎖就是一種鎖的搶佔機制,是隨機得到鎖的,可能會致使某些線程一致拿不到鎖,因此是不公平的。
經過上面分析,併發嚴重的狀況下,使用鎖顯然效率低下,由於同一時刻只能有一個線程能夠得到鎖,其它線程只能乖乖等待。
Java提供了併發包解決這個問題,接下來介紹併發包裏一些經常使用的數據結構。
咱們都知道HashMap是線程不安全的數據結構,HashTable則在HashMap基礎上,get方法和put方法加上Synchronized修飾變成線程安全,不過在高併發狀況下效率底下,最終被ConcurrentHashMap
替代。
ConcurrentHashMap 採用分段鎖,內部默認有16個桶,get和put操做,首先將key計算hashcode,而後跟16取餘,落到16個桶中的一個,而後每一個桶中都加了鎖(ReentrantLock),桶中是HashMap結構(數組加鏈表,鏈表過長轉紅黑樹)。
因此理論上最多支持16個線程同時訪問。
鏈表結構的阻塞隊列,內部使用多個ReentrantLock
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } 複製代碼
源碼不貼太多,簡單說一下LinkBlockingQueue
的邏輯:
- 從隊列獲取數據,若是隊列中沒有數據,會調用
notEmpty.await();
進入等待。- 在放數據進去隊列的時候會調用
notEmpty.signal();
,通知消費者,1中的等待結束,喚醒繼續執行。- 從隊列裏取到數據的時候會調用
notFull.signal();
,通知生產者繼續生產。- 在put數據進入隊列的時候,若是判斷隊列中的數據達到最大值,那麼會調用
notFull.await();
,等待消費者消費掉,也就是等待3去取數據而且發出notFull.signal();
,這時候生產者才能繼續生產。
LinkBlockingQueue
是典型的生產者消費者模式,源碼細節就很少說。
內部採用CAS(compare and swap)保證原子性
舉一個int自增的例子
AtomicInteger atomicInteger = new AtomicInteger(0);
atomicInteger.incrementAndGet();//自增
複製代碼
源碼看一下
/** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return U.getAndAddInt(this, VALUE, 1) + 1; } 複製代碼
U 是 Unsafe,看下 Unsafe#getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } 複製代碼
經過compareAndSwapInt
保證原子性。
面試中問到多線程併發問題,能夠這麼答:
- 當只有一個線程寫,其它線程都是讀的時候,能夠用
volatile
修飾變量- 當多個線程寫,那麼通常狀況下併發不嚴重的話能夠用
Synchronized
,Synchronized並非一開始就是重量級鎖,在併發不嚴重的時候,好比只有一個線程訪問的時候,是偏向鎖;當多個線程訪問,但不是同時訪問,這時候鎖升級爲輕量級鎖;當多個線程同時訪問,這時候升級爲重量級鎖。因此在併發不是很嚴重的狀況下,使用Synchronized是能夠的。不過Synchronized有侷限性,好比不能設置鎖超時,不能經過代碼釋放鎖。ReentranLock
能夠經過代碼釋放鎖,能夠設置鎖超時。- 高併發下,Synchronized、ReentranLock 效率低,由於同一時刻只有一個線程能進入同步代碼塊,若是同時有不少線程訪問,那麼其它線程就都在等待鎖。這個時候可使用併發包下的數據結構,例如
ConcurrentHashMap
,LinkBlockingQueue
,以及原子性的數據結構如:AtomicInteger
。
面試的時候按照上面總結的這個思路回答基本就ok了。既然說到併發包,那麼除了ConcurrentHashMap
,其它一些經常使用的數據結構的原理也須要去了解下,例如HashMap、HashTable、TreeMap
原理,ArrayList、LinkedList
對比,這些都是老生常談的,本身去看源碼或者一些博客。
關於多線程併發就先總結到這裏,若是是應付面試的話按照這篇文章的思路來準備應該是沒太大問題的。