02.第二階段、實戰Java高併發程序設計模式-5.JDK併發包1

5.JDK併發包1 ......................................................................................................................1html

1.java

各類同步控制工具的使用 ..........................................................................................3編程

1.1. ReentrantLock .......................................................................................................3api

  1. 1.1.1.  可重入 ...........................................................................................................3數組

  2. 1.1.2.  可中斷 ...........................................................................................................3緩存

  3. 1.1.3.  可限時 ...........................................................................................................3安全

  4. 1.1.4.  公平鎖 ...........................................................................................................3多線程

1.2. Condition ...............................................................................................................3併發

  1. 1.2.1.  概述 ...............................................................................................................3dom

  2. 1.2.2.  主要接口 .......................................................................................................3

  3. 1.2.3.  API詳解 ..........................................................................................................3

1.3. Semaphore ............................................................................................................4

  1. 1.3.1.  概述 ...............................................................................................................4

  2. 1.3.2.  主要接口 .......................................................................................................4

1.4. ReadWriteLock ......................................................................................................4

  1. 1.4.1.  概述 ...............................................................................................................4

  2. 1.4.2.  訪問狀況 .......................................................................................................4

  3. 1.4.3.  主要接口 .......................................................................................................4

1.5. CountDownLatch...................................................................................................4

  1. 1.5.1.  概述 ...............................................................................................................5

  2. 1.5.2.  主要接口 .......................................................................................................5

  3. 1.5.3.  示意圖 ...........................................................................................................5

1.6. CyclicBarrier ..........................................................................................................5

  1. 1.6.1.  概述 ...............................................................................................................5

  2. 1.6.2.  主要接口 .......................................................................................................5

  3. 1.6.3.  示意圖 ...........................................................................................................6

1.7. LockSupport ..........................................................................................................6

  1. 1.7.1.  概述 ...............................................................................................................6

  2. 1.7.2.  主要接口 .......................................................................................................6

  3. 1.7.3.  與suspend()比較 ...........................................................................................6

  4. 1.7.4.  中斷響應 .......................................................................................................6

1.8. ReentrantLock 的實現 ..........................................................................................6

  1. 1.8.1.  CAS狀態 .........................................................................................................6

  2. 1.8.2.  等待隊列 .......................................................................................................6

  3. 1.8.3.  park()..............................................................................................................7

併發容器及典型源碼分析 .........................................................................................7

2.

2.1. 集合包裝...............................................................................................................7

  1. 2.1.1.  HashMap ........................................................................................................7

  2. 2.1.2.  List ..................................................................................................................7

  3. 2.1.3.  Set ..................................................................................................................7

  1. 2.2.  ConcurrentHashMap .............................................................................................7

  2. 2.3.  BlockingQueue ......................................................................................................7

  3. 2.4.  ConcurrentLinkedQueue .......................................................................................8

 

1. 各類同步控制工具的使用

1.1. ReentrantLock

1.1.1. 可重入 單線程能夠重複進入,但要重複退出

1.1.2. 可中斷 lockInterruptibly()

1.1.3. 可限時 超時不能得到鎖,就返回false,不會永久等待構成死鎖

1.1.4. 公平鎖

先來先得

public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);

1.2. Condition

1.2.1. 概述

相似於 Object.wait()和Object.notify() 與ReentrantLock結合使用

1.2.2. 主要接口

void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();

1.2.3. API詳解 await()方法會使當前線程等待,同時釋放當前鎖,當其餘線程中使用signal()時或者signalAll()方法時,線

程會從新得到鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很類似。

awaitUninterruptibly()方法與await()方法基本相同,可是它並不會再等待過程當中響應中斷。

singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒全部在等待中的線程。這和Obej ct.notify()方法很相似。

ReentrantLock(重入鎖)功能詳解和應用演示

目錄

1. ReentrantLock簡介

jdk中獨佔鎖的實現除了使用關鍵字synchronized外,還可使用ReentrantLock。雖然在性能上ReentrantLock和synchronized沒有什麼區別,但ReentrantLock相比synchronized而言功能更加豐富,使用起來更爲靈活,也更適合複雜的併發場景。

2. ReentrantLock和synchronized的相同點

2.1 ReentrantLock是獨佔鎖且可重入的

  • 例子
public class ReentrantLockTest {

    public static void main(String[] args) throws InterruptedException {

        ReentrantLock lock = new ReentrantLock();

        for (int i = 1; i <= 3; i++) {
            lock.lock();
        }

        for(int i=1;i<=3;i++){
            try {

            } finally {
                lock.unlock();
            }
        }
    }
}

上面的代碼經過lock()方法先獲取鎖三次,而後經過unlock()方法釋放鎖3次,程序能夠正常退出。從上面的例子能夠看出,ReentrantLock是能夠重入的鎖,當一個線程獲取鎖時,還能夠接着重複獲取屢次。在加上ReentrantLock的的獨佔性,咱們能夠得出如下ReentrantLock和synchronized的相同點。

  • 1.ReentrantLock和synchronized都是獨佔鎖,只容許線程互斥的訪問臨界區。可是實現上二者不一樣:synchronized加鎖解鎖的過程是隱式的,用戶不用手動操做,優勢是操做簡單,但顯得不夠靈活。通常併發場景使用synchronized的就夠了;ReentrantLock須要手動加鎖和解鎖,且解鎖的操做盡可能要放在finally代碼塊中,保證線程正確釋放鎖。ReentrantLock操做較爲複雜,可是由於能夠手動控制加鎖和解鎖過程,在複雜的併發場景中能派上用場。

  • 2.ReentrantLock和synchronized都是可重入的。synchronized由於可重入所以能夠放在被遞歸執行的方法上,且不用擔憂線程最後可否正確釋放鎖;而ReentrantLock在重入時要卻確保重複獲取鎖的次數必須和重複釋放鎖的次數同樣,不然可能致使其餘線程沒法得到該鎖。

