CountDownLatch、CyclicBarrier、 Semaphore、ReentrantLock和AQS

學習自http://www.importnew.com/21889.html

https://blog.csdn.net/yanyan19880509/article/details/52349056

https://www.cnblogs.com/waterystone/p/4920797.html

1
2
3
public void await() throws InterruptedException { };   //調用await()方法的線程會被掛起,它會等待直到count值爲0才繼續執行
public boolean await( long timeout, TimeUnit unit) throws InterruptedException { };  //和await()類似,只不過等待一定的時間後count值還沒變爲0的話就會繼續執行
public void countDown() { };  //將count值減1

下面看一個例子大家就清楚CountDownLatch的用法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test {
      public static void main(String[] args) {  
          final CountDownLatch latch = new CountDownLatch( 2 );
 
          new Thread(){
              public void run() {
                  try {
                      System.out.println( "子線程" +Thread.currentThread().getName()+ "正在執行" );
                     Thread.sleep( 3000 );
                     System.out.println( "子線程" +Thread.currentThread().getName()+ "執行完畢" );
                     latch.countDown();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
              };
          }.start();
 
          new Thread(){
              public void run() {
                  try {
                      System.out.println( "子線程" +Thread.currentThread().getName()+ "正在執行" );
                      Thread.sleep( 3000 );
                      System.out.println( "子線程" +Thread.currentThread().getName()+ "執行完畢" );
                      latch.countDown();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
              };
          }.start();
 
          try {
              System.out.println( "等待2個子線程執行完畢..." );
             latch.await();
             System.out.println( "2個子線程已經執行完畢" );
             System.out.println( "繼續執行主線程" );
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
      }
}

執行結果:

1
2
3
4
5
6
7
線程Thread- 0 正在執行
線程Thread- 1 正在執行
等待 2 個子線程執行完畢...
線程Thread- 0 執行完畢
線程Thread- 1 執行完畢
2 個子線程已經執行完畢
繼續執行主線程


在new CountDownLatch(2)的時候


對於 CountDownLatch 來說,state=2表示所有調用await方法的線程都應該阻塞,等到同一個latch被調用兩次countDown後才能喚醒沉睡的線程

爲了充分了解AQS裏的鏈表,這裏假設上面掛起等待的線程數爲2個


當latch被成功減到0後,AQS的state就成了0。那個成功減到0的那個線程。然後節點3被喚醒了。當節點3醒來後,發現自己是通知狀態,然後刪除自己,喚醒節點4。

上面的流程,如果落實到代碼,把 state置爲0的那個線程,會判斷head指向節點的狀態,如果爲通知狀態,則喚醒後續節點,即線程3節點,然後head指向線程3節點,head指向的舊節點會被刪除掉。當線程3恢復執行後,發現自身爲通知狀態,又會把head指向線程4節點,然後刪除自身節點,並喚醒線程4。

線程節點的狀態是什麼時候設置上去的?其實,一個線程在阻塞之前,就會把它前面的節點設置爲通知狀態,這樣便可以實現鏈式喚醒機制了。


AQS是一些同步的抽象,簡單介紹一下


兩大元素

1.volatile int state->共享資源

2.FIFO線程等待隊列,多線程爭用資源被阻塞時會進入此隊列


AQS定義兩種資源佔用方式:

1.Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)

2.Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)

自定義同步器在實現時只需要實現state的增減即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在底層實現好了


自定義同步器的時候,主要實現以下幾種方法:

1.isHeldExclusively(),該線程是否正在獨佔資源,只有用到condition的時候才需要去實現這個方法

2.tryAcquire(int),獨佔。嘗試獲取資源,成功返回true,失敗返回fasle

3.tryRelease(int)

4.tryAcquireShared(int),共享。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

5.tryReleaseShared(int)


以ReentrantLock爲例

state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。


以CountDownLatch以例

任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數返回,繼續後餘動作。


深入看一下CountDownLatch

首先是他的內部類

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

tryAcquireShared不知道有啥用,理論上應該是來獲取資源的,但是對於CountDownLatch不需要了,因爲他只需要一開始設定state即可。不過CountDownLatch爲什麼是共享鎖我理解了,他允許多個線程同時使用CPU資源。


Node.waitStatus

-CANCELLED:值爲1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
-SIGNAL:值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行
-CONDITION:值爲-2,與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
-PROPAGATE:值爲-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。
0狀態:值爲0,代表初始化狀態。

AQS在判斷狀態時,通過用waitStatus>0表示取消狀態,而waitStatus<0表示有效狀態。


CountDownLatch的-1方法

public void countDown() {
    sync.releaseShared(1);
}

AQS中

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

前面講到:releaseShared就是讓state-1,當你減到0的時候,減到0的那個線程開始喚醒線程。

這裏和代碼邏輯對應一下。

tryReleaseShared就是讓state-1。如果減到0,就返回true了,開始執行doReleaseShared,喚醒線程。

private void doReleaseShared() {
    for (;;) {
        Node h = head;//頭結點
        if (h != null && h != tail) {//如果不爲空,且不是尾節點
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//如果是待喚醒,waitStatus改爲初始化狀態
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//找到他後面的第一個節點,也就是允許使用cpu,同時解鎖了
            }
            else if (ws == 0 &&//如果是初始化狀態 希望把他變成可運行狀態
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)//如果head變了,就繼續loop                   // loop if head changed
            break;
    }
}

所以doReleaseShared的整體意思是:喚醒下一個線程

unparkSuccessor

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);//如果是可運行、可喚醒,那就置爲初始化狀態

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;//如果爲空或者cancel
        for (Node p = tail; p != node && p != null; p = p.prev)//從後往前找
            if (p.waitStatus <= 0)//目的是爲了找到一個可運行、可喚醒的
                s = p;
    }
    if (s != null)//如果找到了
        LockSupport.unpark(s.thread);//解鎖這個線程
}

