Java併發編程實戰

####一、線程池的數量 Thread_num=CPU_num * U * (1 + W/C)java

U: 目標CPU使用率 [0,1] W: wait time C: compute time數組

####二、任務獨立時,設置線程池的工做隊列界限才合理,若是任務之間存在依賴性,則可能致使線程「飢餓死鎖」,應使用無界線程池,如newCachedThreadPool緩存

####三、單線程的Executor可能發生死鎖,newSingleThreadExecutor 對象同時執行父任務與子任務安全

/**
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/6
* Time: 15:43
*/
public class SingleThreadExecutorDeadLock {

    ExecutorService exec = Executors.newSingleThreadExecutor() ;

    class MyCall implements Callable<String> {
        @Override public String call () throws Exception {
            Future<String> f1 = exec.submit( new MyThread()) ;
            Future<String> f2 = exec.submit(new MyThread()) ;
            System. out.println(" 任務提交結束,等待兩個子任務返回 ");
            // 主線程在單線程線程池中,因此 f1與f2 均在等待隊列中,永遠沒法執行。
            return f1.get() + f2.get();
        }
    }

    class MyThread implements Callable<String> {
        @Override public String call () throws Exception {
            System.out.println( "子任務結束") ;
            return "RES";
        }
    }

    public static void main(String[] args) throws Exception {
        SingleThreadExecutorDeadLock lock = new SingleThreadExecutorDeadLock() ;

        Future<String> f3 = lock.exec .submit(lock.new MyCall()) ;
        try {
            System.out.println(f3.get()) ;
        } finally {
            lock.exec.shutdown() ;
        }
    }
}

####四、一種帶有緩存的計算工具,線程安全的併發

/**
* 一種帶有緩存的計算工具
 * Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/1
* Time: 14:29
*/
public class CachedCompute {

    interface Computable<A, V> {
        V compute(A args) throws InterruptedException , ExecutionException;
    }

    class CacheComputer<A, V> implements Computable< A, V > {

        // 緩存
        private final Map<A, Future<V>> cache = new ConcurrentHashMap<>() ;

        private Computable< A, V > computer;

        public CacheComputer(Computable< A, V > c) {
            this .computer = c ;
        }

        @Override public V compute (A args) throws InterruptedException , ExecutionException {
            Future<V> f = cache.get(args);
            if (null == f) {
                Callable<V> callable = new Callable< V>() {
                    @Override public V call() throws Exception {
                        return computer .compute(args) ;
                    }
                };

                FutureTask<V > ft = new FutureTask< V>(callable);

                f = cache .putIfAbsent(args, ft) ;

                if (null == f) {
                    System. out.println(" 緩存放置成功 ");
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get() ;
            } catch (CancellationException e) {
                cache.remove(args , f);
            } catch (ExecutionException e) {
                throw e ;
            }
            return null;
        }
    }

    CacheComputer cache = new CacheComputer<String , String>( new Computable<String, String>() {
        @Override public String compute (String args)
                throws InterruptedException , ExecutionException {
            return "計算結果";
        }
    });

    public static void main (String[] args) throws Throwable {
        CachedCompute compute = new CachedCompute();
        System. out.println(compute.cache .compute("key")) ;
    } 
}

####五、一種This指針逃逸ide

/**
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/5/11
* Time: 17:08
*/
public class ThisEscape {

    private String name;

    public ThisEscape() throws Throwable{
        new Thread(new EscapeRunnable()).start() ;

        Thread. sleep( 1000);
        name ="123";
    }

    private class EscapeRunnable implements Runnable {
        @Override
        public void run() {
            // 經過ThisEscape.this就能夠引用外圍類對象 , 可是此時外圍類對象可能尚未構造完成 , 即發生了外圍類的this引用的逃逸,構造函數未完成以前不該該暴露this指針 
            System. out.println(ThisEscape.this. name); //可能會出現使人疑惑的錯誤 name=null
        }
    }

    public static void main(String[] args) throws Throwable{
        new ThisEscape();
    }
}

六、使用Semaphore控制任務提交速度

/**
* 使用 semaphore控制任務的提交速度
 * Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/7
* Time: 10:24
*/
public class BoundedExecutor {

    private final Executor executor;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exe , int bound) {
        this .executor = exe ;
        this. semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore .acquire();
        System. out.println(" 信號量獲取成功,當前剩餘數: " + semaphore .availablePermits()) ;
        try {
            executor .execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore .release();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es=Executors.newCachedThreadPool() ;
        BoundedExecutor exe = new BoundedExecutor(es , 100) ;

        for ( int i = 0 ; i < 50 ; i++) {
            exe.submitTask(new Runnable() {
                @Override public void run() {
                    try {
                        System. out.println(" 任務執行 ");
                        Thread.sleep(1000) ;
                    } catch (Throwable e) {

                    }
                }
            });
        }
        System.out.println( "提交50 個任務結束 ");
        es.shutdown() ;
    }
}

####七、修改標準工廠建立的executor函數

/**
* 強制類型轉換從新設置 Executor的線程池參數
 * newSingleThreadExecutor 除外,不是線程工廠直接建立,而是經過包裝類
 * public static ExecutorService newSingleThreadExecutor() {
* return new FinalizableDelegatedExecutorService
* (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
* new LinkedBlockingQueue<Runnable>()));
* }
* Created with IntelliJ IDEA.
* User: pingansheng
* Date: 2016/6/7
* Time: 10:24
*/
public class ExecutorForceSet {

    private static final ExecutorService executor = Executors.newFixedThreadPool( 1);
    //    使用此方法包裝後能夠避免被修改
    // private static final ExecutorService executor =Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(1));

    public static void main(String[] args) throws Exception {

        if (executor instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) executor ).setCorePoolSize(100) ;
            ((ThreadPoolExecutor) executor ).setMaximumPoolSize( 100);
        } else {
            System.out.println( "轉換出錯,非線程工廠建立 ");
        }
        for (int i = 0; i < 50; i++) {
            //lambda
            executor.execute(() -> {
                try {
                    System. out.println(" 任務執行 ");
                    Thread.sleep (2000 );
                } catch (Throwable e) {

                }
            });
        }

        System.out.println( "提交50 個任務結束 ");
        executor.shutdown() ;
    }
}

####八、Amdahl定律 串行部分越小加速比越大(單核運行時間/多核運行時間)工具

Speedup=1/(F+(1-F)/N)ui

F: 必須串行部分的比例 N: CPU個數this

####九、增長系統的伸縮性(增長計算資源時,吞吐量和處理能力相應增長)

  • 縮小鎖的範圍synchronized方法變爲synchronized代碼塊(鎖真正關鍵的地方)
  • 縮小鎖的粒度synchronized方法(鎖對象,static鎖Class對象)變爲synchronized代碼塊(鎖變量或對象)
  • 鎖分段:多個鎖保護不一樣的區域,如10個對象數組保護10個數據片斷(每片10個),經過取餘獲取相應的鎖,(key.hashCode % length) % lockSize

####十、一種非阻塞的計數器

/**
 * CasCounter
 * <p/>
 * Nonblocking counter using CAS
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
public class CasCounter {
    private SimulatedCAS value;

    public int getValue() {
        return value.get();
    }

    public int increment() {
        int v;
        do {
            v = value.get();
        } while (v != value.compareAndSwap(v, v + 1));
        //非阻塞通常使用底層的併發原語操做
        return v + 1;
    }
}
相關文章
相關標籤/搜索