3. ReentrantLock相比synchronized的額外功能

3.1 ReentrantLock能夠實現公平鎖。

公平鎖是指當鎖可用時,在鎖上等待時間最長的線程將得到鎖的使用權。而非公平鎖則隨機分配這種使用權。和synchronized同樣,默認的ReentrantLock實現是非公平鎖,由於相比公平鎖,非公平鎖性能更好。固然公平鎖能防止飢餓,某些狀況下也頗有用。在建立ReentrantLock的時候經過傳進參數true建立公平鎖,若是傳入的是false或沒傳參數則建立的是非公平鎖

ReentrantLock lock = new ReentrantLock(true);

繼續跟進看下源碼

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

能夠看到公平鎖和非公平鎖的實現關鍵在於成員變量sync的實現不一樣,這是鎖實現互斥同步的核心。之後有機會咱們再細講。

  • 一個公平鎖的例子
public class ReentrantLockTest {

    static Lock lock = new ReentrantLock(true);

    public static void main(String[] args) throws InterruptedException {

        for(int i=0;i<5;i++){
            new Thread(new ThreadDemo(i)).start();
        }

    }

    static class ThreadDemo implements Runnable {
        Integer id;

        public ThreadDemo(Integer id) {
            this.id = id;
        }

        @Override

      public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(int i=0;i<2;i++){
                lock.lock();
                System.out.println("得到鎖的線程:"+id);
                lock.unlock();
            }
        }
    }
}
  • 公平鎖結果

咱們開啓5個線程,讓每一個線程都獲取釋放鎖兩次。爲了能更好的觀察到結果,在每次獲取鎖前讓線程休眠10毫秒。能夠看到線程幾乎是輪流的獲取到了鎖。若是咱們改爲非公平鎖,再看下結果

  • 非公平鎖結果

線程會重複獲取鎖。若是申請獲取鎖的線程足夠多,那麼可能會形成某些線程長時間得不到鎖。這就是非公平鎖的「飢餓」問題。

  • 公平鎖和非公平鎖該如何選擇
    大部分狀況下咱們使用非公平鎖,由於其性能比公平鎖好不少。可是公平鎖可以避免線程飢餓,某些狀況下也頗有用。

3.2 .ReentrantLock可響應中斷

當使用synchronized實現鎖時,阻塞在鎖上的線程除非得到鎖不然將一直等待下去,也就是說這種無限等待獲取鎖的行爲沒法被中斷。而ReentrantLock給咱們提供了一個能夠響應中斷的獲取鎖的方法lockInterruptibly()。該方法能夠用來解決死鎖問題。

  • 響應中斷的例子
public class ReentrantLockTest {
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new ThreadDemo(lock1, lock2));//該線程先獲取鎖1,再獲取鎖2
        Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//該線程先獲取鎖2,再獲取鎖1
        thread.start();
        thread1.start();
        thread.interrupt();//是第一個線程中斷
    }

    static class ThreadDemo implements Runnable {
        Lock firstLock;
        Lock secondLock;
        public ThreadDemo(Lock firstLock, Lock secondLock) {
            this.firstLock = firstLock;
            this.secondLock = secondLock;
        }
        @Override
        public void run() {
            try {
                firstLock.lockInterruptibly();
                TimeUnit.MILLISECONDS.sleep(10);//更好的觸發死鎖
                secondLock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                firstLock.unlock();
                secondLock.unlock();
                System.out.println(Thread.currentThread().getName()+"正常結束!");
            }
        }
    }
}
  • 結果

構造死鎖場景:建立兩個子線程,子線程在運行時會分別嘗試獲取兩把鎖。其中一個線程先獲取鎖1在獲取鎖2,另外一個線程正好相反。若是沒有外界中斷,該程序將處於死鎖狀態永遠沒法中止。咱們經過使其中一個線程中斷,來結束線程間毫無心義的等待。被中斷的線程將拋出異常,而另外一個線程將能獲取鎖後正常結束。

3.3 獲取鎖時限時等待

ReentrantLock還給咱們提供了獲取鎖限時等待的方法tryLock(),能夠選擇傳入時間參數,表示等待指定的時間,無參則表示當即返回鎖申請的結果:true表示獲取鎖成功,false表示獲取鎖失敗。咱們可使用該方法配合失敗重試機制來更好的解決死鎖問題。

  • 更好的解決死鎖的例子
public class ReentrantLockTest {
    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new ThreadDemo(lock1, lock2));//該線程先獲取鎖1,再獲取鎖2
        Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//該線程先獲取鎖2,再獲取鎖1
        thread.start();
        thread1.start();
    }

    static class ThreadDemo implements Runnable {
        Lock firstLock;
        Lock secondLock;
        public ThreadDemo(Lock firstLock, Lock secondLock) {
            this.firstLock = firstLock;
            this.secondLock = secondLock;
        }
        @Override
        public void run() {
            try {
                while(!lock1.tryLock()){
                    TimeUnit.MILLISECONDS.sleep(10);
                }
                while(!lock2.tryLock()){
                    lock1.unlock();
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                firstLock.unlock();
                secondLock.unlock();
                System.out.println(Thread.currentThread().getName()+"正常結束!");
            }
        }
    }
}
  • 結果

線程經過調用tryLock()方法獲取鎖,第一次獲取鎖失敗時會休眠10毫秒,而後從新獲取,直到獲取成功。第二次獲取失敗時,首先會釋放第一把鎖,再休眠10毫秒,而後重試直到成功爲止。線程獲取第二把鎖失敗時將會釋放第一把鎖,這是解決死鎖問題的關鍵,避免了兩個線程分別持有一把鎖而後相互請求另外一把鎖。

4. 結合Condition實現等待通知機制

使用synchronized結合Object上的wait和notify方法能夠實現線程間的等待通知機制。ReentrantLock結合Condition接口一樣能夠實現這個功能。並且相比前者使用起來更清晰也更簡單。

4.1 Condition使用簡介

Condition由ReentrantLock對象建立,而且能夠同時建立多個

static Condition notEmpty = lock.newCondition();

static Condition notFull = lock.newCondition();

Condition接口在使用前必須先調用ReentrantLock的lock()方法得到鎖。以後調用Condition接口的await()將釋放鎖,而且在該Condition上等待,直到有其餘線程調用Condition的signal()方法喚醒線程。使用方式和wait,notify相似。

  • 一個使用condition的簡單例子
public class ConditionTest {

    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {

        lock.lock();
        new Thread(new SignalThread()).start();
        System.out.println("主線程等待通知");
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
        System.out.println("主線程恢復運行");
    }
    static class SignalThread implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                condition.signal();
                System.out.println("子線程通知");
            } finally {
                lock.unlock();
            }
        }
    }
}
  • 運行結果

4.2 使用Condition實現簡單的阻塞隊列

阻塞隊列是一種特殊的先進先出隊列,它有如下幾個特色
1.入隊和出隊線程安全
2.當隊列滿時,入隊線程會被阻塞;當隊列爲空時,出隊線程會被阻塞。

  • 阻塞隊列的簡單實現
public class MyBlockingQueue<E> {

    int size;//阻塞隊列最大容量

    ReentrantLock lock = new ReentrantLock();

    LinkedList<E> list=new LinkedList<>();//隊列底層實現

    Condition notFull = lock.newCondition();//隊列滿時的等待條件
    Condition notEmpty = lock.newCondition();//隊列空時的等待條件

    public MyBlockingQueue(int size) {
        this.size = size;
    }

    public void enqueue(E e) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() ==size)//隊列已滿,在notFull條件上等待
                notFull.await();
            list.add(e);//入隊:加入鏈表末尾
            System.out.println("入隊:" +e);
            notEmpty.signal(); //通知在notEmpty條件上等待的線程
        } finally {
            lock.unlock();
        }
    }

    public E dequeue() throws InterruptedException {
        E e;
        lock.lock();
        try {
            while (list.size() == 0)//隊列爲空,在notEmpty條件上等待
                notEmpty.await();
            e = list.removeFirst();//出隊:移除鏈表首元素
            System.out.println("出隊:"+e);
            notFull.signal();//通知在notFull條件上等待的線程
            return e;
        } finally {
            lock.unlock();
        }
    }
}
  • 測試代碼
public static void main(String[] args) throws InterruptedException {

    MyBlockingQueue<Integer> queue = new MyBlockingQueue<>(2);
    for (int i = 0; i < 10; i++) {
        int data = i;
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    queue.enqueue(data);
                } catch (InterruptedException e) {

                }
            }
        }).start();

    }
    for(int i=0;i<10;i++){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Integer data = queue.dequeue();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}
  • 運行結果

5. 總結

ReentrantLock是可重入的獨佔鎖。比起synchronized功能更加豐富,支持公平鎖實現,支持中斷響應以及限時等待等等。能夠配合一個或多個Condition條件方便的實現等待通知機制。

 

1.3. Semaphore 1.3.1. 概述

共享鎖 運行多個線程同時臨界區

1.3.2. 主要接口

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit) public void release()

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * description:
 *
 * @author: dawn.he QQ:       905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/3    6:36 PM
 */
public class SemaphoreTest implements Runnable{
    private Semaphore semaphore = new Semaphore(5);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼

    public static void main(String args[]) {
        SemaphoreTest semp = new SemaphoreTest();
        ExecutorService exec = Executors.newFixedThreadPool(20);
        for(int i = 0;i<20;i++){
            exec.submit(semp);
        }
        exec.shutdown();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire(); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6箇中扣除
            System.out.println(Thread.currentThread().getId() + ":doSomething start-" );
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":doSomething end-" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(); // release 放到 finally 中
        }
    }

}

 

1.4. ReadWriteLock

1.4.1. 概述

ReadWriteLock是JDK5中提供的讀寫分離鎖

1.4.2. 訪問狀況

讀-讀不互斥:讀讀之間不阻塞。

讀-寫互斥:讀阻塞寫,寫也會阻塞讀。

寫-寫互斥:寫寫阻塞。

1.4.3. 主要接口

private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();

private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();

 

Java併發包中ReadWriteLock是一個接口,主要有兩個方法,以下:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReadWriteLock管理一組鎖,一個是隻讀的鎖,一個是寫鎖。
Java併發庫中ReetrantReadWriteLock實現了ReadWriteLock接口並添加了可重入的特性。
在具體講解ReetrantReadWriteLock的使用方法前,咱們有必要先對其幾個特性進行一些深刻學習瞭解。

