1、在任務和執行策略之間隱性耦合算法
Executor框架將任務的提交和它的執行策略解耦開來。雖然Executor框架爲制定和修改執行策略提供了至關大的靈活性,但並不是全部的任務都能適用全部的執行策略。數據庫
只有任務都是同類型而且相互獨立時,線程池的效率達到最佳安全
一、線程飢餓死鎖——在線程池中全部正在執行任務的線程都因爲等待其餘仍處於工做隊列中的任務而阻塞服務器
例1:在單線程池中,正在執行的任務阻塞等待隊列中的某個任務執行完畢框架
例2:線程池不夠大時,經過柵欄機制協調多個任務時ide
例3:因爲其餘資源的隱性限制,每一個任務都須要使用有限的數據庫鏈接資源,那麼無論線程池多大,都會表現出和和鏈接資源相同的大小 函數
每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現線程"飢餓"死鎖,所以須要在代碼或配置Executor地配置文件中記錄線程池地大小限制或配置限制性能
二、運行時間較長的任務ui
線程池的大小應該超過有較長執行時間的任務數量,不然可能形成線程池中線程均服務於長時間任務致使其它短期任務也阻塞致使性能降低this
緩解策略:限定任務等待資源的時間,若是等待超時,那麼能夠把任務標示爲失敗,而後停止任務或者將任務從新返回隊列中以便隨後執行。這樣,不管任務的最終結果是否成功,這種方法都能確保任務總能繼續執行下去,並將線程釋放出來以執行一些能更快完成的任務。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等
2、設置線程池的大小
線程池的理想大小取決於被提交任務的類型及所部署系統的特性
對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小爲Ncpu+1時,一般能實現最優的利用率;對於包含I/O操做或者其餘阻塞操做的任務,因爲線程並不會一直執行,所以線程池的規模應該更大
N(threads)=N(cpu)*U(cpu)*(1+W/C) N(cpu)=CPU的數量=Runtime.getRuntime().availableProcessors(); U(cpu)= 指望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待時間與運行時間的比率
3、配置ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
一、線程的建立與銷燬
newFixedThreadPool: CorePoolSize = MaxmumPoolSize
newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,線程池可被無限擴展,需求下降時自動回收
二、管理隊列任務
newFixedThreadPool和newSingleThreadPool在默認狀況下將使用一個無界的LinkedBlockingQueue,有更好的性能
使用有界隊列有助於避免資源耗盡的狀況發生,爲了不當隊列填滿後,在使用有界的工做隊列時,隊列的大小與線程池的大小必須一塊兒調節,能防止過載
對於很是大的或者無界的線程池,能夠經過使用SynchronousQueue來避免任務排隊,要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接受這個元素,任務會直接移交給執行它的線程,不然將拒絕任務。newCachedThreadPool工廠方法中就使用了SynchronousQueue
使用優先隊列PriorityBlockingQueue能夠控制任務被執行的順序
三、飽和策略
其餘:對執行策略進行修改,使用信號量,控制處於執行中的任務
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command){ try { semaphore.acquire(); //提交任務前請求信號量 exec.execute(new Runnable() { @Override public void run() { try{ command.run(); } finally{ semaphore.release(); //執行完釋放信號 } } }); } catch (InterruptedException e) { // handle exception } } }
四、線程工廠
經過自定義線程工廠能夠對其進行擴展加入新的功能實現
當應用須要利用安全策略來控制某些特殊代碼庫的訪問權,能夠利用PrivilegedThreadFactory來定製本身的線程工廠,以避免出現安全性異常。將與建立privilegedThreadFactory的線程擁有相同的訪問權限、AccessControlContext和contextClassLoader
自定義線程工廠
1 public class MyThreadFactory implements ThreadFactory { 2 private final String poolName; 3 4 public MyThreadFactory(String poolName) { 5 super(); 6 this.poolName = poolName; 7 } 8 9 @Override 10 public Thread newThread(Runnable r) { 11 return new MyAppThread(r); 12 } 13 } 14 15 public class MyAppThread extends Thread { 16 public static final String DEFAULT_NAME="MyAppThread"; 17 private static volatile boolean debugLifecycle = false; 18 private static final AtomicInteger created = new AtomicInteger(); 19 private static final AtomicInteger alive = new AtomicInteger(); 20 private static final Logger log = Logger.getAnonymousLogger(); 21 22 public MyAppThread(Runnable r) { 23 this(r, DEFAULT_NAME); 24 } 25 26 public MyAppThread(Runnable r, String name) { 27 super(r, name+ "-" + created.incrementAndGet()); 28 setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器 29 new Thread.UncaughtExceptionHandler() { 30 @Override 31 public void uncaughtException(Thread t, Throwable e) { 32 log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); 33 } 34 }); 35 } 36 37 @Override 38 public void run() { 39 boolean debug = debugLifecycle; 40 if (debug) 41 log.log(Level.FINE, "running thread " + getName()); 42 try { 43 alive.incrementAndGet(); 44 super.run(); 45 } finally { 46 alive.decrementAndGet(); 47 if (debug) 48 log.log(Level.FINE, "existing thread " + getName()); 49 } 50 } 51 }
五、在調用構造函數後在定製ThreadPoolExecutor
4、擴展ThreadPoolExecutor
ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated擴展方法
增長日誌和記時等功能的線程池
1 public class TimingThreadPoolExecutor extends ThreadPoolExecutor { 2 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間 3 private final Logger log = Logger.getAnonymousLogger(); 4 private final AtomicLong numTasks = new AtomicLong(); //統計任務數 5 private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間 6 7 public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 8 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 9 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 10 } 11 12 @Override 13 protected void beforeExecute(Thread t, Runnable r) { 14 super.beforeExecute(t, r); 15 log.fine(String.format("Thread %s: start %s", t, r)); 16 startTime.set(System.nanoTime()); 17 } 18 19 @Override 20 protected void afterExecute(Runnable r, Throwable t) { 21 try{ 22 long endTime = System.nanoTime(); 23 long taskTime = endTime - startTime.get(); 24 numTasks.incrementAndGet(); 25 totalTime.addAndGet(taskTime); 26 log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); 27 } finally{ 28 super.afterExecute(r, t); 29 } 30 } 31 32 @Override 33 protected void terminated() { 34 try{ 35 //任務執行平均時間 36 log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get())); 37 }finally{ 38 super.terminated(); 39 } 40 } 41 } 42 43
5、遞歸算法的並行化