本文參考羣主的博客http://cmsblogs.com/?p=2269java
Java 併發 API 提供了一種容許2個併發任務間相互交換數據的同步應用。更具體的說,Exchanger 類容許在2個線程間定義同步點,當2個線程到達這個點,他們相互交換數據類型,使用第一個線程的數據類型變成第二個的,而後第二個線程的數據類型變成第一個的。node
package com; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; class Producer implements Runnable { // 要被相互交換的數據類型。 private List<String> buffer; // 用來同步 producer和consumer private final Exchanger<List<String>> exchanger; public Producer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } public void run() { // 實現10次交換 for (int i = 0; i < 10; i++) { buffer.add("第" + i + "次生產者的數據" + i); try { // 調用exchange方法來與consumer交換數據 System.out.println("第" + i + "次生產者在等待....."); buffer = exchanger.exchange(buffer); System.out.println("第" + i + "次生產者交換後的數據:" + buffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { // 用來相互交換 private List<String> buffer; // 用來同步 producer和consumer private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } public void run() { // 實現10次交換 for (int i = 0; i < 10; i++) { buffer.add("第" + i + "次消費者的數據" + i); try { // 調用exchange方法來與consumer交換數據 System.out.println("第" + i + "次消費者在等待....."); buffer = exchanger.exchange(buffer); System.out.println("第" + i + "次消費者交換後的數據:" + buffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
//主類 import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; public class Core { public static void main(String[] args) { // 建立2個buffers,分別給producer和consumer使用 List<String> buffer1 = new ArrayList<String>(); List<String> buffer2 = new ArrayList<String>(); // 建立Exchanger對象,用來同步producer和consumer Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); // 建立Producer對象和Consumer對象 Producer producer = new Producer(buffer1, exchanger); Consumer consumer = new Consumer(buffer2, exchanger); // 建立線程來執行producer和consumer並開始線程 Thread threadProducer = new Thread(producer); Thread threadConsumer = new Thread(consumer); threadProducer.start(); threadConsumer.start(); } }
在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:算法
Exchanger算法的核心是經過一個可交換數據的slot,以及一個能夠帶有數據item的參與者。數組
for (;;) { if (slot is empty) { // offer place item in a Node; if (can CAS slot from empty to node) { wait for release; return matching item in node; } } else if (can CAS slot from node to empty) { // release get the item in node; set matching item in node; release waiting thread; } // else retry on CAS failure }
Exchanger中定義了以下幾個重要的成員變量:併發
private final Participant participant; private volatile Node[] arena; private volatile Node slot;
participant的做用是爲每一個線程保留惟一的一個Node節點。app
slot爲單個槽,arena爲數組槽。他們都是Node類型。在這裏可能會感受到疑惑,slot做爲Exchanger交換數據的場景,應該只須要一個就能夠了啊?爲什麼還多了一個Participant 和數組類型的arena呢?一個slot交換場所原則上來講應該是能夠的,但實際狀況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼咱們就安排多個,也就是數組arena。經過數組arena來安排不一樣的線程使用不一樣的slot來下降競爭問題,而且能夠保證最終必定會成對交換數據。可是Exchanger不是一來就會生成arena數組來下降競爭,只有當產生競爭是纔會生成arena數組。那麼怎麼將Node與當前線程綁定呢?Participant ,Participant 的做用就是爲每一個線程保留惟一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。ide
Node定義以下:this
@sun.misc.Contended static final class Node { int index; // arena的下標; int bound; // 上一次記錄的Exchanger.bound int collides; // 在當前bound下CAS失敗的次數 int hash; // 僞隨機數,用於自旋; Object item; // 這個線程的當前項,也就是須要交換的數據; volatile Object match; // 作releasing操做的線程傳遞的項; volatile Thread parked; //掛起時設置線程值,其餘狀況下爲null; }
exchange(V x):等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。spa
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
這個方法比較好理解:arena爲數組槽,若是爲null,則執行slotExchange()方法,不然判斷線程是否中斷,若是中斷值拋出InterruptedException異常,沒有中斷則執行arenaExchange()方法。整套邏輯就是:若是slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後返回結果V。線程
NULL_ITEM 爲一個空節點,其實就是一個Object對象而已,slotExchange()爲單個slot交換。
private final Object slotExchange(Object item, boolean timed, long ns) { // 獲取當前線程的節點 p Node p = participant.get(); // 當前線程 Thread t = Thread.currentThread(); // 線程中斷,直接返回 if (t.isInterrupted()) return null; // 自旋 for (Node q;;) { //slot != null if ((q = slot) != null) { //嘗試CAS替換 if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 當前線程的項,也就是交換的數據 q.match = item; // 作releasing操做的線程傳遞的項 Thread w = q.parked; // 掛起時設置線程值 // 掛起線程不爲null,線程掛起 if (w != null) U.unpark(w); return v; } //若是失敗了,則建立arena //bound 則是上次Exchanger.bound if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } //若是arena != null,直接返回,進入arenaExchange邏輯處理 else if (arena != null) return null; else { p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } } /* * 等待 release * 進入spin+block模式 */ int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
程序首先經過participant獲取當前線程節點Node。檢測是否中斷,若是中斷return null,等待後續拋出InterruptedException異常。
若是slot不爲null,則進行slot消除,成功直接返回數據V,不然失敗,則建立arena消除數組。
若是slot爲null,但arena不爲null,則返回null,進入arenaExchange邏輯。
若是slot爲null,且arena也爲null,則嘗試佔領該slot,失敗重試,成功則跳出循環進入spin+block(自旋+阻塞)模式。
在自旋+阻塞模式中,首先取得結束時間和自旋次數。若是match(作releasing操做的線程傳遞的項)爲null,其首先嚐試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數爲0後,假如slot發生了改變(slot != p)則重置自旋數並重試。不然假如:當前未中斷&arena爲null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不爲null或者當前爲限時版本+時間已經結束:不限時版本:置v爲null;限時版本:若是時間結束以及未中斷則TIMED_OUT;不然給出null(緣由是探測到arena非空或者當前線程中斷)。
match不爲空時跳出循環。
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { // access slot at i int b, m, c; long j; // j is raw array offset Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else p.item = null; // clear offer } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } }
首先經過participant取得當前節點Node,而後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena能夠確保不一樣的slot在arena中是不會相沖突的,那麼是怎麼保證的呢?
arena = new Node[(FULL + 2) << ASHIFT];
取得arena中的node節點後,若是定位的節點q 不爲空,且CAS操做成功,則交換數據,返回交換的數據,喚醒等待的線程。
若是q等於null且下標在bound & MMASK範圍以內,則嘗試佔領該位置,若是成功,則採用自旋 + 阻塞的方式進行等待交換數據。
若是下標不在bound & MMASK範圍以內獲取因爲q不爲null可是競爭失敗的時候:消除p。加入bound 不等於當前節點的bond(b != p.bound),則更新p.bound = b,collides = 0 ,i = m或者m – 1。若是衝突的次數不到m 獲取m 已經爲最大值或者修改當前bound的值失敗,則經過增長一次collides以及循環遞減下標i的值;不然更新當前bound的值成功:咱們令i爲m+1即爲此時最大的下標。最後更新當前index的值。
Exchanger使用、原理都比較好理解,可是這個源碼看起來真心有點兒複雜,是真心難看懂,可是這種交換的思路Doug Lea在後續博文中還會提到,例如SynchronousQueue、LinkedTransferQueue。
其實就是」我」和」你」(可能有多個」我」,多個」你」)在一個叫Slot的地方作交易(一手交錢,一手交貨),過程分如下步驟: