Java多線程筆記(二):鎖與閉鎖工具類

爲了更好地支持併發程序,「鎖」是較爲經常使用的同步方法之一。在高併發環境下,激勵的鎖競爭會致使程序的性能降低。
因此咱們將在這裏討論一些有關於鎖使用和問題以及一些注意事項。前端

工具類

ReentrantLock

重入鎖能夠徹底替代Synchronized關鍵字,但其必須顯式的調用unlock。建議視爲Synchronized的高級版,比起Synchronized關鍵字,其可定時、可輪詢並含有可中斷的鎖獲取操做,公平隊列以及非塊結構的鎖。java

/**
 * 重入鎖演示
 *
 */
public class ReeterLock implements Runnable{

    public static ReentrantLock lock = new ReentrantLock();

    public static int i = 0;

    @Override
    public void run() {
        for (int j=0;j<10000;j++){
            //手動上鎖,能夠上N把,這裏是爲了演示
            lock.lock();
            lock.lock();
            lock.lock();
            try {
                i ++;
            } finally {
                //不管如何須須釋放鎖,上幾把 釋放幾把
                lock.unlock();
                lock.unlock();
                lock.unlock();
            }
        }
    }

    public static void main(String[] a) throws InterruptedException {
        ReeterLock rl = new ReeterLock();
        Thread t1 = new Thread(rl);
        Thread t2 = new Thread(rl);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.print(i);
    }
}

那麼咱們能夠明顯的看到重入鎖保護着臨界區資源i,確保多線程對i操做的安全。在demo中咱們也是加了3次鎖並釋放了3次鎖。程序員

須要注意的是,若是同一線程屢次得到鎖,那麼在釋放鎖的時候,也必須釋放相同次數。若是釋放的次數多了,會獲得一個java.lang.IllegalMonitorStateException異常;反之則會致使當前線程一直持有該鎖,致使其餘線程沒法進入臨界區。算法

中斷響應:ReentrantLock.lockInterruptibly()

對於synchronized來講,若是一個線程在等待鎖,那麼結果只有兩種狀況,要麼它得到這把鎖繼續執行,要麼它就保持等待。而使用重入鎖,則提供另一種可能,那就是線程能夠被中斷。也就是在等待鎖,程序能夠根據須要取消對鎖的請求。有些時候,這麼作是很是有必要的。數組

lockInterruptibly()方法是一個能夠對中斷進行響應的鎖申請動做,即在等待鎖的過程當中,中斷響應。緩存

public class IntLock implements Runnable{

    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;

    /**
     * 控制加鎖順序,製造死鎖
     * @param lock
     */
    public IntLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            /**
             *  1號線程,先佔用 1號鎖,再申請 2號鎖
             *  2號線程,先佔用 2號鎖,再申請 1號鎖
             *  這樣就很容易形成兩個線程相互等待.
             */
            if (lock == 1){
                //加入優先響應中斷的鎖
                lock1.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "  進入...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                /**
                 * 這時候,1號線程 想要持有 2號鎖 ,可是2號線程已經先佔用了2號鎖,因此1 號線程等待.
                 * 2號線程也同樣,佔用着2號鎖 不釋放,還想申請1號鎖,而1號鎖 被1號線程佔用且不釋放.
                 */
                lock2.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "  完成...");

            }else {
                lock2.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "  進入...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock1.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "  完成...");
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "  被中斷,報異常...");
            e.printStackTrace();
        } finally {
            if (lock1.isHeldByCurrentThread()) {
                System.out.println(Thread.currentThread().getName() + "  釋放...");
                lock1.unlock();
            }
            if (lock2.isHeldByCurrentThread()) {
                System.out.println(Thread.currentThread().getName() + "  釋放...");
                lock2.unlock();
            }
            System.out.println(Thread.currentThread().getName() + " 線程退出...");
        }
    }

    public static void main(String[] a) throws InterruptedException {
        IntLock re1 = new IntLock(1);
        IntLock re2 = new IntLock(2);
        Thread t1 = new Thread(re1," 1 號線程 ");
        Thread t2 = new Thread(re2," 2 號線程 ");
        t1.start();
        t2.start();
        //主線程sleep 2秒,讓兩個線程相互競爭資源.形成死鎖
        Thread.sleep(2000);
        //中斷2號線程
        t2.interrupt();

        /* 執行結果:

            1 號線程   進入...
            2 號線程   進入...
            2 號線程   被中斷,報異常...    // 執行 t2.interrupt();
            java.lang.InterruptedException
            2 號線程   釋放...
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
            2 號線程  線程退出...
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
            1 號線程   完成...  // 只有1號線程能執行完成
            at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
            1 號線程   釋放...
            at com.iboray.javacore.Thread.T3.IntLock.run(IntLock.java:55)
            1 號線程   釋放...
            at java.lang.Thread.run(Thread.java:745)
            1 號線程  線程退出...
        */


    }
}

