Java深刻學習併發原理總結

Java 併發多線程基礎總結java

線程池

線程池的簡介

線程池就是首先建立一些線程,它們的集合稱爲線程池。使用線程池能夠很好地提升性能,線程池在系統啓動時即建立大量空閒的線程,程序將一個任務傳給線程池,線程池就會啓動一條線程來執行這個任務,執行結束之後,該線程並不會死亡,而是再次返回線程池中成爲空閒狀態,等待執行下一個任務。node

爲何要使用線程池面試

若是不使用線程池,每個任務都會新開一個線程處理。算法

爲了減小建立和銷燬線程的次數,讓每一個線程能夠屢次使用,可根據系統狀況調整執行的線程數量,防止消耗過多內存,因此咱們能夠使用線程池。數據庫

線程池的好處編程

  • 加快響應速度
  • 合理利用 CPU 和內存
  • 統一管理

線程池的工做機制segmentfault

  1. 線程池剛建立的時候沒有任何線程,當來了新的請求的時候纔會建立核心線程去處理對應的請求。
  2. 當處理完成以後,核心線程並不會回收。
  3. 在覈心線程達到指定的數量以前,每個請求都會在線程池中建立一個新的核心線程。
  4. 當核心線程全都被佔用的時候,新來的請求會放入工做隊列中。工做隊列本質上是一個阻塞隊列
  5. 當工做隊列被佔滿,再來的新請求會交給臨時線程來處理。
  6. 臨時線程在使用完成以後會繼續存活一段時間,直到沒有請求處理纔會被銷燬。

線程池參數詳解

線程池構造函數的參數數組

參數名 類型 含義
corePoolSize int 核心線程數
maxPoolSize int 最大線程數
keepAliveTime long 保持存活時間
workQueue BlockingQueue 任務存儲隊列
threadFactory ThreadFactory 當線程池須要新的線程時,使用 ThreadFactory 來建立新的線程
Handler RejectedExecutionHandler 因爲線程池沒法接受所提交的任務所給出的拒絕策略
  • corePoolSize:指的是核心線程數,線程池初始化完成後,默認狀況下,線程池並無任何線程,線程池會等待任務到來時,再建立新的線程去執行任務。
  • maxPoolSize:線程池有可能會在覈心線程數上,額外增長一些線程,可是這些新增長的線程有一個上限,最大不能超過 maxPoolSize。緩存

    • 若是線程數小於 corePoolSize,即便其餘工做線程處於空閒狀態,也會建立一個新的線程來運行任務。
    • 若是線程數大於等於 corePoolSize 但少於 maxPoolSize,則將任務放進工做隊列中。
    • 若是隊列已滿,而且線程數小於 maxPoolSize,則建立一個新線程來運行任務。
    • 若是隊列已滿,而且線程數已經大於等於 maxPoolSize,則使用拒絕策略來拒絕該任務。
  • keepAliveTime:一個線程若是處於空閒狀態,而且當前的線程數量大於 corePoolSize,那麼在指定時間後,這個空閒線程會被銷燬,這裏的指定時間由 keepAliveTime 來設定。
  • workQueue:新任務被提交後,會先進入到此工做隊列中,任務調度時再從隊列中取出任務。jdk 中提供了四種工做隊列:安全

    • ArrayBlockingQueue:基於數組的有界阻塞隊列,按 FIFO 排序。新任務進來後,會放到該隊列的隊尾,有界的數組能夠防止資源耗盡問題。當線程池中線程數量達到 corePoolSize 後,再有新任務進來,則會將任務放入該隊列的隊尾,等待被調度。若是隊列已是滿的,則建立一個新線程,若是線程數量已經達到 maxPoolSize,則會執行拒絕策略。
    • LinkedBlockingQueue:基於鏈表的無界阻塞隊列(其實最大容量爲 Interger.MAX),按照 FIFO 排序。因爲該隊列的近似無界性,當線程池中線程數量達到 corePoolSize 後,再有新任務進來,會一直存入該隊列,而不會去建立新線程直到 maxPoolSize,所以使用該工做隊列時,參數 maxPoolSize 實際上是不起做用的。
    • SynchronousQueue:一個不緩存任務的阻塞隊列,生產者放入一個任務必須等到消費者取出這個任務。也就是說新任務進來時,不會緩存,而是直接被調度執行該任務,若是沒有可用線程,則建立新線程,若是線程數量達到 maxPoolSize,則執行拒絕策略。
    • PriorityBlockingQueue:具備優先級的無界阻塞隊列,優先級經過參數 Comparator 實現。
    • delayQueue:具備優先級的延時無界阻塞隊列
    • LinkedTransferQueue:基於鏈表的無界阻塞隊列
    • LinkedBlockingDeque:基於鏈表的雙端阻塞隊列
  • threadFactory:建立一個新線程時使用的工廠,能夠用來設定線程名、是否爲 daemon 線程等等
  • handler:當工做隊列中的任務已到達最大限制,而且線程池中的線程數量也達到最大限制,這時若是有新任務提交進來,就會執行拒絕策略。

添加線程的流程

線程池用法演示

  • newFixedThreadPool:固定大小線程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);//核心線程數
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-5
...

咱們能夠看到,打印出來的最多的線程也就是五個。

咱們看一下源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
  • 第一個參數:corePoolSize,核心線程數:5
  • 第二個參數:maxPoolSize,最大線程數:5
  • 第三個參數:keepAliveTime,最大存活時間:0
  • 第四個參數:存活時間單位,單位毫秒
  • 第五個參數:workQueue,阻塞隊列使用的是 LinkedBlockingQueue,也就是無界隊列

最後 new ThreadPoolExecutor(),咱們看下這個方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

將咱們的參數傳遞過去後,線程工廠使用的是默認的線程工廠,和默認的拒絕策略處理器。

因爲咱們使用的是無界阻塞隊列,因此至關於 maxPoolSize 沒有用處。若是任務特別多,核心線程處理不過來的話,就會一直將任務放入到 LinkedBlockingQuene 中,可能會致使 OOM。

演示 OOM:

//-Xms5m -Xmx5m
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:13)
  • newFixedThreadPool:單個核心線程的線程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

咱們看下源碼:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor 其實和 newFixedThreadPool 差距不大,只是將核心線程數和最大線程數都設置爲了 1,一樣也是使用的 LinkedBlockingQueue,也可能會致使 OOM。

  • newCachedThreadPool:可緩存的線程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5
pool-1-thread-3
pool-1-thread-9
pool-1-thread-6
pool-1-thread-10
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
pool-1-thread-12
pool-1-thread-12
pool-1-thread-10
pool-1-thread-15
pool-1-thread-13

咱們看下源碼:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

能夠看出來,它核心線程數爲 0,最大線程數量爲 int 的最大值,存活時間爲 60 秒,使用的是 SynchronousQueue,也就是不存儲任務的阻塞隊列。

SynchronousQueue 的確不會致使 OOM,可是!咱們的線程池能夠存放 2147483647 個線程。在內存不夠的狀況下依然會報出 OOM!

  • newFixedThreadPool:支持定時及週期性任務執行的線程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        //executorService.schedule(new ThreadPoolTest(),5, TimeUnit.SECONDS); //延時運行
        executorService.scheduleAtFixedRate(new ThreadPoolTest(),1,3,TimeUnit.SECONDS);//重複運行
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

schedule()方法參數:任務,多久後運行、時間單位

scheduleAtFixedRate()方法參數:任務、第一次執行時間:一、每隔多久運行一次:三、時間單位

咱們看一下源碼:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

咱們看到 DelayedWordkQueue 繼承了 AbstractCollection 接口,實現了 BlockingQueue,因此和 ArrayBlockingQueue 以及 LinkedBlockingQueue 是兄弟關係。
DelayedWorkQueue 定義了一個 DelayQueue,因此 DelayedWorkQueue 的實現是依賴 DelayQueue 的。

DelayQueue:Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS)方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll 移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素。

BlockingQueue 核心方法

方法類型 拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用
  • 拋出異常

當阻塞隊列滿時,再往隊列裏 add 插入元素會拋出 IllegalStateException:Queue full

當阻塞隊列空時,再往隊列裏 remove 移除元素會拋出 NoSuchElementException

  • 特殊值

插入方法,成功 true 失敗 false。

移除方法,成功返回元素,沒有元素就返回 null。

  • 阻塞

當阻塞隊列滿時,生產者線程繼續往隊列裏 put 元素,隊列就會一直阻塞生產線程直到 put 數據 or 響應退出。

當阻塞隊列空時,消費者線程試圖從隊列裏 take 元素,隊列就會一直阻塞消費者線程直到隊列可用。

  • 超時退出

當阻塞隊列滿時,隊列會阻塞生產者線程必定時間,超出時間後生產者線程就會推出。

正確的建立線程池的方法

Executors 存在什麼問題?

在阿里巴巴 Java 開發手冊中提到,使用 Executors 建立線程池可能會致使 OOM(OutOfMemory ,內存溢出)。

咱們以前也已經演示了 OOM 的狀況,咱們看下如何正確建立線程池。

避免使用 Executors 建立線程池,主要是避免使用其中的默認實現,那麼咱們能夠本身直接調用 ThreadPoolExecutor 的構造函數來本身建立線程池。在建立的同時,給 BlockQueue 指定容量就能夠了。

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10))
具體咱們須要根據不一樣的業務場景、本身設置線程池的參數、想使用某種隊列、想使用本身的線程工廠、想指定某種拒絕策略等等,來實現更合適的線程池。

中止線程池的正確方法

第一種:shutdown

調用線程池的此方法後,再也不接受新的任務,若是有新的任務增長則會拋出異常,待全部任務都執行關閉後,進行關閉。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                executorService.shutdown();
            }
            executorService.execute(new ThreadPoolTest());
        }
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@3764951d rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:16)
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-1

第二種:isShutdown

當調用 shutdown 以後,此值爲 true。並非全部任務都執行完畢纔是 true。

第三種:isTerminated

線程池全部任務是否已經關閉,包括正在執行和隊列中的任務都結束了則返回 true。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            if (i >= 3) {
                executorService.shutdown();
                System.out.println(executorService.isTerminated());
            }else{
                executorService.execute(new ThreadPoolTest());
            }
        }
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("最後線程池狀態是否關閉:"+executorService.isTerminated());
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

false
false
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
最後線程池狀態是否關閉:true

第四種:awaitTermination

檢測阻塞等待一段時間後,若是線程池任務都執行完了,返回 true,不然 false。

第五種:shutdownNow

馬上關閉全部線程。該方法會返回所未完成方法的集合。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            if (i >= 3) {
                Collection<Runnable> runnables = executorService.shutdownNow();
                runnables.forEach(System.out::println);
            }else{
                executorService.execute(new ThreadPoolTest());
            }
        }
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("我被中斷了!");
        }
        System.out.println(Thread.currentThread().getName());
    }
}

運行結果:

我被中斷了!
pool-1-thread-1
我被中斷了!
pool-1-thread-2
com.thread.ThreadPoolTest@4e50df2e

拒絕策略解析

拒接時機

  1. 當 executor 關閉時,提交新任務會被拒絕
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@2b193f2d rejected from java.util.concurrent.ThreadPoolExecutor@355da254[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:15)
  1. 當 executor 對最大線程和工做隊列容量使用有限邊界而且已經飽和時。

四種拒絕策略

  • CallerRunsPolicy:在調用者線程中直接執行被拒絕任務的 run 方法,除非線程池已經 shutdown,則直接拋棄任務。
  • AbortPolicy:直接丟棄任務,並拋出 RejectedExecutionException 異常。(默認拒絕策略)
  • DiscardPolicy:直接丟棄任務,什麼都不作。
  • DiscardOldestPolicy:該策略下,拋棄進入隊列最先的那個任務,而後嘗試把此次拒絕的任務放入隊列。

Executor 家族解析

Executor、ExecutorService、ThreadPoolExecutor、Executors 之間的關係

  1. Executor
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor 裏面只有一個 execute(Runnable command)回調接口。用於執行已提交的 Runnable 任務對象。

  1. ExecutorService
