線程編程一直是老生常談的問題,在Java中,隨着JDK的逐漸發展,JDK提供給咱們的併發模型也愈來愈多,本文摘取三例使用不一樣原理的模型,分析其大體原理。node
cow是copy-on-write的簡寫,這種模型來源於linux系統fork命令,Java中一種使用cow模型來實現的併發類是CopyOnWriteArrayList。相比於Vector,它的讀操做是無需加鎖的:linux
1
2
3
|
public
E get(
int
index) {
return
(E) elements[index];
}
|
之因此有如此神奇功效,其採起的是空間換取時間的方法,查看其add方法:算法
1
2
3
4
5
6
7
|
public
synchronized
boolean
add(E e) {
Object[] newElements =
new
Object[elements.length +
1
];
System.arraycopy(elements,
0
, newElements,
0
, elements.length);
newElements[elements.length] = e;
elements = newElements;
return
true
;
}
|
咱們注意到,CopyOnWriteArrayList的add方法是須要加鎖的,但其內部並無直接對elements數組作操做,而是先copy一份當前的數據到一個新的數組,而後對新的數組進行賦值操做。這樣作就讓get操做從同步中解脫出來。由於更改的數據並無發生在get所需的數組中。而是放生在新生成的副本中,因此不須要同步。但應該注意的是,儘管如此,get操做仍是可能會讀取到髒數據的。編程
CopyOnWriteArrayList的另外一特色是容許多線程遍歷,且其它線程更改數據並不會致使遍歷線程拋出ConcurrentModificationException
異常,來看下iterator()
,數組
1
2
3
4
|
public
Iterator<E> iterator() {
Object[] snapshot = elements;
return
new
CowIterator<E>(snapshot,
0
, snapshot.length);
}
|
這個CowIterator 是 ListIterator的子類,這個Iterator的特色是它並不支持對數據的更改操做:安全
1
2
3
4
5
6
7
8
9
|
public
void
add(E object) {
throw
new
UnsupportedOperationException();
}
public
void
remove() {
throw
new
UnsupportedOperationException();
}
public
void
set(E object) {
throw
new
UnsupportedOperationException();
}
|
這樣作的緣由也很容易理解,咱們能夠簡單地的認爲CowIterator中的snapshot是不可變數組,由於list中有數據更新都會生成新數組,而不會改變snapshot, 因此此時Iterator沒辦法再將更改的數據寫回list了。同理,list數據有更新也不會反映在CowIterator中。CowIterator只是保證其迭代過程不會發生異常。微信
CAS是Compare and Swap的簡寫,即比較與替換,CAS造做將比較和替換封裝爲一組原子操做,不會被外部打斷。這種原子操做的保證每每由處理器層面提供支持。多線程
在Java中有一個很是神奇的Unsafe類來對CAS提供語言層面的接口。但類如其名,此等神器若是使用不當,會形成武功盡失的,因此Unsafe不對外開放,想使用的話須要經過反射等技巧。這裏不對其作展開。介紹它的緣由是由於它是JDK1.8中ConcurrentHashMap的實現基礎。併發
ConcurrentHashMap
與HashMap
對數據的存儲有着類似的地方,都採用數組+鏈表+紅黑樹的方式。基本邏輯是內部使用Node來保存map中的一項key, value結構,對於hash不衝突的key,使用數組來保存Node數據,而每一項Node都是一個鏈表,用來保存hash衝突的Node,當鏈表的大小達到必定程度會轉爲紅黑樹,這樣會使在衝突數據較多時也會有比較好的查詢效率。函數
瞭解了ConcurrentHashMap
的存儲結構後,咱們來看下在這種結構下,ConcurrentHashMap
是如何實現高效的併發操做,這得益於ConcurrentHashMap
中的以下三個函數。
1
2
3
4
5
6
7
8
9
10
|
static
final
<K,V> Node<K,V> tabAt(Node<K,V>[] tab,
int
i) {
return
(Node<K,V>)U.getObjectVolatile(tab, ((
long
)i << ASHIFT) + ABASE);
}
static
final
<K,V>
boolean
casTabAt(Node<K,V>[] tab,
int
i,
Node<K,V> c, Node<K,V> v) {
return
U.compareAndSwapObject(tab, ((
long
)i << ASHIFT) + ABASE, c, v);
}
static
final
<K,V>
void
setTabAt(Node<K,V>[] tab,
int
i, Node<K,V> v) {
U.putOrderedObject(tab, ((
long
)i << ASHIFT) + ABASE, v);
}
|
其中的U就是咱們前文提到的Unsafe的一個實例,這三個函數都經過Unsafe的幾個方法保證了是原子性:
有了這三個函數就能夠保證ConcurrentHashMap
的線程安全嗎?並非的,ConcurrentHashMap
內部也使用比較多的synchronized,不過與HashTable這種對全部操做都使用synchronized不一樣,ConcurrentHashMap
只在特定的狀況下使用synchronized,來較少鎖的定的區域。來看下putVal方法(精簡版):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
final
V putVal(K key, V value,
boolean
onlyIfAbsent) {
if
(key ==
null
|| value ==
null
)
throw
new
NullPointerException();
int
hash = spread(key.hashCode());
int
binCount =
0
;
for
(Node<K,V>[] tab = table;;) {
Node<K,V> f;
int
n, i, fh;
if
(tab ==
null
|| (n = tab.length) ==
0
)
tab = initTable();
else
if
((f = tabAt(tab, i = (n -
1
) & hash)) ==
null
) {
if
(casTabAt(tab, i,
null
,
new
Node<K,V>(hash, key, value,
null
)))
break
;
// no lock when adding to embin
}
else
if
((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else
{
V oldVal =
null
;
synchronized
(f) {
....
}
}
}
addCount(1L, binCount);
return
null
;
}
|
整個put流程大體以下:
ConcurrentHashMap
是能夠多線程同時擴容的。這裏說協助的緣由在於,對於數組擴容,通常分爲兩步:1.新建一個更大的數組;2.將原數組數據copy到新數組中。對於第一步,ConcurrentHashMap
經過CAW來控制一個int變量保證新建數組這一步只會執行一次。對於第二步,ConcurrentHashMap
採用CAW + synchronized + 移動後標記 的方式來達到多線程擴容的目的。感興趣能夠查看transfer
函數。黑科技
的流程已嘗試無效,目標Node已經存在值,只能鎖住當前Node來進行put操做,固然,這裏省略了不少代碼,包括鏈表轉紅黑樹的操做等等。相比於put,get的代碼更好理解一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public
V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p;
int
n, eh; K ek;
int
h = spread(key.hashCode());
if
((tab = table) !=
null
&& (n = tab.length) >
0
&&
(e = tabAt(tab, (n -
1
) & h)) !=
null
) {
if
((eh = e.hash) == h) {
if
((ek = e.key) == key || (ek !=
null
&& key.equals(ek)))
return
e.val;
}
else
if
(eh <
0
)
return
(p = e.find(h, key)) !=
null
? p.val :
null
;
while
((e = e.next) !=
null
) {
if
(e.hash == h &&
((ek = e.key) == key || (ek !=
null
&& key.equals(ek))))
return
e.val;
}
}
return
null
;
}
|
還有一種實現線程安全的方式是經過將讀寫進行分離,這種方式的一種實現是LinkedBlockingQueue
。LinkedBlockingQueue
總體設計的也十分精巧,它的全局變量分爲三類:
final型變量因爲聲明後就不會被修改,因此天然線程安全,Atomic型內部採用了cas模型來保證線程安全。對於普通型變量,LinkedBlockingQueue
中只包含head與last兩個表示隊列的頭與尾。而且私有,外部沒法更改,因此,LinkedBlockingQueue
只須要保證head與last的安全便可保證真個隊列的線程安全。而且LinkedBlockingQueue
屬於FIFO型隊列,通常狀況下,讀寫會在不一樣元素上工做,因此, LinkedBlockingQueue
定義了兩個可重入鎖,巧妙的經過對head與last分別加鎖,實現讀寫分離,來實現良好的安全併發特性:
1
2
3
4
5
6
7
8
|
/** Lock held by take, poll, etc */
private
final
ReentrantLock takeLock =
new
ReentrantLock();
/** Wait queue for waiting takes */
private
final
Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private
final
ReentrantLock putLock =
new
ReentrantLock();
/** Wait queue for waiting puts */
private
final
Condition notFull = putLock.newCondition();
|
首先看下它的offer 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public
boolean
offer(E e) {
if
(e ==
null
)
throw
new
NullPointerException();
final
AtomicInteger count =
this
.count;
if
(count.get() == capacity)
return
false
;
int
c = -
1
;
Node<E> node =
new
Node<E>(e);
final
ReentrantLock putLock =
this
.putLock;
putLock.lock();
try
{
if
(count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if
(c +
1
< capacity)
notFull.signal();
}
}
finally
{
putLock.unlock();
}
if
(c ==
0
)
signalNotEmpty();
return
c >=
0
;
}
|
可見,在對隊列進行添加元素時,只須要對putLock進行加鎖便可,保證同一時刻只有一個線程能夠對last進行插入。一樣的,在從隊列進行提取元素時,也只須要獲取takeLock鎖來對head操做便可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
E poll() {
final
AtomicInteger count =
this
.count;
if
(count.get() ==
0
)
return
null
;
E x =
null
;
int
c = -
1
;
final
ReentrantLock takeLock =
this
.takeLock;
takeLock.lock();
try
{
if
(count.get() >
0
) {
x = dequeue();
c = count.getAndDecrement();
if
(c >
1
)
notEmpty.signal();
}
}
finally
{
takeLock.unlock();
}
if
(c == capacity)
signalNotFull();
return
x;
}
|
LinkedBlockingQueue
總體仍是比較好理解的,但有幾個點須要特殊注意:
LinkedBlockingQueue
是一個阻塞隊列,當隊列無元素爲空時,全部取元素的線程會經過notEmpty 的await()方法進行等待,直到再次有數據enqueue時,notEmpty發出signal信號。對於隊列達到上限時也是同理。