【JDK源碼分析】同步工具Exchanger,它的內部實現原理你看懂了嗎?

前言

Exchanger應該算併發包中工具使用相對少的,由於它主要用於線程之間交換數據,它的用法比較簡單在不一樣線程之間使用exchange方法交換數據,可是內部實現比較巧妙,使用了unsafe的CAS原子操做、自旋來解決衝突問題,下面咱們經過源碼一探究竟。node

源碼

先看看源碼註釋中關於核心算法的介紹算法

for (;;) { if (slot is empty) { // slot爲空時,將item 設置到Node 中 
 place item in a Node; if (can CAS slot from empty to node) { // 當將node經過CAS交換到slot中時,掛起線程等待被喚醒
                    wait for release; // 被喚醒後返回node中匹配到的item
                    return matching item in node; } } else if (can CAS slot from node to empty) { // release // 將slot設置爲空 // 獲取node中的item,將須要交換的數據設置到匹配的item
 get the item in node; set matching item in node; // 喚醒等待的線程
 release waiting thread; } // else retry on CAS failure
        }

好比有2條線程A和B,A線程交換數據時,發現slot爲空,則將須要交換的數據放在slot中等待其它線程進來交換數據,等線程B進來,讀取A設置的數據,而後設置線程B須要交換的數據,而後喚醒A線程,原理就是這麼簡單。當時當多個線程之間進行交換數據時就會出現問題,因此Exchanger加入了slot數組。數組

Exchanger 屬性及構造器

// 用於左移Node數組下標,從而得出數據在內存中的偏移量來獲取數據,避免僞共享
    private static final int ASHIFT = 7; // note數組最大下標
    private static final int MMASK = 0xff; // 用於遞增bound,每次加一個SEQ
    private static final int SEQ = MMASK + 1; // CPU核心數
    private static final int NCPU = Runtime.getRuntime().availableProcessors(); // 當前數組最大的下標(多處理器狀況下)
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 自旋次數,CPU核心爲1個時,自旋被禁用
    private static final int SPINS = 1 << 10; // 空對象,用於當線程exchange方法中參數爲null時傳遞給其餘線程的對象
    private static final Object NULL_ITEM = new Object(); // 用於超時時傳遞的對象
    private static final Object TIMED_OUT = new Object(); // Participant 繼承了ThreadLocal,也就是說該對象用於獲取每條線程中存放的值
    private final Participant participant; // 多個線程交換
    private volatile Node[] arena; // 用於2個線程交換
    private volatile Node slot; // 該值主要用於與
    private volatile int bound; // 經過unsafe用於CAS操做
    private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; // bound屬性在Exchanger對象中的偏移地址
            BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); // slot屬性在Exchanger對象中的偏移地址 
            SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); // slot屬性在Node對象中的偏移地址
            MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); // parkBlocker屬性在Thread對象中的偏移地址
            BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); // 獲取Node[]數組中每一個元素的大小,這裏是4
            s = U.arrayIndexScale(ak); // ABASE absorbs padding in front of element 0 // 獲取Node[]數組中第一個元素的偏移地址 + 128
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) // 這裏是爲了保證 Node數組中的元素不會爭用一個緩存行
            throw new Error("Unsupported array scale"); }

構造器及內部類

public Exchanger() { participant = new Participant(); } // 內部類,用於記錄每一個線程的狀態
    static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } // 包含須要交換的數據等信息 // Contended爲 JDK8 新增的註解,用於避免僞共享,提升程序性能
    @sun.misc.Contended static final class Node { // arana數組中的下標
        int index; // 上一次記錄的bound 
        int bound; // cas操做失敗的次數 
        int collides; // 用於自旋的僞隨機數 
        int hash;               // Pseudo-random for spins // 當前線程須要交換的數據
        Object item;            // This thread's current item // 匹配線程交換的數據
        volatile Object match;  // Item provided by releasing thread // 記錄當前掛起的線程
        volatile Thread parked; // Set to this thread when parked, else null
    }

方法exchange

// 交換數據,參數X爲本線程提供給其它線程的數據
    public V exchange(V x) throws InterruptedException { Object v; // 當參數爲null時須要將item設置爲空的對象
        Object item = (x == null) ? NULL_ITEM : x; // translate null args // 注意到這裏的這個表達式是整個方法的核心
        if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }

仔細看if裏的條件表達式,得知: 
只有當arena爲null時,纔會執行slotExchange方法; 
當arena不爲null或者(arena爲null且slotExchange方法返回null)時,此時線程未中斷,纔會執行arenaExchange方法; 
線程中斷時,就會直接拋出線程中斷異常。 
下面咱們來看slotExchange方法緩存

 1     // timed 爲true表示設置了超時時間,ns爲>0的值,反之沒有設置超時時間
 2     private final Object slotExchange(Object item, boolean timed, long ns) {  3         // 獲取當前線程node對象
 4         Node p = participant.get();  5         Thread t = Thread.currentThread();  6         if (t.isInterrupted()) // preserve interrupt status so caller can recheck  7             // 線程中斷返回null
 8             return null;  9         
 10         // 自旋
 11         for (Node q;;) {  12             // 將slot值賦給q
 13             if ((q = slot) != null) {  14                 // slot 不爲null,即表示已有線程已經把須要交換的數據設置在slot中了  15                 // 經過CAS將slot設置成null
 16                 if (U.compareAndSwapObject(this, SLOT, q, null)) {  17                     // CAS操做成功後,將slot中的item賦值給對象v,以便返回。  18                     // 這裏也是就讀取以前線程要交換的數據
 19                     Object v = q.item;  20                     // 將當前線程須要交給的數據設置在q中的match
 21                     q.match = item;  22                     // 獲取被掛起的線程
 23                     Thread w = q.parked;  24                     if (w != null)  25                         // 若是線程不爲null,喚醒它
 26  U.unpark(w);  27                     // 返回其餘線程給的V
 28                     return v;  29  }  30                 // CAS 操做失敗,表示有其它線程競爭,在此線程以前將數據已取走  31                 // create arena on contention, but continue until slot null
 32                 if (NCPU > 1 && bound == 0 &&
 33                     U.compareAndSwapInt(this, BOUND, 0, SEQ))  34                     // CPU爲多核心  35                     // bound == 0 表示arena數組未初始化過,CAS操做bound將其增長SEQ  36                     // 初始化arena數組
 37                     arena = new Node[(FULL + 2) << ASHIFT];  38  }  39             // 上面分析過,只有當arena纔會執行slotExchange方法的  40             // 因此表示恰好已有其它線程加入進來將arena初始化
 41             else if (arena != null)  42                 // 這裏就須要去執行arenaExchange
 43                 return null; // caller must reroute to arenaExchange
 44             else {  45                 // 這裏表示當前線程是以第一個線程進來交換數據  46                 // 或者表示以前的數據交換已進行完畢,這裏能夠看做是第一個線程  47                 // 將須要交換的數據先存放在當前線程變量p中
 48                 p.item = item;  49                 // 將須要交換的數據經過CAS設置到交換區slot
 50                 if (U.compareAndSwapObject(this, SLOT, null, p))  51                     // 交換成功後跳出自旋
 52                     break;  53                 // CAS操做失敗,表示有其它線程恰好先於當前線程將數據設置到交換區slot  54                 // 將當前線程變量中的item設置爲null,而後自旋獲取其它線程存放在交換區slot的數據
 55                 p.item = null;  56  }  57  }  58         // 執行到這裏表示當前線程已將須要的交換的數據放置於交換區slot中了,  59         // 等待其它線程交換數據而後喚醒當前線程  60         // await release
 61         int h = p.hash;  62         long end = timed ? System.nanoTime() + ns : 0L;  63         // 自旋次數
 64         int spins = (NCPU > 1) ? SPINS : 1;  65  Object v;  66         // 自旋等待直到p.match不爲null,也就是說等待其它線程將須要交換的數據放置於交換區slot
 67         while ((v = p.match) == null) {  68             // 下面的邏輯主要是自旋等待,直到spins遞減到0爲止
 69             if (spins > 0) {  70                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;  71                 if (h == 0)  72                     h = SPINS | (int)t.getId();  73                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)  74  Thread.yield();  75  }  76             // slot 和 p本應該是相等,除非其它線程執行了第16行代碼中的CAS操做將slot置爲null,  77             // 還將來得及設置match的值,此時只須要自旋等待第21行代碼被其它線程執行,  78             // 這樣p.match纔會不爲null跳出循環
 79             else if (slot != p)  80                 spins = SPINS;  81             // 此處表示未設置超時或者時間未超時
 82             else if (!t.isInterrupted() && arena == null &&
 83                      (!timed || (ns = end - System.nanoTime()) > 0L)) {  84                 // 設置線程t被當前對象阻塞
 85                 U.putObject(t, BLOCKER, this);  86                 // 給p掛機線程的值賦值
 87                 p.parked = t;  88                 if (slot == p)  89                     // 若是slot尚未被置爲null,也就表示暫未有線程過來交換數據,須要將當前線程掛起
 90                     U.park(false, ns);  91                 // 線程被喚醒,將被掛起的線程設置爲null
 92                 p.parked = null;  93                 // 設置線程t未被任何對象阻塞
 94                 U.putObject(t, BLOCKER, null);  95  }  96             // 不是以上條件時(多是arena已不爲null或者超時)
 97             else if (U.compareAndSwapObject(this, SLOT, p, null)) {  98                 // arena不爲null則v爲null,其它爲超時則v爲超市對象TIMED_OUT,而且跳出循環
 99                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; 100                 break; 101  } 102  } 103         // 取走match值,並將p中的match置爲null
