《Java併發編程的藝術》筆記

《Java併發編程的藝術》java

方騰飛 魏鵬 程曉明 著算法

筆記:數據庫

第一章:併發編程的挑戰

一、上下文切換:編程

系統的多線程併發執行,就是CPU經過給每一個線程分配CPU時間片來實現的。當一個線程的時間用完了後,CPU會把這個線程調到就緒隊列上等待下次的執行,而後把CPU分配給其餘的線程,讓其餘的線程執行,可是在切換以前,會保存當前線程的狀態,以便下次能夠再加載這個任務的狀態,那麼任務從保存到加載的過程就是一次上下文切換。數組

上下文切換會影響多線程的執行速度。安全

 

二、如何減小上下文切換服務器

減小上下文切換的方法有:無鎖併發編程、CAS算法、使用最少線程。數據結構

·無鎖併發編程:多線程競爭鎖時,會引發上下文切換,因此多線程處理數據時,能夠用一些辦法來避免使用鎖,如將數據的ID按照Hash算法取模分段,不一樣的線程處理不一樣段的數據。多線程

·CAS算法。Java的Atomic包使用CAS算法來更新數據,而不須要加鎖。併發

·使用最少線程。避免建立不須要的線程,好比任務不多,可是建立了不少線程來處理,這樣會形成大量線程都處於等待狀態。

 

三、避免死鎖的幾個常見方法。

·避免一個線程同時獲取多個鎖。

·避免一個線程在鎖內同時佔用多個資源,儘可能保證每一個鎖只佔用一個資源。

·嘗試使用定時鎖,使用lock.tryLock(timeout)來替代使用內部鎖機制。

·對於數據庫鎖,加鎖和解鎖必須在一個數據庫鏈接裏,不然會出現解鎖失敗的狀況。

 

第2章 Java併發機制的底層實現原理

一、volatile的應用

volatile是輕量級的線程同步機制,它在多線程中保證了共享變量的「可見性」。可見性的意思是當一個線程修改一個共享變量時,另一個線程能讀到這個修改的值。若是volatile變量修飾符使用恰當的話,它比synchronized的使用和執行成本更低,由於它不會引發線程上下文的切換和調度。

PS:何時使用volation變量,請參考:Brian Goetz的文章《Java理論與實踐:正確使用Volatile變量》

 

二、併發編程模型的兩個關鍵問題

在併發編程中,須要處理兩個關鍵問題:線程之間如何通訊及線程之間如何同步, 線程之間的通訊機制有兩種:共享內存和消息傳遞。

在共享內存的併發模型裏,線程之間共享程序的公共狀態,經過寫-讀內存中的公共狀態進行隱式通訊。在消息傳遞的併發模型裏,線程之間沒有公共狀態,線程之間必須經過發送消息來顯式進行通訊.

Java的併發採用的是共享內存模型來實現線程之間的通訊。

若是線程A與線程B之間要通訊的話,必需要經歷下面2個步驟。

1)線程A把本地內存A中更新過的共享變量刷新到主內存中去。

2)線程B到主內存中去讀取線程A以前已更新過的共享變量。

 

三、重排序

重排序是指編譯器和處理器爲了優化程序性能而對指令序列進行從新排序的一種手段。

從Java源代碼到最終實際執行的指令序列,會分別經歷下面3種重排序

源代碼----->1.編譯器優化重排序----->2.指令級重排序---->3.內存系統重排序-->最終執行的指令序列

 

第4章 Java併發編程基礎

一、什麼是線程

現代操做系統在運行一個程序時,會爲其建立一個進程。例如,啓動一個Java程序,操做系統就會建立一個Java進程。現代操做系統調度的最小單元是線程,也叫輕量級進程,在一個進程裏能夠建立多個線程,這些線程都擁有各自的計數器、棧和局部變量等屬性,而且可以訪問共享的堆內存變量。處理器在這些線程上高速切換,讓使用者感受到這些線程在同時執行。

 

二、線程的優先級:

現代操做系統基本採用分時的形式調度運行的線程,操做系統會爲每一個線程分配時間片,當線程的時間片用完了就會發生線程調度,並等待着下次分配。線程分配到的時間片多少也就決定了線程使用處理器資源的多少,而線程優先級就是決定線程須要多或者少分配一些處理器資源的線程屬性。

在Java線程中,經過一個整型成員變量priority來控制優先級,優先級的範圍從1~10,在線程構建的時候能夠經過setPriority(int)方法來修改優先級,默認優先級是5,優先級高的線程分配時間片的數量要多於優先級低的線程。設置線程優先級的時候,針對頻繁阻塞的線程須要設置較高優先級,而須要較多CPU時間的線程則設置較低的優先級,確保處理器不會被獨佔。

 

三、線程的狀態:

java中線程有6種狀態,在任意時刻,只能處於其中的一種狀態。

Java線程中將操做系統中的運行和就緒兩個狀態合併稱爲運行狀態。

(1). new---初始狀態,線程被建立,可是尚未調用start()方法。

(2). runnable---運行狀態,調用start()方法,執行run()中的代碼。

(3). blocked---阻塞狀態,表示線程阻塞於鎖。

(4). waiting---等待狀態。進入這個狀態的線程須要其餘線程來喚醒或中斷。

(5). time_waiting---超時等待,它是能夠在指定的時間內自動返回。

(6). Terminated---終止狀態,表示當前線程已經執行完畢。

各個狀態之間的轉化圖:


說明:

線程建立以後,調用start()方法開始運行。當線程執行wait() 方法以後,線程進入等待狀態。進入等待狀態的線程須要依靠其餘線程的通知纔可以返回到運行狀

態,而超時等待狀態至關於在等待狀態的基礎上增長了超時限制,也就是超時時間到達時將會自動返回到運行狀態。當線程調用同步方法時,在沒有獲取到鎖的狀況下,線程將會進入到阻塞狀態。線程在執行完Runnable的run()方法以後將會進入到終止狀態。

 

四、理解中斷

中斷能夠理解爲線程的一個標識位屬性,它表示一個運行中的線程是否被其餘線程進行了中斷操做。中斷比如其餘線程對該線程打了個招呼,其餘線程經過調用該線程的interrupt() 方法對其進行中斷操做的。

線程經過檢查自身是否被中斷來進行響應,線程經過方法isInterrupted()來進行判斷是否被中斷,也能夠調用靜態方法Thread.interrupted()對當前線程的中斷標識位進行復位。若是該線程已經處於終結狀態,即便該線程被中斷過,在調用該線程對象的isInterrupted()時依舊會返回false。

 

五、過時的suspend()、resume()和stop()

suspend()、resume()和stop() 方法對應於線程的暫停,恢復和終止。

在調用suspend()後,線程不會釋放已經佔有的資源(好比鎖),而是佔有着資源進入睡眠狀態,這樣容易引起死鎖問題。

一樣,stop()方法在中止一個線程時不會保證線程的資源正常釋放,一般是沒有機會讓線程執行資源釋放工做,所以會致使程序可能工做在不肯定狀態下

 

 

安全地終止線程:

可使用中斷來安全的終止線程,還能夠利用一個boolean變量來控制是否須要中止任務並終止該線程。

 

六、線程間通訊

方法一:使用volatile和synchronized關鍵字

Java支持多個線程同時訪問一個對象或者對象的成員變量,因爲每一個線程能夠擁有這個變量的拷貝,因此程序在執行過程當中,一個線程看到的變量並不必定是最新的。

關鍵字volatile能夠用來修飾成員變量,就是告知程序任何對該變量的訪問均須要從共享內存中獲取,而對它的改變必須同步刷新回共享內存,它能保證全部線程對變量訪問的可見性。也就是說全部線程都能讀取到該變量的最新值。

關鍵字synchronized能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性

方法二:等待/通知機制

