本系列研究總結高併發下的幾種同步鎖的使用以及之間的區別,分別是:ReentrantLock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、StampedLock、Semaphore、Exchanger、LockSupport。因爲博客園對博客字數的要求限制,會分爲三個篇幅:html
高併發之ReentrantLock、CountDownLatch、CyclicBarrierjava
高併發之Phaser、ReadWriteLock、StampedLocknode
高併發之Semaphore、Exchanger、LockSupport算法
信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施, 它負責協調各個線程, 以保證它們可以正確、合理的使用公共資源。Semaphore分爲單值和多值兩種,前者只能被一個線程得到,後者能夠被若干個線程得到。
數組
以一個停車場是運做爲例。爲了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這是若是同時來了五輛車,看門人容許其中三輛不受阻礙的進入,而後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開兩輛,則又能夠放入兩輛,如此往復。緩存
在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。更進一步,信號量的特性以下:信號量是一個非負整數(車位數),全部經過它的線程(車輛)都會將該整數減一(經過它固然是爲了使用資源),當該整數值爲零時,全部試圖經過它的線程都將處於等待狀態。在信號量上咱們定義兩種操做: Wait(等待) 和 Release(釋放)。 當一個線程調用Wait等待)操做時,它要麼經過而後將信號量減一,要麼一自等下去,直到信號量大於一或超時。Release(釋放)其實是在信號量上執行加操做,對應於車輛離開停車場,該操做之因此叫作「釋放」是應爲加操做其實是釋放了由信號量守護的資源。數據結構
public class TestSemaphore { public static void main(String[] args) { //Semaphore s = new Semaphore(2); Semaphore s = new Semaphore(2, true); //容許一個線程同時執行 //Semaphore s = new Semaphore(1); new Thread(()->{ try { s.acquire(); System.out.println("T1 running..."); Thread.sleep(200); System.out.println("T1 running..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); } }).start(); new Thread(()->{ try { s.acquire(); System.out.println("T2 running..."); Thread.sleep(200); System.out.println("T2 running..."); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
Exchanger,它容許在併發任務之間交換數據。具體來講,Exchanger類容許在兩個線程之間定義同步點。當兩個線程都到達同步點時,他們交換數據結構,所以第一個線程的數據結構進入到第二個線程中,第二個線程的數據結構進入到第一個線程中。多線程
Exchanger是在兩個任務之間交換對象的柵欄,當這些任務進入柵欄時,它們各自擁有一個對象。當他們離開時,它們都擁有以前由對象持有的對象。它典型的應用場景是:一個任務在建立對象,這些對象的生產代價很高昂,而另外一個任務在消費這些對象。經過這種方式,能夠有更多的對象在被建立的同時被消費。併發
Exchange實現較爲複雜,咱們先看其怎麼使用,而後再來分析其源碼。如今咱們用Exchange來模擬生產-消費者問題:app
public class ExchangerTest { static class Producer implements Runnable{ //生產者、消費者交換的數據結構 private List<String> buffer; //步生產者和消費者的交換對象 private Exchanger<List<String>> exchanger; Producer(List<String> buffer,Exchanger<List<String>> exchanger){ this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { for(int i = 1 ; i < 5 ; i++){ System.out.println("生產者第" + i + "次提供"); for(int j = 1 ; j <= 3 ; j++){ System.out.println("生產者裝入" + i + "--" + j); buffer.add("buffer:" + i + "--" + j); } System.out.println("生產者裝滿,等待與消費者交換..."); try { exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer implements Runnable { private List<String> buffer; private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { for (int i = 1; i < 5; i++) { //調用exchange()與消費者進行數據交換 try { buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費者第" + i + "次提取"); for (int j = 1; j <= 3 ; j++) { System.out.println("消費者 : " + buffer.get(0)); buffer.remove(0); } } } } public static void main(String[] args){ List<String> buffer1 = new ArrayList<String>(); List<String> buffer2 = new ArrayList<String>(); Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); Thread producerThread = new Thread(new Producer(buffer1,exchanger)); Thread consumerThread = new Thread(new Consumer(buffer2,exchanger)); producerThread.start(); consumerThread.start(); } }
打印結果
生產者第1次提供 生產者裝入1--1 生產者裝入1--2 生產者裝入1--3 生產者裝滿,等待與消費者交換... 生產者第2次提供 生產者裝入2--1 生產者裝入2--2 生產者裝入2--3 生產者裝滿,等待與消費者交換... 消費者第1次提取 消費者 : buffer:1--1 消費者 : buffer:1--2 消費者 : buffer:1--3 消費者第2次提取 ......
首先生產者Producer、消費者Consumer首先都建立一個緩衝列表,經過Exchanger來同步交換數據。消費中經過調用Exchanger與生產者進行同步來獲取數據,而生產者則經過for循環向緩存隊列存儲數據並使用exchanger對象消費者同步。到消費者從exchanger哪裏獲得數據後,他的緩衝列表中有3個數據,而生產者獲得的則是一個空的列表。上面的例子充分展現了消費者-生產者是如何利用Exchanger來完成數據交換的。
在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:
Exchanger算法的核心是經過一個可交換數據的slot,以及一個能夠帶有數據item的參與者。源碼中的描述以下:
for (;;) { if (slot is empty) { // offer place item in a Node; if (can CAS slot from empty to node) { wait for release; return matching item in node; } } else if (can CAS slot from node to empty) { // release get the item in node; set matching item in node; release waiting thread; } // else retry on CAS failure }
Exchanger中定義了以下幾個重要的成員變量:
private final Participant participant; private volatile Node[] arena; private volatile Node slot;
participant的做用是爲每一個線程保留惟一的一個Node節點。
slot爲單個槽,arena爲數組槽。他們都是Node類型。在這裏可能會感受到疑惑,slot做爲Exchanger交換數據的場景,應該只須要一個就能夠了啊?爲什麼還多了一個Participant 和數組類型的arena呢?一個slot交換場所原則上來講應該是能夠的,但實際狀況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼咱們就安排多個,也就是數組arena。經過數組arena來安排不一樣的線程使用不一樣的slot來下降競爭問題,而且能夠保證最終必定會成對交換數據。可是Exchanger不是一來就會生成arena數組來下降競爭,只有當產生競爭是纔會生成arena數組。那麼怎麼將Node與當前線程綁定呢?Participant ,Participant 的做用就是爲每一個線程保留惟一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。
Node定義以下:
@sun.misc.Contended static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null }
在Node定義中有兩個變量值得思考:bound以及collides。前面提到了數組area是爲了不競爭而產生的,若是系統不存在競爭問題,那麼徹底沒有必要開闢一個高效的arena來徒增系統的複雜性。首先經過單個slot的exchanger來交換數據,當探測到競爭時將安排不一樣的位置的slot來保存線程Node,而且能夠確保沒有slot會在同一個緩存行上。如何來判斷會有競爭呢?CAS替換slot失敗,若是失敗,則經過記錄衝突次數來擴展arena的尺寸,咱們在記錄衝突的過程當中會跟蹤「bound」的值,以及會從新計算衝突次數在bound的值被改變時。這裏闡述可能有點兒模糊,不着急,咱們先有這個概念,後面在arenaExchange中再次作詳細闡述,咱們直接看exchange()方法。
exchange(V x):等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。方法定義以下:
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
這個方法比較好理解:arena爲數組槽,若是爲null,則執行slotExchange()方法,不然判斷線程是否中斷,若是中斷值拋出InterruptedException異常,沒有中斷則執行arenaExchange()方法。整套邏輯就是:若是slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後返回結果V。
NULL_ITEM 爲一個空節點,其實就是一個Object對象而已,slotExchange()爲單個slot交換。
slotExchange(Object item, boolean timed, long ns)
private final Object slotExchange(Object item, boolean timed, long ns) { // 獲取當前線程的節點 p Node p = participant.get(); // 當前線程 Thread t = Thread.currentThread(); // 線程中斷,直接返回 if (t.isInterrupted()) return null; // 自旋 for (Node q;;) { //slot != null if ((q = slot) != null) { //嘗試CAS替換 if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 當前線程的項,也就是交換的數據 q.match = item; // 作releasing操做的線程傳遞的項 Thread w = q.parked; // 掛起時設置線程值 // 掛起線程不爲null,線程掛起 if (w != null) U.unpark(w); return v; } //若是失敗了,則建立arena //bound 則是上次Exchanger.bound if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } //若是arena != null,直接返回,進入arenaExchange邏輯處理 else if (arena != null) return null; else { p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } } /* * 等待 release * 進入spin+block模式 */ int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
程序首先經過participant獲取當前線程節點Node。檢測是否中斷,若是中斷return null,等待後續拋出InterruptedException異常。
若是slot不爲null,則進行slot消除,成功直接返回數據V,不然失敗,則建立arena消除數組。
若是slot爲null,但arena不爲null,則返回null,進入arenaExchange邏輯。
若是slot爲null,且arena也爲null,則嘗試佔領該slot,失敗重試,成功則跳出循環進入spin+block(自旋+阻塞)模式。
在自旋+阻塞模式中,首先取得結束時間和自旋次數。若是match(作releasing操做的線程傳遞的項)爲null,其首先嚐試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數爲0後,假如slot發生了改變(slot != p)則重置自旋數並重試。不然假如:當前未中斷&arena爲null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不爲null或者當前爲限時版本+時間已經結束:不限時版本:置v爲null;限時版本:若是時間結束以及未中斷則TIMED_OUT;不然給出null(緣由是探測到arena非空或者當前線程中斷)。
match不爲空時跳出循環。
整個slotExchange清晰明瞭。
arenaExchange(Object item, boolean timed, long ns)
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { // access slot at i int b, m, c; long j; // j is raw array offset Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else p.item = null; // clear offer } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } }
首先經過participant取得當前節點Node,而後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena能夠確保不一樣的slot在arena中是不會相沖突的,那麼是怎麼保證的呢?咱們先看arena的建立:
arena = new Node[(FULL + 2) << ASHIFT];
這個arena到底有多大呢?咱們先看FULL 和ASHIFT的定義:
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; private static final int ASHIFT = 7; private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int MMASK = 0xff; // 255
假如個人機器NCPU = 8 ,則獲得的是768大小的arena數組。而後經過如下代碼取得在arena中的節點:
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
仍然是經過右移ASHIFT位來取得Node的,ABASE定義以下:
Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
U.arrayBaseOffset獲取對象頭長度,數組元素的大小能夠經過unsafe.arrayIndexScale(T[].class) 方法獲取到。這也就是說要訪問類型爲T的第N個元素的話,你的偏移量offset應該是arrayOffset+N*arrayScale。也就是說BASE = arrayOffset+ 128 。其次咱們再看Node節點的定義
@sun.misc.Contended static final class Node{ .... }
在Java 8 中咱們是能夠利用sun.misc.Contended來規避僞共享的。因此說經過 << ASHIFT方式加上sun.misc.Contended,因此使得任意兩個可用Node不會再同一個緩存行中。
LockSupport
是一個線程阻塞工具類,全部的方法都是靜態方法,可讓線程在任意位置阻塞,固然阻塞以後確定得有喚醒的方法。
接下面我來看看LockSupport
有哪些經常使用的方法。主要有兩類方法:park
和unpark
。
public static void park(Object blocker); // 暫停當前線程 public static void parkNanos(Object blocker, long nanos); // 暫停當前線程,不過有超時時間的限制 public static void parkUntil(Object blocker, long deadline); // 暫停當前線程,直到某個時間 public static void park(); // 無期限暫停當前線程 public static void parkNanos(long nanos); // 暫停當前線程,不過有超時時間的限制 public static void parkUntil(long deadline); // 暫停當前線程,直到某個時間 public static void unpark(Thread thread); // 恢復當前線程 public static Object getBlocker(Thread t);
park英文意思爲停車。咱們若是把Thread當作一輛車的話,park就是讓車停下,unpark就是讓車啓動而後跑起來。
寫一個例子來看看這個工具類怎麼用
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); LockSupport.park(); if (Thread.currentThread().isInterrupted()) { System.out.println("被中斷了"); } System.out.println("繼續執行"); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(1000L); t2.start(); Thread.sleep(3000L); t1.interrupt(); LockSupport.unpark(t2); t1.join(); t2.join(); } }
運行的結果以下:
in t1 被中斷了 繼續執行 in t2 繼續執行
這兒park
和unpark
其實實現了wait
和notify
的功能,不過仍是有一些差異的。
park
不須要獲取某個對象的鎖park
不會拋出InterruptedException
異常,因此須要在park
以後自行判斷中斷狀態,而後作額外的處理咱們再來看看Object blocker
,這是個什麼東西呢?這其實就是方便在線程dump的時候看到具體的阻塞對象的信息。
"t1" #10 prio=5 os_prio=31 tid=0x00007f95030cc800 nid=0x4e03 waiting on condition [0x00007000011c9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) // `下面的這個信息` at com.wtuoblist.beyond.concurrent.demo.chapter3.LockSupportDemo$ChangeObjectThread.run(LockSupportDemo.java:23) // - locked <0x0000000795830950> (a java.lang.Object)
相對於線程的stop和resume
,park和unpark
的前後順序並非那麼嚴格。stop和resume
若是順序反了,會出現死鎖現象。而park和unpark
卻不會。這又是爲何呢?仍是看一個例子
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.park(); if (Thread.currentThread().isInterrupted()) { System.out.println("被中斷了"); } System.out.println("繼續執行"); } } } public static void main(String[] args) { t1.start(); LockSupport.unpark(t1); System.out.println("unpark invoked"); } }
t1內部有休眠1s的操做,因此unpark確定先於park的調用,可是t1最終仍然能夠完結。這是由於park和unpark
會對每一個線程維持一個許可(boolean值)