java學習:線程池和異步

1.異步和同步java

同步執行很容易理解,代碼的操做順序就是程序執行的順序。可是實際使用中,不少場景經常會受限於同步執行,不能充分利用cpu的資源,例如,要查找一大批數據中的最大數,同步執行時,多是花費10單位的時間讀取數據,1單位的時間進行計算,總計在11單位時間後獲得結果;而,異步執行時,分派10個線程執行任務,將會花費1單位的時間讀取數據,1單位時間進行計算,總計在2單位時間後獲得結果。數據庫

相對於同步而言,異步本質上是申請線程,提升cpu的利用率(單核cpu執行計算密集型任務時會下降)更快地獲得結果。在解決問題時合理地選擇同步和異步能更好地利用好計算資源。編程

從數據的角度來看,在同步操做中,數據的變化都保持必定的前後順序關係,不會出現衝突的狀況;而在異步操做中,數據隨時可能被其中某個線程更改,這時須要注意數據的一致性,尤爲是寫操做,要保證事務性,這裏能夠對數據加鎖來實現。另外有時線程之間也須要保證必定的順序,須要使用線程鎖(這裏有的能夠經過編碼技巧或者回調方法解決)。多線程

 

2.線程池異步

在面向對象編程中,建立和銷燬對象是很費時間的,由於建立一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每個對象,以便可以在對象銷燬後進行垃圾回收。因此提升服務程序效率的一個手段就是儘量減小建立和銷燬對象的次數,特別是一些很耗資源的對象建立和銷燬。如何利用已有對象來服務就是一個須要解決的關鍵問題,其實這就是一些"池化資源"技術產生的緣由。好比你們所熟悉的數據庫鏈接池正是遵循這一思想而產生的,接下來將介紹的線程池技術一樣符合這一思想。ide

在java中,可使用java.util.concurrent.ThreadPoolExecutor來建立線程池,this

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

建立方法中的幾個參數須要注意:編碼

先說最重要的3個參數,corePoolSize,maximumPoolSize,workQueue。corePoolSize設定核心線程數,maximumPoolSize設定最大線程數,workQueue設定等待隊列。當有新任務時,首先檢查當前的線程數是否小於corePoolSize,若是是,則新建線程處理任務;若是不是,再檢查當前workQueue是否已滿,若是未滿,則把新的任務加入workQueue,core線程完成時會從workQueue中取得任務繼續執行;若是workQueue已滿,再檢查當前線程數是否小於maximumPoolSize,若是是,則建立線程處理任務,若是不是,則拋出拒絕服務的異常(默認是拋出異常,具體如何處理是最後一個參數RejectExecutionHandler來決定的)。spa

其餘的參數分別是,keepAliveTime、unit控制空閒線程的存活時間;threadFactory,建立新的線程時使用的建立者;handler,拒絕服務時的處理方式;(後兩個參數都有默認的選擇)線程

 

從線程池的工做方式能夠看到,core,max,queue決定了線程池的表現,下面講述這三個參數的參考設定。

通常來講,須要處理的qps平均爲n,最大爲m,每一個請求的處理時間爲t(秒)時,core=nt+,max=mt+,queue視須要而定(當這些請求須要儘快響應,cpu資源常有空閒時,queue=0)

對於cpu密集型任務,如大量數據的計算,匹配,排序等,這時cpu的處理能力成爲瓶頸,core和max要注意不要設定得太大,要衡量好cpu的處理能力。

對於io密集型任務,如操做數據庫,http請求等,core和max能夠考慮設定得更大,由於線程一般處於等待之中,不會耗費多少cpu。

(20160329.add)對於cpu密集型任務:cpu爲單核時,沒有必要使用多線程:單線程時,cpu資源主要都在計算上;多線程時,cpu還須要額外耗費線程之間切換的資源,下降了計算效率。cpu爲多核時,有必要使用多線程(單線程時只有一個核在進行計算)

——總的來講,當cpu常有空閒的狀況時,就應該考慮使用多線程了。

 

一些使用的例子:

1.簡單的一個線程

public class TestClass {
    /*/*/
    private SettableFuture<String> settableFuture = SettableFuture.create();
    public void todo(final String param) throws Exception{
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                    System.out.println("begin " + param);
                    Thread.sleep(1000);
                    System.out.println("finish sleep");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    settableFuture.set("complete " + param);
                }
            }
        });
        thread.start();
    }

    public SettableFuture<String> getSettableFuture() {
        return settableFuture;
    }

    public static void main(String[] args) throws Exception {
        TestClass testClass = new TestClass();
        testClass.todo("test");
        System.out.println("start todo");
        System.out.println(testClass.getSettableFuture().get());
    }
}

 

2.線程池&異步回調

public class TestClass {
    private ListenableFuture<String> listenableFuture;
    private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

    public void todo(final String param){
        listenableFuture = listeningExecutorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("call " + param);
                Thread.sleep(100);
                return "call " + param + " complete";
            }
        });

        Futures.addCallback(listenableFuture, new FutureCallback<String>() {

            @Override
            public void onSuccess(String s) {
                try {
                    System.out.println(s + " success");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("failed");
            }
        });
    public static void main(String[] args) throws InterruptedException {
        TestClass testClass = new TestClass();
        testClass.todo("test");
        System.out.println("ok");
    }
}

 

3.同時執行多個異步回調任務,等待全部任務結束後,輸出全部任務的執行結果

public class TestClass {
    private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
    public void todo(final String param, final CountDownLatch countDownLatch, final List<String> result) throws InterruptedException {
            ListenableFuture listenableFuture = listeningExecutorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(100);
                    System.out.println("exec " + param);
                    result.add(String.valueOf(param));
                    System.out.println("exec "+param+" finished");
                    return String.valueOf(param);
                }
            });
            Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                @Override
                public void onSuccess(String s) {
                    System.out.println("success "+s);
                    countDownLatch.countDown();
                }

                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("failed");
                    countDownLatch.countDown();
                }
            });
        }
    public static void main(String[] args) throws InterruptedException {
        int taskSize = 4;
        TestClass testClass = new TestClass();
        final List<String> result = Lists.newArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(taskSize);

        for (int i = 0; i < taskSize; i++) {
            testClass.todo("test" + i, countDownLatch, result);
        }

        System.out.println("add task finished");
        countDownLatch.await(10, TimeUnit.SECONDS);
        System.out.println(result);
        testClass.listeningExecutorService.shutdown();
    }
    //*/
}
相關文章
相關標籤/搜索