基礎篇:高併發一瞥,線程和線程池的總結

  • 進程是執行程序的實體,系統的調度執行單元,擁有獨屬的進程空間(內存、磁盤等)。而線程是進程的一個執行流程,一個進程可包含多個線程,共享該進程的全部資源:代碼段,數據段(全局變量和靜態變量),堆存儲;但每一個線程擁有本身的執行棧和局部變量
  • 進程建立要分配資源,進程切換既要保存當前進程環境,也要設置新進程環境,開銷大;而線程共享進程的資源,共享部分不需重分配、切換,線程的建立切換是小於進程的。所以更偏向使用線程提高程序的併發性
  • 線程又份內核態和用戶態,內核態可被系統感知調度執行;用戶態是用戶程序級別的,系統不知線程的存在,線程調度由程序負責

1 JAVA線程的實現原理

  • java的線程是基於操做系統原生的線程模型(非用戶態),經過系統調用,將線程交給系統調度執行
  • java線程擁有屬於本身的虛擬機棧,當JVM將棧、程序計數器、工做內存等準備好後,會分配一個系統原生線程來執行。Java線程結束,原生線程隨之被回收
  • 原生線程初始化完畢,會調Java線程的run方法。當JAVA線程結束時,則釋放原生線程和Java線程的全部資源
  • java方法的執行對應虛擬機棧的一個棧幀,用於存儲局部變量、操做數棧、動態連接、方法出口等

2 JAVA線程的生命週期

  • New(新建狀態):用new關鍵字建立線程以後,該線程處於新建狀態,此時僅由JVM爲其分配內存,並初始化其成員變量
  • Runnable(就緒狀態):當調用Thread.start方法後,該線程處於就緒狀態。JVM會爲其分配虛擬機棧等,而後等待系統調度
  • Running(運行狀態):處於就緒狀態的線程得到CPU,執行run方法時,則線程處於運行狀態
  • Blocked(阻塞狀態):阻塞狀態是指線程放棄了cpu的使用權(join,sleep函數的調用),處於暫中止狀態。Blocked狀態的線程須要恢復到Runnable狀態,才能再次被系統調度執行變成Running
  • Dead(線程死亡):線程正常run結束、或拋出一個未捕獲的Throwable、調用Thread.stop來結束該線程,都會致使線程的死亡

  • java線程和linux線程的生命週期基本是一一對應了,就是多了new階段

3 JAVA線程的經常使用方法

  • 線程啓動函數
//Thread.java
//調用start啓動線程,進入Runnable狀態,等待系統調度執行
public synchronized void start(){//synchronized同步執行
    if (threadStatus != 0) //0 表明new狀態,非0則拋出錯誤
            throw new IllegalThreadStateException();
    ...
    start0(); //本地方法方法 private native void start0()
    ...
}
//Running狀態,新線程執行的代碼方法,可被子類重寫
public void run() {
    if (target != null) {
        //target是Runnable,new Thread(Runnable)時傳入
        target.run(); 
    }
}
  • 線程終止函數
//Thread.java
@Deprecated public final void stop();
//中斷線程
public void interrupt()
//判斷的是當前線程是否處於中斷狀態
public static boolean interrupted()
  • 用stop會強行終止線程,致使線程所持有的所有鎖忽然釋放(不可控制),而被鎖突同步的邏輯遭到破壞。不建議使用
  • interrupt函數中斷線程,但它不必定會讓線程退出的。它比stop函數優雅,可控制html

    • 當線程處於調用sleep、wait的阻塞狀態時,會拋出InterruptedException,代碼內部捕獲,而後結束線程
    • 線程處於非阻塞狀態,則須要程序本身調用interrupted()判斷,再決定是否退出
  • 其餘經常使用方法
//Thread.java
//阻塞等待其餘線程
public final synchronized void join(final long millis)
//暫時讓出CPU執行
public static native void yield();
//休眠一段時間
public static native void sleep(long millis) throws InterruptedException;
  • start與run方法的區別java

    • start是Thread類的方法,從線程的生命週期來看,start的執行並不意味着新線程的執行,而是讓JVM分配虛擬機棧,進入Runnable狀態,start的執行仍是在舊線程上
    • run則是新線程被系統調度,獲取CPU時執行的方法,函數run則是繼承Thread重寫的run或者實現接口Runnable的run
  • Thread.sleep與Object.wait區別linux

    • Thread.sleep須要指定休眠時間,時間一到可繼續運行;和鎖機制無關,沒有加鎖也不用釋放鎖
    • Object.wait須要在synchronized中調用,不然報IllegalMonitorStateException錯誤。wait方法會釋放鎖,須要調用相同鎖對象Object.notify來喚醒線程

4 線程池及其優勢

  • 線程的每次使用建立,結束銷燬是很是巨大的開銷。若用緩存的策略(線程池),暫存曾經建立的線程,複用這些線程,能夠減小程序的消耗,提升線程的利用率
  • 下降資源消耗:重複利用線程可下降線程建立和銷燬形成的消耗
  • 提升響應速度:當任務到達時,不須要等待線程建立就能當即執行
  • 提升線程的可管理性:使用線程池能夠進行統一的分配,監控和調優

5 JDK封裝的線程池

//ThreadPoolExecutor.java
public ThreadPoolExecutor(
    int corePoolSize, 
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
  • 1 corePoolSize:核心線程數,線程池維持的線程數量
  • 2 maximumPoolSize:最大的線程數,當阻塞隊列不可再接受任務時且maximumPoolSize大於corePoolSize則會建立非核心線程來執行。但任務執行時,會被銷燬
  • 3 keepAliveTime:非核心線程在閒暇間的存活時間
  • 4 TimeUnit:和keepAliveTime配合使用,表示keepAliveTime參數的時間單位
  • 5 workQueue:任務的等待阻塞隊列,正在執行的任務數超過corePoolSize時,加入該隊列
  • 6 threadFactory:線程的建立工廠
  • 7 handler:拒絕策略,線程數達到了maximumPoolSize,還有任務提交則使用拒絕策略處理

6 線程池原理之執行流程

//ThreadPoolExecutor.java
public void execute(Runnable command) {
    ...
    if (workerCountOf(c) < corePoolSize) { //plan A
        if (addWorker(command, true))  
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //plan B
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //addWorker(command, false) false表明可建立非核心線程來執行任務
    else if (!addWorker(command, false)) //plan C
        reject(command);    // //plan D
}
  • plan A:任務的execute,先判斷核心線程數量達到上限;否,則建立核心線程來執行任務;是,則執行plan B
  • plan B:當任務數大於核心數時,任務被加入阻塞隊列,若是超過阻塞隊列的容量上限,執行C
  • plan C: 阻塞隊列不能接受任務時,且設置的maximumPoolSize大於corePoolSize,建立新的非核心線程執行任務
  • plan D:當plan A、B、C都無能爲力時,使用拒絕策略處理

7 阻塞隊列的簡單瞭解

  • 隊列的阻塞插入:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿
  • 隊列的阻塞移除:當隊列爲空時,獲取元素的線程會等待隊列變爲非空
  • BlockingQueue提供的方法以下,其中put和take是阻塞操做
操做方法 拋出異常 返回特殊值 阻塞線程 超時退出
插入元素 add(e) offer(e) put(e) offer(e, timeout, unit)
移除元素 remove() poll() take() pull(timeout, unit)
檢查 element() peek()
  • ArrayBlockingQueue程序員

    • ArrayBlockingQueue是用數組實現的有界阻塞隊列,必須指定隊列大小,先進先出(FIFO)原則排隊
  • LinkedBlockingQueue算法

    • 是用鏈表實現的有界阻塞隊列,若是構造LinkedBlockingQueue時沒有指定大小,則默認是Integer.MAX_VALUE,無限大
    • 該隊列生產端和消費端使用獨立的鎖來控制數據操做,以此來提升隊列的併發性
  • PriorityBlockingQueue數組

    • public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
    • 基於數組,元素具備優先級的無界阻塞隊列,優先級由Comparator決定
    • PriorityBlockingQueue不會阻塞生產者,卻會在沒有可消費的任務時,阻塞消費者
  • DelayQueue緩存

    • 支持延時獲取元素的無界阻塞隊列,基於PriorityQueue實現
    • 元素必須實現Delayed接口,指定多久才能從隊列中獲取該元素。
    • 可用於緩存系統的設計、定時任務調度等場景的使用
  • SynchronousQueue併發

    • SynchronousQueue是一種無緩衝的等待隊列,添加一個元素必須等待被取走後才能繼續添加元素
  • LinkedTransferQueueless

    • 由鏈表組成的TransferQueue無界阻塞隊列,相比其餘隊列多了tryTransfer和transfer函數
    • transfer:當前有消費者正在等待元素,則直接傳給消費者,不然存入隊尾,並阻塞等待元素被消費才返回
    • tryTransfer:試探傳入的元素是否能直接傳給消費者。若是沒消費者等待消費元素,元素加入隊尾,返回false
  • LinkedBlockingDequeide

    • LinkedBlockingDeque是由鏈表構建的雙向阻塞隊列,多了一端可操做入隊出隊,少了一半的競爭,提升併發性

8 Executors的四種線程池淺析

  • newFixedThreadPool
//Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
}
  • 指定核心線程數,隊列是LinkedBlockingQueue無界阻塞隊列,永遠不可能拒絕任務;適合用在穩定且固定的併發場景,建議線程設置爲CPU核數
  • newCachedThreadPool
//Executors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
}
  • 核心池大小爲0,線程池最大線程數爲最大整型,任務提交先加入到阻塞隊列中,非核心線程60s沒任務執行則銷燬,阻塞隊列爲SynchronousQueue。newCachedThreadPool會不斷的建立新線程來執行任務,不建議用
  • newScheduledThreadPool
