多線程知識梳理(1) 併發編程的藝術筆記

第三章 Java內存模型

3.1 Java內存模型的基礎

  • 通訊 在共享內存的模型裏,經過寫-讀內存中的公共狀態進行隱式通訊;在消息傳遞的併發模型裏,線程之間必須經過發送消息來進行顯示的通訊。
  • 同步 在共享內存併發模型裏,同步是顯示進行的,程序員必須顯示指定某個方法或者某段代碼須要在線程之間互斥執行;在消息傳遞的併發模型裏,因爲消息的發送必須在接收以前,所以同步是隱式進行的。

Java中,全部實例域、靜態域和數組元素都存儲在堆內存中,堆內存在線程之間共享;局部變量、方法定義參數和異常處理器參數不會在線程之間共享。 從抽象角度來看,JMM定義了線程和主內存之間的抽象關係:線程之間的共享變量存儲在主內存中,每一個線程都有一個私有的本地內存,本地內存涵蓋了緩存、寫緩衝區、寄存器以及其它的硬件和編譯器優化。 JMM經過控制主內存與每一個線程的本地內存之間的交互,來爲Java程序員提供內存可見性保證node

重排序

指編譯器和處理器爲了優化程序性能而對指令序列進行從新排序的一種手段:程序員

  • 編譯器優化的重排序:編譯器在不改變單線程程序語義的前提下,從新安排語句的執行順序。
  • 處理器的指令級並行的重排序:若是不存在數據依賴性,處理器能夠改變語句對應機器指令的執行順序。
  • 內存系統的重排序:因爲處理器使用緩存和讀/寫緩衝區,這使得加載和存儲操做看上去多是在亂序執行。

JMM的編譯器從新排序規則會禁止特定類型的編譯器重排序,對於處理器重排序,JMM的處理器重排序規則會要求Java編譯器在生成指令時,插入特定類型的內存屏障。 現代的處理器使用寫緩衝區臨時保存向內存寫入的數據,但每一個處理器上的寫緩衝區,僅僅對它所在的處理器可見。 因爲寫緩衝區僅對本身的處理器可見,它會致使處理器執行內存操做的順序可能會與內存實際的操做執行順序不一致,因爲現代的處理器都會使用寫緩衝區,所以現代的處理器都會容許對寫-讀操做進行重排序,但不容許對存在數據依賴的操做作重排序。算法

happens-before簡介

用來闡述操做之間的內存可見性,若是一個操做執行的結果須要對另外一個操做可見,那麼這兩個操做必需要存在happens-before關係,這兩個操做既能夠在一個線程以內,也能夠在不一樣線程之間,但並不等於前一個操做必需要在後一個操做以前執行編程

數據依賴性

編譯器和處理器不會改變存在數據依賴關係的兩個操做的執行順序,可是僅針對單個處理器中執行的指令序列和單個線程中執行的操做。數組

as-if-serial

不管怎麼重排序,單線程程序的執行結果不能改變。緩存

在單線程中,對存在控制依賴的操做重排序,不會改變執行結果;但在多線程程序中,對存在控制依賴的操做重排序,可能會改變程序的執行結果。安全

順序一致性

順序一致性是一個理論參考模型,在設計的時候,處理器的內存模型和編程語言的內存模型都會以順序一致性內存做爲參照。 若是程序是正確同步的,程序的執行將具備順序一致性:即程序的執行結果與該程序在順序一致性內存模型中的執行結果相同。 若是程序是正確同步的,程序的執行將具備順序一致性:即程序的執行結果該程序在順序一致性內存模型中的執行結果相同。 順序一致模型有兩大特性:bash

  • 一個線程中的全部操做必須按照程序的順序來執行。
  • 全部線程都只能看到一個單一的操做執行順序,在順序一致性內存模型中,每一個操做都必須原子執行且馬上對全部線程可見。

對於未同步或未正確同步的多線程程序,JMM只提供最小安全性:線程執行時讀取到的值,要麼是以前某個線程寫入的值,要麼是默認值。 JMM不保證未同步程序的執行結果與該程序在順序一致性模型中的執行結果一致。 未同步程序在兩個模型中的執行特徵有以下差別:數據結構

  • 順序一致性模型保證單線程內的操做會按程序的順序執行,而JMM不保證單線程內的操做會按程序的順序執行。
  • 順序一致性模型保證全部線程只能看到一致的操做執行順序,而JMM不保證全部線程能看到一致的操做執行順序。
  • JMM不保證對64位的long/double型變量的寫操做具備原子性,而順序一致性模型保證對全部內存讀/寫操做都具備原子性。

第四章 Java併發編程基礎

  • 現代操做系統調度的最小單元是線程,也叫輕量級進程,在一個進程裏能夠建立多個線程,這些線程都擁有各自的計數器、堆棧和局部變量等特性,而且可以訪問共享的內存變量。
  • 設置線程優先級時,針對頻繁阻塞(休眠或者I/O操做)的線程須要設置較高優先級,而偏重計算(須要較多CPU時間或者偏運算)的線程則設置較低的優先級,確保處理器不會被獨佔。
  • 線程在運行的生命週期中可能處於如下6種不一樣的狀態:
  • New:初始狀態,線程被建立,可是沒有調用start()方法。
  • Runnable:運行狀態,Java線程將操做系統中的就緒和運行兩種狀態統稱爲「運行中」。
  • Blocked:阻塞狀態,表示線程阻塞於鎖。
  • Waiting:等待狀態,表示線程進入等待狀態,進入該狀態表示當前線程須要等待其它線程作出一些指定動做(通知或中斷)。
  • Time_Waiting:超時等待狀態,能夠在指定的時間自行返回。
  • Terminated:終止狀態,表示當前線程已經執行完畢。
  • 中斷能夠理解爲線程的一個標識位屬性,它標識一個運行中的線程是否被其它線程進行了中斷操做。中斷比如其餘線程對該線程打了一個招呼,其餘線程經過調用該線程的interrupt()方法對其進行中斷操做。
  • 線程經過檢查自身是否被中斷來進行響應,線程經過方法isInterrupt來進行判斷是否被中斷,也能夠調用靜態方法Thread.interrupt對當前線程的中斷標識位進行復位,若是該線程已經處於終止狀態,即便該線程被中斷過,在調用該線程對象的isInterrupt時依舊返回false
  • 在拋出InterruptedException異常以前,Java虛擬機會先將該線程的中斷標識位清除。
  • 中斷狀態是線程的一個標識位,而中斷操做是一種簡便的線程間交互方式,而這種交互方式最適合用來取消或中止任務,除了中斷以外,還能夠利用一個boolean變量來控制是否須要中止任務並終止該線程。
  • Java支持多個線程同時訪問一個對象或者對象的成員變量,因爲每一個線程能夠擁有這個變量的拷貝,因此在程序的執行過程當中,一個線程看到的變量並不必定是最新的。
  • volatile能夠用來修飾字段,就是告知程序任何對該變量的訪問須要從共享內存中獲取,而對它的改變必須同步刷新回共享內存,它能保證全部線程對變量訪問的可見性。
  • synchronized能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。
  • 任意線程對ObjectObjectsynchronized保護)的訪問,首先要得到Object的監視器,若是獲取失敗,線程進入同步隊列,線程狀態變爲Blocked,當訪問Object的前驅(得到了鎖的線程)釋放了鎖,則該釋放操做喚醒阻塞在同步隊列中的線程,使其從新嘗試對監視器的獲取。
  • 等待/通知的相關方法:
  • notify():通知一個在對象上等待的線程,使其從wait()方法返回,而返回的前提是該線程獲取到了對象上的鎖。
  • notifyAll():通知全部等待在該對象上的鎖。
  • wait():調用該方法的線程進入Waiting狀態,只有等待另外線程的通知或被中斷纔會返回,調用wait()方法後,會釋放對象的鎖。
  • wait(long):超時等待一段時間,若是沒有通知就返回。
  • wait(long, int):對於超時時間更精細粒度的控制,能夠達到納秒。
  • 兩個線程經過對象來完成交互,而對象上的waitnotify/notifyAll()的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。
  • 等待/通知的經典範式:
  • 等待方 (1) 獲取對象的鎖。 (2) 若是條件不知足,那麼調用對象的wait()方法,被通知後仍要檢查條件。 (3) 條件知足則執行對應的邏輯。
synchronized(對象) {
        while(條件不知足) {
            對象.wait();
        }
        對應的處理邏輯;
}
複製代碼
  • 通知方 (1) 得到對象的鎖 (2) 改變條件 (3) 通知全部等待在該對象上的線程。