public interface ExecutorService extends Executor {

ExecutorService 接口是繼承 Executor 接口,增長了一些關於中斷的方法。

方法 invokeAny 和 invokeAll 是批量執行的最經常使用形式,它們執行任務 collection,而後等待至少一個,
或所有任務完成(可以使用 ExecutorCompletionService 類來編寫這些方法的自定義變體)。

submit 方法是提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。該 Future 的 get 方法在成功完成時將會返回該任務的結果。

  1. ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {

ThreadPoolExecutor 是 ExecutorService 的一個實現類,它使用可能的幾個池線程之一執行每一個提交的任務,一般使用 Executors 工廠方法配置。
線程池能夠解決兩個不一樣問題:因爲減小了每一個任務調用的開銷,它們一般能夠在執行大量異步任務時提供加強的性能,而且還能夠提供綁定和管理資源(包括執行任務集時使用的線程)的方法。

  1. Executors
public class Executors {

Executors 是一個工具類,能夠用於方便的建立線程池。

線程池實現線程複用的原理

咱們直接看 execute 方法源碼:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //若是正在運行的核心線程數小於核心線程總數
        if (workerCountOf(c) < corePoolSize) {
            //增長一個核心線程來執行任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

咱們看一下 addWorker 方法:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

首先判斷當前線程池的狀態,若是已經狀態不是 shutdown 或者 running,或者已經爲 shutdown 可是工做隊列已經爲空,那麼這個時候直接返回添加工做失敗。接下來是對線程池線程數量的判斷,根據調用時的 core 的值來判斷是跟 corePoolSize 仍是 maximumPoolSize 判斷。

在確認了線程池狀態以及線程池中工做線程數量以後,才真正開始添加工做線程。

新創建一個 worker 類(線程池的內部類,具體的工做線程),將要執行的具體線程作爲構造方法中的參數傳遞進去,接下來將其加入線程池的工做線程容器 workers,而且更新工做線程最大量,最後調用 worker 工做線程的 start()方法,就完成了工做線程的創建與啓動。

接下來咱們能夠看最重要的,也就是咱們以前創建完 Worker 類以後立馬調用的 run()方法了

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

接下來可見,咱們所須要的任務,直接在工做線程中直接以 run()方式以非線程的方式所調用,這裏也就是咱們所須要的任務真正執行的地方。

在執行完畢後,工做線程的使命並無真正宣告段落。在 while 部分 worker 仍舊會經過 getTask()方法試圖取得新的任務。

下面是 getTask()的實現:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍舊會判斷線程池的狀態是不是 running 仍是 shutdown 以及 stop 狀態下隊列是否仍舊有須要等待執行的任務。

若是狀態沒有問題,則會跟據 allowCoreThreadTimeOut 和 corePoolSize 的值經過對前面這兩個屬性解釋的方式來選擇從任務隊列中得到任務的方式(是否設置 timeout)。

其中的 timedOut 保證了確認前一次試圖取任務時超時發生的記錄,以確保工做線程的回收。

在 runWorker()方法的最後調用了 processWorkerExit 來執行工做線程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
}

先確保已經從新更新了線程池中工做線程的數量,以後從線程池中的工做線程容器移去當前工做線程,而且將完成的任務總數加到線程池的任務總數當中。

以後嘗試設置線程池狀態爲 TERMINATED。

若是線程池的線程數量小於核心線程時, 則增長一個線程來繼續處理任務隊列中任務。

execute 執行流程圖

線程池狀態

  • RUNNING :接受新的任務並處理排隊任務
  • SHUTDOWN:不接受新的任務,但處理排隊任務
  • STOP:不接受新任務,也不處理排隊任務,並中斷正在執行的任務
  • TIDYING:全部任務都已終止並 workerCount 爲 0 時,並執行 terminate()方法
  • TERMINATED:terminate()運行完成

源碼:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

使用線程池的注意點

  • 避免任務堆積
  • 避免線程數過分增長
  • 排查線程泄露

ThreadLocal 詳解

什麼是 ThreadLocal

ThreadLocal 提供一個線程(Thread)局部變量,訪問到某個變量的每個線程都擁有本身的局部變量。說白了,ThreadLocal 就是想在多線程環境下去保證成員變量的安全。

ThreadLocal 的用途

  • 用途一:每一個線程須要獨享的對象
  • 用途二:每一個線程內須要保存全局變量(例如在攔截器中獲取的用戶信息),可讓不一樣方法直接使用,避免參數傳遞的麻煩

用途一:每一個線程須要一個獨享的對象

每一個 Thread 內有本身的實例副本,不共享

好比:教材只有一本,一塊兒作筆記有線程安全的問題,複印後就能夠解決這個問題。

需求:咱們想打印出兩個線程不一樣的時間

public class ThreadLocalTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return simpleDateFormat.format(date);
    }
}

運行結果:

1970-01-01 08:01:41
1970-01-01 08:01:40

看起來是咱們想要的結果。

可是若是咱們想打印 1000 條不一樣的時間,須要用到不少線程,咱們就會建立銷燬 1000 個 SimpleDateFormat 對象,無疑是浪費內存的寫法。

既然這樣,那咱們就把 SimpleDateFormat 建立爲類變量試試看。

public class ThreadLocalTest {

    static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        return simpleDateFormat.format(date);
    }
}

運行結果:

能夠看到這樣會引起線程安全的問題。

固然,咱們也能夠進行加鎖來解決這個問題,可是會引起效率問題。

正確方案使用 ThreadLocal 來解決這個問題

public class ThreadLocalTest {

    static ThreadLocal<SimpleDateFormat> threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
        threadLocal.remove();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        return threadLocal.get().format(date);
    }
}

用途二:當前用戶信息須要被線程內全部方法共享

比較繁瑣的方案就是做爲參數層層專遞。

若是使用 map 也須要保證線程安全問題,因此須要加鎖或者使用 ConcurrentHashMap,但都對性能有影響

因此咱們使用 ThreadLocal 來實現。

public class ThreadLocalTest {

    public static void main(String[] args) {
        new Service1().precess();
    }
}

class Service1 {
    static ThreadLocal<User> threadLocal = new ThreadLocal<>();

    public void precess() {
        User user = new User("Jack");
        threadLocal.set(user);
        new Service2().precess();
        threadLocal.remove();
    }
}

class Service2 {
    public void precess() {
        System.out.println("Service2拿到" + Service1.threadLocal.get().name);
        new Service3().precess();
    }
}

class Service3 {
    public void precess() {
        System.out.println("Service3拿到" + Service1.threadLocal.get().name);
    }
}

class User {
    public String name;

    public User(String name) {
        this.name = name;
    }
}

運行結果:

Service2拿到Jack
Service3拿到Jack

ThreadLocal 的兩個做用

  1. 讓某個須要用到的對象在線程間隔離
  2. 在任何方法中均可以輕鬆獲取到該對象。

場景一:initialValue:在 ThreadLocal第一次get的時候把對象給初始化出來,對象的初始化時機能夠由咱們控制

場景二:set:若是須要保存到 ThreadLocal 中的對象生成時機不禁咱們控制,咱們能夠使用 ThreadLocal.set()直接放到 ThreadLocal 中去,以便後續使用。

使用 ThreadLocal 帶來的好處

  • 達到線程安全
  • 不須要加鎖,提升執行效率
  • 更高效的利用內存節省開銷
  • 免去傳參的繁瑣

ThreadLocal 原理

Thread、ThreadLocal、ThreadLocalMap 之間的關係

在 Thread 類中包含一個成員變量

/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

一個 ThreadLocalMap 又能夠包含無數個 ThreadLocal。

圖解以下:

ThreadLocal 重要方法介紹

  • T initialValue() :初始化,咱們看一下方法原理

咱們先看下 get 方法:

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
}

能夠看到在 ThreadLocalMap 爲 null 的時候咱們調用了 setInitialValue()方法

private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在 initialValue 方法沒有被重寫的時候返回的是 null,由於咱們已經重寫了,因此它會將咱們的 value 放到 ThreadLocalMap 中的 ThreadLocal 對象中。

一般,每一個線程最多調用一次此方法,若是已經調用了 remove()方法後,再調用 get(),就會再次觸發 initialValue()方法。

  • set(T t):爲這個線程設置一個新值
  • T get():獲得這個線程對應的 value,若是是第一次調用 get,則會調用 InitialValue()來獲取值。
  • void remove():刪除對應這個線程的值

ThreadLocal 重要方法解析

get 方法解析:

咱們先看源碼:

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
}

剛纔已經講過 map 爲 null 的狀況,咱們看下若是不爲 null 是如何獲取到值的。

首先在 map.getEntry(this)中咱們從不爲 null 的 ThreadLocalMap 中 getEntry 也就是咱們的 key,this 就是咱們當前的 ThreadLocal 對象,獲得的 e 也就是咱們的鍵值對,而後.value 來返回咱們的結果。

set 方法解析:

public void set(T value) {
        //獲取當前線程對象
        Thread t = Thread.currentThread();
        //獲取當前線程對象的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        //若是不爲null就set進去,k爲當前ThreadLocal,v就是咱們傳入的對象
        if (map != null)
            map.set(this, value);
        else
        //爲null就去建立ThreadLocalMap並set當前k、v
            createMap(t, value);
}

remove 方法解析:

public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
}

remove 方法比較簡單,就是拿到 ThreadLocalMap 而後刪除掉 k 等於當前對象的 ThreadLocal。

ThreadLocalMap 類

ThreadLocalMap,也就是 Thread.threadlocals

ThreadLocalMap 類是每一個線程 Thread 類裏面的變量,裏面最重要的一個鍵值對數組Entry[] table,能夠認爲是一個 map。

  • k:當前 ThreadLocal
  • v:實際存儲的成員變量