也就是使用Object中wait(),notify()和notifyAll()方法來實現線程的通訊,待/通知機制,是指一個線程A調用了對象O的wait()方法進入等待狀態,而另外一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知後從對象O的wait()方法返回,進而執行後續操做。上述兩個線程經過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。

方法三:信號量機制

經過java.util.concurrent.locks包中Condition接口中的await()和signal()方法來實現線程的通訊。

方法四:管道機制

管道輸入/輸出流主要包括了以下4種具體實現:

PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前兩種面向字節,然後兩種面向字符。

 

七、Thread.join()的使用

若是一個線程A執行了B.join()語句,其含義是:當前線程A等待B線程終止以後才能繼續執行。線程Thread除了提供join()方法以外,還提供了join(long millis)和join(longmillis,int nanos)兩個具有超時特性的方法。這兩個超時方法表示,若是線程thread在給定的超時時間裏沒有終止,那麼將會從該超時方法中返回。

join()方法能夠用來實現線程的順序執行。

 

第5章 Java中的鎖

一、Lock接口提供的synchronized關鍵字不具有的主要特性


二、重入鎖

重入鎖ReentrantLock,它表示該鎖可以支持一個線程對資源的重複加鎖。除此以外,該鎖的還支持獲取鎖時的公平和非公平性選擇。

ReentrantLock在調用lock()方法時,已經獲取到鎖的線程,可以再次調用lock()方法獲取鎖而不被阻塞。

公平鎖:就是對鎖的獲取是按照請求的前後順序來知足的。也就是FIFO

不公平鎖:就是不按照前後順序來獲取鎖。

ReentrantLock提供了一個構造函數,可以控制鎖是不是公平的。

公平鎖可以減小「飢餓」發生的機率,等待越久的請求越是可以獲得優先知足。

可是公平鎖的效率沒有非公平鎖的效率高。由於公平鎖的獲取是FIFO原則,其代價是進行大量的線程切換來保證的。非公平性鎖雖然可能形成線程「飢餓」,但不多的線程切換,保證了更大的吞吐量

 

三、讀寫鎖

重入鎖ReentrantLock是排他鎖,這些鎖在同一時刻只容許一個線程進行訪問,而讀寫鎖在同一時刻能夠容許多個讀線程訪問,可是在寫線程訪問時,全部的讀

線程和其餘寫線程均被阻塞。java分別爲讀和寫提供了鎖。

 

四、Condition接口

Condition定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition對象關聯的鎖。Condition對象是由Lock對象(調用Lock對象的newCondition()方法)建立出來的,換句話說,Condition是依賴Lock對象的。

 

第6章 Java併發容器和框架

一、ConcurrentHashMap的實現原理與使用

ConcurrentHashMap是線程安全且高效的HashMap。

(1)線程不安全的HashMap

在多線程環境下,使用HashMap進行put操做會引發死循環,致使CPU利用率接近100%,由於多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,就會產生死循。因此在併發狀況下不能使用HashMap。

(2)效率低下的HashTable

HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態,好比線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。

(3)ConcurrentHashMap的鎖分段技術可有效提高併發訪問率

HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由是全部訪問HashTable的線程都必須競爭同一把鎖,假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分紅一段一段地存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。

 

二、ConcurrentHashMap的結構

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裏扮演鎖的角色;HashEntry則用於存儲鍵值對數據。

 

三、ConcurrentHashMap的操做

get操做:

get操做實現很是簡單和高效。高效之處在於整個get過程不須要加鎖,除非讀到的值是空纔會加鎖重讀。而HashTable容器的get方法是須要加鎖的;緣由是ConcurrentHashMap的get方法裏將要使用的共享變量都定義成volatile類型

可以被多線程同時讀,而且保證不會讀到過時的值。

put操做:

因爲put方法裏須要對共享變量進行寫入操做,因此爲了線程安全,在操做共享變量時必須加鎖。

 

四、Fork/Join框架

Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。