1. ReetrantReadWriteLock特性說明

1.1 獲取鎖順序

  • 非公平模式(默認)
    當以非公平初始化時,讀鎖和寫鎖的獲取的順序是不肯定的。非公平鎖主張競爭獲取,可能會延緩一個或多個讀或寫線程,可是會比公平鎖有更高的吞吐量。
  • 公平模式
    當以公平模式初始化時,線程將會以隊列的順序獲取鎖。噹噹前線程釋放鎖後,等待時間最長的寫鎖線程就會被分配寫鎖;或者有一組讀線程組等待時間比寫線程長,那麼這組讀線程組將會被分配讀鎖。

1.2 可重入

什麼是可重入鎖,不可重入鎖呢?"重入"字面意思已經很明顯了,就是能夠從新進入。可重入鎖,就是說一個線程在獲取某個鎖後,還能夠繼續獲取該鎖,即容許一個線程屢次獲取同一個鎖。好比synchronized內置鎖就是可重入的,若是A類有2個synchornized方法method1和method2,那麼method1調用method2是容許的。顯然重入鎖給編程帶來了極大的方便。假如內置鎖不是可重入的,那麼致使的問題是:1個類的synchornized方法不能調用本類其餘synchornized方法,也不能調用父類中的synchornized方法。與內置鎖對應,JDK提供的顯示鎖ReentrantLock也是能夠重入的,這裏經過一個例子着重說下可重入鎖的釋放須要的事兒。

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test1 {

    public static void main(String[] args) throws InterruptedException {
        final ReentrantReadWriteLock  lock = new ReentrantReadWriteLock ();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.writeLock().lock();
                System.out.println("Thread real execute");
                lock.writeLock().unlock();
            }
        });

        lock.writeLock().lock();
        lock.writeLock().lock();
        t.start();
        Thread.sleep(200);
        
        System.out.println("realse one once");
        lock.writeLock().unlock();
    }

}

 

運行結果.png


從運行結果中,能夠看到,程序並未執行線程的run方法,由此咱們可知,上面的代碼會出現死鎖,由於主線程2次獲取了鎖,可是卻只釋放1次鎖,致使線程t永遠也不能獲取鎖。一個線程獲取多少次鎖,就必須釋放多少次鎖。這對於內置鎖也是適用的,每一次進入和離開synchornized方法(代碼塊),就是一次完整的鎖獲取和釋放。

再次添加一次unlock以後的運行結果.png

 

1.3 鎖降級

要實現一個讀寫鎖,須要考慮不少細節,其中之一就是鎖升級和鎖降級的問題。什麼是升級和降級呢?ReadWriteLock的javadoc有一段話:

Can the write lock be downgraded to a read lock without allowing an intervening writer? Can a read lock be upgraded to a write lock, in preference to other waiting readers or writers?

翻譯過來的結果是:在不容許中間寫入的狀況下,寫入鎖能夠降級爲讀鎖嗎?讀鎖是否能夠升級爲寫鎖,優先於其餘等待的讀取或寫入操做?簡言之就是說,鎖降級:從寫鎖變成讀鎖;鎖升級:從讀鎖變成寫鎖,ReadWriteLock是否支持呢?讓咱們帶着疑問,進行一些Demo 測試代碼驗證。

Test Code 1

/**
 *Test Code 1
 **/
package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test1 {

    public static void main(String[] args) {
        ReentrantReadWriteLock rtLock = new ReentrantReadWriteLock();
        rtLock.readLock().lock();
        System.out.println("get readLock.");
        rtLock.writeLock().lock();
        System.out.println("blocking");
    }
}

Test Code 1 Result

TestCode1 Result.png

結論:上面的測試代碼會產生死鎖,由於同一個線程中,在沒有釋放讀鎖的狀況下,就去申請寫鎖,這屬於鎖升級,ReentrantReadWriteLock是不支持的

Test Code 2

/**
 *Test Code 2
 **/
package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test2 {

    public static void main(String[] args) {
        ReentrantReadWriteLock rtLock = new ReentrantReadWriteLock();  
        rtLock.writeLock().lock();  
        System.out.println("writeLock");  
          
        rtLock.readLock().lock();  
        System.out.println("get read lock");  
    }
}

Test Code 2 Result

 

TestCode2 Result.png


結論:ReentrantReadWriteLock支持鎖降級,上面代碼不會產生死鎖。這段代碼雖然不會致使死鎖,但沒有正確的釋放鎖。從寫鎖降級成讀鎖,並不會自動釋放當前線程獲取的寫鎖,仍然須要顯示的釋放,不然別的線程永遠也獲取不到寫鎖。

 

2. ReetrantReadWriteLock對比使用

2.1 Synchronized實現

在使用ReetrantReadWriteLock實現鎖機制前,咱們先看一下,多線程同時讀取文件時,用synchronized實現的效果

package test;

/**
 * 
 * synchronized實現
 * @author itbird
 *
 */
public class ReadAndWriteLockTest {

    public synchronized static void get(Thread thread) {
        System.out.println("start time:" + System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(thread.getName() + ":正在進行讀操做……");
        }
        System.out.println(thread.getName() + ":讀操做完畢!");
        System.out.println("end time:" + System.currentTimeMillis());
    }

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                get(Thread.currentThread());
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                get(Thread.currentThread());
            }
        }).start();
    }

}