//Executors.java
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
//指定延遲執行時間    
public <V> ScheduledFuture<V> 
schedule(Callable<V> callable, long delay, TimeUnit unit)
  • ScheduledThreadPoolExecutor(STPE)實際上是ThreadPoolExecutor的子類,可指定核心線程數,隊列是STPE的內部類DelayedWorkQueue。STPE的好處是 A 延時可執行任務,B 可執行帶有返回值的任務
  • newSingleThreadExecutor
//Executors.java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>())); //無界隊列
}
  • 和newFixedThreadPool構造方法一致,不過線程數被設置爲1了。SingleThreadExecutor比new個線程的好處是;線程運行時拋出異常的時候會有新的線程加入線程池完成接下來的任務;阻塞隊列能夠保證任務按FIFO執行

9 若是優雅地關閉線程池

  • 線程池的關閉,就要先關閉池中的線程,上文第三點有提,暴力強制性stop線程會致使同步數據的不一致,所以咱們要調用interrupt關閉線程
  • 而線程池提供了兩個關閉方法,shutdownNow和shuwdown
  • shutdownNow:線程池拒接收新任務,同時立馬關閉線程池(進行中的任務會執行完),隊列的任務再也不執行,返回未執行任務List
public List<Runnable> shutdownNow() {
    ...
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //加鎖
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers(); //interrupt關閉線程
        tasks = drainQueue(); //未執行任務
    ...
  • shuwdown:線程池拒接收新任務,同時等待線程池裏的任務執行完畢後關閉線程池,代碼和shutdownNow相似就不貼了

10 線程池爲何使用的是阻塞隊列

先考慮下爲啥線程池的線程不會被釋放,它是怎麼管理線程的生命週期的呢

//ThreadPoolExecutor.Worker.class
final void runWorker(Worker w) {
    ...
    //工做線程會進入一個循環獲取任務執行的邏輯
    while (task != null || (task = getTask()) != null)
    ...
}

private Runnable getTask(){
    ...
    Runnable r = timed ? 
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
        : workQueue.take(); //線程會阻塞掛起等待任務,
    ...    
}

能夠看出,無任務執行時,線程池實際上是利用阻塞隊列的take方法掛起,從而維持核心線程的存活

11 線程池的worker繼承AQS的意義

//Worker class,一個worker一個線程
Worker(Runnable firstTask) {
    //禁止新線程未開始就被中斷
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

final void runWorker(Worker w) {
    ....
    //對應構造Worker是的setState(-1)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
        ....
        w.lock(); //加鎖同步
        ....
        try {
            ...
            task.run();
            afterExecute(task, null);
        } finally {
            ....
            w.unlock(); //釋放鎖
        }

worker繼承AQS的意義:A 禁止線程未開始就被中斷;B 同步runWorker方法的處理邏輯

12 拒絕策略

  • AbortPolicy 丟棄任務並拋出RejectedExecutionException異常
  • DiscardOldestPolicy 丟棄隊列最前面的任務,而後從新提交被拒絕的任務
  • DiscardPolicy 丟棄任務,可是不拋出異常
  • CallerRunsPolicy
A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.

若是任務被拒絕了,則由提交任務的線程執行此任務

13 ForkJoinPool瞭解一波

  • ForkJoinPool和ThreadPoolExecutor不一樣,它適合執行能夠分解子任務的任務,如樹的遍歷,歸併排序等一些遞歸場景


  • ForkJoinPool每一個線程有一個對應的雙端隊列deque;當線程中的任務被fork分裂,分裂出來的子任務會放入線程本身的deque,減小線程的競爭
  • work-stealing工做竊取算法


當線程執行完本身deque的任務,且其餘線程deque還有多的任務,則會啓動竊取策略,從其餘線程deque隊尾獲取線程

  • 使用RecursiveTask實現ForkJoin流程demo
//該demo代碼是引用他人的,若有侵權,請聯繫我
public class ForkJoinPoolTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        for (int i = 0; i < 10; i++) {
            ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));
            System.out.println(task.get());
        }
    }
    static class Fibonacci extends RecursiveTask<Integer> {
        int n;
        public Fibonacci(int n) {  this.n = n;  }
        @Override
        protected Integer compute() {
            if (n <= 1) { return n; }
            Fibonacci fib1 = new Fibonacci(n - 1);
            fib1.fork(); //至關於開啓新線程執行
            Fibonacci fib2 = new Fibonacci(n - 2);
            fib2.fork(); //至關於開啓新線程執行
            return fib1.join() + fib2.join(); //合併返回結果
        }
    }
}

首發掘金網站,但願你們支持下

https://juejin.im/post/5f016b...

歡迎指正文中錯誤

關注公衆號,一塊兒交流

參考文章

相關文章
相關標籤/搜索