thread_ThreadPoolExecutor

目錄緩存

  1.基礎知識less

  2.簡單應用dom

  3.異常機制ide

  4.豐富的擴展函數

一.基礎知識性能

  構造函數。 this

public ThreadPoolExecutor(   
int corePoolSize,    指的是保留的線程池大小
int maximumPoolSize,    指的是線程池的最大大小
long keepAliveTime,    指的是空閒線程結束的超時時間
TimeUnit unit,   
BlockingQueue<Runnable> workQueue)  表示存聽任務的隊列  spa

  工做過程:線程

1 、線程池剛建立時,裏面沒有一個線程。任務隊列是做爲參數傳進來的。不過,就算隊列裏面有任務,線程池也不會立刻執行它們。
2 、當調用 execute() 方法添加一個任務時,線程池會作以下判斷:
  a. 若是正在運行的線程數量小於 corePoolSize,那麼立刻建立線程運行這個任務;
  b. 若是正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列。
  c. 若是這時候隊列滿了,並且正在運行的線程數量小於 maximumPoolSize,那麼仍是要建立線程運行這個任務;
  d. 若是隊列滿了,並且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會拋出異常,告訴調用者「我不能再接受任務了」。
3 、當一個線程完成任務時,它會從隊列中取下一個任務來執行。
4 、當一個線程無事可作,超過必定的時間(keepAliveTime)時,線程池會判斷,若是當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。因此線程池的全部任務完成後,它最終會收縮到 corePoolSize 的大小。日誌

二.簡單應用  

    public void threadPool1Test() throws InterruptedException {
       BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  
       ThreadPoolExecutor exe = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);  
        try {
            int threadNum = 0;
            for (int i = 0; i < 10; i++) {
                threadNum++;
                final int currentThreadNum = threadNum;
                exe.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("子線程[" + currentThreadNum + "]開啓");
                            Thread.sleep(1000 * 10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            System.out.println("子線程[" + currentThreadNum + "]結束");
                        }
                    }
                });
            }
            
            //不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
            System.out.println("已經開啓全部的子線程");
            exe.shutdown();
            System.out.println("shutdown():啓動一次順序關閉,執行之前提交的任務,但不接受新任務。");
            
            //判斷線程池全部任務是否執行完畢
            while (true) {
                if (exe.isTerminated()) {
                    System.out.println("全部的子線程都結束了!");
                    break;
                }
                Thread.sleep(1000);
                System.out.println("線程池中線程數目:"+exe.getPoolSize()+
                                   ",隊列中等待執行的任務數目:"+ exe.getQueue().size()+
                                   ",已執行玩別的任務數目:"+exe.getCompletedTaskCount());    
                System.out.println("線程隊列大小爲-->"+queue.size());  
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("主線程結束");
        }
    }

一、BlockingQueue 只是一個接口,經常使用的實現類有 LinkedBlockingQueue 和 ArrayBlockingQueue。用 LinkedBlockingQueue 的好處在於沒有大小限制。這樣的話,由於隊列不會滿,因此 execute() 不會拋出異常,而線程池中運行的線程數也永遠不會超過 corePoolSize 個,keepAliveTime 參數也就沒有意義了。
二、shutdown() 方法不會阻塞。調用 shutdown() 方法以後,主線程就立刻結束了,而線程池會繼續運行直到全部任務執行完纔會中止。若是不調用 shutdown() 方法,那麼線程池會一直保持下去,以便隨時添加新的任務。 

二.異常處理

     線程超出了線程池的總容量(線程隊列大小+最大線程數)

    @Test
    public void threadPool2Test() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(4));
        //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.setRejectedExecutionHandler(
          new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(String.format("Task %d rejected.", r.hashCode()));
                //System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());
                System.out.println("Waiting for a second !!"); 
                try {
                    Thread.sleep(1000);
                     executor.execute(r);
                     //executor.getQueue().put(r);
                } catch (InterruptedException e) {
                }
            }
          });
        ///////
        for (int i = 0; i < 11; i++) {
            final int taskNum = i;
            Runnable myTask = new Runnable() {
                @Override
                public void run() {
                    System.out.println("正在執行task " + taskNum);

                    try {
                        Thread.currentThread().sleep(100 * (new Random()).nextInt(8));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("task " + taskNum + "執行完畢");
                }
            };
            executor.execute(myTask);
        }
        // 不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
        executor.shutdown();
        while (true) {
            if (executor.isTerminated()) {
                System.out.println("全部的子線程都結束了!");
                break;
            }
            Thread.sleep(1000);
            System.out.println("線程池中線程數目:" + executor.getPoolSize() + ",隊列中等待執行的任務數目:" + executor.getQueue().size()
                    + ",已執行玩別的任務數目:" + executor.getCompletedTaskCount());
        }
    }

 

   ThreadPoolExecutor 提供 4 個現有的策略,分別是:

ThreadPoolExecutor.AbortPolicy:表示拒絕任務並拋出異常