/**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
    private Entry[] table;

若是發生哈希衝突

ThreadLocalMap 和 HashMap 有所不一樣,HashMap(jdk8)採用的是鏈表+紅黑樹

而 ThreadLocalMap 採用的是線性探測法,若是發生衝突,就繼續尋找下一個空位置,而不是使用鏈表。

ThreadLocal 注意點

  1. 內存泄漏

在 ThreadLocal 中有一個靜態內部類也就是 ThreadLocalMap。

ThreadLocalMap 中的 Entry 是繼承了 WeakReference 也就是弱引用

弱引用的特色就是在垃圾回收器線程掃描它所管轄的內存區域的過程當中,一旦發現了只具備弱引用的對象,無論當前內存空間足夠與否,都會回收它的內存。

可是咱們發現下面一句 value = v;又包含了強引用。

正常狀況下,當線程終止,保存在 ThreadLocal 中的 value 就會被垃圾回收,由於沒有任何強引用了。

可是若是線程不終止(好比線程須要保持好久),那麼 key 對應的 value 就不能被回收,由於有如下調用鏈:

Thread --> ThreadLocalMap --> Entry(key 爲 null) --> Value

由於 value 和 Thread 之間還保存這個強引用鏈路,因此致使value沒法被回收,就可能回出現 OOM。

JDK 已經考慮到這個問題,因此在 set、remove、rehash 方法中會掃描 key 爲 null,若是 key 爲 null 則會把 value 也設置爲 null。

可是若是 ThreadLocal 不被使用,那麼 set、remove、rehash 方法也不會被調用,若是同時線程並無中止,則調用鏈會一直存在,就會致使 value 的內存泄漏。

因此咱們須要在使用完 ThreadLocal 後主動使用 remove()方法來避免內存泄漏。

  1. 若是 set 進去的是一個 static 對象,則仍是會有併發訪問的問題
  2. 子線程訪問問題

咱們來看一下什麼是子線程訪問問題。

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("Hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
        }, "Thread01").start();
    }
}

運行結果:

null

咱們看一下爲何是 null,咱們直接跟進到 get 方法中:

能夠很清楚的看到,咱們在 get 的時候拿到當前線程是 Thead01,而咱們 set 進去的是 main 線程,因此咱們拿到的 ThreadLocalMap 是 null。

而後咱們調用 setInitialValue()方法

private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在第一句調用了 initialValue()方法:

protected T initialValue() {
        return null;
}

這下咱們就明白了,咱們返回了個 null,而且在 Thead01 子線程中建立了一個 ThreadLocalMap,value 爲 null 。

咱們看另外一個例子:

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "Hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
            // System.out.println(threadLocal1.get());
        }, "Thread01").start();
    }
}

運行結果:

Hello

我相信你們已經明白爲何能獲取到 Hello 了。

咱們看源碼:

static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
}

由於在 withInitial 裏面咱們繼承了 ThreadLocal 而且重寫了 initialValue 方法,因此咱們得到到了 Hello。

可是,這樣作咱們在子線程中,至關因而又建立了一個 ThreadLocalMap 將 value 存了進去。

InheritableThreadLocal 解析

咱們剛纔已經看到了在子線程中沒法訪問到父線程 ThreadLocal 類型變量的值。

咱們試試 InheritableThreadLocal 類

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
        }, "Thread01").start();
    }
}

運行結果:

hello

可是,InheritableThreadLocal 爲何可以讀取出來?

在 Thread 類中,inheritableThreadLocals,他的類型同 Thread 內部的 threadLocals 變量。

咱們看一下這個類源碼:

public class InheritableThreadLocal<T> extends ThreadLocal<T> {

    //該函數在父線程建立子線程,向子線程複製InheritableThreadLocal變量時使用
    protected T childValue(T parentValue) {
        return parentValue;
    }

    /**
     * 因爲重寫了getMap,操做InheritableThreadLocal時,
     * 將隻影響Thread類中的inheritableThreadLocals變量,
     * 與threadLocals變量再也不有關係
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * 相似於getMap,操做InheritableThreadLocal時,
     * 將隻影響Thread類中的inheritableThreadLocals變量,
     * 與threadLocals變量再也不有關係
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

InheritableThreadLocal 繼承了 ThreadLocal 而且重寫了三個方法。

咱們這個時候回過頭看 Thread 類的初始化 init 方法

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {

若是 parent 的 inheritableThreadLocals 不是 null,那麼就會將當前線程的 inheritableThreadLocals 設置爲 parent 的 inheritableThreadLocals

parent 是什麼?以前也說過了,就是建立這個線程的線程,也就是平時說的父線程。

因此說藉助於 inheritableThreadLocals,能夠實現,建立線程向被建立線程之間數據傳遞

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
}

邏輯很清晰,建立了一個 ThreadLocalMap

簡單理解:這個建立的 ThreadLocalMap 就是根據入參的 ThreadLocalMap,拷貝建立一份。

總結

其實就是從父線程(當前建立線程)中複製的一份,而後後續的數據讀取解析,則是經過 inheritableThreadLocals 變量,和內部的那個 threadLocals 沒有什麼關係。

Lock 接口

什麼是 Lock

鎖是一種工具,用於控制對共享資源的訪問。

Lock 和 synchronized,這兩個都是最多見的鎖,它們均可以達到線程安全的目的,可是在使用上和功能上有較大不一樣。

Lock 並非用來代替synchronized 的,而是在使用 synchronized 不適合或者不足以知足要求的時候,來提供更高級更靈活的功能。

Lock 接口最多見的實現類是ReentrantLock

一般狀況下,Lock 只容許一個線程來訪問這個共享資源。不過有的時候,一些特殊的實現也能夠容許併發訪問,好比 ReadWriteLock 裏面的ReadLock

爲何須要 Lock

首先咱們先看一下爲何 synchronized 不夠用?

  1. 效率低:鎖的釋放狀況少,視圖獲取鎖時不能設定超時、不能中斷一個正在試圖獲取鎖的線程。
  2. 不夠靈活:加鎖和釋放的時機單一,每一個鎖僅有單一的條件(某個對象)。
  3. 沒法知道是否成功得到到鎖

Lock 主要方法介紹

在 Lock 中聲明瞭四個方法來獲取鎖

  • lock()
  • tryLock()
  • tryLock(long time,TImeUnit unit)
  • lockInterruptibly()

lock()

lock()就是最普通的獲取鎖,若是鎖已經被其它線程獲取,則進行等待。

lock 不會像 synchronized 同樣在異常時自動釋放鎖

所以最佳實踐是在finally中釋放鎖,以保證發生異常時鎖必定會被釋放。

public class LockTest {

    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        lock.lock();
        try {
            //業務邏輯
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

}

順便說一下爲何不能在 try 內寫上 lock.lock();

阿里巴巴規範手冊:在使用阻塞等待獲取鎖的方式中,必須在 try 代碼塊以外,而且在加鎖方法與 try 代碼塊之間沒有任何可能拋出異常的方法調用,避免加鎖成功後,在 finally 中沒法解鎖。

  • 說明一:若是在 lock 方法與 try 代碼塊之間的方法調用拋出異常,那麼沒法解鎖,形成其它線程沒法成功獲取鎖。
  • 說明二:若是 lock 方法在 try 代碼塊以內,可能因爲其它方法拋出異常,致使在 finally 代碼塊中,unlock 對未加鎖的對象解鎖,它會調用 AQS 的 tryRelease 方法(取決於具體實現類),拋出 IllegalMonitorStateException 異常。
  • 說明三:在 Lock 對象的 lock 方法實現中可能拋出 unchecked 異常,產生的後果與說明二相同。

lock 方法不能被中斷,這會帶來很大隱患:一旦陷入死鎖,lock()就會陷入永久等待。

tryLock()

tryLock 用來嘗試獲取鎖,若是當前鎖沒有被其餘線程佔用,則獲取成功,返回 true,不然返回 false,表明獲取鎖失敗。

相比於 lock,這樣的方法顯然功能更強大了,咱們能夠根據是否能獲取到鎖來決定後續程序的行爲

該方法會當即返回,並不會在拿不到鎖時阻塞。

tryLock(long time,TimeUnit unit)

該方法就是在該時間段內嘗試獲取鎖,若是超過期間就放棄。

lockInterruptibly()

至關因而把 tryLock 的時間設爲無限,在等待鎖的過程當中,線程能夠被中斷

可見性保證

Lock 一樣也是遵循Happens-before原則。

Lock 的加鎖解鎖和 synchronized 有一樣的內存語義,也就是說,下一個線程加鎖後能夠看到全部前一個線程解鎖前發生的全部操做

鎖的分類圖

這些分類並不是互斥的,也就是多個類型能夠並存:有可能一個鎖,同時屬於兩種類型。

樂觀鎖和悲觀鎖

爲何會誕生非互斥同步鎖————互斥同步鎖的劣勢

  • 阻塞和喚醒帶來的性能劣勢
  • 永久阻塞:若是持有鎖的線程被永久阻塞,好比遇到了無限循環、死鎖等活躍性問題,那麼等待線該程釋放鎖的那幾個悲催線程,將永遠得不到執行。

什麼是樂觀鎖和悲觀鎖

悲觀鎖:顧名思義,悲觀鎖是基於一種悲觀的態度類來防止一切數據衝突,它是以一種預防的姿態在修改數據以前把數據鎖住,而後再對數據進行讀寫,在它釋放鎖以前任何人都不能對其數據進行操做,直到前面一我的把鎖釋放後下一我的數據加鎖纔可對數據進行加鎖,而後才能夠對數據進行操做,通常數據庫自己鎖的機制都是基於悲觀鎖的機制實現的。

典型例子:synchronized、Lock 接口

樂觀鎖:樂觀鎖是對於數據衝突保持一種樂觀態度,操做數據時不會對操做的數據進行加鎖(這使得多個任務能夠並行的對數據進行操做),只有到數據提交的時候才經過一種機制來驗證數據是否存在衝突,通常使用 CAS 算法來實現的。

典型例子:Atomic 原子類、併發容器等

開銷對比

悲觀鎖的原始開銷要高於樂觀鎖,可是特色是一勞永逸,臨界區持鎖時間就算愈來愈差,也不會對互斥鎖的開銷產生影響。

相反,雖然樂觀鎖一開始開銷比較小,可是若是自旋時間很長,或者不停重試,那麼消耗的資源也會愈來愈多

兩種鎖各自的使用場景:各有千秋

  • 悲觀鎖:適用於併發寫入多的狀況,適用於臨界區持鎖時間比較長的狀況,悲觀鎖能夠避免大量的無用自旋等消耗。
  • 樂觀鎖:適用於讀多寫少的場景,不加鎖可讓讀取性能大幅提升。

可重入鎖和不可重入鎖

可重入鎖就是一個類的 A、B 兩個方法,A、B 都有擁有同一把鎖,當 A 方法調用時,得到鎖,在 A 方法的鎖尚未被釋放時,調用 B 方法時,B 方法也得到該鎖。

不可重入鎖就是一個類的 A、B 兩個方法,A、B 都有擁有同一把鎖,當 A 方法調用時,得到鎖,在 A 方法的鎖尚未被釋放時,調用 B 方法時,B 方法也得到不了該鎖,必須等 A 方法釋放掉這個鎖。

synchronized 和 ReentrantLock 都是可重入鎖

下面使用 ReentrantLock 證實可重入鎖的例子:

public class LockTest {

    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        methodA();
    }

    public static void methodA() {
        System.out.println("未得到鎖以前,count爲:" + lock.getHoldCount());
        lock.lock();
        try {
            System.out.println("得到A的鎖,count爲:" + lock.getHoldCount());
            methodB();
        } finally {
            lock.unlock();
            System.out.println("釋放A的鎖,count爲:" + lock.getHoldCount());
        }
    }

    public static void methodB() {
        lock.lock();
        try {
            System.out.println("得到B的鎖,count爲:" + lock.getHoldCount());
        } finally {
            lock.unlock();
            System.out.println("釋放B的鎖,count爲:" + lock.getHoldCount());
        }
    }

}

運行結果:

未得到鎖以前,count爲:0
得到A的鎖,count爲:1
得到B的鎖,count爲:2
釋放B的鎖,count爲:1
釋放A的鎖,count爲:0

證實了 ReentrantLock 是可重入鎖,在 holdCount = 0 的時候就會釋放該鎖。

public void unlock() {
        sync.release(1);
}

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

在 unlock()方法中咱們看到當前狀態 - 1,若是 c == 0 就說明釋放該鎖,否則就只修改鎖的狀態 state。

可重入鎖

不可重入鎖

公平鎖和非公平鎖

什麼是公平和非公平?

公平指的是按照線程的請求順序,來分配鎖;非公平指的是,不徹底按照請求的順序,在必定狀況下,能夠插隊。

爲何要有非公平鎖

假設線程 A 持有一把鎖,線程 B 請求這把鎖,因爲線程 A 已經持有這把鎖了,因此線程 B 會陷入等待,在等待的時候線程 B 會被掛起,也就是進入阻塞狀態,那麼當線程 A 釋放鎖的時候,本該輪到線程 B 甦醒獲取鎖,但若是此時忽然有一個線程 C 插隊請求這把鎖,那麼根據非公平的策略,會把這把鎖給線程 C,這是由於喚醒線程 B 是須要很大開銷的,頗有可能在喚醒以前,線程 C 已經拿到了這把鎖而且執行完任務釋放了這把鎖。

相比於等待喚醒線程 B 的漫長過程,插隊的行爲會讓線程 C 自己跳過陷入阻塞的過程,若是在鎖代碼中執行的內容很少的話,線程 C 就能夠很快完成任務,而且在線程 B 被徹底喚醒以前,就把這個鎖交出去,這樣是一個共贏的局面,對於線程 C 而言,不須要等待提升了它的效率,而對於線程 B 而言,它得到鎖的時間並無推遲,由於等它被喚醒的時候,線程 C 早就釋放鎖了,由於線程 C 的執行速度相比於線程 B 的喚醒速度,是很快的,因此 Java 設計非公平鎖,是爲了提升總體的運行效率避免喚醒帶來的空檔期

代碼案例公平鎖

public class LockTest {

    public static void main(String[] args) {
        PrintQueue printQueue = new PrintQueue();
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job(printQueue));
        }
        for (int i = 0; i < 10; i++) {
            thread[i].start();
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Job implements Runnable {

    PrintQueue printQueue;

    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "開始打印");
        printQueue.printJob(new Object());
        System.out.println(Thread.currentThread().getName() + "打印完畢");
    }
}

class PrintQueue {

    private Lock queueLock = new ReentrantLock(true);

    public void printJob(Object document) {
        queueLock.lock();
        try {
            int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,須要" + duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            queueLock.unlock();
        }

        queueLock.lock();
        try {
            int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,須要" + duration + "秒");
            Thread.sleep(duration * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            queueLock.unlock();
        }
    }
}

運行結果:

Thread-0開始打印
Thread-0正在打印,須要3
Thread-1開始打印
Thread-2開始打印
Thread-3開始打印
Thread-4開始打印
Thread-5開始打印
Thread-6開始打印
Thread-1正在打印,須要4
Thread-2正在打印,須要7
Thread-3正在打印,須要7
Thread-4正在打印,須要6
Thread-5正在打印,須要5
Thread-6正在打印,須要5
Thread-0正在打印,須要8秒
Thread-0打印完畢
Thread-1正在打印,須要2秒
Thread-1打印完畢
Thread-2正在打印,須要3秒
Thread-2打印完畢
Thread-3正在打印,須要4秒
Thread-3打印完畢
Thread-4正在打印,須要2秒
Thread-4打印完畢
...

測試非公平鎖只須要將參數改成 false 便可。true 表明公平鎖

private Lock queueLock = new ReentrantLock(true);

源碼分析

公平鎖:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //  和非公平鎖相比,這裏多了一個判斷:隊列中是否有線程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
}

非公平鎖:

static final class NonfairSync extends Sync {
    final void lock() {
        //  和公平鎖相比,這裏會直接先進行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

注意

若是使用 tryLock()方法,它是不遵照設定的公平原則,若是有線程執行 tryLock()的時候,一旦有線程釋放了鎖,那麼這個正在執行 tryLock()的線程會當即得到鎖,即便以前已經有人在排隊了。

總結

優點 劣勢
公平鎖 在平等狀況下,每一個線程在等待一段時間後都會得到執行的機會 更慢,吞吐量小
不公平鎖 更快,吞吐量大 可能會致使在阻塞隊列中的線程長期處於飢餓狀態

非公平鎖和公平鎖的兩處不一樣:

非公平鎖在調用 lock 後,首先就會調用 CAS 進行一次搶鎖,若是這個時候恰巧鎖沒有被佔用,那麼直接就獲取到鎖返回了。

非公平鎖在 CAS 失敗後,和公平鎖同樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,若是發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,可是公平鎖會判斷等待隊列是否有線程處於等待狀態,若是有則不去搶鎖,乖乖排到後面。

公平鎖和非公平鎖就這兩點區別,若是這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是同樣的,都要進入到阻塞隊列等待喚醒。

相對來講,非公平鎖會有更好的性能,由於它的吞吐量比較大。固然,非公平鎖讓獲取鎖的時間變得更加不肯定,可能會致使在阻塞隊列中的線程長期處於飢餓狀態。

共享鎖和排它鎖

什麼是共享鎖和排它鎖

排它鎖:又稱爲獨佔鎖、共享鎖

共享鎖:又稱爲讀鎖,得到共享鎖以後,能夠查看但沒法修改和刪除數據,其餘線程此時也能夠得到到共享鎖,也能夠查看但沒法修改和刪除數據。

共享鎖和排它鎖的典型是讀寫鎖ReentrantReadWriteLock,其中讀鎖是共享鎖,寫鎖是排它鎖

在沒有讀寫鎖以前,咱們假設使用 ReentrantLock,雖然保證了線程安全,可是也浪費了必定的資源多個讀操做同時進行,並無線程安全問題

在讀的地方使用讀鎖,寫的地方使用寫鎖,靈活控制,若是沒有寫鎖的狀況下,讀是無阻塞的,大大提升效率。

讀寫鎖的規則

  • 多個線程只申請讀鎖,均可以申請到。
  • 若是有一個線程佔用了讀鎖,則此時其餘線程若是申請寫鎖,則申請寫鎖的線程會等待釋放讀鎖。
  • 若是有一個線程佔用了寫鎖,則此時其餘線程若是申請讀鎖,則申請讀鎖的線程會等待釋放寫鎖。

ReentrantReadWriteLock 具體用法

public class LockTest {

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
        new Thread(() -> read()).start();
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
    }

    private static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "開始學習《Thinking in Java》");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "太難了!我不學了!");
            readLock.unlock();
        }
    }

    private static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "開始印刷《Thinking in Java》");
        } finally {
            System.out.println(Thread.currentThread().getName() + "印刷完成");
            writeLock.unlock();
        }
    }
}

運行結果:

Thread-0開始印刷《Thinking in Java》
Thread-0印刷完成
Thread-1開始學習《Thinking in Java》
Thread-1太難了!我不學了!
Thread-2開始學習《Thinking in Java》
Thread-2太難了!我不學了!
Thread-3開始印刷《Thinking in Java》
Thread-3印刷完成
Thread-4開始學習《Thinking in Java》
Thread-4太難了!我不學了!

讀鎖插隊策略

假設線程 1 和線程 2 在讀取,線程 3 想要寫入,可是拿不到鎖,因而進入等待隊列,線程 4 不在隊列中,如今想要讀取。

此時有兩種策略

  1. 讀能夠插隊,效率高

可是這樣可能會致使後面一堆讀線程過來,一直輪不到線程 3 來寫。致使寫入飢餓。

  1. 避免飢餓

一個個排隊,這樣就不會致使飢餓,ReentrantReadWriteLock 就是採用第二種策略。

更確切的說就是:在非公平鎖狀況下,容許寫鎖插隊,也容許讀鎖插隊,可是讀鎖插隊的前提是隊列中的頭節點不能是想獲取寫鎖的線程。

公平鎖源碼:

非公平鎖源碼:

鎖的升降級

升降級是指讀鎖升級爲寫鎖,寫鎖降級爲度鎖。在 ReentrantReadWriteLock 讀寫鎖中,只支持寫鎖降級爲讀鎖,而不支持讀鎖升級爲寫鎖。

代碼演示:

public class LockTest {

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
    }

    private static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "開始學習《Thinking in Java》");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "得到到了寫鎖");
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "太難了!我不學了!");
            readLock.unlock();
        }
    }

    private static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "開始印刷《Thinking in Java》");
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "在寫鎖中獲取到了讀鎖");
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "印刷完成");
            writeLock.unlock();
        }
    }
}

運行結果:

Thread-0開始印刷《Thinking in Java》
Thread-0在寫鎖中獲取到了讀鎖
Thread-0印刷完成
Thread-1開始學習《Thinking in Java》

咱們能夠看到在寫鎖中成功得到到了讀鎖,而在讀鎖中被一直阻塞。說明不支持鎖升級!

爲何 ReentrantReadWriteLock 不支持鎖升級

主要是避免死鎖,例如兩個線程 A 和 B 都在讀, A 升級要求 B 釋放讀鎖,B 升級要求 A 釋放讀鎖,互相等待造成死循環。若是能嚴格保證每次都只有一個線程升級那也是能夠的。

總結

  1. 讀寫鎖特色特色:讀鎖是共享鎖,寫鎖是排他鎖,讀鎖和寫鎖不能同時存在
  2. 插隊策略:爲了防止線程飢餓,讀鎖不能插隊
  3. 升級策略:只能降級,不能升級
  4. ReentrantReadWriteLock 適合於讀多寫少的場合,能夠提升併發效率,而 ReentrantLock 適合普通場合

自旋鎖和阻塞鎖

阻塞或者喚醒一個 Java 線程須要操做系統切換 CPU 狀態來完成,這種狀態轉換須要耗費處理器時間。

若是同步代碼塊中的內容過於簡單,狀態轉換消耗的時間有可能比用戶代碼執行的時間還要長

在許多場景中,同步資源的鎖定時間很短,爲了這一小段時間去切換線程,線程掛起和恢復現場的話費可能會讓系統得不償失

若是物理機器有多個處理器,可以讓兩個或以上的線程同時並行,咱們就可讓後面那個請求鎖的線程不放棄 CPU 的執行時間,看看持有鎖是否會在短期內釋放鎖。

而爲了讓當前線程"稍等一下",咱們須要讓當前線程進行自旋,若是在自旋過程當中前面鎖定的線程釋放了鎖,那麼當前線程就能夠直接獲取同步資源,避免了資源消耗,這就是自旋鎖

阻塞鎖就是若是沒拿到鎖,會直接阻塞當前線程,直到被喚醒。

自旋鎖的缺點

若是鎖被佔用時間很長,那麼自旋的線程就會白白浪費處理器資源。

代碼演示

public class LockTest {

    private AtomicReference<Thread> sign = new AtomicReference<>();

    public void lock() {
        Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {
            System.out.println("自旋獲取失敗,再次嘗試");
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        sign.compareAndSet(current, null);
    }

    public static void main(String[] args) {
        LockTest spinLock = new LockTest();
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName() + "開始嘗試獲取自旋鎖");
            spinLock.lock();
            System.out.println(Thread.currentThread().getName() + "獲取到了自旋鎖");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinLock.unlock();
                System.out.println(Thread.currentThread().getName() + "釋放了自旋鎖");
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
    }

}

運行結果:

Thread-0開始嘗試獲取自旋鎖
Thread-1開始嘗試獲取自旋鎖
Thread-0獲取到了自旋鎖
Thread-0釋放了自旋鎖
Thread-1獲取到了自旋鎖
Thread-1釋放了自旋鎖

在 while 中會進行大量的循環判斷,能夠嘗試打印語句看看。

後續會講述 Atomic 原子類是如何使用 CAS 算法來實現自旋。

自旋鎖的使用場景

  • 自選鎖通常用於多核的服務器,在併發度不是特別高的狀況下,比阻塞鎖的效率高。
  • 另外,自旋鎖適用於臨界區比較短小的狀況,不然若是臨界區很大(一旦拿到鎖,好久才釋放)那也是不合適的。

可中斷鎖和不可中斷鎖

在 Java 中,synchronized 是不可中斷鎖,而 Lock 是可中斷鎖,由於 tryLock(time)和 lockinterruptibly 都能響應中斷。

synchronized 原理以及鎖優化

同步代碼塊:

monitorenter 指令插入到同步代碼塊的開始位置,monitorexit 指令插入到同步代碼塊的結束位置,JVM 須要保證每個 monitorenter 都有一個 monitorexit 與之相對應。任何對象都有一個 monitor 與之相關聯,當且一個 monitor 被持有以後,他將處於鎖定狀態。線程執行到 monitorenter 指令時,將會嘗試獲取對象所對應的 monitor 全部權,即嘗試獲取對象的鎖。

同步方法:

synchronized 方法則會被翻譯成普通的方法調用和返回指令如:invokevirtual、areturn 指令,在 JVM 字節碼層面並無任何特別的指令來實現被 synchronized 修飾的方法,而是在 Class 文件的方法表中將該方法的 accessflags 字段中的 synchronized 標誌位置 1,表示該方法是同步方法並使用調用該方法的對象或該方法所屬的 Class 在 JVM 的內部對象表示 Klass 作爲鎖對象。

synchronized 鎖 和 對象頭息息相關。因此咱們先了解一下對象在堆中的結構:

咱們須要先了解兩個重要的概念:Java 對象頭、Monitor。

Java 對象頭

synchronized 用的鎖是存在 Java 對象頭裏的,那麼什麼是 Java 對象頭呢?

Hotspot 虛擬機的對象頭主要包括兩部分數據:Mark Word(標記字段)Klass Pointer(類型指針)。其中 Klass Point 是是對象指向它的類元數據的指針,虛擬機經過這個指針來肯定這個對象是哪一個類的實例,Mark Word 用於存儲對象自身的運行時數據,它是實現輕量級鎖和偏向鎖的關鍵。可是若是對象是數組類型,則須要三個機器碼,由於 JVM 虛擬機能夠經過 Java 對象的元數據信息肯定 Java 對象的大小,可是沒法從數組的元數據來確認數組的大小,因此用一塊來記錄數組長度。

Mark Word

Mark Word 用於存儲對象自身的運行時數據,如哈希碼(HashCode)、GC 分代年齡、鎖狀態標誌、線程持有的鎖、偏向線程 ID、偏向時間戳等等,佔用內存大小與虛擬機位長一致。

Klass Word

存儲指向對象所屬類(元數據)的指針,JVM 經過這個肯定這個對象屬於哪一個類。

Monitor

什麼是 Monitor?

咱們能夠把它理解爲一個同步工具,也能夠描述爲一種同步機制,它一般被描述爲一個對象。

與一切皆對象同樣,全部的 Java 對象是天生的 Monitor,每個 Java 對象都有成爲 Monitor 的潛質,由於在 Java 的設計中 ,每個 Java 對象都帶了一把看不見的鎖,它叫作內部鎖或者 Monitor 鎖。

Monitor 是線程私有的數據結構,每個線程都有一個可用 Monitor Record 列表,同時還有一個全局的可用列表。每個被鎖住的對象都會和一個 Monitor 關聯(對象頭的 Mark Word 中的 Lock Word 指向 Monitor 的起始地址),同時 Monitor 中有一個 Owner 字段存放擁有該鎖的線程的惟一標識,表示該鎖被這個線程佔用。

Monitor 是由 ObjectMonitor 實現的,源碼是 C++來實現的。主要結構以下:

ObjectMonitor() {
        _header       = NULL;
        _count        = 0;   // 記錄個數
        _waiters      = 0,   // 等待線程數
        _recursions   = 0;  //  重入次數
        _object       = NULL;
        _owner        = NULL;  // 當前持有鎖的線程
        _WaitSet      = NULL;  // 調用了 wait 方法的線程被阻塞 放在這裏
        _WaitSetLock  = 0 ;    // 保護等待隊列,簡單的自旋
        _Responsible  = NULL ;
        _succ         = NULL ;
        _cxq          = NULL ;
        FreeNext      = NULL ;
        _EntryList    = NULL ; // 等待鎖 處於block的線程 有資格成爲候選資源的線程
        _SpinFreq     = 0 ;
        _SpinClock    = 0 ;
        OwnerIsThread = 0 ;
      }

咱們知道 synchronized 是重量級鎖,效率很低。不過在 JDK 1.6 中對 synchronized 的實現進行了各類優化,使得它顯得不是那麼重了。

鎖優化

JDK1.6 對鎖的實現引入了大量的優化,如自旋鎖適應性自旋鎖鎖消除鎖粗化偏向鎖輕量級鎖等技術來減小鎖操做的開銷。

鎖主要存在四中狀態,依次是:無鎖狀態偏向鎖狀態輕量級鎖狀態重量級鎖狀態。他們會隨着競爭的激烈而逐漸升級。注意鎖能夠升級不可降級,這種策略是爲了提升得到鎖和釋放鎖的效率。

適應自旋鎖

所謂自適應就意味着自旋的次數再也不是固定的,它是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。

線程若是自旋成功了,那麼下次自旋的次數會更加多,由於虛擬機認爲既然上次成功了,那麼這次自旋也頗有可能會再次成功,那麼它就會容許自旋等待持續的次數更多。反之,若是對於某個鎖,不多有自旋可以成功的,那麼在之後要鎖的時候自旋的次數會減小甚至省略掉自旋過程,以避免浪費處理器資源。

鎖消除

鎖消除是發生在編譯器級別的一種鎖優化方式。

有時候咱們寫的代碼徹底不須要加鎖,卻執行了加鎖操做。

好比 StringBuffer 的 append()方法,Vector 的 add()方法。

若是 JVM 明顯檢測到沒有發生方法逃逸,就會將內部的鎖消除。

鎖粗化

一般狀況下,爲了保證多線程間的有效併發,會要求每一個線程持有鎖的時間儘量短,可是在某些狀況下,一個程序對同一個鎖不間斷、高頻地請求、同步與釋放,會消耗掉必定的系統資源,由於鎖的請求、同步與釋放自己會帶來性能損耗,這樣高頻的鎖請求就反而不利於系統性能的優化了,雖然單次同步操做的時間可能很短。鎖粗化就是告訴咱們任何事情都有個度,有些狀況下咱們反而但願把不少次鎖的請求合併成一個請求,以下降短期內大量鎖請求、同步、釋放帶來的性能損耗。

public void doSomethingMethod(){
    synchronized(lock){
        //do some thing
    }
    synchronized(lock){
        //do other thing
    }
}

偏向鎖

若是使用鎖的線程都只有一個,那麼,維護輕量級鎖都是浪費的。偏向鎖的目標是,減小無競爭且只有一個線程使用鎖的狀況下,使用輕量級鎖產生的性能消耗。輕量級鎖每次申請、釋放鎖都至少須要一次 CAS,但偏向鎖只有初始化時須要一次 CAS。

「偏向」的意思是,偏向鎖假定未來只有第一個申請鎖的線程會使用鎖(不會有任何線程再來申請鎖),所以,只須要在 Mark Word 中 CAS 記錄 owner,若是記錄成功,則偏向鎖獲取成功,記錄鎖狀態爲偏向鎖,之後當前線程等於 owner 就能夠零成本的直接得到鎖;不然,說明有其餘線程競爭,膨脹爲輕量級鎖

偏向鎖沒法使用自旋鎖優化,由於一旦有其餘線程申請鎖,就破壞了偏向鎖的假定。

輕量級鎖

輕量級鎖的目標是,減小無實際競爭狀況下,使用重量級鎖產生的性能消耗,包括系統調用引發的內核態與用戶態切換、線程阻塞形成的線程切換等。

顧名思義,輕量級鎖是相對於重量級鎖而言的。使用輕量級鎖時,不須要申請互斥量,僅僅將 Mark Word 中的部分字節 CAS 更新指向線程棧中的 Lock Record,若是更新成功,則輕量級鎖獲取成功,記錄鎖狀態爲輕量級鎖;不然,說明已經有線程得到了輕量級鎖,目前發生了鎖競爭(不適合繼續使用輕量級鎖),接下來膨脹爲重量級鎖

固然,因爲輕量級鎖自然瞄準不存在鎖競爭的場景,若是存在鎖競爭但不激烈,仍然能夠用自旋鎖優化,自旋失敗後再膨脹爲重量級鎖。

重量級鎖

內置鎖在 Java 中被抽象爲監視器鎖(monitor)。在 JDK 1.6 以前,監視器鎖能夠認爲直接對應底層操做系統中的互斥量(mutex)。這種同步方式的成本很是高,包括系統調用引發的內核態與用戶態切換、線程阻塞形成的線程切換等。所以,後來稱這種鎖爲「重量級鎖」。

synchronized 鎖的升級過程

鎖升級是單向的: 無鎖 -> 偏向鎖 -> 輕量級鎖 -> 重量級鎖


圖片引用 blog.dreamtobe.cn

原子類

什麼是原子類,有什麼用

  • 首先原子,就是不可分割。一個操做是不可中斷的,即便在多線程環境下也能夠保證。
  • java.util.concurrent.atomic 包。
  • 原子類的做用和鎖類似,是爲了保證併發狀況下線程安全。不過原子類相比於鎖,有必定的優點。
  • 粒度更細:原子變量能夠把競爭範圍縮小到變量級別,這是咱們能夠得到到最細粒度的狀況,一般鎖的粒度都要大於變量的粒度。
  • 效率更高:一般,使用原子類的效率比使用鎖要高。

六類原子類縱覽

Atomic 基本類型原子類

以 AtomicInteger 爲例,經常使用方法:

  • int get() : 獲取當前值
  • int getAndSet(int i): 獲取當前值,並設置新值
  • int getAndIncrement() : 獲取當前的值,並自增
  • int getAndDecrement() :獲取當前值,並自減
  • int getAndAdd(int delta): 獲取當前值,並在當前值上增長預期值
  • boolean compareAndSet(int expect,int update):若是輸入的數值等於預期值,則以原子的方式將該值設置爲輸入值(update)。

代碼演示:

public class AtomicIntegerTest implements Runnable {

    private static AtomicInteger atomicInteger = new AtomicInteger();
    private static int i = 0;

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerTest test = new AtomicIntegerTest();
        Thread thread = new Thread(test);
        Thread thread1 = new Thread(test);
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        System.out.println("原子類結果爲:" + atomicInteger.get());
        System.out.println("普通int結果爲:" + i);
    }

    @Override
    public void run() {
        for (int j = 0; j < 10000; j++) {
            atomicInteger.getAndIncrement();
            i++;
        }
    }
}

運行結果:

原子類結果爲:20000
普通int結果爲:18647

Atomic 數組原子類

直接代碼演示:

public class AtomicArrTest {

    public static AtomicIntegerArray integerArray = new AtomicIntegerArray(1000);

    public static void main(String[] args) throws InterruptedException {

        //自減
        Runnable runnable1 = () -> {
            for (int i = 0; i < integerArray.length(); i++) {
                integerArray.getAndDecrement(i);
            }
        };

        //自加
        Runnable runnable2 = () -> {
            for (int i = 0; i < integerArray.length(); i++) {
                integerArray.getAndIncrement(i);
            }
        };

        Thread[] threads1 = new Thread[100];
        Thread[] threads2 = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads1[i] = new Thread(runnable1);
            threads2[i] = new Thread(runnable2);
            threads1[i].start();
            threads2[i].start();
        }

        //等待線程運行結束
        for (int i = 0; i < 100; i++) {
            threads1[i].join();
            threads2[i].join();
        }

        for (int i = 0; i < integerArray.length(); i++) {
            if (integerArray.get(i) != 0) {
                System.out.println("原子類型不安全!發生不等於0的錯誤" + i);
            }
        }
        System.out.println("運行結束");
    }

}

運行結果:

運行結束

能夠發現結果並無一加一減或者一減一加不等於 0 的錯誤。

Atomic Reference 引用類型原子類

AtomicReference 和 AtomicInteger 很是相似,不一樣之處就在於 AtomicInteger 是對整數的封裝,而 AtomicReference 則對應普通的對象引用。也就是它能夠保證你在修改對象引用時的線程安全性。

AtomicReference 是做用是對」對象」進行原子操做。 提供了一種讀和寫都是原子性的對象引用變量。

代碼演示:

public class AtomicReferenceTest {

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Integer> ref = new AtomicReference<>(new Integer(1000));
        Runnable runnable = () -> {
            for (; ; ) {
                Integer num = ref.get();
                if (ref.compareAndSet(num, num + 1)) {//cas
                    break;
                }
            }
        };
        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread t = new Thread(runnable, "Thread-" + i);
            list.add(t);
            t.start();
        }
        for (Thread t : list) {
            t.join();
        }
        System.out.println(ref.get()); //輸出結果:2000
    }

}

把普通變量升級爲具備原子功能

能夠使用 AtomicIntegerFieldUpdater 對普通變量進行升級

那爲何不直接在一開始就進行聲明爲原子變量呢?

由於在有的時候,好比咱們只有在某一時刻須要原子操做,存在大量併發的狀況。而在大部分時候都沒有併發問題的話,就沒有必要一直都進行原子操做。

代碼演示

public class AtomicIntegerFieldUpdaterTest implements Runnable {

    private static Candidate tom = new Candidate();
    private static Candidate peter = new Candidate();
    private static AtomicIntegerFieldUpdater<Candidate> candidateUpdater;

    public static class Candidate {
        volatile int score;
    }

    public static void main(String[] args) throws InterruptedException {
        candidateUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
        AtomicIntegerFieldUpdaterTest test = new AtomicIntegerFieldUpdaterTest();
        Thread thread1 = new Thread(test);
        Thread thread2 = new Thread(test);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("普通變量:" + tom.score);
        System.out.println("原子變量:" + peter.score);
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            tom.score++;
            candidateUpdater.getAndIncrement(peter);
        }
    }
}

注意點

AtomicIntegerFieldUpdater 不支持 static,以及修飾符不可見範圍。

Adder 累加器

Adder 是 Java 8 中引入的一個類。

高併發下 LongAdder 比 AtomicLong效率高,不過本質仍是空間換時間

競爭激烈的時候,LongAdder 把不一樣線程對應到不一樣的 Cell 上進行修改,下降了衝突機率,是多段鎖的理念,提升了併發效率。

代碼演示 AtomicLong 耗時

public class AtomicLongTest {

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(0);
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.get());
        System.out.println("AtomicLong耗時:" + (end - start));
    }

    private static class Task implements Runnable {

        private AtomicLong counter;

        public Task(AtomicLong counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        }
    }

}

運行結果:

100000000
AtomicLong耗時:1624

代碼演示 AtomicLong 耗時

public class LongAdderTest {

    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.sum());
        System.out.println("LongAdder耗時:" + (end - start));
    }

    private static class Task implements Runnable {

        private LongAdder counter;

        public Task(LongAdder counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        }
    }

}

運行結果:

100000000
LongAdder耗時:464

能夠看到差距很是大,咱們看一下爲何 AtomicLong 在併發下執行時間這麼長。

AtomicLong 的弊端

由於每一次加法,都要進行 flush 和 refresh 致使耗費資源。

在線程 1 進行了修改操做後,就要當即刷新到主存,而後其餘線程再去進行更新。

LongAdder 的改進

LongAdder 的實現原理是,在每一個線程內部都有一個本身的計數器,僅在本身內部計數,這樣就不會被其餘線程的計數器干擾。

如圖示,第一個線程計數器的值也就是 ctr‘ 爲 1 的時候,可能線程 2 的 str‘’已是 3 了,它們之間並不存在競爭關係,因此在加和的過程當中,不須要同步,也不須要 flush 和 refresh。

LongAdder 引入了分段累加的概念,內部有一個 base 變量和一個 Cell[]數組共同參與計數:

base 變量:競爭不激烈,直接累加到變量上。

Cell[] 數組:競爭激烈,各個線程分散累加到本身的槽 Cell[i] 中。

sum 方法源碼

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {//若是沒有用到cell直接返回
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;//逐步累加
            }
        }
        return sum;
}

總結

  1. 在低爭用的狀況下,二者差距不大,可是在競爭激烈的狀況下,LongAdder 吞吐量要高,可是要消耗更多的空間。
  2. LongAdder 適合的場景是統計求和的場景,並且 LongAdder 只提供了 add 方法,而 AtomicLong 還具備 CAS 方法。

Accumulator 累加器

Accumulator 和 Adder 相似,就是一個更通用版本的 Adder。

代碼演示

public class LongAccumulatorTest {

     public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 100);
        ExecutorService executor = Executors.newFixedThreadPool(8);
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        executor.shutdown();
        while (!executor.isTerminated()) {

        }
        System.out.println(accumulator.getThenReset());
    }
}

運行結果:

145

CAS

CAS 是 compare and swap 的縮寫,也就是咱們所說的比較並交換。cas 是一種基於鎖的操做,並且是樂觀鎖。

舉例就是我認爲 V 的值應該是張三,若是是的話我就把它改成李四,若是不是張三,就說明被人就改過了,那我就不修改了。

CAS 中有三個操做數:內存值 V,預期值 A,要修改的值 B,當V == A 時,則修改成B。不然什麼都不作,返回如今的 V 值。

CAS 源碼解析

例如 AtomicInteger 原子類加載了 Unsafe 工具,用來直接操做內存數據

用 volatile 修飾 value 字段,保證可見性。

就以 getAndAdd 方法舉例,咱們看下源碼:

public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

底層調用了 unsafe 類的方法:

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

裏面使用了 do while 循環,若是 a,b 線程同時執行這個方法,a 線程拿到值 1 後 cpu 執行時間到了掛起,b 開始執行,也拿到 1,可是沒有掛起,接着將值變成了 2。

這個時候 a 線程恢復執行,去比較的時候發現手上的 1 和內存裏面的值 2 不等,這個時候就要進行下一個循環。

compareAndSwapInt 是 native 方法,Unsafe 類提供了硬件級別的原子操做,而咱們傳入的 valueOffset 就是根據內存偏移地址獲取數據原值,這樣就能夠經過 unsafe 來實現 CAS。

總結

  • CAS 存在 ABA 問題。

ABA 問題就是在主內存中本來是 A 後來有另一個線程修改成了 B 後又改回了 A,第一個線程回來看後仍是 A 覺得沒有變化,實際上已經有了變化。

如何解決 ABA 問題?

AtomicStampedReference 增長版本號,進行版本號判斷。

  • 自旋時間可能過長。

併發容器

併發容器概覽

  • ConcurrentHashMap:線程安全的 HashMap
  • CopyOnWriteArrayList:線程安全的 List
  • BlockingQueue:這是一個接口,表示阻塞隊列
  • ConcurrentLinkedQueue:高效的非阻塞併發隊列,使用鏈表實現,是一個線程安全的 LinkedList
  • ConcurrentSkipListMap:是一個 Map,使用跳錶的數據結構進行快速查找

古老的同步容器

Vector 和 HashTable

併發性能較差,關鍵方法都是使用 synchronized 修飾的方法級別。

public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
        Entry<K,V> entry = (Entry<K,V>)tab[index];
        for(; entry != null ; entry = entry.next) {
            if ((entry.hash == hash) && entry.key.equals(key)) {
                V old = entry.value;
                entry.value = value;
                return old;
            }
        }

        addEntry(hash, key, value, index);
        return null;
    }

HashMap 和 ArrayList

雖然這兩個類不是線程安全的,可是咱們能夠使用 Collections.synchronizedList()和 Collections.synchronizedMap()使其變爲線程安全的。

打開源碼能夠看到是使用的同步代碼塊的方式:

ConcurrentHashMap

Map 家族概覽:

HashMap 關於併發的特色

  1. 非線程安全
  2. 迭代時不容許修改
  3. 只讀的併發是安全的
  4. 若是要用在併發的話,使用 Collections.synchronizedMap(new HashMap())

Java1.7 中 ConcurrentHashMap 結構

java 1.7 中 ConcurrentHashMap 最外層是多個segment每一個 segment 的底層數據結構和 HashMap 相似,仍然是數組和鏈表組成的拉鍊法

每一個 segment 中包含獨立的ReentrantLock鎖,每一個 segment 之間互不影響,提升了併發效率。

ConcurrentHashMap 默認有 16 個 segment,因此最多支持 16 個線程同時併發寫入。這個值能夠在初始化時填入,一旦初始化後,是不能擴容的。

Java8 中 ConcurrentHashMap 結構

put 方法解析

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 獲得 hash 值
    int hash = spread(key.hashCode());
    // 用於記錄相應鏈表的長度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 若是數組爲空,進行數組初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化數組
            tab = initTable();

        // 找該 hash 值對應的數組下標,獲得第一個節點 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 若是數組該位置爲空,
            // 用一次 CAS 操做將這個新值放入其中便可,這個 put 操做差很少就結束了
            // 若是 CAS 失敗,那就是有併發操做,進入下一次循環
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        //說明正在擴容
        else if ((fh = f.hash) == MOVED)
            // 數據遷移
            tab = helpTransfer(tab, f);

        else { // f 是該位置的頭結點,並且不爲空

            V oldVal = null;
            // 獲取數組該位置的頭結點的監視器鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 頭結點的 hash 值大於 0,說明是鏈表
                        // 用於累加,記錄鏈表的長度
                        binCount = 1;
                        // 遍歷鏈表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 若是發現了"相等"的 key,判斷是否要進行值覆蓋,而後也就能夠 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了鏈表的最末端,將這個新值放到鏈表的最後面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹
                        Node<K,V> p;
                        binCount = 2;
                        // 調用紅黑樹的插值方法插入新節點
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判斷是否要將鏈表轉換爲紅黑樹,臨界值和 HashMap 同樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個方法和 HashMap 中稍微有一點點不一樣,那就是它不是必定會進行紅黑樹轉換,
                    // 若是當前數組的長度小於 64,那麼會選擇進行數組擴容,而不是轉換爲紅黑樹
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }

    addCount(1L, binCount);
    return null;
}

get 方法分析

  1. 計算 hash 值
  2. 根據 hash 值找到數組對應位置: (n - 1) & h
  3. 根據該位置處結點性質進行相應查找

若是該位置爲 null,那麼直接返回 null

若是該位置處的節點恰好就是咱們須要的,返回該節點的值便可

若是該位置節點的 hash 值小於 0,說明正在擴容,或者是紅黑樹,而後經過 find 方法去尋找

若是以上 3 條都不知足,那就是鏈表,進行遍歷比對

CopyOnWriteArrayList

Vector 和 SynchronizedList 的鎖粒度太大,併發效率較低,而且迭代時沒法編輯

另外 CopyOnWriteSet 是用來代替同步 Set。

適用場景

讀多寫少

讀操做能夠儘量的快,而寫即便慢一些也不要緊。

在不少應用場景中,讀操做可能會遠遠多於寫操做。好比,有些系統級別的信息,每每只須要加載或者修改不多的次數,可是會被系統內全部模塊頻繁的訪問。對於這種場景,咱們最但願看到的就是讀操做能夠儘量的快,而寫即便慢一些也不要緊。

讀寫規則

以前的讀寫鎖:讀讀共享、寫寫互斥、讀寫互斥。

讀寫鎖規則的升級:讀取是徹底不須要加鎖的,而且更強的是,寫入也不會阻塞讀取操做,只有寫入和寫入之間須要同步等待。

代碼演示

首先咱們看一下使用 ArrayList 帶來的修改問題。

對 Vector、ArrayList 在迭代的時候若是同時對其進行修改就會拋出 java.util.ConcurrentModificationException 異常

public class Test {

    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {
                list.remove("5");
            }
            if (next.equals("4")) {
                list.add("new");
            }
        }
    }

}

運行結果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
    at java.util.ArrayList$Itr.next(ArrayList.java:859)
    at test.Test.main(Test.java:25)

咱們看一下源碼:

final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
}

在建立迭代器的時候會把對象的 modCount 的值傳遞給迭代器的 expectedModCount。

每次 next 的時候判斷是否一致若是不一致則拋出異常。

使用 CopyOnWriteArrayList

public class Test {

    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {
                list.remove("5");
            }
            if (next.equals("4")) {
                list.add("new");
            }
        }
    }

}

運行結果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4]
[1, 2, 3, 4, new]

源碼解析

先看一下 add 方法

public boolean add(E e) {
        //1.得到獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //2.得到Object[]數組
            Object[] elements = getArray();
            //3.得到elements的長度
            int len = elements.length;
            //4.複製到新的數組
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //5.將add的元素添加到新元素
            newElements[len] = e;
            //6.替換以前的數據
            setArray(newElements);
            return true;
        } finally {
            //7.釋放獨佔鎖
            lock.unlock();
        }
}

CopyOnWriteArrayList 使用了 ReentrantLock 獨佔鎖,保證同時只有一個線程對集合進行修改操做。

數據是存儲在 CopyOnWriteArrayList 中的 array 數組中的。

在添加元素的時候,並非直接往 array 裏面 add 元素,而是複製出來了一個新的數組,而且複製出來的數組的長度是 【舊數組的長度+1】,再把舊的數組替換成新的數組。

get方法

public E get(int index) {
        return get(getArray(), index);
}

get 方法沒有加鎖,很簡單,直接獲取元素。 可是不保證數據是最新的,也就是弱一致性

set方法

public E set(int index, E element) {
        //得到獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //得到Object數組
            Object[] elements = getArray();
            //根據下標,得到舊的元素
            E oldValue = get(elements, index);
            //若是舊的元素不等於新的元素
            if (oldValue != element) {
                // 得到舊數組的長度
                int len = elements.length;
                // 複製出新的數組
                Object[] newElements = Arrays.copyOf(elements, len);
                // 修改
                newElements[index] = element;
                //替換
                setArray(newElements);
            } else {
                //爲了保證volatile 語義,即便沒有修改,也要替換成新的數組
                setArray(elements);
            }
            return oldValue;
        } finally {
            //釋放獨佔鎖
            lock.unlock();
        }
    }

仍是使用 lock 加鎖,而後複製一個 arr 副本進行修改,以後覆蓋。

remove 方法

public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

能夠看到,remove 方法和 add,set 方法是同樣的,第一步仍是先獲取獨佔鎖,來保證線程安全性,若是要刪除的元素是最後一個,則複製出一個長度爲【舊數組的長度-1】的新數組,隨之替換,這樣就巧妙的把最後一個元素給刪除了,若是要刪除的元素不是最後一個,則分兩次複製,隨之替換。

CopyOnWrite 的缺點

CopyOnWrite 容器有不少優勢,可是同時也存在兩個問題,即內存佔用問題和數據一致性問題。因此在開發的時候須要注意一下。

內存佔用問題:由於 CopyOnWrite 的寫時複製機制,因此在進行寫操做的時候,內存裏會同時駐紮兩個對象的內存,舊的對象和新寫入的對象。

數據一致性問題:CopyOnWrite 容器只能保證數據的最終一致性,不能保證數據的實時一致性。

阻塞隊列簡介

什麼是阻塞隊列?

阻塞隊列(BlockingQueue) 是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。

經常使用的隊列主要有如下兩種:

先進先出(FIFO):先插入隊列的元素也最早出隊列,相似於排隊的功能。

後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件。

核心方法

方法類型 拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用

ArrayBlockingQueue

ArrayBlockingQueue 是一個阻塞式的隊列,繼承自 AbstractBlockingQueue,間接的實現了 Queue 接口和 Collection 接口。底層以數組的形式保存數據(實際上可看做一個循環數組)。而且是一個基於數組的阻塞隊列。

ArrayBlockingQueue 是一個有界隊列,有界也就意味着,它不可以存儲無限多數量的對象。因此在建立 ArrayBlockingQueue 時,必需要給它指定一個隊列的大小。

而且還能夠指定是否公平,若是保證公平的話,那麼等待了最長時間的線程會被優先處理,不過會帶來性能損耗。

代碼示例

有 10 個面試者,一共只有一個面試官,大廳裏有 3 個位置,每一個面試時間是 10 秒,模擬面試場景。

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {

        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();
    }
}

class Interviewer implements Runnable {

    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("10個候選人都來啦");
        for (int i = 0; i < 10; i++) {
            String candidate = "Candidate" + i;
            try {
                queue.put(candidate);
                System.out.println("安排好了" + candidate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put("stop");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {

        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg;
        try {
            while (!(msg = queue.take()).equals("stop")) {
                System.out.println(msg + "到了");
            }
            System.out.println("全部候選人都結束了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

10個候選人都來啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
Candidate0到了
安排好了Candidate3
Candidate1到了
Candidate2到了
安排好了Candidate4
Candidate3到了
Candidate4到了
Candidate5到了
安排好了Candidate5
安排好了Candidate6
Candidate6到了
安排好了Candidate7
Candidate7到了
安排好了Candidate8
Candidate8到了
安排好了Candidate9
Candidate9到了
全部候選人都結束了

源碼解析

ArrayBlockingQueue 進隊操做採用了加鎖的方式保證併發安全。

public void put(E e) throws InterruptedException {
    // 非空判斷
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 獲取鎖
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            // 一直阻塞,知道隊列非滿時,被喚醒
            notFull.await();
        }
        // 進隊
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 不一樣於 ArrayBlockingQueue,它若是不指定容量,默認爲 Integer.MAX_VALUE,也就是無界隊列。因此爲了不隊列過大形成機器負載或者內存爆滿的狀況出現,咱們在使用的時候建議手動傳入一個隊列的大小。

源碼分析

/**
 * 節點類,用於存儲數據
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

/** 阻塞隊列的大小,默認爲Integer.MAX_VALUE */
private final int capacity;

