Java多線程 - AQS詳解

介紹

AQS是java.util.concurrent.locks下類AbstractQueuedSynchronizer的簡稱,是用於 經過Java源碼來構建多線程的鎖和同步器的一系列框架,用於Java多線程之間的同步,它的類及類結構圖以下:java

原理

在AQS類中維護了一個使用雙向鏈表Node實現的FIFO隊列,用於保存等待的線程,同時利用一個int類型的state來表示狀態,使用時經過繼承AQS類並實現它的acquire和release方法來操做狀態,來實現線程的同步。數據庫

以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。安全

再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。多線程

不一樣組件的使用

CountDownLatch

主要用於等待線程等待其餘線程執行後再執行,其實現是經過控制計數器是否遞減到0來判別,其餘的每個線程執行完畢後,調用countDown()方法讓計數器減一,等待線程調用await()方法,直到計數器爲1在執行。併發

demo 主線程等待200個線程執行完畢後再執行:框架

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午4:14 * \* Description: ContDownLatch用法:經過引入CountDownLatch計數器,來等待其餘線程執行完畢 * \ */
@Slf4j
public class CountDownLatchExample {
    private static int threadCount = 200;

    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadNum);
        Thread.sleep(100);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool= Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch=new CountDownLatch(200);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum=i;
            pool.execute(()->{
                try {
                    Thread.sleep(1);
                    test(threadNum);
                }catch (Exception e){
                    log.error("exception",e);
                }finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        pool.shutdown();
    }
}
複製代碼

CyclicBarrier

用於等待多個線程都準備好再進行,每個線程準備好後,計數器加1,加到指定值後所有開始ide

demo 一個20個線程每等待5個線程進行一次函數

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午5:20 * \* Description: * 用於等待多個線程都準備好 * 每個線程準備好後 計數器加1 加到指定值後所有開始 * \ */
public class CyclicBarrierExample {
    private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);

    public static void race(int threadNum) throws InterruptedException{
        Thread.sleep(1000);
        logger.info("{} is ready",threadNum);
        try {
            //等待指定數量的其餘線程執行 無參一直等待不拋異常 有參數表示等待指定時間若數量還未等到拋出異常
            cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (BrokenBarrierException | TimeoutException e) {
            logger.error("exception",e);
        }
        logger.info("{} is continue");
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService= Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            Thread.sleep(1000);
            final int threadNum=i;
            executorService.execute(() -> {
                try {
                    race(threadNum);
                } catch (InterruptedException e) {
                    logger.error("exception",e);
                }
            });
        }
        executorService.shutdown();
    }

}
複製代碼

Semaphore

英譯信號量,用於控制某個資源同時可被訪問的個數,如控制數據庫資源能夠同時併發數量爲20學習

demo:ui

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午3:39 * \* Description: 信號量學習例子 控制某個資源同時可被訪問的個數 如控制數據庫資源能夠同時併發數量爲20 * \ */
public class SemaphoreExample {
    private static final Logger logger = LoggerFactory.getLogger(CountDownLatchExample.class);
    private static int threadCount = 200;

    public static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        logger.info("{}",threadNum);
        Thread.sleep(1000);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool= Executors.newCachedThreadPool();
        //定義容許併發的信號量m
        final Semaphore semaphore=new Semaphore(20);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum=i;
            //該線程的最大併發數爲m/n
            pool.execute(()->{
                try {
                    //獲取n個信號量 無參爲一個
                    semaphore.acquire(4);
                    test(threadNum);
                    //釋放n個信號量 無參爲一個
                    semaphore.release(4);
                }catch (Exception e){
                    logger.error("exception",e);
                }
            });
        }
        pool.shutdown();
    }
}
複製代碼

ReentrantReadWriteLock

讀寫鎖,用於須要同步資源時在先後加鎖/解鎖,當一個線程獲取讀鎖後其餘線程能夠繼續獲取讀鎖,當一個線程獲取寫鎖後其餘線程都需等待,所以,可能形成寫鎖飢餓,就是寫鎖一直沒法獲取。

demo: 一個基於aqs鎖實現的部分線程安全的map

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-08 * \* Time: 下午11:58 * \* Description: 讀寫鎖 當一個線程獲取讀鎖後其餘線程能夠繼續獲取讀鎖 當一個線程獲取寫鎖後其餘線程都需等待 * \ */
public class ReentrantReadWriteLockExample {
    final Map map = new TreeMap<>();

    private final static ReentrantLock lock = new ReentrantLock();

    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    private final Lock readLock = readWriteLock.readLock();

    private final Lock writeLock = readWriteLock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set getAllkeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data vlaue) {
        writeLock.lock();
        try {
            return map.put(key, vlaue);
        } finally {
            writeLock.unlock();
        }
    }

    class Data {

    }
}
複製代碼

StampLock

相似讀寫鎖的功能和使用方法,不過有如下兩點不一樣

  1. 每次獲取鎖會獲得一個long類型的stamp所爲返回值,解鎖是須要將其回傳。

  2. 有樂觀讀操做,適合於讀多寫少狀況,指當資源被讀鎖鎖定時,會根據資源是否被變動,進行讀取操做,而不是不容許讀操做。

demo:

import java.util.concurrent.locks.StampedLock;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-09 * \* Time: 下午1:08 * \* Description: * 使用是每次獲取鎖會獲得一個long類型的stamp所爲返回值,解鎖是須要將其回傳 * 該類有 寫 讀 樂觀讀:指當資源被讀鎖鎖定時,會根據資源是否被變動,進行讀取操做 */
public class StampLockExample {
    private int count = 0;
    private final StampedLock lock = new StampedLock();

    class AddHundredNum extends Thread {
        @Override
        public void run() {
// synchronized (addHundredNum.class) {
            long stamp = lock.writeLock();
            try {
                for (int i = 0; i < 1000; i++) {
                    count++;
                }
            } finally {
                lock.unlock(stamp);
            }
// }
        }
    }

    public void test() throws InterruptedException {
        StampLockExample.AddHundredNum[] addHundredNums = new StampLockExample.AddHundredNum[100];
        for (int i = 0; i < addHundredNums.length; i++) {
            addHundredNums[i] = new StampLockExample.AddHundredNum();
        }

        for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
            addHundredNum.start();
        }

        for (StampLockExample.AddHundredNum addHundredNum : addHundredNums) {
            addHundredNum.join();
        }
    }

    public static void main(String[] args) throws Exception {
        StampLockExample example = new StampLockExample();
        example.test();
        System.out.println(example.count);
    }
}
複製代碼

Condition

配合AQS鎖實現的線程中斷/等待機制,將等待的線程移入condition維護的隊列,並經過condition控制中斷/等待。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** * \* Created with IntelliJ IDEA. * \* @author: guohezuzi * \* Date: 2019-06-09 * \* Time: 下午1:26 * \* Description: * \ */
@Slf4j
public class ConditionExample {
    public static void main(String[] args){
        final ReentrantLock lock=new ReentrantLock();
        Condition condition=lock.newCondition();
        new Thread(()->{
            lock.lock();
            log.info("wait signal");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal");
            lock.unlock();
        }).start();

        new Thread(() -> {
            lock.lock();
            log.info("get lock");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal ~");
            lock.unlock();
        }).start();
    }
}
複製代碼
相關文章
相關標籤/搜索