Exchange是最簡單的也是最複雜的,簡單在於API很是簡單,就一個構造方法和兩個exchange()方法,最複雜在於它的實現是最複雜的。java
在API是這麼介紹的:能夠在對中對元素進行配對和交換的線程的同步點。每一個線程將條目上的某個方法呈現給 exchange 方法,與夥伴線程進行匹配,而且在返回時接收其夥伴的對象。node
Exchanger 可能被視爲 SynchronousQueue 的雙向形式。Exchanger 可能在應用程序(好比遺傳算法和管道設計)中頗有用。算法
Exchanger,它容許在併發任務之間交換數據。具體來講,Exchanger類容許在兩個線程之間定義同步點。當兩個線程都到達同步點時,他們交換數據結構,所以第一個線程的數據結構進入到第二個線程中,第二個線程的數據結構進入到第一個線程中。數組
應用示例 Exchange實現較爲複雜,咱們先看其怎麼使用,而後再來分析其源碼。如今咱們用Exchange來模擬生產-消費者問題:緩存
public class ExchangerTest {
static class Producer implements Runnable{
//生產者、消費者交換的數據結構
private List<String> buffer;
//步生產者和消費者的交換對象
private Exchanger<List<String>> exchanger;
Producer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i = 1 ; i < 5 ; i++){
System.out.println("生產者第" + i + "次提供");
for(int j = 1 ; j <= 3 ; j++){
System.out.println("生產者裝入" + i + "--" + j);
buffer.add("buffer:" + i + "--" + j);
}
System.out.println("生產者裝滿,等待與消費者交換...");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i < 5; i++) {
//調用exchange()與消費者進行數據交換
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者第" + i + "次提取");
for (int j = 1; j <= 3 ; j++) {
System.out.println("消費者 : " + buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args){
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Thread producerThread = new Thread(new Producer(buffer1,exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
producerThread.start();
consumerThread.start();
}
}
複製代碼
運行結果: bash
首先生產者Producer、消費者Consumer首先都建立一個緩衝列表,經過Exchanger來同步交換數據。消費中經過調用Exchanger與生產者進行同步來獲取數據,而生產者則經過for循環向緩存隊列存儲數據並使用exchanger對象消費者同步。到消費者從exchanger哪裏獲得數據後,他的緩衝列表中有3個數據,而生產者獲得的則是一個空的列表。上面的例子充分展現了消費者-生產者是如何利用Exchanger來完成數據交換的。數據結構
在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:併發
Exchanger算法的核心是經過一個可交換數據的slot,以及一個能夠帶有數據item的參與者。源碼中的描述以下:app
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
複製代碼
Exchanger中定義了以下幾個重要的成員變量:dom
private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
複製代碼
participant的做用是爲每一個線程保留惟一的一個Node節點。
slot爲單個槽,arena爲數組槽。他們都是Node類型。在這裏可能會感受到疑惑,slot做爲Exchanger交換數據的場景,應該只須要一個就能夠了啊?爲什麼還多了一個Participant 和數組類型的arena呢?
一個slot交換場所原則上來講應該是能夠的,但實際狀況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼咱們就安排多個,也就是數組arena。經過數組arena來安排不一樣的線程使用不一樣的slot來下降競爭問題,而且能夠保證最終必定會成對交換數據。可是Exchanger不是一來就會生成arena數組來下降競爭,只有當產生競爭是纔會生成arena數組。那麼怎麼將Node與當前線程綁定呢?
Participant ,Participant 的做用就是爲每一個線程保留惟一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。
Node定義以下:
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
複製代碼
在Node定義中有兩個變量值得思考:bound以及collides。
前面提到了數組area是爲了不競爭而產生的,若是系統不存在競爭問題,那麼徹底沒有必要開闢一個高效的arena來徒增系統的複雜性。
首先經過單個slot的exchanger來交換數據,當探測到競爭時將安排不一樣的位置的slot來保存線程Node,而且能夠確保沒有slot會在同一個緩存行上。
如何來判斷會有競爭呢?CAS替換slot失敗,若是失敗,則經過記錄衝突次數來擴展arena的尺寸,咱們在記錄衝突的過程當中會跟蹤「bound」的值,以及會從新計算衝突次數在bound的值被改變時。這裏闡述可能有點兒模糊,不着急,咱們先有這個概念,後面在arenaExchange中再次作詳細闡述。
咱們直接看exchange()方法
exchange(V x):等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。方法定義以下:
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
複製代碼
這個方法比較好理解:arena爲數組槽,若是爲null,則執行slotExchange()方法,不然判斷線程是否中斷,若是中斷值拋出InterruptedException異常,沒有中斷則執行arenaExchange()方法。
整套邏輯就是:若是slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後返回結果V。
NULL_ITEM 爲一個空節點,其實就是一個Object對象而已,slotExchange()爲單個slot交換。
private final Object slotExchange(Object item, boolean timed, long ns) {
// 獲取當前線程的節點 p
Node p = participant.get();
// 當前線程
Thread t = Thread.currentThread();
// 線程中斷,直接返回
if (t.isInterrupted())
return null;
// 自旋
for (Node q;;) {
//slot != null
if ((q = slot) != null) {
//嘗試CAS替換
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item; // 當前線程的項,也就是交換的數據
q.match = item; // 作releasing操做的線程傳遞的項
Thread w = q.parked; // 掛起時設置線程值
// 掛起線程不爲null,線程掛起
if (w != null)
U.unpark(w);
return v;
}
//若是失敗了,則建立arena
//bound 則是上次Exchanger.bound
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
//若是arena != null,直接返回,進入arenaExchange邏輯處理
else if (arena != null)
return null;
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
/* * 等待 release * 進入spin+block模式 */
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
複製代碼
程序首先經過participant獲取當前線程節點Node。檢測是否中斷,若是中斷return null,等待後續拋出InterruptedException異常。
若是slot不爲null,則進行slot消除,成功直接返回數據V,不然失敗,則建立arena消除數組。
若是slot爲null,但arena不爲null,則返回null,進入arenaExchange邏輯。
若是slot爲null,且arena也爲null,則嘗試佔領該slot,失敗重試,成功則跳出循環進入spin+block(自旋+阻塞)模式。
在自旋+阻塞模式中,首先取得結束時間和自旋次數。若是match(作releasing操做的線程傳遞的項)爲null,其首先嚐試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數爲0後,假如slot發生了改變(slot != p)則重置自旋數並重試。不然假如:當前未中斷&arena爲null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不爲null或者當前爲限時版本+時間已經結束:不限時版本:置v爲null;限時版本:若是時間結束以及未中斷則TIMED_OUT;不然給出null(緣由是探測到arena非空或者當前線程中斷)。
match不爲空時跳出循環。
整個slotExchange清晰明瞭。
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
複製代碼
首先經過participant取得當前節點Node,而後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena能夠確保不一樣的slot在arena中是不會相沖突的,那麼是怎麼保證的呢?咱們先看arena的建立:
arena = new Node[(FULL + 2) << ASHIFT];
複製代碼
這個arena到底有多大呢?咱們先看FULL 和ASHIFT的定義:
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff; // 255
複製代碼
假如個人機器NCPU = 8 ,則獲得的是768大小的arena數組。而後經過如下代碼取得在arena中的節點:
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
複製代碼
他仍然是經過右移ASHIFT位來取得Node的,ABASE定義以下:
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
複製代碼
U.arrayBaseOffset獲取對象頭長度,數組元素的大小能夠經過unsafe.arrayIndexScale(T[].class) 方法獲取到。這也就是說要訪問類型爲T的第N個元素的話,你的偏移量offset應該是arrayOffset+N*arrayScale。也就是說BASE = arrayOffset+ 128 。其次咱們再看Node節點的定義
@sun.misc.Contended static final class Node{
....
}
複製代碼
在Java 8 中咱們是能夠利用sun.misc.Contended來規避僞共享的。因此說經過 << ASHIFT方式加上sun.misc.Contended,因此使得任意兩個可用Node不會再同一個緩存行中。
關於僞共享請參考以下博文:
Java8中用sun.misc.Contended避免僞共享(false sharing)
咱們再次回到arenaExchange()。取得arena中的node節點後,若是定位的節點q 不爲空,且CAS操做成功,則交換數據,返回交換的數據,喚醒等待的線程。
若是q等於null且下標在bound & MMASK範圍以內,則嘗試佔領該位置,若是成功,則採用自旋 + 阻塞的方式進行等待交換數據。
若是下標不在bound & MMASK範圍以內獲取因爲q不爲null可是競爭失敗的時候:消除p。加入bound 不等於當前節點的bond(b != p.bound),則更新p.bound = b,collides = 0 ,i = m或者m - 1。若是衝突的次數不到m 獲取m 已經爲最大值或者修改當前bound的值失敗,則經過增長一次collides以及循環遞減下標i的值;不然更新當前bound的值成功:咱們令i爲m+1即爲此時最大的下標。最後更新當前index的值。
Exchanger使用、原理都比較好理解,可是這個源碼看起來真心有點兒複雜,是真心難看懂,可是這種交換的思路Doug Lea在後續博文中還會提到,例如SynchronousQueue、LinkedTransferQueue。
最後用一個在網上看到的段子結束此篇博客(brokendreams.iteye.com/blog/225395…
其實就是」我」和」你」(可能有多個」我」,多個」你」)在一個叫Slot的地方作交易(一手交錢,一手交貨),過程分如下步驟: