java併發學習--線程池(一)

關於java中的線程池,我一開始以爲就是爲了不頻繁的建立和銷燬線程吧,先建立必定量的線程,而後再進行復用。可是要具體說一下如何作到的,本身又說不出一個一二三來了,這大概就是本身的學習習慣流於表面,不常常深刻的結果吧。因此這裏決定系統的學習一下線程池的相關知識。java

 

本身稍微總結了一下,學習一些新的知識或者技術的時候,大概均可以分爲這麼幾個點:多線程

一、爲何會有這項技術,用原來的方法有什麼問題。併發

二、這項新技術具體是怎麼解決這個問題的(這時可能就要涉及到一些具體的知識點和編碼了)框架

三、是否是使用這項技術問題就能夠獲得完美解決了,有沒有什麼不一樣的方案?各自的優缺點是什麼?(這是對一些具體的技術來講的,可是線程池是一個比較大的概念,可能不涉及這一點,但相應的線程池中有許多不一樣的種類,來應對不一樣的場景)less

 

下面的內容是本身讀過《實戰java 高併發程序設計》以後加上本身的理解寫的筆記,若是有錯漏之處,請你們在評論區指出。dom

 

爲何要使用線程池?


 

一、正如前面的所說,頻繁的建立和銷燬時間消耗了太多的資源,佔用了太多的時間jvm

二、線程也是須要佔用必定內存的,同時存在不少個線程的話,內存很快就溢出了,即便沒有溢出,大量的線程回收也會對GC形成很大的壓力,延長GC的停頓時間ide

 

這裏能夠舉個例子來講明一下,好比你去銀行辦理業務時,首先得拿號排隊吧,而後叫你去哪一個窗口你就得去哪一個窗口,在我看來,這就是一個很典型的線程池的例子。高併發

咱們能夠想象一下,若是不按這種模式,會是什麼樣子……工具

你來到了銀行的業務大廳,業務經理問你要辦理什麼業務,你說我想開個帳戶,因而經理拿起手機打通了職工大樓的電話,「讓負責開帳戶的的那個小組派我的過來」(new 了一個開帳戶的對象),業務員快馬加鞭的趕了過來而後幫你處理完了任務,只聽經理又說到「這裏沒你事兒了,回去吧」,因而你又回到了職工大樓。

而後又來了一個客戶……

 因而你把上述的過程又執行了一遍,那麼業務員在路上的時間可能比處理業務的時間還要長了。

更糟的是,若是有200個線程同時存在,而且每個客戶的業務處理時間都很是的長,那麼業務大廳就可能同時存在200個客戶和業務員了,大廳擠得都快遇上春運了。

 

ps : 上面這個小例子舉得並非很好,因此你們不要跟實際的知識點對號入座。好比說這裏有10個業務員,那麼10個業務員其實是不能同時進行服務的,由於你的電腦沒有10個cpu,只能是cpu不斷的在線程之間進行切換,只要它換的夠快,就能夠給每個客戶一種他一直都在爲我服務的感受。

 

認識線程池


 

線程池的輪子咱們已經不用本身造了,在jdk5版本以後,引入了Executor框架,用於管理線程。

Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等

先放一張Executor框架的部分類圖(下面這個類圖就是用idea自帶的工具作的,很是方便,有時間再寫一下它的用法):

 其中虛線箭頭指的是實現,實線箭頭指的是繼承

而本文中咱們須要瞭解的就是這個ThreadPoolExecutor 和 下面這個Executors了。

 

ThreadPoolExecutor:

從網上找了一個小例子,就是給一個集合添加2000個元素,咱們分紅兩個測試,一個測試是添加一次元素就建立一個線程,另一個測試是先建立好線程池,而後再添加。

不使用線程池版本:

//每一次添加操做都開一個線程
    public static void getTimeWithThread() {
        System.out.println("使用多線程測試 start");
        final List<Integer> list = new LinkedList<>();
        Random random = new Random();
        long start = System.currentTimeMillis();
Runnable target
= new Runnable() { @Override public void run() { list.add(random.nextInt()); } }; for (int i = 0; i < 20000 ; i++) { Thread thread = new Thread(target); thread.start(); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); long time = end - start; System.out.println("最終list的大小爲:" + list.size()); System.out.println("使用多線程測試 end, 用時:" + time + "ms\n"); }

例子比較簡單,咱們須要建立線程,這裏用的是實現Runnable接口的方式,而後爲了保證子線程執行完成以後主線程(main線程)才執行,咱們這裏使用了join方法。

那麼整個for循環的意思就是,我開啓一個線程,而後你的main線程得等我執行完以後才能開啓下一個線程繼續執行。

 