/** 當前阻塞隊列中的元素個數 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞隊列的頭結點
 */
transient Node<E> head;

/**
 * 阻塞隊列的尾節點
 */
private transient Node<E> last;

/** 獲取並移除元素時使用的鎖,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素時使用的鎖如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 */
private final Condition notFull = putLock.newCondition();

從上面的屬性咱們知道,每一個添加到 LinkedBlockingQueue 隊列中的數據都將被封裝成 Node 節點,添加的鏈表隊列中,其中 head 和 last 分別指向隊列的頭結點和尾結點。與 ArrayBlockingQueue 不一樣的是,LinkedBlockingQueue 內部分別使用了 takeLock 和 putLock 對併發進行控制,也就是說,添加和刪除操做並非互斥操做,能夠同時進行,這樣也就能夠大大提升吞吐量。

put方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 獲取鎖
    putLock.lockInterruptibly();
    try {
        //判斷隊列是否已滿,若是已滿阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入隊列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判斷隊列是否有可用空間,若是有喚醒下一個線程進行添加操做
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 若是隊列中有一條數據,喚醒消費線程進行消費
    if (c == 0)
        signalNotEmpty();
}
  • 隊列已滿,阻塞等待。
  • 隊列未滿,建立一個 node 節點放入隊列中,若是放完之後隊列還有剩餘空間,繼續喚醒下一個添加線程進行添加。若是放以前隊列中沒有元素,放完之後要喚醒消費線程進行消費。

