Java基礎系列:線程同步和線程池

小夥伴們,咱們認識一下。java

俗世遊子:專一技術研究的程序猿面試

最近在作新項目的數據庫設計,目前爲止一共出了80張表,預計只作了一半,心好累o(╥﹏╥)o數據庫

前一節咱們聊過了多線程的基礎問題,可是還漏掉一個知識點:緩存

  • 線程同步

這裏咱們補上安全

線程基礎

線程同步

咱們老是在說多線程操做,會出現線程不安全的問題,那麼該怎麼解釋這個線程安全呢?服務器

通俗的來說,當多個線程操做同一份共享數據的時候,數據的一致性被破壞,這就是線程不安全的。數據結構

舉個例子:多線程

循環的數值調大才能看出效果,本人試了好久併發

public class ThreadSafe {

    public static void main(String[] args) throws InterruptedException {
        ShareObj shareObj = new ShareObj();

        new Thread(() -> {
            for (int i = 0; i < 20_0000; i++) {
                shareObj.num += 1;
            }
            System.out.println(shareObj.num);
        }, "線程A").start();
        new Thread(() -> {
            for (int i = 0; i < 20_0000; i++) {
                shareObj.num += 1;
            }
            System.out.println(shareObj.num);
        }, "線程B").start();
    }
}

class ShareObj {
    int num = 0;
}

兩個線程同時操做共享變量的話,就會出現數據不一致的問題:框架

ThreadSafe

synchronized

那怎麼解決這個問題呢,其實也就是加鎖:synchronized,可是咱們要注意加鎖的資源

  • synchronized是對共享變量進行加鎖,只有線程搶佔到鎖以後,該線程才能繼續操做,操做完成以後釋放鎖資源

那麼,上面的小例子咱們就能夠進行調整:

public class ThreadSafe {

    public static void main(String[] args) throws InterruptedException {
        ShareObj shareObj = new ShareObj();

        new Thread(() -> {
            synchronized(shareObj) {
                for (int i = 0; i < 20_0000; i++) {
                    shareObj.num += 1;
                }
                System.out.println(shareObj.num);
            }
        }, "線程A").start();
        new Thread(() -> {
            synchronized(shareObj) {
                for (int i = 0; i < 20_0000; i++) {
                    shareObj.num += 1;
                }
                System.out.println(shareObj.num);
            }
        }, "線程B").start();
    }
}

class ShareObj {
    int num = 0;
}

這樣就解決了問題,達到了咱們預想的結果

那麼咱們來聊一聊synchronized

同步鎖,監視共享資源或共享對象(同步監視器),須要的是Object的子類。能夠經過同步代碼塊或者同步方法的方法來加鎖

同步方法也就是將業務邏輯抽離成一個普通方法,使用synchronized進行修飾,是同樣的效果

public synchronized void update() {
    // 業務邏輯
}
  • 必須是兩個或者兩個以上的線程在同時操做同一份共享資源
  • 使用同步鎖以後,線程只要搶到鎖以後纔可以繼續執行,不然就必須等待鎖釋放

這種狀況下執行效率可見通常

也就是說:

  • 當線程A訪問,鎖定同步監視器,開始執行業務邏輯。當線程B訪問,發現同步監視器鎖定,沒法訪問
  • 當線程A執行完成,解鎖同步監視器,線程B訪問,發現同步監視器未鎖定,鎖定並執行業務邏輯

除了這個問題以外,在線程中中還會出現很是嚴重的問題:死鎖

死鎖,通常狀況下表示互相等待,是程序運行是出現的一種狀態,簡單一點理解:

就是說兩個線程,各自須要對方的資源,可是本身又不釋放本身的資源,就形成了死鎖現象

死鎖沒辦法解決,只能在編寫代碼的過程時刻注意

生產者和消費者

很是經典的一個案例,不知道大家在面試的時候有沒有被它支配過

前提條件:

  • 生產者負責生產產品,放到一個區域裏,消費者從這個區域裏取走產品

關鍵點:

  • 先生產,再消費

定義的產品類

public class Goods {
    // 品牌
    public String brand;
    // 名稱
    public String name;

    public Goods() {
    }
    public Goods(String brand, String name) {
        this.brand = brand;
        this.name = name;
    }
}

生產者

public class Producer implements Runnable {

    private Goods goods;

