關於Java併發的書籍和文章已經有不少了,可是就我本身的學習下來的感覺來講,有一些看似簡單的知識點,以致於大神們和文章的做者們都直接忽略了,可是這些知識點卻很重要,若是不搞清楚,很難「完全理解、融會貫通」,這種似懂非懂的感受讓我很難受,因此我總結了這篇文章,可能不會有什麼牛X的技術,高深的理論,可是這些思考曾經讓我對Java併發的認知更進了一步,送給大家。html
先提幾個曾經困擾過個人問題啊,看似很簡單,並且可能還有不少同窗還存在誤解,咱們來一塊兒看一下。java
若是這些問題也曾困擾過你,那這篇文章最合適你不過了,接下來咱們一塊兒進入Java的世界扒一扒併發。程序員
這2個概念是Java內存模型(Java Memory Model)中提出的,關於內存模型的詳細介紹猛戳這裏,咱們目前只須要知道內存模型是幫咱們屏蔽底層硬件細節的,程序員只須要按照它的規則來寫代碼,寫的程序就能夠實現跨平臺運行,很巧妙的設計。緩存
瞭解了內存模型,咱們回到主題,咱們知道JVM將內存劃分瞭如下幾大塊安全
那主內存,工做內存跟它們的對應關係是怎麼樣的呢? 這裏直接給出結論。bash
這個知識點看似不起眼,可是卻很重要,由於只有有了這個結論,才能與咱們後面的實際代碼例子結合起來,不然就會感受理論與實際操做脫節了,無法對應起來。數據結構
需求是這樣的,有一個Counter計數器類,內部有一個count成員變量int類型,記錄當前的總數,具體定義以下。多線程
public class Counter {
private int count;
public void increment() {
this.count++;
}
public int getCount() {
return this.count;
}
}
複製代碼
咱們如今的任務是調用一億次increment方法,而後打印count的數量,那麼顯然正確的輸出應該是一億。併發
public static void main(String[] args) {
// 單線程代碼
Counter counter = new Counter();
int loopCount = 100000000;
long startTime = System.currentTimeMillis();
for (int i = 0; i < loopCount; i++) {
counter.increment();
}
System.out.println("count:" + counter.getCount());
System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
}
// 輸出結果
count:100000000
take time:577ms
複製代碼
so easy!這樣的代碼咱們太熟悉了,可是此次我想從代碼在虛擬機棧中的具體執行過程來加深理解程序是怎麼運做的。先經過Javac和Javap命令查看Counter類的increment方法的字節碼實現。app
javac Counter.java
javap -verbose Counter.class
// Counter類 increment方法的字節碼
public void increment();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field count:I
5: iconst_1
6: iadd
7: putfield #2 // Field count:I
10: return
複製代碼
咱們知道Java中方法的調用是基於棧幀實現的,每一個棧幀中主要包含操做數棧+用到類的運行時常量池引用+本地變量表。我畫了一張示意圖幫助你們理解整個執行過程,而且將其中一次count++操做的字節碼在操做數棧中的執行步驟分解了(以count=10爲例),這裏要注意,下面這張圖的運行是在工做內存中(main方法所在線程的虛擬機棧中)。
上面的條件只要有一個被打破,執行的結果就可能不正確,這也就是爲何Java多線程環境下容易出現併發問題,緣由就是沒有同時知足這三個條件。
上面已經提到,咱們上面的Count類的實例中,需同時知足 有序性,可見性,原子性,其中有序性和原子性,咱們比較容易想到由於多個線程交叉執行,若是不加同步控制,有序性和原子性確定無法保證,可是這裏比較難理解的是可見性,骨頭先撿難啃的啃,因此接下來咱們先談談可見性。
仍是以咱們上面的示例來講明。 咱們已經知道
counter.increment();
複製代碼
編譯成字節碼爲
getfield #2
iconst_1
iadd
putfield #2
複製代碼
前面已經說過,這裏的字節碼的執行過程是在工做內存中,可是getField和putField這二條指令實際上是跟主內存有交互的,這裏仍是以Counter類的increment方法爲例。
正是由於CPU高速cache的存在,在多核環境中會有可見性的問題。這裏額外提一句 ,之因此有高速cache存在,是爲提升運行效率,現代CPU的速度比咱們的內存快不少,若是每次都鎖總線寫主存,會致使執行速度降低不少,這是不能夠接受的,木桶理論咱們都能理解哈。這裏我也畫了一張圖,來幫助你們理解。
volatile修飾的變量有下面的特性
其中monitor能夠理解爲鎖,moniter release就是釋放鎖,monitor acquire就是獲取鎖,這樣就是volatile變量的讀寫都是直接對主存操做的,至關於犧牲一部分性能來換取可見性,這一部分犧牲的性能通常是能夠忽略不計的,只須要知道有這麼回事就行。
給count加上volatile修飾符後,查看編譯後的字節碼後會發現,字節碼層面惟一的變化是給count添加了ACC_VOLATILE標識flag,在運行時會根據這個flag會自動插入內存屏障,保證volatile可見性語義,內存屏障一共有四種,分別是:
這裏有個文檔,比較權威詳細的說明了內存屏障的知識,這一塊知識你們能夠本身繼續深刻。這裏給出文檔中的一個實例,比較形象的說明了內存屏障是怎麼插入的。
再回到上面的例子,咱們給count添加上volatile修飾符以後,是否是就能在多線程中獲得正確的累加結果呢?咱們試驗一下,簡單起見,咱們只開2個線程,每一個線程分配一半的計算量。
// Counter.java
private volatile int count;
// main 方法
Counter counter = new Counter();
int loopCount = 100000000;
int halfCount = loopCount / 2;
Thread thread1 = new Thread(() -> {
for (int i = 0; i < halfCount; i++) {
counter.increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = halfCount; i < loopCount; i++) {
counter.increment();
}
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + counter.getCount());
System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
// 運行結果
count:51743664
take time:2335ms
複製代碼
結果顯然仍是不對的,並且程序運行的時間長了好幾倍了。這是由於volatile只保證了可見性,卻沒有原子性語義,好比下面這種狀況
那咱們不妨設想下,若是在putfield以前,檢查下當前棧中存儲的count是否是最新的,若是不是最新的從新讀取count,而後重試,若是是最新的,直接寫入更新值,彷佛這樣就能解決咱們上面出現的錯誤寫入的問題。看起來彷佛是一個不錯的想法,可是必定要注意,整個檢查過程要保證原子性,不然仍然會有併發問題。事實上JDK中Unsafe包裏面的CAS方法就是這個思路,不斷循環嘗試,這個過程就是自旋,它的底層實現依賴cmpxchgl 和 cmpxchgq這二個彙編指令,不一樣平臺的cpu有不一樣的實現,可是代碼大同小異,我在這裏以opekjdk8爲例扒一扒CAS的源碼,源碼比較多我只會貼出關鍵代碼塊。
// Unsafe.class中的三個CAS方法,都是native的
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
複製代碼
它對應的native實如今hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
複製代碼
篇幅關係,這裏只貼上compareAndSwapInt的實現,能夠看到又調用了Atomic::cmpxchg方法,繼續跟進去
unsigned Atomic::cmpxchg(unsigned int exchange_value,
volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
(jint)compare_value);
}
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
assert(sizeof(jbyte) == 1, "assumption.");
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int;
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur;
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) {
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}
複製代碼
咱們跟蹤到了調用了cmpxchg這個方法,這個方法不是在atomic.cpp中定義的,查看atomic.hpp,看到了cmpxchg對應的內聯函數的定義
inline static jint cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value);
// See comment above about using jlong atomics on 32-bit platforms
inline static jlong cmpxchg (jlong exchange_value, volatile jlong* dest, jlong compare_value);
複製代碼
這裏咱們以solaris_x86平臺爲例,cmpxchg對應的內涵函數定義在hotspot/src/os_cpu/solaris_x86/vm/atomic_solaris_x86.inline.hpp
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
inline jint _Atomic_cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value, int mp) {
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
複製代碼
這個是內嵌彙編代碼,實話說,彙編這一塊的知識我也還給老師了。根據LOCK_IF_MP這個宏定義判斷是否是多核心,若是是多核心須要加鎖,可是這個鎖是cpu總線鎖,它的代價比咱們應用層中用的Lock代價小得多。同時咱們看到cmpxchgl這個關鍵的指令。追到這一層,我想對於應用開發工程師已經足夠了。瞭解了底層實現,咱們來現學現賣實戰一波。
要使用CAS,確定要使用Unsafe類,咱們仍是經過反射來獲取Unsafe對象,先看UnsafeUtil類的實現
// UnsafeUtil.java
public static Unsafe getUnsafeObject() {
Class clazz = AtomicInteger.class;
try {
Field uFiled = clazz.getDeclaredField("unsafe");
uFiled.setAccessible(true);
return (Unsafe) uFiled.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static long getVariableOffset(Object target, String variableName) {
Object unsafeObject = getUnsafeObject();
if (unsafeObject != null) {
try {
Method method = unsafeObject.getClass().getDeclaredMethod("objectFieldOffset", Field.class);
method.setAccessible(true);
Field targetFiled = target.getClass().getDeclaredField(variableName);
return (long) method.invoke(unsafeObject, targetFiled);
} catch (Exception e) {
e.printStackTrace();
}
}
return -1;
}
複製代碼
再來看Counter類的實現
public class Counter {
private volatile int count;
private Unsafe mUnsafe;
private long countOffset;
public Counter() {
mUnsafe = UnsafeUtil.getUnsafeObject();
countOffset = UnsafeUtil.getVariableOffset(this, "count");
}
public void increment() {
int cur = getCount();
while (!mUnsafe.compareAndSwapInt(this, countOffset, cur, cur+1)) {
cur = getCount();
}
}
public int getCount() {
return this.count;
}
}
複製代碼
再次開啓二個線程,執行咱們的累加程序
// 輸出結果
count:100000000
take time:5781ms
複製代碼
能夠看到咱們獲得正確的累加結果,可是運行時長更長了,可是還好,時間複雜度仍是在一個數量級上的。這裏要注意一點的是,上述示例代碼中,我給count變量增長了volatile關鍵字,其實就算不加volatile關鍵字,在這裏CAS也是可以正確工做的,可是效率會低一點,我測試下來差很少性能會低5%左右,你們能夠思考下爲何不加volatile效率會低?
volatile關鍵字還有一個禁止指令重排序的語義,一個經典的應用就是DCL單例模式,你們應該都很熟了,就不贅述了。
到這裏,關於可見性咱們已經討論的差很少了,接下來咱們來討論」原子性「
其實上面咱們已經說起了一些,好比CAS自己就是原子的,那想想還有哪些是原子的?我這裏仍是以Counter類的increment方法爲例。
0: aload_0
1: dup
2: getfield #2 // Field count:I
5: iconst_1
6: iadd
7: putfield #2 // Field count:I
10: return
複製代碼
這7條字節碼指令,都是原子的,沒有問題吧,可是咱們若是再往深處想的話,仍是會有疑問,單個字節碼指令都是原子的嗎? 若是單條都不是原子的,我想咱們前面的全部判斷都是錯誤的,由於咱們得出結論的理論基石被打破了。事實是,單條字節碼就是原子的,這個原子性由誰來保證呢?由Java內存模型JMM來保證,程序員不須要知道具體的細節。
雖然單條字節碼是原子的,可是多條字節碼組合起來就不是原子的了。這也是不少併發問題發生的根源。那咱們程序員有哪些手段保證原子性呢?大概有如下三種。
CAS咱們上面已經討論過,這裏不贅述了,咱們來看看synchronized
synchronized算是咱們最經常使用的同步方式,主要有如下三種使用方式
// 普通類方法同步
synchronized publid void invoke() {}
// 類靜態方法同步
synchronized public static void invoke() {}
// 代碼塊同步
synchronized(object) {
}
複製代碼
這三種方式不一樣之處在於咱們進行同步的對象不一樣,普通類synchronized同步的就是對象自己,靜態方法同步的是類Class自己,代碼塊同步的是咱們在括號內部填入的對象。本質上它們的原理是相同的,都會有一個monitor的對象與咱們要進行同步的對象進行關聯,當有一個線程持有了monitor的鎖後,其餘線程必須等待,一直到該線程釋放了該monitor才能被別的線程從新獲取,hotspot虛擬機中,它在native層對應的實現類是ObjectMonitor.hpp,這個類內部維護了不少同步相關的變量,咱們重點關注二個變量
void * volatile _owner; // pointer to owning thread OR BasicLock
ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor
複製代碼
_owner表明當前持有鎖的線程,—WaitSet表明等待鎖的線程隊列,經過ObjectWaiter的數據結構能夠推斷這是一個雙向循環鏈表。
回到咱們的實例,這一次咱們經過synchronized來實現咱們的Counter類。
public void increment() {
synchronized (Counter.class) {
this.count++;
}
}
// 再運行一次,
// 輸出結果
count:100000000
take time:4881ms
複製代碼
輸出的結果是正確的,並且從咱們打印的運行時間結果來看,synchronized加鎖後的執行速度比咱們上面的CAS還要快,這有一點點反直覺,其實synchronized在Java1.7後引入了偏向鎖,輕量級鎖後,synchronized性能有了較大提高,因此在使用synchronized的時候不須要有太多的心理負擔,通常狀況下,對性能不是極度要求高的話,使用synchronized沒有問題。
這裏仍是貼下increment方法加上synchronized同步後的字節碼實現。
// 咱們的實現
synchronized (Counter.class) {
this.count++;
}
// 編譯後,至關於下面的僞代碼
monitorenter Counter.class;
try {
this.count++;
monitorexit Counter.class;
} finally {
monitorexit Counter.class;
}
複製代碼
除了synchronized關鍵字以外,還有個比較經常使用的用來作同步類就是ReentrantLock
ReentrantLock的使用你們應該都很熟了,篇幅關係,這裏只簡單提一下用法,更詳細的使用文檔你們能夠自行查閱相關資料。
// 參數表示 是不是公平鎖,公平鎖嚴格按照等待順序獲取鎖,可是吞吐率低性能差
// 非公平鎖性能高,可是有可能會出現鎖等待飢餓
ReentrantLock reentrantLock = new ReentrantLock(false);
// 一個標準的使用方式
reentrantLock.lock();
try {
// do something
} finally {
reentrantLock.unlock();
}
複製代碼
這裏要注意的是加鎖的lock方法的調用,必定要在try-catch-finally的前面,不能在內部,由於若是在內部調用lock,若是代碼在lock以前就出現異常了,就會出現咱們沒有加鎖就執行了finally裏面的釋放鎖,確定會有問題。
爲了對比,咱們仍是使用ReentrantLock實現一遍上面Counter累加。
Lock lock = new ReentrantLock(false);
Thread thread1 = new Thread(() -> {
for (int i = 0; i < halfCount; i++) {
lock.lock();
try {
counter.increment();
} finally {
lock.unlock();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = halfCount; i < loopCount; i++) {
lock.lock();
try {
counter.increment();
} finally {
lock.unlock();
}
}
});
// 運行程序,輸出結果
count:100000000
take time:12754ms
複製代碼
輸出結果也是正確的,可是運行時間倒是最長的,差很少是咱們用synchronized的三倍。固然咱們這裏並非搞性能測試,運行的時間也沒什麼參考意義,貼出來只是讓你們有個直觀認識,那就是CAS未必就必定性能高,synchronized未必就必定性能差,要具體問題具體分析,必定要有質疑的意識。
既然說到了Lock,咱們仍是扒一扒它的實現源碼,這裏以非公平鎖爲例。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
//......
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
複製代碼
能夠看到,lock方法的實現,首先會用CAS嘗試去設置State爲1,若是設置成功,將exclusiveOwnerThread設置爲當前thread,若是設置不成功調用acquire方法,又調用了tryAcquire,咱們能夠重寫該方法來自定義鎖的邏輯,ReentrantLock中的默認實現以下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
首先嚐試獲取state狀態,若是==0嘗試用CAS獲取鎖,若是!=0,檢查是不是當前線程擁有鎖,若是是,將state+1,返回true,若是不是返回false。因此這也就是爲何是可重入鎖的緣由,容許鎖的嵌套,若是已經獲取了鎖,state+1而後返回true。
// 可重入鎖,容許嵌套重複得到鎖,僞代碼以下
lock.lock();
lock.lock();
// do something ...
lock.unlock();
lock.unlock();
複製代碼
總結一下,ReentrantLock的實現是基於隊列同步器AbstractQueuedSynchronizer(AQS)的,而AQS內部也是封裝了CAS來實現的,深究下去仍是有不少內容的,能夠說整個concurrent包都是創建在CAS+AQS這二塊基石上的,篇幅關係,更多的實現細節咱們這裏就不討論了,你們能夠自行參考相關的源碼分析文章加深理解。
到這裏,咱們討論了Java中的」原子性「,以及若是保證」原子性「的三種常規手段,原子性的討論就結束了。接下來咱們進入最後一個問題,Java的」有序性「。
爲了方便說明,咱們首先舉一個簡單的例子
int a = 1; ①
int b = 2; ②
int c = a; ③
複製代碼
按咱們的預期,執行的順序應該是①②③,可是 真實的執行順序多是②①③或者①③②,這是由於指令重排序的緣由,基本有二種重排序
重排序的目的是優化咱們的程序運行速度,可是優化的前提是不能破壞as-if-serial語義,簡單來講,以上面的例子爲例,③因爲有依賴①的結果,因此它須要永遠排在①後面執行。 在單線程有序性還比較容易保證,可是在多線程狀況就會變得複雜起來。因此JMM中抽象出了一個happen-before原則,這個原則是JMM給咱們開發者的承諾,讓咱們寫代碼時對多線程狀況下的有序性有一個正確的預期。這個原則有下面5條。
固然happen-before具備傳遞性,若是A happen-before B, B happen-before C,則A 也 happen-before C。 須要注意的是,happen-before並不徹底等同於時間意義上的先執行,好比上面的例子中,根據第一條happen-before原則,int a = 1; 這條語句 happen-before int b = 2; 這條語句,可是因爲兩者之間沒有依賴關係,能夠指令重排,因此能夠是 int b = 2;先執行,這是合法的,並不違背happen-before原則。
理解這幾條happen-before原則後,不少咱們平時常常寫的併發代碼就有了理論依據,好比第二條,加鎖happen-before解鎖,因此保證了鎖的同步範圍內的代碼,具備原子性和有序性,同時加鎖和解鎖都會插入內存屏障,可見性也獲得保障,因此加鎖後的代碼是線程安全的。再好比第三條,volatile的寫happen-before於volatile的讀,有了這一條,多線程之間volatile修飾的共享變量的可見性獲得保證。
另外幾條原則比較好理解,你們能夠自行結合實際代碼加深理解,這裏就不贅述了。
Java併發算是一個比較高級的主題,可是這一塊的知識又是高級工程師必須掌握的,骨頭再難啃也得啃,但願本文的一些總結能幫助到但願深刻了解Java併發的同窗,哪怕是其中能有一點,能讓你在閱讀中有豁然開朗的感受,個人目的就達到了。
最後,來針雞血,」怕什麼真理無窮,進一寸有一寸的歡喜「。
碼字不易,若是喜歡點個讚唄!