鎖申請等待限時:ReentrantLock.tryLock

除了等待外部通以外,避免死鎖還有另一種方法,就是限時等待,給定一個等待時間讓線程自動放棄。安全

  • tryLock(時長,計時單位),若超過設定時長還沒獲得鎖就返回false,若成功得到鎖就返回true。
  • tryLock(),若沒有參數,當前線程會嘗試得到鎖,若是申請鎖成功,則返回true,不然當即返回false。這種模式不會引發線程等待,所以不會產生死鎖。
public class TimeLock implements Runnable{

    public static ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 申請資源...");
        try {
            //申請3秒,若是獲取不到,返回false,退出.
            if (lock.tryLock(5, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " 得到資源,開始執行...");
                //持有鎖6秒
                Thread.sleep(6000);
                System.out.println(Thread.currentThread().getName() + " 執行完成...");
            }else {
                System.out.println(Thread.currentThread().getName() + " 申請鎖失敗...");
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 中斷...");
            e.printStackTrace();
        }finally {
            if (lock.isHeldByCurrentThread()) {
                System.out.println(Thread.currentThread().getName() + " 釋放鎖...");
                lock.unlock();
            }
        }

    }

    public static void main(String[] a) throws InterruptedException {
        TimeLock re = new TimeLock();
        Thread t1 = new Thread(re," 1 號線程 ");
        Thread t2 = new Thread(re," 2 號線程 ");
        t1.start();
        t2.start();

        /*
        執行結果:

            1 號線程  申請資源...
            2 號線程  申請資源...
            1 號線程  得到資源,開始執行...
            2 號線程  釋放鎖...  //等待了5秒後,依然申請不到鎖,就返回false
            1 號線程  執行完成...
            1 號線程  釋放鎖...
        */
    }
}

因爲佔用鎖的線程會持有鎖長達6秒,故另外一個線程沒法在5秒的等待時間內獲取鎖,所以,請求鎖會失敗。性能優化

公平鎖:ReentrantLock(true)

在大多數狀況下,鎖的申請都是非公平的。也就是說,線程1首先請求了鎖A,接着線程2也請求了鎖A。那麼當鎖A可用時,是線程1仍是線程2能夠得到鎖呢?顯然這是不必定的。系統只會從這個鎖的等待隊列中隨機挑選一個,所以不能保證其公平性。數據結構

公平鎖會按照實際的前後順序,保證先到先得,它不會產生飢餓,只要排隊,最終均可以等到資源。在建立重入鎖時,經過有參構造函數,傳入boolean類型的參數,true表示是公平鎖。實現公平所必然要維護一個有序隊列,因此公平鎖的實現成本高,性能相對也很是低,默認狀況下,鎖是非公平的。多線程

public class ReentrantLockExample3 implements Runnable{

    //建立公平鎖
    public static ReentrantLock lock = new ReentrantLock(true);

    static  int i = 0;

    @Override
    public void run() {

        for (int j = 0;j<5;j++){
            lock.lock();
            try {
                i++;
                System.out.println(Thread.currentThread().getName() + " 得到鎖 " + i);
            } finally {
                lock.unlock();
            }
        }

    }

    public static void main(String[] a) throws InterruptedException {
        ReentrantLockExample3 re = new ReentrantLockExample3();
        Thread t1 = new Thread(re," 1 號線程 ");
        Thread t2 = new Thread(re," 2 號線程 ");
        Thread t3 = new Thread(re," 3 號線程 ");
        Thread t4 = new Thread(re," 4 號線程 ");
        t1.start();
        t2.start();
        t3.start();
        t4.start();

        /*
        執行結果:

        1 號線程  得到鎖 1
        2 號線程  得到鎖 2
        3 號線程  得到鎖 3
        4 號線程  得到鎖 4
        1 號線程  得到鎖 5
        2 號線程  得到鎖 6
        3 號線程  得到鎖 7
        4 號線程  得到鎖 8
        .....
        4 號線程  得到鎖 16
        1 號線程  得到鎖 17
        2 號線程  得到鎖 18
        3 號線程  得到鎖 19
        4 號線程  得到鎖 20
        */

    }
}

