ThreadPoolExecutor的擴展

    在java中,在併發的狀況能夠使用ThreadPoolExecutor,但爲了提升吞吐量和性能,能夠擴展ThreadPoolExecutor。java

    ThreadPoolExecutor的執行流程:併發

  • 當core線程數大於當前任務數時,能夠建立新的線程;
  • 當core線程數小於當前執行的任務數時,則將任務加入到隊列;
  • 當隊列已滿時,則開始建立新的線程,當線程數達到max線程數時,則會拋出RejectedExcutionException異常。

    這樣也行會出現一個問題,若是當任務數不斷增大,core線程數已不足於應付時,就會頻繁的將任務加入隊列,不斷的建立隊列,加入隊列,對性能具備很大影響,若是隊列設置較大時,max線程則不多會使用到,那麼不妨考慮如下實現方式也行會更好些:ide

  • 當core線程數大於當前任務數時,能夠建立新的線程;
  • 當core線程數小於當前執行的任務數時,而當前任務數小於max線程數時,則建立新的線程運行任務;
  • 當max線程數沒法應付時,在將任務加入隊列,如隊列滿時,在拋出RejectedExcutionException異常。

這樣實現最大的利用了可用線程數,減小了建立隊列加入隊列的系統消耗。如下爲代碼實現:性能

自定義ThreadPoolExecutor:測試

public class ThreadPoolExecutorExtend extends java.util.concurrent.ThreadPoolExecutor{this

    private final AtomicInteger atomicInt = new AtomicInteger(0);
    public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        workQueue.setExecutor(this);
    }
    @Override
    public void execute(Runnable command) {
        try{
            atomicInt.incrementAndGet();
            super.execute(command);
        }catch(Exception e){
            ((TaskQueue)super.getQueue()).offer(command);
        }
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        atomicInt.decrementAndGet();
        super.afterExecute(r, t);
    }
    
    public int getSubmitTask(){
        return atomicInt.get();
    }
}atom

自定義LinkedBlockingQueue:.net

public class TaskQueue extends java.util.concurrent.LinkedBlockingQueue<Runnable>{線程

    private static final long serialVersionUID = 2804119236686921894L;隊列

    private ThreadPoolExecutorExtend executor;
    
    public TaskQueue(int size) {
        super(size);
    }
    
    @Override
    public boolean offer(Runnable e) {
        // current task size
        int currentTask = executor.getPoolSize();
        if(currentTask < executor.getMaximumPoolSize()){
            return false;
        }
        return super.offer(e);
    }
    
    public void setExecutor(ThreadPoolExecutorExtend e){
        executor = e;
    }
    
}


測試類:

public class MyThreadPoolTest {
    
    public static void main(String[] args) {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        int keepAliveTime = 60;
        final ThreadPoolExecutorExtend pool = new ThreadPoolExecutorExtend(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new TaskQueue(5));
        for(int i=0;i<15;i++){
            pool.execute(new Runnable() {
                
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            });
            System.out.println("當前線程:"+pool.getPoolSize()+",當前隊列數量:"+pool.getQueue().size());
        }
    }
}

運行結果以下:

當前線程:1,當前隊列數量:0
當前線程:2,當前隊列數量:0
當前線程:3,當前隊列數量:0
當前線程:4,當前隊列數量:0
當前線程:5,當前隊列數量:0
當前線程:6,當前隊列數量:0
當前線程:7,當前隊列數量:0
當前線程:8,當前隊列數量:0
當前線程:9,當前隊列數量:0
當前線程:10,當前隊列數量:0
當前線程:10,當前隊列數量:1
當前線程:10,當前隊列數量:2
當前線程:10,當前隊列數量:3
當前線程:10,當前隊列數量:4
當前線程:10,當前隊列數量:5

可用將TaskQueue類中重寫offer方法註釋掉,再次運行測試類,看運行結果,便可看到區別。

相關文章
相關標籤/搜索