Java線程池使用和源碼分析

1.爲何使用線程池

在多線程編程中一項很重要的功能就是執行任務,而執行任務的方式有不少種,爲何必定須要使用線程池呢?下面咱們使用Socket編程處理請求的功能,分別對每種執行任務的方式進行分析。java

1.1串行執行任務

當Socket監聽到客戶端有鏈接,經過handleSocket方法順序的處理每個客戶端鏈接,當處理完成後,繼續監聽。代碼以下:數據庫

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        handleSocket(socket);
}

這種方式的缺點很是明顯:當我有多個客戶端請求時,在server處理一個請求的過程當中,其餘請求都須要等待前一個請求處理完畢。這種在高併發狀況下幾乎不可用。編程

1.2爲每一個任務建立一個線程

針對上面的問題進行優化:爲每個客戶端請求建立一個線程來處理請求,主線程只須要建立線程,以後便可繼續堅挺客戶端請求.流程圖以下:
安全

代碼以下:服務器

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start();
}

這種方式有如下優勢:多線程

1.將處理客戶端鏈接的操做從主線程中分離出去,使得主循環能夠更快的響應下一次請求。
2.處理客戶端鏈接的操做是並行的,提升了程序的吞吐量。併發

可是這種方式有有如下幾個缺點:異步

1.處理請求的線程必須是線程安全的socket

2.線程的建立和銷燬都須要開銷,當大量建立線程的時候,將會消耗大量計算機資源ide

3.當可用的CPU數量小於可運行的線程的時候,那麼多出來的線程會佔用內存資源,給垃圾回收帶來壓力,而且在大量線程競爭CPU資源的時候會有很大的性能開銷

4.JVM中可建立的線程數存在一個上限,這個上限隨着平臺的不一樣而不一樣,而且受多個因素的限制,包括JVM的啓動參數,每一個線程所佔用的內存大小等,若是超出這些限制,將會拋出OOM異常。

1.3 使用線程池處理客戶端請求

對於1.2中出現的問題,最好的解決方案就是使用線程池來執行task,這樣能夠對建立的線程總數作限制,從而避免1.2中的問題。流程圖以下:

處理方式以下:

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++));
}

此中方式有如下幾個優勢:

1.任務提交和任務執行分離開

2.執行任務的線程能夠重用,減小了線程建立和銷燬的開銷,同時當任務到達時能夠直接使用建立好的線程執行任務,也提升了程序的響應速度。

2.java中線程池介紹

在java中線程池的實現是基於生產者-消費者模式的,線程池的功能將任務的提交和任務的執行分離,任務提交的過程爲生產者,執行任務的過程爲消費過程。具體的分析見源碼分析。java線程池的頂層接口爲Executor,源碼以下:

public interface Executor {
    void execute(Runnable command);
}

此接口爲全部線程池實現的頂層接口,其規定了能夠接受的task類型爲Runnable實現類,可是具體的執行task的邏輯由線程池實現類本身定義,好比:

可使用主線程串行執行任務,
也能夠爲每一個任務建立一個新的線程
或者提早建立好一組線程,每次執行任務的時候從一組線程中取,等等

對於線程池的執行策略主要有如下幾個方面:

1.在什麼線程中執行任務
2.按照什麼順序執行任務(FIFO、LIFO、優先級?)
3.有多少個任務能夠併發執行
4.最多能夠有多少個任務在隊列中等待執行
5.當等待隊列中達到最大值的時候,怎麼樣拒絕新提交的task
6.在執行一個任務以前或者以後須要作哪些操做?

應該根據具體的業務選擇不一樣的執行策略。在java類庫中提供了Executors工具類來常見默認策略的線程池。主要有如下幾個接口:

public static ExecutorService newFixedThreadPool(int nThreads)
將會建立一個固定大小的線程池,每當有新任務提交的時候,當線程總數沒有達到核心線程數的時候,爲每一個任務建立一個新線程,當線程的個數到達最大值後,重用以前建立的線程,當線程由於未知異常而中止時候,將會重現建立一個線程做爲補充。

public static ExecutorService newCachedThreadPool()
根據需求建立線程的個數,當線程數大於任務數的時候,將會註銷多餘的線程

public static ExecutorService newSingleThreadExecutor()
建立一個單線程的線程池

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
建立一個可執行定時任務的線程池

在以上的例子中,全部提交的task在提交到線程池後其執行狀態是不可見的,即主線程沒法知道提交的task是否執行結束或者執行結果。針對這個問題,java提供了能夠返回數據的task接口Future和Callable接口。
其中Callable接口提供了任務返回數據以及拋出異常的功能,定義以下:

public interface Callable<V> {
  
    V call() throws Exception;
}

在ExecutorService中全部的submit方法都會返回一個Future對象,其接口定義以下:

public interface Future<V> {

    取消任務執行,當mayInterruptIfRunning爲true,interruptedthisthread
    boolean cancel(boolean mayInterruptIfRunning);
    返回此任務是否在執行完畢以前被取消執行
    boolean isCancelled();
    返回此任務是否已經完成,包括正常結束,異常結束以及被cancel
    boolean isDone();
    返回執行結果,當任務沒有執行結束的時候,等待
    V get() throws InterruptedException, ExecutionException;
}

3.使用線程池可能出現的問題

