####一、線程池的數量 Thread_num=CPU_num * U * (1 + W/C)java
U: 目標CPU使用率 [0,1] W: wait time C: compute time數組
####二、任務獨立時,設置線程池的工做隊列界限才合理,若是任務之間存在依賴性,則可能致使線程「飢餓死鎖」,應使用無界線程池,如newCachedThreadPool緩存
####三、單線程的Executor可能發生死鎖,newSingleThreadExecutor 對象同時執行父任務與子任務安全
/** * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/6 * Time: 15:43 */ public class SingleThreadExecutorDeadLock { ExecutorService exec = Executors.newSingleThreadExecutor() ; class MyCall implements Callable<String> { @Override public String call () throws Exception { Future<String> f1 = exec.submit( new MyThread()) ; Future<String> f2 = exec.submit(new MyThread()) ; System. out.println(" 任務提交結束,等待兩個子任務返回 "); // 主線程在單線程線程池中,因此 f1與f2 均在等待隊列中,永遠沒法執行。 return f1.get() + f2.get(); } } class MyThread implements Callable<String> { @Override public String call () throws Exception { System.out.println( "子任務結束") ; return "RES"; } } public static void main(String[] args) throws Exception { SingleThreadExecutorDeadLock lock = new SingleThreadExecutorDeadLock() ; Future<String> f3 = lock.exec .submit(lock.new MyCall()) ; try { System.out.println(f3.get()) ; } finally { lock.exec.shutdown() ; } } }
####四、一種帶有緩存的計算工具,線程安全的併發
/** * 一種帶有緩存的計算工具 * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/1 * Time: 14:29 */ public class CachedCompute { interface Computable<A, V> { V compute(A args) throws InterruptedException , ExecutionException; } class CacheComputer<A, V> implements Computable< A, V > { // 緩存 private final Map<A, Future<V>> cache = new ConcurrentHashMap<>() ; private Computable< A, V > computer; public CacheComputer(Computable< A, V > c) { this .computer = c ; } @Override public V compute (A args) throws InterruptedException , ExecutionException { Future<V> f = cache.get(args); if (null == f) { Callable<V> callable = new Callable< V>() { @Override public V call() throws Exception { return computer .compute(args) ; } }; FutureTask<V > ft = new FutureTask< V>(callable); f = cache .putIfAbsent(args, ft) ; if (null == f) { System. out.println(" 緩存放置成功 "); f = ft; ft.run(); } } try { return f.get() ; } catch (CancellationException e) { cache.remove(args , f); } catch (ExecutionException e) { throw e ; } return null; } } CacheComputer cache = new CacheComputer<String , String>( new Computable<String, String>() { @Override public String compute (String args) throws InterruptedException , ExecutionException { return "計算結果"; } }); public static void main (String[] args) throws Throwable { CachedCompute compute = new CachedCompute(); System. out.println(compute.cache .compute("key")) ; } }
####五、一種This指針逃逸ide
/** * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/5/11 * Time: 17:08 */ public class ThisEscape { private String name; public ThisEscape() throws Throwable{ new Thread(new EscapeRunnable()).start() ; Thread. sleep( 1000); name ="123"; } private class EscapeRunnable implements Runnable { @Override public void run() { // 經過ThisEscape.this就能夠引用外圍類對象 , 可是此時外圍類對象可能尚未構造完成 , 即發生了外圍類的this引用的逃逸,構造函數未完成以前不該該暴露this指針 System. out.println(ThisEscape.this. name); //可能會出現使人疑惑的錯誤 name=null } } public static void main(String[] args) throws Throwable{ new ThisEscape(); } }
/** * 使用 semaphore控制任務的提交速度 * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/7 * Time: 10:24 */ public class BoundedExecutor { private final Executor executor; private final Semaphore semaphore; public BoundedExecutor(Executor exe , int bound) { this .executor = exe ; this. semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException { semaphore .acquire(); System. out.println(" 信號量獲取成功,當前剩餘數: " + semaphore .availablePermits()) ; try { executor .execute(new Runnable() { @Override public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore .release(); } } public static void main(String[] args) throws Exception { ExecutorService es=Executors.newCachedThreadPool() ; BoundedExecutor exe = new BoundedExecutor(es , 100) ; for ( int i = 0 ; i < 50 ; i++) { exe.submitTask(new Runnable() { @Override public void run() { try { System. out.println(" 任務執行 "); Thread.sleep(1000) ; } catch (Throwable e) { } } }); } System.out.println( "提交50 個任務結束 "); es.shutdown() ; } }
####七、修改標準工廠建立的executor函數
/** * 強制類型轉換從新設置 Executor的線程池參數 * newSingleThreadExecutor 除外,不是線程工廠直接建立,而是經過包裝類 * public static ExecutorService newSingleThreadExecutor() { * return new FinalizableDelegatedExecutorService * (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>())); * } * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/7 * Time: 10:24 */ public class ExecutorForceSet { private static final ExecutorService executor = Executors.newFixedThreadPool( 1); // 使用此方法包裝後能夠避免被修改 // private static final ExecutorService executor =Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(1)); public static void main(String[] args) throws Exception { if (executor instanceof ThreadPoolExecutor) { ((ThreadPoolExecutor) executor ).setCorePoolSize(100) ; ((ThreadPoolExecutor) executor ).setMaximumPoolSize( 100); } else { System.out.println( "轉換出錯,非線程工廠建立 "); } for (int i = 0; i < 50; i++) { //lambda executor.execute(() -> { try { System. out.println(" 任務執行 "); Thread.sleep (2000 ); } catch (Throwable e) { } }); } System.out.println( "提交50 個任務結束 "); executor.shutdown() ; } }
####八、Amdahl定律 串行部分越小加速比越大(單核運行時間/多核運行時間)工具
Speedup=1/(F+(1-F)/N)ui
F: 必須串行部分的比例 N: CPU個數this
####九、增長系統的伸縮性(增長計算資源時,吞吐量和處理能力相應增長)
####十、一種非阻塞的計數器
/** * CasCounter * <p/> * Nonblocking counter using CAS * * @author Brian Goetz and Tim Peierls */ @ThreadSafe public class CasCounter { private SimulatedCAS value; public int getValue() { return value.get(); } public int increment() { int v; do { v = value.get(); } while (v != value.compareAndSwap(v, v + 1)); //非阻塞通常使用底層的併發原語操做 return v + 1; } }