【Java併發編程六】線程池

1、概述

  在執行併發任務時,咱們能夠把任務傳遞給一個線程池,來替代爲每一個併發執行的任務都啓動一個新的線程,只要池裏有空閒的線程,任務就會分配一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(BlockingQueue),線程池裏的線程會去取這個隊列裏的任務。編程

  利用線程池有三個好處:數組

  1. 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗
  2. 提升響應速度。當任務到達時,任務能夠不須要的等到線程建立就能當即執行
  3. 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控

2、線程池的使用

  一、建立線程池

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
  TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

  建立一個線程池須要的幾個參數:緩存

  • corePoolSize:線程池的基本大小,當提交一個任務到線程池,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於corePoolSize時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。
  • maximumPoolSize:線程池最大大小,線程池容許建立的最大線程數,若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。
  • keepAliveTime:線程活動保持時間,線程池的工做線程空閒後,保持存活的時間。
  • unit:線程活動保持時間的單位
  • workQueue:任務隊列,用於保存等待執行的任務的阻塞隊列,能夠選擇如下幾個隊列:

一、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。併發

二、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。ide

三、SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。工具

四、PriorityBlockingQueue:一個具備優先級得無限阻塞隊列。this

  • threadFactory:用於設置建立線程的工廠,能夠經過線程工程給每一個建立出來的線程設置更有意義的名字。
  • handler:飽和策略,當線程池和隊列都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。

  此外,咱們還能夠經過調用Ececutors中的某個靜態工廠方法來建立一個線程池(它們的內部實現原理都是相同的,僅僅是使用了不一樣的工做隊列或線程池大小):spa

  newFixedThreadPool:建立一個定長的線程池,每當提交一個任務就建立一個線程,直到達到池的最大長度,這時線程池會保持長度不在變化線程

  newCachedThreadPool:建立一個可緩存的線程池,若是當前的線程池的長度超過了處理的須要時,它能夠靈活的回收空閒的線程,當需求增長時,它能夠靈活的添加新的線程,並不會對池的長度作任何限制rest

  newSingleThreadPool:建立一個單線程化的executor,它只會建立惟一的工做者線程來執行任務

  newScheduledThreadPool:建立一個定長的線程池,並且支持定時的以及週期性的任務執行,相似於Timer

  二、向線程池提交任務

  可使用execute向線程池提交任務:

public class Test2
{
    public static void main(String[] args)
    {
        BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>();
        ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue);
        poolExecutor.execute(new Task1());
        poolExecutor.execute(new Task2());
     poolExecutor.shutdown(); } }
class Task1 implements Runnable { public void run() { System.out.println("執行任務1"); } } class Task2 implements Runnable { public void run() { System.out.println("執行任務2"); } }

  也可使用submit方法來提交任務,它會返回一個future,咱們能夠經過這個future來判斷任務是否執行成功,經過future的get方法獲取返回值,get方法會阻塞直到任務完成。

public class Test3
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
        // 建立10個任務並執行
        for (int i = 0; i < 10; i++)
        {
            // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
            Future<String> future = executorService.submit(new TaskWithResult(i));
            resultList.add(future);
        }
        for (Future<String> future : resultList)
        {
            while (!future.isDone());// Future返回若是沒有完成,則一直循環等待,直到Future返回完成
            {
                System.out.println(future.get()); // 打印各個線程(任務)執行的結果
            }
        }
        executorService.shutdown();
    }
}
class TaskWithResult implements Callable<String>
{
    private int id;
    public TaskWithResult(int id)
    {
        this.id = id;
    }
    public String call() throws Exception
    {
        return "執行結果"+id;
    }
}

 

  三、線程關閉

  能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,可是它們的實現原理不一樣,shutdown的原理是隻是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。shutdownNow會首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。

  只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。

3、線程池的執行過程

  整個ThreadExecutor的任務處理通過下面4個步驟,以下圖所示:

  

  一、若是當前的線程數<corePoolSize,提交的Runnable任務,會直接做爲new Thread的參數,當即執行,當提交的任務數超過了corePoolSize,就進入第二部操做

  二、將當前的任務提交到BlockingQueue阻塞隊列中,若是Block Queue是個有界隊列,當隊列滿了以後就進入第三步

  三、若是poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,執行對應的runnable任務

  四、若是第三步也沒法處理,就會用RejectedExecutionHandler來作拒絕處理

4、執行定時及週期性任務

  Timer工具管理任務的定時以及週期性執行。示例代碼以下:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(3000);//模擬任務1執行時間爲3秒
                }
                catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),4000); //間隔4秒週期性執行
    }
}

  執行結果:

  

  能夠看到上述任務1以4秒爲間隔週期性執行。可是Timer存在一些缺陷,主要是下面兩個方面的問題:

  缺陷1Timer只建立惟一的線程的來執行全部的Timer任務,若是一個time任務的執行很耗時,會致使其餘的TimeTask的時效準確性出問題。看下面的例子:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                }
                catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                
                System.out.println("任務2執行時間:"+sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //間隔1秒週期性執行
        timer.schedule(timerTask2, new Date(),4000); //間隔4秒週期性執行
    }
}

  執行結果:

  

  缺陷2:若是TimeTask拋出未檢查的異常,Timer將產生沒法預料的行爲。Timer線程並不捕獲線程,全部TimerTask拋出的未檢查的異常會終止timer線程。看下面的代碼:

public class TimerTest2
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務2執行時間:"+sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //週期1秒執行任務1
        timer.schedule(timerTask2, new Date() ,3000); //週期3秒執行任務2
        
    }
}

  執行結果爲:

  

  

  針對上述的兩個問題,咱們可使用ScheduledThreadPoolExecutor來做爲Timer的替代。

  針對問題1,有下面代碼:

public class ScheduledThreadPoolExecutorTest
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任務1執行時間:" + sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                } catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務2執行時間:" + sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS);
        poolExecutor.scheduleAtFixedRate(timerTask2,  0, 4000,TimeUnit.MILLISECONDS);
    }
}

  執行的結果爲:

  

  針對問題2,有下面代碼:

public class ScheduledThreadPoolExecutorTest2
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任務1執行時間:" + sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {

                System.out.println("任務2執行時間:" + sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS);  
        poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS);
    }
}

  執行結果爲:

  

5、參考資料

  一、Java併發編程實踐

相關文章
相關標籤/搜索