java併發編程實戰:第八章----線程池的使用

1、在任務和執行策略之間隱性耦合算法

Executor框架將任務的提交和它的執行策略解耦開來。雖然Executor框架爲制定和修改執行策略提供了至關大的靈活性,但並不是全部的任務都能適用全部的執行策略。數據庫

  • 依賴性任務:依賴其餘同步任務的結果,使其不得不順序執行,影響活躍性
  • 使用線程封閉的任務:在單線程的Executor中執行,任務能夠不是線程安全的,可是一旦提交到線程池時,就會失去線程安全
  • 對響應時間敏感的任務:在單個線程或含有少許線程的線程池中執行是不可接受的
  • 使用ThreadLocal的任務:ThreadLocal使每一個線程均可以擁有某個變量的一個私有"版本",而線程池中的線程是重複使用的,即一次使用完後,會被從新放回線程池,可被從新分配使用。所以,ThreadLocal線程變量,若是保存的信息只是針對一次請求的,放回線程池以前須要清空這些Threadlocal變量的值(或者取得線程以後,首先清空這些Threadlocal變量的值)

只有任務都是同類型而且相互獨立時,線程池的效率達到最佳安全

一、線程飢餓死鎖——在線程池中全部正在執行任務的線程都因爲等待其餘仍處於工做隊列中的任務而阻塞服務器

  例1:在單線程池中,正在執行的任務阻塞等待隊列中的某個任務執行完畢框架

  例2:線程池不夠大時,經過柵欄機制協調多個任務時ide

  例3:因爲其餘資源的隱性限制,每一個任務都須要使用有限的數據庫鏈接資源,那麼無論線程池多大,都會表現出和和鏈接資源相同的大小 函數

每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現線程"飢餓"死鎖,所以須要在代碼或配置Executor地配置文件中記錄線程池地大小限制或配置限制性能

二、運行時間較長的任務ui

  線程池的大小應該超過有較長執行時間的任務數量,不然可能形成線程池中線程均服務於長時間任務致使其它短期任務也阻塞致使性能降低this

緩解策略:限定任務等待資源的時間,若是等待超時,那麼能夠把任務標示爲失敗,而後停止任務或者將任務從新返回隊列中以便隨後執行。這樣,不管任務的最終結果是否成功,這種方法都能確保任務總能繼續執行下去,並將線程釋放出來以執行一些能更快完成的任務。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等

 

2、設置線程池的大小

線程池的理想大小取決於被提交任務的類型及所部署系統的特性

  • 線程池過大,那麼大量的線程將在相對不多的CPU和內存資源上發生競爭,這不只會致使更高的內存使用量,並且還可能耗盡資源
  • 若是線程池太小,那麼將致使許多空閒的處理器沒法執行工做,從而下降吞吐量

對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小爲Ncpu+1時,一般能實現最優的利用率;對於包含I/O操做或者其餘阻塞操做的任務,因爲線程並不會一直執行,所以線程池的規模應該更大

N(threads)=N(cpu)*U(cpu)*(1+W/C)   N(cpu)=CPU的數量=Runtime.getRuntime().availableProcessors(); U(cpu)= 指望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待時間與運行時間的比率

3、配置ThreadPoolExecutor

 

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

一、線程的建立與銷燬

  • CorePoolSize: 線程池基本大小,在建立ThreadPoolExecutor初期,線程並不會當即啓動,而是等到有任務提交時纔會啓動,除非調用prestartAllCoreThreads,而且只有在工做隊列滿了的狀況下才會建立超出這個數量的線程。
  • MaxmumPooSize: 線程池最大大小表示可同時活動的線程數量的上限。若某個線程的空閒時間超過了keepAliveTime, 則被標記爲可回收的

newFixedThreadPool: CorePoolSize = MaxmumPoolSize

newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,線程池可被無限擴展,需求下降時自動回收

二、管理隊列任務

  • workQueue:用於保存超過線程池線程處理速率的Runnable任務的隊列 (三種:無界隊列、有界隊列和同步移交)

newFixedThreadPool和newSingleThreadPool在默認狀況下將使用一個無界的LinkedBlockingQueue,有更好的性能

使用有界隊列有助於避免資源耗盡的狀況發生,爲了不當隊列填滿後,在使用有界的工做隊列時,隊列的大小與線程池的大小必須一塊兒調節,能防止過載

對於很是大的或者無界的線程池,能夠經過使用SynchronousQueue來避免任務排隊,要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接受這個元素,任務會直接移交給執行它的線程,不然將拒絕任務。newCachedThreadPool工廠方法中就使用了SynchronousQueue

使用優先隊列PriorityBlockingQueue能夠控制任務被執行的順序

三、飽和策略

  • AbortPolicy(停止策略),默認的飽和策略。會拋出RejectedExecutionException異常(拋棄當前任務vs拋棄最舊任務)
  • 調用者運行:下一個任務在調用了execute方法的主線程中進行運行,主線程至少在一段時間內不能提交任何任務。到達的請求將被保存在TCP層的隊列中而不是在應用程序的隊列中,致使服務器在高負載下實現一種平緩的性能下降

