java 併發編程 :Executor框架實現java 多線程

Executor框架簡介

    在Java 5以後併發編程引入了一堆新的啓動、調度和管理線程的API。Executor框架即是Java 5中引入的,其內部使用了線程池機制,它在java.util.cocurrent 包下,經過該框架來控制線程的啓動、執行和關閉,能夠簡化併發編程的操做。所以,在Java 5以後,經過Executor來啓動線程比使用Thread的start方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題——若是咱們在構造器中啓動一個線程,由於另外一個任務可能會在構造器結束以前開始執行,此時可能會訪問到初始化了一半的對象用Executor在構造器中。java


 Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。編程


 Executor接口中之定義了一個方法execute(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,任務即一個實現了Runnable接口的類。ExecutorService接口繼承自Executor接口,它提供了更豐富的實現多線程的方法,好比,ExecutorService提供了關閉本身的方法,以及可爲跟蹤一個或多個異步任務執行情況而生成 Future 的方法。 能夠調用ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,調用該方法後,將致使ExecutorService中止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另外一類是尚未開始執行的),當全部已經提交的任務執行完畢後將會關閉ExecutorService。所以咱們通常用該接口來實現和管理多線程。緩存


 ExecutorService的生命週期包括三種狀態:運行、關閉、終止。建立後便進入運行狀態,當調用了shutdown()方法時,便進入關閉狀態,此時意味着ExecutorService再也不接受新的任務,但它還在執行已經提交了的任務,當全部已經提交了的任務執行完後,便到達終止狀態。若是不調用shutdown()方法,ExecutorService會一直處在運行狀態,不斷接收新的任務,執行新的任務,服務器端通常不須要關閉它,保持一直運行便可。服務器

 Executors提供了一系列工廠方法用於建立線程池,返回的線程池都實現了ExecutorService接口。 多線程

 1. public static ExecutorService newFixedThreadPool(int nThreads)併發

     建立固定數目線程的線程池。框架


2. public static ExecutorService newCachedThreadPool()異步

    建立一個可緩存的線程池,調用execute將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線   程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。ide


3.public static ExecutorService newSingleThreadExecutor()this

    建立一個單線程化的Executor。


4.public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    建立一個支持定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。


這四種方法都是用的Executors中的ThreadFactory創建的線程,下面就以上四個方法作個比較


newCachedThreadPool() -緩存型池子,先查看池中有沒有之前創建的線程,若是有,就 reuse.若是沒有,就建一個新的線程加入池中
-緩存型池子一般用於執行一些生存期很短的異步型任務
 所以在一些面向鏈接的daemon型SERVER中用得很少。但對於生存期短的異步任務,它是Executor的首選。
-能reuse的線程,必須是timeout IDLE內的池中線程,缺省     timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
  注意,放入CachedThreadPool的線程沒必要擔憂其結束,超過TIMEOUT不活動,其會自動被終止。
newFixedThreadPool(int)   -newFixedThreadPool與cacheThreadPool差很少,也是能reuse就用,但不能隨時建新的線程
-其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時若是有新的線程要創建,只能放在另外的隊列中等待直到當前的線程中某個線程終止直接被移出池子
-和cacheThreadPool不一樣,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,確定很是長,相似依賴上層的TCP或UDP IDLE機制之類的),因此FixedThreadPool多數針對一些很穩定很固定的正規併發線程,多用於服務器
-從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數不一樣:
fixed池線程數固定,而且是0秒IDLE(無IDLE)    
cache池線程數支持0-Integer.MAX_VALUE(顯然徹底沒考慮主機的資源承受能力),60秒IDLE  
newScheduledThreadPool(int) -調度型線程池
-這個池子裏的線程能夠按schedule依次delay執行,或週期執行
SingleThreadExecutor() -單例線程,任意時間池中只能有一個線程
-用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)


通常來講,CachedTheadPool在程序執行過程當中一般會建立與所需數量相同的線程,而後在它回收舊線程時中止建立新線程,所以它是合理的Executor的首選,只有當這種方式會引起問題時(好比須要大量長時間面向鏈接的線程時),才須要考慮用FixedThreadPool。(該段話摘自《Thinking in Java》第四版)



Executor執行Runnable任務

 經過Executors的以上四個靜態工廠方法得到 ExecutorService實例,然後調用該實例的execute(Runnable command)方法便可。一旦Runnable任務傳遞到execute()方法,該方法便會自動在一個線程上執行。下面是是Executor執行Runnable任務的示例代碼:

import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;   
  
public class TestCachedThreadPool{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
//      ExecutorService executorService = Executors.newFixedThreadPool(5);  
//      ExecutorService executorService = Executors.newSingleThreadExecutor();  
        for (int i = 0; i < 5; i++){   
            executorService.execute(new TestRunnable());   
            System.out.println("************* a" + i + " *************");   
        }   
        executorService.shutdown();   
    }   
}   
  
class TestRunnable implements Runnable{   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "線程被調用了。");   
    }   
}


 某次執行後的結果以下:

從結果中能夠看出,pool-1-thread-1和pool-1-thread-2均被調用了兩次,這是隨機的,execute會首先在線程池中選擇一個已有空閒線程來執行任務,若是線程池中沒有空閒線程,它便會建立一個新的線程來執行任務。



Executor執行Callable任務

 在Java 5以後,任務分兩類:一類是實現了Runnable接口的類,一類是實現了Callable接口的類。二者均可以被ExecutorService執行,可是Runnable任務沒有返回值,而Callable任務有返回值。而且Callable的call()方法只能經過ExecutorService的submit(Callable<T> task) 方法來執行,而且返回一個 <T>Future<T>,是表示任務等待完成的 Future。

 Callable接口相似於Runnable,二者都是爲那些其實例可能被另外一個線程執行的類設計的。可是 Runnable 不會返回結果,而且沒法拋出通過檢查的異常而Callable又返回結果,並且當獲取返回結果時可能會拋出異常。Callable中的call()方法相似Runnable的run()方法,區別一樣是有返回值,後者沒有

 當將一個Callable的對象傳遞給ExecutorService的submit方法,則該call方法自動在一個線程上執行,而且會返回執行結果Future對象。一樣,將Runnable的對象傳遞給ExecutorService的submit方法,則該run方法自動在一個線程上執行,而且會返回執行結果Future對象,可是在該Future對象上調用get方法,將返回null。

 下面給出一個Executor執行Callable任務的示例代碼:

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   
  
public class CallableDemo{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
  
        //建立10個任務並執行   
        for (int i = 0; i < 10; i++){   
            //使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            //將任務執行結果存儲到List中   
            resultList.add(future);   
        }   
  
        //遍歷任務的結果   
        for (Future<String> fs : resultList){   
                try{   
                    while(!fs.isDone);//Future返回若是沒有完成,則一直循環等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各個線程(任務)執行的結果   
                }catch(InterruptedException e){   
                    e.printStackTrace();   
                }catch(ExecutionException e){   
                    e.printStackTrace();   
                }finally{   
                    //啓動一次順序關閉,執行之前提交的任務,但不接受新任務  
                    executorService.shutdown();   
                }   
        }   
    }   
}   
  
  
class TaskWithResult implements Callable<String>{   
    private int id;   
  
    public TaskWithResult(int id){   
        this.id = id;   
    }   
  