104         U.putOrderedObject(p, MATCH, null); 105         // 設置item爲null
106         p.item = null; 107         p.hash = h; 108         // 返回交換值
109         return v; 110     }

再來看arenaExchange方法,此方法被執行時表示多個線程進入交換區交換數據,arena數組已被初始化,此方法中的一些處理方式和slotExchange比較相似,它是經過遍歷arena數組找到須要交換的數據markdown

// timed 爲true表示設置了超時時間,ns爲>0的值,反之沒有設置超時時間
    private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 獲取當前線程中的存放的node
        Node p = participant.get(); //index初始值0
        for (int i = p.index;;) {                      // access slot at i // 遍歷,若是在數組中找到數據則直接交換並喚醒線程, // 如未找到則將須要交換給其它線程的數據放置於數組中
            int b, m, c; long j;                       // j is raw array offset // 其實這裏就是向右遍歷數組,只是用到了元素在內存偏移的偏移量 // q實際爲arena數組偏移(i + 1) * 128個地址位上的node
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 若是q不爲null,而且CAS操做成功,將下標j的元素置爲null
            if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 表示當前線程已發現有交換的數據,而後獲取數據,喚醒等待的線程
                Object v = q.item;                     // release
                q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // q 爲null 而且 i 未超過數組邊界
            else if (i <= (m = (b = bound) & MMASK) && q == null) { // 將須要給其它線程的item賦予給p中的item
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) { // 交換成功
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 自旋直到有其它線程進入,遍歷到該元素並與其交換,同時當前線程被喚醒
                    for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { // 其它線程設置的須要交換的數據match不爲null // 將match設置null,item設置爲null
                            U.putOrderedObject(p, MATCH, null); p.item = null;             // clear for next use
                            p.hash = h; return v; } else if (spins > 0) { // 遞減自旋次數
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId(); else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait
 } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的相似,arena數組中的數據已被CAS設置 // match值還未設置,讓其再自旋會等待match被設置
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 設置線程t被當前對象阻塞
                            U.putObject(t, BLOCKER, this); // emulate LockSupport // 線程t賦值
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p) // 數組中對象還相等,表示線程還未被喚醒,喚醒線程
                                U.park(false, ns); p.parked = null; // 設置線程t未被任何對象阻塞
                            U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 這裏給bound增長加一個SEQ
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1;        // descend
                            if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break;                     // expired; restart
 } } } else
                    // 交換失敗,表示有其它線程更改了arena數組中下標i的元素
                    p.item = null;                     // clear offer
 } else { // 此時表示下標不在bound & MMASK或q不爲null但CAS操做失敗 // 須要更新bound變化後的值
                if (p.bound != b) {                    // stale; reset
                    p.bound = b; p.collides = 0; // 反向遍歷
                    i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 記錄CAS失敗的次數
                    p.collides = c + 1; // 循環遍歷
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
 } else
                    // 此時表示bound值增長了SEQ+1
                    i = m + 1;                         // grow // 設置下標
                p.index = i; } } }

總結

讀到這裏是否是仍是感受有不少疑問?多線程

  1. 先看爲何ASHIFT設置成7,這是爲了儘可能避免slot數組中不一樣的元素在同一個緩存行上,<< ASHIFT 左移7位,表示至少移動了128地址位,而咱們主流的緩存行大小通常爲32字節到256字節,因此128個地址位基本覆蓋到了常見的處理器平臺。arena數組中元素的分佈如圖,它們之間間隔128個整數倍地址位,也就是說最小相差128個地址位。 
    arena數組
  2. 爲何Node類用@sun.misc.Contended註解呢?該註解是jdk8新增的註解,是爲了解決以前手動填充數據的問題。填充數據也是爲了不arena數組中的不一樣的元素共享同一個緩存行,致使多線程修改數據時性能受到影響。

參考: 
僞共享(False Sharing)併發

相關文章
相關標籤/搜索