Exchanger 是一個用於線程間協做的工具類,Exchanger用於進行線程間的數據交換,它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange 方法交換數據,若是第一個線程先執行exchange 方法,它會一直等待第二個線程也執行exchange 方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據。html
源碼:java
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class Exchanger<V> { private static final int ASHIFT = 7; private static final int MMASK = 0xff; private static final int SEQ = MMASK + 1; private static final int NCPU = Runtime.getRuntime().availableProcessors(); static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; private static final int SPINS = 1 << 10;// 自旋次數 private static final Object NULL_ITEM = new Object();//若是交換的數據爲null,則用NULL_ITEM代替 private static final Object TIMED_OUT = new Object(); private final Participant participant;//每一個線程的數據,ThreadLocal 子類 private volatile Node[] arena; private volatile Node slot;// 用於交換數據的槽位 private volatile int bound; @sun.misc.Contended static final class Node { int index; //arena的下標,多個槽位的時候利用 int bound; // 上一次記錄的Exchanger.bound; int collides; // 在當前bound下CAS失敗的次數; int hash; // 用於自旋; Object item; // 這個線程的當前項,也就是須要交換的數據; volatile Object match; // 交換的數據 volatile Thread parked; // 線程 } static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } } private final Object arenaExchange(Object item, boolean timed, long ns) { // 槽位數組 Node[] a = arena; //表明當前線程的Node Node p = participant.get(); // p.index 初始值爲 0 for (int i = p.index; ; ) { // access slot at i int b, m, c; long j; // j is raw array offset //在槽位數組中根據"索引" i 取出數據 j至關因而 "第一個"槽位 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 該位置上有數據(即有線程在這裏等待交換數據) if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 進行數據交換,這裏和單槽位的交換是同樣的 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // bound 是最大的有效的 位置,和MMASK相與,獲得真正的存儲數據的索引最大值 else if (i <= (m = (b = bound) & MMASK) && q == null) { // i 在這個範圍內,該槽位也爲空 //將須要交換的數據 設置給p p.item = item; // offer //設置該槽位數據(在該槽位等待其它線程來交換數據) if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 進行必定時間的自旋 for (int h = p.hash, spins = SPINS; ; ) { Object v = p.match; //在自旋的過程當中,有線程來和該線程交換數據 if (v != null) { //交換數據後,清空部分設置,返回交換獲得的數據,over U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int) t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } // 交換數據的線程到來,可是尚未設置好match,再稍等一會 else if (U.getObjectVolatile(a, j) != p) spins = SPINS; //符合條件,特別注意m==0 這個說明已經到達area 中最小的存儲數據槽位了 //沒有其餘線程在槽位等待了,全部當前線程須要阻塞在這裏 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window // 再次檢查槽位,看看在阻塞前,有沒有線程來交換數據 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 掛起 p.parked = null; U.putObject(t, BLOCKER, null); } // 當前這個槽位一直沒有線程來交換數據,準備換個槽位試試 else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { //更新bound if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; // 減少索引值 往"第一個"槽位的方向挪動 i = p.index >>>= 1; // descend // 發送中斷,返回null if (Thread.interrupted()) return null; // 超時 if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart 繼續主循環 } } } else //佔據槽位失敗,先清空item,防止成功交換數據後,p.item還引用着item p.item = null; // clear offer } else { // i 不在有效範圍,或者被其它線程搶先了 //更新p.bound if (p.bound != b) { // stale; reset p.bound = b; //新bound ,重置collides p.collides = 0; //i若是達到了最大,那麼就遞減 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; // 更新衝突 // i=0 那麼就從m開始,不然遞減i i = (i == 0) ? m : i - 1; // cyclically traverse } else //遞增,日後挪動 i = m + 1; // grow // 更新index p.index = i; } } } private final Object slotExchange(Object item, boolean timed, long ns) { // 獲得一個初試的Node Node p = participant.get(); // 當前線程 Thread t = Thread.currentThread(); // 若是發生中斷,返回null,會重設中斷標誌位,並無直接拋異常 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; for (Node q;;) { // 槽位 solt不爲null,則說明已經有線程在這裏等待交換數據了 if ((q = slot) != null) { // 重置槽位 if (U.compareAndSwapObject(this, SLOT, q, null)) { //獲取交換的數據 Object v = q.item; //等待線程須要的數據 q.match = item; //等待線程 Thread w = q.parked; //喚醒等待的線程 if (w != null) U.unpark(w); return v; // 返回拿到的數據,交換完成 } // create arena on contention, but continue until slot null //存在競爭,其它線程搶先了一步該線程,所以須要採用多槽位模式,這個後面再分析 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; }else if (arena != null) //多槽位不爲空,須要執行多槽位交換 return null; // caller must reroute to arenaExchange else { //尚未其餘線程來佔據槽位 p.item = item; // 設置槽位爲p(也就是槽位被當前線程佔據) if (U.compareAndSwapObject(this, SLOT, null, p)) break; // 退出無限循環 p.item = null; // 若是設置槽位失敗,則有可能其餘線程搶先了,重置item,從新循環 } } //當前線程佔據槽位,等待其它線程來交換數據 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; // 直到成功交換到數據 while ((v = p.match) == null) { if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // 主動讓出cpu,這樣能夠提供cpu利用率(反正當前線程也自旋等待,還不如讓其它任務佔用cpu) Thread.yield(); } else if (slot != p) //其它線程來交換數據了,修改了solt,可是尚未設置match,再稍等一會 spins = SPINS; //須要阻塞等待其它線程來交換數據 //沒發生中斷,而且是單槽交換,沒有設置超時或者超時時間未到 則繼續執行 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { // cas 設置BLOCKER,能夠參考Thread 中的parkBlocker U.putObject(t, BLOCKER, this); // 須要掛起當前線程 p.parked = t; if (slot == p) U.park(false, ns); // 阻塞當前線程 // 被喚醒後 p.parked = null; // 清空 BLOCKER U.putObject(t, BLOCKER, null); } // 不知足前面 else if 條件,交換失敗,須要重置solt else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } //清空match U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回交換獲得的數據(失敗則爲null) return v; } public Exchanger() { participant = new Participant(); } public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x; long ns = unit.toNanos(timeout); if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT) throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V)v; } private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); s = U.arrayIndexScale(ak); ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) throw new Error("Unsupported array scale"); } }
類型參數:api
V
- 能夠交換的對象類型數組
每一個線程將條目上的某個方法呈現給exchange()
方法,與夥伴線程進行匹配,而且在返回時接收其夥伴的對象。app
用法示例:使用 Exchanger
在線程間交換緩衝區。在須要時,填充緩衝區的線程獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給騰空緩衝區的線程。
ide
class FillAndEmpty { Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); DataBuffer initialEmptyBuffer = ... DataBuffer initialFullBuffer = ... class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ... } } } class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ...} } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }
Exchanger() 建立一個新的 Exchanger。 |
V |
exchange(V x) 等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。 |
V |
exchange(V x, long timeout, TimeUnit unit) 等待另外一個線程到達此交換點(除非當前線程被中斷,或者超出了指定的等待時間),而後將給定的對象傳送給該線程,同時接收該線程的對象。 |
public Exchanger()
建立一個新的 Exchanger。工具
public V exchange(V x) throws InterruptedException
等待另外一個線程到達此交換點(除非當前線程被 中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。oop
若是另外一個線程已經在交換點等待,則出於線程調度目的,繼續執行此線程,並接收當前線程傳入的對象。當前線程當即返回,接收其餘線程傳遞的交換對象。this
若是尚未其餘線程在交換點等待,則出於調度目的,禁用當前線程,且在發生如下兩種狀況之一前,該線程將一直處於休眠狀態:atom
中斷
當前線程。若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
參數:
x
- 要交換的對象
返回:
另外一個線程提供的對象
拋出:
InterruptedException
- 若是當前線程在等待時被中斷
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待另外一個線程到達此交換點(除非當前線程被 中斷,或者超出了指定的等待時間),而後將給定的對象傳送給該線程,同時接收該線程的對象。
若是另外一個線程已經在交換點上等待,則出於線程調度目的,繼續執行此線程,並接收當前線程傳入的對象。當前線程當即返回,並接收其餘線程傳遞的交換對象。
若是尚未其餘線程在交換點等待,則出於調度目的,禁用當前線程,且在發生如下三種狀況之一前,該線程將一直處於休眠狀態:
若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
若是超出指定的等待時間,則拋出 TimeoutException
異常。若是該時間小於等於零,則此方法根本不會等待。
參數:
x
- 要交換的對象
timeout
- 要等待的最長時間
unit
- timeout 參數的時間單位
返回:
其餘線程提供的對象
拋出:
InterruptedException
- 若是當前線程在等待時被中斷
TimeoutException
- 若是在另外一個線程進入交換點以前已經到達指定的等待時間
線程交換各自擁有的值:
package com.thread; import java.util.concurrent.Exchanger; public class ExchangerDemo extends Thread { private Exchanger<String> exchanger; private String name; public ExchangerDemo(String name, Exchanger<String> exchanger) { this.exchanger = exchanger; this.name = name; } public void run() { try { System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(this.name)); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); ExchangerDemo exchangerDemo1 = new ExchangerDemo("demo1", exchanger); exchangerDemo1.setName("exchanger1"); ExchangerDemo exchangerDemo2 = new ExchangerDemo("demo2", exchanger); exchangerDemo2.setName("exchanger2"); exchangerDemo1.start(); exchangerDemo2.start(); } }
運行結果:
exchanger1: demo2
exchanger2: demo1
從輸出的值能夠看到,兩個線程的值已經發生了交換。