其餘:對執行策略進行修改,使用信號量,控制處於執行中的任務

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }
    
    public void submitTask(final Runnable command){
        try {
            semaphore.acquire(); //提交任務前請求信號量
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        command.run();
                    } finally{
                        semaphore.release(); //執行完釋放信號
                    }
                }
            });
        } catch (InterruptedException e) {
            // handle exception
        }
    }
}

四、線程工廠

經過自定義線程工廠能夠對其進行擴展加入新的功能實現

當應用須要利用安全策略來控制某些特殊代碼庫的訪問權,能夠利用PrivilegedThreadFactory來定製本身的線程工廠,以避免出現安全性異常。將與建立privilegedThreadFactory的線程擁有相同的訪問權限、AccessControlContext和contextClassLoader

自定義線程工廠
 1 public class MyThreadFactory implements ThreadFactory {
 2     private final String poolName;
 3     
 4     public MyThreadFactory(String poolName) {
 5         super();
 6         this.poolName = poolName;
 7     }
 8 
 9     @Override
10     public Thread newThread(Runnable r) {
11         return new MyAppThread(r);
12     }
13 }
14 
15 public class MyAppThread extends Thread {
16     public static final String DEFAULT_NAME="MyAppThread";
17     private static volatile boolean debugLifecycle = false;
18     private static final AtomicInteger created = new AtomicInteger();
19     private static final AtomicInteger alive = new AtomicInteger();
20     private static final Logger log = Logger.getAnonymousLogger();
21     
22     public MyAppThread(Runnable r) {
23         this(r, DEFAULT_NAME);
24     }
25 
26     public MyAppThread(Runnable r, String name) {
27         super(r, name+ "-" + created.incrementAndGet());
28         setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器
29                 new Thread.UncaughtExceptionHandler() {
30                     @Override
31                     public void uncaughtException(Thread t, Throwable e) {
32                         log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
33                     }
34                 });
35     }
36 
37     @Override
38     public void run() {
39         boolean debug = debugLifecycle;
40         if (debug) 
41             log.log(Level.FINE, "running thread " + getName());
42         try {
43             alive.incrementAndGet();
44             super.run();
45         } finally {
46             alive.decrementAndGet();
47             if (debug) 
48                 log.log(Level.FINE, "existing thread " + getName());
49         }
50     }
51 } 

 

五、在調用構造函數後在定製ThreadPoolExecutor

  • 能夠在建立線程池後,再經過Setter方法設置其基本屬性(將ExecutorService擴展爲ThreadPoolExecutor)
  • 在Executors中包含一個unconfigurableExecutorService工廠方法,該方法對一個現有的ExecutorService進行包裝,使其只暴露出ExecutorService的方法,所以不能對它進行配置

 

4、擴展ThreadPoolExecutor

ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated擴展方法

  • 線程執行前調用beforeExecute(若是beforeExecute拋出了一個RuntimeException,那麼任務將不會被執行)
  • 線程執行後調用afterExecute(拋出異常也會調用,若是任務在完成後帶有一個Error,那麼就不會調用afterExecute)
  • 在線程池完成關閉操做時調用terminated,也就是全部任務都已經完成而且全部工做者線程也已經關閉後

 

增長日誌和記時等功能的線程池
 1 public class TimingThreadPoolExecutor extends ThreadPoolExecutor {
 2     private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間
 3     private final Logger log = Logger.getAnonymousLogger();
 4     private final AtomicLong numTasks = new AtomicLong(); //統計任務數
 5     private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間
 6 
 7     public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
 8             long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
 9         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
10     }
11 
12     @Override
13     protected void beforeExecute(Thread t, Runnable r) {
14         super.beforeExecute(t, r);
15         log.fine(String.format("Thread %s: start %s", t, r));
16         startTime.set(System.nanoTime());
17     }
18 
19     @Override
20     protected void afterExecute(Runnable r, Throwable t) {
21         try{
22             long endTime = System.nanoTime();
23             long taskTime = endTime - startTime.get();
24             numTasks.incrementAndGet();
25             totalTime.addAndGet(taskTime);
26             log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
27         } finally{
28             super.afterExecute(r, t);
29         }
30     }
31 
32     @Override
33     protected void terminated() {
34         try{
35             //任務執行平均時間
36             log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
37         }finally{
38             super.terminated();
39         }
40     }
41 }
42 
43 

 

5、遞歸算法的並行化

  • 若是循環中的迭代操做都是獨立的,而且不須要等待全部的迭代操做都完成再繼續執行,那麼就可使用Executor將串行循環轉化爲並行循環
  • 若是須要提交一個任務集並等待它們完成,那麼可使用ExecutorService.invokeAll
  • 若是遞歸執行的任務中,在每一個迭代操做中都不須要來自於後續遞歸迭代的結果,能夠建立一個特定於遍歷過程的Executor,並使用shutdown和awaitTermination等方法,等待上面並行運行的結果
相關文章
相關標籤/搜索