本文主要整理自《Java併發編程實戰》java
當某個操做沒法執行下去時,就會發生活躍性問題,如死鎖,飢餓,活鎖等ios
Timer, servlet/JSP,RMI,swing/AWT算法
最好將一個有關聯的同步操做放在同一個線程安全類中,由一個類提供入口,在類中作好同步措施,客戶端調用類時不須要再考慮同步問題,好比concurrent包下的類數據庫
當計算的正確性取決於多個線程的交替執行順序時,就會發生競態條件
常見的競態條件是「先檢查後執行」(check then act),檢查和執行的間隙被其餘線程隔斷,發生錯誤
應儘可能使用線程安全的類來管理類的狀態,如原子類(經過CAS方式實現,CAS算法有ABA問題)
當狀態變量有多個且相互關聯時,單純的原子類已經不夠用了,應使用同步代碼管理,此時能夠不用原子變量了編程
每一個java對象能夠用做同步的鎖,稱爲內置鎖
內置鎖是可重入的,所以,若是某線程試圖獲取一個已經由它本身持有的鎖,那這個請求會成功.重入意味着獲取鎖操做的粒度是線程,而不是調用windows
注意, synchronized實例方法鎖住的都是調用者實例數組
class Widget {
public synchronized void doSomething() {
}
}
class LoggingWidget extends Widget {
/* * 實例方法上的synchronized鎖住的都是調用實例 * 這裏確定是用LoggingWidget實例去調用,鎖住LoggingWidget實例 */
public synchronized void doSomething() {
//這裏依舊是LoggingWidget實例去調用父類的synchronized方法
//鎖住的依然是調用者LoggingWidget實例
super.doSomething();
}
}
複製代碼
每一個鎖都關聯一個請求計數器和一個佔有他的線程,當請求計數器爲0時,這個鎖能夠被認爲是unhled的,當一個線程請求一個unheld的鎖時,JVM記錄鎖的擁有者,並把鎖的請求計數加1,若是同一個線程再次請求這個鎖時,請求計數器就會增長,當該線程退出syncronized塊時,計數器減1,當計數器爲0時,鎖被釋放(這就保證了鎖是可重入的,不會發生死鎖的狀況)。緩存
線程執行互斥代碼的過程安全
Lock -> 主內存 -> 工做內存 -> 主內存 -> unlock
bash
對於可能被多個線程同時訪問的可變狀態變量,在訪問時須要持有同一個鎖,狀態變量由這個鎖保護
若原子變量的操做已經在同步代碼塊內,則可放棄使用原子變量,普通變量更好——不一樣的同步機制容易形成混亂,一個同步機制已經足夠時,就不要加入其它同步機制,多餘
某個線程在得到對象的鎖以後,只能阻止其餘線程得到同一個鎖。之因此每一個對象都有一個內置鎖,只是爲了免去顯示建立鎖對象。
Synchronized修飾實例方法,得到的就是實例鎖(對象鎖),修飾靜態方法,就得到類鎖,代碼塊同理
鎖分爲對象鎖和類鎖
Java內存模型要求,變量讀寫必須是原子操做,當無線程同步時,讀取到的變量值必爲某個線程設置的值,而不是一個隨機值,注意這裏讀寫的概念,這是jvm中的讀寫,並非代碼中的讀寫
但對於非volatile類型的long和double變量,jvm容許將64位的讀/寫操做分解爲兩個32位的操做,這就有可能形成高32位和低32位不是原組合的問題
解決方法:用volatile修飾或者用鎖保護起來
volatile修飾的變量操做不會與其餘內存操做一塊兒重排序,volatile變量不會被緩存在寄存器或者其餘處理器不可見的地方(直接讀寫主內存上的值,無副本),所以在讀取volatile類型的變量時總會返回最新寫入的值
Example
volatile boolean asleep;
…
while(asleep)
countSomeSleep()
複製代碼
若不使用volatile,可能當asleep被一個線程修改時,執行判斷的線程修改不了,此時用volatile比鎖機制簡單方便
可是,volatile只保證可見性,而不保證原子性,如volatile不保證count++的原子性(count++比存在讀取和寫入兩步),但鎖機制能夠
可使用volatile類型來發布不可變對象P40
對於服務器應用程序,在開發和測試階段,啓動jvm時都要指定-server命令行選項,server模式的jvm將比client模式的jvm進行更多的優化,例如將循環中未被修改的變量提高到循環外部,可能致使開發環境(client模式的jvm)正常的代碼到部署環境(server模式的jvm)發生異常
在volatile示例代碼中,若asleep未聲明爲volatile類型,那麼server模式的jvm會將asleep的判斷條件提高到循環體外部,而client模式的jvm不會這麼作
當在對象構造完成以前發佈該對象到其餘線程,就會破壞線程安全性,特別的,當從對象的構造函數中發佈對象時,只是發佈了一個還沒有構造完成的對象(即this未初始化完成,但你this卻能夠被構造函數中新發布(實例化)的對象引用)
不要在構造過程當中使用this引用逸出
Example
//Wrong
public class ThisEscape {
public ThisEscape(EventSource source) {
source.registerListener(new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
});
}
//Correct
public class SafeListener {
private final EventListener listener;
private SafeListener() {
listener = new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
};
}
public static SafeListener newInstance(EventSource source) {
SafeListener safe = new SafeListener(); //先構造完成
source.registerListener(safe.listener); //再發布
return safe;
}
複製代碼
指線程封閉性的職責徹底由程序實現來承擔,不共享數據,僅在單線程內訪問數據,將對象封閉到目標線程上;因其脆弱性,應該儘可能少用它,應使用更強的線程封閉技術,如棧封閉或threadlocal類
變量只存在於執行線程棧中,只在線程內部使用
若是在線程內部上下文中使用非線程安全的對象,那麼該對象仍然是線程安全的
類能夠將ThreadLoad<T>
視爲Map<Thread ,T>
Threadlocad提供了set get方法,這些方法爲使用該變量的線程都保存一份獨立的副本,所以get老是返回當前執行線程在調用set時設置的最新值
最好不要放在線程池中,避免複用
public class Holder {
// private int n;
private final int n;
public Holder(int n) {
this.n = n;
}
public void assertSanity() {
if (n != n)
throw new AssertionError("This statement is false.");
}
}
複製代碼
設爲final就能夠保證正確地構造對象,就是線程安全的了
1. 在靜態初始化函數中初始化一個對象引用:單例餓漢模式
2. 將對象的引用保存到volatile類型的域或者AtomicReferance對象中:原子類
3. 將對象引用保存到某個正確構造對象的final域中:不可變
4. 將對象引用保存到一個由鎖保護的域中:鎖
能夠將對象放入到線程安全的容器中:
複製代碼
Hashtable synchronizedMap concurrentMap vector copyOnWriterArrayList copyOnWriterSet synchronizedList synchronizedSet blockingQueue concurrentLinkedQueue
靜態初始化器由jvm在類的初始化階段執行,因爲在jvm內部存在着同步機制,故對象能夠被安全地發佈
public static Hodlder holder = new Hodeler(42);
不可變對象能夠經過任意機制發佈
事實不可變對象必須經過安全方式發佈
可變對象必須經過安全方式發佈,而且必須是線程安全的或由鎖保護起來
一些java基礎同步類並非線程安全的,但能夠經過包裝器工廠方法collections.synchronizedList()
,將容器類封裝在一個同步的包裝器對象中
把對象的全部可變狀態都封裝起來,並用對象本身的內置鎖來保護
如vector和hashtable
使用對象私有的鎖(private)可能更有優勢
同時,在獲取被保護的對象時,能夠返回複製對象,修改對象時經過保護對象共有方法修改便可(不是直接修改返回的複製對象)
copyonwrite是修改返回的集合,而後修改引用
若是一個類是由多個獨立且線程安全的狀態變量組成,而且在全部的操做中都不包括無效狀態轉換,則可將線程安全性委託給底層的狀態變量
線程安全能夠將狀態變量賦予線程安全的類來管理,好比線程安全容器,不可變容器,原子類等
涉及線程安全的變量,儘可能設爲final類型
返回引用時,特別須要注意是否會形成逸出,能夠返回複製對象,或者不可變對象(對象自己不可變(注意是否能夠修改引用),不可變容器,同步容器)
須要同步的對象能夠放到客戶端中同步,須要注意同步時加鎖同一個鎖
如vector爲同步類,其內部方法操做是同步的,但涉及幾個操做按序同步執行時,能夠在客戶端加鎖實現,此時,所加的鎖應與vector對象本來的鎖一致,即vector對象自身 synchronized(vector){ … }
同步容器
Vector、Hashtable、同步封裝類,能夠由Collections.synchronizedXxxx
等方法建立
同步容器類雖然都是線程安全的,可是在某些狀況下(複合操做),仍然須要加鎖來保護;
同步容器對全部容器狀態的訪問都串行化,嚴重下降了併發性;當多個線程競爭鎖時,吞吐量嚴重降低;
併發容器
java5.0以後提供了多種併發容器來改善同步容器的性能,如ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentSkipListMap等
以ConcurrentHashMap爲例
採用分離鎖技術,同步容器中,是一個容器一個鎖,但在ConcurrentHashMap中,會將hash表的數組部分分紅若干段,每段維護一個鎖,以達到高效的併發訪問;
迭代器弱一致性,迭代期間不會拋出ConcurrentModificationException異常;
size()
、isEmpty()
等方法返回的是一個近似值;
如size操做,就保存了一個last值用於記錄上次循環時統計的總數,只有先後兩次統計值相等時纔會返回
增長了若干原子操做方法,如putIfAbsent(沒有該key,則添加)
注意,此時不能再經過客戶端加鎖新建新的原子操做了,客戶端只能對併發容器自身加鎖,但併發容器內部使用的並非自身鎖
寫入時複製容器:Copyonwrite,在每次修改時都會加鎖並建立並從新發佈一個新的容器副本,直接修改容器引用,從而實現可見性,但在讀取時不加鎖,直接讀取原值,致使的問題就是寫入時雖然加鎖,但仍能夠讀取,可能讀到失效值.其訪問和寫入的操做最終必定會經過對應的final方法,好比setArray()
,getArray()
讀多寫少時使用Copyonwrite
總結
只有在應用程序須要對容器加鎖進行獨佔式訪問時,才用同步容器,不然使用非併發容器以保證更優性能
但在代碼中調用一個能夠拋出InterruptedException的方法時,本身的方法就變成了一個阻塞方法,而且必須處理中斷的響應
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// 恢復中斷狀態
Thread.currentThread().interrupt();
}
}
複製代碼
同步工具類能夠根據自身狀態來協調線程的控制流
put()
方法爲例)lock = new ReentrantLock(fair);
Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();
複製代碼
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
複製代碼
注意,一定要有循環,當被喚醒時,須要回到循環中再次作判斷是否符合條件
//提供統一入口&出口
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
// 全部線程等在這裏,直到計數爲0,即調用了startGate.countDown();
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
// 全部線程等在這裏,直到計數爲0,即調用了endGate.countDown() nThreads次;
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
複製代碼
FutureTask:可生成結果的runnable,包括3種狀態:等待運行、正在運行和運行完成。若任務已經完成,則future.get()
會當即返回結果,不然阻塞直至完成狀態.一旦完成就永遠中止 FutureTask使用場景:用ConcurrentMap <A, Future<V>>
緩存計算,vaule值是Future,P89
// 使用FutureTask來提早加載稍後須要的數據
public class Preloader {
ProductInfo loadProductInfo() {
return null; // 這裏執行復雜計算or等待
}
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
public ProductInfo call() throws InterruptedException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public ProductInfo get() throws InterruptedException {
try {
return future.get(); // 阻塞直到有結果
} catch (ExecutionException e) {
throw e;
}
}
interface ProductInfo {
}
}
複製代碼
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
複製代碼
用於等待其餘線程,全部線程必須同時到達柵欄位置,才能繼續執行
CyclicBarier可使必定數量的參與方反覆地在柵欄位置聚集,在並行迭代算法中很是有用
Exchanger 是一種兩方柵欄,各方在柵欄位置上交換數據,用於雙方執行不對稱操做
private final CyclicBarrier barrier;
//barrier.await()調用了count次就執行內部線程mainBoard.commitNewValues()方法
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
public void run() {
while (!board.hasConverged()) {
//當循環走完,即表示計算完成
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
複製代碼
P84線程CPU數與吞吐量
在不涉及I/O操做或共享數據訪問時,當線程數量爲cpu數或CPU數+1 時,將得到最優的吞吐量。一個進程下的線程是如此,若是有多個進程呢?進程間的時間分配?
提供了一種標準方法,將任務的提交過程和執行過程解耦,還提供了對生命週期的支持,以及統計信息收集,應用程序管理機制和性能監視等機制
名稱 | 線程數 | 異常 | 特性 | 實現隊列 |
---|---|---|---|---|
newfixedThreadPool | 固定 | 出現異常而結束則補充一個線程 | 逐步增長,直到最大 | LinkedBlockingQueue |
newCachedThreadPool | 最大Interger. MAX_VALUE | 可緩存線程池,線程池規模多於當前需求,則回收空閒線程,線程池可無限擴大 | SynchronousQueue | |
newSingleThreadExecutor | 1 | 出現異常而結束則另起一個線程 | 單線程按優先級等順序執行 | LinkedBlockingQueue |
newScheduledThreadPool | 固定 | 以延遲或定時的方式執行 | RunnableScheduledFuture[] 數組 |
ExecutorService exec = Executors.newSingleThreadExecutor();
三種狀態:運行、關閉和已終止
四個生命週期階段:建立,提交,開始和完成
已提交但還沒有開始的任務能夠取消,而已開始執行的任務,只有當它們能響應中斷時才能取消
JVM只有在全部非守護線程所有終止後纔會退出,若是沒法正確關閉executor,那麼jvm則沒法結束
而關閉時有兩種方式
1. 平緩關閉shutdown:中止接受新任務,執行完全部正在執行和在等待隊列中的任務
2. 強制關閉shutdownNow:取消全部運行中的任務,再也不啓動等待隊列中的任務,返回全部已提交但未開始的任務,能夠將任務記入日誌etc
java.util.Timer類在執行全部定時任務時只會建立一個線程,若某個任務執行時間過長,那麼將破壞其餘TimerTask的定時精確性
Timer若拋出異常,則會取消全部timer類下的定時線程,不會恢復執行
ExecutorService中的全部submit方法都將放回一個future,可得到任務結果
Future有cancle方法,能夠取消任務
CompletionService:將executor和blockingqueue的功能融合在一塊兒,將callable任務交給提交給他來執行,而後使用相似於隊列操做的take和poll等方法得到future,再future.get()
返回結果.這裏是能夠應付一組計算結果,一旦有返回就能夠得到
如ExecutorCompletionService實際上就是將計算完成後的結果放在blockingqueue中
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
複製代碼
Future爲一個任務設計時限:時限內有結果,get當即返回,超過期限拋出TimeOutException Future.get(long,timeType)
提交一組任務
InvokeAll:將多個任務提交到一個ExecutorService並得到結果,InvokeAll按照任務集合中迭代器的順序添加到返回集合,由此可關聯各個future與callable
當任務執行完成/調用者線程中斷/超時,invokeAll將返回,能夠經過get或者isCancle判斷是何種狀況
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
for (Future<TravelQuote> f : futures) {
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(...); //按序放回關聯,須要放入對象
} catch (CancellationException e) {
quotes.add(...); //按序放回關聯,須要放入對象
}
}
複製代碼
class QuoteTask implements Callable<TravelQuote> {
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
複製代碼
Java沒有提供任何機制來安全地終止線程,而是提供了中斷(interrupion),能使一個線程終止另外一個線程的當前工做
Callable認爲主入口點將返回一個值,並可能拋出一個異常
無返回值,可以使用Callable
調用interrupt並不意味着當即中止目標線程正在進行的工做,而只是傳遞了請求中斷的消息,而後由線程在下一個合適的時刻中斷本身
一般,中斷是實現取消最合理的方式,而不是設置標誌位:若使用標誌位,I/O阻塞就會一直卡住,中斷請求只能設置線程的中斷狀態,同樣也卡住,只能關閉I/O接口
得到中斷狀態,並清除當前線程的中斷狀態.
在調用interrupted時返回了true,則會清除線程中斷狀態,下次再調用interrupted時就已經不是中斷狀態了,故須要對中斷作處理—拋出interruptException或者再次調用interrupt恢復中斷狀態:Thread.currentThread().interrupt();
最合理的取消操做是某種形式的線程級取消操做或服務級取消操做:儘快退出,在必要時進行清理,通知某個全部者線程已經退出
線程應該只能由其全部者中斷,全部者能夠將線程的中斷策略信息封裝到某個合適的取消機制中,例如shutdown方法
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
複製代碼
當嘗試取消某個任務時,不宜直接中斷線程池,只能經過任務的future來實現取消Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
} finally {
task.cancel(true);
}
複製代碼
在如socket I/O或者等待得到內置鎖而阻塞時,那麼中斷請求只能設置線程的中斷狀態,除此以外並沒有多大做用。此時應該中斷底層的阻塞操做,拋出異常,以此響應中斷
Example:
Socket讀取阻塞
改寫thread的中斷方法
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
複製代碼
能夠中斷線程,也能夠取消底層阻塞方法
注意,在取消生產者-消費者操做時,須要同時取消生產者和消費者
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
//自定義的取消方法
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
//先調用自身取消方法
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
//新增兩個方法
interface CancellableTask <T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask(); //返回擴展對象
else
return super.newTaskFor(callable);
}
}
複製代碼
能夠設置一個Boolean flag標識是否取消。同時設置一個計數器統計當前任務隊列中任務數量,關閉時設置flag,中斷線程,而底層的生產者方法就判斷flag是否已關閉,拋出異常,消費者則只在flag和計數器值爲0時取消,不然一直處理任務隊列,直到完成全部任務。
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
//要把對象消費完
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
複製代碼
往任務隊列中添加約定的對象,消費者每次都查看對象,判斷是否退出
也能夠再作一個統計,達到數量才退出,這樣就能夠確保取消多個線程
已知生產者消費者時纔有用,要確認生產的毒丸對象數量
注意:只有在無界隊列中,毒丸對象才能可靠地工做
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void start() {
producer.start();
consumer.start();
}
public void stop() { //中斷機制
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
複製代碼
消費者
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}
複製代碼
生產者
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* 被打斷就放入毒丸對象 */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
複製代碼
平緩關閉shutdown:中止接受新任務,執行完全部正在執行和在等待隊列中的任務
public void stop() {
try {
exec.shutdown();
exec.awaitTermination(3000, TimeUnit);//等待執行完成,這裏不是冗餘嗎?
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
...
}
}
複製代碼
致使線程提早死亡的最主要緣由就是runtimeException,在線程代碼中可使用try-catch代碼塊捕獲異常並進行處理
未捕獲異常
UncaughtExceptionHandler,Thread API中提供的處理異常類,能檢測出某個線程因爲未捕獲的異常而終結的狀況,至少將異常信息打印到日誌表中。須要爲ThreadPoolExecutor的構造函數提供一個ThreadFactory
public class MyAppThread extends Thread {
public MyAppThread(Runnable runnable, String name) {
super(runnable, name);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
}
}
複製代碼
只有經過execute提交的任務,才能將它拋出的異常交給未捕獲異常處理器,而經過submit提交的任務,不管是拋出的未檢查異常仍是已檢查異常,都將被認爲是任務返回狀態的一部分.
若一個由submit提交的任務因爲拋出了異常而結束,那麼這個異常將被future.get封裝在ExecutionException中從新拋出
若但願在任務中因爲發生異常而失敗時得到通知,而且執行一些特定於任務的恢復操做,那麼能夠將任務封裝在能捕獲異常的runnable或callable中,或者改寫ThreadPoolExecutor的afterExecute方法
Shutdown hook:正常關閉jvm時,jvm首先調用全部已經註冊好的關閉鉤子,指經過Runtime.addShutdownHook()
註冊但未開始的線程
Jvm並不會中止或者中斷任何在關閉時仍然運行的應用程序線程,當jvm最終結束時,守護線程將被強行結束
Runtime.getRuntime().addShutdownHook(new Thread(){...});
普通線程:主線程建立的全部線程都是普通線程,普通線程繼承了建立它的線程的守護狀態
守護線程:非普通線程,當一個線程退出時,jvm會檢查正在運行的線程,若都是守護線程,則jvm退出,當jvm中止時,全部的守護線程將被拋棄,直接退出
守護線程最好執行「內部任務」
只有當線程本地值的生命週期受限於任務的生命週期時,在線程池的線程中使用threadlocal纔有意義,而在線程池的線程中不該該使用threadlocal在任務之間傳遞值
只要線程池中的任務須要無限期地等待一些必須由池中其餘任務才能提供的資源或條件,除非線程池足夠大,不然將發生線程飢餓死鎖
每當提交一個有依賴性的executor任務時,須要知道可能會出現線程飢餓死鎖,故而須要在代碼或配置executor的配置文件中記錄線程池的大小限制或配置限制
只有當任務相互獨立時,爲線程池工做隊列設置界限纔是合理的,若是任務之間存在依賴性,那麼有界的線程池或隊列就可能致使線程」飢餓死鎖」問題,此時應該使用無界的線程池,例如newCachePool
可阻塞方法大都定義了限時版本和不限時版本,如Thread.join
, blockingQueue.put
, countDownLatch.await
, selector.select
等。若等待超時,能夠把任務標識爲失敗,而後終止任務或者將任務從新放回隊列以便隨後執行
線程池大小不該該固定,應該經過配置機制提供,或者根據Runtime.getRuntime().availableProcessors()
來動態計算
若是須要執行不一樣類別的任務,而且它們之間的行爲相差很大,那麼應該考慮使用多個線程池,從而使每一個線程池能夠根據各自的工做負載來調整
計算密集型:線程池大小爲CPU數+1
I/O操做或其餘阻塞操做:線程並不會一直執行,規模應該更大,須要估算任務的等待時間與計算時間的比值
N_cpu=number of CPUs
U_cpu=指望CPU利用率,0≤U_cpu≤1
W/C=等待時間/計算時間
複製代碼
要使處理器達到指望的使用率,線程池的最優大小等於:
N_threads =N_cpu*U_cpu*(1+W/C)
複製代碼
能夠經過runtime來得到CPU數目 int cpu = Runtime.getRuntime().availableProcessors();
threadPoolExecutor容許提供一個BlockingQueue來保存等待執行的任務,基本的任務排隊方法有3種:無界隊列,有界隊列和同步移交
工廠方法newFixedPoolExecutor , newSingleThreadExecutor在默認狀況下使用一個無界的linkedBlockedQueue
ArrayBlockingQueue,有界的linkedBlockingQueue、PriorityBlockingQueue,有界隊列有助於避免資源耗盡的狀況發生
對於很是大或者無界的線程池,能夠經過synchronousQueue來避免任務排隊,以及直接將任務從生產者交給工做者線程
若沒有線程正在等待接受任務,而且線程池未滿,則新建立線程接受任務,不然根據飽和策略,這個任務將被拒絕
NewCachedThreadPool工廠方法使用了synchronousQueue
只有當線程池是無界的或者能夠拒絕任務時, synchronousQueue纔有價值
當有界隊列被填滿後,或者Executor已關閉,飽和策略開始發揮做用,ThreadPoolExextor的飽和策略能夠經過調用setRejectedExcutionHandler修改
不一樣的RejectedExcutionHandler實現
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
複製代碼
能夠控制對某些特殊代碼庫的訪問權限,所建立的線程將與建立PrivilegedThreadFactory的線程擁有相同的訪問權限、AccessControlContext和contextClassLoader
若不使用PrivilegedThreadFactory,新建的線程將從調用exectute或submit的客戶程序中繼承訪問權限
提供子類改寫方法beforeExecute, afterExecute, terminated
Run方法返回或拋出異常,afterExecute都會執行,如有error,則不會執行afterExecute
若beforeExecute拋出RuntimeException異常,則任務不被執行,且afterExecute也不會被調用
線程須要以固定一致的順序獲取鎖
須要注意的是,雖然對象引用順序是固定的,但在兩次加鎖時其實際對象是交換的,這實際上就不是固定順序加鎖,容易致使死鎖
加鎖時能夠以惟一,不可變的值做爲加鎖的排序依據,好比帳號,id等
在制定鎖的順序時,可使用system.identityHashCode()
獲取對象hashcode值,以hashcode值爲順序加鎖,又對象可能有相同hashcode值,那麼可使用加時賽鎖,即當判斷hashcode值同樣時,就對加時賽鎖上鎖,而後再以一個固定順序上鎖
若是在持有鎖的狀況下調用某個外部方法,要檢查被調用外部方法是否也有同步操做,避免出現死鎖問題
開放調用:在調用某個方法時不須要持有鎖
在程序中應該儘可能使用開放調用,更加容易進行死鎖分析
死鎖
//Class A
public synchronized void setLocation(Point location) {
...
if (location.equals(destination))
dispatcher.notifyAvailable(this); //方法調用也加鎖
}
//class B
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
複製代碼
開放調用
public void setLocation(Point location) {
boolean reachedDestination; //新增中間變量
synchronized (this) {
…
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
複製代碼
能夠指定一個超時時限,在超時後會返回一個失敗信息
Jvm經過線程轉儲幫助識別死鎖的發生,線程轉儲包括各個運行中的線程的棧追蹤信息,加鎖信息(每一個線程有哪些鎖,那些幀棧得到這些鎖,被阻塞的線程正在等待獲取哪個鎖) 線程轉儲前,jvm將在等待關係圖中經過搜索循環來找出死鎖,若發現死鎖,則獲取相應死鎖信息
線程因爲沒法訪問它所須要的資源而不能繼續執行時,就發生了飢餓,引起飢餓最多見的資源就是CPU時鐘週期
更改線程優先級且使用不當,或者在持有鎖時執行一些沒法結束的結構(無限循環,無線等待etc)
要避免使用線程優先級,由於這會增長平臺依賴性,並可能致使活躍性危險
線程不斷重複執行相同操做,但老是失敗。如在事務消息處理中,若不能成功地處理某個消息,那麼消息處理機制將回滾整個事務,並將它從新放到隊列的開頭
當多個相互協做的線程都對彼此進行響應從而修改各自的狀態,並讓線程沒法繼續執行,如行人同時互相讓路,此時引入隨機數,能夠避免問題
避免不成熟的優化,首先使程序正確,而後再提升運行速度——若是運行不夠快
在增長計算資源的狀況下,程序在理論上可以實現的最高加速比,取決於程序中可並行組件與串行組件的比重
F: 串行執行比率
N:處理器個數
Speedup≤1/(F+(1-F)/N)
複製代碼
線程調度過程當中須要訪問由操做系統和jvm共享的數據結構,其開銷包括jvm和操做系統代碼執行開銷,同時,因爲新線程切換進來時,它所須要的數據結構可能不在當前處理器本地緩存中,故還有緩存切換開銷
jvm能夠將阻塞的線程掛起並容許它被交換出去,當線程頻發發生阻塞,則CPU密集型的程序就會發生越多的上下文切換,從而增長調度開銷,下降吞吐量
在大多數通用的處理器中,上下文切換的開銷至關於5000~10000個時鐘週期,幾微秒
Unix系統的vmstat命令和windows perform工具可以報告上下文切換次數及在內核中執行時間所佔比例等信息。若內核佔用率超過10%,那麼一般表示調度活動頻繁,多是由I/O或者競爭鎖致使的阻塞形成的。
Jvm在實現阻塞行爲時,能夠
其效率取決於上下文切換的開銷以及在成功得到鎖以前須要等待的時間。 等待時間短,則選擇自旋等待,等待時間長,則選擇線程掛起
阻塞時,有兩次上下文切換,包括兩次必要的系統操做和緩存操做:
在ConcurrentHashMap中的size函數,並非直接返回一個儲存在map中的全局計數值,由於這會致使這個值成爲熱點值(每次增刪操做都會修改,即便不是同一線程,會致使鎖競爭),而是每一個分段獨立維護各自分段的計數值,計算map size值是直接枚舉分段計數值相加便可
使用併發容器,讀寫鎖,不可變對象、原子變量
工具命令UNIX vmstat/mpstat, windows perfmom
CPU沒有被充分利用緣由
線程從線程池中請求對象時若被阻塞,其阻塞開銷將是內存分配操做(新建對象)的數百倍
另外,須要確保從新使用對象時要將對象重置到正確狀態
可使用中斷來解除阻塞,在主線程中啓動含有阻塞操做的測試線程,此時測試線程阻塞中,在主線程中中斷測試線程,測試線程拋出InterruptException,測試線程執行join操做,確保測試線程走完,而後當測試線程.isAlive()==false則表示阻塞測試成功
使用Thread.getState來驗證線程可否在一個條件等待上阻塞,這並不可靠
測試因爲數據競爭而引起的錯誤,須要多個線程分別執行put和take操做
關鍵問題是:找出容易檢查的屬性,且這些屬性在發生錯誤的狀況下極有可能失敗,同時又不能使得錯誤檢查代碼人爲地限制併發性
可使用校驗和計算函數來計算入列和出列的元素校驗和,若是兩者相等,代碼正確。須要考慮是否順序敏感
測試時,可使用CyclicBarrier或者CountDownLatch來統一運行多線程測試程序,同時執行到同一位置,避免建立線程時致使的不一樣步問題
當拋出異常,或者無限循環時,測試可能永遠不會結束,此時測試程序能夠設置最大等待時間,過期不執行,後期再排查問題
測試線程數量應該多於CPU數量,則任意時刻都有線程在運行和被交換出去,增長交替行爲
不須要對象時,銷燬對象引用
Thread.yield()
或 Thread.sleep()
. (sleep 會好一些) 使用AOP提升方便性–verbose:gc
當某個類第一次被加載時,JVM經過解釋字節碼的方式執行,而熱點代碼在運行中可能會被動態編器編譯成機器代碼,則代碼將熱點代碼變爲直接執行;代碼也可能被退回解釋執行,從新編譯
-XX:+PrintCompilation
,當動態編譯時會輸出信息,驗證動態編譯是在測試運行前動態編譯器可能針對一個單線程測試程序進行一些專門優化,但只要在真實的應用程序中包含一些並行,都會使這些優化不復存在——將單線程性能測試與多線程性能測試結合在一塊兒
HotSpot中,-server
模式比-client
模式更好,-server
模式編譯器能產生更有效的代碼,並且這種模式更易於經過優化消除無用代碼
System.nanoTime
的當前值,若相等,則輸出無用且可被忽略的消息if ( f.x.hashCode() == System.nanoTime() ) {
System.out.println(" ");
}
複製代碼
若是在構造函數中啓動一個線程,那麼將可能帶來子類化問題,同時還會致使this引用從構造函數中逸出
當在一個條件隊列上等待時,object.wait
和condition.await
方法應該在檢查了狀態謂詞以後,在某個循環之中調用,同時須要持有正確的鎖,若是在調用object.wait
和 condition.await
方法時沒有持有鎖,或者不在某個循環中,或者沒有檢查某些狀態謂詞,那麼一般都是一個錯誤
若是在調用thread.sleep時持有一個鎖,那麼將致使其餘線程在很長一段時間內沒法執行,所以可能致使嚴重的活躍性問題.若是在調用object.wait
或condition.await
時持有兩個鎖,那麼也可能致使一樣的問題
必須在finally塊中釋放鎖unlock
在synchronized內置鎖中,出現死鎖時,恢復程序的惟一方式是重啓程序,而防止死鎖的惟一方式是在構造程序時避免出現不一致的鎖順序
特性:可定時,可輪詢,可中斷的鎖獲取操做,公平隊列, 非塊結構 reentrantLock.lockInterruptibly();
可中斷的鎖獲取操做
提供另外一種選擇來避免死鎖的發生
若是不能獲取鎖,會釋放已經得到的鎖,而後從新嘗試獲取全部鎖
定時鎖能根據剩餘時間來提供一個時限,若是操做不能在指定時限內完成,則程序提早結束
競爭激烈時,非公平鎖的性能高於公平鎖的性能的一個緣由是:在恢復一個被掛起的線程與該線程真正開始運行以前存在着嚴重的延遲
當持有鎖的時間較長,或者請求鎖的平均時間間隔較長,應該使用公平鎖
僅當內置鎖沒法知足需求的狀況下,才使用ReentrantLock
使用ReentrantLock場景:可定時的,可輪詢的與可中斷的鎖獲取操做,公平隊列,以及非塊結構的鎖
讀寫鎖能提升讀取多處理器系統上的數據結構的速度,而在其餘狀況下,讀寫鎖的性能比較差
當鎖由讀線程持有,而由寫線程請求鎖時,其餘讀線程只能等到寫線程使用完並釋放了寫入鎖後才能持有讀取鎖
寫線程擁有更高的優先級,寫線程能夠降級爲讀線程,而讀線程不能升級爲寫線程,不然容易致使死鎖:若是兩個讀線程試圖同時升級爲寫入鎖,那麼兩者都不會釋放讀取鎖
某些操做是基於狀態的,如不能從空隊列刪除元素,要獲取還沒有結束的任務的計算結果,必須等到隊列進入「非空」狀態或者任務進入已完成狀態
依賴狀態的操做能夠一直阻塞直到能夠繼續執行,能夠經過輪詢(循環)與休眠來實現簡單的阻塞,其思路是使用循環方式,重試直到成功,這並非一種好的實現
使用基於 LinkedBlockingQueue latch Semaphore FutureTask的條件隊列
循環判斷:
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
複製代碼
每當在等待一個條件時,必定要確保在條件謂詞變成真時經過某種方式發出通知
如條件謂詞 中,每當put一個元素後,都執行notifyAll(放後,儘快退出, notify和notifyAll方法都不釋放鎖,只是通知wait狀態的線程準備獲取鎖),通知喚醒在take上等待的線程
注意是喚醒哪一個鎖上的對象
對於狀態依賴的類,要麼將其等待和通知等協議徹底向子類公開,而且寫入正式文檔,要麼徹底阻止子類參與到等待和通知等過程當中
用於描述wait和notify方法的正確使用
對於每一個依賴狀態依賴的操做,與每一個修改其餘操做依賴狀態的操做,都應該定義一個入口協議和出口協議
入口協議即操做的條件謂詞,出口協議則包括檢查被該操做修改的全部狀態變量,並確認他們是否使某個其餘的條件謂詞爲真,如果,則通知相關的條件隊列
private final Condition notEmpty = lock.newCondition();
複製代碼
AbstractQueuedSynchronizer(AQS)是一個用於構建鎖和同步器的框架,許多同步器能夠經過AQS很容易並高效地構造出來,如ReentrantLock,Semaphore,FutureTask, CountDownLatch
CAS包含3個操做數——須要讀寫的內存位置V,進行比較的值A和擬寫入的新值B
當且僅當V的值等於A時,CAS纔會經過原子方式用新值B來更新V的值,不然不執行任何操做
CAS是一項樂觀的技術,它但願能成功地執行更新操做,而且若是有另外一個線程在最近一次檢查後更新了該變量,那麼CAS能檢測到這個錯誤,在本次更新時不執行更新操做
因爲CAS能檢測到來自其餘線程的干擾,所以即便不用鎖也能實現原子的讀-改-寫操做序列
經驗法則:在大多數處理器上,在無競爭的鎖獲取和釋放的「快捷代碼路徑」上的開銷,大約是CAS開銷的兩倍
12個原子變量類,分紅4組
非阻塞算法:一個線程的失敗或掛起不會致使其餘線程也失敗或掛起
無鎖算法:在算法的每一個步驟中都存在某個線程可以執行下去
構建非阻塞算法的技巧在於:將執行原子修改的範圍縮小到單個變量上
在算法執行中,值改變後又改變回原來的值,在CAS判斷時就有誤判
解決方案:不是更新某個引用的值,而是更新兩個值,包括一個引用和一個版本號
若是兩個操做之間缺少Happens-before關係,那麼JVM能夠任意地重排序
volatile變量規則:對volatile變量的寫入操做必須在對該變量的讀操做以前執行
線程啓動規則:在線程上對Thread.start()的調用必須在該線程中執行任何操做以前執行
傳遞性:A在B前完成,B在C前完成,則A在C前完成
當缺乏Happens-before關係時,就可能出現重排序問題,因此纔會出如今沒有充分同步的狀況下發佈一個對象會致使另外一個線程看到一個只被部分構造的對象
同步集合類,Hashtable 和 Vector 還有同步集合包裝類,Collections.synchronizedMap()
和Collections.synchronizedList()
提供了一個基本的有條件的線程安全的Map和List的實現。
併發集合像ConcurrentHashMap,不只提供線程安全還用鎖分離和內部分區等現代技術提升了可擴展性
死鎖的發生必須知足如下四個條件:
volatile變量能夠確保先行關係,即寫操做會發生在後續的讀操做以前, 但它並不能保證原子性。例如用volatile修飾count變量那麼 count++
操做就不是原子性的
AtomicInteger類提供的atomic方法可讓這種操做具備原子性
沒有找到好的解釋
是對全部的操做都封裝了同步方法
public int size() {
synchronized (mutex) {return m.size();}
}
複製代碼