整體的意思就是,找到他後面的第一個節點,也就是允許使用cpu,同時解鎖了


那麼被喚醒的線程是在哪裏喚醒下一個線程的呢?

需要從await入手

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//如果state!=0,之前初始化已經讓state=2了
        doAcquireSharedInterruptibly(arg);
}


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//尾部加一個node
    try {
        for (;;) {
            final Node p = node.predecessor();//前一個node
            if (p == head) {//是head
                int r = tryAcquireShared(arg);
                if (r >= 0) {//state==0
                    setHeadAndPropagate(node, r);//那就開始,並喚醒下一個線程
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//鎖住咯,並讓上一個線程置爲通知狀態
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

CountDownLatch就分析的差不多了


爲何CountDownLatch被稱爲共享鎖?

因爲CountDownLatch latch = new CountDownLatch(2),他允許了多個線程同時使用CPU資源,而ReentrantLock是單行道。在這個實例化中,鎖可以認爲有2道(state),每個線程執行完後,通過cas方式正確地減了一道。


再講一下CyclicBarrier、 Semaphore,不過就不扯源碼了


CyclicBarrier

可以實現讓一組線程等待至某個狀態之後再全部同時執行

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test {
     public static void main(String[] args) {
         int N = 4 ;
         CyclicBarrier barrier  = new CyclicBarrier(N);
         for ( int i= 0 ;i<N;i++)
             new Writer(barrier).start();
     }
     static class Writer extends Thread{
         private CyclicBarrier cyclicBarrier;
         public Writer(CyclicBarrier cyclicBarrier) {
             this .cyclicBarrier = cyclicBarrier;
         }
 
         @Override
         public void run() {
             System.out.println( "線程" +Thread.currentThread().getName()+ "正在寫入數據..." );
             try {
                 Thread.sleep( 5000 );      //以睡眠來模擬寫入數據操作
                 System.out.println( "線程" +Thread.currentThread().getName()+ "寫入數據完畢,等待其他線程寫入完畢" );
                 cyclicBarrier.await();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (BrokenBarrierException e){
                 e.printStackTrace();
             }
             System.out.println( "所有線程寫入完畢,繼續處理其他任務..." );
         }
     }
}

執行結果:

1
2
3
4
5
6
7
8
9
10
11
12
線程Thread- 0 正在寫入數據...
線程Thread- 3 正在寫入數據...
線程Thread- 2 正在寫入數據...
線程Thread- 1 正在寫入數據...
線程Thread- 2 寫入數據完畢,等待其他線程寫入完畢
線程Thread- 0 寫入數據完畢,等待其他線程寫入完畢
線程Thread- 3 寫入數據完畢,等待其他線程寫入完畢
線程Thread- 1 寫入數據完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...


總結CountDownLatch、CyclicBarrie的區別

前者是手動-1,另外線程先阻塞着。state減到0後,另外線程可以跑了

後者是若干線程先阻塞着,等到阻塞到若干數量後,所有線程就可以跑了


Semaphore

若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那麼我們就可以通過Semaphore來實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {
     public static void main(String[] args) {
         int N = 8 ;            //工人數
         Semaphore semaphore = new Semaphore( 5 ); //機器數目
         for ( int i= 0 ;i<N;i++)
             new Worker(i,semaphore).start();
     }
 
     static class Worker extends Thread{
         private int num;
         private Semaphore semaphore;
         public Worker( int num,Semaphore semaphore){
             this .num = num;
             this .semaphore = semaphore;
         }
 
         @Override
         public void run() {
             try {
                 semaphore.acquire();
                 System.out.println( "工人" + this .num+ "佔用一個機器在生產..." );
                 Thread.sleep( 2000 );
                 System.out.println( "工人" + this .num+ "釋放出機器" );
                 semaphore.release();          
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     }
}

執行結果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
工人 0 佔用一個機器在生產...
工人 1 佔用一個機器在生產...
工人 2 佔用一個機器在生產...
工人 8
9
10
11
12
13
14
15
16
工人 0 佔用一個機器在生產...
工人 1 佔用一個機器在生產...
工人 2 佔用一個機器在生產...
工人 4 佔用一個機器在生產...
工人 5 佔用一個機器在生產...
工人 0 釋放出機器
8
9
10
11
12
13
14
15
16
工人 0 佔用一個機器在生產...
工人 1 佔用一個機器在生產...
工人 2 佔用一個機器在生產...
工人 4 佔用一個機器在生產...
工人 5 佔用一個機器在生產...
工人 0 釋放出機器
工人 2 釋放出機器
工人 3 佔用一個機器在生產...
相關文章
相關標籤/搜索