PriorityBlockingQueue

PriorityBlockingQueue 是一個支持優先級的無界阻塞隊列,直到系統資源耗盡。默認狀況下元素採用天然順序升序排列。也能夠自定義類實現 compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。但須要注意的是不能保證同優先級元素的順序。PriorityBlockingQueue 也是基於最小二叉堆實現,使用基於 CAS 實現的自旋鎖來控制隊列的動態擴容,保證了擴容操做不會阻塞 take 操做的執行。

SynchronousQueue

SynchronousQueue 是一個內部只能包含一個元素的隊列。插入元素到隊列的線程被阻塞,直到另外一個線程從隊列中獲取了隊列中存儲的元素。一樣,若是線程嘗試獲取元素而且當前不存在任何元素,則該線程將被阻塞,直到線程將元素插入隊列。

SynchronousQueue 沒有 peek 等函數,由於 peek 的含義是取出頭結點,可是 SynchronousQueue 容量是 0,因此沒有頭結點。

SynchronousQueue 是線程池 Executors.newCachedThreadPool()使用的阻塞隊列。

DelayQueue

DelayQueue 是一個沒有邊界 BlockingQueue 實現,加入其中的元素必需實現 Delayed 接口。當生產者線程調用 put 之類的方法加入元素時,會觸發 Delayed 接口中的 compareTo 方法進行排序,也就是說隊列中元素的順序是按到期時間排序的,而非它們進入隊列的順序。排在隊列頭部的元素是最先到期的,越日後到期時間赿晚。底層基於前面說過的 PriorityBlockingQueue 實現的。