使用線程池版本:

    //使用線程池進行集合添加元素的操做
    public static void getTimeWithThreadPool() {
        System.out.println("使用線程池測試 start");
        final List<Integer> list = new LinkedList<>();
        Random random = new Random();
        long start = System.currentTimeMillis();
Runnable target
= new Runnable() { @Override public void run() { list.add(random.nextInt()); } }; ThreadPoolExecutor tp = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000)); for (int i = 0; i < 20000 ; i++) { tp.execute(target); } tp.shutdown(); try { tp.awaitTermination(1,TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); long time = end - start; System.out.println("最終list的大小爲:" + list.size()); System.out.println("使用線程池測試 end, 用時:" + time + "ms\n"); }

 使用線程池就比較簡單了,咱們只要先建立好線程池,而後向它提交任務就好了,具體的線程該怎麼操做,怎麼管理都不用咱們來操心。

execute() : 它是頂層接口Executor的一個方法(也是惟一一個),其實跟普通的建立線程執行run方法沒有太大的區別

shutdown(): 顧名思義是關閉線程池,它會將已經提交但尚未執行的任務執行完成以後再關閉線程池(什麼是提交?咱們後面再說)

至於ThreadPoolExecutor裏面那一大堆參數,咱們慢慢再來看。

 

 最後的測試結果:

 不用線程池的話,個人機器跑出來大概要8000ms左右(人家網上的例子測出來只要2000ms左右,這差距,是我電腦垃圾,仍是jvm沒設置好啊,以後再來看這個問題),使用線程池的話是180ms左右。

能夠看出來線程池相對於單純的使用線程來講的話做用是至關大的。

 

ps:這裏本身另外測試了一組,不使用線程直接添加,發現時間會快不少,這個問題其實還不是很是明白,多個線程一塊兒執行難道不是執行的更快嗎?暫時尚未得出結論,等更進一步的理解以後再寫一篇文章來進行分析。

 

ThreadPoolExecutor裏面那些參數都是幹嗎用的?

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

 

 corePoolSize : 指定了線程池的線程數量

maximumPoolSize : 指定了線程池中的最大線程數量

keepAliveTime : 超過了corePoolSize時多餘線程的存活時間

unit : KeepAliveTime的時間單位

workQueue : 任務隊列,被提交但還沒有被執行的任務

threadFactory: 線程工廠,用於建立線程

handler : 拒絕策略,當任務太多來不及處理的時候,如何拒絕任務

 

corePoolSize 和 maximumPoolSize(這裏假設corePoolSize是5,maximumPoolSize是10 )

 線程池的工做原理是你來一個線程,我就幫你在線程池中新建立一個線程,建立了5個線程以後,再來一個線程,我就不是在第一時間去建立一個新的線程,而是把它加入到一個等待隊列中去,等線程池中有了空餘的線程再從隊列中拿一個出來進行 處理,等待隊列的容量是咱們一開始設置好的,若是等待隊列也滿了的話再去建立新的線程。

當線程池也滿了,等待隊列也滿了(線程池數量達到了maximumPoolSize)的時候就拒絕執行線程的任務,這就涉及到了拒絕的策略。

而通過一段時間以後發現,業務沒有那麼繁忙了,就不須要一直維持着10個線程,能夠清除掉一部分,以避免佔據多餘的空間。

 

keepAliveTime  和 unit

有了上面的結束,這個參數就比較好理解了,上面說線程滿了再通過一段時間以後就會被清除掉一部分線程,這個通過的時間就是有keepAliveTime 和 unit決定的

好比 keepAliveTime  = 1 ,unit = TimeUnit.Days  ,那麼就是通過一天以後再去清理線程池。

 

workQueue : 

咱們前面也提到了當線程的數量超過了coreSize以後會添加到一個等待隊列中去,這個隊列就是workQueue。workQueue 採用的是一個實現了BlockingQueue的接口的對象

work也分爲不一樣的幾種,採起不一樣的策略

  •  ArrayBlockingQueue(有界的任務隊列) :
pubic ArrayBlockingQueue( int capacity )

首先是最容易想到的,就是給等待隊列設置一個容量,超過這個容量以後再建立新的線程。

  •  SynchronousQueue該隊列沒有容量,每插入一個元素都要等待一個刪除操做,使用這個隊列的話,任務不會實際保存到隊列中去,會直接提交到線程池中,若是線程池尚未滿(還沒達到maximumPoolSize),則分配線程,不然執行拒絕策略。

 

  • LinkedBlockingQueue(無界的任務隊列) : 顧名思義,這個隊列是沒有界限的,就是說你能夠一直往隊列裏添加元素,直到內存資源被耗盡。

 

  • PriorityBlockingQueue(優先任務隊列,同時也是一個無界的任務隊列):能夠控制任務的優先級(優先級是經過實現Comparable接口實現的,具體可百度)

 

handler(拒絕策略):

當線程池和等待隊列都滿了以後,線程池就會拒絕執行新的任務了,那麼該怎麼拒絕呢,直接就說你走吧,哥們兒hold不住了嗎?顯然沒這麼簡單。。

