java 線程之executors線程池

1、線程池的做用

  平時的業務中,若是要使用多線程,那麼咱們會在業務開始前建立線程,業務結束後,銷燬線程。可是對於業務來講,線程的建立和銷燬是與業務自己無關的,只關心線程所執行的任務。所以但願把儘量多的cpu用在執行任務上面,而不是用在與業務無關的線程建立和銷燬上面。而線程池則解決了這個問題。java

  線程池的做用:線程池做用就是限制系統中執行線程的數量。根據系統的環境狀況,能夠自動或手動設置線程數量,達到運行的最佳效果,從而避免平凡的建立和銷燬線程帶來的系統開銷也有效的規避了由於建立的線程過多而耗盡系統資源致使服務器宕機。使用Runtime.getRuntime().availableProcessors();設置線程數量。緩存

2、 java併發包提供的線程池 Executors類

  A、newFixedThreadPool  用來建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待服務器

           ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);多線程

複製代碼

 public static ExecutorService newFixedThreadPool(int nThreads) {        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數        //第二個:該線程池最大線程數        //第三個:線程空閒時間(0L表示沒有空閒時間即沒有使用就會被回收)        //第四個:空閒時間單位        //第五個:LinkedBlockingQueue ×××隊列 ,將沒有線程處理的任務加入該隊列中
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }

複製代碼

  B、newSingleThreadExecutor 用來建立一個單線程化的線程池,它只用惟一的工做線程來執行任務,一次只支持一個,全部任務按照指定的順序執行併發

    ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();ide

複製代碼

public static ExecutorService newSingleThreadExecutor() {        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數        //第二個:該線程池最大線程數        //第三個:線程空閒時間(0L表示沒有空閒時間即沒有使用就會被回收)        //第四個:空閒時間單位        //第五個:LinkedBlockingQueue 無解隊列 ,將沒有線程處理的任務加入該隊列中
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

複製代碼

  C、newCachedThreadPool  用來建立一個可緩存線程池,該線程池沒有長度限制,對於新的任務,若是有空閒的線程,則使用空閒的線程執行,若是沒有,則新建一個線程來執行任務。若是線程池長度超過處理須要,可靈活回收空閒線程。線程

ExecutorService fixedThreadPool = Executors.newCachedThreadPool(); 日誌

複製代碼

 public static ExecutorService newCachedThreadPool() {        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數        //第二個:Integer.MAX_VALUE 不限制該線程池的線程數        //第三個:線程空閒時間(60L 表示線程空閒60秒以後被回收)        //第四個:空閒時間單位 SECONDS 秒        //第五個:SynchronousQueue 無容量隊列 ,將任務直接提交給線程處理自身不存儲任務
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

複製代碼

  D、newScheduledThreadPool 用來建立一個定長線程池,而且支持定時和週期性的執行任務接口

    ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);隊列

複製代碼

public ScheduledThreadPoolExecutor(int corePoolSize) {        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數        //第二個:Integer.MAX_VALUE 不限制該線程池的線程數        //第三個:線程空閒時間(0 表示線程空閒0秒以後被回收)        //第四個:空閒時間單位 SECONDS 秒        //第五個:DelayedWorkQueue 延時隊列 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }

複製代碼

使用 newScheduledThreadPool實現定時器

複製代碼

package com.jalja.org.thread.executors;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;public class NewScheduledThreadPoolTest {    public static void main(String[] args) {
        Runnable runnable=new ScheduledThread();        //實現定時器
        ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);        //runnable 須要執行的任務  1:初始化時間(初始化延遲1秒後執行)  3:輪詢時間(每隔3秒執行)        //TimeUnit.SECONDS:時間單位
        ScheduledFuture<?> scheduledFuture= executorsScheduled.scheduleWithFixedDelay(runnable, 1,3, TimeUnit.SECONDS);
        System.out.println("scheduledFuture:"+scheduledFuture);
    }
} 
class  ScheduledThread implements Runnable{    public void run() {
        System.out.println(Thread.currentThread().getName() +"=>開始");        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() +"=>結束");
    }
}

