在分佈式開發中,鎖是線程控制的重要途徑。Java爲此也提供了2種鎖機制,synchronized和lock。作爲Java愛好者,天然少不了對比一下這2種機制,也能從中學到些分佈式開發須要注意的地方。
咱們先從最簡單的入手,逐步分析這2種的區別。
1、synchronized和lock的用法區別
synchronized:在須要同步的對象中加入此控制,synchronized能夠加在方法上,也能夠加在特定代碼塊中,括號中表示須要鎖的對象。
lock:須要顯示指定起始位置和終止位置。通常使用ReentrantLock類作爲鎖,多個線程中必需要使用一個ReentrantLock類作爲對象才能保證鎖的生效。且在加鎖和解鎖處須要經過lock()和unlock()顯示指出。因此通常會在finally塊中寫unlock()以防死鎖。
用法區別比較簡單,這裏不贅述了,若是不懂的能夠看看Java基本語法。
2、synchronized和lock性能區別
synchronized是託管給JVM執行的,而lock是java寫的控制鎖的代碼。在Java1.5中,synchronize是性能低效的。由於這是一個重量級操做,須要調用操做接口,致使有可能加鎖消耗的系統時間比加鎖之外的操做還多。相比之下使用Java提供的Lock對象,性能更高一些。可是到了Java1.6,發生了變化。synchronize在語義上很清晰,能夠進行不少優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。致使在Java1.6上synchronize的性能並不比Lock差。官方也表示,他們也更支持synchronize,在將來的版本中還有優化餘地。
說到這裏,仍是想提一下這2中機制的具體區別。據我所知,html
synchronized原始採用的是CPU悲觀鎖機制,即線程得到的是獨佔鎖。獨佔鎖意味着其餘線程只能依靠阻塞來等待線程釋放鎖。而在CPU轉換線程阻塞時會引發線程上下文切換,當有不少線程競爭鎖的時候,會引發CPU頻繁的上下文切換致使效率很低。
而Lock用的是樂觀鎖方式。所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。樂觀鎖實現的機制就是CAS操做(Compare and Swap)。咱們能夠進一步研究ReentrantLock的源代碼,會發現其中比較重要的得到鎖的一個方法是compareAndSetState。這裏其實就是調用的CPU提供的特殊指令。
現代的CPU提供了指令,能夠自動更新共享數據,並且可以檢測到其餘線程的干擾,而 compareAndSet() 就用這些代替了鎖定。這個算法稱做非阻塞算法,意思是一個線程的失敗或者掛起不該該影響其餘線程的失敗或掛起的算法。
我也只是瞭解到這一步,具體到CPU的算法若是感興趣的讀者還能夠在查閱下,若是有更好的解釋也能夠給我留言,我也學習下。
3、synchronized和lock用途區別
synchronized原語和ReentrantLock在通常狀況下沒有什麼區別,可是在很是複雜的同步應用中,請考慮使用ReentrantLock,特別是遇到下面2種需求的時候。
1.某個線程在等待一個鎖的控制權的這段時間須要中斷
2.須要分開處理一些wait-notify,ReentrantLock裏面的Condition應用,可以控制notify哪一個線程
3.具備公平鎖功能,每一個到來的線程都將排隊等候
下面細細道來……
先說第一種狀況,ReentrantLock的lock機制有2種,忽略中斷鎖和響應中斷鎖,這給咱們帶來了很大的靈活性。好比:若是A、B2個線程去競爭鎖,A線程獲得了鎖,B線程等待,可是A線程這個時候實在有太多事情要處理,就是一直不返回,B線程可能就會等不及了,想中斷本身,再也不等待這個鎖了,轉而處理其餘事情。這個時候ReentrantLock就提供了2種機制,第一,B線程中斷本身(或者別的線程中斷它),可是ReentrantLock不去響應,繼續讓B線程等待,你再怎麼中斷,我全當耳邊風(synchronized原語就是如此);第二,B線程中斷本身(或者別的線程中斷它),ReentrantLock處理了這個中斷,而且再也不等待這個鎖的到來,徹底放棄。(若是你沒有了解java的中斷機制,請參考下相關資料,再回頭看這篇文章,80%的人根本沒有真正理解什麼是java的中斷,呵呵)
這裏來作個試驗,首先搞一個Buffer類,它有讀操做和寫操做,爲了避免讀到髒數據,寫和讀都須要加鎖,咱們先用synchronized原語來加鎖,以下:java
1 |
public class Buffer { |
2 |
3 |
private Object lock; |
4 |
5 |
public Buffer() { |
6 |
lock = this ; |
7 |
} |
8 |
9 |
public void write() { |
10 |
synchronized (lock) { |
11 |
long startTime = System.currentTimeMillis(); |
12 |
System.out.println( "開始往這個buff寫入數據…" ); |
13 |
for (;;) // 模擬要處理很長時間 |
14 |
{ |
15 |
if (System.currentTimeMillis() |
16 |
- startTime > Integer.MAX_VALUE) |
17 |
break ; |
18 |
} |
19 |
System.out.println( "終於寫完了" ); |
20 |
} |
21 |
} |
22 |
23 |
public void read() { |
24 |
synchronized (lock) { |
25 |
System.out.println( "從這個buff讀數據" ); |
26 |
} |
27 |
} |
28 |
} |
接着,咱們來定義2個線程,一個線程去寫,一個線程去讀。算法
1 |
public class Writer extends Thread { |
2 |
3 |
private Buffer buff; |
4 |
5 |
public Writer(Buffer buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
buff.write(); |
12 |
} |
13 |
14 |
} |
15 |
16 |
public class Reader extends Thread { |
17 |
18 |
private Buffer buff; |
19 |
20 |
public Reader(Buffer buff) { |
21 |
this .buff = buff; |
22 |
} |
23 |
24 |
@Override |
25 |
public void run() { |
26 |
27 |
buff.read(); //這裏估計會一直阻塞 |
28 |
29 |
System.out.println( "讀結束" ); |
30 |
31 |
} |
32 |
33 |
} |
好了,寫一個Main來試驗下,咱們有意先去「寫」,而後讓「讀」等待,「寫」的時間是無窮的,就看「讀」能不能放棄了。緩存
1 |
public class Test { |
2 |
public static void main(String[] args) { |
3 |
Buffer buff = new Buffer(); |
4 |
5 |
final Writer writer = new Writer(buff); |
6 |
final Reader reader = new Reader(buff); |
7 |
8 |
writer.start(); |
9 |
reader.start(); |
10 |
11 |
new Thread( new Runnable() { |
12 |
13 |
@Override |
14 |
public void run() { |
15 |
long start = System.currentTimeMillis(); |
16 |
for (;;) { |
17 |
//等5秒鐘去中斷讀 |
18 |
if (System.currentTimeMillis() |
19 |
- start > 5000 ) { |
20 |
System.out.println( "不等了,嘗試中斷" ); |
21 |
reader.interrupt(); |
22 |
break ; |
23 |
} |
24 |
25 |
} |
26 |
27 |
} |
28 |
}).start(); |
29 |
30 |
} |
31 |
} |
咱們期待「讀」這個線程能退出等待鎖,但是事與願違,一旦讀這個線程發現本身得不到鎖,就一直開始等待了,就算它等死,也得不到鎖,由於寫線程要21億秒才能完成 T_T ,即便咱們中斷它,它都不來響應下,看來真的要等死了。這個時候,ReentrantLock給了一種機制讓咱們來響應中斷,讓「讀」能伸能屈,勇敢放棄對這個鎖的等待。咱們來改寫Buffer這個類,就叫BufferInterruptibly吧,可中斷緩存。多線程
1 |
import java.util.concurrent.locks.ReentrantLock; |
2 |
3 |
public class BufferInterruptibly { |
4 |
5 |
private ReentrantLock lock = new ReentrantLock(); |
6 |
7 |
public void write() { |
8 |
lock.lock(); |
9 |
try { |
10 |
long startTime = System.currentTimeMillis(); |
11 |
System.out.println( "開始往這個buff寫入數據…" ); |
12 |
for (;;) // 模擬要處理很長時間 |
13 |
{ |
14 |
if (System.currentTimeMillis() |
15 |
- startTime > Integer.MAX_VALUE) |
16 |
break ; |
17 |
} |
18 |
System.out.println( "終於寫完了" ); |
19 |
} finally { |
20 |
lock.unlock(); |
21 |
} |
22 |
} |
23 |
24 |
public void read() throws InterruptedException { |
25 |
lock.lockInterruptibly(); // 注意這裏,能夠響應中斷 |
26 |
try { |
27 |
System.out.println( "從這個buff讀數據" ); |
28 |
} finally { |
29 |
lock.unlock(); |
30 |
} |
31 |
} |
32 |
33 |
} |
固然,要對reader和writer作響應的修改併發
1 |
public class Reader extends Thread { |
2 |
3 |
private BufferInterruptibly buff; |
4 |
5 |
public Reader(BufferInterruptibly buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
12 |
try { |
13 |
buff.read(); //能夠收到中斷的異常,從而有效退出 |
14 |
} catch (InterruptedException e) { |
15 |
System.out.println( "我不讀了" ); |
16 |
} |
17 |
18 |
System.out.println( "讀結束" ); |
19 |
20 |
} |
21 |
22 |
} |
23 |
24 |
/** |
25 |
* Writer倒不用怎麼改動 |
26 |
*/ |
27 |
public class Writer extends Thread { |
28 |
29 |
private BufferInterruptibly buff; |
30 |
31 |
public Writer(BufferInterruptibly buff) { |
32 |
this .buff = buff; |
33 |
} |
34 |
35 |
@Override |
36 |
public void run() { |
37 |
buff.write(); |
38 |
} |
39 |
40 |
} |
41 |
42 |
public class Test { |
43 |
public static void main(String[] args) { |
44 |
BufferInterruptibly buff = new BufferInterruptibly(); |
45 |
46 |
final Writer writer = new Writer(buff); |
47 |
final Reader reader = new Reader(buff); |
48 |
49 |
writer.start(); |
50 |
reader.start(); |
51 |
52 |
new Thread( new Runnable() { |
53 |
54 |
@Override |
55 |
public void run() { |
56 |
long start = System.currentTimeMillis(); |
57 |
for (;;) { |
58 |
if (System.currentTimeMillis() |
59 |
- start > 5000 ) { |
60 |
System.out.println( "不等了,嘗試中斷" ); |
61 |
reader.interrupt(); |
62 |
break ; |
63 |
} |
64 |
65 |
} |
66 |
67 |
} |
68 |
}).start(); |
69 |
70 |
} |
71 |
} |
此次「讀」線程接收到了lock.lockInterruptibly()中斷,而且有效處理了這個「異常」。分佈式
至於第二種狀況,ReentrantLock能夠與Condition的配合使用,Condition爲ReentrantLock鎖的等待和釋放提供控制邏輯。
例如,使用ReentrantLock加鎖以後,能夠經過它自身的Condition.await()方法釋放該鎖,線程在此等待Condition.signal()方法,而後繼續執行下去。await方法須要放在while循環中,所以,在不一樣線程之間實現併發控制,還須要一個volatile的變量,boolean是原子性的變量。所以,通常的併發控制的操做邏輯以下所示:ide
1 |
volatile boolean isProcess = false ; |
2 |
ReentrantLock lock = new ReentrantLock(); |
3 |
Condtion processReady = lock.newCondtion(); |
4 |
thread: run() { |
5 |
lock.lock(); |
6 |
isProcess = true ; |
7 |
try { |
8 |
while (!isProcessReady) { //isProcessReady 是另一個線程的控制變量 |
9 |
processReady.await(); //釋放了lock,在此等待signal |
10 |
} catch (InterruptedException e) { |
11 |
Thread.currentThread().interrupt(); |
12 |
} finally { |
13 |
lock.unlock(); |
14 |
isProcess = false ; |
15 |
} |
16 |
} |
17 |
} |
18 |
} |
這裏只是代碼使用的一段簡化,下面咱們看Hadoop的一段摘取的源碼:oop
1 |
private class MapOutputBuffer<K extends Object, V extends Object> |
2 |
implements MapOutputCollector<K, V>, IndexedSortable { |
3 |
... |
4 |
boolean spillInProgress; |
5 |
final ReentrantLock spillLock = new ReentrantLock(); |
6 |
final Condition spillDone = spillLock.newCondition(); |
7 |
final Condition spillReady = spillLock.newCondition(); |
8 |
volatile boolean spillThreadRunning = false ; |
9 |
final SpillThread spillThread = new SpillThread(); |
10 |
... |
11 |
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, |
12 |
TaskReporter reporter |
13 |
) throws IOException, ClassNotFoundException { |
14 |
... |
15 |
spillInProgress = false ; |
16 |
spillThread.setDaemon( true ); |
17 |
spillThread.setName( "SpillThread" ); |
18 |
spillLock.lock(); |
19 |
try { |
20 |
spillThread.start(); |
21 |
while (!spillThreadRunning) { |
22 |
spillDone.await(); |
23 |
} |
24 |
} catch (InterruptedException e) { |
25 |
throw new IOException( "Spill thread failed to initialize" , e); |
26 |
} finally { |
27 |
spillLock.unlock(); |
28 |
} |
29 |
} |
30 |
31 |
protected class SpillThread extends Thread { |
32 |
33 |
@Override |
34 |
public void run() { |
35 |
spillLock.lock(); |
36 |
spillThreadRunning = true ; |
37 |
try { |
38 |
while ( true ) { |
39 |
spillDone.signal(); |
40 |
while (!spillInProgress) { |
41 |
spillReady.await(); |
42 |
} |
43 |
try { |
44 |
spillLock.unlock(); |
45 |
sortAndSpill(); |
46 |
} catch (Throwable t) { |
47 |
sortSpillException = t; |
48 |
} finally { |
49 |
spillLock.lock(); |
50 |
if (bufend < bufstart) { |
51 |
bufvoid = kvbuffer.length; |
52 |
} |
53 |
kvstart = kvend; |
54 |
bufstart = bufend; |
55 |
spillInProgress = false ; |
56 |
} |
57 |
} |
58 |
} catch (InterruptedException e) { |
59 |
Thread.currentThread().interrupt(); |
60 |
} finally { |
61 |
spillLock.unlock(); |
62 |
spillThreadRunning = false ; |
63 |
} |
64 |
} |
65 |
} |
代碼中spillDone 就是 spillLock的一個newCondition()。調用spillDone.await()時能夠釋放spillLock鎖,線程進入阻塞狀態,而等待其餘線程的 spillDone.signal()操做時,就會喚醒線程,從新持有spillLock鎖。
這裏能夠看出,利用lock可使咱們多線程交互變得方便,而使用synchronized則沒法作到這點。
最後呢,ReentrantLock這個類還提供了2種競爭鎖的機制:公平鎖和非公平鎖。這2種機制的意思從字面上也能瞭解個大概:即對於多線程來講,公平鎖會依賴線程進來的順序,後進來的線程後得到鎖。而非公平鎖的意思就是後進來的鎖也能夠和前邊等待鎖的線程同時競爭鎖資源。對於效率來說,固然是非公平鎖效率更高,由於公平鎖還要判斷是否是線程隊列的第一個纔會讓線程得到鎖。性能