Java多線程知識點整理(線程池)

1.線程池的使用java

    線程池通常配合隊列一塊兒工做,是線程池限制併發處理任務的數量。而後設置隊列的大小,當任務超過隊列大小時,經過必定的拒絕策略來處理,這樣能夠保護系統免受大流量而致使崩潰--只是部分拒絕服務,仍是有一部分是能夠正常服務的。程序員

    線程池通常有核心線程池大小和線程池最大大小配置,當線程池中的線程空閒一段時間時將會回收,而核心線程池中的線程不會被回收。算法

    多少個線程合適呢?建議根據實際業務狀況來壓測決定,或者根據利特法則來算出一個合理的線程池大小。Java提供了ExecutorService的幾種實現:緩存

    a.ThreadPoolExecutor:標準線程池。併發

    b.newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。this

    c.newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。url

    d.newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。spa

    e.newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。線程

    f.ForkJoinPool:相似於ThreadPoolExecutor,可是使用work-stealing模式,其會爲線程池中的每一個線程建立一個隊列,從而用work-stealing(任務竊取)算法使得線程能夠從其餘線程隊列裏竊取任務來執行。即若是本身的任務處理完成了,則能夠去忙碌的工做線程那裏竊取任務執行。code

2.線程池簡單分析

 2.1 、建立單線程的線程池:newSingleThreadExecutor 

ExecutorService  executorService=  Executors.newSingleThreadExecutor();

等價於

return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

2.二、建立固定數量的線程池

ExecutorService  executorService1=  Executors.newFixedThreadPool(10);

等價於

return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

注意:單線程的線程池與固定數量的線程池使用隊列的策略是同樣的,若是固定數量的線程池爲1,則至關於單線程的線程池。

dubbo源碼的使用:

// 文件緩存定時寫入
    private final ExecutorService registryCacheExecutor = 
Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));

//執行代碼
if (syncSaveFile) {
      doSaveProperties(version);
 } else {
         registryCacheExecutor.execute(new SaveProperties(version));
 }

 

2.三、建立可緩存的線程池,初始大小爲0,線程池最大大小爲Integer.MAX_VALUE。其使用SynchronousQueue隊列,一個沒有數據緩衝的阻塞隊列。對其執行put操做後必須等待take操做消費該數據,反之亦然。該線程池不限制最大大小,若是線程池有空閒則複用,不然會建立一個新線程。若是線程池中的線程空閒60秒,則將被回收。該線程默認最大大小爲Integer.MAX_VALUE,請肯定必要後再使用該線程池。

ExecutorService  executorService2=  Executors.newCachedThreadPool();

等價於

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

dubbo源碼:在集羣的時候使用到了,由於並不知道,傳遞過來的集羣參數是多少。

private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
   

    @SuppressWarnings("rawtypes")
	public Result invoke(final Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);

        
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for( final Invoker<T> invoker : invokers ) {
            Future<Result> future = executor.submit( new Callable<Result>() {
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            } );
            results.put( invoker.getUrl().getServiceKey(), future );
        }

2.四、支持延遲執行的線程池,其使用DelayedWorkQueue實現任務延遲。

ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);

等價於

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

使用的案例:

Dubbo源碼

//1.檢測並鏈接註冊中心,使用的是newScheduledThreadPool
  //定義一個全局的線程池:
// 定時任務執行器
private final ScheduledExecutorService retryExecutor = 
Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // 失敗重試定時器,定時檢查是否有請求失敗,若有,無限次重試
    private final ScheduledFuture<?> retryFuture;


