java.util.Concurrent.Exchanger 源碼

類圖

 

    Exchanger 是一個用於線程間協做的工具類,Exchanger用於進行線程間的數據交換,它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange 方法交換數據,若是第一個線程先執行exchange 方法,它會一直等待第二個線程也執行exchange 方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據。html

 

源碼java

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;


public class Exchanger<V> {

    private static final int ASHIFT = 7;

    private static final int MMASK = 0xff;

    private static final int SEQ = MMASK + 1;

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    private static final int SPINS = 1 << 10;// 自旋次數

    private static final Object NULL_ITEM = new Object();//若是交換的數據爲null,則用NULL_ITEM代替

    private static final Object TIMED_OUT = new Object();

    private final Participant participant;//每一個線程的數據,ThreadLocal 子類

    private volatile Node[] arena;

    private volatile Node slot;// 用於交換數據的槽位

    private volatile int bound;


    @sun.misc.Contended static final class Node {
       int index;              //arena的下標,多個槽位的時候利用
       int bound;              // 上一次記錄的Exchanger.bound;
       int collides;           // 在當前bound下CAS失敗的次數;
       int hash;               // 用於自旋;
       Object item;            // 這個線程的當前項,也就是須要交換的數據;
       volatile Object match;  // 交換的數據
       volatile Thread parked; // 線程
    }


    static final class Participant extends ThreadLocal<Node> {
        // 初始值返回Node
        public Node initialValue() {
           return new Node();
        }
    }

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // 槽位數組
        Node[] a = arena;
        //表明當前線程的Node
        Node p = participant.get(); // p.index 初始值爲 0
        for (int i = p.index; ; ) {                      // access slot at i
            int b, m, c;
            long j;                       // j is raw array offset
            //在槽位數組中根據"索引" i 取出數據 j至關因而 "第一個"槽位
            Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 該位置上有數據(即有線程在這裏等待交換數據)
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                // 進行數據交換,這裏和單槽位的交換是同樣的
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // bound 是最大的有效的 位置,和MMASK相與,獲得真正的存儲數據的索引最大值
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // i 在這個範圍內,該槽位也爲空

                //將須要交換的數據 設置給p
                p.item = item;                         // offer
                //設置該槽位數據(在該槽位等待其它線程來交換數據)
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    // 進行必定時間的自旋
                    for (int h = p.hash, spins = SPINS; ; ) {
                        Object v = p.match;
                        //在自旋的過程當中,有線程來和該線程交換數據
                        if (v != null) {
                            //交換數據後,清空部分設置,返回交換獲得的數據,over
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        } else if (spins > 0) {
                            h ^= h << 1;
                            h ^= h >>> 3;
                            h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int) t.getId();
                            else if (h < 0 &&          // approx 50% true
                                    (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // 交換數據的線程到來,可是尚未設置好match,再稍等一會
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;
                            //符合條件,特別注意m==0 這個說明已經到達area 中最小的存儲數據槽位了
                            //沒有其餘線程在槽位等待了,全部當前線程須要阻塞在這裏     
                        else if (!t.isInterrupted() && m == 0 &&
                                (!timed ||
                                        (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            // 再次檢查槽位,看看在阻塞前,有沒有線程來交換數據
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns); // 掛起
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // 當前這個槽位一直沒有線程來交換數據,準備換個槽位試試
                        else if (U.getObjectVolatile(a, j) == p &&
                                U.compareAndSwapObject(a, j, p, null)) {
                            //更新bound
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            // 減少索引值 往"第一個"槽位的方向挪動
                            i = p.index >>>= 1;        // descend
                            // 發送中斷,返回null
                            if (Thread.interrupted())
                                return null;
                            // 超時
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart 繼續主循環
                        }
                    }
                } else
                    //佔據槽位失敗,先清空item,防止成功交換數據後,p.item還引用着item
                    p.item = null;                     // clear offer
            } else { // i 不在有效範圍,或者被其它線程搶先了
                //更新p.bound
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    //新bound ,重置collides
                    p.collides = 0;
                    //i若是達到了最大,那麼就遞減
                    i = (i != m || m == 0) ? m : m - 1;
                } else if ((c = p.collides) < m || m == FULL ||
                        !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1; // 更新衝突
                    // i=0 那麼就從m開始,不然遞減i
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                } else
                    //遞增,日後挪動
                    i = m + 1;                         // grow
                // 更新index
                p.index = i;
            }
        }
    }