ThreadPoolExecutor.DiscardPolicy:表示拒絕任務但不作任何動做
ThreadPoolExecutor.CallerRunsPolicy:表示拒絕任務,並在調用者的線程中直接執行該任務
ThreadPoolExecutor.DiscardOldestPolicy:表示先丟棄任務隊列中的第一個任務,而後把這個任務加進隊列。
使用CallerRunsPolicy,會將全部線程都執行到,也不可避免的會使用主線程來加載一個線程任務。一次同時建立maximumPoolSize個線程,加上主線程,就是一次執行maximumPoolSize+1個線程任務,等執行完後纔會,再執行maximunPoolSize+1個線程任務,固然建立的maximumPoolSize個線程也會複用。
顯然這種策略保證了每個線程任務都會執行,但犧牲了性能。而,DiscardPolicy和DiscardOldestPolicy都會拋棄容納不了的線程任務,保證了性能。

 a. ThreadPoolExecutor能夠設置一個「拒絕策略」,這是指當一個task被拒絕添加到線程池中時,採起的處理措施,
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
 b.task很是重要,「等得起」,可是「丟不起」
    System.out.println("Waiting for a second !!");
   try {
      Thread.sleep(1000);
       executor.execute(r);
   } catch (InterruptedException e) {
   }
 c.利用RejectedExecutionHandler來阻塞submit()?
    首先 submit()方法是調用了workQueue的offer()方法來塞入task,而offer()方法是非阻塞的,當workQueue已經滿的時候,offer()方法會當即返回false,並不會阻塞在那裏等待workQueue有空出位置,因此要讓submit()阻塞,關鍵在於改變向workQueue添加task的行爲,
  if (!executor.isShutdown()) {
      try {
          executor.getQueue().put(r);
       } catch (InterruptedException e) {
       }
   }
   調用了getQueue()方法,獲得了workQueue,再調用其put()方法,將task放到workQueue中,而這個put()方法是阻塞的
   當workQueue滿時,submit()一個task會致使調用咱們自定義的RejectedExecutionHandler,而咱們自定義的RejectedExecutionHandler會保證該task繼續被嘗試   用阻塞式的put()到workQueue中。

d.重寫了offer()方法的BlockingQueue

              因爲submit()是調用workQueue的offer()方法來添加task的,而offer()是非阻塞的,因此,若是咱們本身實現一個BlockingQueue,其offer()方法是阻塞的

public class LimitedQueue<E> extends LinkedBlockingQueue<E> {
  public LimitedQueue(int maxSize) {
    super(maxSize);
  }
  @Override
  public boolean offer(E e) {
    // turn offer() and add() into a blocking calls (unless interrupted)
    try {
      put(e);
      return true;
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return false;
  }
}

  四.豐富的可擴展性

   1.擴展生成器,如線程的建立前beforeExecute,建立後afterExecute,退出terminated;   
                         線程的建立策略, 經過重構ThreadFactory 重構線程名,設置守護線程等;

                         線程的拒絕策略,日誌,拒絕,重試等

                         線程運行拋出異常機制    

class ExtThreadPoolExecutor extends ThreadPoolExecutor {

    public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        super.setThreadFactory(new ExtThreadFactory());
        super.setRejectedExecutionHandler(new ExtRejectedExecutionHandler());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println("Perform beforeExecute() ");
    }
    @Override
    protected void terminated() {
        
        System.out.println("線程池退出 ");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            System.out.println("Perform exception handler logic");
        }
        System.out.println("Perform afterExecute() ");
    }
    /////////////////////////////////
//    @Override
//    public void execute(Runnable task) {
//        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName())); 
//    }
//    @Override
//    public Future<?> submit(Runnable task) {
//        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName())); 
//    }
    private Exception clientTrace(){
        return new Exception("client stack trace");
    }
    private Runnable wrap(final Runnable task ,final Exception clientStack,String clientThreadName ){
        return new Runnable(){
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    throw e;
                }
                task.run();
                
            }
        };
        
    }
    ///////////////////////////////////////////
    private class ExtThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = ExtThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
            System.out.println("create "+threadName);
            //t.setDaemon(true);
            t.setName(threadName);
            return t;
        }
    }
    
    //
    private class ExtRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());
            System.out.println("Waiting for a second !!");

            if (!executor.isShutdown()) {
                try {
                    Thread.sleep(1000);
                    // executor.execute(r);
                     executor.getQueue().put(r);
                    // 當task被拒絕添加到線程池中時,ThreadPoolExecutor會採用「丟棄」策略來對待這個任務,即這個task被丟棄了。
                //    System.out.println("Lets add another time : " + ((DemoThread) r).getName());
                } catch (InterruptedException e) {
                }
            }
        }
    }
}

 

   2.擴展生成器

 

 

class DemoThread implements Runnable {
    private String name = null;

    public DemoThread(String name) {
        this.name = name;
    }

    public String getName() {
        return this.name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Executing : taskname: " + name +", Thread id: " +Thread.currentThread().getId());
    }
}

 

    public static void main(String[] args) {

        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);

        ExtThreadPoolExecutor pool = new ExtThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS,
                blockingQueue);

 
        for (int i = 1; i < 100; i++) {
            System.out.println("提交第" + i + "個任務!");
            pool.execute(new DemoThread(Integer.valueOf(i).toString() ) );
            pool.submit(new DemoThread(Integer.valueOf(i).toString() ));
        }

        // 2.銷燬----此處不能銷燬,由於任務沒有提交執行完,若是銷燬線程池,任務也就沒法執行了
        // exec.destory();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
本站公眾號
   歡迎關注本站公眾號,獲取更多信息