併發編程(貳):線程池淺析

線程池淺析


描述

線程的建立、啓動、銷燬等是一個很是消耗資源的過程。引出線程池。java

線程池做用

  1. 下降資源消耗,重複利用已建立好的線程。
  2. 提升響應速度,經過已經建立好的線程直接執行到達的任務,無需等待。
  3. 線程的統一管理,對線程統一分配、監控和調優(專人專職)。

線程池的建立一

1、建立

目前線程池的共有六種建立方式。先來講明常見的四種建立方式,本質都是經過改變構造函數的參數來建立不一樣的線程的。最終調用的構造函數都是:面試

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

如下以demo分別說明線程池的建立:數組

一、建立緩存線程
ExecutorService threadPool = Executors.newCachedThreadPool();
 for(int i=0;i<10;i++){
     int temp = i;
     threadPool.execute(new Runnable() {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName()+","+temp);
         }
     });
 }
 threadPool.shutdown();// 停掉線程池
二、建立固定長度的線程(經常使用)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
  for (int i = 0; i < 10; i++) {
      int temp = i;
      threadPool.execute(new Runnable() {
          @Override
          public void run() {
              System.out.println(Thread.currentThread().getName()+","+temp);
          }
      });
  }
  threadPool.shutdown();// 停掉線程池
三、建立定時線程
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
 threadPool.schedule(new Runnable() {
     @Override
     public void run() {
         System.out.println("我是定時線程,三秒後啓動");
     }
 },3, TimeUnit.SECONDS); // 第一個參數是任務,第二個參數是時間長度,第三個參數時間單位
 threadPool.shutdown();// 停掉線程池
四、建立單線程
ExecutorService threadPool = Executors.newSingleThreadExecutor();
 for (int i = 0; i < 10; i++) {
     threadPool.execute(new Runnable() {
         @Override
         public void run() {
             System.out.println("我是單線程線程");
         }
     });
 }
 threadPool.shutdown();// 停掉線程池
2、線程池參數說明

重點:幾乎是面試必問的。緩存

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}

corePoolSize:核心線程數dom

maximumPoolSize:最大線程數ide

keepAliveTimea:線程空閒時間函數

unit:TimeUnit枚舉類型的值,表明keepAliveTime時間單位,能夠取下列值:this

  • TimeUnit.DAYS; //天   
  • TimeUnit.HOURS; //小時   
  • TimeUnit.MINUTES; //分鐘   
  • TimeUnit.SECONDS; //秒   
  • TimeUnit.MILLISECONDS; //毫秒   
  • TimeUnit.MICROSECONDS; //微妙   
  • TimeUnit.NANOSECONDS; //納秒

workQueue:阻塞隊列,用來存儲等待執行的任務,決定了線程池的排隊策略,有如下取值:線程

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

threadFactory:線程工廠,是用來建立線程的code

handler:線程拒絕策略。當建立的線程超出maximumPoolSize,且緩衝隊列已滿時,新任務會拒絕,有如下取值:

  • ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。   
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)   
  • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
3、線程的建立過程

下圖是線程池的執行流程:


流程說明:

  1. 用戶提交任務,先到核心線程池,判斷核心線程池是都已滿;

  2. 如何核心線程池未滿,線程任務執行;如何核心線程已滿,走下一步;

  3. 進入線程緩存隊列,判斷緩存隊列是否已滿;

  4. 若是線程緩存隊列已滿,進入最大線程池;

  5. 若是最大線程池未滿,建立線程任務;

  6. 若是最大線程池已滿,則拒絕。

線程池的建立二

五、newWorkStealingPool
ExecutorService m = Executors.newWorkStealingPool(2);

1)建立一個帶並行級別的線程池,並行級別決定了同一時刻最多有多少個線程在執行,如不傳如並行級別參數,將默認爲當前系統的CPU個數。

2)每一個線程維護本身的一個隊列,任務執行結束了,就本身主動去取別的任務。

3)產生的都是守護線程。

4)不是 ThreadPoolExecutor 的擴展,是 ForkJoinPool 的擴展。

5)演示:

public class Main {
    public static void main(String[] args) throws Exception {
        // 設置並行級別爲2,即默認每時每刻只有2個線程同時執行
        ExecutorService m = Executors.newWorkStealingPool(2);
        for (int i = 1; i <= 10; i++) {
            final int count=i;
            m.submit(new Runnable() {
                @Override
                public void run() {
                    Date now=new Date();
                    System.out.println("線程" + Thread.currentThread() + "完成任務:"+ count+"   時間爲:"+    now.getSeconds());
                    try {
                        Thread.sleep(1000);//此任務耗時1s
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }); 
        }
        while(true){
            //主線程陷入死循環,來觀察結果,不然是看不到結果的
        }
    }
}
六、ForkJoinPool

該線程池的思想是,把運用遞歸的思想,將大的任務拆分紅小的任務(能夠根據業務需求來控制拆分的粒度)。下面以一個面試題演示:一個長度100萬的數組,元素是一百之內的隨機數,將各個元素相加。

public class ForkJoin {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    private static Random random = new Random();
    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(100);
        }
        System.out.println(Arrays.stream(nums).sum()); // 傳統的方式計算
    }
    // 遞歸思想,不斷將大任務分紅小任務。
    // RecursiveAction 無返回值;RecursiveTask 有返回值。
    static class AddTask extends RecursiveAction{
        int start,end;
        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected void compute() {
            if(end-start<=MAX_NUM){
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("from+"+start+"to"+end+"="+sum);
            }else{
                int middle =start + (end-start)/2;
                AddTask addTask = new AddTask(start,middle);
                AddTask addTask2 = new AddTask(middle,end);
                addTask.fork();
                addTask2.fork();
            }
        }
    }
    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask addTask = new AddTask(0,nums.length);
        forkJoinPool.execute(addTask);
        System.in.read();
    }
}

該線程池的優點是,能夠充分利用多cpu,多核cpu的優點,把一個任務拆分紅多個「小任務」,把多個「小任務」放到多個處理器核心上並行執行;當多個「小任務」執行完成以後,再將這些執行結果合併起來便可。

相關文章
相關標籤/搜索