Java.util.concurrent 是在併發編程中比較經常使用的工具類。 java
1.Lock node
Lock是JUC包中最重要的組件,解決synchronized關鍵字在某些場景的短板。 編程
eg.鎖修飾的代碼塊內,調用了同個鎖修飾的代碼塊,鎖對象相同,這時候第一個得到鎖的代碼還沒釋放,後面又有等待獲取鎖的代碼,就造成死鎖狀態 bash
2.Lock併發
實現 Lock本質是一個接口,定義了獲取和釋放鎖的抽象方法。定義了鎖的一個標準規範。如下是主要實現類: app
ReentrantLock,重入鎖,實現了Lock接口。當線程得到鎖後,再次獲取該鎖不須要線程阻塞,只需增長重入次數便可 ide
ReentrantReadWriteLock,讀寫鎖,實現了ReadWriteLock接口。該類維護了兩個鎖,ReadLock和WriteLock,它們分別實現了Lock函數
接口。適合讀多寫少的場景。 工具
原則:讀和讀不互斥、讀和寫互斥、寫和寫互斥 源碼分析
3.Lock繼承關係圖
eg.UML圖
4.重入鎖
設計目的:解決死鎖問題
eg.死鎖示例
public class App {
public synchronized void demo(){ // main得到對象鎖
System.out.println("demo");
demo2();
}
public void demo2(){
synchronized (this) {
System.out.println("demo2");
}
}
public static void main(String[] args) {
App app=new App();
app.demo();
}
}複製代碼
eg.ReentrantLock使用示例
public class AtomicDemo {
private static int count=0;
static Lock lock=new ReentrantLock();
public static void inc(){
lock.lock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<1000;i++){
new Thread(()->{AtomicDemo.inc();}).start();;
}
Thread.sleep(3000);
System.out.println("result:"+count);
}
}複製代碼
eg.ReentrantReadWriteLock使用示例
public class RWLock {
static ReentrantReadWriteLock wrl = new ReentrantReadWriteLock();
static Map<String,Object> cacheMap = new HashMap<>();
static Lock read = wrl.readLock();
static Lock write = wrl.writeLock();
// 線程B/C/D
public static final Object get(String key){
System.out.println("begin read data:" + key);
read.lock(); // 得到讀鎖-> 阻塞
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
//線程A
public static final Object put(String key, Object val){
write.lock(); // 得到了寫鎖
try{
return cacheMap.put(key,val);
}finally {
write.unlock();
}
}
public static void main(String[] args) {
wrl.readLock(); // B線程 ->阻塞
wrl.writeLock(); // A線程
// 讀->讀是能夠共享
// 讀->寫 互斥
// 寫->寫 互斥
// 讀多寫少的場景
}
}
複製代碼
1.AQS
在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它是一個同步工具也是 Lock 用來實現線程同步的核心組件。
2.AQS兩種功能
AQS 的功能分爲兩種:獨佔和共享
獨佔鎖,重入鎖ReentrantLock
共享鎖,讀寫鎖ReentrantReadWriteLock
3.AQS內部實現
AQS隊列內部維護的是一個FIFO雙向鏈表,每一個節點都是雙向的,分別指向直接的後繼節點和前驅節點。
每一個Node由線程封裝,當線程爭搶鎖失敗後會封裝成Node加入AQS隊列;當獲取鎖的線程釋放了,會從隊列中喚醒一個阻塞的節點(線程)。
eg.結構
Node組成
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev; // 前驅節點
volatile Node next; // 後繼節點
volatile Thread thread; // 當前線程
Node nextWaiter; // 存儲在condition隊列中的後繼節點
// 是否爲共享鎖
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 將線程構形成一個node,添加到等待隊列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 在condition隊列中使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}複製代碼
eg.加入節點
eg.釋放節點
4.源碼分析
eg.時序圖
CAS原理
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}複製代碼
經過 cas 樂觀鎖的方式來作比較並替換。若是當前內存中的 state 的值和預期值 expect 相等,則替換爲 update。更新成功返回 true,不然返回 false。
state 是 AQS 中的一個屬性,它在不一樣的實現中所表達的含義不同,對於重入鎖的實現來講,表示一個同步狀態。它有兩個含義:
1. 當 state=0 時,表示無鎖狀態
2. 當 state>0 時,表示已經有線程得到了鎖,它的大小表示重入次數
Unsafe類
Unsafe類,不在java體系中,在sun.misc包中。
stateoff,表示該字段相對該類在內存中地址的偏移量,經過該偏移量找到對應字段。一個java對象可當作是一段內存,每一個字段都按照必定順序放在這段內存。
Node狀態
Node 有5種狀態,分別是:CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、默認狀態(0)
CANCELLED:在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 爲 CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
SIGNAL:只要前置節點釋放鎖,就會通知標識爲 SIGNAL 狀態的後續節點的線程。
CONDITION: 和 Condition 有關係。
PROPAGATE:共享模式下,PROPAGATE 狀態的線程處於可運行狀態。
默認狀態0:初始狀態。
經過 Node 的狀態來判斷,ThreadA 競爭鎖失敗之後是否應該被掛起。若是ThreadA 的 pred節點狀態爲 SIGNAL,那就表示能夠放心掛起當前線程。
LockSupport類
LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport其實是調用了 Unsafe 類裏的函數。
public native void unpark(Object var1);
public native void park(boolean var1, long var2);複製代碼
5.公平鎖和非公平鎖
AQS 是一個同步隊列,它可以實現線程的阻塞以及喚醒,但它並不具有業務功能,因此在不一樣的同步場景中,會繼承 AQS 來實現對應場景的功能。
Sync繼承了AQS,Sync有兩個具體實現類:
NoFairSync,非公平搶佔鎖,無論當前隊列是否有等待線程,都有機會得到鎖。
FairSync,嚴格按照FIFO得到鎖。
關鍵方法:await()和signal()/signalAll()
當AwaitThread線程得到lock鎖後,調用await方法,釋放掉當前使用的鎖,將自身封裝成Node節點,Condition狀態加入等待隊列 SignalThread線程競爭鎖,當調用signal方法時,將Condition等待隊列中的線程轉移到AQS隊列中,當SignalThread調用unlock釋放當前鎖後,AwaitThread搶佔鎖,若還沒釋放,則經過自旋加入AQS隊列。
eg.使用示例
public class ConditionWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try {
lock.lock(); // 競爭鎖
try {
System.out.println("begin - ConditionWait");
condition.await(); // 阻塞(1. 釋放鎖, 2.阻塞當前線程, FIFO(單向、雙向))
System.out.println("end - ConditionWait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock(); // 釋放鎖
}
}
}
複製代碼
public class ConditionNotify implements Runnable{
private Lock lock;
private Condition condition;
public ConditionNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try{
lock.lock(); // 得到了鎖
System.out.println("begin - conditionNotify");
condition.signal(); // 喚醒阻塞狀態的線程
System.out.println("end - conditionNotify");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 釋放鎖
}
}
}複製代碼
public class App {
public static void main(String[] args) {
Lock lock=new ReentrantLock(); // 重入鎖
Condition condition=lock.newCondition();
lock.newCondition();
new Thread(new ConditionWait(lock, condition)).start(); // 阻塞await
new Thread(new ConditionNotify(lock, condition)).start();
}
} 複製代碼