Executor線程池的簡單使用

  咱們都知道建立一個線程能夠繼承Thread類或者實現Runnable接口,實際Thread類就是實現了Runnable接口。html

  到今天才明白後端線程的做用:咱們能夠開啓線程去執行一些比較耗時的操做,相似於前臺的ajax異步操做,好比說用戶上傳一個大的文件,咱們能夠獲取到文件以後開啓一個線程去操做該文件,可是能夠提早將結果返回去,若是同步處理有可能太耗時,影響系統可用性。java

一、new Thread的弊端

原生的開啓線程執行異步任務的方式:ajax

new Thread(new Runnable() {

    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}).start();

弊端以下:後端

  • 線程生命週期的開銷很是高。建立線程都會須要時間,延遲處理的請求,而且須要JVM和操做系統提供一些輔助操做。
  • 資源消耗。活躍的線程會消耗系統資源,尤爲是內存。若是可運行的線程數量多於可用處理器的數量,那麼有些線程將會閒置。大量空閒的線程會佔用許多內存,給GC帶來壓力,並且大量線程在競爭CPU資源時會產生其餘的性能開銷。
  •  穩定性。在可建立線程的數量上存在一個限制,這個限制受多個因素的制約,包括JVM的啓動參數、Thread構造函數中請求棧的大小以及底層操做系統的限制。若是破壞了這些限制,極可能拋出  outOfMemoryError異常。

  也就是說在必定的範圍內增長線程的數量能夠提升系統的吞吐率,可是若是超出了這個範圍,再建立更多的線程只會下降程序的執行效率甚至致使系統的崩潰。緩存

 例如:併發

 使用線程池:能夠了解線程池的用法以及線程池的正確的關閉方法:shutdown以後立刻調用awaitTermination阻塞等待實現同步關閉。框架

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo1 {
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

結果:less

14
2000異步

 

package cn.qlq.thread.twenty;

import java.util.concurrent.atomic.AtomicInteger;

public class Demo2 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

結果:ide

257
2000

  不使用線程池話費的時間比使用線程池長了好幾倍,也看出了效率問題。

 

2.核心類結構以下:

 

一、Executor是一個頂級接口,它提供了一種標準的方法將任務的提交過程與執行過程解耦開來,並用Runnable來表示任務。

二、ExecutorService擴展了Executor。添加了一些用於生命週期管理的方法(同時還提供一些用於任務提交的便利方法

三、下面兩個分支,AbstractExecutorService分支就是普通的線程池分支,ScheduledExecutorService是用來建立定時任務的。

3.Executor介紹

  線程池簡化了線程的管理工做。在Java類庫中,任務執行的主要抽象不是Thread,而是Executor,以下:

public interface Executor {
    void execute(Runnable command);
}

 

  Executor是個簡單的接口,它提供了一種標準的方法將任務的提交過程與執行過程解耦開來,並用Runnable來表示任務。Executor還提供了對生命週期的支持,以及統計信息收集、應用程序管理機制和性能監視機制。

  Executor基於"生產者-消費者"模式,提交任務的操做至關於生產者,執行任務的則至關於消費者。

生命週期:

  Executor的實現一般會建立線程來執行任務。但JVM只有在全部非守護線程所有終止纔會退出。所以,若是沒法正確的關閉Executor,那麼JVM將沒法結束。ExecutorService擴展了Executor接口,添加了一些用於生命週期管理的方法(同時還提供一些用於任務提交的便利方法

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  ExecutorService的生命週期有三種:運行、關閉和已終止。ExecutorService在建立時處於運行狀態。shutdown方法將執行平緩的關閉過程:再也不接受新的任務,同時等待已經提交的任務執行完畢----包括還沒開始的任務,這種屬於正常關閉。shutdownNow方法將執行粗暴的關閉過程:它將取消全部運行中的任務,而且再也不啓動隊列中還沒有開始的任務,這種屬於強行關閉(關閉當前正在執行的任務,而後返回全部還沒有啓動的任務清單)。

  在ExecutorService關閉後提交的任務將由"拒絕執行處理器"來處理,它會拋棄任務,或者使得execute方法拋出一個RejectedExecutionException。等全部任務執行完成後,ExecutorService將轉入終止狀態。能夠調用awaitTermination來等待ExecutorService到達終止狀態,或者經過isTerminated來輪詢ExecutorService是否已經終止。一般在調用shutdown以後會當即調用awaitTermination阻塞等待,從而產生同步地關閉ExecutorService的效果。

 4.線程池--ThreadPoolExecutor

  線程池,從字面意義上看,是指管理一組同構工做線程的資源池。線程池是與工做隊列(work queue)密切相關的,其中在工做隊列保存了全部等待執行的任務。工做者線程的任務很簡單:從工做隊列中獲取一個任務並執行任務,而後返回線程池等待下一個任務。(線程池啓動初期線程不會啓動,有任務提交(調用execute或submit)纔會啓動,直到到達最大數量就再也不建立而是進入阻塞隊列)。

  "在線程池中執行任務"比"爲每個任務分配一個線程"優點更多。經過重用現有的線程而不是建立新線程,能夠處理多個請求時分攤在建立線程和銷燬過程當中產生的巨大開銷。另一個額外的好處是,當請求到達時,工做線程一般已經存在,所以不會因爲建立線程而延遲任務的執行,從而提升了性能。

  ThreadPoolExecutor爲Executor提供了一些基本實現。ThreadPoolExecutor是一個靈活的、穩定的線程池,容許各類容許機制。ThreadPoolExecutor定義了不少構造函數,最多見的是下面這個:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

一、corePoolSize

  核心池的大小。在建立了線程池以後,默認狀況下,線程池中沒有任何線程,而是等待有任務到來才建立線程去執行任務。默認狀況下,在建立了線程池以後,線程池鐘的線程數爲0,當有任務到來後就會建立一個線程去執行任務

二、maximumPoolSize

  池中容許的最大線程數,這個參數表示了線程池中最多能建立的線程數量,當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,將繼續建立線程以處理任務,maximumPoolSize表示的就是wordQueue滿了,線程池中最多能夠建立的線程數量

三、keepAliveTime

  只有當線程池中的線程數大於corePoolSize時,這個參數纔會起做用。當線程數大於corePoolSize時,終止前多餘的空閒線程等待新任務的最長時間.

四、unit

  keepAliveTime時間單位

五、workQueue

  存儲還沒來得及執行的任務

六、threadFactory

  執行程序建立新線程時使用的線程工廠

七、handler

  因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序(拒絕執行處理器)

 

拒絕執行處理器其實是定義了拒絕執行線程的行爲:實際上也是一種飽和策略,當有界隊列被填滿後,飽和隊列開始發揮做用。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

 

在類庫中定義了四種實現:

1.  AbortPolicy-終止策略

  直接拋出一個RejectedExecutionException,也是JDK默認的拒絕策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2.CallerRunsPolicy-調運者運行策略

  若是線程池沒有被關閉,就嘗試執行任務。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 3.DiscardOldestPolicy-拋棄最舊的策略

  若是線程池沒有關閉,就移除隊列中最早進入的任務,而且嘗試執行任務。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

4. DiscardPolicy-拋棄策略

  什麼也不作,安靜的丟棄任務

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

 

5.Executors

5.1ThreadFactory

  在將這個以前先介紹一下ThreadFactory。每當線程池須要一個線程時,都是經過線程工廠建立的線程。默認的線程工廠方法將建立一個新的、非守護的線程,而且不包含特殊的線程信息。固然能夠經過線程工廠定製線程的信息。此工廠也有好多實現:

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

  其實現類:

 

5.2Executors

  能夠經過Executors中的靜態工廠方法之一建立一個線程池。Executors的靜態工廠能夠建立經常使用的四種線程池:

newFixedThreadPool(採用LinkedBlockingQueue隊列--基於鏈表的阻塞隊列)

  建立一個定長線程池,每當提交一個任務時就建立一個線程,直到線程池的最大數量,這時線程池的規模將再也不變化(若是因爲某個線程因爲發生了未預期的exception而結束,那麼線程池會補充一個新的線程)。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newCachedThreadPool(使用SynchronousQueue同步隊列)

  建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。線程池的規模不受限。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newScheduledThreadPool(使用DelayedWorkQueue延遲隊列)

   建立一個定長線程池,支持定時及週期性任務執行。相似於Timer。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

 

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

 

newSingleThreadExecutor(採用LinkedBlockingQueue隊列--基於鏈表的阻塞隊列)

  建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。若是這個線程異常結束會建立一個新的線程來替代。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   

  newFixedThreadPool和newCachedThreadPool這兩個工廠方法返回通用的ThreadPoolExecutor實例,這些實例能夠直接用來構造專門用途的execotor。另外上面建立的時候都有一個能夠指定線程工廠的方法:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }  

  關於workqueue的選擇: DelayQueue 能夠實現有序加延遲的效果。 SynchronousQueue 同步隊列,實際上它不是一個真正的隊列,由於它不會維護隊列中元素的存儲空間,與其餘隊列不一樣的是,它維護一組線程,這些線程在等待把元素加入或移除隊列。LinkedBlockingQueue 相似於LinkedList,基於鏈表的阻塞隊列。此隊列若是不指定容量大小,默認採用Integer.MAX_VALUE(能夠理解爲無限隊列)。

  關於隊列的使用參考:http://www.javashuo.com/article/p-zhdzotmo-a.html

6.Java線程池的使用

下面全部的測試都是基於Myrunnale進行測試

package cn.qlq.thread.twenty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MyRunnable.class);

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            log.info("threadName -> {},i->{} ", Thread.currentThread().getName(), i);
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

1.FixedThreadPool的用法

  建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。在建立的時候並不會立刻建立2個線程,而是在提交任務的時候才建立線程。

建立方法:

    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

 

查看源碼:(使用了阻塞隊列,超過池子容量的線程會在隊列中等待)

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo3 {
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:(執行完線程並無銷燬)

 

 解釋:

  池子容量大小是2,因此前兩個先被執行,第三個runable只是暫時的加到等待隊列,前兩個執行完成以後線程 pool-1-thread-1空閒以後從等待隊列獲取runnable進行執行。

  定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

 

而且上面程序執行完畢以後JVM並無結束,所以線程池建立的線程默認是非守護線程:

 

2.CachedThreadPool

  建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。

建立方法:

private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

 

查看源碼:(使用了同步隊列)

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo4 {
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:

 

執行完成執行線程並無結束

 

3.SingleThreadExecutor用法

   建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。相似於單線程執行的效果同樣。

建立方法:

    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

 

查看源碼;使用的阻塞隊列

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo5 {
    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:

 

只有一個線程在執行任務:

 

 4.ScheduledThreadPool用法------能夠實現任務調度功能

   建立一個定長線程池(會指定容量初始化大小),支持定時及週期性任務執行。能夠實現一次性的執行延遲任務,也能夠實現週期性的執行任務。

建立方法:

    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

 

查看源碼:(使用了延遲隊列)

 

 

 測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo6 {
    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // 第一次執行是在3s後執行(延遲任務)
            batchTaskPool.schedule(new MyRunnable(), 3, TimeUnit.SECONDS);
            // 第一個參數是須要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleAtFixedRate(new MyRunnable(), 3, 7, TimeUnit.SECONDS);
            // 第一個參數是須要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleWithFixedDelay(new MyRunnable(), 3, 5, TimeUnit.SECONDS);
        }
    }
}
schedule是一次性的任務,能夠指定延遲的時間。
scheduleAtFixedRate已固定的頻率來執行某項計劃(任務)
scheduleWithFixedDelay相對固定的延遲後,執行某項計劃 (這個就是第一個任務執行完5s後再次執行,通常用這個方法任務調度)
  若是延遲時間傳入的是負數會當即執行,不會報非法參數錯誤。

 關於兩者的區別:

  scheduleAtFixedRate :這個是按照固定的時間來執行,簡單來講:到點執行
  scheduleWithFixedDelay:這個呢,是等上一個任務結束後,在等固定的時間,而後執行。簡單來講:執行完上一個任務後再執行

舉例子

  scheduledThreadPool.scheduleAtFixedRate(new TaskTest("執行調度任務3"),0, 1, TimeUnit.SECONDS);  //這個就是每隔1秒,開啓一個新線程
  scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四個"),0, 3, TimeUnit.SECONDS); //這個就是上一個任務執行完,3秒後開啓一個新線程

補充:好比想要實如今某一個時鐘定時晚上11點執行任務,而且天天都執行

        long curDateSecneds = 0;
        try {
            String time = "21:00:00";
            DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
            DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
            Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
            curDateSecneds = curDate.getTime();
        } catch (ParseException ignored) {
            // ignored
        }

        // 單位是s
        long initialDelay = (curDateSecneds - System.currentTimeMillis()) / 1000;
        int periodOneDaySeconds = 1 * 24 * 60 * 60;
        batchTaskPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("111222");
            }
        }, initialDelay, periodOneDaySeconds, TimeUnit.SECONDS);

