答: ① sleep()方法給其餘線程運行機會時不考慮線程的優先級,所以會給低優先級的線程以運行的機會;yield()方法只會給相同優先級或更高優先級的線程以運行的機會; ② 線程執行sleep()方法後轉入阻塞(blocked)狀態,而執行yield()方法後轉入就緒(ready)狀態; ③ sleep()方法聲明拋出InterruptedException,而yield()方法沒有聲明任何異常; ④ sleep()方法比yield()方法(跟操做系統CPU調度相關)具備更好的可移植性。java
答:node
答:若是系統中存在臨界資源(資源數量少於競爭資源的線程數量的資源),例如正在寫的數據之後可能被另外一個線程讀到,或者正在讀的數據可能已經被另外一個線程寫過了,那麼這些數據就必須進行同步存取(數據庫操做中的排他鎖就是最好的例子)。當應用程序在對象上調用了一個須要花費很長時間來執行的方法,而且不但願讓程序等待方法的返回時,就應該使用異步編程,在不少狀況下采用異步途徑每每更有效率。事實上,所謂的同步就是指阻塞式操做,而異步就是非阻塞式操做。程序員
當run() 或者 call() 方法執行完的時候線程會自動結束,若是要手動結束一個線程,你能夠用volatile 布爾變量來退出run()方法的循環或者是取消任務來中斷線程。面試
使用自定義的標誌位決定線程的執行狀況算法
public class SafeStopThread implements Runnable{
private volatile boolean stop=false;//此變量必須加上volatile
int a=0;
@Override
public void run() {
// TODO Auto-generated method stub
while(!stop){
synchronized ("") {
a++;
try {
Thread.sleep(100);
} catch (Exception e) {
// TODO: handle exception
}
a--;
String tn=Thread.currentThread().getName();
System.out.println(tn+":a="+a);
}
}
//線程終止
public void terminate(){
stop=true;
}
public static void main(String[] args) {
SafeStopThread t=new SafeStopThread();
Thread t1=new Thread(t);
t1.start();
for(int i=0;i<5;i++){
new Thread(t).start();
}
t.terminate();
}
}複製代碼
在語言層面有兩種方式。java.lang.Thread 類的實例就是一個線程可是它須要調用java.lang.Runnable接口來執行,因爲線程類自己就是調用的Runnable接口因此你能夠繼承java.lang.Thread 類或者直接調用Runnable接口來重寫run()方法實現線程。數據庫
Java不支持類的多重繼承,但容許你調用多個接口。因此若是你要繼承其餘類,固然是調用Runnable接口好了。編程
Semaphore兩個重要的方法就是semaphore.acquire() 請求一個信號量,這時候的信號量個數-1(一旦沒有可以使用的信號量,也即信號量個數變爲負數時,再次請求的時候就會阻塞,直到其餘線程釋放了信號量)semaphore.release()釋放一個信號量,此時信號量個數+1數組
public class SemaphoreTest {
private Semaphore mSemaphore = new Semaphore(5);
public void run(){
for(int i=0; i< 100; i++){
new Thread(new Runnable() {
@Override
public void run() {
test();
}
}).start();
}
}
private void test(){
try {
mSemaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 進來了");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 出去了");
mSemaphore.release();
}
}複製代碼
線程調度是指系統爲線程分配處理器使用權的過程。 主要調度方式有兩種,分別是協同式線程調度和搶佔式線程調度。緩存
協同式線程調度:線程的執行時間由線程自己控制,當線程把本身的工做執行完了以後,主動通知系統切換到另外一個線程上。安全
搶佔式線程調度:每一個線程由系統分配執行時間,不禁線程自己決定。線程的執行時間是系統可控的,不會有一直阻塞的問題。
Java使用搶佔式調度
搶佔式。一個線程用完CPU以後,操做系統會根據線程優先級、線程飢餓狀況等數據算出一個總的優先級並分配下一個時間片給某個線程執行。
線程類的構造方法、靜態塊是被new這個線程類所在的線程所調用的,而run方法裏面的代碼纔是被線程自身所調用的。
Thread t = Thread.currentThread();
String name = t.getName();
System.out.println("name=" + name);複製代碼
建立線程要花費昂貴的資源和時間,若是任務來了才建立線程那麼響應時間會變長,並且一個進程能建立的線程數有限。
爲了不這些問題,在程序啓動的時候就建立若干線程來響應處理,它們被稱爲線程池,裏面的線程叫工做線程。
Executor框架讓你能夠建立不一樣的線程池。好比單線程池,每次處理一個任務;數目固定的線程池或者是緩存線程池(一個適合不少生存期短的任務的程序的可擴展線程池)。
如下是Java自帶的幾種線程池: 一、newFixedThreadPool 建立一個指定工做線程數量的線程池。 每當提交一個任務就建立一個工做線程,若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。
二、newCachedThreadPool 建立一個可緩存的線程池。 這種類型的線程池特色是:
三、newSingleThreadExecutor建立一個單線程化的Executor,即只建立惟一的工做者線程來執行任務,若是這個線程異常結束,會有另外一個取代它,保證順序執行(我以爲這點是它的特點)。
單工做線程最大的特色是可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。
四、newScheduleThreadPool 建立一個定長的線程池,並且支持定時的以及週期性的任務執行,相似於Timer。
Executor 和 ExecutorService 這兩個接口主要的區別是:
Executors 類提供工廠方法用來建立不一樣類型的線程池。
好比: newSingleThreadExecutor() 建立一個只有一個線程的線程池,newFixedThreadPool(int numOfThreads)來建立固定線程數的線程池,newCachedThreadPool()能夠根據須要建立新的線程,但若是已有線程是空閒的會重用已有線程。
Java經過Executors提供四種線程池,分別爲:
newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。
newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
start()方法被用來啓動新建立的線程,並且start()內部調用了run()方法,這和直接調用run()方法的效果不同。
當你調用run()方法的時候,只會是在原來的線程中調用,沒有新的線程啓動,start()方法纔會啓動新線程。
兩個方法均可以向線程池提交任務,execute()方法的返回類型是void,它定義在Executor接口中, 而submit()方法能夠返回持有計算結果的Future對象,它定義在ExecutorService接口中,它擴展了Executor接口,其它線程池類像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有這些方法。
notify()方法不能喚醒某個具體的線程,因此只有一個線程在等待的時候它纔有用武之地。而notifyAll()喚醒全部線程並容許他們爭奪鎖確保了至少有一個線程能繼續運行。
當有線程調用了對象的 notifyAll()方法(喚醒全部 wait 線程)或 notify()方法(只隨機喚醒一個 wait 線程),被喚醒的的線程便會進入該對象的鎖池中,鎖池中的線程會去競爭該對象鎖。也就是說,調用了notify後只要一個線程會由等待池進入鎖池,而notifyAll會將該對象等待池內的全部線程移動到鎖池中,等待鎖競爭
優先級高的線程競爭到對象鎖的機率大,倘若某線程沒有競爭到該對象鎖,它還會留在鎖池中,惟有線程再次調用 wait()方法,它纔會從新回到等待池中。
一個很明顯的緣由是JAVA提供的鎖是對象級的而不是線程級的,每一個對象都有鎖,經過線程得到。
若是線程須要等待某些鎖那麼調用對象中的wait()方法就有意義了。若是wait()方法定義在Thread類中,線程正在等待的是哪一個鎖就不明顯了。
簡單的說,因爲wait,notify和notifyAll都是鎖級別的操做,因此把他們定義在Object類中由於鎖屬於對象。
主要是由於Java API強制要求這樣作,若是你不這麼作,你的代碼會拋出IllegalMonitorStateException異常。還有一個緣由是爲了不wait和notify之間產生競態條件。
最主要的緣由是爲了防止如下這種狀況
// 等待者(Thread1)
while (condition != true) { // step.1
lock.wait() // step.4
}
// 喚醒者(Thread2)
condition = true; // step.2
lock.notify(); // step.3
複製代碼
在對以前的代碼去掉 synchronized 塊以後,若是在等待者判斷 condition != true 以後而調用 wait() 以前,喚醒者**將 condition 修改爲了 true 同時調用了 notify() **的話,那麼等待者在調用了 wait() 以後就沒有機會被喚醒了。
join() 的做用:讓「主線程」等待「子線程」結束以後才能繼續運行。
yield方法能夠暫停當前正在執行的線程對象,讓其它有相同優先級的線程執行。它是一個靜態方法並且只保證當前線程放棄CPU佔用而不能保證使其它線程必定能佔用CPU,執行yield()的線程有可能在進入到暫停狀態後立刻又被執行。
sleep()方法(休眠)是線程類(Thread)的靜態方法,調用此方法會讓當前線程暫停執行指定的時間,將執行機會(CPU)讓給其餘線程,可是對象的鎖依然保持,所以休眠時間結束後會自動恢復。注意這裏的恢復並非恢復到執行的狀態,而是恢復到可運行狀態中等待CPU的寵幸。
Java程序中wait和sleep都會形成某種形式的暫停,它們能夠知足不一樣的須要。
不能被重寫,線程的不少方法都是由系統調用的,不能經過子類覆寫去改變他們的行爲。
Thread類的sleep()和yield()方法將在當前正在執行的線程上運行。
該代碼只有在某個A線程執行時會被執行,這種狀況下通知某個B線程yield是無心義的(由於B線程原本就沒在執行)。所以只有當前線程執行yield纔是有意義的。經過使該方法爲static,你將不會浪費時間嘗試yield 其餘線程。
只能給本身喂安眠藥,不能給別人喂安眠藥。
阻塞式方法是指程序會一直等待該方法完成期間不作其餘事情。
ServerSocket的accept()方法就是一直等待客戶端鏈接。這裏的阻塞是指調用結果返回以前,當前線程會被掛起,直到獲得結果以後纔會返回。
此外,還有異步和非阻塞式方法在任務完成前就返回。
在Java裏面沒有辦法強制啓動一個線程,它是被線程調度器控制着
簡單的說,若是異常沒有被捕獲該線程將會中止執行。
Thread.UncaughtExceptionHandler是用於處理未捕獲異常形成線程忽然中斷狀況的一個內嵌接口。
當一個未捕獲異常將形成線程中斷的時候JVM會使用Thread.getUncaughtExceptionHandler()來查詢線程的UncaughtExceptionHandler並將線程和異常做爲參數傳遞給handler的uncaughtException()方法進行處理。
在Java中有兩種異常。
非運行時異常(Checked Exception):這種異常必須在方法聲明的throws語句指定,或者在方法體內捕獲。例如:IOException和ClassNotFoundException。
運行時異常(Unchecked Exception):這種異常沒必要在方法聲明中指定,也不須要在方法體中捕獲。例如,NumberFormatException。
由於run()方法不支持throws語句,因此當線程對象的run()方法拋出非運行異常時,咱們必須捕獲而且處理它們。當運行時異常從run()方法中拋出時,默認行爲是在控制檯輸出堆棧記錄而且退出程序。
好在,java提供給咱們一種在線程對象裏捕獲和處理運行時異常的一種機制。實現用來處理運行時異常的類,這個類實現UncaughtExceptionHandler接口而且實現這個接口的uncaughtException()方法。示例:
package concurrency;
import java.lang.Thread.UncaughtExceptionHandler;
public class Main2 {
public static void main(String[] args) {
Task task = new Task();
Thread thread = new Thread(task);
thread.setUncaughtExceptionHandler(new ExceptionHandler());
thread.start();
}
}
class Task implements Runnable{
@Override
public void run() {
int numero = Integer.parseInt("TTT");
}
}
class ExceptionHandler implements UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.printf("An exception has been captured\n");
System.out.printf("Thread: %s\n", t.getId());
System.out.printf("Exception: %s: %s\n", e.getClass().getName(),e.getMessage());
System.out.printf("Stack Trace: \n");
e.printStackTrace(System.out);
System.out.printf("Thread status: %s\n",t.getState());
}
}複製代碼
當一個線程拋出了異常而且沒有被捕獲時(這種狀況只多是運行時異常),JVM檢查這個線程是否被預置了未捕獲異常處理器。若是找到,JVM將調用線程對象的這個方法,並將線程對象和異常做爲傳入參數。
Thread類還有另外一個方法能夠處理未捕獲到的異常,即靜態方法setDefaultUncaughtExceptionHandler()。這個方法在應用程序中爲全部的線程對象建立了一個異常處理器。
當線程拋出一個未捕獲到的異常時,JVM將爲異常尋找如下三種可能的處理器。
處於等待狀態的線程可能會收到錯誤警報和僞喚醒,若是不在循環中檢查等待條件,程序就會在沒有知足結束條件的狀況下退出。
一、通常來講,wait確定是在某個條件調用的,不是if就是while 二、放在while裏面,是防止出於waiting的對象被別的緣由調用了喚醒方法,可是while裏面的條件並無知足(也可能當時知足了,可是因爲別的線程操做後,又不知足了),就須要再次調用wait將其掛起。 三、其實還有一點,就是while最好也被同步,這樣不會致使錯失信號。
while(condition){
wait();
}
複製代碼
忙循環就是程序員用循環讓一個線程等待,不像傳統方法wait()、 sleep() 或 yield(),它們都放棄了CPU控制,而忙循環不會放棄CPU,它就是在運行一個空循環。
這麼作的目的是爲了保留CPU緩存,在多核系統中,一個等待線程醒來的時候可能會在另外一個內核運行,這樣會重建緩存。爲了不重建緩存和減小等待重建的時間就可使用它了。
沒有得到鎖的線程一直循環在那裏看是否該鎖的保持者已經釋放了鎖,這就是自旋鎖。
互斥鎖:從等待到解鎖過程,線程會從sleep狀態變爲running狀態,過程當中有線程上下文的切換,搶佔CPU等開銷。
自旋鎖不會引發調用者休眠,若是自旋鎖已經被別的線程保持,調用者就一直循環在那裏看是否該自旋鎖的保持者釋放了鎖。因爲自旋鎖不會引發調用者休眠,因此自旋鎖的效率遠高於互斥鎖。
雖然自旋鎖效率比互斥鎖高,但它會存在下面兩個問題: 一、自旋鎖一直佔用CPU,在未得到鎖的狀況下,一直運行,若是不能在很短的時間內得到鎖,會致使CPU效率下降。 二、試圖遞歸地得到自旋鎖會引發死鎖。遞歸程序決不能在持有自旋鎖時調用它本身,也決不能在遞歸調用時試圖得到相同的自旋鎖。
因而可知,咱們要慎重的使用自旋鎖,自旋鎖適合於鎖使用者保持鎖時間比較短而且鎖競爭不激烈的狀況。正是因爲自旋鎖使用者通常保持鎖時間很是短,所以選擇自旋而不是睡眠是很是必要的,自旋鎖的效率遠高於互斥鎖。
同一個Runnable,使用全局變量。
第一種:將共享數據封裝到一個對象中,把這個共享數據所在的對象傳遞給不一樣的Runnable
第二種:將這些Runnable對象做爲某一個類的內部類,共享的數據做爲外部類的成員變量,對共享數據的操做分配給外部類的方法來完成,以此實現對操做共享數據的互斥和通訊,做爲內部類的Runnable來操做外部類的方法,實現對數據的操做
class ShareData {
private int x = 0;
public synchronized void addx(){
x++;
System.out.println("x++ : "+x);
}
public synchronized void subx(){
x--;
System.out.println("x-- : "+x);
}
}
public class ThreadsVisitData {
public static ShareData share = new ShareData();
public static void main(String[] args) {
//final ShareData share = new ShareData();
new Thread(new Runnable() {
public void run() {
for(int i = 0;i<100;i++){
share.addx();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
for(int i = 0;i<100;i++){
share.subx();
}
}
}).start();
}
}複製代碼
Runnable和Callable都是接口, 不一樣之處: 1.Callable能夠返回一個類型V,而Runnable不能夠 2.Callable可以拋出checked exception,而Runnable不能夠。 3.Runnable是自從java1.1就有了,而Callable是1.5以後才加上去的 4.Callable和Runnable均可以應用於executors。而Thread類只支持Runnable.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadTestB {
public static void main(String[] args) {
ExecutorService e=Executors.newFixedThreadPool(10);
Future f1=e.submit(new MyCallableA());
Future f2=e.submit(new MyCallableA());
Future f3=e.submit(new MyCallableA());
System.out.println("--Future.get()....");
try {
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
}
e.shutdown();
}
}
class MyCallableA implements Callable<String>{
public String call() throws Exception {
System.out.println("開始執行Callable");
String[] ss={"zhangsan","lisi"};
long[] num=new long[2];
for(int i=0;i<1000000;i++){
num[(int)(Math.random()*2)]++;
}
if(num[0]>num[1]){
return ss[0];
}else if(num[0]<num[1]){
throw new Exception("棄權!");
}else{
return ss[1];
}
}
}複製代碼
CountDownLatch和CyclicBarrier都可以實現線程之間的等待,只不過它們側重點不一樣:
CountDownLatch的用法:
public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2個子線程執行完畢...");
latch.await();
System.out.println("2個子線程已經執行完畢");
System.out.println("繼續執行主線程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}複製代碼
CyclicBarrier用法:
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
@Override
public void run() {
System.out.println("當前線程"+Thread.currentThread().getName());
}
});
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
try {
Thread.sleep(5000); //以睡眠來模擬寫入數據操做
System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("全部線程寫入完畢,繼續處理其餘任務...");
}
}
}複製代碼
interrupt方法用於中斷線程。調用該方法的線程的狀態爲將被置爲」中斷」狀態。
注意:線程中斷僅僅是置線程的中斷狀態位,不會中止線程。須要用戶本身去監視線程的狀態爲並作處理。支持線程中斷的方法(也就是線程中斷後會拋出interruptedException的方法)就是在監視線程的中斷狀態,一旦線程的中斷狀態被置爲「中斷狀態」,就會拋出中斷異常。
isInterrupted 只是簡單的查詢中斷狀態,不會對狀態進行修改。
ConcurrentHashMap的結構是比較複雜的,都深究去本質,其實也就是數組和鏈表而已。咱們由淺入深慢慢的分析其結構。
先簡單分析一下,ConcurrentHashMap 的成員變量中,包含了一個 Segment 的數組(final Segment<K,V>[] segments;),而 Segment 是 ConcurrentHashMap 的內部類,而後在 Segment 這個類中,包含了一個 HashEntry 的數組(transient volatile HashEntry<K,V>[] table;)。而 HashEntry 也是ConcurrentHashMap 的內部類。HashEntry 中,包含了 key 和 value 以及 next 指針(相似於 HashMap 中 Entry),因此 HashEntry 能夠構成一個鏈表。
因此通俗的講,ConcurrentHashMap 數據結構爲一個 Segment 數組,Segment 的數據結構爲 HashEntry 的數組,而 HashEntry 存的是咱們的鍵值對,能夠構成鏈表。
首先,咱們看一下 HashEntry 類。
HashEntry 用來封裝散列映射表中的鍵值對。在 HashEntry 類中,key,hash 和 next 域都被聲明爲 final 型,value 域被聲明爲 volatile 型。其類的定義爲:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
...
...
}複製代碼
HashEntry 的學習能夠類比着 HashMap 中的 Entry。咱們的存儲鍵值對的過程當中,散列的時候若是發生「碰撞」,將採用「分離鏈表法」來處理碰撞:把碰撞的 HashEntry 對象連接成一個鏈表。
以下圖,咱們在一個空桶中插入 A、B、C 兩個 HashEntry 對象後的結構圖(其實應該爲鍵值對,在這進行了簡化以方便更容易理解):
Segment 的類定義爲static final class Segment<K,V> extends ReentrantLock implements Serializable。其繼承於 ReentrantLock 類,從而使得 Segment 對象能夠充當鎖的角色。Segment 中包含HashEntry 的數組,其能夠守護其包含的若干個桶(HashEntry的數組)。Segment 在某些意義上有點相似於 HashMap了,都是包含了一個數組,而數組中的元素能夠是一個鏈表。
table:table 是由 HashEntry 對象組成的數組若是散列時發生碰撞,碰撞的 HashEntry 對象就以鏈表的形式連接成一個鏈表table數組的數組成員表明散列映射表的一個桶每一個 table 守護整個 ConcurrentHashMap 包含桶總數的一部分若是併發級別爲 16,table 則守護 ConcurrentHashMap 包含的桶總數的 1/16。
count 變量是計算器,表示每一個 Segment 對象管理的 table 數組(若干個 HashEntry 的鏈表)包含的HashEntry 對象的個數。之因此在每一個Segment對象中包含一個 count 計數器,而不在 ConcurrentHashMap 中使用全局的計數器,是爲了不出現「熱點域」而影響併發性。
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
transient int modCount;
/**
* 裝載因子
*/
final float loadFactor;
}複製代碼
咱們經過下圖來展現一下插入 ABC 三個節點後,Segment 的示意圖:
其實從我我的角度來講,Segment結構是與HashMap很像的。
ConcurrentHashMap 的結構中包含的 Segment 的數組,在默認的併發級別會建立包含 16 個 Segment 對象的數組。經過咱們上面的知識,咱們知道每一個 Segment 又包含若干個散列表的桶,每一個桶是由 HashEntry 連接起來的一個鏈表。若是 key 可以均勻散列,每一個 Segment 大約守護整個散列表桶總數的 1/16。
下面咱們還有經過一個圖來演示一下 ConcurrentHashMap 的結構:
在 ConcurrentHashMap 中,當執行 put 方法的時候,會須要加鎖來完成。咱們經過代碼來解釋一下具體過程: 當咱們 new 一個 ConcurrentHashMap 對象,而且執行put操做的時候,首先會執行 ConcurrentHashMap 類中的 put 方法,該方法源碼爲:
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}複製代碼
咱們經過註釋能夠了解到,ConcurrentHashMap 不容許空值。該方法首先有一個 Segment 的引用 s,而後會經過 hash() 方法對 key 進行計算,獲得哈希值;繼而經過調用 Segment 的 put(K key, int hash, V value, boolean onlyIfAbsent)方法進行存儲操做。該方法源碼爲:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//加鎖,這裏是鎖定的Segment而不是整個ConcurrentHashMap
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//獲得hash對應的table中的索引index
int index = (tab.length - 1) & hash;
//找到hash對應的是具體的哪一個桶,也就是哪一個HashEntry鏈表
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//解鎖
unlock();
}
return oldValue;
}複製代碼
關於該方法的某些關鍵步驟,在源碼上加上了註釋。
須要注意的是:加鎖操做是針對的 hash 值對應的某個 Segment,而不是整個 ConcurrentHashMap。由於 put 操做只是在這個 Segment 中完成,因此並不須要對整個 ConcurrentHashMap 加鎖。因此,此時,其餘的線程也能夠對另外的 Segment 進行 put 操做,由於雖然該 Segment 被鎖住了,但其餘的 Segment 並無加鎖。同時,讀線程並不會由於本線程的加鎖而阻塞。
正是由於其內部的結構以及機制,因此 ConcurrentHashMap 在併發訪問的性能上要比Hashtable和同步包裝以後的HashMap的性能提升不少。在理想狀態下,ConcurrentHashMap 能夠支持 16 個線程執行併發寫操做(若是併發級別設置爲 16),及任意數量線程的讀操做。
在實際的應用中,散列表通常的應用場景是:除了少數插入操做和刪除操做外,絕大多數都是讀取操做,並且讀操做在大多數時候都是成功的。正是基於這個前提,ConcurrentHashMap 針對讀操做作了大量的優化。經過 HashEntry 對象的不變性和用 volatile 型變量協調線程間的內存可見性,使得 大多數時候,讀操做不須要加鎖就能夠正確得到值。這個特性使得 ConcurrentHashMap 的併發性能在分離鎖的基礎上又有了近一步的提升。
ConcurrentHashMap 是一個併發散列映射表的實現,它容許徹底併發的讀取,而且支持給定數量的併發更新。相比於 HashTable 和用同步包裝器包裝的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 擁有更高的併發性。在 HashTable 和由同步包裝器包裝的 HashMap 中,使用一個全局的鎖來同步不一樣線程間的併發訪問。同一時間點,只能有一個線程持有鎖,也就是說在同一時間點,只能有一個線程能訪問容器。這雖然保證多線程間的安全併發訪問,但同時也致使對容器的訪問變成串行化的了。
ConcurrentHashMap 的高併發性主要來自於三個方面:
使用分離鎖,減少了請求 同一個鎖的頻率。
經過 HashEntery 對象的不變性及對同一個 Volatile 變量的讀 / 寫來協調內存可見性,使得 讀操做大多數時候不須要加鎖就能成功獲取到須要的值。因爲散列映射表在實際應用中大多數操做都是成功的 讀操做,因此 2 和 3 既能夠減小請求同一個鎖的頻率,也能夠有效減小持有鎖的時間。經過減少請求同一個鎖的頻率和儘可能減小持有鎖的時間 ,使得 ConcurrentHashMap 的併發性相對於 HashTable 和用同步包裝器包裝的 HashMap有了質的提升。
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
1)add(E e): 添加元素,若是BlockingQueue能夠容納,則返回true,不然報異常
2)offer(E e): 添加元素,若是BlockingQueue能夠容納,則返回true,不然返回false.
3)put(E e): 添加元素,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續.
4)poll(long timeout, TimeUnit timeUnit): 取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等timeout參數規定的時間,取不到時返回null
5)take(): 取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止
1)ArrayBlockingQueue: 有界的先入先出順序隊列,構造方法肯定隊列的大小.
2)LinkedBlockingQueue: 無界的先入先出順序隊列,構造方法提供兩種,一種初始化隊列大小,隊列即有界;第二種默認構造方法,隊列無界(有界即Integer.MAX_VALUE)
4)SynchronousQueue: 特殊的BlockingQueue,沒有空間的隊列,即必須有取的方法阻塞在這裏的時候才能放入元素。
3)PriorityBlockingQueue: 支持優先級的阻塞隊列 ,存入對象必須實現Comparator接口 (須要注意的是 隊列不是在加入元素的時候進行排序,而是取出的時候,根據Comparator來決定優先級最高的)。
BlockingQueue 實現主要用於生產者-使用者隊列,BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖或其餘形式的併發控制來自動達到它們的目的
這是一個生產者-使用者場景的一個用例。注意,BlockingQueue 能夠安全地與多個生產者和多個使用者一塊兒使用 此用例來自jdk文檔
//這是一個生產者類
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while(true) {
queue.put(produce());
}
} catch (InterruptedException ex) {
... handle ...
}
}
Object produce() {
...
}
}
//這是一個消費者類
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) {
consume(queue.take());
}
} catch (InterruptedException ex) {
... handle ...
}
}
void consume(Object x) {
...
}
}
//這是實現類
class Setup {
void main() {
//實例一個非阻塞隊列
BlockingQueue q = new SomeQueueImplementation();
//將隊列傳入兩個消費者和一個生產者中
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}複製代碼
合理利用線程池可以帶來三個好處。第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如指掌。
咱們能夠經過ThreadPoolExecutor來建立一個線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);複製代碼
建立一個線程池須要輸入幾個參數:
咱們可使用execute提交的任務,可是execute方法沒有返回值,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute方法輸入的任務是一個Runnable類的實例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});複製代碼
咱們也可使用submit 方法來提交任務,它會返回一個future,那麼咱們能夠經過這個future來判斷任務是否執行成功,經過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理沒法執行任務異常
} finally {
// 關閉線程池
executor.shutdown();
}
複製代碼
咱們能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。
只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。
流程分析:線程池的主要工做流程以下圖:
從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:
上面的流程分析讓咱們很直觀的瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的。線程池執行任務的方法以下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//若是線程數小於基本線程數,則建立線程並執行當前任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如線程數大於等於基本線程數或線程建立失敗,則將當前任務放到工做隊列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//若是線程池不處於運行中或任務沒法放入隊列,而且當前線程數量小於最大容許的線程數量,
則建立一個線程執行任務。
else if (!addIfUnderMaximumPoolSize(command))
//拋出RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}
複製代碼
工做線程。線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會無限循環獲取工做隊列裏的任務來執行。咱們能夠從Worker的run方法裏看到這點:
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
複製代碼
要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:
任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,任務積壓在線程池裏。若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。
經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用
經過擴展線程池進行監控。經過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,咱們能夠在任務執行前,執行後和線程池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
複製代碼
Java中的Semaphore是一種新的同步類,它是一個計數信號。
從概念上講,信號量維護了一個許可集合。若有必要,在許可可用前會阻塞每個 acquire(),而後再獲取該許可。每一個 release()添加一個許可,從而可能釋放一個正在阻塞的獲取者。
可是,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數,並採起相應的行動。
信號量經常用於多線程的代碼中,好比數據庫鏈接池。
同步方法默認用this或者當前類class對象做爲鎖; 同步代碼塊能夠選擇以什麼來加鎖,比同步方法要更細顆粒度,咱們能夠選擇只同步會發生同步問題的部分代碼而不是整個方法; 同步方法使用關鍵字 synchronized修飾方法,而同步代碼塊主要是修飾須要進行同步的代碼,用 synchronized(object){代碼內容}進行修飾;
使用多線程的時候,一種很是簡單的避免死鎖的方式就是:指定獲取鎖的順序,並強制線程按照指定的順序獲取鎖。所以,若是全部的線程都是以一樣的順序加鎖和釋放鎖,就不會出現死鎖了。
原文:Java架構筆記