    public Producer(Goods goods) {
        this.goods = goods;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                goods.brand = "農夫山泉";
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                goods.name = "礦泉水";
            } else {
                goods.brand = "旺仔";
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                goods.name = "小饅頭";
            }
            System.out.println(String.format("生產者生產了:%s---%s", goods.brand, goods.name));
        }
    }
}

消費者

public class Consumer implements Runnable {

    private Goods goods;

    public Consumer(Goods goods) {
        this.goods = goods;
    }

    @Override
    public void run() {
        System.out.println("消費者開始消費");
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("消費者消費產品:%s--%s", goods.brand, goods.name));
        }
    }
}

測試方法

public class Main {
    public static void main(String[] args) {

        Goods goods = new Goods();
        new Thread(new Producer(goods)).start();
        new Thread(new Consumer(goods)).start();
    }
}

這裏出現兩個問題,看一下結果

  1. 出現先消費後生產的問題
  2. 出現品牌和名稱不一致的問題

生產者消費者問題1

下面咱們來第二版解決

  • 首先是第一個問題:這裏咱們只須要讓生產者先執行就好,那麼若是首先執行到的是消費者,那麼就讓其等待
  • 品牌名稱不對應的問題,這是因爲生產產品和消費產品的方式不是原子性操做,在中間容易被中斷,因此咱們經過同步代碼方法來解決,保證在執行過程當中不會被打斷

在產品類中定義生產和消費的方法

public class Goods {

    // 品牌
    public String brand;

    // 名稱
    public String name;

    // 標誌位
    public boolean flag;

    public Goods() {
    }
    public Goods(String brand, String name) {
        this.brand = brand;
        this.name = name;
    }

    public synchronized void set(String brand, String name) {
        /**
         * 若是生產者搶佔到CPU資源,那麼先判斷當前有沒有產品,若是有產品,那麼就進入等待狀態,等待消費者消費完以後再次生產
         */
        if (flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.brand = brand;
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.name = name;

        flag = true;
        // 喚醒消費者消費
        notify();
    }

    public synchronized void get() {
        /**
         * 若是flag=false,說明生產者沒有生產商品,那麼消費者進入等待狀態,等待生產者生產產品以後,而後再次消費
         */
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(String.format("消費者消費產品:%s--%s", this.brand, this.name));

        flag = false;
        // 喚醒生產者進行生產
        notify();
    }
}

生產者

public class Producer implements Runnable {

    private Goods goods;

    public Producer(Goods goods) {
        this.goods = goods;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                goods.set("農夫山泉", "礦泉水");
            } else {
                goods.set("旺仔", "小饅頭");
            }
            System.out.println(String.format("生產者生產了:%s---%s", goods.brand, goods.name));
        }
    }
}

消費者

public class Consumer implements Runnable {

    private Goods goods;

    public Consumer(Goods goods) {
        this.goods = goods;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            goods.get();
        }
    }
}

其餘不變

這樣就解決了上面的兩個問題,

生產者和消費者

其實還有一種解決方式是採用BlockingQueue(隊列)來解決,等待和喚醒的操做就不用咱們來進行,BlockingQueue會幫咱們來完成

就是提一下,不懂的等以後學過了隊列,就清楚了