讓咱們看一下運行結果:

synchronized實現的效果結果.png


從運行結果能夠看出,兩個線程的讀操做是順序執行的,整個過程大概耗時200ms。

 

2.2 ReetrantReadWriteLock實現

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 
 * ReetrantReadWriteLock實現
 * @author itbird
 *
 */
public class ReadAndWriteLockTest {

    public static void get(Thread thread) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        System.out.println("start time:" + System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(thread.getName() + ":正在進行讀操做……");
        }
        System.out.println(thread.getName() + ":讀操做完畢!");
        System.out.println("end time:" + System.currentTimeMillis());
        lock.readLock().unlock();
    }

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                get(Thread.currentThread());
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                get(Thread.currentThread());
            }
        }).start();
    }

}

讓咱們看一下運行結果:

ReetrantReadWriteLock實現.png


從運行結果能夠看出,兩個線程的讀操做是同時執行的,整個過程大概耗時100ms。
經過兩次實驗的對比,咱們能夠看出來,ReetrantReadWriteLock的效率明顯高於Synchronized關鍵字。

 

3. ReetrantReadWriteLock讀寫鎖互斥關係

經過上面的測試代碼,咱們也能夠延伸得出一個結論,ReetrantReadWriteLock讀鎖使用共享模式,即:同時能夠有多個線程併發地讀數據。可是另外一個問題來了,寫鎖之間是共享模式仍是互斥模式?讀寫鎖之間是共享模式仍是互斥模式呢?下面讓咱們經過Demo進行一一驗證吧。

3.1 ReetrantReadWriteLock讀寫鎖關係

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 
 * ReetrantReadWriteLock實現
 * @author itbird
 *
 */
public class ReadAndWriteLockTest {

    public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args) {
        //同時讀、寫
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(new Runnable() {
            @Override
            public void run() {
                readFile(Thread.currentThread());
            }
        });
        service.execute(new Runnable() {
            @Override
            public void run() {
                writeFile(Thread.currentThread());
            }
        });
    }

    // 讀操做
    public static void readFile(Thread thread) {
        lock.readLock().lock();
        boolean readLock = lock.isWriteLocked();
        if (!readLock) {
            System.out.println("當前爲讀鎖!");
        }
        try {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(thread.getName() + ":正在進行讀操做……");
            }
            System.out.println(thread.getName() + ":讀操做完畢!");
        } finally {
            System.out.println("釋放讀鎖!");
            lock.readLock().unlock();
        }
    }

    // 寫操做
    public static void writeFile(Thread thread) {
        lock.writeLock().lock();
        boolean writeLock = lock.isWriteLocked();
        if (writeLock) {
            System.out.println("當前爲寫鎖!");
        }
        try {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(thread.getName() + ":正在進行寫操做……");
            }
            System.out.println(thread.getName() + ":寫操做完畢!");
        } finally {
            System.out.println("釋放寫鎖!");
            lock.writeLock().unlock();
        }
    }
}

運行結果:

運行結果.png


結論:讀寫鎖的實現必須確保寫操做對讀操做的內存影響。換句話說,一個得到了讀鎖的線程必須能看到前一個釋放的寫鎖所更新的內容,讀寫鎖之間爲互斥。

 

3.2 ReetrantReadWriteLock寫鎖關係

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 
 * ReetrantReadWriteLock實現
 * @author itbird
 *
 */
public class ReadAndWriteLockTest {

    public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args) {
        //同時寫
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(new Runnable() {
            @Override
            public void run() {
                writeFile(Thread.currentThread());
            }
        });
        service.execute(new Runnable() {
            @Override
            public void run() {
                writeFile(Thread.currentThread());
            }
        });
    }

    // 讀操做
    public static void readFile(Thread thread) {
        lock.readLock().lock();
        boolean readLock = lock.isWriteLocked();
        if (!readLock) {
            System.out.println("當前爲讀鎖!");
        }
        try {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(thread.getName() + ":正在進行讀操做……");
            }
            System.out.println(thread.getName() + ":讀操做完畢!");
        } finally {
            System.out.println("釋放讀鎖!");
            lock.readLock().unlock();
        }
    }

    // 寫操做
    public static void writeFile(Thread thread) {
        lock.writeLock().lock();
        boolean writeLock = lock.isWriteLocked();
        if (writeLock) {
            System.out.println("當前爲寫鎖!");
        }
        try {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(thread.getName() + ":正在進行寫操做……");
            }
            System.out.println(thread.getName() + ":寫操做完畢!");
        } finally {
            System.out.println("釋放寫鎖!");
            lock.writeLock().unlock();
        }
    }
}

運行結果:

 

運行結果.png

4. 總結

1.Java併發庫中ReetrantReadWriteLock實現了ReadWriteLock接口並添加了可重入的特性
2.ReetrantReadWriteLock讀寫鎖的效率明顯高於synchronized關鍵字
3.ReetrantReadWriteLock讀寫鎖的實現中,讀鎖使用共享模式;寫鎖使用獨佔模式,換句話說,讀鎖能夠在沒有寫鎖的時候被多個線程同時持有,寫鎖是獨佔的
4.ReetrantReadWriteLock讀寫鎖的實現中,須要注意的,當有讀鎖時,寫鎖就不能得到;而當有寫鎖時,除了得到寫鎖的這個線程能夠得到讀鎖外,其餘線程不能得到讀鎖

 

