AQS是java.util.concurrent.locks下類AbstractQueuedSynchronizer的簡稱,是用於 經過Java源碼來構建多線程的鎖和同步器的一系列框架,用於Java多線程之間的同步,它的類及類結構圖以下:java
在AQS類中維護了一個使用雙向鏈表Node實現的FIFO隊列,用於保存等待的線程,同時利用一個int類型的state來表示狀態,使用時經過繼承AQS類並實現它的acquire和release方法來操做狀態,來實現線程的同步。數據庫
以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。安全
再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。多線程
主要用於等待線程等待其餘線程執行後再執行,其實現是經過控制計數器是否遞減到0來判別,其餘的每個線程執行完畢後,調用countDown()方法讓計數器減一,等待線程調用await()方法,直到計數器爲1在執行。併發
demo 主線程等待200個線程執行完畢後再執行:框架
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午4:14 * \* Description: ContDownLatch用法:經過引入CountDownLatch計數器,來等待其餘線程執行完畢 * \ */
@Slf4j
public class CountDownLatchExample {
private static int threadCount = 200;
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}",threadNum);
Thread.sleep(100);
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool= Executors.newCachedThreadPool();
final CountDownLatch countDownLatch=new CountDownLatch(200);
for (int i = 0; i < threadCount; i++) {
final int threadNum=i;
pool.execute(()->{
try {
Thread.sleep(1);
test(threadNum);
}catch (Exception e){
log.error("exception",e);
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
pool.shutdown();
}
}
複製代碼
用於等待多個線程都準備好再進行,每個線程準備好後,計數器加1,加到指定值後所有開始ide
demo 一個20個線程每等待5個線程進行一次函數
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午5:20 * \* Description: * 用於等待多個線程都準備好 * 每個線程準備好後 計數器加1 加到指定值後所有開始 * \ */
public class CyclicBarrierExample {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
public static void race(int threadNum) throws InterruptedException{
Thread.sleep(1000);
logger.info("{} is ready",threadNum);
try {
//等待指定數量的其餘線程執行 無參一直等待不拋異常 有參數表示等待指定時間若數量還未等到拋出異常
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
} catch (BrokenBarrierException | TimeoutException e) {
logger.error("exception",e);
}
logger.info("{} is continue");
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService= Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
final int threadNum=i;
executorService.execute(() -> {
try {
race(threadNum);
} catch (InterruptedException e) {
logger.error("exception",e);
}
});
}
executorService.shutdown();
}
}
複製代碼
英譯信號量,用於控制某個資源同時可被訪問的個數,如控制數據庫資源能夠同時併發數量爲20學習
demo:ui
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午3:39 * \* Description: 信號量學習例子 控制某個資源同時可被訪問的個數 如控制數據庫資源能夠同時併發數量爲20 * \ */
public class SemaphoreExample {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
private static int threadCount = 200;
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
logger.info("{}",threadNum);
Thread.sleep(1000);
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool= Executors.newCachedThreadPool();
//定義容許併發的信號量m
final Semaphore semaphore=new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadNum=i;
//該線程的最大併發數爲m/n
pool.execute(()->{
try {
//獲取n個信號量 無參爲一個
semaphore.acquire(4);
test(threadNum);
//釋放n個信號量 無參爲一個
semaphore.release(4);
}catch (Exception e){
logger.error("exception",e);
}
});
}
pool.shutdown();
}
}
複製代碼
讀寫鎖,用於須要同步資源時在先後加鎖/解鎖,當一個線程獲取讀鎖後其餘線程能夠繼續獲取讀鎖,當一個線程獲取寫鎖後其餘線程都需等待,所以,可能形成寫鎖飢餓,就是寫鎖一直沒法獲取。
demo: 一個基於aqs鎖實現的部分線程安全的map
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午11:58 * \* Description: 讀寫鎖 當一個線程獲取讀鎖後其餘線程能夠繼續獲取讀鎖 當一個線程獲取寫鎖後其餘線程都需等待 * \ */
public class ReentrantReadWriteLockExample {
final Map map = new TreeMap<>();
private final static ReentrantLock lock = new ReentrantLock();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set getAllkeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data vlaue) {
writeLock.lock();
try {
return map.put(key, vlaue);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
複製代碼
相似讀寫鎖的功能和使用方法,不過有如下兩點不一樣
每次獲取鎖會獲得一個long類型的stamp所爲返回值,解鎖是須要將其回傳。
有樂觀讀操做,適合於讀多寫少狀況,指當資源被讀鎖鎖定時,會根據資源是否被變動,進行讀取操做,而不是不容許讀操做。
demo:
import java.util.concurrent.locks.StampedLock;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-09 * \* Time: 下午1:08 * \* Description: * 使用是每次獲取鎖會獲得一個long類型的stamp所爲返回值,解鎖是須要將其回傳 * 該類有 寫 讀 樂觀讀:指當資源被讀鎖鎖定時,會根據資源是否被變動,進行讀取操做 */
public class StampLockExample {
private int count = 0;
private final StampedLock lock = new StampedLock();
class AddHundredNum extends Thread {
@Override
public void run() {
// synchronized (addHundredNum.class) {
long stamp = lock.writeLock();
try {
for (int i = 0; i < 1000; i++) {
count++;
}
} finally {
lock.unlock(stamp);
}
// }
}
}
public void test() throws InterruptedException {
StampLockExample.AddHundredNum[] addHundredNums = new StampLockExample.AddHundredNum[100];
for (int i = 0; i < addHundredNums.length; i++) {
addHundredNums[i] = new StampLockExample.AddHundredNum();
}
for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
addHundredNum.start();
}
for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
addHundredNum.join();
}
}
public static void main(String[] args) throws Exception {
StampLockExample example = new StampLockExample();
example.test();
System.out.println(example.count);
}
}
複製代碼
配合AQS鎖實現的線程中斷/等待機制,將等待的線程移入condition維護的隊列,並經過condition控制中斷/等待。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-09 * \* Time: 下午1:26 * \* Description: * \ */
@Slf4j
public class ConditionExample {
public static void main(String[] args){
final ReentrantLock lock=new ReentrantLock();
Condition condition=lock.newCondition();
new Thread(()->{
lock.lock();
log.info("wait signal");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal");
lock.unlock();
}).start();
new Thread(() -> {
lock.lock();
log.info("get lock");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("send signal ~");
lock.unlock();
}).start();
}
}
複製代碼