Java多線程之自旋鎖與隊列鎖

編寫高效的併發程序,須要對互斥問題從新研究,設計出適用於多線程的互斥協議。那麼問題來了,若是不能得到鎖,應該怎麼作?java

  • 旋轉:繼續進行嘗試,如自旋鎖,延遲較短;node

  • 阻塞:掛起本身,請求調度器切換到另外一個線程,代價較大。git

綜合來看,先旋轉一小段時間再阻塞,是種不錯的選擇。github

java.util.concurrent.locks.Lock接口提供了lock()unlock()兩個重要的方法,用於解決實際互斥問題。算法

Lock mutex = new MyLock();
mutex.lock();
try {
    do something
} finally {
    mutex.unlock();
}

測試-設置鎖

測試-設置來源於getAndSet()操做,經過一個原子布爾型狀態變量的值判斷當前鎖的狀態。若爲true表示鎖忙,若爲false表示鎖空閒。數組

public class TASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (state.getAndSet(true)) {
            ;
        }
    }

    public void unlock() {
        state.set(false);
    }
}

測試-測試-設置鎖

TTASLock是升級版的TASLock算法,沒有直接調用getAndSet()方法,而是在鎖看起來空閒(state.get()返回false)時才調用。緩存

public class TTASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

這兩個算法都能保證無死鎖的互斥,可是TTASLock的性能會比TASLock高許多。多線程

能夠從計算機系統結構的高速緩存和局部性來解釋這個問題,每一個處理器都有一個cache,cache的訪問速度比內存快好幾個數量級。當cache命中時,會當即加載這個值;當cache缺失時,會在內存或兩一個處理器的cache中尋找這個數據。尋找的過程比較漫長,處理器在總線上廣播這個地址,其餘處理器監聽總線。若其餘處理器在本身的cache中發現這個地址,則廣播該地址及其值來作出響應。若全部處理器都沒發現這個地址,則之內存地址及其所對應的值進行響應。併發

getAndSet()的直接調用讓TASLock性能損失許多:dom

  • getAndSet()的調用實質是總線上的一個廣播,這個調用將會延遲全部的線程,由於全部線程都要經過監聽總線通訊。

  • getAndSet()的調用會更新state的值,即便值仍爲true,可是其餘處理器cache中的鎖副本將會被丟棄,從而致使cache缺失。

  • 當持有鎖的線程試圖釋放鎖時可能被延遲,由於總線被正在自旋的線程獨佔。

與此相反,對於TTASLock算法採用的是本地旋轉(線程反覆地重讀被緩存的值而不是反覆地使用總線),線程A持有鎖時,線程B嘗試得到鎖,但線程B只會在第一次讀鎖是cache缺失,以後每次cache命中不產生總線流量。

那麼缺點來了,TTASLock釋放鎖時,會使各自旋線程處理器中的cache副本當即失效,這些線程會從新讀取這個值,形成總線流量風暴。

指數後退鎖

對於TTASLock算法,當鎖看似空閒(state.get()返回false)時,存在高爭用(多個線程試圖同時獲取一個鎖)的可能。高爭用意味着獲取鎖的可能性小,而且會形成總線流量增長。線程在重試以前回退一段時間是種不錯的選擇。

這裏實現的指數回退算法的回退準則是,不成功嘗試的次數越多,發生爭用的可能性就越高,線程須要後退的時間就應越長。

public class BackoffLock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 10;
    private static final int MAX_DELAY = 100;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            } else {
                backoff.backoff();
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

class Backoff {
    private final int minDelay, maxDelay;
    int limit;
    final Random random;

    public Backoff(int min, int max) {
        minDelay = min;
        maxDelay = max;
        limit = minDelay;
        random = new Random();
    }

    public void backoff() {
        int delay = random.nextInt(limit);
        limit = Math.min(maxDelay, 2 * limit);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            ;
        }
    }
}

指數後退算法解決了TTASLock釋放鎖時的高爭用問題,可是它的性能與minDelaymaxDelay的選取密切相關,而且很難找到一個通用兼容的值。

另外,BackoffLock算法還有兩個問題:

  • cache一致性流量:全部線程都在同一個共享存儲單元上旋轉;

  • 臨界區利用率低:後退時間沒法肯定,線程延遲可能過長。

基於數組的鎖