public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, 
Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // 檢測並鏈接註冊中心
                try {
                    retry();
                } catch (Throwable t) { // 防護性容錯
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

2.五、work-stealing線程池,默認爲並行行數爲Runtime.getRuntime().availableProcessors()

ExecutorService  executorService4=  Executors.newWorkStealingPool(2);

等價於

return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

 

3.線程池終止

線程池不在使用記得中止線程,能夠調用shutdown以確保不接受新任務,並等待線程池中任務處理完成後再退出,或調用shutdownNow清除未執行任務,並用Thread.interrupt中止正在執行的任務。而後調用awaitTermination方法等待終止操做執行完成。

static ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);
	public static void main(String[] args) {
		executorService3.shutdown();
		try {
			executorService3.awaitTermination(30, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

總結:在使用線程池時務必須設置池大小、隊列大小並設置相應的拒絕策略(RejectedExcutionHandler)。線程池執行狀況下沒法捕獲堆棧上下文,所以任務要記錄相關參數,以方便定位提交任務的源頭及定位引發問題的源頭。

4.ThreadPoolExecutor六個核心參數

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

4.一、corePoolSize

核心池的大小。在建立了線程池以後,默認狀況下,線程池中沒有任何線程,而是等待有任務到來才建立線程去執行任務。默認狀況下,在建立了線程池以後,線程池鐘的線程數爲0,當有任務到來後就會建立一個線程去執行任務。

4.二、maximumPoolSize

池中容許的最大線程數,這個參數表示了線程池中最多能建立的線程數量,當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,將繼續建立線程以處理任務,maximumPoolSize表示的就是wordQueue滿了,線程池中最多能夠建立的線程數量。

4.三、keepAliveTime

只有當線程池中的線程數大於corePoolSize時,這個參數纔會起做用。當線程數大於corePoolSize時,終止前多餘的空閒線程等待新任務的最長時間。

4.四、unit

keepAliveTime時間單位。

4.五、workQueue

存儲還沒來得及執行的任務。

4.六、threadFactory

執行程序建立新線程時使用的工廠。

4.七、handler

因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序。

總結:上面的內容,其餘應該都相對比較好理解,只有corePoolSize和maximumPoolSize須要多思考。這裏要特別再舉例以四條規則解釋一下這兩個參數:

一、池中線程數小於corePoolSize,新任務都不排隊而是直接添加新線程

二、池中線程數大於等於corePoolSize,workQueue未滿,首選將新任務加入workQueue而不是添加新線程

三、池中線程數大於等於corePoolSize,workQueue已滿,可是線程數小於maximumPoolSize,添加新的線程來處理被添加的任務

四、池中線程數大於大於corePoolSize,workQueue已滿,而且線程數大於等於maximumPoolSize,新任務被拒絕,使用handler處理被拒絕的任務

ThreadPoolExecutor的使用很簡單,前面的代碼也寫過例子了。經過execute(Runnable command)方法來發起一個任務的執行,經過shutDown()方法來對已經提交的任務作一個有效的關閉。儘管線程池很好,但咱們要注意JDK API的一段話:

強烈建議程序員使用較爲方便的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,能夠進行線程自動回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和Executors.newSingleThreadExecutor()(單個後臺線程),它們均爲大多數使用場景預約義了設置。

因此,跳開對ThreadPoolExecutor的關注(仍是那句話,有問題查詢JDK API),重點關注一下JDK推薦的Executors。

4.八、四種拒絕策略

所謂拒絕策略以前也提到過了,任務太多,超過maximumPoolSize了怎麼把?固然是接不下了,接不下那只有拒絕了。拒絕的時候能夠指定拒絕策略,也就是一段處理程序。

決絕策略的父接口是RejectedExecutionHandler,JDK自己在ThreadPoolExecutor裏給用戶提供了四種拒絕策略,看一下:

一、AbortPolicy

直接拋出一個RejectedExecutionException,這也是JDK默認的拒絕策略。

二、CallerRunsPolicy

嘗試直接運行被拒絕的任務,若是線程池已經被關閉了,任務就被丟棄了。

三、DiscardOldestPolicy

移除最晚的那個沒有被處理的任務,而後執行被拒絕的任務。一樣,若是線程池已經被關閉了,任務就被丟棄了。

四、DiscardPolicy

不能執行的任務將被刪除。

相關文章
相關標籤/搜索