而Lock用的是樂觀鎖方式。所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。樂觀鎖實現的機制就是CAS操做(Compare and Swap)。咱們能夠進一步研究ReentrantLock的源代碼,會發現其中比較重要的得到鎖的一個方法是compareAndSetState。這裏其實就是調用的CPU提供的特殊指令。
現代的CPU提供了指令,能夠自動更新共享數據,並且可以檢測到其餘線程的干擾,而 compareAndSet() 就用這些代替了鎖定。這個算法稱做非阻塞算法,意思是一個線程的失敗或者掛起不該該影響其餘線程的失敗或掛起的算法。
1 |
public class Buffer { |
2 |
3 |
private Object lock; |
4 |
5 |
public Buffer() { |
6 |
lock = this ; |
7 |
} |
8 |
9 |
public void write() { |
10 |
synchronized (lock) { |
11 |
long startTime = System.currentTimeMillis(); |
12 |
System.out.println( "開始往這個buff寫入數據…" ); |
13 |
for (;;) // 模擬要處理很長時間 |
14 |
{ |
15 |
if (System.currentTimeMillis() |
16 |
- startTime > Integer.MAX_VALUE) |
17 |
break ; |
18 |
} |
19 |
System.out.println( "終於寫完了" ); |
20 |
} |
21 |
} |
22 |
23 |
public void read() { |
24 |
synchronized (lock) { |
25 |
System.out.println( "從這個buff讀數據" ); |
26 |
} |
27 |
} |
28 |
} |
1 |
public class Writer extends Thread { |
2 |
3 |
private Buffer buff; |
4 |
5 |
public Writer(Buffer buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
buff.write(); |
12 |
} |
13 |
14 |
} |
15 |
16 |
public class Reader extends Thread { |
17 |
18 |
private Buffer buff; |
19 |
20 |
public Reader(Buffer buff) { |
21 |
this .buff = buff; |
22 |
} |
23 |
24 |
@Override |
25 |
public void run() { |
26 |
27 |
buff.read(); //這裏估計會一直阻塞 |
28 |
29 |
System.out.println( "讀結束" ); |
30 |
31 |
} |
32 |
33 |
} |
1 |
public class Test { |
2 |
public static void main(String[] args) { |
3 |
Buffer buff = new Buffer(); |
4 |
5 |
final Writer writer = new Writer(buff); |
6 |
final Reader reader = new Reader(buff); |
7 |
8 |
writer.start(); |
9 |
reader.start(); |
10 |
11 |
new Thread( new Runnable() { |
12 |
13 |
@Override |
14 |
public void run() { |
15 |
long start = System.currentTimeMillis(); |
16 |
for (;;) { |
17 |
//等5秒鐘去中斷讀 |
18 |
if (System.currentTimeMillis() |
19 |
- start > 5000 ) { |
20 |
System.out.println( "不等了,嘗試中斷" ); |
21 |
reader.interrupt(); |
22 |
break ; |
23 |
} |
24 |
25 |
} |
26 |
27 |
} |
28 |
}).start(); |
29 |
30 |
} |
31 |
} |
咱們期待「讀」這個線程能退出等待鎖,但是事與願違,一旦讀這個線程發現本身得不到鎖,就一直開始等待了,就算它等死,也得不到鎖,由於寫線程要21億秒才能完成 T_T ,即便咱們中斷它,它都不來響應下,看來真的要等死了。這個時候,ReentrantLock給了一種機制讓咱們來響應中斷,讓「讀」能伸能屈,勇敢放棄對這個鎖的等待。咱們來改寫Buffer這個類,就叫BufferInterruptibly吧,可中斷緩存。多線程
1 |
import java.util.concurrent.locks.ReentrantLock; |
2 |
3 |
public class BufferInterruptibly { |
4 |
5 |
private ReentrantLock lock = new ReentrantLock(); |
6 |
7 |
public void write() { |
8 |
lock.lock(); |
9 |
try { |
10 |
long startTime = System.currentTimeMillis(); |
11 |
System.out.println( "開始往這個buff寫入數據…" ); |
12 |
for (;;) // 模擬要處理很長時間 |
13 |
{ |
14 |
if (System.currentTimeMillis() |
15 |
- startTime > Integer.MAX_VALUE) |
16 |
break ; |
17 |
} |
18 |
System.out.println( "終於寫完了" ); |
19 |
} finally { |
20 |
lock.unlock(); |
21 |
} |
22 |
} |
23 |
24 |
public void read() throws InterruptedException { |
25 |
lock.lockInterruptibly(); // 注意這裏,能夠響應中斷 |
26 |
try { |
27 |
System.out.println( "從這個buff讀數據" ); |
28 |
} finally { |
29 |
lock.unlock(); |
30 |
} |
31 |
} |
32 |
33 |
} |
1 |
public class Reader extends Thread { |
2 |
3 |
private BufferInterruptibly buff; |
4 |
5 |
public Reader(BufferInterruptibly buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
12 |
try { |
13 |
buff.read(); //能夠收到中斷的異常,從而有效退出 |
14 |
} catch (InterruptedException e) { |
15 |
System.out.println( "我不讀了" ); |
16 |
} |
17 |
18 |
System.out.println( "讀結束" ); |
19 |
20 |
} |
21 |
22 |
} |
23 |
24 |
/** |
25 |
* Writer倒不用怎麼改動 |
26 |
*/ |
27 |
public class Writer extends Thread { |
28 |
29 |
private BufferInterruptibly buff; |
30 |
31 |
public Writer(BufferInterruptibly buff) { |
32 |
this .buff = buff; |
33 |
} |
34 |
35 |
@Override |
36 |
public void run() { |
37 |
buff.write(); |
38 |
} |
39 |
40 |
} |
41 |
42 |
public class Test { |
43 |
public static void main(String[] args) { |
44 |
BufferInterruptibly buff = new BufferInterruptibly(); |
45 |
46 |
final Writer writer = new Writer(buff); |
47 |
final Reader reader = new Reader(buff); |
48 |
49 |
writer.start(); |
50 |
reader.start(); |
51 |
52 |
new Thread( new Runnable() { |
53 |
54 |
@Override |
55 |
public void run() { |
56 |
long start = System.currentTimeMillis(); |
57 |
for (;;) { |
58 |
if (System.currentTimeMillis() |
59 |
- start > 5000 ) { |
60 |
System.out.println( "不等了,嘗試中斷" ); |
61 |
reader.interrupt(); |
62 |
break ; |
63 |
} |
64 |
65 |
} |
66 |
67 |
} |
68 |
}).start(); |
69 |
70 |
} |
71 |
} |
1 |
volatile boolean isProcess = false ; |
2 |
ReentrantLock lock = new ReentrantLock(); |
3 |
Condtion processReady = lock.newCondtion(); |
4 |
thread: run() { |
5 |
lock.lock(); |
6 |
isProcess = true ; |
7 |
try { |
8 |
while (!isProcessReady) { //isProcessReady 是另一個線程的控制變量 |
9 |
processReady.await(); //釋放了lock,在此等待signal |
10 |
} catch (InterruptedException e) { |
11 |
Thread.currentThread().interrupt(); |
12 |
} finally { |
13 |
lock.unlock(); |
14 |
isProcess = false ; |
15 |
} |
16 |
} |
17 |
} |
18 |
} |
1 |
private class MapOutputBuffer<K extends Object, V extends Object> |
2 |
implements MapOutputCollector<K, V>, IndexedSortable { |
3 |
... |
4 |
boolean spillInProgress; |
5 |
final ReentrantLock spillLock = new ReentrantLock(); |
6 |
final Condition spillDone = spillLock.newCondition(); |
7 |
final Condition spillReady = spillLock.newCondition(); |
8 |
volatile boolean spillThreadRunning = false ; |
9 |
final SpillThread spillThread = new SpillThread(); |
10 |
... |
11 |
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, |
12 |
TaskReporter reporter |
13 |
) throws IOException, ClassNotFoundException { |
14 |
... |
15 |
spillInProgress = false ; |
16 |
spillThread.setDaemon( true ); |
17 |
spillThread.setName( "SpillThread" ); |
18 |
spillLock.lock(); |
19 |
try { |
20 |
spillThread.start(); |
21 |
while (!spillThreadRunning) { |
22 |
spillDone.await(); |
23 |
} |
24 |
} catch (InterruptedException e) { |
25 |
throw new IOException( "Spill thread failed to initialize" , e); |
26 |
} finally { |
27 |
spillLock.unlock(); |
28 |
} |
29 |
} |
30 |
31 |
protected class SpillThread extends Thread { |
32 |
33 |
@Override |
34 |
public void run() { |
35 |
spillLock.lock(); |
36 |
spillThreadRunning = true ; |
37 |
try { |
38 |
while ( true ) { |
39 |
spillDone.signal(); |
40 |
while (!spillInProgress) { |
41 |
spillReady.await(); |
42 |
} |
43 |
try { |
44 |
spillLock.unlock(); |
45 |
sortAndSpill(); |
46 |
} catch (Throwable t) { |
47 |
sortSpillException = t; |
48 |
} finally { |
49 |
spillLock.lock(); |
50 |
if (bufend < bufstart) { |
51 |
bufvoid = kvbuffer.length; |
52 |
} |
53 |
kvstart = kvend; |
54 |
bufstart = bufend; |
55 |
spillInProgress = false ; |
56 |
} |
57 |
} |
58 |
} catch (InterruptedException e) { |
59 |
Thread.currentThread().interrupt(); |
60 |
} finally { |
61 |
spillLock.unlock(); |
62 |
spillThreadRunning = false ; |
63 |
} |
64 |
} |
65 |
} |
代碼中spillDone 就是 spillLock的一個newCondition()。調用spillDone.await()時能夠釋放spillLock鎖,線程進入阻塞狀態,而等待其餘線程的 spillDone.signal()操做時,就會喚醒線程,從新持有spillLock鎖。