    /**  
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用!!!    " + Thread.currentThread().getName());   
        //該返回結果將被Future的get方法獲得  
        return "call()方法被自動調用,任務返回的結果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}


某次執行結果以下:



 從結果中能夠一樣能夠看出,submit也是首先選擇空閒線程來執行任務,若是沒有,纔會建立新的線程來執行任務。另外,須要注意:若是Future的返回還沒有完成,則get()方法會阻塞等待,直到Future完成返回,能夠經過調用isDone()方法判斷Future是否完成了返回。


自定義線程池


自定義線程池,能夠用ThreadPoolExecutor類建立,它有多個構造方法來建立線程池,用該類很容易實現自定義的線程池,這裏先貼上示例程序:

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   
  
public class ThreadPoolTest{   
    public static void main(String[] args){   
        //建立等待隊列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
        //建立線程池,池中保存的線程數爲3,容許的最大線程數爲5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
        //建立七個任務   
        Runnable t1 = new MyThread();   
        Runnable t2 = new MyThread();   
        Runnable t3 = new MyThread();   
        Runnable t4 = new MyThread();   
        Runnable t5 = new MyThread();   
        Runnable t6 = new MyThread();   
        Runnable t7 = new MyThread();   
        //每一個任務會在一個線程上執行  
        pool.execute(t1);   
        pool.execute(t2);   
        pool.execute(t3);   
        pool.execute(t4);   
        pool.execute(t5);   
        pool.execute(t6);   
        pool.execute(t7);   
        //關閉線程池   
        pool.shutdown();   
    }   
}   
  
class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在執行。。。");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}

運行結果以下:

  

 從結果中能夠看出,七個任務是在線程池的三個線程上執行的。這裏簡要說明下用到的ThreadPoolExecuror類的構造方法中各個參數的含義。   



public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:線程池中所保存的核心線程數,包括空閒線程。

maximumPoolSize:池中容許的最大線程數。

keepAliveTime:線程池中的空閒線程所能持續的最長時間。

unit:持續時間的單位。

workQueue:任務執行前保存任務的隊列,僅保存由execute方法提交的Runnable任務。

    根據ThreadPoolExecutor源碼前面大段的註釋,咱們能夠看出,當試圖經過excute方法講一個Runnable任務添加到線程池中時,按照以下順序來處理:

    一、若是線程池中的線程數量少於corePoolSize,即便線程池中有空閒線程,也會建立一個新的線程來執行新添加的任務;

    二、若是線程池中的線程數量大於等於corePoolSize,但緩衝隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閒出來後依次將緩衝隊列中的任務交付給空閒的線程執行);

    三、若是線程池中的線程數量大於等於corePoolSize,且緩衝隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會建立新的線程來處理被添加的任務;

    四、若是線程池中的線程數量等於了maximumPoolSize,有4種才處理方式(該構造方法調用了含有5個參數的構造方法,並將最後一個構造方法爲RejectedExecutionHandler類型,它在處理線程溢出時有4種方式,這裏再也不細說,要了解的,本身能夠閱讀下源碼)。

    總結起來,也便是說,當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩衝隊列workQueue是否滿,最後看線程池中的線程數量是否大於maximumPoolSize。

    另外,當線程池中的線程數量大於corePoolSize時,若是裏面有線程的空閒時間超過了keepAliveTime,就將其移除線程池,這樣,能夠動態地調整線程池中線程的數量。

    咱們大體來看下Executors的源碼,newCachedThreadPool的不帶RejectedExecutionHandler參數(即第五個參數,線程數量超過maximumPoolSize時,指定處理方式)的構造方法以下:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}

 它將corePoolSize設定爲0,而將maximumPoolSize設定爲了Integer的最大值,線程空閒超過60秒,將會從線程池中移除。因爲核心線程數爲0,所以每次添加任務,都會先從線程池中找空閒線程,若是沒有就會建立一個線程(SynchronousQueue<Runnalbe>決定的,後面會說)來執行新的任務,並將該線程加入到線程池中,而最大容許的線程數爲Integer的最大值,所以這個線程池理論上能夠不斷擴大。


    再來看newFixedThreadPool的不帶RejectedExecutionHandler參數的構造方法,以下:

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}

 它將corePoolSize和maximumPoolSize都設定爲了nThreads,這樣便實現了線程池的大小的固定,不會動態地擴大,另外,keepAliveTime設定爲了0,也就是說線程只要空閒下來,就會被移除線程池,勇於LinkedBlockingQueue下面會說。


    下面說說幾種排隊的策略:

    一、直接提交。緩衝隊列採用 SynchronousQueue,它將任務直接交給線程處理而不保持它們。若是不存在可用於當即運行任務的線程(即線程池中的線程都在工做),則試圖把任務加入緩衝隊列將會失敗,所以會構造一個新的線程來處理新添加的任務,並將其加入到線程池中。直接提交一般要求*** maximumPoolSizes(Integer.MAX_VALUE) 以免拒絕新提交的任務。newCachedThreadPool採用的即是這種策略。

    二、***隊列。使用***隊列(典型的即是採用預約義容量的 LinkedBlockingQueue,理論上是該緩衝隊列能夠對無限多的任務排隊)將致使在全部 corePoolSize 線程都工做的狀況下將新任務加入到緩衝隊列中。這樣,建立的線程就不會超過 corePoolSize,也所以,maximumPoolSize 的值也就無效了。當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用***隊列。newFixedThreadPool採用的即是這種策略。

    三、有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(通常緩衝隊列使用ArrayBlockingQueue,並制定隊列的最大長度)有助於防止資源耗盡,可是可能較難調整和控制,隊列大小和最大池大小須要相互折衷,須要設定合理的參數。



轉自:http://blog.csdn.net/ns_code/article/details/17465497


寫的很是詳細,感謝!

相關文章
相關標籤/搜索