在JUC包中,除了一些經常使用的或者說常見的併發工具類(ReentrantLock,CountDownLatch,CyclicBarrier,Semaphore)等,還有一個不經常使用的線程同步器類 —— Exchanger。node
一個用於兩個工做線程之間交換數據的封裝工具類,簡單說就是一個線程在完成必定事物後想與另外一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿着數據到來時才能彼此交換對應數據。 **算法
Exchanger 用於兩個線程間交換數據,固然實際參與的線程能夠不止兩個,測試用例以下:多線程
private static void test1() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
String origMsg = RandomStringUtils.randomNumeric(6);
// 先到達的線程會在此等待,直到有一個線程跟它交換數據或者等待超時
String exchangeMsg = exchanger.exchange(origMsg,5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg);
} catch (InterruptedException e) {
} catch (TimeoutException e) {
}finally {
0 origMsg:524053 exchangeMsg:098544
3 origMsg:433246 exchangeMsg:956604
4 origMsg:098544 exchangeMsg:524053
1 origMsg:956604 exchangeMsg:433246
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37)
at java.lang.Thread.run(Thread.java:748)
private static void test2() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
CountDownLatch countDownLatch = new CountDownLatch(4);
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
// 生產者
Runnable producer = new Runnable() {
public void run() {
for (int i = 0; i < 5; i++) {
String msg = RandomStringUtils.randomNumeric(6);
System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i);
}catch (Exception e){
}finally {
// 消費者
Runnable consumer = new Runnable() {
public void run() {
for (int i = 0; i < 5; i++) {
String msg = exchanger.exchange(null,5,TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i);
}catch (Exception e){
}finally {
for (int i = 0; i < 2; i++){
new Thread(producer).start();
new Thread(consumer).start();
Thread-3 consumer msg -> null, 0
Thread-1 consumer msg -> null, 0
Thread-1 consumer msg -> null, 1
Thread-2 producer msg -> 640010 , i -> 0
Thread-2 producer msg -> 733133 , i -> 1
Thread-3 consumer msg -> null, 1
Thread-3 consumer msg -> 476520, 2
Thread-1 consumer msg -> 640010, 2
Thread-1 consumer msg -> null, 3
Thread-0 producer msg -> 993414 , i -> 0
Thread-0 producer msg -> 292745 , i -> 1
Thread-2 producer msg -> 476520 , i -> 2
Thread-2 producer msg -> 408446 , i -> 3
Thread-3 consumer msg -> null, 3
Thread-1 consumer msg -> 292745, 4
Thread-2 producer msg -> 251971 , i -> 4
Thread-0 producer msg -> 078939 , i -> 2
Thread-3 consumer msg -> 251971, 4
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 0
Exchanger 類圖 其內部主要變量和方法以下:
// ThreadLocal變量,每一個線程都有之間的一個副本
private final Participant participant;
// 高併發下使用的,保存待匹配的Node實例
private volatile Node[] arena;
// 低併發下,arena未初始化時使用的保存待匹配的Node實例
private volatile Node slot;
// 初始值爲0,當建立arena後被負責SEQ,用來記錄arena數組的可用最大索引,
// 會隨着併發的增大而增大直到等於最大值FULL,
// 會隨着並行的線程逐一匹配成功而減小恢復成初始值
private volatile int bound;
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
static {
int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> ek = Exchanger.class;
Class<?> nk = Node.class;
Class<?> ak = Node[].class;
Class<?> tk = Thread.class;
BOUND = U.objectFieldOffset
SLOT = U.objectFieldOffset
MATCH = U.objectFieldOffset
BLOCKER = U.objectFieldOffset
s = U.arrayIndexScale(ak);
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
} catch (Exception e) {
throw new Error(e);
if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
throw new Error("Unsupported array scale");
Exchanger 定義來多個靜態變量,以下:
// 初始化arena時使用, 1 << ASHIFT 是一個緩存行的大小,避免來不一樣的Node落入到同一個高速緩存行
// 這裏實際是把數組容量擴大來8倍,原來索引相鄰的兩個元素,擴容後中間隔來7個元素,從元素的起始地址上看就隔來8個元素,中間的7個都是空的,爲來避免原來相鄰的兩個元素都落入到同一個緩存行中
// 由於arena是對象數組,一個元素佔8字節,8個就是64字節
private static final int ASHIFT = 7;
// arena 數組元素的索引最大值即255
private static final int MMASK = 0xff;
// arena 數組的最大長度即256
private static final int SEQ = MMASK + 1;
// 獲取CPU核數
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 實際的數組長度,由於是線程兩兩配對的,因此最大長度是核數除以2
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
// 自旋等待的次數
private static final int SPINS = 1 << 10;
// 若是交換的對象是null,則返回此對象
private static final Object NULL_ITEM = new Object();
// 若是等待超時致使交換失敗,則返回此對象
private static final Object TIMED_OUT = new Object();
Exchanger類中有兩個內部類,一個Node,一個Participant。 Participant繼承了ThreadLocal而且重寫了其initialValue方法,返回一個Node對象。其定義以下:
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null || // 是null就執行後面的方法
(v = slotExchange(item, false, 0L)) == null) &&
// 若是執行slotExchange有結果就執行後面的,不然返回
((Thread.interrupted() || // 非中斷則執行後面的方法
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
exchange 方法的執行步驟:
slotExchange 是基於slot屬性來完成交換的,調用soltExchange方法時,若是slot屬性爲null,當前線程會將slot屬性由null修改爲當前線程的Node,若是修改失敗則下一次for循環走solt屬性不爲null的邏輯,若是修改爲功則自旋等待,自旋必定次數後經過Unsafe的park方法噹噹前線程休眠,能夠指定休眠的時間,若是沒有指定則無限期休眠直到被喚醒;不管是由於線程中斷被喚醒,等待超時被喚醒仍是其它線程unpark喚醒的,都會檢查當前線程的Node的屬性釋放爲null,若是不爲null說明交互成功,返回該對象;不然返回null或者TIME_OUT,在返回前會將item,match等屬性置爲null,保存以前自旋時計算的hash值,方便下一次調用slotExchange。
// arena 爲null是會調用此方法,返回null表示交換失敗
// item是交換的對象,timed表示是否等待指定的時間,爲false表示無限期等待,ns爲等待時間
private final Object slotExchange(Object item, boolean timed, long ns) {
// 獲取當前線程關聯的participant Node
Node p = participant.get();
Thread t = Thread.currentThread();
// 被中斷,返回null
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
if ((q = slot) != null) { // slot 不爲null
// 將slot置爲null,slot對應的線程與當前線程匹配成功
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
// 保存item,即完成交互
q.match = item;
// 喚醒q對應的處於休眠狀態的線程
Thread w = q.parked;
if (w != null)
return v;
// slot修改失敗,其它某個線程搶佔來該slot,多個線程同時調用exchange方法會觸發此邏輯
// bound等於0表示未初始化,此處校驗避免重複初始化
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
else if (arena != null)
return null; // carena不爲null,經過arenaExchange交互
else {
// slot和arena都爲null
p.item = item;
// 修改slot爲p,修改爲功則終止循環
if (U.compareAndSwapObject(this, SLOT, null, p))
// 修改失敗則繼續for循環,將otem恢復成null
p.item = null;
// 將slot修改成p後會進入此分支
int h = p.hash; // hash初始爲0
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// match保存着同其餘線程交換的對象,若是不爲null,說明交換成功了
while ((v = p.match) == null) {
// 執行自旋等待
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId(); 初始化h
// 只有生成的h小於0時才減小spins
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
// slot被修改了,已經有匹配的線程,從新自旋,讀取屬性,由於是先修改slot再修改屬性的,二者由於CPU調度的問題可能有時間差
else if (slot != p)
spins = SPINS;
// 線程沒有被中斷且arena爲null
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
// 線程被喚醒,繼續下一次for循環
// 若是是由於等待超時而被喚醒,下次for循環進入下沒的else if分支,返回TIMED_OUT
p.parked = null;
U.putObject(t, BLOCKER, null);
// 將slot修改爲p
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// timed爲flase,無限期等待,由於中斷被喚醒返回null
// timed爲ture,由於超時被喚醒,返回TIMED_OUT,由於中斷被喚醒返回null
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
// 修改match爲null,item爲null,保存h,下一次exchange是h就不是初始值爲0了
U.putOrderedObject(p, MATCH, null);
// 重置 item
p.item = null;
// 保留僞隨機數,供下次種子數字
p.hash = h;
// 返回
return v;
在該方法中,會返回 2 種結果,一是有效的 item,二是 null 要麼是線程競爭使用 slot 了,建立了 arena 數組,要麼是線程中斷了。
// 搶佔slot失敗後進入此方法,arena不爲空
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
// index初始爲0
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
// 在建立arena時,將原本的數組容量 << ASHIFT,爲了不數組元素落到了同一個高速緩存行
// 這裏獲取真實的數組元素索引時也須要 << ASHIFR
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// 若是q不爲null,則將對應的數組元素置爲null,表示當前線程和該元素對應的線程匹配l
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item; // 保存item,交互成功
Thread w = q.parked;
if (w != null) // 喚醒等待的線程
return v;
// q爲null 或者q不爲null,cas搶佔q失敗了
// bound初始化時時SEQ,SEQ & MMASK 就是0,即m的初始值就0,m爲0時,i確定爲0
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
p.item = null; // clear offer
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
i = m + 1; // grow
p.index = i;
Exchange 和 SynchronousQueue 相似,都是經過兩個線程操做同一個對象實現數據交換,只不過就像咱們開始說的,SynchronousQueue 使用的是同一個屬性,經過不一樣的 isData 來區分,多線程併發時,使用了隊列進行排隊。
Exchange 使用了一個對象裏的兩個屬性,item 和 match,就不須要 isData 屬性了,由於在 Exchange 裏面,沒有 isData 這個語義。而多線程併發時,使用數組來控制,每一個線程訪問數組中不一樣的槽。