ConcurrentLikedQueue

是一個適用於高併發場景下的隊列,經過無鎖的方式,底層使用 CAS,實現了高併發狀態下的高性能,一般 ConcurrentLikedQueue 性能好於 BlockingQueue。

它是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最近加入的,該隊列不容許 null 元素。

控制併發流程

什麼是控制併發流程

控制併發流程的工具類,做用就是幫助咱們更容易的讓線程之間合做,相互配合,來知足業務邏輯。

好比線程 A 等待線程 B 執行完成後再執行某段代碼。

經常使用的控制併發流程的工具類

做用 說明
Semaphore 信號量,能夠經過控制"許可證"的數量,來保證線程之間的配合 線程只有在拿到"許可證"後才能繼續運行,更加靈活
CyclicBarrier 線程會等待,直到足夠多線程達到了事先規定的數目,一旦到達觸發條件,就能夠進行下一步操做 是用於線程間相互等待處理結果就緒的狀況
Phaser 和 CyclicBarrier 相似,但計數可變 java1.7 中加入
CountDownLatch 和 CyclicBarrier 相似,數量遞減到 0 時候觸發 不能夠重複使用
Exchanger 讓兩個對象在合適時候交換對象 適用於在兩個線程工做同一個類的不一樣實例時,交換數據
Condition 能夠控制線程的等待和喚醒 是 Object.wait()升級版