synchronized(對象) {
        改變條件;
        對象.notifyAll();
}
複製代碼
  • 管道輸入/輸出流用於線程之間的數據傳輸,而傳輸的媒介爲內存,主要包括瞭如下4種實現:PipedOutputStream、PipeInputStream、PipedReader、PipedWriter,前兩種面向字節,後兩種面向字符。
  • 若是一個線程A執行了Thread.join(),其含義是:當前線程A等待Thread線程終止後,才從Thread.join返回,線程Thread除了提供join()方法外,還提供了join(long millis)join(long millis, int nanos)兩個具有超時特性的方法,若是在給定的超時時間內沒有終止,那麼將會從超時方法中返回。
  • ThreadLocal,即線程變量,是一個以ThreadLocal對象爲鍵、任意對象爲值的存儲結構,這個結構被附帶在線程上,也就是說一個線程能夠根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值,能夠經過set(T)方法來設置一個值,在當前線程下再經過get()方法獲取到原先設置的值。

第五章 Java中的鎖

5.1 Lock接口

  • 鎖是用來控制多個線程訪問共享資源的方式,雖然它缺乏了隱式獲取釋放鎖的便捷性,可是卻擁有了鎖獲取與釋放的可操做性、可中斷地獲取鎖以及超時獲取鎖等多種synchronized關鍵字不具有的同步特性。多線程

  • finally塊中釋放鎖,目的是保證在獲取到鎖以後,最終可以被釋放。

  • Lock接口提供的synchronized關鍵字不具有的主要特性

  • 嘗試非阻塞地獲取鎖:當前線程嘗試獲取鎖,若是這一時刻沒有被其它線程獲取到,則成功獲取並持有鎖。

  • 能被中斷地獲取鎖:與synchronized不一樣,獲取到鎖的線程可以響應中斷,當獲取到鎖的線程被中斷時,中斷異常將會被拋出,同時鎖會被釋放。

  • 在指定的截止時間以前獲取鎖:若是截止時間到了仍舊沒法獲取鎖,則返回。

  • LockAPI

  • void lock():獲取鎖,調用該方法當前線程將會獲取鎖,當鎖得到後,從該方法返回。

  • void lockInterruptibly():可中斷地獲取鎖,該方法會響應中斷,即在鎖的獲取中能夠中斷當前線程。

  • boolean tryLock():嘗試非阻塞地獲取鎖,調用該方法後馬上返回,若是可以獲取則返回true,不然返回false

  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException:當前線程在超時時間內得到了鎖;當前線程在超時時間內被中斷;超時時間結束,返回false

  • void unlock():釋放鎖。

  • Condition newCondition():獲取等待/通知組件,該組件和當前的鎖綁定,當前線程只有得到了鎖,才能調用該組件的wait()方法,而調用後,當前線程將釋放鎖。

5.2 隊列同步器

5.2.1 隊列同步器接口

  • 隊列同步器AbstractQueuedSynchronizer,是用來構建鎖或者其它同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做。
  • 同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。能夠理解兩者之間的關係:鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現細節;同步器面向鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操做。鎖和同步器很好地隔離了使用者和實現者所需關注地領域。
  • 同步器的設計是基於模板方法模式,使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合自定義同步組件的實現中,並調用同步器的模板方法,而這些模板方法將會調用使用者重載的方法。
  • 重寫同步器指定的方法時,須要使用同步器提供的3個方法來訪問或者修改同步狀態:
  • getState():獲取當前同步狀態。
  • setState(int newState):設置當前同步狀態。
  • compareAndSetState(int except, int update):使用CAS設置當前狀態,該方法可以保證狀態設置的原始性。
  • 同步器提供的模板方法基本上分爲如下3類:
  • 獨佔式獲取與釋放同步狀態
  • 共享式獲取與釋放同步狀態
  • 查詢同步隊列中的等待線程狀況。

5.2.2 隊列同步器的實現分析

5.2.2.1 同步隊列

  • 同步器依賴內部的同步隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造稱爲一個節點,並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。
  • 同步器中包含了兩個節點類型的引用,一個指向頭節點,而另外一個指向尾節點。
  • 當一個線程成功地獲取了同步狀態,其餘線程將沒法獲取到同步狀態,轉而被構形成爲節點並加入到同步隊列當中,而這個加入到隊列地過程必需要保證線程安全,所以同步器提供了一個基於CAS的設置尾節點的方法。
  • 同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。

5.2.2.2 獨佔式同步狀態獲取與釋放

  • 經過調用同步器的acquire(int arg)方法能夠獲取同步狀態,該方法對中斷不敏感,即因爲線程獲取同步狀態失敗而進入同步隊列後,後續對線程進行中斷操做時,線程不會從同步隊列中移除。
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

它的主要邏輯是:

  • (1)調用自定義同步器實現的tryAcquire方法,該方法保證線程安全的獲取同步狀態,這個方法須要隊列同步器的實現者來重寫
  • (2)若是同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE)並經過addWaiter(Node node)方法將該節點加入到同步隊列的尾部。
private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) {
           node.prev = pred;
           //1.確保節點可以線程安全地被添加
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       //2.經過死循環來確保節點的正確添加,在"死循環"中只有經過`CAS`將節點設置爲尾節點以後,當前線程才能從該方法返回,不然當前線程不斷地進行嘗試。
       enq(node);
       return node;
   }

   private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else {
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }
複製代碼
  • (3)最後調用acquireQueued(Node node, int arg)方法,使得該節點以死循環的方式獲取同步狀態。
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //1.獲得當前節點的前驅節點
                final Node p = node.predecessor();
                //2.若是當前節點的前驅節點是頭節點,只有在這種狀況下獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    //3.將當前節點設爲頭節點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼
  • 能夠看到,當前線程在「死循環」中嘗試獲取同步狀態,而只有前驅節點是頭節點纔可以嘗試獲取同步狀態,這是因爲:

    • 頭節點是成功獲取到同步狀態的節點,而頭節點的線程釋放了同步狀態後,將會喚醒其後繼節點,後繼節點的線程被喚醒後須要檢查本身的前驅節點是不是頭節點。
    • 維護同步隊列的FIFO原則,經過簡單地判斷本身的前驅是否爲頭節點,這樣就使得節點的釋放規則符合FIFO,而且也便於對過早通知的處理(過早通知是指前驅節點不是頭節點的線程因爲中斷而被喚醒
  • 當同步狀態獲取成功以後,當前線程從acquire(int arg)方法返回,若是對於鎖這種併發組件而言,表明着當前線程獲取了鎖。

  • 經過調用同步器的release(int arg)方法能夠釋放同步狀態,該方法執行時,會喚醒頭節點的後繼節點線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處於等待狀態的線程。

public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              unparkSuccessor(h);
          return true;
      }
      return false;
  }
複製代碼
  • (4)若是獲取不到,則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。

總結: 1.在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中進行自旋; 2.移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。 3.在釋放同步狀態時,同步器調用tryRelease(int arg)方法來釋放同步狀態,而後喚醒頭節點的後繼節點。

5.2.2.3 共享式同步狀態獲取與釋放

  • 共享式獲取和獨佔式獲取最主要的區別在於同一時刻可以有多個線程同時獲取到同步狀態
  • 經過調用同步器的acquireShared(int arg)方法能夠共享式地獲取同步狀態:
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

tryAcquireShared返回int類型,若是同步狀態獲取成功,那麼返回值大於等於0,不然進入自旋狀態;成功獲取到同步狀態並退出自旋狀態的條件是當前節點的前驅節點爲頭節點,而且返回值大於等於0.

  • 共享式獲取,經過調用releaseShared(int arg)方法釋放同步狀態,tryReleaseShared必需要確保同步狀態線程安全釋放,通常是經過循環或CAS來保證的,由於釋放同步狀態的操做會同時來自多個線程。
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
複製代碼

5.2.2.4 獨佔式超時獲取同步狀態

  • 經過調用同步器的doAcquireNanos(int arg, long nanosTimeout)方法能夠超時獲取同步狀態,即在指定的時間段內獲取同步狀態。
  • 在此以前,一個線程若是獲取不到鎖而被阻塞在synchronized以外,對該線程進行中斷操做,此時線程中斷的標誌位會被修改,但線程依舊會阻塞在synchronized上;若是經過acquireInterruptibly(int arg)方法獲取,若是在等待過程當中被中斷,會馬上返回,並拋出InterruptedException異常。
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1.計算出截止時間.
        final long deadline = System.nanoTime() + nanosTimeout;
       //2.加入節點
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //3.取出前驅節點
                final Node p = node.predecessor();
                //4.若是獲取成功則直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //5.若是到了超時時間,則直接返回
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //6.若是在自旋過程當中被中斷,那麼拋出異常返回
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

經過上面的代碼能夠知道,它和獨佔式獲取的區別在於未獲取到同步狀態時的處理邏輯:獨佔式獲取在獲取不到是會一直自旋等待;而超時獲取則會使當前線程等待nanosTimeout納秒,若是當前線程在這個時間內沒有獲取到同步狀態,將會從等待邏輯中自動返回。

5.2.2.5 自定義同步組件 - TwinsLock

TwinsLock只容許至多兩個線程同時訪問,超過兩個線程的訪問將會被阻塞。

public class TwinsLock implements Lock {
    
    private final Sync sync = new Sync(2);
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        
        Sync(int count) {
            //初始值爲2.
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for(;;) {
                //1.得到當前的狀態.
                int current = getState();
                //2.newCount表示剩餘可獲取同步狀態的線程數
                int newCount = current - arg;
                //3.若是小於0,那麼返回獲取同步狀態失敗;不然經過CAS確保設置的正確性.
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    //4.當返回值大於等於0表示獲取同步狀態成功.
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                //將可獲取同步狀態的線程數加1.
                int newCount = current + current;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        
    }

    @NonNull
    @Override
    public Condition newCondition() {
        return null;
    }
}
複製代碼

測試用例:

public static void createTwinsLock() {
        final Lock lock = new TwinsLock();
        class TwinsLockThread extends Thread {

            @Override
            public void run() {
                Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
                while (true) {
                    lock.lock();
                    try {
                        Thread.sleep(1000);
                        Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
                        lock.unlock();
                    }
                }
            }
        }
        for (int i = 0; i < 10; i++) {
            Thread thread = new TwinsLockThread();
            thread.start();
        }
    }
複製代碼

5.3 重入鎖

  • 重入鎖ReentrantLock表示該鎖可以支持一個線程對資源的重複加鎖。
  • 若是在絕對時間上,先對鎖獲取的請求必定先被知足,那麼這個鎖是公平的,公平地獲取鎖,也就是等待時間最長的線程最優先地獲取鎖。

5.3.1 實現重進入

重進入須要解決兩個問題:

  • 線程再次獲取鎖,鎖須要去識別獲取鎖地線程是否爲當前佔據鎖的線程,若是是,則再次獲取成功。
  • 鎖的最終釋放,線程重複n次獲取了鎖,隨後在第n次釋放該鎖後,其它線程可以獲取到該鎖。

5.3.2 公平與非公平鎖的區別

  • 公平與否是針對獲取鎖而言的,若是一個鎖是公平的,那麼鎖的獲取順序就應該符合請求的絕對時間順序,即FIFO
  • 公平鎖的區別在於加入了同步隊列中當前節點是否有前驅節點的判斷,若是該方法返回true,表示有線程比當前線程更早地請求獲取鎖,所以須要等待前驅線程獲取並釋放鎖以後才能繼續獲取鎖;而對於非公平鎖,只要CAS設置同步狀態成功便可。
  • 所以,公平鎖每次都是從同步隊列中的第一個節點獲取到鎖,而非公平鎖出現了一個線程連續獲取鎖的狀況。
  • 非公平鎖可能使線程飢餓,但其極少的線程切換,保證了更大的吞吐量。

5.4 讀寫鎖

  • 以前提到的鎖都是排它鎖,這些鎖在同一時刻只容許一個線程進行訪問,而讀寫鎖在同一時刻能夠容許多個讀線程訪問,可是在寫線程訪問時,全部的讀線程和其餘寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,經過分離讀鎖和寫鎖,使得併發性有很大提高。
  • 併發包提供的讀寫鎖的實現是ReentrantReadWrireLock,它支持公平性選擇、重進入、鎖降級(寫鎖可以降級爲讀鎖)

ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個方法,即readLockwriteLock,而其實現ReentrantReadWriteLock

  • getReadLockCount:返回當前讀鎖被獲取的次數。
  • getReadHoldCount:返回當前線程獲取讀鎖的次數。
  • isWriteLocked:判斷寫鎖是否被獲取。
  • getWriteHoldCount:返回當前線程獲取寫鎖的次數。

下面是一個讀寫鎖的簡單用例:

public class ReadWriteCache {
    
    static Map<String, Object> map = new HashMap<>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    
    public static Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    
    public static Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    
    public static void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}
複製代碼

5.4.2 讀寫鎖的實現分析

  • 讀寫狀態的設計 讀寫鎖須要在同步狀態(一個整形變量,高16表示讀,低16表示寫)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成爲讀寫鎖實現的關鍵。
  • 寫鎖的獲取與釋放 寫鎖是一個支持重進入的排它鎖,若是當前線程已經獲取了寫鎖,則增長寫狀態。若是當前線程在獲取寫鎖時,讀鎖已經被獲取,則當前線程進入等待狀態。 緣由在於:讀寫鎖要確保寫鎖的操做對讀鎖可見,若是容許讀鎖在已經被獲取的狀況下對寫鎖的獲取,那麼正在運行的其它讀線程就沒法感知到當前寫線程的操做。
  • 讀鎖的獲取與釋放 讀鎖是一個支持重進入的共享鎖,它能被多個線程同時獲取,在沒有其它寫線程訪問(或者寫狀態爲0)時,讀鎖老是被成功地獲取,而所作的也只是(線程安全)增長讀狀態。
  • 鎖降級 鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。

5.6 Condition接口

Condition定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition對象關聯的鎖,Condition是依賴Lock對象的。 當調用await()方法後,當前線程會釋放鎖並在此等待,而其餘線程調用Condition對象的signal方法,通知當前線程後,當前線程才從await方法返回,而且在返回前已經獲取了鎖。 獲取一個Condition必須經過LocknewCondition方法,下面是一個有界隊列的示例:

public class BoundedQueue<T> {

    private Object[] items;
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) { //若是當前隊列內的個數等於最大長度,那麼釋放鎖.
                notFull.await();
            }
            if (++addIndex == items.length) { //若是已經到了尾部,那麼從頭開始.
                addIndex = 0;
            }
            ++count;
            notEmpty.signal(); //通知阻塞在"空"條件上的線程.
        } finally {
            lock.unlock();
        }
    }

    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); //若是當前隊列的個數等於0,那麼釋放鎖.
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal(); //通知阻塞在"滿"條件上的線程.
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}
複製代碼

Condition的方法:

  • await():當前線程進入等待狀態直到被通知signal或中斷,當前線程進入運行狀態且從await返回的狀況:

  • 其餘線程調用該ConditionsignalsignalAll方法。

  • 其它線程中斷當前線程(interrupt)。

  • 若是當前等待線程從await方法返回,那麼代表當前線程已經獲取了Condition對象所對應的鎖。

  • awaitUninerruptibly:對中斷不敏感

  • long await Nanos(long):加入了超時的判斷,返回值是(nanosTimeout - 實際耗時),若是返回值是0或者負數,那麼能夠認定爲超時。

  • boolean awaitUntil(Data):直到某個固定時間。

  • signal:喚醒一個等待在Condition上的線程。

  • signalAll:喚醒全部等待在Condition上的線程。

5.6.2 Condition的實現

ConditionObjectAbstractQueuedSynchronizer的內部類,每一個Condition對象都包含着一個隊列。

1.等待隊列

在隊列中的每一個節點都包含了一個線程的引用,該線程就是在Condition對象上等待的線程,同步隊列和等待隊列中節點的類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node。 因爲Condition的實現是同步器的內部類,所以每一個Condition實例都可以訪問同步器提供的方法,至關於每一個Condition都擁有所屬同步器的引用。 當調用await方法時,將會以當前線程構造節點,並將節點從尾部加入到等待隊列,也就是將同步隊列移動到**Condition**隊列當中。

2.等待

調用該方法的前提是當前線程必須獲取了鎖,也就是同步隊列中的首節點,它不是直接加入到等待隊列當中,而是經過addConditionWaiter()方法把當前線程構形成一個新的節點並將其加入到等待隊列當中。

3.通知

調用該方法的前提是當前線程必須獲取了鎖,接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。 被喚醒的線程,將從await方法中的while中返回,進而調用同步器的acquireQueued方法加入到獲取同步狀態的競爭中。 ConditionsignalAll方法,至關於對等待隊列中的每一個節點均執行一次signal方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點。

6、Java併發容器和框架

6.1 ConcurrentHashMap

ConcurrentHashMap是線程安全而且高效的HashMap,其它的相似容器有如下缺點:

  • HashMap在併發執行put操做時,會致使Entry鏈表造成環形數據結構,就會產生死循環獲取Entry
  • HashTable使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。 ConcurrentHashMap高效的緣由在於它採用鎖分段技術,首先將數據分紅一段一段地存儲,而後給每段數據配一把鎖,當一個線程佔用鎖而且訪問一段數據的時候,其餘段的數據也能被其餘線程訪問。

6.1.2 ConcurrentHashMap的結構

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成:

  • Segment是一種可重入鎖,在ConcurrentHashMap裏面扮演鎖的角色;
  • HashEntry則用於存儲鍵值對數據。

一個ConcurrentHashMap裏包含一個Segment數組,它的結構和HashMap相似,是一種數組和鏈表結構。 一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。

6.1.5 ConcurrentHashMap的操做