AbortPlicy 策略 : 直接拋出異常,阻止系統正常工做

CallerRunsPolicy策略 : 只要線程池沒有關閉,就在調用者線程之中執行這個任務。好比說是主線程提交的這個任務,那我就直接在主線程之中執行這個任務。

DiscardOledestPolicy策略:該策略會丟掉最老的一個請求,也就是即將被執行的那個請求。並嘗試再次發起請求。

DiscardPolicy策略:直接丟,不作任何處理

以上策略都是經過實現RejectedExecutionHandler接口實現的,若是上述策略還沒法知足你的話,那麼你也能夠本身實現這個接口。

 

 

Executors:


 

介紹了基本的線程池以後就能夠介紹一些jdk爲咱們寫好的一些線程池了。

它由Executors類生成的。有如下幾種:

  • newFIxedThreadPool :固定大小線程池,大小固定,因此它不存在corePoolSize 和 maximumPoolSize ,而且使用無界隊列做爲等待隊列

 

  • newSingleThreadExecutor : 與newFixedThreadPool基本沒有什麼區別,可是數量只有1個線程

 

  • newCachedThreadPool :一個corePoolSize,maximumPoolSize無限大的線程池,也就是說沒有任務時,線程池中就沒有線程,任務被提交時會看線程池有有沒有空閒的線程,若是有的話,就交給它執行,若是沒有的話,就交給SynchronousQueue, 也就是說會直接交給線程池,而因爲maximumPoolSize是無限大的,因此它會再添加一個線程

 

  • newScheduledThreadPool :定時執行任務的線程池(能夠是延時執行,也能夠是週期性的執行任務)

 

  • newSingleThreadScheduledExecutor :與上面的線程池差很少,只不過線程池的大小爲1。

 

以newFixedThreadPool爲例展現一下它的使用方法。

package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 展現Executors的簡單用法
 */
public class Lesson15_ThreadPool02 {
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() +
                        ": Thread Id : " + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService ex = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10 ; i++) {
            ex.submit(task);
        }
        ex.shutdown();
    }
}
1541580732981: Thread Id : 13
1541580732981: Thread Id : 14
1541580732981: Thread Id : 11
1541580732981: Thread Id : 12
1541580732981: Thread Id : 15
1541580733981: Thread Id : 13
1541580733981: Thread Id : 15
1541580733981: Thread Id : 12
1541580733981: Thread Id : 11
1541580733981: Thread Id : 14

 

關於定時線程池的兩個方法的區別:

  • scheduleAtFixedRate以給定的的週期執行任務,任務開始於給定的初始延時,通過period以後開始下一個任務

舉個例子,好比說,初始延時是1秒,period是5秒,任務實際的執行時間是2秒,那麼第一個任務開始執行的時間是1秒,再第二個任務執行的時間是6秒,你看跟任務的實際執行時間並無什麼關係。

可是這裏會有一個顯而易見的問題,按照上面的說法,若是個人任務執行時間是10秒怎麼辦,遠比period要大,那麼此時會等待上一個任務執行完成以後當即執行下一個任務,

你也能夠理解成period變成了8秒

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

 

  • scheduleWithFixedDelay這個方法則規定了上一個任務結束到下一個任務開始這之間的時間,仍是上面那個例子,只不過將period改爲delay仍是5秒,那麼第一個任務在第1秒開始執行,第2個任務在(1 + 2  + 5) = 8 時開始執行,也就是第一個任務執行完成以後再等5秒開始執行下一個任務。

 

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

 

 

以schedeleAtFixedRate爲例,簡單寫一下代碼的用法:

package thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 這裏演示定時線程池的功能
 */
public class Lesson15_ThreadPool03 {
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()/1000 +
                        ": Thread Id : " + Thread.currentThread().getId());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        System.out.println(System.currentTimeMillis()/1000);
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);

        ses.scheduleAtFixedRate(task,1,3,TimeUnit.SECONDS);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ses.shutdown();

    }

}

 

1541584928
1541584929: Thread Id : 11
1541584932: Thread Id : 11
1541584935: Thread Id : 12

Process finished with exit code 0

從28開始執行定時線程池的任務,1秒鐘(初始延時)以後開始執行第一個任務,以後每過三秒鐘執行下一個任務

這裏若是不關閉線程池的話,任務會一直執行下去。

 

線程池的分析暫時先到這裏,還有一部份內容,例如擴展線程池,如何決定線程池的線程數量,fork/join框架等。等認真讀過下一部分以後再繼續把線程池部分的筆記湊齊。

package thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 這裏演示定時線程池的功能
*/
public class Lesson15_ThreadPool03 {
public static void main(String[] args) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000 +
": Thread Id : " + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
System.out.println(System.currentTimeMillis()/1000);
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);

ses.scheduleAtFixedRate(task,1,5,TimeUnit.SECONDS);}}
相關文章
相關標籤/搜索