底層算法:工做竊取算法

工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。那麼,爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。好比A線程負責處理A隊列裏的任務。可是,有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工做竊取算法的優勢:充分利用線程進行並行計算,減小了線程間的競爭。

工做竊取算法的缺點:在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且該算法會消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。

 

五、Fork/Join框架的設計

步驟1:分割任務。首先咱們須要有一個fork類來把大任務分割成子任務,有可能子任務仍是很大,因此還須要不停地分割,直到分割出的子任務足夠小。

步驟2:執行任務併合並結果。分割的子任務分別放在雙端隊列裏,而後幾個啓動線程分別從雙端隊列裏獲取任務執行。子任務執行完的結果都統一放在一個隊列裏,啓動一個線程從隊列裏拿數據,而後合併這些數據。

 

 

第7章 Java中的13個原子操做類

Java從JDK 1.5開始提供了java.util.concurrent.atomic包,這個包中的原子操做類提供了一種用法簡單、性能高效、線程安全地更新一個變量的方式。

由於變量的類型有不少種,因此在Atomic包裏一共提供了13個類,屬於4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用和原子更新屬性(字段)。

一、原子更新基本類型類

AtomicBoolean:原子更新布爾類型。

AtomicInteger:原子更新整型。

AtomicLong:原子更新長整型。

經常使用方法以下。

·int addAndGet(int delta):以原子方式將輸入的數值與實例中的(AtomicInteger裏的value)相加,並返回結果。

·boolean compareAndSet(int expect,int update):若是輸入的數值等於預期值,則以原子方式將該值設置爲輸入的值。

·int getAndIncrement():以原子方式將當前值加1,注意,這裏返回的是自增前的值。

·void lazySet(int newValue):最終會設置成newValue,使用lazySet設置值後,可能致使其餘線程在以後的一小段時間內仍是能夠讀到舊的值。

int getAndSet(int newValue):以原子方式設置爲newValue的值,並返回舊值。

Atomic包提供了3種基本類型的原子更新,可是Java的基本類型裏還有char、float和double等。那麼問題來了,如何原子的更新其餘的基本類型呢?

看AtomicBoolean源碼,發現它是先把Boolean轉換成整型,再進行更新,因此原子更新char、float和double變量也能夠用相似的思路來實現。

AtomicBoolean的部分源碼:

   public final void set(boolean newValue) {

        value = newValue ? 1 : 0;

    }

   public final void lazySet(boolean newValue) {

        int v = newValue ? 1 : 0;

        unsafe.putOrderedInt(this, valueOffset, v);

    }

 

二、原子更新數組

經過原子的方式更新數組裏的某個元素。

·AtomicIntegerArray:原子更新整型數組裏的元素。

·AtomicLongArray:原子更新長整型數組裏的元素。

·AtomicReferenceArray:原子更新引用類型數組裏的元素。

三、原子更新引用類型

原子更新基本類型的AtomicInteger,只能更新一個變量,若是要原子更新多個變量,就須要使用這個原子更新引用類型提供的類。

·AtomicReference:原子更新引用類型。

·AtomicReferenceFieldUpdater:原子更新引用類型裏的字段。

 

四、原子更新字段類

若是需原子地更新某個類裏的某個字段時,就須要使用原子更新字段類,Atomic包提供瞭如下3個類進行原子字段更新。

·AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。

·AtomicLongFieldUpdater:原子更新長整型字段的更新器。

·AtomicStampedReference:原子更新帶有版本號的引用類型。

原子地更新字段類須要兩步。第一步,由於原子更新字段類都是抽象類,每次使用的時候必須使用靜態方法newUpdater()建立一個更新器,而且須要設置想要更新的類和屬性。第二步,更新類的字段(屬性)必須使用public volatile修飾符。

 

第8章 Java中的併發工具類 java.util.concurrent

在JDK的併發包裏提供了幾個很是有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了在線程間交換數據的一種手段。

 

