Exchanger

本文參考羣主的博客http://cmsblogs.com/?p=2269java

Java 併發 API 提供了一種容許2個併發任務間相互交換數據的同步應用。更具體的說,Exchanger 類容許在2個線程間定義同步點,當2個線程到達這個點,他們相互交換數據類型,使用第一個線程的數據類型變成第二個的,而後第二個線程的數據類型變成第一個的。node

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class Producer implements Runnable
{  
  
    // 要被相互交換的數據類型。  
    private List<String> buffer;  
  
    // 用來同步 producer和consumer  
    private final Exchanger<List<String>> exchanger;  
  
    public Producer(List<String> buffer, Exchanger<List<String>> exchanger)  
    {  
        this.buffer = buffer;  
        this.exchanger = exchanger;  
    }  
  
    public void run()  
    {  
        // 實現10次交換  
        for (int i = 0; i < 10; i++)
        {  
            buffer.add("第" + i + "次生產者的數據" + i);  
            try  
            {  
                // 調用exchange方法來與consumer交換數據  
                System.out.println("第" + i + "次生產者在等待.....");  
                buffer = exchanger.exchange(buffer);  
                System.out.println("第" + i + "次生產者交換後的數據:" + buffer.get(i));  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
    }  
}  
  
  
class Consumer implements Runnable
{  
    // 用來相互交換  
    private List<String> buffer;  
  
    // 用來同步 producer和consumer  
    private final Exchanger<List<String>> exchanger;  
  
    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger)  
    {  
        this.buffer = buffer;  
        this.exchanger = exchanger;  
    }  
  
    public void run()  
    {  
        // 實現10次交換  
        for (int i = 0; i < 10; i++)  
        {  
            buffer.add("第" + i + "次消費者的數據" + i);  
            try  
            {  
                // 調用exchange方法來與consumer交換數據  
                System.out.println("第" + i + "次消費者在等待.....");  
                buffer = exchanger.exchange(buffer);  
                System.out.println("第" + i + "次消費者交換後的數據:" + buffer.get(i));  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
    }  
}  
  
//主類
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class Core
{
    public static void main(String[] args)
    {
        // 建立2個buffers,分別給producer和consumer使用
        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();

        // 建立Exchanger對象,用來同步producer和consumer
        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        // 建立Producer對象和Consumer對象
        Producer producer = new Producer(buffer1, exchanger);
        Consumer consumer = new Consumer(buffer2, exchanger);

        // 建立線程來執行producer和consumer並開始線程
        Thread threadProducer = new Thread(producer);
        Thread threadConsumer = new Thread(consumer);
        threadProducer.start();
        threadConsumer.start();
    }
}

在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:算法

  1. 若是它的夥伴節點在該線程到達以前已經調用了exchanger方法,則它會喚醒它的夥伴而後進行數據交換,獲得各自數據返回。
  2. 若是它的夥伴節點尚未到達交換點,則該線程將會被掛起,等待它的夥伴節點到達被喚醒,完成數據交換。
  3. 若是當前線程被中斷了則拋出異常,或者等待超時了,則拋出超時異常。

Exchanger算法的核心是經過一個可交換數據的slot,以及一個能夠帶有數據item的參與者。數組

for (;;) {
        if (slot is empty) {                       // offer
          place item in a Node;
          if (can CAS slot from empty to node) {
            wait for release;
            return matching item in node;
          }
        }
        else if (can CAS slot from node to empty) { // release
          get the item in node;
          set matching item in node;
          release waiting thread;
        }
        // else retry on CAS failure
      }

Exchanger中定義了以下幾個重要的成員變量:併發

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;

participant的做用是爲每一個線程保留惟一的一個Node節點。app

slot爲單個槽,arena爲數組槽。他們都是Node類型。在這裏可能會感受到疑惑,slot做爲Exchanger交換數據的場景,應該只須要一個就能夠了啊?爲什麼還多了一個Participant 和數組類型的arena呢?一個slot交換場所原則上來講應該是能夠的,但實際狀況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼咱們就安排多個,也就是數組arena。經過數組arena來安排不一樣的線程使用不一樣的slot來下降競爭問題,而且能夠保證最終必定會成對交換數據。可是Exchanger不是一來就會生成arena數組來下降競爭,只有當產生競爭是纔會生成arena數組。那麼怎麼將Node與當前線程綁定呢?Participant ,Participant 的做用就是爲每一個線程保留惟一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。ide

Node定義以下:this

 @sun.misc.Contended static final class Node {
        int index;              // arena的下標;
        int bound;              // 上一次記錄的Exchanger.bound
        int collides;           // 在當前bound下CAS失敗的次數
        int hash;               // 僞隨機數,用於自旋;
        Object item;            // 這個線程的當前項,也就是須要交換的數據;
        volatile Object match;  // 作releasing操做的線程傳遞的項;
        volatile Thread parked; //掛起時設置線程值,其餘狀況下爲null;
    }

exchange(V x)

exchange(V x):等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。spa

 public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

這個方法比較好理解:arena爲數組槽,若是爲null,則執行slotExchange()方法,不然判斷線程是否中斷,若是中斷值拋出InterruptedException異常,沒有中斷則執行arenaExchange()方法。整套邏輯就是:若是slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後返回結果V。線程

NULL_ITEM 爲一個空節點,其實就是一個Object對象而已,slotExchange()爲單個slot交換。

slotExchange(Object item, boolean timed, long ns)

private final Object slotExchange(Object item, boolean timed, long ns) {
        // 獲取當前線程的節點 p
        Node p = participant.get();
        // 當前線程
        Thread t = Thread.currentThread();
        // 線程中斷,直接返回
        if (t.isInterrupted())
            return null;
        // 自旋
        for (Node q;;) {
            //slot != null
            if ((q = slot) != null) {
                //嘗試CAS替換
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;      // 當前線程的項,也就是交換的數據
                    q.match = item;         // 作releasing操做的線程傳遞的項
                    Thread w = q.parked;    // 掛起時設置線程值
                    // 掛起線程不爲null,線程掛起
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                //若是失敗了,則建立arena
                //bound 則是上次Exchanger.bound
                if (NCPU > 1 && bound == 0 &&
                        U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            //若是arena != null,直接返回,進入arenaExchange邏輯處理
            else if (arena != null)
                return null;
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        /*
         * 等待 release
         * 進入spin+block模式
         */
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                    (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }

程序首先經過participant獲取當前線程節點Node。檢測是否中斷,若是中斷return null,等待後續拋出InterruptedException異常。

若是slot不爲null,則進行slot消除,成功直接返回數據V,不然失敗,則建立arena消除數組。

若是slot爲null,但arena不爲null,則返回null,進入arenaExchange邏輯。

若是slot爲null,且arena也爲null,則嘗試佔領該slot,失敗重試,成功則跳出循環進入spin+block(自旋+阻塞)模式。

在自旋+阻塞模式中,首先取得結束時間和自旋次數。若是match(作releasing操做的線程傳遞的項)爲null,其首先嚐試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數爲0後,假如slot發生了改變(slot != p)則重置自旋數並重試。不然假如:當前未中斷&arena爲null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不爲null或者當前爲限時版本+時間已經結束:不限時版本:置v爲null;限時版本:若是時間結束以及未中斷則TIMED_OUT;不然給出null(緣由是探測到arena非空或者當前線程中斷)。

match不爲空時跳出循環。

arenaExchange(Object item, boolean timed, long ns)

 private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

首先經過participant取得當前節點Node,而後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena能夠確保不一樣的slot在arena中是不會相沖突的,那麼是怎麼保證的呢?

arena = new Node[(FULL + 2) << ASHIFT];

取得arena中的node節點後,若是定位的節點q 不爲空,且CAS操做成功,則交換數據,返回交換的數據,喚醒等待的線程。

若是q等於null且下標在bound & MMASK範圍以內,則嘗試佔領該位置,若是成功,則採用自旋 + 阻塞的方式進行等待交換數據。

若是下標不在bound & MMASK範圍以內獲取因爲q不爲null可是競爭失敗的時候:消除p。加入bound 不等於當前節點的bond(b != p.bound),則更新p.bound = b,collides = 0 ,i = m或者m – 1。若是衝突的次數不到m 獲取m 已經爲最大值或者修改當前bound的值失敗,則經過增長一次collides以及循環遞減下標i的值;不然更新當前bound的值成功:咱們令i爲m+1即爲此時最大的下標。最後更新當前index的值。

Exchanger使用、原理都比較好理解,可是這個源碼看起來真心有點兒複雜,是真心難看懂,可是這種交換的思路Doug Lea在後續博文中還會提到,例如SynchronousQueue、LinkedTransferQueue。

    其實就是」我」和」你」(可能有多個」我」,多個」你」)在一個叫Slot的地方作交易(一手交錢,一手交貨),過程分如下步驟:

    1. 我先到一個叫作Slot的交易場所交易,發現你已經到了,那我就嘗試喊你交易,若是你迴應了我,決定和我交易那麼進入第2步;若是別人搶先一步把你喊走了,那我就進入第5步。
    2. 我拿出錢交給你,你可能會接收個人錢,而後把貨給我,交易結束;也可能嫌我掏錢太慢(超時)或者接個電話(中斷),TM的不賣了,走了,那我只能再找別人買貨了(從頭開始)。
    3. 我到交易地點的時候,你不在,那我先嚐試把這個交易點給佔了(一屁股作凳子上…),若是我成功搶佔了單間(交易點),那就坐這兒等着你拿貨來交易,進入第4步;若是被別人搶座了,那我只能在找別的地方兒了,進入第5步。
    4. 你拿着貨來了,喊我交易,而後完成交易;也可能我等了好長時間你都沒來,我不等了,繼續找別人交易去,走的時候我看了一眼,一共沒多少人,弄了這麼多單間(交易地點Slot),太TM浪費了,我喊來交易地點管理員:一共也沒幾我的,搞這麼多單間兒幹毛,給哥撤一個!。而後再找別人買貨(從頭開始);或者我老大給我打了個電話,不讓我買貨了(中斷)。
    5. 我跑去喊管理員,尼瑪,就一個坑交易個毛啊,而後管理在一個更加開闊的地方開闢了好多個單間,而後我就挨個來看每一個單間是否有人。若是有人我就問他是否能夠交易,若是迴應了我,那我就進入第2步。若是我沒有人,那我就佔着這個單間等其餘人來交易,進入第4步。
    6. 若是我嘗試了幾回都沒有成功,我就會認爲,是否是我TM選的這個單間風水很差?不行,得換個地兒繼續(從頭開始);若是我嘗試了屢次發現尚未成功,怒了,把管理員喊來:給哥再開一個單間(Slot),加一個凳子,這麼多人就這麼幾個破凳子夠誰用!
相關文章
相關標籤/搜索