併發編程之Exchanger原理與使用

點贊再看,養成習慣,公衆號搜一搜【一角錢技術】關注更多原創技術文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有個人系列文章。java

前言

在JUC包中,除了一些經常使用的或者說常見的併發工具類(ReentrantLock,CountDownLatch,CyclicBarrier,Semaphore)等,還有一個不經常使用的線程同步器類 —— Exchanger。node

Exchanger是適用在兩個線程之間數據交換的併發工具類,它的做用是找到一個同步點,當兩個線程都執行到了同步點(exchange方法)以後(有一個沒有執行到就一直等待,也能夠設置等待超時時間),就將自身線程的數據與對方交換。git

Exchanger 是什麼?

它提供一個同步點,在這個同步點兩個線程能夠交換彼此的數據。這個兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange方法,它會一直等待第二個線程也執行exchange,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。所以使用Exchanger的中斷時成對的線程使用exchange()方法,當有一對線程到達了同步點,就會進行交換數據,所以該工具類的線程對象是成對的。github

線程能夠在成對內配對和交換元素的同步點。每一個線程在輸入exchange方法時提供一些對象,與合做者線程匹配,並在返回時接收其合做夥伴的對象。交換器能夠被視爲一個的雙向形式的SynchroniuzedQueue。交換器在諸如遺傳算法和管道設計的應用中多是有用的。web

一個用於兩個工做線程之間交換數據的封裝工具類,簡單說就是一個線程在完成必定事物後想與另外一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿着數據到來時才能彼此交換對應數據。 **算法

Exchanger 用法

  • Exchanger 泛型類型,其中V表示可交換的數據類型
  • V exchanger(V v):等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送該線程,並接收該線程的對象。
  • V exchanger(V v, long timeout, TimeUnit unit):等待另外一個線程到達此交換點(除非當前線程被中斷或超出類指定的等待時間),而後將給定的對象傳送給該線程,並接收該線程的對象。

應用場景

Exchanger能夠用於遺傳算法,遺傳算法裏須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。數組

Exchanger也能夠用於校對工做。好比咱們須要將紙製銀流經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對這兩個Excel數據進行校對,看看是否錄入的一致緩存

Exchanger的典型應用場景是:一個任務在建立對象,而這些對象的生產代價很高,另外一個任務在消費這些對象。經過這種方式,能夠有更多的對象在被建立的同時被消費。markdown

案例說明

Exchanger 用於兩個線程間交換數據,固然實際參與的線程能夠不止兩個,測試用例以下:多線程

private static void test1() throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(() ->  {

                try {
                    String origMsg = RandomStringUtils.randomNumeric(6);

                    // 先到達的線程會在此等待,直到有一個線程跟它交換數據或者等待超時
                    String exchangeMsg = exchanger.exchange(origMsg,5, TimeUnit.SECONDS);

                    System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }

            },String.valueOf(i)).start();
        }

        countDownLatch.await();
    }
複製代碼

第5個線程由於沒有匹配的線程而等待超時,輸出以下:

0	 origMsg:524053	 exchangeMsg:098544
3	 origMsg:433246	 exchangeMsg:956604
4	 origMsg:098544	 exchangeMsg:524053
1	 origMsg:956604	 exchangeMsg:433246
java.util.concurrent.TimeoutException
	at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
	at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37)
	at java.lang.Thread.run(Thread.java:748)
複製代碼

上述測試用例是比較簡單,能夠模擬消息消費的場景來觀察Exchanger的行爲,測試用例以下:

private static void test2() throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        CountDownLatch countDownLatch = new CountDownLatch(4);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

        // 生產者
        Runnable producer = new Runnable() {
            @Override
            public void run() {
                try{
                    cyclicBarrier.await();

                    for (int i = 0; i < 5; i++) {
                        String msg = RandomStringUtils.randomNumeric(6);
                        exchanger.exchange(msg,5,TimeUnit.SECONDS);
                        System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i);
                    }

                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            }
        };

        // 消費者
        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try{
                    cyclicBarrier.await();
                    for (int i = 0; i < 5; i++) {
                        String msg = exchanger.exchange(null,5,TimeUnit.SECONDS);
                        System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i);
                    }

                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            }
        };

        for (int i = 0; i < 2; i++){
            new Thread(producer).start();
            new Thread(consumer).start();
        }

        countDownLatch.await();
    }