1.線程飢餓死鎖
在單線程的Executor中,若是Executor中執行的一個任務中,再次提交任務到同一個Executor中,而且等待這個任務執行完畢,那麼就會發生死鎖問題。以下demo中所示:

public class ThreadDeadLock {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();


    public static void main(String[] args) throws Exception {
        System.out.println("Main Thread start.");
        EXECUTOR_SERVICE.submit(new DeadLockThread());
        System.out.println("Main Thread finished.");

    }

    private static class DeadLockThread extends Thread{

        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread start.");
                Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2());
                future.get();
                System.out.println("DeadLockThread finished.");
            } catch (Exception e) {

            }
        }
    }

    private static class DeadLockThread2 extends Thread {

        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread2 start.");
                Thread.sleep(1000 * 10);
                System.out.println("DeadLockThread2 finished.");
            } catch (Exception e) {

            }
        }
    }
}

輸出結果爲:

Main Thread start.
Main Thread finished.
DeadLockThread start.

對於多個線程的線程池,若是全部正在執行的線程都由於等待處於工做隊列中的任務執行而阻塞,那麼就會發生線程飢餓死鎖。

當往線程池中提交有依賴的任務時,應清楚的知道可能會出現的線程飢餓死鎖風險。==應考慮是否將依賴的task提交到不一樣的線程池中==
或者使用無界的線程池。

==只有當任務相對獨立時,設置線程池大小和工做隊列的大小纔是合理的,不然有可能會出現線程飢餓死鎖==

2.任務運行時間過長
任務執行時間過長會影響線程池的響應時間,當運行時間長的任務遠大於線程池線程的個數時,會出現全部線程都在執行運行時間長的任務,從而影響對其餘任務的響應。

解決辦法:

1.經過限定任務等待的時長,而不要無限期等待下去,當等待超時的時候,能夠將任務標記爲失敗,或者從新放到線程池中。

2.當線程池中阻塞任務過多的時,應該考慮擴大線程池的大小

4.線程池大小的設置

線程池的大小依賴於提交任務的類型以及服務器的可用資源,線程池的大小應該避免設置過大或者太小,當線程設置過打的時候可能會有資源耗盡的風險,線程池設置太小會有可用cpu空閒從而影響系統吞吐量。

影響線程池大小的資源有不少,好比CPU、內存、數據庫連接池等,只須要計算資源可用總資源 / 每一個任務須要的資源,取最小值,便可得出線程池的上限。
線程池的最小值應該大於可用的CPU數量。

4.java中經常使用線程池源碼分析-ThreadPoolExecutor

ThreadPoolExecutor線程池是比較經常使用的一個線程池實現類,經過Executors工具類建立的線程池中,其具體實現類是ThreadPoolExecutor。首先咱們能夠看下ThreadPoolExecutor的構造函數以下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

下面分別對構造函數中的各個參數對應的策略進行分析:

1.線程的建立與銷燬

首先構造函數中corePoolSize、maximumPoolSize、keepAliveTime和unit參數影響線程的建立和銷燬。其中corePoolSize爲核心線程數,當第一次提交任務的時候若是正在執行的線程數小於corePoolSize,則新建一個線程執行task,若是已經超過corePoolSize,則將任務放到任務隊列中等待執行。當任務隊列的個數到達上限的時候,而且工做線程數量小於maximumPoolSize,則繼續建立線程執行工做隊列中的任務。當任務的個數小於maximumPoolSize的時候,將會把空閒的線程標記爲可回收的垃圾線程。對於如下代碼段測試此功能:

public class ThreadPoolTest {

    private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));

    public static void main(String[] args) throws Exception {
        for (int i = 0; i< 9; i++) {
            executorService.submit(new Task());
            System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size());
        }
    }

    private static class Task extends Thread {

        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 100);
            } catch (Exception e) {

            }
        }
    }

}

輸出結果爲:

Active thread:1.Task count:1.TaskQueue size:0
Active thread:2.Task count:2.TaskQueue size:0
Active thread:3.Task count:3.TaskQueue size:0
Active thread:3.Task count:4.TaskQueue size:1
Active thread:3.Task count:5.TaskQueue size:2
Active thread:3.Task count:6.TaskQueue size:3
Active thread:4.Task count:7.TaskQueue size:3
Active thread:5.Task count:8.TaskQueue size:3
Active thread:6.Task count:9.TaskQueue size:3

2.任務隊列

在ThreadPoolExecutor的構造函數中能夠傳入保存任務的隊列,當新提交的任務沒有空閒線程執行時候,會將task保存到此隊列中。保存的順序是根據插入的順序或者Comparator來排序的。

3.飽和策略

ThreadPoolExecutor.AbortPolicy
拋出RejectedExecutionException

ThreadPoolExecutor.CallerRunsPolicy
將任務的執行交給調用者,即將本該異步執行的任務變成同步執行。

4.線程工廠

當線程池須要建立線程的時候,默認是使用線程工廠方法來建立線程的,一般狀況下咱們經過指定線程工廠的方式來爲線程命名,便於出現線程安全問題時候來定位問題。

6.線程池最佳實現

1.項目中全部的線程應該都有線程池來提供,不容許自行建立線程

2.儘可能不要用Executors來建立線程,而是使用ThreadPoolExecutor來建立
Executors有如下問題:

1)FixedThreadPool 和 SingleThreadPool:
容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM。
相關文章
相關標籤/搜索