按部就班理解AQS(1):如何實現鎖

概述

AbstractQueuedSynchronizer(如下簡稱AQS)是Java中用於構建鎖和同步器的框架,許多同步器均可以經過AQS很容易而且高效地構造出來。不少文章已經基於論文和源碼對實現進行了解讀,本文試着從另外的角度入手:先不考慮AQS的實現,假設讓咱們本身實現鎖,咱們能夠怎麼作?最後再來看AQS的實現,才能更好地理解爲何要這麼實現。html

鎖的實現思路

咱們能夠形象地把鎖理解成門票,只有當線程拿到了門票,才能進入臨界區。所以咱們能夠用一個狀態變量state表示鎖,state = true表示能夠獲取到鎖,反之就是表示鎖已經被佔用。那麼當鎖被佔用時,應該怎麼處理?這裏有兩種思路:node

  1. 循環檢測直到鎖可用(也叫自旋)
  2. 讓出處理器,等待通知

TAS鎖

基於第一種思路實現的鎖叫作自旋鎖(SpinLock)。下面咱們先看自選鎖中最簡單的實現,這個實現叫作Test-And-Set-LOCK,簡稱TAS Locklinux

public class TASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (!state.getAndSet(true)) {} // 循環檢測直到狀態可用
    }

    public void unlock() {
        state.set(false);
    }
}
複製代碼

從實現上咱們能夠看出,獲取鎖的線程一直處於活躍狀態,可是並無執行任何有效的任務,所以使用自旋鎖會形成busy-waitinggit

在對TAS鎖提出優化思路前,先介紹一下緩存一致性。下面這張圖描述的是每一個處理器都有本身的緩存,但共享一個內存,緩存的內容來自內存。一旦處理器更新了本身的緩存,若是這個更新須要被其餘處理器感知,就須要經過總線來通知。所以頻繁更新會佔用大量總線流量。github

目前咱們是用一個狀態變量來標識鎖的狀態。TAS鎖每次循環都會調用getAndSet(),這是一個更新指令,會致使其餘線程的緩存都失效,從而都會去內存中獲取值,所以佔用總線流量資源。數組

TTAS鎖

TAS鎖的問題在於每次循環都修改狀態,實際上只有狀態是可用的狀況下,纔有必要去修改。TTASTest-Test-And-Set)改進就是在加鎖前先檢查狀態變量是否爲false,只有條件知足纔去修改。緩存

public class TTASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (true) {
            while (state.get()) {} // 循環讀取state狀態
            if (!state.getAndSet(true)) { // 只有當state爲false,纔會修改
                return;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}
複製代碼

可是當釋放鎖時,其餘線程檢測到state都是false,這時都會調用state.getAndSet(true),又退化到TAS的情形。bash

指數退避

TTAS的問題關鍵在於全部線程都同時去獲取鎖,所以引入延遲能夠解決問題:當獲取鎖失敗時,在重試前先睡眠一段時間,再次失敗則延遲時間翻倍——指數退避。架構

public class BackoffLock {
    AtomicBoolean state = new AtomicBoolean(false);

    private int minDelay;
    private int maxDelay;

    public BackoffLock(int minDelay, int maxDelay) {
        this.minDelay = minDelay;
        this.maxDelay = maxDelay;
    }