get get的高效在於整個get過程當中不須要加鎖,除非讀到的值是空纔會加鎖重讀。緣由是它的get方法將要使用的共享變量都設爲volatile,可以在線程間保持可見性,可以被多線程同時讀,而且不會讀到過時的值,例如用於統計當前Segment大小的count字段和用於存儲值的HashEntryvalueput put方法裏須要對共享變量進行寫入操做,因此爲了線程安全,在操做共享變量以前必須加鎖,put首先定位到Segment,而後在Segment裏進行插入操做。 size 先嚐試2次經過不鎖住Segment的方式來統計各個Segment的大小,若是統計的過程當中,容器的count發生了變化,則再用加鎖的方式來統計全部Segment的大小。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,它採用CAS算法來實現。

6.2.1 入隊列

入隊主要作兩件事情:

  • 將入隊節點設置成當前隊列尾節點的下一個節點。
  • 更新tail節點,若是tail節點的next節點不爲空,則將入隊節點設置成tail節點;若是tail節點的next節點爲空,則將入隊節點設置成tailnext節點。

在多線程狀況下,若是有一個線程正在入隊,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲入隊節點,但這時可能有另一個線程插隊了,那麼隊列的尾節點就會發生變化,這時第一個線程要暫停入隊操做,而後從新獲取尾節點。 整個入隊操做主要作兩件事:

  • 定位出尾節點。
  • 使用CAS算法將入隊節點設置成尾節點的next節點,如不成功則重試。

6.3 阻塞隊列

6.3.1 阻塞隊列

阻塞隊列是一個支持兩個附加操做的隊列,這兩個附加的操做支持阻塞的插入和移除方法:

  • 當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 當隊列空時,獲取元素的線程會等待隊列爲空。

在阻塞隊列不可用時,附加操做提供了4種處理方式:拋出異常、返回特殊值、一直阻塞、超時退出。每種方式經過調用不一樣的方法來實現。 Java裏面提供了7種阻塞隊列。

6.4 Fork/Join框架

用於並行執行任務的框架,是把一個大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大人物結果的框架。 Fork/Join使用兩個類來完成事情:

  • ForkJoinTask:它提供了fork()join()操做的機制,一般狀況下,咱們繼承它的子類:有返回結果的RecursiveTask和沒有返回結果的RecursiveAction
  • ForkJoinPoolForkJoinTask須要經過ForkJoinPool來添加。 ForkJoinTask在執行的時候可能會拋出異常,可是咱們沒有辦法在主線程裏直接捕獲異常,因此ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經取消了。 ForkJoinPoolForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。

7、Java中的13個原子操做類

Atomic包裏提供了:原子更新基本類型、原子更新數組、原子更新引用和原子更新屬性。

7.1 原子更新基本類型:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

基本方法:

  • int addAndGet(int delta):以原子方式將輸入的值與當前的值相加,並返回結果。
  • boolean compareAndSet(int expect, int update):若是當前的數值等於預期值,則以原子方式將該值設置爲輸入的值。
  • int getAndIncrement():以原子方式加1,並返回自增前的值。
  • void lazySet(int newValue):最終會設置成newValue,可能會致使其餘線程在以後的一小段時間內仍是讀到舊值。
  • int getAndSet(int newValue):以原子方式設置爲newValue的值,並返回舊值。

7.2 原子更新引用類型

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

基本方法:

  • int addAndGet(int i, int delta):以原子方式將輸入值和索引i的元素相加。
  • boolean compareAndSet(int i, int expect, int update):若是當前值等於預期值,則以原子方式將數組位置i的元素設置成update值。

7.3 原子更新引用類型

用於原子更新多個變量,提供了3種類型:

  • AtomicReference:原子更新引用類型。
  • AtomicReferenceFieldUpdater:原子更新引用類型裏的字段。
  • AtomicMarkableReference:原子更新帶有標記位的引用類型。

7.4 原子更新字段類

  • AtomicIntegerFieldUpdater:原子更新整形的字段的更新器。
  • AtomicLongFieldUpdater:原子更新長整形字段的更新器。
  • AtomicStampedReference:原子更新帶有版本號的引用類型。

原子地更新字段須要兩步:

  • 由於原子更新字段類都是抽象類,每次使用的時候必須使用靜態方法newUpdater建立一個更新器,而且須要設置想要更新的類和屬性。
  • 更新類的字段必須使用public volatile來修飾。

8、Java中的併發工具類

9、Java中的線程池

線程池的優勢:下降資源消耗,提升響應速度,提升線程的可管理性。

9.1 線程池的實現原理