注意: 上面的單位也就是四個參數是延遲時間和間隔的單位,也就是說第四個參數決定第二個和第三個參數。

 

 固然實現任務調度還能夠採用quartz框架來實現,更加的靈活。參考:http://www.javashuo.com/article/p-xqwvhekc-gn.html

 

5.測試建立線程池的時候傳入一個線程工廠建立的線程是守護線程且自定義線程的name

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo7 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger();
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("t" + atomicInteger.incrementAndGet());
            thread.setDaemon(true);// 設置爲守護線程
            return thread;
        }
    });

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
        // 必須休眠。不然建立的是守護線程會直接關閉進程
        Thread.sleep(20 * 1000);
    }
}

結果:

 

補充:關於線程池中的線程銷燬問題

  線程池中的線程若是不調用shutdown()方法線程是不會銷燬的,即便是方法內部的局部變量線程池也不會銷燬;調用shutdown方法以後在全部線程執行完後會線程線程池中的線程。因此在使用線程池的時候若是是方法內部使用必定要shutdown銷燬線程,若是是全局使用的靜態線程池能夠不shutdown。

例如:不調用shutdown方法不會銷燬線程。調用shutdown會銷燬線程。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 閉鎖
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 10000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 閉鎖產生同步效果
            System.out.println("三個都執行完畢");
            // batchTaskPool.shutdown();// 調用此方法等待執行完畢會銷燬線程,若是不調用此方法即便方法退出也不會銷燬線程
            System.out.println(batchTaskPool.isTerminated());
            System.out.println(batchTaskPool.isShutdown());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