BlockingQueue的版本,該類位於java.util.concurrent包下(JUC),後續咱們詳細聊

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Goods> queue = new ArrayBlockingQueue<>(5);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable {

    private BlockingQueue<Goods> queue;

    public Producer(BlockingQueue<Goods> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                try {
                    queue.put(new Goods("農夫山泉", "礦泉水"));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    queue.put(new Goods("旺仔", "小饅頭"));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class Consumer implements Runnable {

    private BlockingQueue<Goods> queue;

    public Consumer(BlockingQueue<Goods> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Goods take = queue.take();
                System.out.println(String.format("消費者消費產品:%s--%s", take.brand, take.name));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Goods {
    // 品牌
    public String brand;
    // 名稱
    public String name;

    public Goods() {
    }
    public Goods(String brand, String name) {
        this.brand = brand;
        this.name = name;

        System.out.println(String.format("生產者生產了:%s---%s", brand, name));
    }
}

線程池

WHY

在實際的使用中,線程是很是消耗系統資源的,並且若是對線程管理不善,很容易形成系統資源的浪費,

並且在實際開發中,會形成線程的不可控,好比:

  • 線程名稱不統一
  • 線程的建立方式等等

所以咱們推薦在實際開發中採用線程池來進行開發,擁有如下優勢:

  • 使用線程池可使用已有的線程來執行任務,能夠避免線程在建立和銷燬時的資源消耗
  • 因爲沒有了線程的建立和銷燬過程,提升了系統的響應性能
  • 能夠經過服務器配置對線程池進行合理的配置,好比:可運行線程數大小等

核心參數

瞭解到這一點以後,咱們來看一看其具體的實現方式,在Java中,建立線程池主要是經過ThreadPoolExecutor來構造,下面咱們來具體瞭解一下

咱們看參數最多的構造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

工做原理

在瞭解這些參數以前,咱們先來聊一個知識點,就是線程池的工做原理,否則下面聊着有點生硬。畫個圖:

線程池的工做原理

簡單用文字描述一下就是這樣的過程:

  • 提交進行來的線程任務首先先判斷核心線程池中的線程是否所有都在執行任務,若是不是,那麼就建立線程執行提交進行來的線程任務,不然的話,就進入下一個判斷
  • 判斷線程池中的阻塞隊列是否已經佔滿,若是沒有,就將任務添加到阻塞隊列中等待執行,不然的話,就進行下一個判斷
  • 這裏判斷線程池中全部的線程是否都在執行任務,若是不是,那麼就建立線程執行任務,不然的話就交給飽和策略進行處理

這是總體處理的一個過程,下面咱們去實際源碼中看看:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()的註釋中也有至關詳細的說明

好了,下面看詳細的參數,這裏很是重要

corePoolSize和maximumPoolSize

corePoolSize表示核心線程數,maximumPoolSize表示線程池中容許存在的最大線程數。

那麼

  • 若是正在運行的線程數 小於 核心線程數,那麼當新進任務的話,即便存在空閒狀態下的線程,那麼也會建立新的線程來執行當前任務
  • 若是正在運行的線程數 大於 核心線程數,可是 小於 最大線程數,那麼僅在等待隊列滿的時候纔會建立新的線程

keepAliveTime和unit

keepAliveTime:當線程數大於核心時,多餘的空閒線程在終止以前等待新任務的最長時間

unit表示空閒線程存活時間的表示單位

簡單一點理解:

  • 好比如今線程池中有30個核心線程數,當任務高峯來臨時,當前核心線程數不足,那麼會新建立出20個臨時線程來執行任務,
  • 當任務高峯結束後,發現當前30個核心線程數沒有徹底在執行任務,那麼也就不會用到20個臨時線程,這20個臨時線程就是空閒的線程,而後經歷過指定的時間後,若是尚未用到就會被銷燬

workQueue

阻塞隊列或者說是等待隊列,用於存放等待執行的任務。

隊列在這裏先了解一下,等到後面聊 數據結構 的時候再詳細介紹

數據結構很重要,這裏簡單聊一下

隊列通常會和一塊兒作對比,二者都是動態集合。中刪除的元素都是最近插入的元素,遵循的是後進先出的策略(LIFO);而隊列刪除的都是在集合中存在時間最長的元素,遵循的是先進先出的策略(FIFO)。

這裏我羅列出隊列的類和說明,你們查看一下

隊列

threadFactory

建立新線程時須要用到的工廠,該參數,若是沒有另外指定,則默認使用Executors.defaultThreadFactory() ,該工廠建立的線程所有位於同一ThreadGroup而且具備相同的NORM_PRIORITY優先級和非守護程序狀態。 經過提供其餘ThreadFactory,能夠更改線程的名稱,線程組,優先級,守護程序狀態等。

/**
     * The default thread factory
     */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

Java還爲咱們提供了一種工廠方式:`Executors.privilegedThreadFactory()。返回用於建立具備與當前線程相同權限的新線程的線程工廠

/**
     * Thread factory capturing access control context and class loader
     */
static class PrivilegedThreadFactory extends DefaultThreadFactory {
    private final AccessControlContext acc;
    private final ClassLoader ccl;

    PrivilegedThreadFactory() {
        super();
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            // Calls to getContextClassLoader from this class
            // never trigger a security check, but we check
            // whether our callers have this permission anyways.
            sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

            // Fail fast
            sm.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
        this.acc = AccessController.getContext();
        this.ccl = Thread.currentThread().getContextClassLoader();
    }

    public Thread newThread(final Runnable r) {
        return super.newThread(new Runnable() {
            public void run() {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        Thread.currentThread().setContextClassLoader(ccl);
                        r.run();
                        return null;
                    }
                }, acc);
            }
        });
    }
}

還有一點,若是咱們想自定義線程工廠的話,那麼咱們能夠參考上面兩種的寫法

handler

飽和策略,也能夠稱爲拒絕策略。也就是當線程池中線程數都佔滿了沒法再繼續添加執行任務,最後就會交給飽和策略來處理

在線程池中飽和策略分爲四種:

  • ThreadPoolExecutor.AbortPolicy

這是Java提供的默認策略,也就是說當前策略會丟棄任務並拋出RejectedExecutionException異常

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
         * Creates an {@code AbortPolicy}.
         */
    public AbortPolicy() { }

    /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • ThreadPoolExecutor.CallerRunsPolicy

當前策略是經過調用線程處理該任務,只要線程池不關閉,那麼就會執行該任務

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code CallerRunsPolicy}.
         */
    public CallerRunsPolicy() { }

    /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • ThreadPoolExecutor.DiscardPolicy

什麼都不作,直接將任務丟棄

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardPolicy}.
         */
    public DiscardPolicy() { }

    /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • ThreadPoolExecutor.DiscardOldestPolicy

也就是說,若是線程池沒有關閉,那麼將阻塞隊列中的頭任務丟棄,而後再經過execute()從新執行當前任務

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
    public DiscardOldestPolicy() { }

    /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

JDK提供的線程池

在Java中,提供三種類型的線程池,下面咱們一一來聊一聊

ThreadPoolExecutor

線程池執行器,爲咱們提供瞭如下幾種:

  • newCachedThreadPool

建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們,並在須要時使用提供的 ThreadFactory, 可用於業務邏輯處理時間短的操做

該方法是無參或者參數爲ThreadFactory,其構建參數以下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

擁有如下特性:

  • 線程池數量沒有固定,默承認以達到Integer最大值
  • 線程池中的線程可進行重複利用和回收,默認時長爲1分鐘
  • 緩存隊列採用的是無緩衝的等待隊列,採用直接交接的排隊策略

寫個小案例

private static void newCachePoolExecutor() {
    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 20; i++) {
        newCachedThreadPool.execute(() -> System.out.println("anc"));
    }
    newCachedThreadPool.shutdown();
}
  • newFixedThreadPool

建立一個可重用固定線程數的線程池,已***隊列方式來運行這些線程

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

擁有如下特性:

  • 在線程池中的線程數處於必定的量,能夠很好的控制線程的併發數
  • 線程在顯示關閉以前,線程池內的線程都將一直存在
  • 採用無限隊列的排隊策略,當核心線程中的線程繁忙時,將新任務添加到隊列中等待,在這裏,參數maximumPoolSize無效

寫個小案例:

private static void newFixPoolExecutor() {
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(12);
    for (int i = 0; i < 20; i++) {
        newFixedThreadPool.execute(() -> System.out.println("anc"));
    }
    newFixedThreadPool.shutdown();
}
  • newSingleThreadPoolExecutor

建立一個使用單個worker線程的Executor,已***隊列方式來運行該線程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

擁有如下特性:

  • 核心線程數爲1,因此當前線程池只會存在一個正在運行的線程任務
  • 可保證順序的執行各個任務,而且在任意的時間點內不會存在多個活動線程
  • 一樣採用無限隊列的排隊策略

寫個小案例:

private static void newSingleThreadPoolExecutor() {
    ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 20; i++) {
        newSingleThreadExecutor.execute(() -> System.out.println("anc"));
    }
    newSingleThreadExecutor.shutdown();
}

ScheduledThreadPoolExecutor

這是一種可調度的執行器,也就是說能夠執行定時任務,常常有面試會被問到

除了使用定時任務框架和Timer以外,還有什麼技術能夠實現定時任務?

其中一種就是採用該線程池技術

  • newSingleThreadScheduledExecutor

該方法建立了一個單線程池的可調度執行器,

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

// ScheduledThreadPoolExecutor 繼承自ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // 這裏調用父類的構造方法
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

擁有以下特性:

  • 核心線程數爲1,以後提交的線程任務會排在隊列中一次等待執行
  • 使用延時隊列的方式來保存任務,只有當其中元素指定的延遲時間到了,才能從隊列中獲取到該元素。

寫個小案例:

private static void newSingleScheduledPoolExecutor() {
     ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

     // 延遲1s執行,每一個1s執行一次
     newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> System.out.println("kk"), 1L, 1L, TimeUnit.SECONDS);

     // 延遲1s執行
     newSingleThreadScheduledExecutor.schedule(() -> System.out.println("kk"), 1L,  TimeUnit.SECONDS);
 }
  • newScheduledThreadPool

建立一個線程池,可安排在給定延遲後運行命令或者按期執行,和上面線程池同樣,最終調用的是同一個類,可是不一樣點在於:

該線程池能夠指定核心線程數

寫個小案例:

private static void newScheduledThreadPool() {
    ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(6);

    newScheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("kk"), 1L, 1L, TimeUnit.SECONDS);
    newScheduledThreadPool.schedule(() -> System.out.println("aa"), 1L,  TimeUnit.SECONDS);
}

ForkJoinPool

該線程池是JDK1.7以後添加進來的,採用了分而治之的思想,在大數據中不少地方都用到了這種思想。

建立一個帶並行級別的線程池,並行級別決定了同一個時刻作多有多少線程在執行,如不傳並行級別參數,將默認爲當前系統的CPU個數

我直接給個案例吧,你們看看,畢竟這種方式本人在實際的開發中基本沒有用過

這是計算總和的例子

public class SumTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 20;

    private int[] arry;
    private int start;
    private int end;

    public SumTask(int[] arry, int start, int end) {
        this.arry = arry;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += arry[i];
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            SumTask left = new SumTask(arry, start, middle);
            SumTask right = new SumTask(arry, middle, end);

//            left.fork();
//            right.fork();
            invokeAll(left, right);

            return left.join() + right.join();
        }
    }
}

int[] arry = new int[100];
for (int i = 0; i < 100; i++) {
    arry[i] = new Random().nextInt(20);
}

// 實際調用
SumTask sumTask = new SumTask(arry, 0, arry.length);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

System.out.println("多線程執行結果:" + forkJoinPool.submit(sumTask).get());

實現類:

  • RecursiveTask

經過泛型能夠指定其執行的返回結果

  • RecursiveAction

無返回值

生命週期

線程池生命週期只有兩種:

  • RUNNING

線程池在RUNNING狀態下,可以接收新的任務,而且也可以處理阻塞隊列中的任務

  • TERMINATED

線程池正式進入到已終止的狀態

在這兩種狀態中間,還包含三種過分狀態:

  • SHUTDOWN

當線程池調用shutdown()方法的時候,會進入到SHUTDOWN狀態,該狀態下,線程池再也不接收新的任務,可是阻塞隊列中的任務卻能夠繼續執行

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);  // SHUTDOWN狀態
        interruptIdleWorkers();     // 中斷可能正在等待任務的線程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();             // 若是(SHUTDOWN狀態和線程池和隊列爲空)或(STOP和線程池爲空),則轉換爲TERMINATED狀態
}
  • STOP

當線程池調用shutdownNow()方法的時候,會進入到STOP狀態,該狀態下,線程池再也不接收新的任務,也不會執行阻塞隊列中的任務,同時還會中斷如今執行的任務

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);  // STOP狀態
        interruptWorkers();     // 中斷全部線程,即便處於活動狀態也是如此
        tasks = drainQueue();   // 將沒有執行的任務從隊列中remove(),並添加到List中
    } finally {
        mainLock.unlock();
    }
    tryTerminate();             // 若是(SHUTDOWN狀態和線程池和隊列爲空)或(STOP和線程池爲空),則轉換爲TERMINATED狀態
    return tasks;
}

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

上面也就是shutdown()shutdownNow()的區別,更多的是推薦使用shutdown()

  • TIDYING

當線程池中全部任務都已終止,而且 工做線程 爲0,那麼線程池就會調用terminated()方法進入到TERMINATED狀態

用圖來表示:

ThreadPoolExecutor生命週期

寫在最後

多線程的基礎知識就聊到這裏,歡迎你們在評論區積極互動,提出本身的看法

相關文章
相關標籤/搜索