    public void lock() throws InterruptedException {
        int delay = minDelay;
        while (true) {
            while (state.get()) {}
            if (!state.getAndSet(true)) {
                return;
            }
            Thread.sleep((int) (Math.random() * delay));
            if (delay < maxDelay) {
                delay = 2 * delay;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}
複製代碼

指數退避自旋的不足在於須要設置好延遲參數,極可能就在線程睡眠過程當中,獲取鎖的線程恰好就釋放了鎖。框架

基於數組的隊列鎖

一開始由於咱們都是基於一個狀態變量來標識鎖,纔會致使頻繁佔用總線流量,那麼若是每一個線程都有一個狀態,就能夠大幅減小佔用。

基於數組的隊列鎖lock()時從數組中按順序找到一個可用的位置,用來表明當前線程。unlock()時通知下一個線程。

public class ArrayLock {
    private int n;
    private volatile boolean[] flags;

    private AtomicInteger next = new AtomicInteger(0);
    private ThreadLocal<Integer> slot = new ThreadLocal<>();

    public ArrayLock(int n) {
        this.n = n;
        flags = new boolean[n];
        flags[0] = true;
    }

    public void lock() {
        int index = next.getAndIncrement();
        slot.set(index);
        while (!flags[index % n]) {}
    }

    public void unlock() {
        int index = slot.get();
        flags[index % n] = false; // 爲複用作好準備
        flags[(index + 1) % n] = true; // 通知下一個線程
    }
}
複製代碼

顯然,基於數組的隊列鎖的不足之處就是鎖的數量受限於數組長度。所以,可用考慮經過鏈表來改進。

CLH鎖

CLH鎖內部就維護了一個隱式的鏈表。CLH是Craig, Landin, and Hagersten的縮寫。

public class CLHSpinLock {
    private final ThreadLocal<QNode> node;
    private final ThreadLocal<QNode> prev;
    AtomicReference<QNode> tail = new AtomicReference<>(new QNode());

    public CLHSpinLock() {
        node = new ThreadLocal<QNode>() {
            @Override
            protected QNode initialValue() {
                return new QNode();
            }
        };

        prev = new ThreadLocal<QNode>() {
            @Override
            protected QNode initialValue() {
                return null;
            }
        };
    }

    public void lock() {
        QNode myNode = node.get();
        myNode.locked = true;
        QNode pred = tail.getAndSet(myNode);
        prev.set(pred);

        // 在前繼節點自旋
        while (pred.locked) {};
    }

    public void unlock() {
        QNode myNode = node.get();
        myNode.locked = false;
        node.set(prev.get());
    }

    class QNode {
        volatile boolean locked = false;
    }
}
複製代碼

因爲CLH是在前繼節點上自旋,在NUMA架構下,可能須要頻繁訪問遠端內存,影響性能。那麼能不能直接在本地節點自旋呢?

NUMA架構

MCS鎖

MCS鎖就是在本地節點自旋,把CLH的屢次對遠端內存的監聽 + 一次對本地內存的更新,簡化成了屢次對本地內存的監聽 + 一次對遠端內存的更新。

public class MCSSpinLock {
    ThreadLocal<QNode> node = new ThreadLocal<QNode>() {
        @Override
        protected QNode initialValue() {
            return new QNode();
        }
    };

    AtomicReference<QNode> tail = new AtomicReference<>(null);

    public void lock() {
        QNode qNode = node.get();
        QNode pred = tail.getAndSet(qNode);
        if (pred != null) {
            qNode.locked = true;
            pred.next = qNode; // QNode.next是volatile,保證了線程可見性
            while (qNode.locked) {};
        }
    }

    public void unlock() {
        QNode qNode = node.get();
        if (qNode.next == null) { // 當前節點沒有發現後繼節點
            if (tail.compareAndSet(qNode, null)) { // 確實沒有後繼節點
                return;
            }
            while (qNode.next == null) {}; // 有後繼節點,可是尚未關聯上,須要等待
        }
        qNode.next.locked = false;
    }

    class QNode {
        volatile boolean locked = false;
        volatile QNode next = null;
    }
}
複製代碼

參考文獻

  1. Spin Locks and Contention
  2. Practice: Spin Locks and Contention
  3. 自旋鎖學習系列(4):基於數組的隊列鎖
  4. building fifo and priority-queueing spin locks from atomic swap
  5. Ticket Lock, CLH Lock, MCS Lock
  6. NUMA架構的CPU -- 你真的用好了麼?
相關文章
相關標籤/搜索