結果:

 

 將上面的shutdown方法的註釋去掉再次測試,結果以下:  調用shutdown以後線程會銷燬

 

 

補充:若是想要判斷線程池中的線程是否執行完畢,或者在多個線程在線程池中執行完畢以後處理某些事情能夠結合閉鎖來實現,參考:http://www.javashuo.com/article/p-djhilvzp-n.html

 (1)閉鎖實現 (建議使用這種)

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 閉鎖
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 閉鎖產生同步效果
            System.out.println("三個都執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

結果:

pool-1-thread-1進入run
pool-1-thread-2進入run
pool-1-thread-3進入run
pool-1-thread-1退出run
pool-1-thread-3退出run
pool-1-thread-2退出run
三個都執行完畢
main end

 

(2)線程池自身攜帶的方法實現:  shuwdown後當即調用awaitTermination 實現

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            batchTaskPool.shutdown();
            batchTaskPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("三個都執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

結果:

pool-1-thread-2進入run
pool-1-thread-3進入run
pool-1-thread-1進入run
pool-1-thread-3退出run
pool-1-thread-2退出run
pool-1-thread-1退出run
三個都執行完畢
main end

 

補充:例如我係統中使用的一個ExcutorService的例子:

/**
 * 同步釘釘組織結構和人員的Action
 * 
 * @author Administrator
 *
 */
@Namespace("/sync")
public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport {

    /**
     * serialID
     */
    private static final long serialVersionUID = 3526083465788431949L;
    
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class);

    @Autowired
    private GroupAndUserService groupService;

    @Autowired
    private BaseInfoService baseInfoService;

    /**
     * 同步基本信息的數據
     * 
     * @return
     */
    @Action(value = "syncGroupAndUser")
    public String syncGroupAndUser() {
        long startTime = System.currentTimeMillis();
        logger.info("manual sync groups and users!");

        String accessToken = FetchDataUtils.getAccessToken();
        if (StringUtils.isBlank(accessToken)) {
            setPreJs("accessToken is null!");
            return "js";
        }

        String groupStr = FetchDataUtils.getGroupStr(accessToken);
        if (StringUtils.isBlank(groupStr)) {
            setPreJs("groupStr is null");
            return "js";
        }
        
        
        Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 釘釘同步回來的組
        //新開一個線程去獲取釘釘用戶和組織
        batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken);
        

        Map<String,Object> response = new HashMap<String,Object>();
        response.put("success", true);
        response.put("message", "success sync datas!");
        setPreJs(APIUtils.getJsonResultFromMap(response));
        
        long endTime = System.currentTimeMillis();
        logger.info("同步釘釘組織結構和用戶完成-----用時:{}ms",(endTime-startTime));
        return "js";
    }

    private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                groupService.batchDisposeGroups(groupStr, dingGroupIds);
                groupService.fetchAndDisposeUsers(accessToken, dingGroupIds);                
            }
        };
        batchTaskPool.execute(run);
    }
    
}

注意:

  batchDisposeDingGroupAndUser()方法的形參必須聲明爲final,不然編譯錯誤。


補充:阿里規約有一條

【強制】線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端以下:
1) FixedThreadPool 和 SingleThreadPool:
  容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

 

阻塞隊列默認是Integer.MAX_VALUE(能夠理解爲無限隊列)

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

 

2) CachedThreadPool 和 ScheduledThreadPool:
  容許的建立線程數量爲 Integer.MAX_VALUE, 可能會建立大量的線程,從而致使 OOM。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
相關文章
相關標籤/搜索