前言node
Exchanger應該算併發包中工具使用相對少的,由於它主要用於線程之間交換數據,它的用法比較簡單在不一樣線程之間使用exchange方法交換數據,可是內部實現比較巧妙,使用了unsafe的CAS原子操做、自旋來解決衝突問題,下面咱們經過源碼一探究竟。算法
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。
數組
源碼緩存
先看看源碼註釋中關於核心算法的介紹多線程
for (;;) { if (slot is empty) { // slot爲空時,將item 設置到Node 中 place item in a Node; if (can CAS slot from empty to node) { // 當將node經過CAS交換到slot中時,掛起線程等待被喚醒 wait for release; // 被喚醒後返回node中匹配到的item return matching item in node; } } else if (can CAS slot from node to empty) { // release // 將slot設置爲空 // 獲取node中的item,將須要交換的數據設置到匹配的item get the item in node; set matching item in node; // 喚醒等待的線程 release waiting thread; } // else retry on CAS failure }
好比有2條線程A和B,A線程交換數據時,發現slot爲空,則將須要交換的數據放在slot中等待其它線程進來交換數據,等線程B進來,讀取A設置的數據,而後設置線程B須要交換的數據,而後喚醒A線程,原理就是這麼簡單。當時當多個線程之間進行交換數據時就會出現問題,因此Exchanger加入了slot數組。併發
Exchanger 屬性及構造器app
// 用於左移Node數組下標,從而得出數據在內存中的偏移量來獲取數據,避免僞共享 private static final int ASHIFT = 7; // note數組最大下標 private static final int MMASK = 0xff; // 用於遞增bound,每次加一個SEQ private static final int SEQ = MMASK + 1; // CPU核心數 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // 當前數組最大的下標(多處理器狀況下) static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 自旋次數,CPU核心爲1個時,自旋被禁用 private static final int SPINS = 1 << 10; // 空對象,用於當線程exchange方法中參數爲null時傳遞給其餘線程的對象 private static final Object NULL_ITEM = new Object(); // 用於超時時傳遞的對象 private static final Object TIMED_OUT = new Object(); // Participant 繼承了ThreadLocal,也就是說該對象用於獲取每條線程中存放的值 private final Participant participant; // 多個線程交換 private volatile Node[] arena; // 用於2個線程交換 private volatile Node slot; // 該值主要用於與 private volatile int bound; // 經過unsafe用於CAS操做 private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; // bound屬性在Exchanger對象中的偏移地址 BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); // slot屬性在Exchanger對象中的偏移地址 SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); // slot屬性在Node對象中的偏移地址 MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); // parkBlocker屬性在Thread對象中的偏移地址 BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); // 獲取Node[]數組中每一個元素的大小,這裏是4 s = U.arrayIndexScale(ak); // ABASE absorbs padding in front of element 0 // 獲取Node[]數組中第一個元素的偏移地址 + 128 ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) // 這裏是爲了保證 Node數組中的元素不會爭用一個緩存行 throw new Error("Unsupported array scale"); }
構造器及內部類dom
public Exchanger() { participant = new Participant(); } // 內部類,用於記錄每一個線程的狀態 static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } // 包含須要交換的數據等信息 // Contended爲 JDK8 新增的註解,用於避免僞共享,提升程序性能 @sun.misc.Contended static final class Node { // arana數組中的下標 int index; // 上一次記錄的bound int bound; // cas操做失敗的次數 int collides; // 用於自旋的僞隨機數 int hash; // Pseudo-random for spins // 當前線程須要交換的數據 Object item; // This thread's current item // 匹配線程交換的數據 volatile Object match; // Item provided by releasing thread // 記錄當前掛起的線程 volatile Thread parked; // Set to this thread when parked, else null }
方法exchange分佈式
// 交換數據,參數X爲本線程提供給其它線程的數據 public V exchange(V x) throws InterruptedException { Object v; // 當參數爲null時須要將item設置爲空的對象 Object item = (x == null) ? NULL_ITEM : x; // translate null args // 注意到這裏的這個表達式是整個方法的核心 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
仔細看if裏的條件表達式,得知:ide
只有當arena爲null時,纔會執行slotExchange方法;
當arena不爲null或者(arena爲null且slotExchange方法返回null)時,此時線程未中斷,纔會執行arenaExchange方法;
線程中斷時,就會直接拋出線程中斷異常。
下面咱們來看slotExchange方法
1 // timed 爲true表示設置了超時時間,ns爲>0的值,反之沒有設置超時時間 2 private final Object slotExchange(Object item, boolean timed, long ns) { 3 // 獲取當前線程node對象 4 Node p = participant.get(); 5 Thread t = Thread.currentThread(); 6 if (t.isInterrupted()) // preserve interrupt status so caller can recheck 7 // 線程中斷返回null 8 return null; 9 10 // 自旋 11 for (Node q;;) { 12 // 將slot值賦給q 13 if ((q = slot) != null) { 14 // slot 不爲null,即表示已有線程已經把須要交換的數據設置在slot中了 15 // 經過CAS將slot設置成null 16 if (U.compareAndSwapObject(this, SLOT, q, null)) { 17 // CAS操做成功後,將slot中的item賦值給對象v,以便返回。 18 // 這裏也是就讀取以前線程要交換的數據 19 Object v = q.item; 20 // 將當前線程須要交給的數據設置在q中的match 21 q.match = item; 22 // 獲取被掛起的線程 23 Thread w = q.parked; 24 if (w != null) 25 // 若是線程不爲null,喚醒它 26 U.unpark(w); 27 // 返回其餘線程給的V 28 return v; 29 } 30 // CAS 操做失敗,表示有其它線程競爭,在此線程以前將數據已取走 31 // create arena on contention, but continue until slot null 32 if (NCPU > 1 && bound == 0 && 33 U.compareAndSwapInt(this, BOUND, 0, SEQ)) 34 // CPU爲多核心 35 // bound == 0 表示arena數組未初始化過,CAS操做bound將其增長SEQ 36 // 初始化arena數組 37 arena = new Node[(FULL + 2) << ASHIFT]; 38 } 39 // 上面分析過,只有當arena纔會執行slotExchange方法的 40 // 因此表示恰好已有其它線程加入進來將arena初始化 41 else if (arena != null) 42 // 這裏就須要去執行arenaExchange 43 return null; // caller must reroute to arenaExchange 44 else { 45 // 這裏表示當前線程是以第一個線程進來交換數據 46 // 或者表示以前的數據交換已進行完畢,這裏能夠看做是第一個線程 47 // 將須要交換的數據先存放在當前線程變量p中 48 p.item = item; 49 // 將須要交換的數據經過CAS設置到交換區slot 50 if (U.compareAndSwapObject(this, SLOT, null, p)) 51 // 交換成功後跳出自旋 52 break; 53 // CAS操做失敗,表示有其它線程恰好先於當前線程將數據設置到交換區slot 54 // 將當前線程變量中的item設置爲null,而後自旋獲取其它線程存放在交換區slot的數據 55 p.item = null; 56 } 57 } 58 // 執行到這裏表示當前線程已將須要的交換的數據放置於交換區slot中了, 59 // 等待其它線程交換數據而後喚醒當前線程 60 // await release 61 int h = p.hash; 62 long end = timed ? System.nanoTime() + ns : 0L; 63 // 自旋次數 64 int spins = (NCPU > 1) ? SPINS : 1; 65 Object v; 66 // 自旋等待直到p.match不爲null,也就是說等待其它線程將須要交換的數據放置於交換區slot 67 while ((v = p.match) == null) { 68 // 下面的邏輯主要是自旋等待,直到spins遞減到0爲止 69 if (spins > 0) { 70 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; 71 if (h == 0) 72 h = SPINS | (int)t.getId(); 73 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) 74 Thread.yield(); 75 } 76 // slot 和 p本應該是相等,除非其它線程執行了第16行代碼中的CAS操做將slot置爲null, 77 // 還將來得及設置match的值,此時只須要自旋等待第21行代碼被其它線程執行, 78 // 這樣p.match纔會不爲null跳出循環 79 else if (slot != p) 80 spins = SPINS; 81 // 此處表示未設置超時或者時間未超時 82 else if (!t.isInterrupted() && arena == null && 83 (!timed || (ns = end - System.nanoTime()) > 0L)) { 84 // 設置線程t被當前對象阻塞 85 U.putObject(t, BLOCKER, this); 86 // 給p掛機線程的值賦值 87 p.parked = t; 88 if (slot == p) 89 // 若是slot尚未被置爲null,也就表示暫未有線程過來交換數據,須要將當前線程掛起 90 U.park(false, ns); 91 // 線程被喚醒,將被掛起的線程設置爲null 92 p.parked = null; 93 // 設置線程t未被任何對象阻塞 94 U.putObject(t, BLOCKER, null); 95 } 96 // 不是以上條件時(多是arena已不爲null或者超時) 97 else if (U.compareAndSwapObject(this, SLOT, p, null)) { 98 // arena不爲null則v爲null,其它爲超時則v爲超市對象TIMED_OUT,而且跳出循環 99 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; 100 break; 101 } 102 } 103 // 取走match值,並將p中的match置爲null 104 U.putOrderedObject(p, MATCH, null); 105 // 設置item爲null 106 p.item = null; 107 p.hash = h; 108 // 返回交換值 109 return v; 110 }
再來看arenaExchange方法,此方法被執行時表示多個線程進入交換區交換數據,arena數組已被初始化,此方法中的一些處理方式和slotExchange比較相似,它是經過遍歷arena數組找到須要交換的數據
// timed 爲true表示設置了超時時間,ns爲>0的值,反之沒有設置超時時間 private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 獲取當前線程中的存放的node Node p = participant.get(); //index初始值0 for (int i = p.index;;) { // access slot at i // 遍歷,若是在數組中找到數據則直接交換並喚醒線程, // 如未找到則將須要交換給其它線程的數據放置於數組中 int b, m, c; long j; // j is raw array offset // 其實這裏就是向右遍歷數組,只是用到了元素在內存偏移的偏移量 // q實際爲arena數組偏移(i + 1) * 128個地址位上的node Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 若是q不爲null,而且CAS操做成功,將下標j的元素置爲null if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 表示當前線程已發現有交換的數據,而後獲取數據,喚醒等待的線程 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // q 爲null 而且 i 未超過數組邊界 else if (i <= (m = (b = bound) & MMASK) && q == null) { // 將須要給其它線程的item賦予給p中的item p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { // 交換成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 自旋直到有其它線程進入,遍歷到該元素並與其交換,同時當前線程被喚醒 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { // 其它線程設置的須要交換的數據match不爲null // 將match設置null,item設置爲null U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { // 遞減自旋次數 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的相似,arena數組中的數據已被CAS設置 // match值還未設置,讓其再自旋會等待match被設置 spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 設置線程t被當前對象阻塞 U.putObject(t, BLOCKER, this); // emulate LockSupport // 線程t賦值 p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) // 數組中對象還相等,表示線程還未被喚醒,喚醒線程 U.park(false, ns); p.parked = null; // 設置線程t未被任何對象阻塞 U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 這裏給bound增長加一個SEQ if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else // 交換失敗,表示有其它線程更改了arena數組中下標i的元素 p.item = null; // clear offer } else { // 此時表示下標不在bound & MMASK或q不爲null但CAS操做失敗 // 須要更新bound變化後的值 if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; // 反向遍歷 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 記錄CAS失敗的次數 p.collides = c + 1; // 循環遍歷 i = (i == 0) ? m : i - 1; // cyclically traverse } else // 此時表示bound值增長了SEQ+1 i = m + 1; // grow // 設置下標 p.index = i; } } }
總結
讀到這裏是否是仍是感受有不少疑問?
1.先看爲何ASHIFT設置成7,這是爲了儘可能避免slot數組中不一樣的元素在同一個緩存行上,<< ASHIFT 左移7位,表示至少移動了128地址位,而咱們主流的緩存行大小通常爲32字節到256字節,因此128個地址位基本覆蓋到了常見的處理器平臺。arena數組中元素的分佈如圖,它們之間間隔128個整數倍地址位,也就是說最小相差128個地址位。
2.爲何Node類用@sun.misc.Contended註解呢?該註解是jdk8新增的註解,是爲了解決以前手動填充數據的問題。填充數據也是爲了不arena數組中的不一樣的元素共享同一個緩存行,致使多線程修改數據時性能受到影響。
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。