    private final Object slotExchange(Object item, boolean timed, long ns) {
       // 獲得一個初試的Node
       Node p = participant.get();
       // 當前線程
       Thread t = Thread.currentThread();
       // 若是發生中斷,返回null,會重設中斷標誌位,並無直接拋異常
       if (t.isInterrupted()) // preserve interrupt status so caller can recheck
          return null;

       for (Node q;;) {
          // 槽位 solt不爲null,則說明已經有線程在這裏等待交換數據了
          if ((q = slot) != null) {
             // 重置槽位
             if (U.compareAndSwapObject(this, SLOT, q, null)) {
                //獲取交換的數據
                Object v = q.item;
                //等待線程須要的數據
                q.match = item;
                //等待線程
                Thread w = q.parked;
                //喚醒等待的線程
                if (w != null)
                    U.unpark(w);
                return v; // 返回拿到的數據,交換完成
             }
             // create arena on contention, but continue until slot null
             //存在競爭,其它線程搶先了一步該線程,所以須要採用多槽位模式,這個後面再分析
             if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
          }else if (arena != null) //多槽位不爲空,須要執行多槽位交換
             return null; // caller must reroute to arenaExchange
          else { //尚未其餘線程來佔據槽位
             p.item = item;
             // 設置槽位爲p(也就是槽位被當前線程佔據)
             if (U.compareAndSwapObject(this, SLOT, null, p))
                break; // 退出無限循環
             p.item = null; // 若是設置槽位失敗,則有可能其餘線程搶先了,重置item,從新循環
          }
       }

       //當前線程佔據槽位,等待其它線程來交換數據
       int h = p.hash;
       long end = timed ? System.nanoTime() + ns : 0L;
       int spins = (NCPU > 1) ? SPINS : 1;
       Object v;
       // 直到成功交換到數據
       while ((v = p.match) == null) {
          if (spins > 0) { // 自旋
             h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
             if (h == 0)
                h = SPINS | (int)t.getId();
             else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // 主動讓出cpu,這樣能夠提供cpu利用率(反正當前線程也自旋等待,還不如讓其它任務佔用cpu)
                Thread.yield(); 
          }
          else if (slot != p) //其它線程來交換數據了,修改了solt,可是尚未設置match,再稍等一會
             spins = SPINS;
          //須要阻塞等待其它線程來交換數據
          //沒發生中斷,而且是單槽交換,沒有設置超時或者超時時間未到 則繼續執行
          else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
             // cas 設置BLOCKER,能夠參考Thread 中的parkBlocker
             U.putObject(t, BLOCKER, this);
             // 須要掛起當前線程
             p.parked = t;
             if (slot == p)
                U.park(false, ns); // 阻塞當前線程
             // 被喚醒後    
             p.parked = null;
             // 清空 BLOCKER
             U.putObject(t, BLOCKER, null);
          }
          // 不知足前面 else if 條件,交換失敗,須要重置solt
          else if (U.compareAndSwapObject(this, SLOT, p, null)) {
             v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
             break;
          }
       }
       //清空match
       U.putOrderedObject(p, MATCH, null);
       p.item = null;
       p.hash = h;
       // 返回交換獲得的數據(失敗則爲null)
       return v;
    }


    public Exchanger() {
        participant = new Participant();
    }

    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }


    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            s = U.arrayIndexScale(ak);
            
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
    }
}