線程池的處理流程以下:

  • 判斷核心線程池是否已滿,若是不是,則建立一個新的工做線程來執行任務;若是已滿,則進入下個流程。
  • 判斷工做隊列是否已滿,若是不是,則將提交的任務存儲在工做隊列裏;若是已滿,則進入下個流程。
  • 判斷線程池的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程;若是已滿,則交給飽和策略來處理。
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //1.添加進入核心線程. if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //2.添加進入隊列. int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //3.添加進入非核心線程. reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 複製代碼

在以上的三步中,除了加入隊列不用獲取全局鎖之外,其它兩種狀況都須要獲取,爲了儘量地避免獲取全局鎖,在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於corePoolSize),幾乎全部的execute方法調用都是加入到隊列當中。

9.2 線程池的使用

9.2.1 線程池的建立

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼
  • corePoolSize:當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其它空閒的基本線程可以執行新任務也會建立。
  • runnableTaskQueue:用於保存等待執行的任務的阻塞隊列,能夠選擇:
  • ArrayBlockingQueue:基於數組結構的有界阻塞隊列。
  • LinkedBlockingQueue:基於鏈表結構的阻塞隊列,吞吐量高於前者。
  • SynchronousQueue:不存儲元素的阻塞隊列,每一個插入操做必須等待另外一個線程調用了移除操做,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
  • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  • maxPoolSize:容許建立的最大線程數。
  • ThreadFactory:用於設置建立線程的工廠。
  • RejectExecutionHandler:飽和策略。
  • keepAliveTime:線程池的工做線程空閒後,保持存活的時間。
  • TimeUnit:線程保持活動的單位。

9.2.2 向線程池提交任務

  • execute(Runnable runnable):提交不須要返回值的任務。
  • Future<Object> future = executor.submit(haveReturnValuetask):用於提交須要返回值的任務,線程池會返回一個future類型任務,能夠用它來判斷任務是否執行成功,而且能夠經過get方法來獲取返回值,get方法會阻塞當前線程直到任務完成。

9.2.3 關閉線程池

  • shutdownNow:首先將線程池的狀態設爲STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。
  • shutdown:將線程池的狀態置爲SHUTDOWN,而後中斷全部沒有正在執行任務的線程。

10、Executor框架

(1)在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor框架)將這些任務映射爲固定數量的線程。 (2)在HotSpot VM的線程模型中,Java線程再被一對一映射爲本地操做系統線程,Java線程啓動時會建立一個本地操做系統線程,當該線程終止時,這個操做系統線程也會被回收。 (3)操做系統會調度全部線程並將它們分配給可用的CPU

Executor框架

由三個部分組成:

  • 任務,即Runnable接口或Callable接口。
  • 任務的執行,包括核心接口Executor,以及繼承自ExecutorExecutorService,還有它的兩個關鍵類ThreadPoolExecutor(用來執行任務)和ScheduledThreadPoolExecutor(能夠在給定的延遲後運行命令,或者按期執行命令)。
  • 異步計算的結果,包括接口Future和實現類FutureTask

10.2 ThreadPoolExecutor詳解

經過工具類Executors,能夠建立如下三種類型的ThreadPoolExecutor,調用靜態建立方法以後,會返回ExecutorService

  • FixedThreadPool 可重用固定線程數的線程池;若是當前運行的線程數少於corePoolSize,則建立新線程來執行任務;若是等於corePoolSize,將任務加入到無界隊列LinkedBlockingQueue當中;多餘的空閒線程將會被當即終止。
  • SingleThreadPool 單個woker線程的executorcorePoolSizemaximumPoolSize爲1;採用無界隊列做爲工做隊列。
  • CacheThreadPool 採用沒有容量的SynchronousQueue做爲線程池的工做隊列,其corePoolSize爲0,maximumPool是無界的;其中的空閒線程最多等待60s。 若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CacheThreadPool會不斷建立新線程,極端狀況下,CacheThreadPool會由於建立過多線程而耗盡CPU資源。

10.3 ScheduledThreadPoolExecutor詳解

用來在給定的延遲以後執行任務,或者按期執行任務,而且能夠在指定的構造函數中指定多個對應的後臺線程數。 它採用DelayQueue這個無界隊列做爲工做隊列,其執行分爲兩個部分:

  • 當調用ScheduledThreadPoolExecutorscheduleAtFixedRate()或者scheduleWithFIxedDelay,它會向DelayQueue中添加ScheduledFutureTask
  • 線程池中的線程從DelayQueue中獲取ScheduledFutureTask
相關文章
相關標籤/搜索