ReentrantLock的以上幾個重要的方法

  • lock() 獲取鎖,若是鎖被佔用,則等待
  • lockInterruptibly() 獲取鎖,但優先響應中斷
  • tryLock() 嘗試獲取鎖,若是成功返回true,不然返回false。該方法不等待,當即返回。
  • tryLock(long time,TimeUnit unit) 在給定時間內獲取鎖。
  • unlock() 釋放鎖。

就重入鎖實現來看,它主要集中在Java 層面。在重入鎖實現中,主要包含三個要素:

  1. 原子狀態。原子狀態使用CAS操做來存儲當前鎖的狀態,判斷鎖是否已經被別的線程持有。
  2. 等待隊列。全部沒有請求成功的線程都進入等待隊列進行等待。當有線程釋放鎖後,系統就從當前等待隊列中喚醒一個線程繼續工做。
  3. 阻塞原語park()和unpack(),用來掛起和恢復線程。沒有獲得鎖的線程將會被掛起。

重入鎖的好搭檔:Condition條件

若是你們理解了Object.wait()Object.notify()方法的話,就能很容易地理解Condition對象了。它和wait()和notify()方法的做用是大體相同的。可是wait()方法和notify()方法是和synchronized關鍵字合做使用的,而Condtion是與重入鎖相關聯的。經過Lock接口(重入鎖就實現了這個接口)的Condtion newCondition()方法能夠生成一個與當前重入鎖綁定的Condition實例。利用Condition對象,咱們就可讓線程在合適的時間等待,或者在某一個特定的時刻獲得通知,繼續執行。

Condition接口提供的基本方法以下:

void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();

以上方法含義以下

  • await()方法會使當前線程等待,同時釋放當前鎖,當其餘線程中使用signal()或者signalAll()方法時候,線程會從新得到鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很類似。
  • awaitUninterruptibly()await()方法相似,但它不會再等待過程當中響應中斷。
  • singal() 用於喚醒一個等待隊列中的線程。singalAll()是喚醒全部等待線程。
public class ConditionExample implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();


    @Override
    public void run() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + " 獲取到鎖...");
            //等待
            condition.await();
            System.out.println(Thread.currentThread().getName() + " 執行完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //釋放鎖
            lock.unlock();
            System.out.println(Thread.currentThread().getName() + " 釋放鎖");
        }


    }

    public static void main(String[] a) throws InterruptedException {
        ConditionExample re = new ConditionExample();
        Thread t1 = new Thread(re,"1 號線程 ");
        t1.start();
        //主線程sleep,1號線程會一直等待.直到獲取到1號線程的鎖資源,並將其喚醒.
        Thread.sleep(2000);
        //得到鎖
        lock.lock();
        //喚醒前必須得到當前資源對象的鎖
        condition.signal();
        //釋放鎖
        lock.unlock();

    }
}

ReadWriteLock讀寫鎖

ReadWriteLock是JDK5中提供的讀寫分離鎖。讀寫分離鎖能夠有效地幫助減小鎖競爭,以提高系統性能。用鎖分離的機制來提高性能很是容易理解,好比線程A一、A二、A3進行寫操做,B一、B二、B3進行讀操做,若是使用重入鎖或者內部鎖,則理論上說全部讀之間、讀與寫之間、寫和寫之間都是串行操做。當B1進行讀取時,B二、B3則須要等待鎖。因爲讀操做並不對數據的完整性形成破壞,這種等待顯然是不合理的。所以,讀寫鎖就有了發揮功能的餘地。

在這種狀況下,讀寫鎖運行多個線程同時讀。可是考慮到數據完整性,寫寫操做和讀寫操做間依然是須要互相等待和持有鎖的。總的來講,讀寫鎖的訪問約束以下表。

非阻塞 阻塞
阻塞 阻塞

若是在系統中,讀的次數遠遠大於寫的操做,讀寫鎖就能夠發揮最大的功效,提高系統的性能。

栗子:

public class ReadWriteLockExample {

    //建立普通重入鎖
    private static Lock lock = new ReentrantLock();

    //建立讀寫分離鎖
    private static ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();

