控制併發線程數的Semaphore
java
1.簡介
信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施, 它負責協調各個線程, 以保證它們可以正確、合理的使用公共資源。編程
2.概念
Semaphore分爲單值和多值兩種,前者只能被一個線程得到,後者能夠被若干個線程得到。緩存
以一個停車場運做爲例。爲了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時若是同時來了五輛車,看門人容許其中三輛不受阻礙的進入,而後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開兩輛,則又能夠放入兩輛,如此往復。多線程
在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。併發
更進一步,信號量的特性以下:信號量是一個非負整數(車位數),全部經過它的線程(車輛)都會將該整數減一(經過它固然是爲了使用資源),當該整數值爲零時,全部試圖經過它的線程都將處於等待狀態。在信號量上咱們定義兩種操做: Wait(等待) 和 Release(釋放)。 當一個線程調用Wait(等待)操做時,它要麼經過而後將信號量減一,要麼一直等下去,直到信號量大於一或超時。Release(釋放)其實是在信號量上執行加操做,對應於車輛離開停車場,該操做之因此叫作「釋放」是由於加操做其實是釋放了由信號量守護的資源。dom
在java中,還能夠設置該信號量是否採用公平模式,若是以公平方式執行,則線程將會按到達的順序(FIFO)執行,若是是非公平,則能夠後請求的有可能排在隊列的頭部。
JDK中定義以下:
Semaphore(int permits, boolean fair)
建立具備給定的許可數和給定的公平設置的Semaphore。socket
Semaphore當前在多線程環境下被擴放使用,操做系統的信號量是個很重要的概念,在進程控制方面都有應用。Java併發庫Semaphore 能夠很輕鬆完成信號量控制,Semaphore能夠控制某個資源可被同時訪問的個數,經過 acquire() 獲取一個許可,若是沒有就等待,而 release() 釋放一個許可。好比在Windows下能夠設置共享文件的最大客戶端訪問個數。ide
Semaphore實現的功能就相似廁全部5個坑,假若有10我的要上廁所,那麼同時只能有多少我的去上廁所呢?同時只能有5我的可以佔用,當5我的中 的任何一我的讓開後,其中等待的另外5我的中又有一我的能夠佔用了。另外等待的5我的中能夠是隨機得到優先機會,也能夠是按照先來後到的順序得到機會,這取決於構造Semaphore對象時傳入的參數選項。單個信號量的Semaphore對象能夠實現互斥鎖的功能,而且能夠是由一個線程得到了「鎖」,再由另外一個線程釋放「鎖」,這可應用於死鎖恢復的一些場合。性能
3.案例一:ui
下面是模擬一個鏈接池,控制同一時間最多隻能有50個線程訪問。
Timer 的優勢在於簡單易用,但因爲全部任務都是由同一個線程來調度,所以全部任務都是串行執行的,同一時間只能有一個任務在執行,前一個任務的延遲或異常都將會影響到以後的任務。
咱們關於定時/週期操做都是經過Timer來實現的。可是Timer有如下幾種危險
a. Timer是基於絕對時間的。容易受系統時鐘的影響。
b. Timer只新建了一個線程來執行全部的TimeTask。全部TimeTask可能會相關影響
c. Timer不會捕獲TimerTask的異常,只是簡單地中止。這樣勢必會影響其餘TimeTask的執行。
鑑於 Timer 的上述缺陷,Java 5 推出了基於線程池設計的ScheduledThreadPoolExecutor。其設計思想是,每個被調度的任務都會由線程池中一個線程去執行,所以任務是併發執行的,相互之間不會受到干擾。需 要注意的是,只有當任務的執行時間到來時,ScheduedExecutor 纔會真正啓動一個線程,其他時間 ScheduledExecutor 都是在輪詢任務的狀態。
有如下四個調度器的方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
那麼這四個方法有什麼區別呢?其實第一個和第二個區別不大,一個是Runnable、一個是Callable,內部包裝後是同樣的效果;因此把頭兩個方法幾乎當成一種調度,那麼三種狀況分別是:
一、 進行一次延遲調度:延遲delay這麼長時間,單位爲:TimeUnit傳入的的一個基本單位,例如:TimeUnit.SECONDS屬於提供好的枚舉信息;(適合於方法1和方法2)。
二、 屢次調度,每次依照上一次預計調度時間進行調度,例如:延遲2s開始,5s一次,那麼就是二、七、十二、17,若是中間因爲某種緣由致使線程不夠用,沒有獲得調度機會,那麼接下來計算的時間會優先計算進去,由於他的排序會被排在前面,有點相似Timer中的:scheduleAtFixedRate方法,只是這裏是多線程的,它的方法名也叫:scheduleAtFixedRate,因此這個是比較好記憶的(適合方法3)
三、 屢次調度,每次按照上一次實際執行的時間進行計算下一次時間,同上,若是在第7秒沒有被獲得調度,而是第9s才獲得調度,那麼計算下一次調度時間就不是12秒,而是9+5=14s,若是再次延遲,就會延遲一個週期以上,也就會出現少調用的狀況(適合於方法3);
四、 最後補充execute方法是一次調度,指望被當即調度,時間爲空
1.阻塞隊列的概念
阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列,下圖展現瞭如何經過阻塞隊列來合做:
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素
從剛纔的描述能夠看出,發生阻塞起碼得知足下面至少一個條件: (前提:隊列是有界的)
1.從隊列裏取元素時,若是隊列爲空,則代碼一直等在這裏(即阻塞),直到隊列裏有東西了,拿到元素了,後面的代碼才能繼續
2.向隊列裏放元素時,若是隊列滿了(即放不下更多元素),則代碼也會卡住,直到隊列裏的東西被取走了(即:有空位能夠放新元素了),後面的代碼才能繼續
下面先使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
public
class
Test {
private
int
queueSize =
10
;
private
PriorityQueue<Integer> queue =
new
PriorityQueue<Integer>(queueSize);
public
static
void
main(String[] args) {
Test test =
new
Test();
Producer producer = test.
new
Producer();
Consumer consumer = test.
new
Consumer();
producer.start();
consumer.start();
}
class
Consumer
extends
Thread{
public
void
run() {
consume();
}
private
void
consume() {
while
(
true
){
synchronized
(queue) {
while
(queue.size() ==
0
){
try
{
System.out.println(
"隊列空,等待數據"
);
queue.wait();
}
catch
(InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.poll();
//每次移走隊首元素
queue.notify();
System.out.println(
"從隊列取走一個元素,隊列剩餘"
+queue.size()+
"個元素"
);
}
}
}
}
class
Producer
extends
Thread{
public
void
run() {
produce();
}
private
void
produce() {
while
(
true
){
synchronized
(queue) {
while
(queue.size() == queueSize){
try
{
System.out.println(
"隊列滿,等待有空餘空間"
);
queue.wait();
}
catch
(InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.offer(
1
);
//每次插入一個元素
queue.notify();
System.out.println(
"向隊列取中插入一個元素,隊列剩餘空間:"
+(queueSize-queue.size()));
}
}
}
}
}
|
這個是經典的生產者-消費者模式,經過阻塞隊列和Object.wait()和Object.notify()實現,wait()和notify()主要用來實現線程間通訊。
具體的線程間通訊方式(wait和notify的使用)在後續問章中會講述到。
下面是使用阻塞隊列實現的生產者-消費者模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public
class
Test {
private
int
queueSize =
10
;
private
ArrayBlockingQueue<Integer> queue =
new
ArrayBlockingQueue<Integer>(queueSize);
public
static
void
main(String[] args) {
Test test =
new
Test();
Producer producer = test.
new
Producer();
Consumer consumer = test.
new
Consumer();
producer.start();
consumer.start();
}
class
Consumer
extends
Thread{
public
void
run() {
consume();
}
private
void
consume() {
while
(
true
){
try
{
queue.take();
System.out.println(
"從隊列取走一個元素,隊列剩餘"
+queue.size()+
"個元素"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
class
Producer
extends
Thread{
public
void
run() {
produce();
}
private
void
produce() {
while
(
true
){
try
{
queue.put(
1
);
System.out.println(
"向隊列取中插入一個元素,隊列剩餘空間:"
+(queueSize-queue.size()));
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
|
有沒有發現,使用阻塞隊列代碼要簡單得多,不須要再單獨考慮同步和線程間通訊的問題。
在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,只要符合生產者-消費者模型的均可以使用阻塞隊列。
3.實現原理:
這裏只貼幾段主要的代碼,體會一下思想:
1
2
3
4
5
6
7
8
|
/** Main lock guarding all access */
final
ReentrantLock lock;
/** Condition for waiting takes */
private
final
Condition notEmpty;
/** Condition for waiting puts */
private
final
Condition notFull;
|
這3個變量很重要,ReentrantLock重入鎖,notEmpty檢查不爲空的Condition 以及 notFull用來檢查隊列未滿的Condition
Condition是一個接口,裏面有二個重要的方法:
await() : Causes the current thread to wait until it is signalled or interrupted. 即阻塞當前線程,直到被通知(喚醒)或中斷
singal(): Wakes up one waiting thread. 喚醒阻塞的線程
再來看put方法:(jdk 1.8)
1
2
3
4
5
6
7
8
9
10
11
12
|
public
void
put(E e)
throws
InterruptedException {
checkNotNull(e);
final
ReentrantLock lock =
this
.lock;
lock.lockInterruptibly();
try
{
while
(count == items.length)
notFull.await();
enqueue(e);
}
finally
{
lock.unlock();
}
}
|
1.先獲取鎖
2.而後用while循環檢測元素個數是否等於items長度,若是相等,表示隊列滿了,調用notFull的await()方法阻塞線程
3.不然調用enqueue()方法添加元素
4.最後解鎖
1
2
3
4
5
6
7
8
9
10
|
private
void
enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final
Object[] items =
this
.items;
items[putIndex] = x;
if
(++putIndex == items.length)
putIndex =
0
;
count++;
notEmpty.signal();
}
|
這是添加元素的代碼(jdk 1.8),注意最後一行notEmpty.signal()方法,表示添加完元素後,調用singal()通知等待(從隊列中取元素)的線程,隊列不空(有值)啦,能夠來取東西了。
相似的take()與dequeue()方法則至關於逆過程(注:一樣都是jdk 1.8)
1
2
3
4
5
6
7
8
9
10
11
|
public
E take()
throws
InterruptedException {
final
ReentrantLock lock =
this
.lock;
lock.lockInterruptibly();
try
{
while
(count ==
0
)
notEmpty.await();
return
dequeue();
}
finally
{
lock.unlock();
}
}
|
相似的:
1. 先加鎖
2. 若是元素個數爲空,表示隊列已空,調用notEmpty的await()阻塞線程,直接隊列裏又有新元素加入爲止
3. 而後調用dequeue 從隊列裏刪除元素
4. 解鎖
dequeue方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private
E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final
Object[] items =
this
.items;
@SuppressWarnings
(
"unchecked"
)
E x = (E) items[takeIndex];
items[takeIndex] =
null
;
if
(++takeIndex == items.length)
takeIndex =
0
;
count--;
if
(itrs !=
null
)
itrs.elementDequeued();
notFull.signal();
return
x;
}
|
倒數第2行,元素移除後,調用notFull.singnal喚醒等待(向隊列添加元素的)線程,隊列有空位了,能夠向裏面添加元素了。
1.排他鎖(互斥鎖)的概念:
synchronized,ReentrantLock這些鎖都是排他鎖,這些鎖同一時刻只容許一個線程進行訪問。
2.讀寫鎖的概念:
分爲讀鎖和寫鎖,多個讀鎖不互斥,讀鎖和寫鎖互斥,寫鎖與寫鎖互斥。
3.讀寫鎖的好處:
爲了提升性能,Java提供了讀寫鎖,在讀的地方使用讀鎖,在寫的地方使用寫鎖,靈活控制,若是沒有寫鎖的狀況下,讀是無阻塞的,在必定程度上提升了程序的執行效率。
原來使用的互斥鎖只能同時間有一個線程在運行,如今的讀寫鎖同一時刻能夠多個讀鎖同時運行,這樣的效率比原來的排他鎖(互斥鎖)效率高。
4.讀寫鎖的原理分析:
Java中讀寫鎖有個接口java.util.concurrent.locks.ReadWriteLock,也有具體的實現ReentrantReadWriteLock,
lock方法 是基於CAS 來實現的
源碼:
5.案例一:
在加入讀寫鎖以後:讀的過程當中,不會有寫
6.案例二: