併發編程專題六-線程池的使用與分析

五一要結束了,是時候開始新的一波學習了~java

1、什麼是線程池?爲何要用線程池?

線程池(thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和總體性能。而線程池維護着多個線程,等待着監督管理者分配可併發執行的任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。 數據庫

優點:編程

  1. 下降資源的消耗。下降線程建立和銷燬的資源消耗;
  2. 提升響應速度。例如:線程的建立時間爲T1,執行時間T2,銷燬時間T3,免去T1和T3的時間
  3. 提升線程的可管理性。

2、如何實現一個線程池

根據線程池的概念,若是要本身建立線程池,應該知足一下條件。緩存

  1. 保存線程的容器。由於線程必須在池子已經建立好了,而且能夠保持住,所以,須要一個容器去保存咱們的線程。
  2. 能夠接受外部任務。線程還要可以接受外部的任務,冰並運行這個任務。
  3. 保存任務的容器,有些任務可能來不及執行,所以須要未來不及執行的任務經過容器保存起來。

根據以上的條件以及以前咱們學的併發編程知識,咱們先手動本身嘗試寫一個線程池服務器

Code:網絡

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class MyThreadPool {
    // 線程池中默認線程的個數爲5
    private static int WORK_NUM = 5;
    // 隊列默認任務個數爲100
    private static int TASK_COUNT = 100;  
    
    // 工做線程組
    private WorkThread[] workThreads;

    // 任務隊列,做爲一個緩衝
    private final BlockingQueue<Runnable> taskQueue;
    private final int worker_num;//用戶在構造這個池,但願的啓動的線程數

    // 建立具備默認線程個數的線程池
    public MyThreadPool() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 建立線程池,worker_num爲線程池中工做線程的個數
    public MyThreadPool(int worker_num,int taskCount) {
    	if (worker_num<=0) worker_num = WORK_NUM;
    	if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
        	workThreads[i] = new WorkThread();
        	workThreads[i].start();
        }
       
    }
    
    // 執行任務,其實只是把任務加入任務隊列,何時執行有線程池管理器決定
    public void execute(Runnable task) {
    	try {
			taskQueue.put(task);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

    }


    // 銷燬線程池,該方法保證在全部任務都完成的狀況下才銷燬全部線程,不然等待任務完成才銷燬
    public void destroy() {
        // 工做線程中止工做,且置爲null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
        	workThreads[i].stopWorker();
        	workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任務隊列
    }

    // 覆蓋toString方法,返回線程池信息:工做線程個數和已完成任務個數
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 內部類,工做線程
     */
    private class WorkThread extends Thread{
    	
    	@Override
    	public void run(){
    		Runnable r = null;
    		try {
				while (!isInterrupted()) {
				    //監聽阻塞隊列,若是有任務,則執行相應的任務
					r = taskQueue.take();
					if(r!=null) {
						System.out.println(getId()+" ready exec :"+r);
						r.run();
					}
					r = null;//help gc;
				} 
			} catch (Exception e) {
				// TODO: handle exception
			}
    	}
    	
    	//中止線程
    	public void stopWorker() {
    		interrupt();
    	}
    	
    }
}

一、定義WorkThread類,用來表示執行的線程,用於監聽阻塞隊列任務。併發

二、建立構建函數,咱們將線程池進行初始化,並啓動全部的工做線程。workThreads用來保存運行的線程,使用BlockingQueue<Runnable> taskQueue用來保存咱們的任務隊列dom

三、建立提交任務方法execute,用於提交咱們的任務。異步

四、建立銷燬線程池的方法destroy,用於銷燬線程池。ide

以後咱們編寫測試類

import java.util.Random;

/**
 * @Auther: DarkKing
 * @Date: 2019/5/4 12:09
 * @Description:
 */
public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 建立3個線程的線程池
        MyThreadPool t = new MyThreadPool(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 全部線程都執行完成才destory
        System.out.println(t);
    }

    // 任務類
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執行任務
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任務 " + name + " 完成");
        }
    }
}

能按照線程池的方式進行執行。

咱們簡單地手動寫了一個線程池,但以上線程池有哪些問題呢?

一、啓動的時候就將全部線程啓動了,若是長時間不用比較消耗資源。

二、沒有任務飽和的一個錯略。

三、沒有任務超時機制等等。

3、JDK中的線程池和工做機制

咱們大體瞭解了線程池的一個機制,那咱們看下JDK中,是如何實現線程池的吧。

一、線程池的建立

JAVA中,ThreadPoolExecutor,是全部線程池實現的父類,類結構圖以下。

 

它的構造函數含有如下參數

參數 含義
int corePoolSize  線程池中核心線程數,< corePoolSize  ,就會建立新線程,= corePoolSize  ,這個任務就會保存到BlockingQueue,若是調用prestartAllCoreThreads()方法就會一次性的啓動corePoolSize  個數的線程。
int maximumPoolSize 容許的最大線程數,若是BlockingQueue也滿了,而且線程數< maximumPoolSize時候就會再次建立新的線程
long keepAliveTime 線程空閒下來後,存活的時間,這個參數只在線程數> corePoolSize纔有用
TimeUnit unit 存活時間的單位值
BlockingQueue<Runnable> workQueue 保存任務的阻塞隊列
ThreadFactory threadFactory 建立線程的工廠,給新建的線程賦予名字
RejectedExecutionHandler handler

飽和策略

AbortPolicy :直接拋出異常,默認;

CallerRunsPolicy:用調用者所在的線程來執行任務

DiscardOldestPolicy:丟棄阻塞隊列裏最老的任務,隊列裏最靠前的任務

DiscardPolicy :當前任務直接丟棄

實現本身的飽和策略只要實現RejectedExecutionHandler接口便可

提交任務

execute(Runnable command)  不須要返回

Future<T> submit(Callable<T> task) 須要返回值

關閉線程池

shutdown(),shutdownNow();

shutdownNow():設置線程池的狀態,還會嘗試中止正在運行或者暫停任務的線程

shutdown()設置線程池的狀態,只會中斷全部沒有執行任務的線程

二、線程池的工做機制

 

 

一、若是工做線程數小於核心線程數,則建立工做線程

二、若是工做線程數等於或者大於核心線程數,則將任務提交到阻塞隊列中

三、若是阻塞隊列也滿了,但線程數小於最大線程數,則建立新的線程

四、若是建立新的線程也滿了,則執行任務飽和策略。

源碼以下

三、如何合理配置線程池

根據任務的性質來:計算密集型(CPU),IO密集型,混合型

計算密集型:例如加密,大數分解,正則……等

推薦:機器的Cpu核心數+1,爲何+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:讀取文件,數據庫鏈接,網絡通信,

推薦:線程數適當大一點,機器的Cpu核心數*2,

混合型:儘可能拆分,若是IO密集型遠遠計算密集型,拆分意義不大。

在阻塞隊列的選擇上,應該使用有界,無界隊列可能會致使內存溢出

四、預約義的線程池

Java中,幫咱們預約了5種線程池

一、FixedThreadPool

建立固定線程數量的線程池,適用於負載較重的服務器,使用了無界隊列

二、SingleThreadExecutor

建立單個線程的線程池,適用於須要順序保證執行任務,不會有多個線程業務,使用了無界隊列

三、CachedThreadPool

會根據須要來建立新線程的,適用於執行不少短時間異步任務的程序,使用了SynchronousQueue

四、WorkStealingPool(JDK7之後)

工做密取線程池,基於ForkJoinPool實現。適用於大任務分解的線程池。

五、ScheduledThreadPoolExecutor

須要按期執行週期任務的線程池。有兩種實現

newSingleThreadScheduledExecutor:只包含一個線程,只須要單個線程執行週期任務,保證順序的執行各個任務

newScheduledThreadPool 能夠包含多個線程的,線程執行週期任務,適度控制後臺線程數量的時候

方法說明:

schedule:只執行一次,任務還能夠延時執行

scheduleAtFixedRate:提交固定時間間隔的任務

scheduleWithFixedDelay:提交固定延時間隔執行的任務

具體使用,你們能夠自行百度,都比較多,通常若是併發比較高的業務中,以上線程池都不建議使用,由於他們採用的都是無界隊列,任務量比較大的時候有可能致使內存溢出。通常正確用法是直接使用ThreadPoolExecutor進行建立,或根據自身需求進行自定義。

 

本章重點:線程池的建立,使用,以及運行原理。

其餘閱讀   併發編程專題

481021518c4b8fa00ba60ef9609c53b2b5f.jpg

相關文章
相關標籤/搜索