java jdk提供了一系列解決併發衝突的鎖和工具,ReentrantLock爲可重入獨佔鎖。 要從哪裏開始,由於這個其實要講起了不少。java
public class Locktest { /** * 測試Lock的使用。在方法中使用Lock,能夠避免使用Synchronized關鍵字。 */ public static class LockTest { Lock lock = new ReentrantLock();// 鎖 double value = 0d; // 值 int addtimes = 0; /** * 增長value的值,該方法的操做分爲2步,並且相互依賴,必須實如今一個事務中 * 因此該方法必須同步,之前的作法是在方法聲明中使用Synchronized關鍵字。 * @throws InterruptedException */ public void addValue(double v) throws InterruptedException { lock.lock();// 取得鎖 System.out.println("LockTest to addValue: " + v + " " + System.currentTimeMillis()); this.value += v; Thread.sleep(1000); this.addtimes++; lock.unlock();// 釋放鎖 } public Double getValue() { return this.value; } } public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { final LockTest lockTest = new LockTest(); Runnable run = new Runnable(){ @Override public void run() { try { lockTest.addValue(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; for(int i=0;i<100;i++){ new Thread(run).start(); }爲了保證value 和addtimes 的操做在addValue函數中是原子操做,且最後的值是正確的,加了一把ReentrantLock鎖。node
那麼接下來咱們來分析下ReentrantLock是如何實現的?數據結構
看源碼分析前建議能夠先從最後總結開始,從宏觀上有一個大體認識。架構
ReetrantLock在jdkjava.util.concurrent.locks包下,實現接口Lock併發
a Lock lock = new ReentrantLock();// 鎖 b lock.lock(); //獲取鎖 2 /** * 業務邏輯,保證只有一個線程同時執行 **/ c lock.unlock() //釋放鎖 3
如圖所示: 由三個內部類Sync、FairSync、NonfairSync,關係以下,都是基於AbstractQueuedSynchronizer實現,後面簡稱AQS,因此能夠知道,jdk鎖的實現AQS是關鍵app
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
能夠指定 new ReentrantLock(true); 爲公平鎖ide
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); //直接調用獲取鎖方法acquire,按照正常的程序拿鎖,進入隊列 } ... }
非公平鎖會先直接去搶佔,而後在acquire函數
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
final void lock() { if (compareAndSetState(0, 1)) //先嚐試插隊直接去拿鎖,更改state狀態爲1,若是成功則把Owner線程設置爲當前線程,則表示成功得到鎖 setExclusiveOwnerThread(Thread.currentThread()); else //插隊失敗則按照公平鎖方式同樣,排隊獲取 acquire(1); } //嘗試獲取鎖後面再講 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
爲AQS一方法,底層調用CAS,將state公共變量更改成1。工具
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS是JUC重要的同步器,全部的鎖基於整個同步器實現協調,這裏簡單的介紹下,有興趣後面再重點分析 主要由如下幾個重要部分組成源碼分析
state是關鍵,volatile int state;用volatile修飾。當爲0時表示鎖是空閒,能夠獲取鎖,當大於0時表示得到鎖。 獨佔鎖時大於0表示鎖的重入次數,共享鎖時,state共當前共享線程個數。
node是一個雙向鏈表,有Node、prev、next、head、tail組成,該鏈表被稱之CHL隊列(FIFO) 如上圖
acquire流程通過如下步驟:
public final void acquire(int arg) { // 1.先嚐試 tryAcquire 獲取鎖,具體實現後面再詳細講解, // 2.再addWaiter 加入隊尾等待,acquireQueued放入同步隊列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
公平鎖與最大區別在tryAcquire,如下分析兩則tryAcquire源碼
公平鎖嘗試獲取鎖實現(OwnerThread爲以得到鎖的線程)
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//當前前程 int c = getState();//獲取當前鎖狀態 if (c == 0) {//當鎖空閒時 判斷前置節點爲空,則調用cas將state設置成1,當前線程設置成OwnerThread,獲取鎖成功,true返回 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//Ownerthread爲當前線程時,+1,如下爲重入鎖的邏輯 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//設置state值,+1 return true;//返回true獲取鎖 } return false; }
非公平鎖tryAcquire實現
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前線程 int c = getState();//get到state鎖狀態 if (c == 0) {//鎖空閒,能夠獲取鎖 //經過CAS將state狀態更改爲 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
//建立與當前線程隊列的節點和給定鎖模式(獨佔、共享) //新節點node從隊尾加入,設置成功則把新節點設置成尾節點tail,並將原tail.next 指向node /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node // 返回新的節點 */ private Node addWaiter(Node mode) { // new 一個新節點,設置當前線程和獨佔模式exclusive Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;//tail節點賦值給pred,用於後面交換 if (pred != null) {//若是原尾節點存在 node.prev = pred; //將新節點的上一個指針指向原尾節點 if (compareAndSetTail(pred, node)) {//新節點node經過CAS設置成新tail節點 pred.next = node;//原tail節點的下一個指針指向新的尾節點tail return node;//返回新節點,即也是新尾節點 } } enq(node);//假如原尾節點爲空或者compareAndSetTail失敗再次enq放入尾節點 return node; }
//空隊列,首先必須初始化,插入隊列尾部,返回當前節點上一個節點 /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
獲取鎖的關鍵
// /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //設置標誌位,若是爲true 則會被中斷 try { boolean interrupted = false; for (;;) {//自旋 //當前節點node已經經過addWaiter設置爲tail了,定義p爲tail上一個節點 final Node p = node.predecessor(); //若是p爲head節點,則纔有資格嘗試調用tryAcquire獲取鎖 if (p == head && tryAcquire(arg)) { //獲取鎖成功則當前節點設置成head,setHead中已將node.prev = null;指向前置節點設置成null了,再也不指向原head setHead(node); //將原head節點next指向null,這個時候,原head將是一個孤立的node,有利於gc回收 p.next = null; // help GC failed = false;//獲取成功標誌 return interrupted; } //一、獲取鎖失敗後,只有被unpark喚醒的waitStatus狀態爲Node.SIGNAL才能夠被阻塞;二、阻塞當前線程,返回中斷狀態 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞當前線程,返回中斷狀態,爲true,則返回 interrupted = true;//若是阻塞線程被中斷則設置true,下次for循環進來被return interrupted; } } finally { if (failed)//若是失敗則取消該節點獲取鎖 cancelAcquire(node); } }
// CANCELLED = 1 // SIGNAL = -1 // CONDITION = -2 // NORMAL = 0 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前一個節點的狀態(注意:不是當前節點) int ws = pred.waitStatus; if (ws < 0) // waitStatus<0,也就是前面的節點尚未得到到鎖,那麼返回true,表示當前節點(線程)就應該park()了。 return true; if (ws > 0) { // waitStatus>0,也就是前一個節點被CANCELLED了,那麼就將前一個節點去掉,遞歸此操做直到全部前一個節點的waitStatus<=0,進行4 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // waitStatus=0,修改前一個節點狀態位爲SINGAL,表示後面有節點等待你處理,須要根據它的等待狀態來決定是否該park() compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // ws<0才須要park(),ws>=0都返回false,表示線程不該該park() return false; }
public final boolean release(int arg) { //嘗試釋放鎖,設置AQS state狀態,若是爲0則返回true,若是解鎖成功則喚醒head的下一個節點,讓其得到鎖 if (tryRelease(arg)) { Node h = head;//head 賦給h,中間變量用於後面交換 //存在頭節點,waitStatus 爲1 -1 -2 -3,喚醒下一個節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒下一個節點 return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases;//當前狀態state,獨佔表示重入次數-1 //當前線程不是獨佔OwnerThread,則拋出異常,由於lock和unlock是一對,必須保證釋放鎖的線程爲當前得到鎖的線程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//若是等於0表示解鎖成功,OwnerThread設置null 若是是重入鎖要屢次解鎖,直到0 free = true; setExclusiveOwnerThread(null); } setState(c);//設置AQS state狀態,若是是重入鎖要多長解鎖 return free; }
若是一個存在,喚醒節點的next
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //head節點等待狀態, // 此時node是須要釋放鎖的頭節點 // 清空頭節點的waitStatus,也就是不須要鎖了,這裏修改爲功失敗無所謂 if (ws < 0)//設置0代表已經得到鎖 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //若是不存在下一個節點或者線程已中斷或已取消 // 從頭節點的下一個節點開始尋找繼任節點,當且僅當繼任結點的waitStatus<=0纔是有效繼任節點,不然將這些waitStatus>0(也就是CANCELLED的節點)從AQS隊列中剔除 if (s == null || s.waitStatus > 0) { s = null; //從隊尾開始往前任找,直到node.next,過濾掉中斷的結點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //下一個節點存在則直接喚醒 LockSupport.unpark(s.thread); }
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
LockSupport爲阻塞線程提供基礎的功能,它由一對park和unpark組成,park會阻塞當前線程(獲取許可,線程默認許可被佔用了),unpark「喚醒」等待線程(釋放許可);至關於信號量,park拿到才能夠運行。 簡而言之,是用mutex和condition保護了一個_counter的變量,當park時,這個變量置爲了0,當unpark時,這個變量置爲1。
LockSupport.park(); 中止 System.out.println("======");
樂觀的獨佔鎖(相似ReentrantLock) SimpleExclusiveLock .java
package com.concurrent; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; /** * 簡單樂觀獨佔鎖 */ public class SimpleExclusiveLock { /** * 獨佔鎖標記 true 鎖不可用 false 鎖可用 */ private AtomicBoolean state = new AtomicBoolean(false); List<Thread> queue = new ArrayList<Thread>();//阻塞隊列 public boolean lock() { if (!state.get()&&state.compareAndSet(false, true)) {//取鎖成功不會阻塞,程序會繼續執行 return true; // 利用CAS } else { System.out.println("queue.add and park "+Thread.currentThread()); queue.add(Thread.currentThread());//加入阻塞隊列 LockSupport.park();//阻塞線程 System.out.println("park after "+Thread.currentThread()); return false; } } public boolean unLock() { if (state.get()) { System.out.println("queue.remove and unpark "+Thread.currentThread()); queue.remove(Thread.currentThread());//從隊列裏移除 if (state.compareAndSet(true, false)) {// 利用CAS if(!queue.isEmpty()){ System.out.println("unpark "+queue.get(0).getName()); LockSupport.unpark((Thread) queue.get(0));//喚醒第一個等待線程 System.out.println("unpark after "+queue.get(0).getName()); } return true; } return false; } else { return false; } } }
SimpleExclusiveLockTest .java
使用
package com.concurrent; public class SimpleExclusiveLockTest { public static SimpleExclusiveLock lock = new SimpleExclusiveLock(); // 獨佔鎖 public static volatile int i = 0; // 保證可見性 public class RunnableTask implements Runnable { @Override public void run() { while (true) { try { lock.lock();//加鎖 i += 1; System.out.println("thread name:"+ Thread.currentThread().getName() +" i="+ i); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unLock();//釋放鎖 } } } } public void runTask() { for (int i = 0; i < 100; i++) { new Thread(new RunnableTask(),"thread"+ i).start(); } } public static void main(String[] args) { SimpleExclusiveLockTest test = new SimpleExclusiveLockTest(); test.runTask(); } }
JUC(Java Util Concurrency)僅用簡單的park, unpark和CAS指令就實現了各類高級同步數據結構,並且效率很高,使人驚歎。
如下我從宏觀角度描述獲取鎖和解鎖流程
鎖的狀態是由AQS.state控制,加鎖和解鎖都會感知和變動此變量,當爲0時表示鎖是空閒,能夠獲取鎖,當大於0時表示得到鎖。 獨佔鎖時大於0表示鎖的重入次數,共享鎖時,state共當前共享線程個數。
acquire流程通過如下步驟:
與公平鎖acquire惟一區別在tryAcquire流程中,不用要求前置節點是head節點,則表示tail能夠直接去搶佔鎖,若是搶佔失敗後面的流程與公平一致。
解鎖流程比較簡單,解鎖節點確定是head,由於head持有鎖