1.5. CountDownLatch

1.5.1. 概述

倒數計時器 一種典型的場景就是火箭發射。在火箭發射前,爲了保證萬無一失,每每還要進行各項設備、儀器的檢查。 只有等全部檢查完畢後,引擎才能點火。這種場景就很是適合使用CountDownLatch。它可使得點火線程 ,等待全部檢查線程所有完工後,再執行

1.5.2. 主要接口

static final CountDownLatch end = new CountDownLatch(10);

end.countDown();
end.await();

1.5.3 示意圖

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * description:
 *
 * @author: dawn.he QQ:       905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/2    5:45 PM
 */
public class CountDownLatchDemo implements  Runnable {
    static  final CountDownLatch end  = new CountDownLatch(10);
    static  final CountDownLatchDemo demo = new CountDownLatchDemo();
    @Override
    public void run() {

        try {
            //模擬檢查任務
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check cpmplete");
            end.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i = 0;i<10;i++){
            exec.submit(demo);
        }
        //等待檢查
        end.await();
        //發射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

1.6. CyclicBarrier

1.6.1. 概述

循環柵欄 Cyclic意爲循環,也就是說這個計數器能夠反覆使用。好比,假設咱們將計數器設置爲10。那麼湊齊第一批1 0個線程後,計數器就會歸零,而後接着湊齊下一批10個線程

1.6.2. 主要接口
public CyclicBarrier(int parties, Runnable barrierAction)

barrierAction就是當計數器一次計數完成後,系統會執行的動做 await()

1.6.3. 示意圖

import java.util.concurrent.CyclicBarrier;

/**
 * description:
 *
 * @author: dawn.he QQ:       905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/3    9:14 PM
 */
public class CyclicBarrierDemo {

    static class TaskThread extends Thread {

        CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + " 到達柵欄 A");
                barrier.await();
                System.out.println(getName() + " 衝破柵欄 A");

                Thread.sleep(2000);
                System.out.println(getName() + " 到達柵欄 B");
                barrier.await();
                System.out.println(getName() + " 衝破柵欄 B");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int threadNum = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 完成最後任務");
            }
        });

        for(int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }
    }

}
Thread-3 到達柵欄 A
Thread-1 到達柵欄 A
Thread-4 到達柵欄 A
Thread-0 到達柵欄 A
Thread-2 到達柵欄 A
Thread-2 完成最後任務
Thread-2 衝破柵欄 A
Thread-1 衝破柵欄 A
Thread-3 衝破柵欄 A
Thread-0 衝破柵欄 A
Thread-4 衝破柵欄 A
Thread-2 到達柵欄 B
Thread-3 到達柵欄 B
Thread-4 到達柵欄 B
Thread-1 到達柵欄 B
Thread-0 到達柵欄 B
Thread-0 完成最後任務
Thread-0 衝破柵欄 B
Thread-2 衝破柵欄 B
Thread-1 衝破柵欄 B
Thread-4 衝破柵欄 B
Thread-3 衝破柵欄 B

1.7. LockSupport 1.7.1. 概述

提供線程阻塞原語 1.7.2. 主要接口

LockSupport.park(); LockSupport.unpark(t1);

1.7.3. 與suspend()比較 不容易引發線程凍結

1.7.4. 中斷響應 可以響應中斷,但不拋出異常。

中斷響應的結果是,park()函數的返回,能夠從Thread.interrupted()獲得中斷標誌

import java.util.concurrent.locks.LockSupport;

/**
 * description:
 *
 * @author: dawn.he QQ:       905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/3    9:36 PM
 */
public class ThreadParkTest {
    public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.setName("mt");
        mt.start();
        try {
            Thread.currentThread().sleep(10);
            mt.park();
            Thread.currentThread().sleep(10000);
            mt.unPark();
            Thread.currentThread().sleep(10000);
            mt.park();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class MyThread extends Thread {
        private boolean isPark = false;

        public void run() {
            System.out.println(" Enter Thread running.....");
            while (true) {
                if (isPark) {
                    System.out.println(Thread.currentThread().getName() + "Thread is Park.....");
                    LockSupport.park();
                }                //do something
                System.out.println(Thread.currentThread().getName() + ">> is running");
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        public void park() {
            isPark = true;
        }

        public void unPark() {
            isPark = false;
            LockSupport.unpark(this);
            System.out.println("Thread is unpark.....");
        }
    }
}

 

2.3. BlockingQueue

阻塞隊列

1:BlockingQueue繼承關係

  java.util.concurrent 包裏的 BlockingQueue是一個接口, 繼承Queue接口,Queue接口繼承 Collection

 

  BlockingQueue----->Queue-->Collection

 圖:

 

隊列的特色是:先進先出(FIFO)

 

2:BlockingQueue的方法

BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:

 

 

  拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用
 

 

 

四組不一樣的行爲方式解釋:

1(異常)

若是試圖的操做沒法當即執行,拋一個異常。

2(特定值) 

若是試圖的操做沒法當即執行,返回一個特定的值(經常是 true / false)。

3(阻塞) 

若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行。

4(超時) 

若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定值以告知該操做是否成功(典型的是 true / false)。

 

不能向BlockingQueue插入一個空對象,不然會拋出NullPointerException,相應的實現類校驗代碼

 
  1. private static void checkNotNull(Object v) {

  2.         if (v == null)

  3.             throw new NullPointerException();

  4.     }

 

BlockingQueue :不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值。

BlockingQueue: 能夠是限定容量的。它在任意給定時間均可以有一個 remainingCapacity,超出此容量,便沒法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 老是報告Integer.MAX_VALUE 的剩餘容量。

BlockingQueue :實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。

BlockingQueue :實現是線程安全的。全部排隊方法均可以使用內部鎖或其餘形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。

 

BlockingQueue 實質上 支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。

 

3:BlockingQueue實現類和繼承接口

   ArrayBlockingQueue

    DelayQueue

    LinkedBlockingQueue

    PriorityBlockingQueue

    SynchronousQueue

 

繼承他的接口:

   public interface BlockingDeque extends BlockingQueue, Deque 1.6新增

   public interface TransferQueue extends BlockingQueue           1.7新增

 

4:BlockingQueue用法

BlockingQueue 一般用於一個線程生產對象,而另一個線程消費這些對象的場景。下圖是對這個原理的闡述:

 

 

 一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue。

 一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。若是該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。

 負責消費的線程將會一直從該阻塞隊列中拿出對象。若是消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。

 

5:BlockingQueue Example

 

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

 

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

 

6:BlockingQueue實現類詳解

 

1  數組阻塞隊列 ArrayBlockingQueue

一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。

這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。

此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序。然而,經過將公平性 (fairness) 設置爲 true 而構造的隊列容許按照 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」

 

 

 
  1. BlockingQueue queue = new ArrayBlockingQueue(1024);

  2. queue.put("1");String string = queue.take();

 

2:延遲隊列DelayQueue

 

Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll 移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素

 

3. 鏈阻塞隊列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 接口。

LinkedBlockingQueue 內部以一個鏈式結構(連接節點)對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。

 

 

 
  1. BlockingQueue unbounded = new LinkedBlockingQueue();

  2. BlockingQueue bounded   = new LinkedBlockingQueue(1024);bounded.put("Value");

  3. String value = bounded.take();

  4. System.out.println(value);

  5. System.out.println(unbounded.remainingCapacity()==Integer.MAX_VALUE);//true

 

 

4. 具備優先級的阻塞隊列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 接口。

一個無界阻塞隊列,它使用與類 PriorityQueue 相同的順序規則,而且提供了阻塞獲取操做。雖然此隊列邏輯上是無界的,可是資源被耗盡時試圖執行 add 操做也將失敗(致使OutOfMemoryError)。此類不容許使用 null 元素。依賴天然順序的優先級隊列也不容許插入不可比較的對象(這樣作會致使拋出 ClassCastException)。

此類及其迭代器能夠實現 Collection 和 Iterator 接口的全部可選 方法。iterator() 方法中提供的迭代器並不 保證以特定的順序遍歷 PriorityBlockingQueue 的元素。若是須要有序地進行遍歷,則應考慮使用 Arrays.sort(pq.toArray())。此外,可使用方法 drainTo 按優先級順序移除 所有或部分元素,並將它們放在另外一個 collection 中。

在此類上進行的操做不保證具備同等優先級的元素的順序。若是須要實施某一排序,那麼能夠定義自定義類或者比較器,比較器可以使用修改鍵斷開主優先級值之間的聯繫。例如,如下是應用先進先出 (first-in-first-out) 規則斷開可比較元素之間聯繫的一個類。要使用該類,則須要插入一個新的 FIFOEntry(anEntry) 來替換普通的條目對象。

 


5. 同步隊列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 接口。

SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。

據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。

 


6. 阻塞雙端隊列 BlockingDeque

java.util.concurrent 包裏的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。本小節我將給你演示如何使用 BlockingDeque。

BlockingDeque 類是一個雙端隊列,在不可以插入元素時,它將阻塞住試圖插入元素的線程;在不可以抽取元素時,它將阻塞住試圖抽取的線程。

deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。

 

 

7. 鏈阻塞雙端隊列 LinkedBlockingDeque

 

 

 一個基於已連接節點的、任選範圍的阻塞雙端隊列。

 

 可選的容量範圍構造方法參數是一種防止過分膨脹的方式。若是未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端隊列超出容量,每次插入後都將動態地建立連接節點。

 

 大多數操做都以固定時間運行(不計阻塞消耗的時間)。異常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操做,它們均以線性時間運行。

 

2.4. ConcurrentLinkedQueue

ConcurrentLinkedQueue使用和方法介紹

定義

一個基於連接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。
新的元素插入到隊列的尾部,隊列獲取操做從隊列頭部得到元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不容許使用 null 元素。

offer和poll

offer(E e) 
          將指定元素插入此隊列的尾部。

poll() 
          獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。

複製代碼

public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中poll:" + queue.poll());
        System.out.println("pool後,隊列是否空?" + queue.isEmpty());
    }

複製代碼

offer是往隊列添加元素,poll是從隊列取出元素而且刪除該元素

執行結果

offer後,隊列是否空?false
從隊列中poll:哈哈哈
pool後,隊列是否空?true

 

ConcurrentLinkedQueue中的add() 和 offer() 徹底同樣,都是往隊列尾部添加元素

還有個取元素方法peek

peek() 
          獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null

複製代碼

public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("從隊列中peek:" + queue.peek());
        System.out.println("pool後,隊列是否空?" + queue.isEmpty());
    }

複製代碼

執行結果:

offer後,隊列是否空?false
從隊列中peek:哈哈哈
從隊列中peek:哈哈哈
從隊列中peek:哈哈哈
pool後,隊列是否空?false

remove

remove(Object o) 
          從隊列中移除指定元素的單個實例(若是存在)

複製代碼

public static void main(String[] args) {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("哈哈哈");
        System.out.println("offer後,隊列是否空?" + queue.isEmpty());
        System.out.println("從隊列中remove已存在元素 :" + queue.remove("哈哈哈"));
        System.out.println("從隊列中remove不存在元素:" + queue.remove("123"));
        System.out.println("remove後,隊列是否空?" + queue.isEmpty());
    }

複製代碼

remove一個已存在元素,會返回true,remove不存在元素,返回false

執行結果:

offer後,隊列是否空?false
從隊列中remove已存在元素 :true
從隊列中remove不存在元素:false
remove後,隊列是否空?true

size or isEmpty

size() 
          返回此隊列中的元素數量

注意:

若是此隊列包含的元素數大於 Integer.MAX_VALUE,則返回 Integer.MAX_VALUE。
須要當心的是,與大多數 collection 不一樣,此方法不是 一個固定時間操做。因爲這些隊列的異步特性,肯定當前的元素數須要進行一次花費 O(n) 時間的遍歷。
因此在須要判斷隊列是否爲空時,儘可能不要用 queue.size()>0,而是用 !queue.isEmpty()

比較size()和isEmpty() 效率的示例:

場景:10000我的去飯店吃飯,10張桌子供飯,分別比較size() 和 isEmpty() 的耗時

複製代碼

public class Test01ConcurrentLinkedQueue {
    public static void main(String[] args) throws InterruptedException {
        int peopleNum = 10000;//吃飯人數
        int tableNum = 10;//飯桌數量

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch count = new CountDownLatch(tableNum);//計數器

        //將吃飯人數放入隊列(吃飯的人進行排隊)
        for(int i=1;i<=peopleNum;i++){
            queue.offer("消費者_" + i);
        }
        //執行10個線程從隊列取出元素(10個桌子開始供飯)
        System.out.println("-----------------------------------開飯了-----------------------------------");
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(tableNum);
        for(int i=0;i<tableNum;i++) {
            executorService.submit(new Dinner("00" + (i+1), queue, count));
        }
        //計數器等待,知道隊列爲空(全部人吃完)
        count.await();
        long time = System.currentTimeMillis() - start;
        System.out.println("-----------------------------------全部人已經吃完-----------------------------------");
        System.out.println("共耗時:" + time);
        //中止線程池
        executorService.shutdown();
    }