    //建立讀鎖
    private static Lock readLock = rwlock.readLock();

    //建立寫鎖
    private static Lock writeLock = rwlock.writeLock();

    private  int value;

    public Object HandleRead(Lock lock) throws InterruptedException {
        try {
            //上鎖
            lock.lock();
            //模擬處理業務
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " Read...");
            return value;
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    public void HandleWrite(Lock lock,int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = index;
            System.out.println(Thread.currentThread().getName() + " Write...");
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] a ) throws InterruptedException {
        final ReadWriteLockExample rwle = new ReadWriteLockExample();

        //建立讀方法
        Runnable readR = new Runnable() {
            @Override
            public void run() {
                try {
                    //rwle.HandleRead(lock); //普通鎖
                    rwle.HandleRead(readLock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //建立寫方法
        Runnable writeR = new Runnable() {
            @Override
            public void run() {
                try {
                    //rwle.HandleWrite(lock,new Random().nextInt()); //普通鎖
                    rwle.HandleWrite(writeLock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //18次讀
        for (int i=0;i<18;i++){
            Thread s = new Thread(readR);
            s.start();
        }
        //2次寫
        for (int i=18;i<20;i++){
            Thread s = new Thread(writeR);
            s.start();
        }

        /**
         * 結論:
         *
         * 用普通鎖運行,大約執行20秒左右
         *
         * 用讀寫分離鎖,大約執行3秒左右
         *
         */

    }

}

在讀鎖和寫鎖之間的交互能夠採用多種實現方式。ReadWriteLock中的一些可選實現包括:

  • 釋放優先:當一個寫入操做釋放寫入鎖時,而且隊列中同時存在讀線程和寫線程,那麼應該優先選擇哪個線程。
  • 讀線程插隊:若是鎖是由讀線程持有,可是寫線程還在等待,是否容許新到的讀線程得到訪問權,仍是應在寫線程後面等待?若容許的話能夠提升併發性可是可能形成寫線程的飢餓。
  • 重入性:讀取鎖和寫入鎖是否可重入。
  • 降級和升級:若一個線程持有寫鎖能否在繼續持有寫鎖的狀態下獲取讀鎖?這可能會使寫鎖「降級」爲讀鎖。讀鎖是否優先於其它正在等待的讀線程和寫線程而升級爲一個寫鎖?在大多數讀寫鎖實現中不支持「升級」,由於這樣容易死鎖(兩個讀線程試圖同時升級爲寫鎖,那麼兩者都不會釋放寫鎖)。

閉鎖

閉鎖是一種同步工具類,能夠延遲線程的進度直到其到達終止狀態。閉鎖的做用至關於一扇門:在閉鎖到達結束狀態以前,這扇門一直是關閉的,而且沒有任何線程能經過,當到達結束狀態時,這扇門會打開並容許全部的線程經過。當閉鎖到達結束狀態後,將不會再改變狀態,所以這扇門將永遠保持打開狀態。閉鎖能夠用來確保某些活動直到其餘活動都完成才繼續執行,例如:

  • 確保某個計算在其須要的全部資源都被初始化後才繼續執行。二元閉鎖(包括兩個狀態)能夠用來表示「資源R已經被初始化」,而全部須要R的操做都必須先在這個比鎖上等待。
  • 確保某個服務在其依賴的全部其餘服務都已經啓動以後才啓動。每一個服務都有一個相關的二元閉鎖。當啓動服務S時,將首先在S依賴的其餘服務的閉鎖上等待,在全部依賴的服務都啓動後會釋放閉鎖S,這樣其餘依賴S的服務才能繼續執行。
  1. 等待直到某個操做的全部參與者(例如,在多玩家遊戲中的全部玩家)都就緒再繼續執行。在這種狀況中,當全部的玩家都執行就緒時,閉鎖將到達結束狀態。

CountDownLatch

CountDownLatch 就是一種靈活的閉鎖實現,能夠在上述的各類狀況中使用,它可使一個或多個線程等待一組時間發生CountDown在英文中意爲倒計時,Latch爲門閂。閉鎖的狀態包括一個計數器,該計數器被初始化爲一個正數,表示須要等待的事情數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示全部須要等待的事情都已經發生。若是計數器的值是非零,那麼await會一直阻塞直到計數器爲零,或者等待中的線程中斷,或者等待超時。所以,這個工具一般用來控制線程等待,它可讓某一個線程等待直到倒計時結束,再開始執行。

CountDownLatch的構造函數接收一個整數做爲參數,即當前這個計數器的技術個數。

public CountDownLatch(int count)

下面這個簡單的示例,演示了CountDownLatch的使用。

public class CountDownLatchExample implements Runnable{

    static final CountDownLatch cdl = new CountDownLatch(10);
    static final CountDownLatchExample cdle = new CountDownLatchExample();

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println(Thread.currentThread().getName() + " 部件檢查完畢...");
            //一個線程完成工做,倒計時器減1
            cdl.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] a) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i=0;i<10;i++){
            exec.submit(cdle);
        }
        //等待全部線程完成,主線程才繼續執行
        cdl.await();
        System.out.println(Thread.currentThread().getName() + " 全部檢查完成,上跑道起飛...");
        //關閉線程池
        exec.shutdown();
    }
}

FutureTask

FutureTask也能夠用作閉鎖。FutureTask表示的計算是經過Callable來實現的,至關於一種可生成結果的Runnable,而且能夠處於如下3種狀態:

  • 等待運行(Waiting to run)
  • 正在運行(Running)
  • 運行完成(Completed)

Future.get的行爲取決於任務的狀態。若是任務已經完成,那麼get會當即返回結果,不然get將阻塞直到任務進入完成狀態,而後返回結果或者拋出異常。FutureTask將計算結果從執行的計算的線程傳遞到獲取這個結果的線程,而FutureTask的規劃確保了這種傳遞過程可以實現結果的安全發佈。

FutureTask在ExeCutor中表示異步任務,此外還能夠用來表示一些時間較長的計算,這些計算能夠在使用計算結果以前啓動。

信號量

技術信號量(Counting Semaphore)用來控制同時訪問某個特定資源的操做數量,或者同時執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界。

信號量爲多線程協做提供了更爲強大的控制方法。廣義上說,信號量是對鎖的擴展。不管是內部鎖synchronized仍是重入鎖ReentrantLock,一次都只容許一個線程訪問一個資源,而信號量卻能夠指定多個線程,同時訪問某一個資源。信號量主要提供瞭如下構造函數:

public Semaphore(int permits)
public Semaphore(int permits,boolean fair) //第二個參數能夠指定是否公平

在構造信號量對象時,必需要指定信號量的准入數,即同時能申請多少個許可。當每一個線程每次只申請一個許可時,這就至關於指定了同時有多少個線程能夠訪問某一個資源。信號量的主要邏輯方法有:

public void acquire() //嘗試得到一個准入的許可。若沒法得到,則線程會等待,直到有線程釋放一個許可或者當前線程被中斷
public void acquireUninterruptibly()//和acquire()相似,可是不響應中斷
public boolean tryAcquire()//嘗試得到一個許可,成功true失敗fasle,不會等待,馬上返回
public boolean tryAcquire(long timeout,TimeUnit unit)
public void release()//線程訪問資源結束後,釋放一個許可,以使其餘等待許可的線程進行資源訪問

栗子:

public class SemaphoreExample implements Runnable {

    //指定信號量,同時能夠有5個線程訪問資源
    public static final Semaphore s = new Semaphore(5);

    @Override
    public void run() {

        try {
            //申請信號量,也能夠直接使用 s.acquire();
            if (s.tryAcquire(1500, TimeUnit.SECONDS)) {
                //模擬耗時操做
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " 完成了任務..");
                //離開時必須釋放信號量,否則會致使信號量泄露——申請了但沒有釋放
                s.release();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] a) throws InterruptedException {
       //申請20個線程
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreExample re = new SemaphoreExample();
        for (int i=0;i<20;i++){
            exec.submit(re);
        }
        exec.shutdown();
    }
}

Semaphore中管理者一組虛擬的許可(permit),許可的初始數量可經過構造函數來指定。在執行操做時能夠先得到許可(只要還有剩餘的許可),並在使用之後釋放許可。若是沒有許可,那麼acquire將阻塞直到有許可(或者被中斷或者操做超時)。release方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值爲1的Semaphore。二值信號量能夠用作互斥體(mutex),並具有不可重入的加鎖語義:誰擁有這個惟一的許可,誰就擁有了互斥鎖。

一樣,咱們也可使用Semaphore將任何一種容器變成有界阻塞容器。

CyclicBarrier

和以前的CountDownLatch相似,它(循環柵欄)也能夠實現線程間的技術等待,但它的功能比CountDownLatch更加複雜強大。它能阻塞一組線程直到某個事件發生。所以,柵欄能夠用於實現一些協議,例如「開會必定要在xx地方集合,等其餘人到了再討論下一步要作的事情」。

CyclicBarrier的使用場景也很豐富。好比,司令下達命令,要求10個士兵一塊兒去完成一項任務。這時,就會要求10個士兵先集合報道,接着,一塊兒雄赳赳氣昂昂地執行任務。當10個士兵把本身手頭的任務都執行完成了,那麼司令才能對外宣佈,任務完成!

下面的栗子使用CyclicBarrier演示了上述司機命令士兵完成任務的場景。

public class CyclicBarrierExample {

    public static class Soldier implements Runnable{

        private String name;
        private CyclicBarrier cyclicBarrier;

        public Soldier(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " 來報道..");
                //等待全部士兵到齊
                cyclicBarrier.await();
                doWork();
                //等待全部士兵完成任務
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork(){
            try {
                Thread.sleep(new Random().nextInt(10) * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " 任務已完成..");
        }
    }

    public static class doOrder implements Runnable{

        boolean flag;
        int n;

        public doOrder(boolean flag, int n) {
            this.flag = flag;
            this.n = n;
        }

        @Override
        public void run() {
            if (flag){
                System.out.println("司令 : 士兵 " + n +"個 任務完成");
            }else {
                System.out.println("司令 : 士兵 " + n +"個 集合完畢");
                //執行完後 改變完成標記.當下一次調用doOrder時,能夠進入if
                flag = true;
            }
        }
    }

    public static void main(String[] a){
        final int n = 10;
        //是否完成了任務
        boolean flag = false;

        //建立10個士兵線程
        Thread[] allSoldier = new Thread[n];
        //建立CyclicBarrier實例
        //這裏的意思是,等待10個線程都執行完,就執行doOrder()方法
        CyclicBarrier c = new CyclicBarrier(n, new doOrder(flag,n));
        for (int i=0;i<n;i++){
            //System.out.println("士兵" + i + " 報道");
            //裝配士兵線程
            allSoldier[i] = new Thread(new Soldier("士兵" + i,c));
            /**
             * 開啓士兵線程,可是執行到第一個cyclicBarrier.await()柵欄時,
             * 要等待,等到10個士兵線程都到這裏等着,等到執行完doOrder()方法後,完成第一次計數.
             *
             * 這樣才能繼續執行下一個方法doWork(),而doWork()完成後,又須要第二次等待,
             * 等待所有士兵線程都到等待隊列後,再次調用doOrder()方法.完成第二次計數.
             * 而這個方法中,每一個線程的flag都已經改變,利用flag,完成任務.
             *
             */
            allSoldier[i].start();

            /*
                執行結果:

                士兵0 來報道..
                士兵1 來報道..
                ......
                士兵8 來報道..
                士兵9 來報道..
                司令 : 士兵 10個 集合完畢
                士兵2 任務已完成..
                士兵8 任務已完成..
                ......
                士兵9 任務已完成..
                士兵4 任務已完成..
                司令 : 士兵 10個 任務完成
            */

        }
    }

}

優化鎖

通常咱們對於鎖的優化有如下幾個大體方向:

  • 減小鎖的持有時間
  • 減小鎖的請求頻率
  • 使用帶有協調機制的獨佔鎖,這些機制容許更高的併發性

鎖分段技術

在某些狀況下,能夠將鎖分解技術進一步擴展爲對一組獨立對象上的鎖進行分解,這種狀況被稱爲鎖分段。例如,在ConcurrentHashMap的實現中使用了一個包含16個鎖的數組,每一個鎖保護全部散列桶的1/16,其中第N個散列桶由第(N mod 16)個鎖來保護。假設散列函數具備合理的分佈性,而且關鍵字可以均勻分佈,那麼這大約能把對於鎖的請求減小到原來的1/16,正是這項技術使得ConcurrentHashMap可以支持多達16個併發的寫入器。(要使得擁有大量處理器的系統在高訪問量的狀況下實現更高的併發性,還能夠進一步增長鎖的數量,但僅當你能證實併發寫入線程的競爭足夠激烈並須要突破這個限制時,才能將鎖分段的數量超過默認的16個。)

另外一個典型的案例就是LinkedBlockingQueue的實現。
take()和put()方法雖然都對隊列進行了修改操做,但因爲是鏈表,所以,兩個操做分別做用於隊列的前端和末尾,理論上二者並不衝突。使用獨佔鎖,則要求在進行take和put操做時獲取當前隊列的獨佔鎖,那麼take和put就不可能真正的併發,他們會彼此等待對方釋放鎖。在JDK的實現中,取而代之的是兩把不一樣的鎖,分離了take和put操做.削弱了競爭的可能性.實現類取數據和寫數據的分離,實現了真正意義上成爲併發操做。

鎖分段的一個劣勢在於:與採用單個鎖來實現獨佔訪問相比,要獲取多個鎖來實現獨佔訪問將更加困難而且開銷更高。一般,在執行一個操做時最多隻需獲取一個鎖,但在某些狀況下須要加鎖整個容器,例如當ConcurrentHashMap須要擴展映射範圍,以及從新計算鍵值的散列值要分佈到更大的桶集合中時,就須要獲取分段鎖集合中的全部鎖。

避免熱點域

鎖分解和鎖分段技術都能提升可伸縮性,由於它們都能使不一樣的線程在不一樣的數據(或者同一個數據的不一樣部分)上操做,而不會相互干擾。若是程序採用鎖分段或分解技術,那麼必定要表現出在鎖上的競爭頻率高於在鎖保護的數據上發生競爭的頻率。若是一個鎖保護兩個獨立變量X和Y,而且線程A想要訪問X,而線程B想要訪問Y(這相似於在ServerStatus中,一個線程調用addUser,而另外一個線程調用addQuery),那麼這兩個線程不會在任何數據上發生競爭,即便它們會在同一個鎖上發生競爭。

當每一個操做都請求多個變量時,鎖的粒度將很難下降。這是在性能與可伸縮性之間相互制衡的另外一個方面,一些常見的優化措施,例如將一些反覆計算的結果緩存起來,都會引入一些」熱點域「,而這些熱點域每每會限制可伸縮性。

當實現HashMap時,你須要考慮如何在size方法中計算Map中的元素數量。最簡單的方法就是,在每次調用時都統計一次元素的數量。一種常見的優化措施是,在插入和移除元素時更新一個計數器,雖然這在put和remove等方法中略微增長了一些開銷,以確保計數器是最新的值,但這把size方法的開銷從O(n)下降到O(1)。

在單線程或者採用徹底同步的實現中,使用一個獨立的計算器能很好地提升相似size和isEmpty這些方法的執行速度,但卻致使更難以提高實現的可伸縮性,由於每一個修改map的操做都須要更新這個共享的計數器。即便使用鎖分段技術來實現散列鏈,那麼在對計數器的訪問進行同步時,也會從新致使在使用獨佔鎖時存在的可伸縮性問題。一個看似性能優化的措施——緩存size操做的結果,已經變成了一個可伸縮性問題。在這種狀況下,計數器也被稱爲熱點域,由於每一個致使元素數量發生變化的操做都須要訪問它。
爲了不這個問題,ConcurrentHashMap中的size將對每一個分段進行枚舉並將每一個分段中的元素數量相加,而不是維護一個全局計數。爲了不枚舉每一個元素,ConcurrentHashMap爲每一個分段都維護一個獨立的計數,並經過每一個分段的鎖來維護這個值。

一些替代獨佔鎖的方法

第三種下降競爭鎖的影響的技術就是放棄使用獨佔鎖,從而有助於使用一種友好併發的方式來管理共享狀態。例如,使用併發容器、讀-寫鎖、不可變對象以及原子變量。

ReadWriteLock實現了一種在多個讀取操做以及單個寫入操做狀況下的加鎖規則:若是多個讀取操做都不會修改共享資源,那麼這些讀取操做能夠同時訪問該共享資源,但在執行寫入操做時必須以獨佔方式來獲取鎖。對於讀取操做佔多數的數據結構,ReadWriteLock可以提供比獨佔鎖更高的併發性。而對於只讀的數據結構,其中包含的不變性能夠徹底不須要加鎖操做。

原子變量提供了一種方式來下降更新「熱點域」時的開銷,例如競態計數器、序列發生器、或者對鏈表數據結構中頭節點的引用。原子變量類提供了在整數或者對象引用上的細粒度原子操做(所以可伸縮性更高),並使用了現代處理器中提供的底層併發原語(例如比較並交換)。若是在類中只包含少許的熱點域,而且這些域不會與其餘變量參與到不變性條件中,那麼用原子變量來替代他們能提升可伸縮性。(經過減小算法中的熱點域,能夠提升可伸縮性——雖然原子變量能下降熱點域的更新開銷,但並不能徹底消除。)

來自JVM的鎖優化

鎖粗化

若是對一個鎖不停地進行請求,同步和釋放,其自己也會消耗系統寶貴的資源,反而不利於性能優化.
虛擬機在遇到須要一連串對同一把鎖不斷進行請求和釋放操做的狀況時,便會把全部的鎖操做整合成對鎖的一次請求,從而減小對鎖的請求同步次數,這就是鎖的粗化。

鎖偏向

偏向鎖是一種針對加鎖操做的優化手段,他的核心思想是:若是一個線程得到了鎖,那麼鎖就進行偏向模式.當這個線程再次請求鎖時,無需再作任何同步操做.這樣就節省了大量操做鎖的動做,從而提升程序性能.

所以,對於幾乎沒有鎖競爭的場合,偏向鎖有比較好的優化效果.由於極有可能連續屢次是同一個線程請求相同的鎖.而對於鎖競爭激烈的程序,其效果不佳.

使用Java虛擬機參數:-XX:+UseBiasedLocking 能夠開啓偏向鎖.

輕量級鎖

若是偏向鎖失敗,虛擬機並不會當即掛起線程.它還會使用一種稱爲輕量級的鎖的優化手段.輕量級鎖只是簡單的將對象頭部做爲指針,指向持有鎖的線程堆棧內部,來判斷一個線程是否持有對象鎖.若是線程得到輕量鎖成功,則能夠順利進入臨界區.若是失敗,則表示其餘線程爭搶到了鎖,那麼當前線程的鎖請求就會膨脹爲重量級鎖.

自旋鎖

鎖膨脹後,虛擬機爲了不線程真實的在操做系統層面掛起,虛擬機還作了最後的努力就是自旋鎖.若是一個線程暫時沒法得到索,有可能在幾個CPU時鐘週期後就能夠獲得鎖,
那麼簡單粗暴的掛起線程多是得不償失的操做.虛擬機會假設在很短期內線程是能夠得到鎖的,因此會讓線程本身空循環(這即是自旋的含義),若是嘗試若干次後,能夠獲得鎖,那麼久能夠順利進入臨界區,
若是還得不到,纔會真實地講線程在操做系統層面掛起.

鎖消除

鎖消除是一種更完全的鎖優化,Java虛擬機在JIT編譯時,經過對運用上下文的掃描,去除不可能存在的共享資源競爭鎖,節省毫無心義的資源開銷.

咱們可能會問:若是不可能存在競爭,爲何程序員還要加上鎖呢?

在Java軟件開發過程當中,咱們必然會用上一些JDK的內置API,好比StringBuffer、Vector等。你在使用這些類的時候,也許根本不會考慮這些對象到底內部是如何實現的。好比,你頗有可能在一個不可能存在併發競爭的場合使用Vector。而周所衆知,Vector內部使用了synchronized請求鎖,以下代碼:

public String [] createString(){
  Vector<String> v = new Vector<String>();
  for (int i =0;i<100;i++){
    v.add(Integer.toString(i));
  }
  return v.toArray(new String[]{});
}

上述代碼中的Vector,因爲變量v只在createString()函數中使用,所以,它只是一個單純的局部變量。局部變量是在線程棧上分配的,屬於線程私有的數據,所以不可能被其餘線程訪問。因此,在這種狀況下,Vector內部全部加鎖同步都是沒有必要的。若是虛擬機檢測到這種狀況,就會將這些無用的鎖操做去除。

鎖消除設計的一項關鍵技術是逃逸分析,就是觀察某個變量是否會跳出某個做用域(好比對Vector的一些操做).在本例中,變量v顯然沒有逃出createString()函數以外。以次爲基礎,虛擬機才能夠大膽將v內部逃逸出當前函數,也就是說v有可能被其餘線程訪問。若是是這樣,虛擬機就不能消除v中的鎖操做。

逃逸分析必須在-server模式下進行,可使用-XX:+DoEscapeAnalysis參數打開逃逸分析。使用-XX:+EliminateLocks參數能夠打開鎖消除。

相關文章
相關標籤/搜索