本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html
咱們在67節和68節實現了線程的一些基本協做機制,那是利用基本的wait/notify實現的,咱們提到,Java併發包中有一些專門的同步工具類,本節,咱們就來探討它們。java
咱們要探討的工具類包括:git
與71節介紹的顯示鎖和72節介紹的顯示條件相似,它們也都是基於AQS實現的,AQS可參看71節。在一些特定的同步協做場景中,相比使用最基本的wait/notify,顯示鎖/條件,它們更爲方便,效率更高。下面,咱們就來探討它們的基本概念、用法、用途和基本原理。github
以前章節咱們介紹了兩種鎖,66節介紹了synchronized,71節介紹了顯示鎖ReentrantLock。對於同一受保護對象的訪問,不管是讀仍是寫,它們都要求得到相同的鎖。在一些場景中,這是沒有必要的,多個線程的讀操做徹底能夠並行,在讀多寫少的場景中,讓讀操做並行能夠明顯提升性能。編程
怎麼讓讀操做可以並行,又不影響一致性呢?答案是使用讀寫鎖。在Java併發包中,接口ReadWriteLock表示讀寫鎖,主要實現類是可重入讀寫鎖ReentrantReadWriteLock。swift
ReadWriteLock的定義爲:緩存
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
複製代碼
經過一個ReadWriteLock產生兩個鎖,一個讀鎖,一個寫鎖。讀操做使用讀鎖,寫操做使用寫鎖。bash
須要注意的是,只有"讀-讀"操做是能夠並行的,"讀-寫"和"寫-寫"都不能夠。只有一個線程能夠進行寫操做,在獲取寫鎖時,只有沒有任何線程持有任何鎖才能夠獲取到,在持有寫鎖時,其餘任何線程都獲取不到任何鎖。在沒有其餘線程持有寫鎖的狀況下,多個線程能夠獲取和持有讀鎖。微信
ReentrantReadWriteLock是可重入的讀寫鎖,它有兩個構造方法,以下所示:併發
public ReentrantLock() public ReentrantLock(boolean fair) 複製代碼
fire表示是否公平,不傳遞的話是false,含義與顯式鎖一節介紹的相似,就不贅述了。
咱們看個簡單的例子,使用ReentrantReadWriteLock實現一個緩存類MyCache,代碼以下:
public class MyCache {
private Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
public void clear() {
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
}
複製代碼
代碼比較簡單,就不贅述了。
讀寫鎖是怎麼實現的呢?讀鎖和寫鎖看上去是兩個鎖,它們是怎麼協調的?具體實現比較複雜,咱們簡述下其思路。
內部,它們使用同一個整數變量表示鎖的狀態,16位給讀鎖用,16位給寫鎖用,使用一個變量便於進行CAS操做,鎖的等待隊列其實也只有一個。
寫鎖的獲取,就是確保當前沒有其餘線程持有任何鎖,不然就等待。寫鎖釋放後,也就是將等待隊列中的第一個線程喚醒,喚醒的多是等待讀鎖的,也多是等待寫鎖的。
讀鎖的獲取不太同樣,首先,只要寫鎖沒有被持有,就能夠獲取到讀鎖,此外,在獲取到讀鎖後,它會檢查等待隊列,逐個喚醒最前面的等待讀鎖的線程,直到第一個等待寫鎖的線程。若是有其餘線程持有寫鎖,獲取讀鎖會等待。讀鎖釋放後,檢查讀鎖和寫鎖數是否都變爲了0,若是是,喚醒等待隊列中的下一個線程。
以前介紹的鎖都是限制只有一個線程能夠同時訪問一個資源。現實中,資源每每有多個,但每一個同時只能被一個線程訪問,好比,飯店的飯桌、火車上的衛生間。有的單個資源即便能夠被併發訪問,但併發訪問數多了可能影響性能,因此但願限制併發訪問的線程數。還有的狀況,與軟件的受權和計費有關,對不一樣等級的帳戶,限制不一樣的最大併發訪問數。
信號量類Semaphore就是用來解決這類問題的,它能夠限制對資源的併發訪問數,它有兩個構造方法:
public Semaphore(int permits) public Semaphore(int permits, boolean fair) 複製代碼
fire表示公平,含義與以前介紹的是相似的,permits表示許可數量。
Semaphore的方法與鎖是相似的,主要的方法有兩類,獲取許可和釋放許可,主要方法有:
//阻塞獲取許可
public void acquire() throws InterruptedException //阻塞獲取許可,不響應中斷 public void acquireUninterruptibly() //批量獲取多個許可 public void acquire(int permits) throws InterruptedException public void acquireUninterruptibly(int permits) //嘗試獲取 public boolean tryAcquire() //限定等待時間獲取 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException //釋放許可 public void release() 複製代碼
咱們看個簡單的示例,限制併發訪問的用戶數不超過100,代碼以下:
public class AccessControlService {
public static class ConcurrentLimitException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
private static final int MAX_PERMITS = 100;
private Semaphore permits = new Semaphore(MAX_PERMITS, true);
public boolean login(String name, String password) {
if (!permits.tryAcquire()) {
// 同時登陸用戶數超過限制
throw new ConcurrentLimitException();
}
// ..其餘驗證
return true;
}
public void logout(String name) {
permits.release();
}
}
複製代碼
代碼比較簡單,就不贅述了。
須要說明的是,若是咱們將permits的值設爲1,你可能會認爲它就變成了通常的鎖,不過,它與通常的鎖是不一樣的。通常鎖只能由持有鎖的線程釋放,而Semaphore表示的只是一個許可數,任意線程均可以調用其release方法。主要的鎖實現類ReentrantLock是可重入的,而Semaphore不是,每一次的acquire調用都會消耗一個許可,好比,看下面代碼段:
Semaphore permits = new Semaphore(1);
permits.acquire();
permits.acquire();
System.out.println("acquired");
複製代碼
程序會阻塞在第二個acquire調用,永遠都不會輸出"acquired"。
信號量的基本原理比較簡單,也是基於AQS實現的,permits表示共享的鎖個數,acquire方法就是檢查鎖個數是否大於0,大於則減一,獲取成功,不然就等待,release就是將鎖個數加一,喚醒第一個等待的線程。
咱們在68節使用wait/notify實現了一個簡單的門栓MyLatch,咱們提到,Java併發包中已經提供了相似工具,就是CountDownLatch。它的大概含義是指,它至關因而一個門栓,一開始是關閉的,全部但願經過該門的線程都須要等待,而後開始倒計時,倒計時變爲0後,門栓打開,等待的全部線程均可以經過,它是一次性的,打開後就不能再關上了。
CountDownLatch裏有一個計數,這個計數經過構造方法進行傳遞:
public CountDownLatch(int count) 複製代碼
多個線程能夠基於這個計數進行協做,它的主要方法有:
public void await() throws InterruptedException public boolean await(long timeout, TimeUnit unit) throws InterruptedException public void countDown() 複製代碼
await()檢查計數是否爲0,若是大於0,就等待,await()能夠被中斷,也能夠設置最長等待時間。countDown檢查計數,若是已經爲0,直接返回,不然減小計數,若是新的計數變爲0,則喚醒全部等待的線程。
在68節,咱們介紹了門栓的兩種應用場景,一種是同時開始,另外一種是主從協做。它們都有兩類線程,互相須要同步,咱們使用CountDownLatch從新演示下。
在同時開始場景中,運行員線程等待主裁判線程發出開始指令的信號,一旦發出後,全部運動員線程同時開始,計數初始爲1,運動員線程調用await,主線程調用countDown,示例代碼以下:
public class RacerWithCountDownLatch {
static class Racer extends Thread {
CountDownLatch latch;
public Racer(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println(getName()
+ " start run "+System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(1);
Thread[] racers = new Thread[num];
for (int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}
複製代碼
代碼比較簡單,就不贅述了。在主從協做模式中,主線程依賴工做線程的結果,須要等待工做線程結束,這時,計數初始值爲工做線程的個數,工做線程結束後調用countDown,主線程調用await進行等待,示例代碼以下:
public class MasterWorkerDemo {
static class Worker extends Thread {
CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// simulate working on task
Thread.sleep((int) (Math.random() * 1000));
// simulate exception
if (Math.random() < 0.02) {
throw new RuntimeException("bad luck");
}
} catch (InterruptedException e) {
} finally {
this.latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
CountDownLatch latch = new CountDownLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for (int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
}
複製代碼
須要強調的是,在這裏,countDown的調用應該放到finally語句中,確保在工做線程發生異常的狀況下也會被調用,使主線程可以從await調用中返回。
咱們在68節使用wait/notify實現了一個簡單的集合點AssemblePoint,咱們提到,Java併發包中已經提供了相似工具,就是CyclicBarrier。它的大概含義是指,它至關因而一個柵欄,全部線程在到達該柵欄後都須要等待其餘線程,等全部線程都到達後再一塊兒經過,它是循環的,能夠用做重複的同步。
CyclicBarrier特別適用於並行迭代計算,每一個線程負責一部分計算,而後在柵欄處等待其餘線程完成,全部線程到齊後,交換數據和計算結果,再進行下一次迭代。
與CountDownLatch相似,它也有一個數字,但表示的是參與的線程個數,這個數字經過構造方法進行傳遞:
public CyclicBarrier(int parties) 複製代碼
它還有一個構造方法,接受一個Runnable參數,以下所示:
public CyclicBarrier(int parties, Runnable barrierAction) 複製代碼
這個參數表示柵欄動做,當全部線程到達柵欄後,在全部線程執行下一步動做前,運行參數中的動做,這個動做由最後一個到達柵欄的線程執行。
CyclicBarrier的主要方法就是await:
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 複製代碼
await在等待其餘線程到達柵欄,調用await後,表示本身已經到達,若是本身是最後一個到達的,就執行可選的命令,執行後,喚醒全部等待的線程,而後重置內部的同步計數,以循環使用。
await能夠被中斷,能夠限定最長等待時間,中斷或超時後會拋出異常。須要說明的是異常BrokenBarrierException,它表示柵欄被破壞了,什麼意思呢?在CyclicBarrier中,參與的線程是互相影響的,只要其中一個線程在調用await時被中斷了,或者超時了,柵欄就會被破壞,此外,若是柵欄動做拋出了異常,柵欄也會被破壞,被破壞後,全部在調用await的線程就會退出,拋出BrokenBarrierException。
咱們看一個簡單的例子,多個遊客線程分別在集合點A和B同步:
public class CyclicBarrierDemo {
static class Tourist extends Thread {
CyclicBarrier barrier;
public Tourist(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
// 模擬先各自獨立運行
Thread.sleep((int) (Math.random() * 1000));
// 集合點A
barrier.await();
System.out.println(this.getName() + " arrived A "
+ System.currentTimeMillis());
// 集合後模擬再各自獨立運行
Thread.sleep((int) (Math.random() * 1000));
// 集合點B
barrier.await();
System.out.println(this.getName() + " arrived B "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
public static void main(String[] args) {
int num = 3;
Tourist[] threads = new Tourist[num];
CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
@Override
public void run() {
System.out.println("all arrived " + System.currentTimeMillis()
+ " executed by " + Thread.currentThread().getName());
}
});
for (int i = 0; i < num; i++) {
threads[i] = new Tourist(barrier);
threads[i].start();
}
}
}
複製代碼
在個人電腦上的一次輸出爲:
all arrived 1490053578552 executed by Thread-1
Thread-1 arrived A 1490053578555
Thread-2 arrived A 1490053578555
Thread-0 arrived A 1490053578555
all arrived 1490053578889 executed by Thread-0
Thread-0 arrived B 1490053578890
Thread-2 arrived B 1490053578890
Thread-1 arrived B 1490053578890
複製代碼
多個線程到達A和B的時間是同樣的,使用CyclicBarrier,達到了重複同步的目的。
CyclicBarrier與CountDownLatch可能容易混淆,咱們強調下其區別:
本節介紹了Java併發包中的一些同步協做工具:
實際中,應該優先使用這些工具,而不是手工用wait/notify或者顯示鎖/條件同步。
下一節,咱們來探討一個特殊的概念,線程局部變量ThreadLocal,它是什麼呢?
(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)
未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。