複製代碼

輸出以下,上面生產者和消費者線程數是同樣的,循環次數也是同樣的,可是仍是出現等待超時的情形:

Thread-3	 consumer msg -> null,	0
Thread-1	 consumer msg -> null,	0
Thread-1	 consumer msg -> null,	1
Thread-2	 producer msg -> 640010 ,	 i -> 0
Thread-2	 producer msg -> 733133 ,	 i -> 1
Thread-3	 consumer msg -> null,	1
Thread-3	 consumer msg -> 476520,	2
Thread-1	 consumer msg -> 640010,	2
Thread-1	 consumer msg -> null,	3
Thread-0	 producer msg -> 993414 ,	 i -> 0
Thread-0	 producer msg -> 292745 ,	 i -> 1
Thread-2	 producer msg -> 476520 ,	 i -> 2
Thread-2	 producer msg -> 408446 ,	 i -> 3
Thread-3	 consumer msg -> null,	3
Thread-1	 consumer msg -> 292745,	4
Thread-2	 producer msg -> 251971 ,	 i -> 4
Thread-0	 producer msg -> 078939 ,	 i -> 2
Thread-3	 consumer msg -> 251971,	4
java.util.concurrent.TimeoutException
	at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
	at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70)
	at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 0

複製代碼

這種等待超時是機率出現的,這是爲啥

由於系統調度的不均衡和Exchanger底層的大量自旋等待致使這4個線程並非調用exchanger成功的次數並不一致。另外從輸出能夠看出,消費者線程並無像咱們想的那樣跟生產者線程一一匹配,生產者線程有時也充當來消費者線程,這是爲啥?由於Exchanger匹配時徹底不關注這個線程的角色,兩個線程之間的匹配徹底由調度決定的,即CPU同時執行來或者緊挨着執行來兩個線程,這兩個線程就匹配成功來。

源碼分析

Exchanger 類圖 其內部主要變量和方法以下:

成員屬性

// ThreadLocal變量,每一個線程都有之間的一個副本
private final Participant participant;
// 高併發下使用的,保存待匹配的Node實例
private volatile Node[] arena;
// 低併發下,arena未初始化時使用的保存待匹配的Node實例
private volatile Node slot;
// 初始值爲0,當建立arena後被負責SEQ,用來記錄arena數組的可用最大索引,
// 會隨着併發的增大而增大直到等於最大值FULL,
// 會隨着並行的線程逐一匹配成功而減小恢復成初始值
private volatile int bound;
複製代碼

還有多個表示字段偏移量的靜態屬性,經過static代碼塊初始化,以下:

// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
static {
    int s;
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> ek = Exchanger.class;
        Class<?> nk = Node.class;
        Class<?> ak = Node[].class;
        Class<?> tk = Thread.class;
        BOUND = U.objectFieldOffset
            (ek.getDeclaredField("bound"));
        SLOT = U.objectFieldOffset
            (ek.getDeclaredField("slot"));
        MATCH = U.objectFieldOffset
            (nk.getDeclaredField("match"));
        BLOCKER = U.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        s = U.arrayIndexScale(ak);
        // ABASE absorbs padding in front of element 0
        ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

    } catch (Exception e) {
        throw new Error(e);
    }
    if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
        throw new Error("Unsupported array scale");
}
複製代碼

Exchanger 定義來多個靜態變量,以下:

// 初始化arena時使用, 1 << ASHIFT 是一個緩存行的大小,避免來不一樣的Node落入到同一個高速緩存行
// 這裏實際是把數組容量擴大來8倍,原來索引相鄰的兩個元素,擴容後中間隔來7個元素,從元素的起始地址上看就隔來8個元素,中間的7個都是空的,爲來避免原來相鄰的兩個元素都落入到同一個緩存行中
// 由於arena是對象數組,一個元素佔8字節,8個就是64字節
private static final int ASHIFT = 7;
// arena 數組元素的索引最大值即255
private static final int MMASK = 0xff;
// arena 數組的最大長度即256
private static final int SEQ = MMASK + 1;
// 獲取CPU核數
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 實際的數組長度,由於是線程兩兩配對的,因此最大長度是核數除以2
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
// 自旋等待的次數
private static final int SPINS = 1 << 10;
// 若是交換的對象是null,則返回此對象
private static final Object NULL_ITEM = new Object();
// 若是等待超時致使交換失敗,則返回此對象
private static final Object TIMED_OUT = new Object();
複製代碼

