java 線程池 - ThreadPoolExecutor

1. 爲何要用線程池

  • 減小資源的開銷 
  • 減小了每次建立線程、銷燬線程的開銷。
  • 提升響應速度 ,每次請求到來時,因爲線程的建立已經完成,故能夠直接執行任務,所以提升了響應速度。
  • 提升線程的可管理性 ,線程是一種稀缺資源,若不加以限制,不只會佔用大量資源,並且會影響系統的穩定性。 所以,線程池能夠對線程的建立與中止、線程數量等等因素加以控制,使得線程在一種可控的範圍內運行,不只能保證系統穩定運行,並且方便性能調優。

2. 類圖

                   

  ExecutorService提供了兩種基礎線程池的選擇,ScheduledThreadPoolExecutor定時任務的)和ThreadPoolExecutor普通的),本文主要介紹ThreadPoolExecutor。java

 

《阿里巴巴java編程手冊》併發處理部分,講了兩個原則:編程

 1. 線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。併發

  使用線程池的好處是減小在建立和銷燬線程上所消耗的時間及系統資源,解決資源不足的問題。若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。app

  咱們知道,有繼承Thread,implements Runnable, implements Callable<T>這些種方法建立線程,但推薦使用線程池來建立線程。ide

 2. 線程池不容許使用Executors建立,而是經過ThreadPoolExecutor的方式建立,這樣的處理方式能讓編寫代碼的工程師更加明確線程池的運行規則,規避資源耗盡的風險。

性能

 Executors返回的線程池對象的弊端以下:this

  1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM(無參的LinkedBlockingQueue的長度默認是Integer.MAX_VALUE)spa

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

  2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM線程

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

  

ThreadPoolExecutor類介紹code

ThreadPoolExecutor.class構造方法

//這個是ThreadPoolExecutor完整的構造器,其餘的構造器其實也是在內部調用這個. 
ThreadPoolExecutor(    int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

           

 

       

下咱們以ThreadPoolExecutor爲例展現下線程池的工做流程圖

        

 

         

       

例子 使用默認的拒絕策略AbortPolicy

public class Demo1 { public static void main(String[] args) { BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10); RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, queue, handler); for (int i = 0; i < 20; i ++){ final int temp = i; pool.execute(() -> { System.out.println("客戶" + temp + "來了......."); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); } }
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.kco.test17.demo1.Demo1$$Lambda$1/15497079@ca494b rejected from java.util.concurrent.ThreadPoolExecutor@1a4f24f[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.kco.test17.demo1.Demo1.main(Demo1.java:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-2客戶3來了.......
pool-1-thread-1客戶4來了.......
pool-1-thread-5客戶5來了.......
pool-1-thread-3客戶6來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-2客戶9來了.......
pool-1-thread-1客戶8來了.......
pool-1-thread-3客戶10來了.......
pool-1-thread-5客戶11來了.......
pool-1-thread-4客戶12來了.......

從結果看出來,能夠看出線程是重複被使用的,並且當執行的任務超過工做隊列的容量時,線程確實拋出了異常.

例子2 使用忽略策略 DiscardPolicy

將   RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();改成 

  RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();

運行結果以下:

pool-1-thread-1客戶0來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-3客戶3來了.......
pool-1-thread-4客戶4來了.......
pool-1-thread-1客戶5來了.......
pool-1-thread-5客戶6來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶7來了.......
pool-1-thread-4客戶8來了.......
pool-1-thread-5客戶9來了.......
pool-1-thread-1客戶10來了.......
pool-1-thread-2客戶11來了.......
pool-1-thread-4客戶12來了.......

  如今線程池正確退出了,並且也不拋出異常了,可是超過工做隊列容量的任務所有被忽略了.

例子3 使用忽略最先任務策略 DiscardOldestPolicy

 RejectedExecutionHandler 改成 RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();

pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-4客戶8來了.......
pool-1-thread-1客戶11來了.......
pool-1-thread-5客戶10來了.......
pool-1-thread-3客戶9來了.......
pool-1-thread-2客戶12來了.......
pool-1-thread-1客戶15來了.......
pool-1-thread-4客戶16來了.......
pool-1-thread-5客戶17來了.......
pool-1-thread-2客戶19來了.......
pool-1-thread-3客戶18來了.......

  從以上結果,咱們能夠看出除了客戶0客戶2恰好是3個核心線程被執行後,客戶3客戶7直接被忽略掉了.

例子4 使用來着不拒策略 CallerRunsPolicy

  一樣講拒絕策略改成RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
運行程序,結果以下:

pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-4客戶13來了.......
main客戶15來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-2客戶3來了.......
pool-1-thread-1客戶4來了.......
main客戶18來了.......
pool-1-thread-3客戶5來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-5客戶6來了.......
pool-1-thread-5客戶8來了.......
pool-1-thread-1客戶9來了.......
pool-1-thread-4客戶10來了.......
pool-1-thread-3客戶12來了.......
pool-1-thread-2客戶11來了.......
pool-1-thread-1客戶16來了.......
pool-1-thread-5客戶19來了.......
pool-1-thread-3客戶17來了.......

  結果,咱們能夠發現全部的任務都被執行,並且居然還有兩個是在主線程執行的.如今明白我以前說的則直接運行任務的run方法的意思了吧,沒錯是直接調用run方法,而不是開啓線程去執行任務.

例子5 使用自定義的拒絕策略

 如今咱們本身寫一個拒絕策略,要求全部的任務都必須被線程池執行,並且都要在線程池中執行.

public class Demo5 {
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
        RejectedExecutionHandler handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()){
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS, queue, handler);
        for (int i = 0; i < 20; i ++){
            final int temp = i;
            pool.execute(() -> {
                String name = Thread.currentThread().getName();
                System.out.println(name + "客戶" + temp + "來了.......");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        pool.shutdown();
    }
}

 運行結果: 

pool-1-thread-1客戶0來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-1客戶3來了.......
pool-1-thread-3客戶4來了.......
pool-1-thread-5客戶5來了.......
pool-1-thread-2客戶6來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-1客戶8來了.......
pool-1-thread-3客戶9來了.......
pool-1-thread-5客戶10來了.......
pool-1-thread-4客戶11來了.......
pool-1-thread-2客戶12來了.......
pool-1-thread-1客戶15來了.......
pool-1-thread-3客戶16來了.......
pool-1-thread-5客戶17來了.......
pool-1-thread-4客戶19來了.......
pool-1-thread-2客戶18來了.......

  ok.全部任務都被線程池執行了.並且咱們自定義的拒絕策略也很簡單,就是讓工做隊列調用put讓其一直等待,直到有可用的容量存聽任務.

 

動手本身寫一個線程池:

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPool {
    /**存放線程的集合*/
    private ArrayList<MyThread> threads;
    /**任務隊列*/
    private ArrayBlockingQueue<Runnable> taskQueue;
    /**線程池初始限定大小*/
    private int threadNum;
    /**已經工做的線程數目*/
    private int workThreadNum;
    
    private final ReentrantLock mianLock = new ReentrantLock();
    
    public MyThreadPool(int initPoolNum) {
        this.threadNum = initPoolNum;
        this.threads = new ArrayList<>(initPoolNum);
        //任務隊列初始化爲線程池線程數的四倍
        this.taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
        this.workThreadNum = 0;
    }
    
    public void execute(Runnable runnable) {
        try {
            mianLock.lock();
            //線程池未滿,每加入一個任務則開啓一個線程
            if (this.workThreadNum < this.threadNum) {
                MyThread myThread = new MyThread(runnable);
                myThread.start();
                threads.add(myThread);
                this.workThreadNum++;
            } else { //線程池已滿,放入任務隊列,等待有空閒線程時執行
                //隊列已滿,沒法添加時,拒絕任務
                if (!taskQueue.offer(runnable)) {
                    rejectTask();
                }
            } 
        } finally {
            mianLock.unlock();
        }
    }
    
    private void rejectTask() {
        System.out.println("任務隊列已滿,沒法繼續添加,請擴大您的初始化線程池!");
    }
    
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(5);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"執行中");
            }
        };
        for (int i = 0; i < 20; i++) {
            myThreadPool.execute(task);
        }
    }
    
    class MyThread extends Thread {
        private Runnable task;
        
        public MyThread(Runnable runnable) {
            this.task = runnable;
        }
        
        @Override
        public void run() {
            //該線程一直啓動着,不斷從任務隊列取出任務執行
            while(true) {
                //若是初始化任務不爲空,則執行初始化任務
                if(task != null) {
                    task.run();
                    task = null;
                }else { //不然去任務隊列取任務並執行
                    Runnable queueTask = taskQueue.poll();
                    if(queueTask != null)
                        queueTask.run();
                }
            }
        }
    }
}
相關文章
相關標籤/搜索