下面的這些是隊列鎖,名字看上去奇形怪狀的,實際上是發明者名字的首字母。隊列鎖就是將線程組織成一個隊列,讓每一個線程在不一樣的存儲單元上旋轉,從而下降cache一致性流量。

基於循環數組實現隊列鎖ALock,每一個線程檢測本身的slot對應的flag[]域來判斷是否輪到本身。

一個線程想得到鎖,就要調用lock()方法,得到自增tail得到分配的slot號,而後等待這個slot空閒;當釋放鎖時,就要阻塞當前slot,而後讓下一個slot可運行。
flag[i]true時,那麼這個線程就有權得到鎖。任意時刻的flag[]數組中,應該只有一個slot的值爲true

public class ALock {
    ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();
    AtomicInteger tail;
    volatile boolean [] flag;
    int size;

    public ALock() {
        size = 100;
        tail = new AtomicInteger(0);
        flag = new boolean[size];
        flag[0] = true;
    }

    public void lock() {
        int slot = tail.getAndIncrement() % size;
        mySlotIndex.set(slot);
        while (! flag[slot]) {
            ;
        }
    }

    public void unlock() {
        int slot = mySlotIndex.get();
        flag[slot] = false;
        flag[(slot + 1) % size] = true;
    }
}
  • mySlotIndex是線程的局部變量,只能被一個線程訪問,每一個線程都有本身獨立初始化的副本。不須要保存在共享存儲器,不須要同步,不會產生一致性流量。使用get()set()方法來訪問局部變量的值。

  • tail是常規變量,域被全部的線程共享,支持原子操做。

  • 數組flag[]也是被多個線程共享的,可是每一個線程都是在一個數組元素的本地cache副本上旋轉。

ALock對BackoffLock的改進:在多個共享存儲單元上旋轉,將cache無效性降到最低;把一個線程釋放鎖和另外一個線程得到該鎖之間的時間間隔最小化;先來先服務的公平性。可是,數組的大小至少與最大的併發線程數相同,並非空間有效的,當併發線程最大個數爲n時,同步L個不一樣對象就須要O(Ln)大小的空間。

CLH隊列鎖

CLH隊列鎖表示爲QNode對象的鏈表,每一個線程經過一個線程局部變量pred指向其前驅。每一個線程經過檢測前驅結點的locked域來判斷是否輪到本身。若是該域爲true,則前驅線程要麼已經得到鎖要麼正在等待鎖;若是該域爲false,則前驅進程已釋放鎖,輪到本身了。正常狀況下,隊列鏈中只有一個結點的locked域爲false

當一個線程調用lock()方法想得到鎖時,將本身的locked域置爲true,表示該線程不許備釋放鎖,而後並將本身的結點加入到隊列鏈尾部。最後就是在前驅的locked域上旋轉,等待前驅釋放鎖。當這個線程調用unlock()方法要釋放鎖時,線程要將本身的locked域置爲false,表示已經釋放鎖,而後將前驅結點做爲本身的新結點以便往後訪問。

那麼問題來了,爲何要在釋放鎖時作myNode.set(myPred.get())這個處理呢?假設線程A釋放鎖,A的後繼結點爲B,若是不使用這種處理方式,A在釋放鎖後立刻申請鎖將本身的locked域置爲true,整個動做在B檢測到前驅A釋放鎖以前,那麼將發生死鎖。

public class CLHLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myPred;
    ThreadLocal<QNode> myNode;

    public CLHLock() {
        tail = new AtomicReference<QNode>(new QNode());
        myPred = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return null;
            }
        };
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        qnode.locked = true;
        QNode pred = tail.getAndSet(qnode);
        myPred.set(pred);
        while (pred.locked) {
            ;
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        qnode.locked = false;
        myNode.set(myPred.get());
    }

    class QNode {
        boolean locked = false;
    }
}

若是最大線程數爲n,有L個不一樣對象,那麼CLHLock只須要O(L+n)空間。比ALock所需空間少,而且不須要知道可能使用鎖的最大線程數量。可是,在無cache的系統上性能較差,由於一次要訪問兩個結點,若這兩個結點內存位置較遠,性能損失會很大。

MCS隊列鎖

MCS隊列鎖經過檢測本身所在結點的locked的值來判斷是否輪到本身,等待這個域被前驅釋放鎖時改變。

