Java併發

一、死鎖

產生死鎖的四個必要條件:
(1) 互斥條件:一個資源每次只能被一個進程使用。
(2) 請求與保持條件:一個進程因請求資源而阻塞時,對已得到的資源保持不放。
(3) 不剝奪條件:進程已得到的資源,在末使用完以前,不能強行剝奪。
(4) 循環等待條件:若干進程之間造成一種頭尾相接的循環等待資源關係。java

相似於下圖:
image_1bbq8dukd16jl1vs91bf6pkf6cl9.png-28.2kB服務器

甚至會有更復雜的,環狀死鎖:網絡

Thread 1 locks A, waits for B
Thread 2 locks B, waits for C
Thread 3 locks C, waits for D
Thread 4 locks D, waits for A數據結構

java代碼示例

public class DeadLock implements Runnable {  
    public int flag = 1;  
    //靜態對象是類的全部對象共享的  
    private static Object o1 = new Object(), o2 = new Object();  
    @Override  
    public void run() {  
        if (flag == 1) {  
            synchronized (o1) {  
                synchronized (o2) {  
                    System.out.println("1");  
                }  
            }  
        }  
        if (flag == 0) {  
            synchronized (o2) {  
                synchronized (o1) {  
                    System.out.println("0");  
                }  
            }  
        }  
    }  
  
    public static void main(String[] args) {  
          
        DeadLock td1 = new DeadLock();  
        DeadLock td2 = new DeadLock();  
        
        td1.flag = 1;  
        td2.flag = 0;  
        
        //td1,td2都處於可執行狀態,但JVM線程調度先執行哪一個線程是不肯定的。  
        //td2的run()可能在td1的run()以前運行  
        new Thread(td1).start();  
        new Thread(td2).start();  
  
    }  
}

加鎖順序

確保全部的線程都是按照相同的順序得到鎖,那麼死鎖就不會發生。多線程

Thread 1:
lock A
lock B併發

Thread 2:
wait for A
lock C (when A locked)dom

Thread 3:
wait for A
wait for B
wait for Cide

加鎖時限

獲取鎖的時候加一個超時時間,若一個線程沒有在給定的時限內成功得到全部須要的鎖,則會進行回退並釋放全部已經得到的鎖,而後等待一段隨機的時間再重試。性能

如下是一個例子,展現了兩個線程以不一樣的順序嘗試獲取相同的兩個鎖,在發生超時後回退並重試的場景:this

Thread 1 locks A
Thread 2 locks B

Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked

Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.

Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.

固然,若是有很是多的線程同一時間去競爭同一批資源,就算有超時和回退機制,仍是可能會致使這些線程重複地嘗試但卻始終得不到鎖。

在Java中不能對synchronized同步塊設置超時時間,須要建立一個自定義鎖!

死鎖檢測

每當一個線程請求鎖,或者得到了鎖,能夠在線程和鎖相關的數據結構中(map、graph等等)將其記下。當一個線程請求鎖失敗時,這個線程能夠遍歷鎖的關係圖看看是否有死鎖發生。

死鎖通常要比兩個線程互相持有對方的鎖這種狀況要複雜的多,下面是一幅關於四個線程(A,B,C和D)之間鎖佔有和請求的關係圖。像這樣的數據結構就能夠被用來檢測死鎖。
image_1bbt7jhkssup1veuhm8q29hm29.png-19.7kB

那麼當檢測出死鎖時,能夠按下面方式來處理:

  • 釋放全部鎖,回退,而且等待一段隨機的時間後重試。

  • 給這些線程設置優先級,讓一個(或幾個)線程回退,剩下的線程就像沒發生死鎖同樣繼續保持着它們須要的鎖。

二、飢餓和公平

一個線程由於CPU時間所有被其餘線程搶走而得不到CPU運行時間,這種狀態被稱之爲飢餓

解決飢餓的方案,全部線程均能公平地得到運行機會被稱之爲公平性

飢餓緣由

在Java中,下面三個常見的緣由會致使線程飢餓:

  1. 高優先級線程吞噬全部的低優先級線程的CPU時間。

  2. 線程被永久堵塞在一個等待進入同步塊的狀態
    Java的同步代碼區對哪一個線程容許進入的次序沒有任何保障。理論上存在一個試圖進入該同步區的線程處於被永久堵塞的風險,由於其餘線程老是能持續地先於它得到訪問

  3. 線程在等待一個自己(在其上調用wait())也處於永久等待完成的對象
    若是多個線程處在wait()方法執行上,而對其調用notify()不會保證哪個線程會得到喚醒,任何線程都有可能處於繼續等待的狀態。所以存在這樣一個風險:一個等待線程歷來得不到喚醒,由於其餘等待線程老是能被得到喚醒。

公平性

在java中不可能實現100%的公平性,爲了提升等待線程的公平性,咱們使用鎖方式來替代同步塊。

public class Synchronizer{
    Lock lock = new Lock();
    
    //使用lock,而不是synchronized實現同步塊
    public void doSynchronized() throws InterruptedException{
        this.lock.lock();
        //critical section, do a lot of work which takes a long time
        this.lock.unlock();
    }
}

Lock的簡單實現原理:

public class Lock{
    private boolean isLocked      = false;
    private Thread lockingThread = null;

    public synchronized void lock() throws InterruptedException{

    while(isLocked){
        wait();
    }

    isLocked = true;
    lockingThread = Thread.currentThread();

}

    public synchronized void unlock(){
        if(this.lockingThread != Thread.currentThread()){
            throw new IllegalMonitorStateException(
                "Calling thread has not locked this lock");
            }

        isLocked = false;
        lockingThread = null;
        notify();
    }
}

上面的例子能夠看到兩點:

  1. 若是多個線程同時調用lock.lock方法的話,線程將阻塞在lock方法處,由於lock方法是一個同步方法。

  2. 若是lock對象的鎖被一個線程持有,那麼其餘線程都將調用在while循環中的wait方法而阻塞。

如今在把目光集中在doSynchronized方法中,在lock和unlock之間有一段註釋,寫明瞭這一段代碼將執行很長一段時間。咱們假設這段時間比線程進入lock方法內部而且因爲lock已被鎖定而調用wait方法等待的時間長。這意味着線程大部分時間都消耗在了wait等待上而不是阻塞在lock方法上。

以前曾提到同步塊沒法保證當多個線程等待進入同步塊中時哪一個線程先進入,一樣notify方法也沒法保證在多個線程調用wait的狀況下哪一個線程先被喚醒。當前這個版本的Lock類在公平性上和以前加了synchronized關鍵字的doSynchronized方法沒什麼區別,可是咱們能夠修改它。

咱們注意到,當前版本的Lock方法是調用本身的wait方法。若是每一個線程調用不一樣對象的wait方法,那麼Lock類就能夠決定哪些對象調用notify方法,這樣就能夠選擇性的喚醒線程。

公平鎖

public class FairLock {
    private boolean           isLocked       = false;
    private Thread            lockingThread  = null;
    private List<QueueObject> waitingThreads =
            new ArrayList<QueueObject>();

  public void lock() throws InterruptedException{
    QueueObject queueObject           = new QueueObject();
    boolean     isLockedForThisThread = true;
    synchronized(this){
        waitingThreads.add(queueObject);
    }

    while(isLockedForThisThread){
      synchronized(this){
        isLockedForThisThread =
            isLocked || waitingThreads.get(0) != queueObject;
        if(!isLockedForThisThread){
          isLocked = true;
           waitingThreads.remove(queueObject);
           lockingThread = Thread.currentThread();
           return;
         }
      }
      try{
        queueObject.doWait();
      }catch(InterruptedException e){
        synchronized(this) { waitingThreads.remove(queueObject); }
        throw e;
      }
    }
  }

  public synchronized void unlock(){
    if(this.lockingThread != Thread.currentThread()){
      throw new IllegalMonitorStateException(
        "Calling thread has not locked this lock");
    }
    isLocked      = false;
    lockingThread = null;
    if(waitingThreads.size() > 0){
      waitingThreads.get(0).doNotify();
    }
  }
}
public class QueueObject {

  private boolean isNotified = false;

  public synchronized void doWait() throws InterruptedException {
    while(!isNotified){
        this.wait();
    }
    this.isNotified = false;
  }

  public synchronized void doNotify() {
    this.isNotified = true;
    this.notify();
  }

  public boolean equals(Object o) {
    return this == o;
  }
}

FairLock類會給每一個調用lock方法的線程建立一個QueueObject對象,當線程調用unlock方法時隊列中的第一個。
QueueObject出列而且調用doNotify方法激活對應的線程。這種方式能夠保證只有一個線程被喚醒而不是全部等待線程。

注意到FairLock在同步塊中設置了狀態檢測來避免失控。

QueueObject實際上就是一個信號量(semaphore),QueueObject對象內部保存了一個信號isNotified.這樣作是爲了防止信號丟失。queueObject.wait方法是被放在了synchronized(this)塊的外部來避免嵌套監視器閉環。這樣當沒有線程運行lock方法中的synchronized同步塊時其餘線程能夠調用unlock方法。

最後咱們注意到lock方法用到了try-catch塊,這樣當發生InterruptedException時線程將退出lock方法,這個時候咱們應該將對應的QueueObject對象出列。

效率
FairLock的執行效率相比Lock類要低一些。它對你的應用程序的影響取決於FairLock所保證的臨界區代碼的執行時間,這個時間越長,那麼影響就越小;同時也取決於這段臨界區代碼的執行頻率。

三、嵌套管程鎖死

嵌套管程鎖死與死鎖相似,場景以下所示:

線程1得到A對象的鎖。
線程1得到對象B的鎖(同時持有對象A的鎖)。
線程1決定等待另外一個線程的信號再繼續。
線程1調用B.wait(),從而釋放了B對象上的鎖,但仍然持有對象A的鎖。

線程2須要同時持有對象A和對象B的鎖,才能向線程1發信號。
線程2沒法得到對象A上的鎖,由於對象A上的鎖當前正被線程1持有。
線程2一直被阻塞,等待線程1釋放對象A上的鎖。

線程1一直阻塞,等待線程2的信號,所以,不會釋放對象A上的鎖,
而線程2須要對象A上的鎖才能給線程1發信號……

代碼示例:

//lock implementation with nested monitor lockout problem
public class Lock{
    protected MonitorObject monitorObject = new MonitorObject();
    protected boolean isLocked = false;

    public void lock() throws InterruptedException{
        synchronized(this){
            while(isLocked){
                synchronized(this.monitorObject){
                    this.monitorObject.wait();
                }
            }
            isLocked = true;
        }
    }

    public void unlock(){
        synchronized(this){
            this.isLocked = false;
            synchronized(this.monitorObject){
                this.monitorObject.notify();
            }
        }
    }
}

區別

在死鎖中咱們已經對死鎖有了個大概的解釋,死鎖一般是由於兩個線程獲取鎖的順序不一致形成的,線程1鎖住A,等待獲取B,線程2已經獲取了B,再等待獲取A。如死鎖避免中所說的,死鎖能夠經過老是以相同的順序獲取鎖來避免。

可是發生嵌套管程鎖死時鎖獲取的順序是一致的。線程1得到A和B,而後釋放B,等待線程2的信號。線程2須要同時得到A和B,才能向線程1發送信號。因此,一個線程在等待喚醒,另外一個線程在等待想要的鎖被釋放。

  1. 死鎖中,二個線程都在等待對方釋放鎖。

  2. 嵌套管程鎖死中,線程1持有鎖A,同時等待從線程2發來的信號,線程2須要鎖A來發信號給線程1。

四、Slipped Conditions

從一個線程檢查某一特定條件到該線程操做此條件期間,這個條件已經被其它線程改變,致使第一個線程在該條件上執行了錯誤的操做。這裏有一個簡單的例子:

public class Lock {
    private boolean isLocked = true;

    public void lock(){
      synchronized(this){
        while(isLocked){
          try{
            this.wait();
          } catch(InterruptedException e){
            //do nothing, keep waiting
          }
        }
      }

      synchronized(this){
        isLocked = true;
      }
    }

    public synchronized void unlock(){
      isLocked = false;
      this.notify();
    }
}

假如在某個時刻isLocked爲false,有兩個線程同時訪問lock方法。若是第一個線程先進入第一個同步塊,這個時候它會發現isLocked爲false,若此時容許第二個線程執行,它也進入第一個同步塊,一樣發現isLocked是false。如今兩個線程都檢查了這個條件爲false,而後它們都會繼續進入第二個同步塊中並設置isLocked爲true。

