本文首發於一世流雲專欄: https://segmentfault.com/blog...
Exchanger
——交換器,是JDK1.5時引入的一個同步器,從字面上就能夠看出,這個類的主要做用是交換數據。java
Exchanger有點相似於CyclicBarrier
,咱們知道CyclicBarrier是一個柵欄,到達柵欄的線程須要等待其它必定數量的線程到達後,才能經過柵欄。segmentfault
Exchanger能夠當作是一個雙向柵欄,以下圖:
數組
Thread1線程到達柵欄後,會首先觀察有沒其它線程已經到達柵欄,若是沒有就會等待,若是已經有其它線程(Thread2)已經到達了,就會以成對的方式交換各自攜帶的信息,所以Exchanger很是適合用於兩個線程之間的數據交換。緩存
咱們來看一個示例,理解下Exchanger的功能:併發
示例:假設如今有1個生產者,1個消費者,若是要實現生產者-消費者模式,通常的思路是利用隊列做爲一個消息隊列,生產者不斷生產消息,而後入隊;消費者不斷從消息隊列中取消息進行消費。若是隊列滿了,生產者等待,若是隊列空了,消費者等待。
咱們來看下如何利用Exchanger實現生產者-消息者模式:
生產者:app
public class Producer implements Runnable { private final Exchanger<Message> exchanger; public Producer(Exchanger<Message> exchanger) { this.exchanger = exchanger; } @Override public void run() { Message message = new Message(null); for (int i = 0; i < 3; i++) { try { Thread.sleep(1000); message.setV(String.valueOf(i)); System.out.println(Thread.currentThread().getName() + ": 生產了數據[" + i + "]"); message = exchanger.exchange(message); System.out.println(Thread.currentThread().getName() + ": 交換獲得數據[" + String.valueOf(message.getV()) + "]"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者:ide
public class Consumer implements Runnable { private final Exchanger<Message> exchanger; public Consumer(Exchanger<Message> exchanger) { this.exchanger = exchanger; } @Override public void run() { Message msg = new Message(null); while (true) { try { Thread.sleep(1000); msg = exchanger.exchange(msg); System.out.println(Thread.currentThread().getName() + ": 消費了數據[" + msg.getV() + "]"); msg.setV(null); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Main:工具
public class Main { public static void main(String[] args) { Exchanger<Message> exchanger = new Exchanger<>(); Thread t1 = new Thread(new Consumer(exchanger), "消費者-t1"); Thread t2 = new Thread(new Producer(exchanger), "生產者-t2"); t1.start(); t2.start(); } }
輸出:性能
生產者-t2: 生產了數據[0] 生產者-t2: 交換獲得數據[null] 消費者-t1: 消費了數據[0] 生產者-t2: 生產了數據[1] 消費者-t1: 消費了數據[1] 生產者-t2: 交換獲得數據[null] 生產者-t2: 生產了數據[2] 消費者-t1: 消費了數據[2] 生產者-t2: 交換獲得數據[null]
上述示例中,生產者生產了3個數據:0、一、2。經過Exchanger與消費者進行交換。能夠看到,消費者消費完後會將空的Message交換給生產者。優化
咱們先來看下Exchanger的構造,Exchanger只有一個空構造器:
構造時,內部建立了一個Participant對象,Participant是Exchanger的一個內部類,本質就是一個ThreadLocal,用來保存線程本地變量Node:
咱們能夠把Node對象理解成每一個線程自身攜帶的交換數據,:
Exchanger有兩種數據交換的方式,當併發量低的時候,內部採用「單槽位交換」;併發量高的時候會採用「多槽位交換」。
咱們先來看下exchange方法:
能夠看到exchange其實就是一個用於判斷數據交換方式的方法,它的內部會根據Exchanger的某些字段狀態來判斷當前應該採用單槽交換(slotExchange)仍是多槽交換(arenaExchange),整個判斷的流程圖以下:
Exchanger的arena字段是一個Node類型的數組,表明了一個槽數組,只在多槽交換時會用到。此外,Exchanger還有一個slot字段,表示單槽交換結點,只在單槽交換時使用。
slot字段最終會指向首個到達的線程的自身Node結點,表示線程佔用了槽位。
單槽交換示意圖:
咱們來看下Exchanger具體是如何實現單槽交換的,單槽交換方法slotExchange並不複雜,slotExchange的入參item表示當前線程攜帶的數據,返回值正常狀況下爲配對線程攜帶的數據:
/** * 單槽交換 * * @param item 待交換的數據 * @return 其它配對線程的數據; 若是多槽交換被激活或被中斷返回null, 若是超時返回TIMED_OUT(一個Obejct對象) */ private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); // 當前線程攜帶的交換結點 Thread t = Thread.currentThread(); if (t.isInterrupted()) // 線程的中斷狀態檢查 return null; for (Node q; ; ) { if ((q = slot) != null) { // slot != null, 說明已經有線程先到並佔用了slot if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 獲取交換值 q.match = item; // 設置交換值 Thread w = q.parked; if (w != null) // 喚醒在此槽位等待的線程 U.unpark(w); return v; // 交換成功, 返回結果 } // CPU核數數多於1個, 且bound爲0時建立arena數組,並將bound設置爲SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } else if (arena != null) // slot == null && arena != null // 單槽交換中途出現了初始化arena的操做,須要從新直接路由到多槽交換(arenaExchange) return null; else { // 當前線程先到, 則佔用此slot p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) // 將slot槽佔用 break; p.item = null; // CAS操做失敗, 繼續下一次自旋 } } // 執行到這, 說明當前線程先到達, 且已經佔用了slot槽, 須要等待配對線程到達 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; // 自旋次數, 與CPU核數有關 Object v; while ((v = p.match) == null) { // p.match == null表示配對的線程還未到達 if (spins > 0) { // 優化操做:自旋過程當中隨機釋放CPU h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int) t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) // 優化操做:配對線程已經到達, 可是還未徹底準備好, 因此須要再自旋等待一下子 spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { //已經自旋好久了, 仍是等不到配對, 此時才阻塞當前線程 U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); // 阻塞當前線程 p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超時或其餘(取消), 給其餘線程騰出slot v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
上述代碼的整個流程大體以下:
首先到達的線程:
稍後到達的配對線程:
若是當前線程(配對線程)不是首個到達的線程,則到達時槽(slot)已經被佔用,此時slot指向首個到達線程自身的Node結點。配對線程會將slot置空,並取Node中的item做爲交換獲得的數據返回,另外,配對線程會把自身攜帶的數據存入Node的match字段中,並喚醒Node.parked
所指向的線程(也就是先到達的線程)。
首先到達的線程被喚醒:
線程被喚醒後,因爲match不爲空(存放了配對線程攜帶過來的數據),因此會退出自旋,而後將match對應的值返回。
這樣,線程A和線程B就實現了數據交換,整個過程都沒有用到同步操做。
Exchanger最複雜的地方就是它的多槽位交換(arenaExchange),咱們先看下,何時會觸發多槽位交換?
咱們以前說了,併發量大的時候會觸發多槽交換,這個說法並不許確。
單槽交換(slotExchange)中有這樣一段代碼:
也就是說,若是在單槽交換中,同時出現了多個配對線程競爭修改slot槽位,致使某個線程CAS修改slot失敗時,就會初始化arena多槽數組,後續全部的交換都會走arenaExchange:
/** * 多槽交換 * * @param item 待交換的數據 * @return 其它配對線程的數據; 若是被中斷返回null, 若是超時返回TIMED_OUT(一個Obejct對象) */ private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); // 當前線程攜帶的交換結點 for (int i = p.index; ; ) { // 當前線程的arena索引 int b, m, c; long j; // 從arena數組中選出偏移地址爲(i << ASHIFT) + ABASE的元素, 即真正可用的Node Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { // CASE1: 槽不爲空,說明已經有線程到達並在等待了 Object v = q.item; // 獲取已經到達的線程所攜帶的值 q.match = item; // 把當前線程攜帶的值交換給已經到達的線程 Thread w = q.parked; // q.parked指向已經到達的線程 if (w != null) U.unpark(w); // 喚醒已經到達的線程 return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { // CASE2: 有效槽位位置且槽位爲空 p.item = item; if (U.compareAndSwapObject(a, j, null, p)) { // 佔用該槽位, 成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); for (int h = p.hash, spins = SPINS; ; ) { // 自旋等待一段時間,看看有沒其它配對線程到達該槽位 Object v = p.match; if (v != null) { // 有配對線程到達了該槽位 U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; // 返回配對線程交換過來的值 } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) // initialize hash h = SPINS | (int) t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 每一次等待有兩次讓出CPU的時機 } else if (U.getObjectVolatile(a, j) != p) // 優化操做:配對線程已經到達, 可是還未徹底準備好, 因此須要再自旋等待一下子 spins = SPINS; else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 等不到配對線程了, 阻塞當前線程 U.putObject(t, BLOCKER, this); p.parked = t; // 在結點引用當前線程,以便配對線程到達後喚醒我 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 嘗試縮減arena槽數組的大小 if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else // 佔用槽位失敗 p.item = null; } else { // CASE3: 無效槽位位置, 須要擴容 if (p.bound != b) { p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } } /** * 單槽交換 * * @param item 待交換的數據 * @return 其它配對線程的數據; 若是多槽交換被激活或被中斷返回null, 若是超時返回TIMED_OUT(一個Obejct對象) */ private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); // 當前線程攜帶的交換結點 Thread t = Thread.currentThread(); if (t.isInterrupted()) // 線程的中斷狀態檢查 return null; for (Node q; ; ) { if ((q = slot) != null) { // slot != null, 說明已經有線程先到並佔用了slot if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 獲取交換值 q.match = item; // 設置交換值 Thread w = q.parked; if (w != null) // 喚醒在此槽位等待的線程 U.unpark(w); return v; // 交換成功, 返回結果 } // CPU核數數多於1個, 且bound爲0時建立arena數組,並將bound設置爲SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } else if (arena != null) // slot == null && arena != null // 單槽交換中途出現了初始化arena的操做,須要從新直接路由到多槽交換(arenaExchange) return null; else { // 當前線程先到, 則佔用此slot p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) // 將slot槽佔用 break; p.item = null; // CAS操做失敗, 繼續下一次自旋 } } // 執行到這, 說明當前線程先到達, 且已經佔用了slot槽, 須要等待配對線程到達 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; // 自旋次數, 與CPU核數有關 Object v; while ((v = p.match) == null) { // p.match == null表示配對的線程還未到達 if (spins > 0) { // 優化操做:自旋過程當中隨機釋放CPU h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int) t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) // 優化操做:配對線程已經到達, 可是還未徹底準備好, 因此須要再自旋等待一下子 spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { //已經自旋好久了, 仍是等不到配對, 此時才阻塞當前線程 U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); // 阻塞當前線程 p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超時或其餘(取消), 給其餘線程騰出slot v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
多槽交換方法arenaExchange的總體流程和slotExchange相似,主要區別在於它會根據當前線程的數據攜帶結點Node中的index字段計算出命中的槽位。
若是槽位被佔用,說明已經有線程先到了,以後的處理和slotExchange同樣;
若是槽位有效且爲null,說明當前線程是先到的,就佔用槽位,而後按照:spin->yield->block
這種鎖升級的順序進行優化的等待,等不到配對線程就會進入阻塞。
另外,因爲arenaExchange利用了槽數組,因此涉及到槽數組的擴容和縮減問題,讀者能夠本身去研讀源碼。
其次,在定位arena數組的有效槽位時,須要考慮緩存行的影響。因爲高速緩存與內存之間是以緩存行爲單位交換數據的,根據局部性原理,相鄰地址空間的數據會被加載到高速緩存的同一個數據塊上(緩存行),而數組是連續的(邏輯,涉及到虛擬內存)內存地址空間,所以,多個slot會被加載到同一個緩存行上,當一個slot改變時,會致使這個slot所在的緩存行上全部的數據(包括其餘的slot)無效,須要從內存從新加載,影響性能。
須要注意的是,因爲不一樣的JDK版本,同步工具類內部的實現細節千差萬別,因此最關鍵的仍是理解它的設計思想。
Exchanger的設計思想和
LongAdder有些相似,都是經過
無鎖+分散熱點
的方式提高性能,可是我的感受JDK1.8中的
Exchanger實現更爲複雜,特別是其中的多槽交換,還涉及了緩存行相關的東西。