類 Exchanger<V>

    類型參數:api

    V - 能夠交換的對象類型數組

    每一個線程將條目上的某個方法呈現給exchange()方法,與夥伴線程進行匹配,而且在返回時接收其夥伴的對象。app

    用法示例:使用 Exchanger 在線程間交換緩衝區。在須要時,填充緩衝區的線程獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給騰空緩衝區的線程。 ide

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();

   DataBuffer initialEmptyBuffer = ...
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
}

 

構造方法摘要

Exchanger() 
          建立一個新的 Exchanger。

 

方法摘要

 V exchange(V x) 
          等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。
 V exchange(V x, long timeout, TimeUnit unit) 
          等待另外一個線程到達此交換點(除非當前線程被中斷,或者超出了指定的等待時間),而後將給定的對象傳送給該線程,同時接收該線程的對象。

  

Exchanger

public Exchanger()

建立一個新的 Exchanger。工具

 

exchange

public V exchange(V x) throws InterruptedException

    等待另外一個線程到達此交換點(除非當前線程被 中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。oop

    若是另外一個線程已經在交換點等待,則出於線程調度目的,繼續執行此線程,並接收當前線程傳入的對象。當前線程當即返回,接收其餘線程傳遞的交換對象。this

    若是尚未其餘線程在交換點等待,則出於調度目的,禁用當前線程,且在發生如下兩種狀況之一前,該線程將一直處於休眠狀態:atom

  • 其餘某個線程進入交換點;或者
  • 其餘某個線程中斷當前線程。

    若是當前線程:

  • 在進入此方法時已經設置了該線程的中斷狀態;或者
  • 在等待交換時被中斷

    則拋出 InterruptedException,而且清除當前線程的已中斷狀態。

    參數:

    x - 要交換的對象

    返回:

        另外一個線程提供的對象

    拋出:

    InterruptedException - 若是當前線程在等待時被中斷

 

exchange

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    等待另外一個線程到達此交換點(除非當前線程被 中斷,或者超出了指定的等待時間),而後將給定的對象傳送給該線程,同時接收該線程的對象。

    若是另外一個線程已經在交換點上等待,則出於線程調度目的,繼續執行此線程,並接收當前線程傳入的對象。當前線程當即返回,並接收其餘線程傳遞的交換對象。

    若是尚未其餘線程在交換點等待,則出於調度目的,禁用當前線程,且在發生如下三種狀況之一前,該線程將一直處於休眠狀態:

  • 其餘某個線程進入交換點;或者
  • 其餘某個線程中斷當前線程;或者
  • 已超出指定的等待時間。

    若是當前線程:

  • 在進入此方法時已經設置其中斷狀態;或者
  • 在等待交換時被中斷

    則拋出 InterruptedException,而且清除當前線程的已中斷狀態。

    若是超出指定的等待時間,則拋出 TimeoutException 異常。若是該時間小於等於零,則此方法根本不會等待。

    參數:

    x - 要交換的對象

    timeout - 要等待的最長時間

    unit - timeout 參數的時間單位

    返回:

        其餘線程提供的對象

    拋出:

    InterruptedException - 若是當前線程在等待時被中斷

    TimeoutException - 若是在另外一個線程進入交換點以前已經到達指定的等待時間

使用實例:

    線程交換各自擁有的值:

package com.thread;

import java.util.concurrent.Exchanger;

public class ExchangerDemo extends Thread {
    private Exchanger<String> exchanger;
    private String name;

    public ExchangerDemo(String name, Exchanger<String> exchanger) {
        this.exchanger = exchanger;
        this.name = name;
    }

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(this.name));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExchangerDemo exchangerDemo1 = new ExchangerDemo("demo1", exchanger);
        exchangerDemo1.setName("exchanger1");
        ExchangerDemo exchangerDemo2 = new ExchangerDemo("demo2", exchanger);
        exchangerDemo2.setName("exchanger2");
        exchangerDemo1.start();
        exchangerDemo2.start();
    }
}

    運行結果:

exchanger1: demo2
exchanger2: demo1

    從輸出的值能夠看到,兩個線程的值已經發生了交換。 

相關文章
相關標籤/搜索