CountDownLatch

什麼是 CountDownLatch

CountDownLatch 這個類使一個線程等待其餘線程各自執行完畢後再執行。
是經過一個計數器來實現的,傳入須要倒數的值。每當一個線程執行完畢後,計數器的值就減 1,當計數器的值爲 0 時,表示全部線程都執行完畢,而後在閉鎖上等待的線程就能夠恢復工做了。

主要方法介紹

  • CountDownLatch(int count):僅有一個構造函數,參數爲 count 須要倒數的值。
  • await():調用 await()方法的線程會被掛起,他會等待直到 count 爲 0 纔會繼續執行。
  • countDown():將 count 值減 1.直到爲 0 時,其餘等待的線程就會被喚醒。

用法一:一個線程等待多個線程都執行完畢,再繼續本身的工做

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 已經上車!");
            countDownLatch.countDown();
        };
        for (int i = 0; i < 5; i++) {
            executorService.execute(r);
        }
        System.out.println("等待你們上車");
        countDownLatch.await();
        System.out.println("5我的都已經上車,能夠出發咯");
        executorService.shutdown();
    }
}

運行結果:

等待你們上車
pool-1-thread-2 已經上車!
pool-1-thread-1 已經上車!
pool-1-thread-3 已經上車!
pool-1-thread-4 已經上車!
pool-1-thread-5 已經上車!
5我的都已經上車,能夠出發咯

用途二:多個線程同時等待結束後一塊兒工做

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 已經就緒!");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " 開始跑步!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 5; i++) {
            executorService.execute(r);
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println("信號槍!biu!");
        countDownLatch.countDown();
        executorService.shutdown();
    }
}

運行結果:

pool-1-thread-1 已經就緒!
pool-1-thread-2 已經就緒!
pool-1-thread-3 已經就緒!
pool-1-thread-4 已經就緒!
pool-1-thread-5 已經就緒!
信號槍!biu!
pool-1-thread-1 開始跑步!
pool-1-thread-2 開始跑步!
pool-1-thread-5 開始跑步!
pool-1-thread-4 開始跑步!
pool-1-thread-3 開始跑步!

注意點

CountDownLatch 是不可以重用的,若是須要從新計數能夠使用 CyclicBarrier,或者建立新的 CountDownLatch 實例。

Semaphore 信號量

Semaphore 能夠用來限制和管理數量優先資源的使用狀況。

信號量的做用是維護一個許可證的計數,線程能夠獲取許可證,那信號量剩餘的許可證就減一,線程也能夠釋放一個許可證,那就會加一,當信號量所擁有的許可證爲 0 的時候,則須要等待,直到有線程釋放了許可證。

主要方法介紹

  • new Semaphore(int permits,boolean fair):第一個參數爲許可證數量,第二個是否爲公平策略,即等待的線程放到 FIFO 隊列中。
  • acquire():獲取一個許可證,若是沒有則等待,容許被中斷。
  • acquireUninterruptibly():取一個許可證,若是沒有則等待,不容許被中斷。
  • tryAcquire():看看目前有沒有空閒的許可證,有就獲取,無則幹別的事,不阻塞。
  • tryAcquire(long timeout):若是在 timeout 時間段內拿不到,就作別的事。
  • release():歸還許可證。

代碼演示:每次只有三我的的作任務

public class SemaphoreTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Semaphore semaphore = new Semaphore(3, true);
        Runnable r = () -> {
            try {
                semaphore.acquire(); //acquire裏面能夠傳入數值,好比傳入3 也就是一下能夠拿三個許可,同時釋放時候也要傳入對應的值
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("拿到許可證!開始作任務!");
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務結束!釋放許可證!");
            semaphore.release();
        };
        for (int i = 0; i < 1000; i++) {
            executorService.submit(r);
        }
        executorService.shutdown();
    }

}

Condition

Condition 做用

當線程 1 須要等待某個條件的時候,它就去執行 condition.await()方法,一旦執行了 await()方法,線程就進入阻塞狀態。

而後假設線程 2 執行condition.signal()方法,這時 JVM 就會從被阻塞的線程中找到那些被 condition.await()中的線程,這樣線程 1 就會受到可執行信號,狀態就變成Runnable

signalAll()和 signal()的區別

signalAll()會喚起全部的正在等待的線程。

可是 signal()是公平的,只會喚起等待時間最長的線程。

Condition 基本使用

public class ConditionTest {