內部類

Exchanger類中有兩個內部類,一個Node,一個Participant。 Participant繼承了ThreadLocal而且重寫了其initialValue方法,返回一個Node對象。其定義以下:

@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}
複製代碼

其中Contended註解是爲類避免高速緩存行致使的僞共享問題

  • index用來記錄arena數組的索引
  • bound用於記錄上一次的Exchanger bound屬性
  • collides用於記錄在bound不變的狀況下CAS搶佔失敗的次數
  • hash是自旋等待時計算隨機數使用的
  • item表示當前線程請求交換的對象
  • match是同其它線程交換的結果,match不爲null表示交換成功
  • parked爲跟該Node關聯的處於休眠狀態的線程。

重要方法

exchange()方法

@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    if ((arena != null || // 是null就執行後面的方法
         (v = slotExchange(item, false, 0L)) == null) &&
        // 若是執行slotExchange有結果就執行後面的,不然返回
        ((Thread.interrupted() || // 非中斷則執行後面的方法
          (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V)v;
}
複製代碼

exchange 方法的執行步驟:

  1. 若是執行 soltExchange 有結果就執行後面的 arenaExchange;
  2. 若是 slot 被佔用,就執行 arenaExchange;
  3. 返回的數據 v 是對方線程的數據項;
  4. 總結即:若是A線程先調用,那麼A的數據項存儲的 item中,則B線程的數據項存儲在 math 中;
  5. 當沒有多線程併發操做 Exchange 的時候,使用 slotExchange 就足夠了,slot 是一個 node 對象;
  6. 當出現併發了,一個 slot 就不夠了,就須要使用一個 node 數組 arena 操做了。

slotExchange()方法

slotExchange 是基於slot屬性來完成交換的,調用soltExchange方法時,若是slot屬性爲null,當前線程會將slot屬性由null修改爲當前線程的Node,若是修改失敗則下一次for循環走solt屬性不爲null的邏輯,若是修改爲功則自旋等待,自旋必定次數後經過Unsafe的park方法噹噹前線程休眠,能夠指定休眠的時間,若是沒有指定則無限期休眠直到被喚醒;不管是由於線程中斷被喚醒,等待超時被喚醒仍是其它線程unpark喚醒的,都會檢查當前線程的Node的屬性釋放爲null,若是不爲null說明交互成功,返回該對象;不然返回null或者TIME_OUT,在返回前會將item,match等屬性置爲null,保存以前自旋時計算的hash值,方便下一次調用slotExchange。

調用slotExchange方法時,若是slot屬性不爲null,則當前線程會嘗試將其修改null,若是cas修改爲功,表示當前線程與slot屬性對應的線程匹配成功,會獲取slot屬性對應Node的item屬性,將當前線程交換的對象保存到slot屬性對應的Node的match屬性,而後喚醒獲取slot屬性對應Node的waiter屬性,即處理休眠狀態的線程,至此交換完成,一樣的在返回前須要將item,match等屬性置爲null,保存以前自旋時計算的hash置,方便下一次調用slotExchange;若是cas修改slot屬性失敗,說明有其它線程也在搶佔slot,則初始化arena屬性,下一次for循環由於arena屬性不爲null,直接返回null,從而經過arenaExchange完成交換。

// arena 爲null是會調用此方法,返回null表示交換失敗
// item是交換的對象,timed表示是否等待指定的時間,爲false表示無限期等待,ns爲等待時間
private final Object slotExchange(Object item, boolean timed, long ns) {
    // 獲取當前線程關聯的participant Node
    Node p = participant.get();
    Thread t = Thread.currentThread();
    // 被中斷,返回null
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    
    for (Node q;;) {
        if ((q = slot) != null) { // slot 不爲null
            // 將slot置爲null,slot對應的線程與當前線程匹配成功
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                Object v = q.item;
                // 保存item,即完成交互
                q.match = item;
                // 喚醒q對應的處於休眠狀態的線程
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // slot修改失敗,其它某個線程搶佔來該slot,多個線程同時調用exchange方法會觸發此邏輯
            // bound等於0表示未初始化,此處校驗避免重複初始化
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null)
            return null; // carena不爲null,經過arenaExchange交互
        else {
            // slot和arena都爲null
            p.item = item;
            // 修改slot爲p,修改爲功則終止循環
            if (U.compareAndSwapObject(this, SLOT, null, p))
                break;
            // 修改失敗則繼續for循環,將otem恢復成null
            p.item = null;
        }
    }

    // 將slot修改成p後會進入此分支
    int h = p.hash; // hash初始爲0
    long end = timed ? System.nanoTime() + ns : 0L;
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // match保存着同其餘線程交換的對象,若是不爲null,說明交換成功了
    while ((v = p.match) == null) {
        // 執行自旋等待
        if (spins > 0) {
        h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
        if (h == 0)
            h = SPINS | (int)t.getId(); 初始化h
        // 只有生成的h小於0時才減小spins
        else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
            Thread.yield();
        }
        // slot被修改了,已經有匹配的線程,從新自旋,讀取屬性,由於是先修改slot再修改屬性的,二者由於CPU調度的問題可能有時間差
        else if (slot != p)
            spins = SPINS;
        // 線程沒有被中斷且arena爲null
        else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
            U.putObject(t, BLOCKER, this);
            p.parked = t;
            if (slot == p)
                U.park(false, ns);
            // 線程被喚醒,繼續下一次for循環
            // 若是是由於等待超時而被喚醒,下次for循環進入下沒的else if分支,返回TIMED_OUT
            p.parked = null;
                U.putObject(t, BLOCKER, null);
        }
        // 將slot修改爲p
        else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            // timed爲flase,無限期等待,由於中斷被喚醒返回null
            // timed爲ture,由於超時被喚醒,返回TIMED_OUT,由於中斷被喚醒返回null
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 修改match爲null,item爲null,保存h,下一次exchange是h就不是初始值爲0了
    U.putOrderedObject(p, MATCH, null);
    // 重置 item
    p.item = null;
    // 保留僞隨機數,供下次種子數字
    p.hash = h;
    // 返回
    return v;
}
複製代碼

總結一下上面執行的邏輯:

  • Exchange 使用了對象池的技術,將對象保存在 ThreadLocal 中,這個對象(Node)封裝了數據項,線程對象等關鍵數據;
  • 第一個線程進入的時候,會將數據放到池化對象中,並賦值給 slot 的 item,並阻塞本身(一般不會當即阻塞,而是使用 yield 自旋一下子),等待對方取值;
  • 當第二個線程進入的時候,會拿出存儲在 slot item 中的值,而後對 slot 的 match 賦值,並喚醒上次阻塞的線程;
  • 當第一個線程阻塞被喚醒後,說明對方取到值了,就獲取 slot 的 match 值,並重置 slot 的數據和池化對象的數據,並返回本身的數據;
  • 若是超時了,就返回 Time_out 對象;
  • 若是線程中斷了,就返回 null。

在該方法中,會返回 2 種結果,一是有效的 item,二是 null 要麼是線程競爭使用 slot 了,建立了 arena 數組,要麼是線程中斷了。

經過一副圖來看看具體邏輯

arenaExchange() 方法

arenaExchange是基於arena屬性完成交換的,總體邏輯比較複雜,有如下幾個要點:

  • m的初始值就是0,index的初始值也是0,兩個都是大於等於0且i不大於m,當某個線程屢次嘗試搶佔index對應數組元素的Node都失敗的情形下則嘗試將m加1,而後搶佔m加1對應的新數組元素,將其由null修改爲當前線程關聯的Node,而後自旋等待匹配;若是自旋結束,沒有匹配的線程,則將m加1對應的新數組元素從新置爲null,將m減1,而後再次for循環搶佔其餘爲null的數組元素。極端併發下m會一直增長直到達到最大值FULL爲止,達到FULL後只能經過for循環不斷嘗試與其餘線程匹配或者搶佔爲null的數組元素,而後隨着併發減小,m會一直減小到0。經過這種動態調整m的方式能夠避免過多的線程基於CAS修改同一個元素致使CAS失敗,提升匹配的效率,這種思想跟LongAdder的實現是一致的。
  • 只有當m等於0的時候纔會經過Unsafe park方法讓線程休眠,若是不等於0,即此時存在多個並行的等待匹配的線程,則主要經過自旋的方式等待其餘線程到來,這是由於交換動做自己是很快的很短暫的,經過自旋等待就可讓多個等待的線程快速的完成匹配;只有當前只剩下一個線程的時候,此時m確定等於0,短時間內沒有匹配的線程,纔會考慮經過park方法阻塞。
// 搶佔slot失敗後進入此方法,arena不爲空 
private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;
    Node p = participant.get();
    // index初始爲0
    for (int i = p.index;;) {                      // access slot at i
        int b, m, c; long j;                       // j is raw array offset
        // 在建立arena時,將原本的數組容量 << ASHIFT,爲了不數組元素落到了同一個高速緩存行
        // 這裏獲取真實的數組元素索引時也須要 << ASHIFR
        Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 若是q不爲null,則將對應的數組元素置爲null,表示當前線程和該元素對應的線程匹配l
        if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            Object v = q.item;                     // release
            q.match = item; // 保存item,交互成功
            Thread w = q.parked;
            if (w != null) // 喚醒等待的線程
                U.unpark(w);
            return v;
        }
        // q爲null 或者q不爲null,cas搶佔q失敗了
        // bound初始化時時SEQ,SEQ & MMASK 就是0,即m的初始值就0,m爲0時,i確定爲0
        else if (i <= (m = (b = bound) & MMASK) && q == null) {
            p.item = item;                         // offer
            if (U.compareAndSwapObject(a, j, null, p)) {
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                Thread t = Thread.currentThread(); // wait
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;
                    if (v != null) {
                        U.putOrderedObject(p, MATCH, null);
                        p.item = null;             // clear for next use
                        p.hash = h;
                        return v;
                    }
                    else if (spins > 0) {
                        h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                        if (h == 0)                // initialize hash
                            h = SPINS | (int)t.getId();
                        else if (h < 0 &&          // approx 50% true
                                 (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield();        // two yields per wait
                    }
                    else if (U.getObjectVolatile(a, j) != p)
                        spins = SPINS;       // releaser hasn't set match yet
                    else if (!t.isInterrupted() && m == 0 &&
                             (!timed ||
                              (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                        p.parked = t;              // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            U.park(false, ns);
                        p.parked = null;
                        U.putObject(t, BLOCKER, null);
                    }
                    else if (U.getObjectVolatile(a, j) == p &&
                             U.compareAndSwapObject(a, j, p, null)) {
                        if (m != 0)                // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                        p.item = null;
                        p.hash = h;
                        i = p.index >>>= 1;        // descend
                        if (Thread.interrupted())
                            return null;
                        if (timed && m == 0 && ns <= 0L)
                            return TIMED_OUT;
                        break;                     // expired; restart
                    }
                }
            }
            else
                p.item = null;                     // clear offer
        }
        else {
            if (p.bound != b) {                    // stale; reset
                p.bound = b;
                p.collides = 0;
                i = (i != m || m == 0) ? m : m - 1;
            }
            else if ((c = p.collides) < m || m == FULL ||
                     !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                p.collides = c + 1;
                i = (i == 0) ? m : i - 1;          // cyclically traverse
            }
            else
                i = m + 1;                         // grow
            p.index = i;
        }
    }
}
複製代碼

總結

Exchange 和 SynchronousQueue 相似,都是經過兩個線程操做同一個對象實現數據交換,只不過就像咱們開始說的,SynchronousQueue 使用的是同一個屬性,經過不一樣的 isData 來區分,多線程併發時,使用了隊列進行排隊。

Exchange 使用了一個對象裏的兩個屬性,item 和 match,就不須要 isData 屬性了,由於在 Exchange 裏面,沒有 isData 這個語義。而多線程併發時,使用數組來控制,每一個線程訪問數組中不一樣的槽。

PS:以上代碼提交在 Githubgithub.com/Niuh-Study/…

PS:這裏有一個技術交流羣(扣扣羣:1158819530),方便你們一塊兒交流,持續學習,共同進步,有須要的能夠加一下。

文章持續更新,能夠公衆號搜一搜「 一角錢技術 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經收錄,歡迎 Star。

相關文章
相關標籤/搜索