java大牛帶你一次清楚,線程併發包J.U.C和AQS的原理

1、J.U.C簡介 

Java.util.concurrent 是在併發編程中比較經常使用的工具類。  java

1.Lock node

Lock是JUC包中最重要的組件,解決synchronized關鍵字在某些場景的短板。  編程

eg.鎖修飾的代碼塊內,調用了同個鎖修飾的代碼塊,鎖對象相同,這時候第一個得到鎖的代碼還沒釋放,後面又有等待獲取鎖的代碼,就造成死鎖狀態  bash

2.Lock併發

實現 Lock本質是一個接口,定義了獲取和釋放鎖的抽象方法。定義了鎖的一個標準規範。如下是主要實現類: app

ReentrantLock,重入鎖,實現了Lock接口。當線程得到鎖後,再次獲取該鎖不須要線程阻塞,只需增長重入次數便可 ide

ReentrantReadWriteLock,讀寫鎖,實現了ReadWriteLock接口。該類維護了兩個鎖,ReadLock和WriteLock,它們分別實現了Lock函數

接口。適合讀多寫少的場景。 工具

原則:讀和讀不互斥、讀和寫互斥、寫和寫互斥 源碼分析

3.Lock繼承關係圖 

eg.UML圖 

4.重入鎖 

設計目的:解決死鎖問題 

eg.死鎖示例 

public class App {
 
    public synchronized void demo(){ // main得到對象鎖
        System.out.println("demo");
        demo2();
    }
    public void demo2(){
        synchronized (this) {
            System.out.println("demo2");
        }
    }
 
    public static void main(String[] args) {
        App app=new App();
        app.demo();
    }
}複製代碼

eg.ReentrantLock使用示例 

public class AtomicDemo {
    private static int count=0;
    static Lock lock=new ReentrantLock();
    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
        lock.unlock();
    }
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<1000;i++){
            new Thread(()->{AtomicDemo.inc();}).start();;
        }
        Thread.sleep(3000);
        System.out.println("result:"+count);
    }
}複製代碼

eg.ReentrantReadWriteLock使用示例 

public class RWLock {
 
    static ReentrantReadWriteLock wrl = new ReentrantReadWriteLock();
    static Map<String,Object> cacheMap = new HashMap<>();
    static Lock read = wrl.readLock();
    static Lock write = wrl.writeLock();
 
    // 線程B/C/D
    public static final Object get(String key){
        System.out.println("begin read data:" + key);
        read.lock(); // 得到讀鎖-> 阻塞
        try {
            return cacheMap.get(key);
        }finally {
            read.unlock();
        }
    }
    //線程A
    public static final Object put(String key, Object val){
        write.lock(); // 得到了寫鎖
        try{
            return cacheMap.put(key,val);
        }finally {
            write.unlock();
        }
    }
 
    public static void main(String[] args) {
        wrl.readLock(); // B線程 ->阻塞
 
        wrl.writeLock(); // A線程
 
        // 讀->讀是能夠共享
        // 讀->寫 互斥
        // 寫->寫 互斥
        // 讀多寫少的場景
    }
 
}
複製代碼

2、ReentrantLock實現原理 

1.AQS 

在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它是一個同步工具也是 Lock 用來實現線程同步的核心組件。 

2.AQS兩種功能 

AQS 的功能分爲兩種:獨佔和共享 

獨佔鎖,重入鎖ReentrantLock 

共享鎖,讀寫鎖ReentrantReadWriteLock 

3.AQS內部實現 

AQS隊列內部維護的是一個FIFO雙向鏈表,每一個節點都是雙向的,分別指向直接的後繼節點和前驅節點。 

每一個Node由線程封裝,當線程爭搶鎖失敗後會封裝成Node加入AQS隊列;當獲取鎖的線程釋放了,會從隊列中喚醒一個阻塞的節點(線程)。 

eg.結構 


Node組成 

static final class Node {
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;
	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;
	volatile int waitStatus;
	volatile Node prev; // 前驅節點
	volatile Node next; // 後繼節點
	volatile Thread thread; // 當前線程
	Node nextWaiter; // 存儲在condition隊列中的後繼節點
 
	// 是否爲共享鎖
	final boolean isShared() {
		return nextWaiter == SHARED;
	}
 
	final Node predecessor() throws NullPointerException {
		Node p = prev;
		if (p == null)
			throw new NullPointerException();
		else
			return p;
	}
 
	Node() {    // Used to establish initial head or SHARED marker
	}
 