爲避免slipped conditions,條件的檢查與設置必須是原子的,也就是說,在第一個線程檢查和設置條件期間,不會有其它線程檢查這個條件。

public class Lock {
    private boolean isLocked = true;

    public void lock(){
      synchronized(this){
        while(isLocked){
          try{
            this.wait();
          } catch(InterruptedException e){
            //do nothing, keep waiting
          }
        }
        isLocked = true;
      }
    }

    public synchronized void unlock(){
      isLocked = false;
      this.notify();
    }
}

五、信號量

Semaphore(信號量) 是一個線程同步結構,用於在線程間傳遞信號,以免出現信號丟失,或者像鎖同樣用於保護一個關鍵區域。

public class Semaphore {
    private boolean signal = false;

    public synchronized void take() {
        this.signal = true;
        this.notify();
    }

    public synchronized void release() throws InterruptedException{
        while(!this.signal) wait();
        this.signal = false;
    }
}

Take 方法發出一個被存放在 Semaphore內部的信號,而Release方法則等待一個信號,當其接收到信號後,標記位 signal 被清空,而後該方法終止。

使用這個 semaphore 能夠避免錯失某些信號通知。用 take 方法來代替 notify,release 方法來代替 wait。若是某線程在調用 release 等待以前調用 take 方法,那麼調用 release 方法的線程仍然知道 take 方法已經被某個線程調用過了,由於該 Semaphore 內部保存了 take 方法發出的信號。而 wait 和 notify 方法就沒有這樣的功能。

六、阻塞隊列

阻塞隊列與普通隊列的區別在於

當隊列是空的時,從隊列中獲取元素的操做將會被阻塞。
當隊列是滿時,往隊列裏添加元素的操做會被阻塞。

試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來。
image_1bc6sq76nmnh1o9v1181hqbh99.png-11.9kB

public class BlockingQueue {

    private List queue = new LinkedList();

    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }

    public synchronized void enqueue(Object item) throws InterruptedException  {
        while(this.queue.size() == this.limit) {
            wait();
        }
        
         if(this.queue.size() == 0) {
            notifyAll();
        }

         this.queue.add(item);
}

    public synchronized Object dequeue() throws InterruptedException{

         while(this.queue.size() == 0){
              wait();
        }

        if(this.queue.size() == this.limit){
            notifyAll();
        }   

        return this.queue.remove(0);
    }
}

必須注意到,在 enqueue 和 dequeue 方法內部,只有隊列的大小等於上限(limit)或者下限(0)時,才調用notifyAll方法。
若是隊列的大小既不等於上限,也不等於下限,任何線程調用 enqueue 或者 dequeue 方法時,都不會阻塞,都可以正常的往隊列中添加或者移除元素。

七、線程池

線程池(Thread Pool)對於限制應用程序中同一時刻運行的線程數頗有用。由於每啓動一個新線程都會有相應的性能開銷,每一個線程都須要給棧分配一些內存等等。

咱們能夠把併發執行的任務傳遞給一個線程池,來替代爲每一個併發執行的任務都啓動一個新的線程。只要池裏有空閒的線程,任務就會分配給一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(Blocking Queue),線程池裏的線程會去取這個隊列裏的任務。當一個新任務插入隊列時,一個空閒線程就會成功的從隊列中取出任務而且執行它。

線程池常常應用在多線程服務器上。每一個經過網絡到達服務器的鏈接都被包裝成一個任務而且傳遞給線程池。線程池的線程會併發的處理鏈接上的請求。

public class PoolThread extends Thread {

  private BlockingQueue<Runnable> taskQueue = null;
  private boolean       isStopped = false;

  public PoolThread(BlockingQueue<Runnable> queue) {
    taskQueue = queue;
  }

  public void run() {
    while (!isStopped()) {
      try {
        Runnable runnable =taskQueue.take();
        runnable.run();
      } catch(Exception e) {
        // 寫日誌或者報告異常,
        // 但保持線程池運行.
      }
    }
  }

  public synchronized void toStop() {
    isStopped = true;
    this.interrupt(); // 打斷池中線程的 dequeue() 調用.
  }

  public synchronized boolean isStopped() {
    return isStopped;
  }
}
相關文章
相關標籤/搜索