    private static class Dinner implements Runnable{
        private String name;
        private ConcurrentLinkedQueue<String> queue;
        private CountDownLatch count;

        public Dinner(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count) {
            this.name = name;
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() {
            //while (queue.size() > 0){
            while (!queue.isEmpty()){
                //從隊列取出一個元素 排隊的人少一個
                System.out.println("【" +queue.poll() + "】----已吃完..., 飯桌編號:" + name);
            }
            count.countDown();//計數器-1
        }
    }
}

複製代碼

執行結果:

使用size耗時:757ms

使用isEmpty耗時:210

當數據量越大,這種耗時差距越明顯。因此這種判斷用isEmpty 更加合理

 

contains

contains(Object o) 
          若是此隊列包含指定元素,則返回 true

public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        queue.offer("123");
        System.out.println(queue.contains("123"));
        System.out.println(queue.contains("234"));
    }

執行結果:

toArray

toArray() 
          返回以恰當順序包含此隊列全部元素的數組

toArray(T[] a) 
          返回以恰當順序包含此隊列全部元素的數組;返回數組的運行時類型是指定數組的運行時類型

複製代碼

public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.offer("123");
        queue.offer("234");
        Object[] objects = queue.toArray();
        System.out.println(objects[0] + ", " + objects[1]);

        //將數據存儲到指定數組
        String[] strs = new String[2];
        queue.toArray(strs);
        System.out.println(strs[0] + ", " + strs[1]);
    }

複製代碼

執行結果:

iterator

iterator() 
          返回在此隊列元素上以恰當順序進行迭代的迭代器

複製代碼

public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.offer("123");
        queue.offer("234");
        Iterator<String> iterator = queue.iterator();
        while (iterator.hasNext()){
            System.out.println(iterator.next());
        }
    }

複製代碼

 

ConcurrentLinkedQueue文檔說明:

構造方法摘要
ConcurrentLinkedQueue() 
          建立一個最初爲空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue(Collection<? extends E> c) 
          建立一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。

 

 

方法摘要
 boolean add(E e) 
          將指定元素插入此隊列的尾部。
 boolean contains(Object o) 
          若是此隊列包含指定元素,則返回 true。
 boolean isEmpty() 
          若是此隊列不包含任何元素,則返回 true。
 Iterator<E> iterator() 
          返回在此隊列元素上以恰當順序進行迭代的迭代器。
 boolean offer(E e) 
          將指定元素插入此隊列的尾部。
 E peek() 
          獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。
 E poll() 
          獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。
 boolean remove(Object o) 
          從隊列中移除指定元素的單個實例(若是存在)。
 int size() 
          返回此隊列中的元素數量。
 Object[] toArray() 
          返回以恰當順序包含此隊列全部元素的數組。
<T> T[]
toArray(T[] a)            返回以恰當順序包含此隊列全部元素的數組;返回數組的運行時類型是指定數組的運行時類型。
相關文章
相關標籤/搜索