線程若要得到鎖,需把本身結點添加到鏈表的尾部。若隊列鏈表原先爲空,則得到鎖。不然,將前驅結點的next域指向本身,在本身的locked域上自旋等待,直到前驅將該域置爲false。線程若要釋放鎖,判斷是否在隊尾,若是是隻需將tail置爲null,若是不是需將後繼的locked域置爲false且將本身結點的next域置爲默認的null。注意在隊尾的狀況,可能有個線程正在得到鎖,要等一下變爲後一種狀況。

public class MCSLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public MCSLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        QNode pred = tail.getAndSet(qnode);
        if (pred != null) {
            qnode.locked = true;
            pred.next = qnode;
            while (qnode.locked) {
                ;
            }
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (qnode.next == null) {
            if (tail.compareAndSet(qnode, null)) {
                return;
            }
            while (qnode.next == null) {
                ;
            }
        }
        qnode.next.locked = false;
        qnode.next = null;
    }

    class QNode {
        boolean locked = false;
        QNode next = null;
    }
}

結點能被重複使用,該鎖的空間複雜度爲O(L+n)。MCSLock算法適合無cache的系統結構,由於每一個線程只需控制本身自旋的存儲單元。可是,釋放鎖也須要旋轉,另外讀寫比較次數比CLHLock多。

時限隊列鎖

Lock接口中有個一個tryLock()方法,能夠指定一個時限(得到鎖可等待的最大時間),若得到鎖超時則放棄嘗試。最後返回一個布爾值說明鎖是否申請成功。

時限隊列鎖TOLock是基於CLHLock類的,鎖是一個結點的虛擬隊列,每一個結點在它的前驅結點上自旋等待鎖釋放。可是當這個線程超時時,不能簡單的拋棄它的隊列結點,而是將這個結點標記爲已廢棄,這樣它的後繼們(若是有)就會注意到這個結點已放棄,轉而在放棄結點的前驅上自旋。爲了保證隊列鏈表的連續性,每次申請鎖都會new一個QNode

時限隊列鎖結點的域pred會特殊一點,它有3種類型的取值:

  • null:初始狀態,未得到鎖或已釋放鎖;

  • AVAILABLE:一個靜態結點,表示對應結點已釋放鎖,申請鎖成功;

  • QNodepred域爲null的前驅結點,表示對應結點因超時放棄鎖請求,在放棄請求時纔會設置這個值。

申請鎖時,建立一個pred域爲null的新結點並加入到鏈表尾部,若原先鏈表爲空或前驅結點已釋放鎖,則得到鎖。不然,在嘗試時間內,找到pred域爲null的前驅結點,等待它釋放鎖。若在等待前驅結點釋放鎖的過程當中超時,就嘗試從鏈表中刪除這個結點,要分這個結點是否有後繼兩種狀況。
釋放鎖時,檢查該結點是否有後繼,若無後繼可直接把tail設置爲null,不然將該結點的pred域指向AVAILABLE

public class TOLock {
    static QNode AVAILABLE = new QNode();
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public TOLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>();
    }

    public boolean tryLock(long time, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long patience = TimeUnit.MILLISECONDS.convert(time, unit);
        QNode qnode = new QNode();
        myNode.set(qnode);
        qnode.pred = null;
        QNode myPred = tail.getAndSet(qnode);
        if (myPred == null || myPred.pred == AVAILABLE) {
            return true;
        }
        while (System.currentTimeMillis() - startTime < patience) {
            QNode predPred = myPred.pred;
            if (predPred == AVAILABLE) {
                return true;
            } else {
                if (predPred != null) {
                    myPred = predPred;
                }
            }
        }
        if (! tail.compareAndSet(qnode, myPred)) {
            qnode.pred = myPred;
        }
        return false;
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (! tail.compareAndSet(qnode, null)) {
            qnode.pred = AVAILABLE;
        }
    }

    static class QNode {
        public QNode pred = null;
    }
}

優勢:同CLHLock。
缺點:每次申請鎖都要分配一個新結點,在鎖上旋轉的線程可能要回溯一個超時結點鏈。

上面實現的這些鎖算法不支持重入。咱們可使用銀行轉帳的例子來測試一下鎖的效果,任意帳戶間能夠隨意轉帳,鎖生效時全部帳戶的總金額是不變的。完整的算法實現和測試代碼在這裏

相關文章
相關標籤/搜索