Java併發編程--4.Executor框架

簡介

Executor框架是啓動,管理線程的API, 它的內部實現是線程池機制,它有不少好處,好比使任務提交和任務執行解耦合,防止this逃逸;
它的主要API包括: Executor,  Executors, ExecutorService , Callable,   Future,   CompletionService,    ThreadPoolExecutor緩存

ExecutorService 生命週期

一個Executor的生命週期有三種狀態: 運行狀態,關閉狀態,終止狀態; ExecutorService中添加了生命週期管理的方法

Executor建立時, 處於運行狀態; 當調用shutdown()後,處於關閉狀態,中止接受新的線程,並執行已接受的線程任務; 全部任務執行完成後,處於終止狀態

Executors 建立線程池

newFixedThreadPool : 建立定長的線程池,  最多有固定數量的線程, 若是還有建立新的線程,須要放到隊列中等待, 直到有線程從池中移出
newCachedThreadPool :可緩存的線程池, 若是現有線程沒有可用,則建立一個新線程並添加到池中。60 秒鐘未被使用的線程會被移除
newSingleThreadExecutor : 只建立一個工做線程, 當工做線程異常結束, 會從新建立一個線程
newScheduledThreadPool: 建立定長的線程池, 支持定時的任務執行

線程池執行Runnable的例子

public class MyExecutors {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        
        for (int i = 0; i < 5; i++){   
            executorService.execute(new MyRunnable());   
        } 
         
        executorService.shutdown(); 
    }

}

class MyRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "被調用");
    }
}

Callable和Future 攜帶結果的任務

Callable: 帶有返回值的任務
Future: 保存一個任務執行後的結果

下面給出一個例子:框架

public class MyCallable {
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
  
        //建立10個任務並執行   
        for (int i = 0; i < 10; i++){   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            
            //將任務執行結果存儲到List中   
            resultList.add(future);   
        }   
  
        //遍歷任務的結果   
        for (Future<String> fs : resultList){   
                try{   
                    while(!fs.isDone());//Future返回若是沒有完成,則一直循環等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各個線程(任務)執行的結果   
               
                }catch(Exception e){   
                    e.printStackTrace();   
                }finally{   
                    //啓動一次順序關閉,執行之前提交的任務,但不接受新任務  
                    executorService.shutdown();   
                }   
        }   
    }   
}   
  
  
class TaskWithResult implements Callable<String>{   
    private int id;   
  
    public TaskWithResult(int id){   
        this.id = id;   
    }   
  
    /**  
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用!!!    " + Thread.currentThread().getName());   
        //該返回結果將被Future的get方法獲得  
        return "call()方法被自動調用,任務返回的結果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}

CompletionService完成服務,打包結果

任意任務完成後就把其加到結果中, 調用CompletionService的take()方法,返回 按任務的完成順序  封裝的結果, 像是一個打包的Futureide

下面給出一個例子函數

public class MyCompletionService {
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool(); 
        
        //構造函數傳入一個Executor
        CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);  
        
        //建立10個任務並執行   
        for (int i = 0; i < 10; i++){   
           if(!executorService.isShutdown()) {
               //由CompletionService執行任務
               completionService.submit(new Result0());
           }   
            
        }   
        
        //把多個任務的結果加起來   
        String result = "2_";  
            try {  
                for (int i = 0; i < 10; i++) {
                    // 若是任務未完成,則該任務的take()會阻塞
                    String s = completionService.take().get();
                    result += s;  
                }    
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {  
                e.printStackTrace();  
            }  
       
        //輸出最終的計算結果
        System.out.println(result);
        
        try{   
       
        }catch(Exception e){   
            e.printStackTrace();   
        }finally{   
            //啓動一次順序關閉,執行之前提交的任務,但不接受新任務  
            executorService.shutdown();   
        }   
    }   
}   
  
class Result0 implements Callable<String>{   
  
    /**  
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執行 
     */   
    public String call() throws Exception {  
        //該返回結果將被Future的get方法獲得  
        return "1";   
    }   
}

ThreadPoolExecutor自定義線程池

ThreadPoolExecutor 有多個構造方法建立線程池,下面是一個構造方法this

public ThreadPoolExecutor(int corePoolSize,     
                          int maximumPoolSize,  
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
參數說明:                           
corePoolSize:線程池中所保存的核心線程數,包括空閒線程。
maximumPoolSize:池中容許的最大線程數。
keepAliveTime:線程池中的空閒線程所能持續的最長時間。
unit:持續時間的單位。
workQueue:任務執行前保存任務的隊列,僅保存由execute方法提交的Runnable任務。

添加任務的過程

1、池中的線程數量少於corePoolSize,即便線程池中有空閒線程,也會建立一個新的線程來執行新添加的任務;

2、池中的線程數量大於等於corePoolSize,但緩衝隊列未滿,則將新添加的任務放到workQueue中,
    線程池中有線程空閒出來後依次將緩衝隊列中的任務交付給空閒的線程執行;
    若是裏面有線程的空閒時間超過了keepAliveTime,就將其移除線程池

3、池中的線程數量大於等於corePoolSize,且緩衝隊列workQueue已滿,但線程數量小於maximumPoolSize,則會建立新的線程來處理被添加的任務;

四、若是線程池中的線程數量等於了maximumPoolSize,經過設置飽和策略處理,也便是設置6個參數構造函數的第6個參數RejectedExecutionHandler

整體流程就是: 先看線程池中的線程數量是否大於corePoolSize,再看緩衝隊列workQueue是否滿,最後看線程池中的線程數量是否大於maximumPoolSizespa

排隊策略

無界隊列: 採用預約義容量的LinkedBlockingQueue,理論上是該緩衝隊列能夠對無限多的任務排隊,newFixedThreadPool採用的即是這種策略
有界隊列: 通常使用ArrayBlockingQueue制定隊列的長度
同步移交 :SynchronousQueue 跳過隊列,將任務從生產者直接交給消費者,若是不存在可用於當即運行任務的線程,會構造一個新的線程來處理新添加的任務,一般是無界的,newCa chedThreadPool採用的即是這種策略。

使用案例

public class MyThreadPoolExecutor {
    public static void main(String[] args){   
        //建立等待隊列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
       
        //建立線程池,池中保存的線程數爲3,容許的最大線程數爲5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
       
        //建立七個任務   
        Runnable t1 = new MyThread();   
        Runnable t2 = new MyThread();   
        Runnable t3 = new MyThread();   
        Runnable t4 = new MyThread();   
        Runnable t5 = new MyThread();   
        Runnable t6 = new MyThread();   
        Runnable t7 = new MyThread();   
        
        //每一個任務會在一個線程上執行  
        pool.execute(t1);   
        pool.execute(t2);   
        pool.execute(t3);   
        pool.execute(t4);   
        pool.execute(t5);   
        pool.execute(t6);   
        pool.execute(t7); 
        
        //關閉線程池   
        pool.shutdown();   
    }   
}   
  
class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在執行。。。");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}  

控制檯輸出:線程

pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。

能夠看出7個任務在3個線程上執行code

相關文章
相關標籤/搜索