一、等待多線程完成的CountDownLatch

CountDownLatch容許一個或多個線程等待其餘線程完成操做。能夠用CountDownLatch來實現Join的操做。而且比Join的功能更多。

CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。 當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。

 

二、同步屏障CyclicBarrier

讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障纔會

開門,全部被屏障攔截的線程纔會繼續運行。

CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞

CyclicBarrier的應用場景

CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。

 

三、CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置。因此CyclicBarrier能處理更爲複雜的業務場景。例如,若是計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。

 

四、控制併發線程數的Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。

應用場景:

Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。

五、線程間交換數據的Exchanger

Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。

Exchanger的應用場景。

Exchanger能夠用於遺傳算法,遺傳算法裏須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。

Exchanger也能夠用於校對工做。

 

第9章 Java中的線程池(末尾有線程池的簡單實現)

對於服務端的程序,常常面對的是客戶端傳入的短小(執行時間短、工做內容較爲單一)任務,須要服務端快速處理並返回結果。若是服務端每次接受到一個任務,建立一個線程,而後進行執行,執行完後銷燬線程,那麼在面對成千上萬的任務遞交進服務器時,若是仍是採用一個任務一個線程的方式,那麼將會建立不少的線程,這會使操做系統頻繁的進行線程上下文切換,無端增長系統的負載,而線程的建立和消亡都是須要耗費系統資源的,也無疑浪費了系統資源。

線程池技術可以很好地解決這個問題,它預先建立了若干數量的線程,而且不能由用戶直接對線程的建立進行控制,在這個前提下重複使用固定或較爲固定數目的線程來完成任務的執行。這樣作的好處是,一方面,消除了頻繁建立和消亡線程的系統資源開銷。

 

一、在開發過程當中,合理地使用線程池可以帶來3個好處。

第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。

第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。

第三:提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。

 

二、線程池的實現原理

線程池的主要處理流程:


1)線程池判斷核心線程池裏的線程是否都在執行任務。若是不是,則建立一個新的工做線程來執行任務。若是核心線程池裏的線程都在執行任務,則進入下個流程。

2)線程池判斷工做隊列是否已經滿。若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。

3)線程池判斷線程池的線程是否都處於工做狀態。若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。

三、線程池的建立能夠經過ThreadPoolExecutor來建立一個線程池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,

milliseconds,runnableTaskQueue, handler);

參數:

1)corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。

2)runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。能夠選擇如下幾個阻塞隊列。

·ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。

·LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。

·SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於Linked-BlockingQueue

·PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。、

3)maximumPoolSize(線程池最大數量):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。

4)keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。

5)RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。

 

四、向線程池提交任務

可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。

execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。

submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成

 

五、合理地配置線程池

要想合理地配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來分析。

·任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

·任務的優先級:高、中和低。

·任務的執行時間:長、中和短。

·任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

CPU密集型任務應配置儘量小的線程,如配置N(cpu)+1個線程的線程池。

因爲IO密集型任務線程並非一直在執行任務,則應配置儘量多的線程,如2*N(cpu)。

混合型的任務,若是能夠拆分,將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。若是這兩個任務執行時間相差太大,則不必進行分解。

 

優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先執行。

 

執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者可使用優先級隊列,讓執行時間短的任務先執行。

 

依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。

 

第10章 Executor框架

在Java中,使用線程來異步執行任務。Java線程的建立與銷燬須要必定的開銷,若是咱們爲每個任務建立一個新線程來執行,這些線程的建立與銷燬將消耗大量的計算資源。

從JDK 5開始,把工做單元與執行機制分離開來。工做單元包括Runnable和Callable,而執行機制由Executor框架提供

 

一、Executor框架的兩級調度

在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor框架)將這些任務映射爲固定數量的線程;在底層,操做系統內核將這些線程映射到硬件處理器上。

圖:

二、Executor框架的結構

Executor框架主要由3大部分組成以下。

·任務。包括被執行任務須要實現的接口:Runnable接口或Callable接口。