    private static Lock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        ConditionTest test = new ConditionTest();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                test.methodB();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        test.methodA();
    }

    private void methodA() {
        lock.lock();
        try {
            System.out.println("開始阻塞");
            condition.await();
            System.out.println("我被喚醒了!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void methodB() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

運行結果:

開始阻塞
我被喚醒了!

使用 Condition 實現生產者消費者

public class ConditionTest {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionTest test = new ConditionTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("隊列空,等待數據");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("從隊列裏取走了一個數據,隊列剩餘" + queue.size() + "個元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("隊列滿,等待有空餘");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向隊列插入了一個元素,隊列剩餘空間" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

Condition 注意點

實際上,Condition 就是用來代替 Object.wait/nofity 的,因此用法上和性質上幾乎同樣。

await()方法會自動釋放 Lock 鎖,和 Object.wait 同樣,不須要本身手動釋放鎖。

調用 await()的時候,必須持有鎖,不然會拋出異常。

CyclicBarrier 循環柵欄

CyclicBarrier 和 CountDownLatch 很像,都能阻塞一組線程。

當有大量的線程相互配合,分別計算不一樣任務,最後統一彙總時候,咱們能夠使用 CyclicBarrier,CyclicBarrier 能夠構造一個集結點,當某一個線程完畢後,就會到達集結點等待,等全部線程都到了以後,柵欄就會被撤銷,而後全部線程統一出發,繼續執行剩下的任務。

代碼演示

public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("全部人都到場了, 你們統一出發!"));
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程" + id + "如今前往集合地點");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("線程" + id + "到了集合地點,開始等待其餘人到達");
                cyclicBarrier.await();
                System.out.println("線程" + id + "出發了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

運行結果:

線程0如今前往集合地點
線程1如今前往集合地點
線程2如今前往集合地點
線程3如今前往集合地點
線程4如今前往集合地點
線程5如今前往集合地點
線程6如今前往集合地點
線程7如今前往集合地點
線程8如今前往集合地點
線程9如今前往集合地點
線程9到了集合地點,開始等待其餘人到達
線程8到了集合地點,開始等待其餘人到達
線程6到了集合地點,開始等待其餘人到達
線程5到了集合地點,開始等待其餘人到達
線程0到了集合地點,開始等待其餘人到達
全部人都到場了, 你們統一出發!
線程0出發了
線程9出發了
線程6出發了
線程8出發了
線程5出發了
線程1到了集合地點,開始等待其餘人到達
線程4到了集合地點,開始等待其餘人到達
線程2到了集合地點,開始等待其餘人到達
線程3到了集合地點,開始等待其餘人到達
線程7到了集合地點,開始等待其餘人到達
全部人都到場了, 你們統一出發!
線程7出發了
線程1出發了
線程2出發了
線程3出發了
線程4出發了

CyclicBarrier 和 CountDownLatch 的區別

做用不一樣:CyclicBarrier 要等待固定線程數量都到了柵欄位置才能繼續執行;而 CountDownLatch 只須要等待數字爲 0,也就是說 CountDownLatch 用於事件,可是 CyclicBarrier 用於線程。

可重用性不一樣:CountDownLatch 在到達 0 後打開門閂,就不能在使用了,除非用新的實例,而 CyclicBarrier 能夠重複使用。

AQS

AQS 全名:AbstractQueuedSynchronizer,是併發容器 java.lang.concurrent 下 locks 包內的一個類。它實現了一個 FIFO 的隊列。底層實現的數據結構是一個雙向鏈表

AQS 核心思想是,若是被請求的共享資源空閒,則將當前請求資源的線程設置爲有效的工做線程,而且將共享資源設置爲鎖定狀態。若是被請求的共享資源被佔用,那麼就須要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制 AQS 是用 CLH 隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。

AQS 內部維護了一個 CLH 隊列來管理鎖。線程會首先嚐試獲取鎖,若是失敗就將當前線程及等待狀態等信息包裝成一個node節點加入到同步隊列sync queue裏。接着會不斷的循環嘗試獲取鎖,條件是當前節點爲 head 的直接後繼纔會嘗試。若是失敗就會阻塞本身直到本身被喚醒。而當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。

CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列(虛擬的雙向隊列即不存在隊列實例,僅存在結點之間的關聯關係)。AQS 是將每條請求共享資源的線程封裝成一個 CLH 鎖隊列的一個結點(Node)來實現鎖的分配。

AQS 內部核心部分

AQS 最核心的三大部分:

  • state
  • 控制線程搶鎖和配合的 FIFO 隊列
  • 指望協做工具類去實現的獲取/釋放等重要方法

state 狀態

/**
* The synchronization state.
*/
private volatile int state;

這個 state 具體含義,會根據具體實現類不一樣而不一樣,好比在 Semaphore 裏,它表示剩餘的許可證數量,而在 CountDownLatch 中,表示還須要倒數的數量

state 是 volatile 修飾的,會被併發的修改,因此全部修改 state 的方法都須要保證線程安全,好比 getState、setState 以及 compareAndSetState 操做來讀取和更新這個狀態。這些方法都依賴與 atomic 包的支持。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

控制線程搶鎖和配合的 FIFO 隊列

這個隊列用來存放等待的線程,AQS 就是排隊管理器,當多個線程爭用同一把鎖時,必須有排隊機制將沒有拿到線程的鎖串在一塊兒,當鎖釋放的時候,管理器就會挑選一個合適的線程來佔有釋放的鎖。

AQS 會維護一個等待的線程隊列,把線程都放到隊列中。

指望協做工具類去實現的獲取/釋放等重要方法

這裏的獲取和釋放方法,是利用 AQS 的寫做工具類中最重要的方法,是由協做類本身去實現的,而且含義各不相同

獲取方法

  • 會依賴 state 變量,常常會阻塞
  • 在 Semaphore 中,獲取就是 acquire 方法,做用就是獲取一個許可證
  • 在 CountDownLatch 中,獲取就是 await 方法,做用就是等待直到 0 結束

釋放方法

  • 釋放操做不會阻塞
  • 在 Semaphore 中,釋放就是 release 方法,做用就是釋放一個許可證
  • 在 CountDownLatch 中,獲取就是 CountDown 方法,做用就是減小一個數

而且子類還須要重寫 tryAcquire 和 tryRelease 方法。

AQS 源碼分析

AQS 用法

第一步:寫一個類,想好協做的邏輯,實現獲取/釋放方法。

第二步:內部寫一個Sync類繼承AbstractQueuedSynchronizer

第三步:根據是否獨佔來重寫 tryAcquire/tryRelease 或 tryAcquireShared(int acquires)和 tryReleaseShared(int releases)等方法,在以前寫的獲取/釋放方法中調用 AQS 的 acquire 或者 shared 方法。

AQS 在 CountDownLatch 中的應用

  • 內部類 Sync 繼承了 AQS

首先咱們看一下構造方法

底層建立了一個 Sync 對象。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
}

getCount 方法中也只是返回了 state 的值。

public long getCount() {
        return sync.getCount();
}

int getCount() {
    return getState();
}

await 方法解析:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())//判斷當前線程是否中斷
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)  //tryAcquireShared主要判斷當前狀態是否==0,若是返回1 能夠直接放行,不然返回-1 進入隊列
            doAcquireSharedInterruptibly(arg);
    }
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //加入到node節點中,SHARED表示共享模式
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  //阻塞當前線程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

countDown 方法解析:

public void countDown() {
        sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); //若是返回true,則會調用此方法喚醒全部等待中的線程。
            return true;
        }
        return false;
    }
protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) // == 0 說明已經釋放
                    return false;
                int nextc = c-1; // 將state - 1
                if (compareAndSetState(c, nextc)) //經過CAS更新state
                    return nextc == 0; //若是== 0說明門閘打開
            }
}

Future 和 Callable

基本用法

首先看一下 Runnable 的缺陷

  • 沒有返回值
  • 沒法拋出異常

Callable 接口

  • 相似於 Runnable,被其餘線程執行的任務
  • 實現 call 方法
  • 有返回值
  • 能夠拋出異常
@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}

Future 類

在併發編程中,咱們常常用到非阻塞的模型,在以前的多線程的三種實現中,無論是繼承 Thread 類仍是實現 Runnable 接口,都沒法保證獲取到以前的執行結果。經過實現 Callable 接口,並用 Future 能夠來接收多線程的執行結果。

Future 表示一個可能尚未完成的異步任務的結果,針對這個結果能夠添加 Callable 以便在任務執行成功或失敗後做出相應的
操做。

Future 接口定義了主要的 5 個接口方法,有 RunnableFuture 和 SchedualFuture 繼承這個接口,以及 CompleteFuture 和 ForkJoinTask 繼承這個接口。

Callable 和 Future 的關係

  • 咱們能夠用 Future.get()方法來獲取 Callable 接口返回的執行結果,還能夠經過 Future.isDone()來判斷任務是否以及執行完了,以及取消這個任務,限時獲取任務的結果等。
  • 在 call()未執行完畢以前,調用 get()的線程會被阻塞,知道 call()方法返回告終果後,纔會獲得結果,而後線程切換至 Runnable 狀態。

因此 Future 是一個存儲器,它存儲了call()這個任務的結果,而這個任務的執行時間是沒法提早肯定的,由於這徹底取決於 call()方法執行的狀況。

主要方法介紹

  • get():獲取結果,get 方法的行爲取決於 Callable 任務的狀態,只有如下五種狀況:
  1. 任務正常完成,get 方法當即返回結果
  2. 任務沒有完成,get 方法會阻塞到任務完成
  3. 任務執行中拋出異常,get 方法就會拋出 ExecutionException:這裏拋出的異常是 call()執行時產生的異常,不論裏面 call()拋出的是什麼異常
  4. 任務被取消,get 方法拋出 CancellationException
  5. 任務超時,get 方法能夠傳入超時時間,若是時間到了還沒獲取到結果,get 方法就會拋出 TimeoutException
  • get(long timeout,TimeUnit unit):有超時的獲取
  • cancel():取消任務的執行
  • isDone():判斷線程是否執行完畢
  • isCancelled():判斷是否被取消

基本使用

在阻塞一秒後獲取到返回值。

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        Callable<Integer> callable = () -> {
            TimeUnit.SECONDS.sleep(1);
            return 10;
        };
        Future<Integer> future = service.submit(callable);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}

運行結果:

10

異常捕獲演示

無論裏面發生什麼異常,咱們只能捕獲到 ExecutionException 異常。

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());
        try {
            System.out.println(future.isDone()); //並不關心是否拋出異常
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("InterruptedException異常");
        } catch (ExecutionException e) {
            e.printStackTrace();
            System.out.println("ExecutionException異常");
        }finally {
            service.shutdown();
        }
    }

    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            throw new IllegalArgumentException("Callable拋出異常");
        }
    }
}

運行結果:

true
ExecutionException異常
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable拋出異常
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.concurrent.FutureTest.main(FutureTest.java:12)
Caused by: java.lang.IllegalArgumentException: Callable拋出異常
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:27)
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

cancel 方法:取消任務的執行

  1. 若是這個任務還沒開始執行,那麼這種狀況最簡單,任務被正常的取消,將來也不會被執行,方法返回 true。
  2. 若是任務已經完成,或者已經取消,返回 false。
  3. 若是已經開始了,那麼不會取消該任務,而是根據咱們填入的參數 MayInterruptIfRunning 作判斷。若是傳入 true 則發出中斷信號,false 則不發送。

FutureTask

咱們也能夠使用 FutureTask 來獲取 Future 的任務結果,FutureTask 能夠把 Callable 轉化成 Future 和 Runnable,它同時實現了兩者的接口。

把 Callable 實例當作參數,生成 FutureTask 對象,而後把這個對象當作一個 Runnable 對象,用線程池或另起線程去執行 Runnable 對象,最後經過 FutureTask 獲取剛纔執行的結果。

代碼演示

public class FutureTest {

    public static void main(String[] args) {
        Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(integerFutureTask);
        try {
            System.out.println("task運行結果:"+integerFutureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子線程正在計算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}

運行結果:

子線程正在計算
task運行結果:5050

FutureTask 注意點

  • Future 的生命週期不能後退,一旦完成後,就停留在完成狀態。
  • 當 for 循環批量獲取 future 的結果時,容易發生一部分線程慢的狀況,get 方法調用時應使用 timeout 限制。也能夠使用 CompletableFuture 工具類,它的做用是哪一個線程先完成就先獲取哪一個結果。
相關文章
相關標籤/搜索