複製代碼

 線程池中提交任務的兩種方式:

execute()方法:該方法是 ExecutorService 接口的父類(接口)方法,該接口只有這一個方法。

 

public interface Executor {    void execute(Runnable command);
}

 

submit()方法:該方法是ExecutorService 接口的方法。

複製代碼

public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);

  <T> Future<T> submit(Runnable task, T result);

  Future<?> submit(Runnable task);
  ...
}

複製代碼

 從上面的源碼以及講解能夠總結execute()和submit()方法的區別:

  1. 接收的參數不同;

  2. submit()有返回值,而execute()沒有;

 

3、自定義線程池

  在Java線程池中的newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPool這四個線程池在底層都是調用了ThreadPoolExecutor()這個構造方法。若Executors這個類沒法知足咱們的需求的時候,能夠本身建立自定義的線程池。
ThreadPoolExecutor類的定義以下

複製代碼

                public ThreadPoolExecutor(int corePoolSize,//核心線程數--線程池初始化建立的線程數量  
                   int maximumPoolSize,//最大線程數,線程池中能建立的最大線程數  
                   long keepAliveTime,//線程存活時間  
                   TimeUnit unit,//線程存貨時間單位  
                   BlockingQueue<Runnable> workQueue,//一個阻塞隊列  
                   ThreadFactory threadFactory//拒絕策略  
                 ) {……}

複製代碼

自定義線程池使用有界隊列(ArrayBlockingQueue 、LinkedBlockingQueue ):

  如有新的任務須要執行,若是線程池實際線程數小於corePoolSize核心線程數的時候,則優先建立線程。若大於corePoolSize時,則會將多餘的線程存放在隊列中,若隊列已滿,且最請求線程小於maximumPoolSize的狀況下,則自定義的線程池會建立新的線程,若隊列已滿,且最請求線程大於maximumPoolSize的狀況下,則執行拒絕策略,或其餘自定義方式。

複製代碼

package com.jalja.org.thread.executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

複製代碼

結果:

複製代碼

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jalja.org.thread.executors.ExecutorsTest$ThreadTest@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.jalja.org.thread.executors.ExecutorsTest.main(ExecutorsTest.java:17)
pool-1-thread-1pool-1-thread-2pool-1-thread-1pool-1-thread-2

複製代碼

看結果可知有一個任務是沒有執行直接拋出異常的。隊列已滿,且最請求線程大於maximumPoolSize的狀況下,則執行拒絕策略,這裏使用的是——AbortPolicy:直接拋出異常,系統正常工做(默認的策略)。

自定義線程池使用×××隊列:

  對於×××隊列除非系統資源耗盡,不然×××隊列不存在任務入隊失敗的狀況,若系統的線程數小於corePoolSize時,則新建線程執行corePoolSize,當達到corePoolSize後,則把多餘的任務放入隊列中等待執行若任務的建立和處理的速速差別很大,×××隊列會保持快速增加,直到耗盡系統內存爲之,對於×××隊列的線程池maximumPoolSize並沒有真實用處。

4、拒絕策略

JDK提供策略:

 

1.AbortPolicy:直接拋出異常,系統正常工做。(默認的策略)

複製代碼

package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f);        try {
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            System.out.println("超過有界隊列的數據記錄日誌");
        }
        test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

複製代碼

2.CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中執行,運行當前被丟棄的任務。
3.DiscardOrderstPolicy:丟棄最老的請求,嘗試再次提交當前任務。
4.丟棄沒法處理的任務,不給於任何處理。

自定義策略:須要實現RejectedExecutionHandler接口

複製代碼

package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f, new MyRejected());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}class MyRejected implements RejectedExecutionHandler{    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("執行異常的任務加入日誌");
    }
}
相關文章
相關標籤/搜索