·任務的執行。包括任務執行機制的核心接口Executor,以及繼承自Executor的

ExecutorService接口。

·異步計算的結果。包括接口Future和實現Future接口的FutureTask類。

 

三、ThreadPoolExecutor詳解

主要由下列4個組件構成。

·corePool:核心線程池的大小。

·maximumPool:最大線程池的大小。

·BlockingQueue:用來暫時保存任務的工做隊列。

·RejectedExecutionHandler:當ThreadPoolExecutor已經關閉ThreadPoolExecutor已經飽和時(達到了最大線程池大小且工做隊列已滿),execute()方法將要調用的Handler。

·經過Executor框架的工具類Executors,能夠建立3種類型的ThreadPoolExecutor。

·FixedThreadPool。

·SingleThreadExecutor。

·CachedThreadPool。

 

四、FixedThreadPool詳解

FixedThreadPool被稱爲可重用固定線程數的線程池。

1)若是當前運行的線程數少於corePoolSize,則建立新線程來執行任務。

2)在線程池完成預熱以後(當前運行的線程數等於corePoolSize),將任務加入LinkedBlockingQueue。

3)線程執行完本身的任務後,會反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列

 

 

 

五、SingleThreadExecutor詳解

SingleThreadExecutor是使用單個worker線程的Executor。SingleThreadExecutor的corePoolSize和maximumPoolSize被設置爲1。SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列

 

六、CachedThreadPool詳解

CachedThreadPool是一個會根據須要建立新線程的線程池。CachedThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲

Integer.MAX_VALUE,即maximumPool是無界的。這裏把keepAliveTime設置爲60L,意味着CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列。CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但CachedThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。

 

七、ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲以後運行任務,或者按期執行任務。ScheduledThreadPoolExecutor的功能與Timer相似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺線程,而ScheduledThreadPoolExecutor能夠在構造函數中指定多個對應的後臺線程數。

 

八、FutureTask詳解

一、Future接口和實現Future接口的FutureTask類,表明異步計算的結果。

FutureTask除了實現Future接口外,還實現了Runnable接口。所以,FutureTask能夠交給Executor執行,也能夠由調用線程直接執行(FutureTask.run())。

FutureTask能夠處於下面3種狀態。

1)未啓動。FutureTask.run()方法尚未被執行以前,FutureTask處於未啓動狀態。

2)已啓動。FutureTask.run()方法被執行的過程當中,FutureTask處於已啓動狀態。

3)已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(…)),或執行FutureTask.run()方法時拋出異常而異常結束,FutureTask處於已完成狀態。

 

當FutureTask處於未啓動或已啓動狀態時,執行FutureTask.get()方法將致使調用線程阻塞;

當FutureTask處於已完成狀態時,執行FutureTask.get()方法將致使調用線程當即返回結果或拋出異常。

當FutureTask處於未啓動狀態時,執行FutureTask.cancel()方法將致使此任務永遠不會被執行;

當FutureTask處於已啓動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來試圖中止任務;

當FutureTask處於已啓動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成);

當FutureTask處於已完成狀態時,執行FutureTask.cancel(…)方法將返回false。get方法和cancel方法的執行示意圖


二、FutureTask的使用

當一個線程須要等待另外一個線程把某個任務執行完後它才能繼續執行,此時可使用FutureTask。

 

線程池技術的簡單實現

下面先看一個簡單的線程池接口定義:

public interface ThreadPool<Job extends Runnable> {

     void execute(Job job); // 執行一個Job,這個Job須要實現Runnable

     void shutdown();// 關閉線程池

     void addWorkers(int num); // 增長工做者線程

     void removeWorker(int num); // 減小工做者線程

     int getJobSize();// 獲得正在等待執行的任務數量

}

線程池的簡單實現:

public class ThreadPoolImpl<Job extends Runnable> implements ThreadPool<Job>{
    // 線程池最大限制數
    private static final int MAX_WORKER_NUMBERS = 10;
    // 線程池默認的數量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 線程池最小的數量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 這是一個工做列表,將會向裏面插入工做
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工做者列表
    private final List<Worker> workers = Collections.synchronizedList(new
                                       ArrayList<Worker>());
    // 工做者線程的數量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 線程編號生成
    private AtomicLong threadNum = new AtomicLong();

    public ThreadPoolImpl () {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }

    public ThreadPoolImpl (int num) {
        workerNum = num > MAX_WORKER_NUMBERS MAX_WORKER_NUMBERS : num < MIN_WORKER_
        NUMBERS MIN_WORKER_NUMBERS : num;
        initializeWokers(workerNum);
 
    }

    public void execute(Job job) {
        if (job != null) {
            // 添加一個工做,而後進行通知
           synchronized (jobs) {
           jobs.addLast(job);
            jobs.notify();
        }
    }

    public  void  shutdown() {
        for (Worker worker : workers) {
           worker.shutdown();
         }
     }

    public void addWorkers(int num) {
        synchronized (jobs) {
         // 限制新增的Worker數量不能超過最大值
         if (num + this.workerNum > MAX_WORKER_NUMBERS) {
             num = MAX_WORKER_NUMBERS - this.workerNum;
          }

        initializeWokers(num);
        this.workerNum += num;
      }
   }


    public void removeWorker(int num) {
       synchronized (jobs) {
          if (num >= this.workerNum) {
               throw new IllegalArgumentException("beyond workNum");
           }
           // 按照給定的數量中止Worker
           int count = 0;
           while (count < num) {
                 Worker worker = workers.get(count)
                 if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                 }
            }
           this.workerNum -= count;
         }
      }
   }


    public int getJobSize() {
        return jobs.size();
    }

   // 初始化線程工做者
    private void initializeWokers(int num) {
        for (int i = 0; i < num; i++) {
           Worker worker = new Worker();
           workers.add(worker);
           Thread thread =new Thread(worker, "ThreadPool-Worker-" + threadNum.
                          incrementAndGet());
            thread.start();
        }
     }

      //工做者,負責消費任務
    class Worker implements Runnable {
       // 是否工做
        private volatile boolean running = true;
           public void run() {
           while (running) {
              Job job = null;
              synchronized (jobs) {
                   // 若是工做者列表是空的,那麼就wait
                  while (jobs.isEmpty()) {
                     try {
                         jobs.wait();
                     } catch (InterruptedException ex) {
                      // 感知到外部對WorkerThread的中斷操做,返回
                           Thread.currentThread().interrupt();
                           return;
                      }
                  }
                 // 取出一個Job
                 job = jobs.removeFirst();
              }
             if (job != null) {
                try {
                   job.run();
               } catch (Exception ex) {
                  // 忽略Job執行中的Exception
              }
           }      
       }
    }

     public void shutdown() {
        running = false;
     }

從線程池的實現能夠看到,當客戶端調用execute(Job)方法時,會不斷地向任務列表jobs中添加Job,而每一個工做者線程會不斷地從jobs上取出一個Job進行執行,當jobs爲空時,工做者線程進入等待狀態。

添加一個Job後,對工做隊列jobs調用了其notify()方法,而不是notifyAll()方法,由於可以肯定有工做者線程被喚醒,這時使用notify()方法將會比notifyAll()方法得到更小的開銷(避免將等待隊列中的線程所有移動到阻塞隊列中)。

能夠看到,線程池的本質就是使用了一個線程安全的工做隊列鏈接工做者線程和客戶端線程,客戶端線程將任務放入工做隊列後便返回,而工做者線程則不斷地從工做隊列上取出工做並執行。當工做隊列爲空時,全部的工做者線程均等待在工做隊列上,當有客戶端提交了一個任務以後會通知任意一個工做者線程,隨着大量的任務被提交,更多的工做者線程會被喚醒。

相關文章
相關標籤/搜索