	// 將線程構形成一個node,添加到等待隊列
	Node(Thread thread, Node mode) {     // Used by addWaiter
		this.nextWaiter = mode;
		this.thread = thread;
	}
 
	// 在condition隊列中使用
	Node(Thread thread, int waitStatus) { // Used by Condition
		this.waitStatus = waitStatus;
		this.thread = thread;
	}
}複製代碼

eg.加入節點 ​ 

eg.釋放節點  ​

4.源碼分析 

eg.時序圖


 CAS原理

protected final boolean compareAndSetState(int expect, int update) {
	// See below for intrinsics setup to support this
	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}複製代碼

經過 cas 樂觀鎖的方式來作比較並替換。若是當前內存中的 state 的值和預期值 expect 相等,則替換爲 update。更新成功返回 true,不然返回 false。

state 是 AQS 中的一個屬性,它在不一樣的實現中所表達的含義不同,對於重入鎖的實現來講,表示一個同步狀態。它有兩個含義:

1. 當 state=0 時,表示無鎖狀態

2. 當 state>0 時,表示已經有線程得到了鎖,它的大小表示重入次數

Unsafe類

Unsafe類,不在java體系中,在sun.misc包中。

stateoff,表示該字段相對該類在內存中地址的偏移量,經過該偏移量找到對應字段。一個java對象可當作是一段內存,每一個字段都按照必定順序放在這段內存。

Node狀態

Node 有5種狀態,分別是:CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、默認狀態(0)

CANCELLED:在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 爲 CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。

SIGNAL:只要前置節點釋放鎖,就會通知標識爲 SIGNAL 狀態的後續節點的線程。

CONDITION: 和 Condition 有關係。

PROPAGATE:共享模式下,PROPAGATE 狀態的線程處於可運行狀態。

默認狀態0:初始狀態。

經過 Node 的狀態來判斷,ThreadA 競爭鎖失敗之後是否應該被掛起。若是ThreadA 的 pred節點狀態爲 SIGNAL,那就表示能夠放心掛起當前線程。

LockSupport類

LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport其實是調用了 Unsafe 類裏的函數。

public native void unpark(Object var1);
public native void park(boolean var1, long var2);複製代碼

5.公平鎖和非公平鎖

AQS 是一個同步隊列,它可以實現線程的阻塞以及喚醒,但它並不具有業務功能,因此在不一樣的同步場景中,會繼承 AQS 來實現對應場景的功能。

Sync繼承了AQS,Sync有兩個具體實現類:

NoFairSync,非公平搶佔鎖,無論當前隊列是否有等待線程,都有機會得到鎖。

FairSync,嚴格按照FIFO得到鎖。

3、Condition類

關鍵方法:await()和signal()/signalAll()

當AwaitThread線程得到lock鎖後,調用await方法,釋放掉當前使用的鎖,將自身封裝成Node節點,Condition狀態加入等待隊列 SignalThread線程競爭鎖,當調用signal方法時,將Condition等待隊列中的線程轉移到AQS隊列中,當SignalThread調用unlock釋放當前鎖後,AwaitThread搶佔鎖,若還沒釋放,則經過自旋加入AQS隊列。

eg.使用示例

public class ConditionWait implements Runnable{
 
    private Lock lock;
    private Condition condition;
 
    public ConditionWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        try {
            lock.lock(); // 競爭鎖
            try {
                System.out.println("begin - ConditionWait");
                condition.await(); // 阻塞(1. 釋放鎖, 2.阻塞當前線程, FIFO(單向、雙向))
                System.out.println("end - ConditionWait");
 
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            lock.unlock(); // 釋放鎖
        }
 
    }
 
}
複製代碼


public class ConditionNotify implements Runnable{
 
    private Lock lock;
    private Condition condition;
 
    public ConditionNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
 
    @Override
    public void run() {
        try{
            lock.lock(); // 得到了鎖
            System.out.println("begin - conditionNotify");
            condition.signal(); // 喚醒阻塞狀態的線程
            System.out.println("end - conditionNotify");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }
}複製代碼

 

public class App {
    public static void main(String[] args) {
        Lock lock=new ReentrantLock(); // 重入鎖
        Condition condition=lock.newCondition();
        lock.newCondition();
        new Thread(new ConditionWait(lock, condition)).start(); // 阻塞await
        new Thread(new ConditionNotify(lock, condition)).start();
    }
} 複製代碼
相關文章
相關標籤/搜索