ReentrantLock實現了公平鎖與非公平鎖,公平鎖提供順序獲取鎖的方式,而非公平鎖提供搶佔式獲取鎖的方式。
公平鎖: 線程A佔用鎖,B等待,而後依次獲取鎖,其中B會被掛起或者是自旋,而後當線程A釋放鎖後,線程B再被喚醒,以此類推,按照申請鎖的前後順序來。
非公平鎖: 線程A佔用鎖,B等待,於此同時C請求鎖,因爲B線程被喚醒須要時間,因此C有可能在B被喚醒錢就釋放鎖,以此類推,按照鎖空閒時申請鎖的線程爲優先。java
世界應該是公平公正的不是嗎?好了,別白日作夢了,因爲線程的喚醒是一個比較耗時的操做(切換線程上下文,調用OS等)若是線程持有鎖的時間很長,那麼公平鎖就比較有優點node
NonfairSync中lock的實現以下:ui
final void lock() { // CAS操做設置AbstractQueuedSynchronizer中的volatile int state // 若是設置成功將現有的線程setExclusiveOwnerThread if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // CAS失敗了就調用acquire()方法 acquire(1); }
acquire方法由AbstractQueuedSynchronizer提供this
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire在AbstractQueuedSynchronizer中是一個protected方法而且沒有給出實現,可見是但願由它的子類去擴展線程
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
回去再看NonfairSync中tryAcquire的實現code
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取同步state int c = getState(); // 這個判斷頗有意思,因爲調用這個方式是第一次嘗試CAS失敗纔會進入該方法 // 這裏從新再判斷一次同步state,能夠避免以前的線程已經釋放lock,而繼續將 // 該線程放入等待隊列的狀況,和lock()的第一段代碼含義相同設置同步state if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 接下來判斷是不是同一個線程,這個判斷是由於ReentrantLock是可重入的lock else if (current == getExclusiveOwnerThread()) { // 將state++,這裏的lock的獲取就是經過同步state的值來控制的 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
這段代碼大概的意思就是若是尚未線程佔用鎖就設置state爲1,若是是已經佔用該鎖的線程再次訪問就累計state的值,返回true,若是已經被佔用返回false隊列
回過頭來繼續看acquire,!tryAcquire(arg)意味着獲取鎖失敗,而後執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
進入addWaiter方法ip
/** * 1. 初始化: 若是tail和head沒有被初始化,那麼建立一個node而且指向它 * +------+ * | Node | <---- tail, head * |(Head)| * +------+ * * 2. 添加新的節點進入隊列 * +------+ prev +------+ * head ----> | Node | <---- | Node | <---- tail * |(Head)| ----> |Thread| * +------+ next +------+ */ private Node addWaiter(Node mode) { // 建立一個node使用EXCLUSIVE模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 若是隊列不是null(已經有線程再等待鎖)那麼將該新增的node加入隊列 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是上述代碼沒有成功,這裏是使用自旋的方式繼續加入等待隊列 enq(node); // 入隊成功後返回新增node節點 return node; } // 將新的node節點入隊並返回以前的tail節點 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 典型的入隊操做將tail指向新增的node node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
而後再看acquireQueued,此時的node參數是以前咱們分析的新增入隊列的node節點get
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 返回node的前一個節點 final Node p = node.predecessor(); /** * 若是該新增node的prev-node是head-node.以下圖這種狀態 * 也就是說在等待隊列中只有一個node,Head-node不包含在 * 內,而且調用tryAcquire方法成功(即成功的設置了同步state) * * * +------+ prev +------+ * head ----> | Node | <---- | Node | <---- tail * |(Head)| ----> |Thread| * +------+ next +------+ * * 那麼將head指向改node,原來的head的next節點爲null * * +-------------+ * head ----> | Node | <---- tail * |Thread = null| * +-------------+ * */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
接下來是shouldParkAfterFailedAcquire,顧名思義該方法返回是否在獲取lock失敗後堵塞線程,該方法會忽略並移出隊列中node.waitStatus = CANCELLED的node同步
/** * 該方的參數是 pred是Thread-A所在的node,node是Thread-B所在的node * 這個必須得理解 * * +------+ prev +--------------+ prev +--------+ * head ----> | Node | <---- | Node | <---- | Node |<---- tail * |(Head)| ----> |Thread-A | ----> |Thread-B| * +------+ next |waitStatus = 0| | | * +--------------+ +--------+ * * +------+ prev +---------------+ prev +--------+ * head ----> | Node | <---- | Node | <---- | Node |<---- tail * |(Head)| ----> |Thread-A | ----> |Thread-B| * +------+ next |waitStatus = -1| | | * +---------------+ +--------+ * * static final int CANCELLED = 1; * static final int SIGNAL = -1; * static final int CONDITION = -2; * static final int PROPAGATE = -3; * * 同時這個方法又分爲2步(彷佛整個AQS中都充斥着延遲初始化的概念) * 1. 初始化: 設置形參pred的waitStatus屬性爲Node.SIGNAL * 2. 因爲調用shouldParkAfterFailedAcquire()方法的acquireQueued()方法 * 還在自旋中,因此該方法會被調用第2次,此次才真正返回true,若是waitStatus * 被設置成CANCELLED,那麼會忽略等待隊列中的這些node * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 這裏就是初始化的代碼,設置形參pred的waitStatus屬性爲Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()方法,使用LockSupport堵塞當前node對應的thread,並返回中斷標識,當這個方法被調用時才真正的意味着lock.lock()方法完成了它的使命
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }