一次聊天引起的思考--java併發包實戰

一次聊天,談到了死鎖的解決、可重入鎖等等,忽然發現這些離本身很遠,只有一些讀書時的概念涌入腦海,但各自的應用場景怎麼都沒法想出。痛定思痛,決定看看concurrent包裏涉及併發的類及各自的應用場景。html

第一類:原子操做類的atomic包,裏面包含了java

1)布爾類型的AtomicBooleangit

2)整型AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdatergithub

3)長整型AtomicLong、AtomicLongArray、AtomicLongFieldUpdaterapi

4)引用型AtomicMarkableReference、AtomicReference、AtomicReferenceArray、AtomicReferenceFieldUpdater、AtomicStampedReference數組

5)累加器DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64安全

java.util.concurrent.atomic原子操做類包數據結構

這個包裏面提供了一組原子變量類。其基本的特性就是在多線程環境下,當有多個線程同時執行這些類的實例包含的方法時,具備排他性,即當某個線程進入方法,執行其中的指令時,不會被其餘線程打斷,而別的線程就像自旋鎖同樣,一直等到該方法執行完成,才由JVM從等待隊列中選擇一個另外一個線程進入,這只是一種邏輯上的理解。其實是藉助硬件的相關指令來實現的,不會阻塞線程(或者說只是在硬件級別上阻塞了)。能夠對基本數據、數組中的基本數據、對類中的基本數據進行操做。原子變量類至關於一種泛化的volatile變量,可以支持原子的和有條件的讀-改-寫操做。多線程

java.util.concurrent.atomic中的類能夠分紅4組:併發

標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
數組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
複合變量類:AtomicMarkableReference,AtomicStampedReference
第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來處理布爾,整數,長整數,對象四種數據,其內部實現不是簡單的使用synchronized,而是一個更爲高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大爲提高。如AtomicInteger的實現片段爲:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;
public final int get() {
        return value;
}
public final void set(int newValue) {
        value = newValue;
}
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

構造函數(兩個構造函數)

  • 默認的構造函數:初始化的數據分別是false,0,0,null
  • 帶參構造函數:參數爲初始化的數據
    set( )和get( )方法:能夠原子地設定和獲取atomic的數據。相似於volatile,保證數據會在主存中設置或讀取
    void set()和void lazySet():set設置爲給定值,直接修改原始值;lazySet延時設置變量值,這個等價於set()方法,可是因爲字段是volatile類型的,所以次字段的修改會比普通字段(非volatile字段)有稍微的性能延時(儘管能夠忽略),因此若是不是想當即讀取設置的新值,容許在「後臺」修改值,那麼此方法就頗有用。
    getAndSet( )方法
  • 原子的將變量設定爲新數據,同時返回先前的舊數據
  • 其本質是get( )操做,而後作set( )操做。儘管這2個操做都是atomic,可是他們合併在一塊兒的時候,就不是atomic。在Java的源程序的級別上,若是不依賴synchronized的機制來完成這個工做,是不可能的。只有依靠native方法才能夠。
public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

compareAndSet( ) 和weakCompareAndSet( )方法
這 兩個方法都是conditional modifier方法。這2個方法接受2個參數,一個是指望數據(expected),一個是新數據(new);若是atomic裏面的數據和指望數據一 致,則將新數據設定給atomic的數據,返回true,代表成功;不然就不設定,並返回false。JSR規範中說:以原子方式讀取和有條件地寫入變量但不 建立任何 happen-before 排序,所以不提供與除 weakCompareAndSet 目標外任何變量之前或後續讀取或寫入操做有關的任何保證。大意就是說調用weakCompareAndSet時並不能保證不存在happen- before的發生(也就是可能存在指令重排序致使此操做失敗)。可是從Java源碼來看,其實此方法並無實現JSR規範的要求,最後效果和 compareAndSet是等效的,都調用了unsafe.compareAndSwapInt()完成操做。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

對於 AtomicInteger、AtomicLong還提供了一些特別的方法。
getAndIncrement( ):以原子方式將當前值加 1,至關於線程安全的i++操做。
incrementAndGet( ):以原子方式將當前值加 1, 至關於線程安全的++i操做。
getAndDecrement( ):以原子方式將當前值減 1, 至關於線程安全的i--操做。
decrementAndGet ( ):以原子方式將當前值減 1,至關於線程安全的--i操做。
addAndGet( ): 以原子方式將給定值與當前值相加, 實際上就是等於線程安全的i =i+delta操做。
getAndAdd( ):以原子方式將給定值與當前值相加, 至關於線程安全的t=i;i+=delta;return t;操做。
以實現一些加法,減法原子操做。(注意 --i、++i不是原子操做,其中包含有3個操做步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內存)

使用AtomicReference建立線程安全的堆棧

package thread;
import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentStack<T> {
    private AtomicReference<Node<T>>    stacks    = new AtomicReference<Node<T>>();
    public T push(T e) {
        Node<T> oldNode, newNode;
        for (;;) { // 這裏的處理很是的特別,也是必須如此的。
            oldNode = stacks.get();
            newNode = new Node<T>(e, oldNode);
            if (stacks.compareAndSet(oldNode, newNode)) {
                return e;
            }
        }
    }    
    public T pop() {
        Node<T> oldNode, newNode;
        for (;;) {
            oldNode = stacks.get();
            newNode = oldNode.next;
            if (stacks.compareAndSet(oldNode, newNode)) {
                return oldNode.object;
            }
        }
    }    
    private static final class Node<T> {
        private T        object;        
        private Node<T>    next;        
        private Node(T object, Node<T> next) {
            this.object = object;
            this.next = next;
        }
    }    
}

雖然原子的標量類擴展了Number類,但並無擴展一些基本類型的包裝類,如Integer或Long,事實上他們也不能擴展:基本類型的包裝類是不能夠修改的,而原子變量類是能夠修改的。在原子變量類中沒有從新定義hashCode或equals方法,每一個實例都是不一樣的,他們也不宜用作基於散列容器中的鍵值。
第二組AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴展了原子操做,對這些類型的數組提供了支持。這些類在爲其數組元素提供 volatile 訪問語義方面也引人注目,這對於普通數組來講是不受支持的。

他們內部並非像AtomicInteger同樣維持一個valatile變量,而是所有由native方法實現,以下
AtomicIntegerArray的實現片段:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int scale = unsafe.arrayIndexScale(int[].class);
private final int[] array;
public final int get(int i) {
        return unsafe.getIntVolatile(array, rawIndex(i));
}
public final void set(int i, int newValue) {
        unsafe.putIntVolatile(array, rawIndex(i), newValue);
}

第三組AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基於反射的實用工具,能夠對指定類的指定 volatile 字段進行原子更新。API很是簡單,可是也是有一些約束:

(1)字段必須是volatile類型的

(2)字段的描述類型(修飾符public/protected/default/private)是與調用者與操做對象字段的關係一致。也就是說 調用者可以直接操做對象字段,那麼就能夠反射進行原子操做。可是對於父類的字段,子類是不能直接操做的,儘管子類能夠訪問父類的字段。

(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。

(4)只能是可修改變量,不能使final變量,由於final的語義就是不可修改。實際上final的語義和volatile是有衝突的,這兩個關鍵字不能同時存在。

(5)對於AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。若是要修改包裝類型就須要使用AtomicReferenceFieldUpdater 。

netty5.0中類ChannelOutboundBuffer統計發送的字節總數,因爲使用volatile變量已經不能知足,因此使用AtomicIntegerFieldUpdater 來實現的,看下面代碼:

//定義
    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

    private volatile long totalPendingSize;

//使用
        long oldValue = totalPendingSize;
        long newWriteBufferSize = oldValue + size;
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
            oldValue = totalPendingSize;
            newWriteBufferSize = oldValue + size;
        }

第二類:鎖的類包,裏面包含了

排他鎖:AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer

讀寫鎖、可重入鎖:ReadWriteLock、ReentrantLock、Lock、ReentrantReadWriteLock(隱式包含讀鎖和寫鎖)、Condition、LockSupport

混合鎖:StampedLock

condition類似於對象的監控方法object#wait()、object#notify、object#notifyAll,但不一樣之處在於:經過和任意Lock的實現類聯合使用,Condition對每一個對象提供了多個等待-設置功能。

此時Lock代替了synchronized方法和模塊,condition代替了對象的監控方法。

Condition一般也稱做Condition隊列或者condition變量,它提供了一種方法,使一個線程可以暫停執行(wait方法),當別的線程的狀態condition爲true時能夠激活此線程。因爲不一樣線程共享的狀態信息必須受到保護,所以Condition具備一些鎖的形式。等待一個condition的關鍵屬性是自動釋放關聯的鎖而且暫停當前線程,相似於object.wait。

Conditon示例內部綁定了一個鎖,獲取一個特定鎖的實例的Condition實例能夠經過lock#newCondition方法獲得。

舉個condition的示例,先看一下生產者和消費者模式常規代碼:

/** 
         * 生產指定數量的產品 
         * 
         * @param neednum 
         */ 
        public synchronized void produce(int neednum) { 
                //測試是否須要生產 
                while (neednum + curnum > max_size) { 
                        System.out.println("要生產的產品數量" + neednum + "超過剩餘庫存量" + (max_size - curnum) + ",暫時不能執行生產任務!"); 
                        try { 
                                //當前的生產線程等待 
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //知足生產條件,則進行生產,這裏簡單的更改當前庫存量 
                curnum += neednum; 
                System.out.println("已經生產了" + neednum + "個產品,現倉儲量爲" + curnum); 
                //喚醒在此對象監視器上等待的全部線程 
                notifyAll(); 
        } 

        /** 
         * 消費指定數量的產品 
         * 
         * @param neednum 
         */ 
        public synchronized void consume(int neednum) { 
                //測試是否可消費 
                while (curnum < neednum) { 
                        try { 
                                //當前的生產線程等待 
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //知足消費條件,則進行消費,這裏簡單的更改當前庫存量 
                curnum -= neednum; 
                System.out.println("已經消費了" + neednum + "個產品,現倉儲量爲" + curnum); 
                //喚醒在此對象監視器上等待的全部線程 
                notifyAll(); 
        }

咱們但願保證生產者的produce線程和消費者的consume線程有不一樣的等待--設置,這樣,當buffer中的項目或者空間可用時,咱們只須要每次只通知一個單線程就能夠了。優化只要使用兩個Condition實例便可(略有改動):

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void produce(Object x) throws InterruptedException {
      lock.lock();
      try {
        while (count == items.length)
          notFull.await();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notEmpty.signal();
      } finally {
        lock.unlock();
      }
    }

    public Object consume() throws InterruptedException {
      lock.lock();
      try {
        while (count == 0)
          notEmpty.await();
        Object x = items[takeptr];
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notFull.signal();
        return x;
      } finally {
        lock.unlock();
      }
    }
  }

java.util.concurrent.ArrayBlockingQueue提供了上述功能,所以沒有必要實現這個示例的類。

Condition實現類能夠提供和對象監控方法不一樣的行爲和語義,例如保證通知的順序,或者當執行通知時不要求保持一個鎖。若condition實現類提供了上述特定的語義,那麼實現類必須以文檔的形式聲明這些語義。

ReentrantReadWriteLock的讀鎖與寫鎖

讀鎖是排寫鎖操做的,讀鎖不排讀鎖操做,多個讀鎖能夠併發不阻塞。在讀鎖獲取和讀鎖釋放以前,寫鎖並不能被任何線程獲取。多個讀鎖同時做用期間,試圖獲取寫鎖的線程都處於等待狀態,當最後一個讀鎖釋放後,試圖獲取寫鎖的線程纔有機會獲取寫鎖。
寫鎖是排寫鎖,排讀鎖操做的。當一個線程獲取到寫鎖以後,其餘試圖獲取寫鎖和試圖獲取讀鎖的線程都處於等待狀態,直到寫鎖被釋放。
同時,寫鎖中是能夠獲取讀鎖,可是讀鎖中是沒法獲取寫鎖的。

下面的是java的ReentrantReadWriteLock官方示例,來解讀一下吧。

class CachedData {
    Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
  rwl.readLock().lock();// @1
  if (!cacheValid) {
    // Must release read lock before acquiring write lock
    rwl.readLock().unlock(); // @3
    rwl.writeLock().lock(); // @2
    try {
      // Recheck state because another thread might have
      // acquired write lock and changed state before we did.
      if (!cacheValid) {
        data = ...
        cacheValid = true;
      }
      // Downgrade by acquiring read lock before releasing write lock
      rwl.readLock().lock(); //@4
    } finally {
      rwl.writeLock().unlock(); // Unlock write, still hold read @5
    }
  }

  try {
    use(data);
  } finally {
    rwl.readLock().unlock(); // 6
  }
}

當ABC三個線程同時進入到processcachedData()方法,同時都會獲得讀鎖,而後獲取cachevalid,而後走到3位置釋放讀鎖,同時,假設A線程獲取到寫鎖,因此BC線程就沒法獲取到寫鎖,這個時候進來的D線程就會停留在1位置而沒法獲取讀鎖。A線程繼續往下走,判斷到cachevalid仍是false,就會繼續走下去。爲何這個地方會還有一次判斷,上面註釋很清楚,A線程寫完以後,BC線程獲取到寫鎖,若是再也不次進行判斷,就會寫入新的數據了,就再也不是同步鎖了。因此這個地方有一個新的判斷。回到A線程,A線程繼續進行操做,到達4以後,獲取到讀鎖,這個地方api官方解釋就是,寫鎖要釋放的時候,必須先降級成讀鎖,這樣其餘在等待寫鎖的好比BC,就不會獲取到寫鎖了。而後釋放寫鎖,這就是寫鎖的降級,釋放寫鎖以後,由於還持有讀鎖,因此BC線程沒法獲取到寫鎖,只有在A線程執行到6的時候,BC線程纔會拿到寫鎖,進行判斷,就會發現數據已經有了,釋放寫鎖,釋放讀鎖。

讀寫鎖可以有效的在讀操做明顯大於寫操做的需求中完成高效率的運轉。

第三類:併發數據結構,包含了array、linkedList、set、map、list、queue等併發數據結構,包含以下:

阻塞數據結構:ArrayBlockingQueue、BlockingDeque、BlockingQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、

併發數據結構:ConcurrentHashMap、ConcurrentLinkedDeque、ConcurrentLinkedQueue、ConcurrentMap、ConcurrentNavigableMap、ConcurrentSkipListMap、ConcurrentSkipListSet

第四類:同步器 ,這部分主要是對線程集合的管理的實現,有Semaphore,CyclicBarrier, CountDownLatch,Exchanger等一些類。

Semaphore
  類 java.util.concurrent.Semaphore 提供了一個計數信號量,從概念上講,信號量維護了一個許可集。若有必要,在許可可用前會阻塞每個 acquire(),而後再獲取該許可。每一個 release()添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數,並採起相應的行動。
  Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。 示例以下:

import java.util.*;import java.util.concurrent.*;

public class SemApp
{
    public static void main(String[] args)
{
        Runnable limitedCall = new Runnable() {
            final Random rand = new Random();
            final Semaphore available = new Semaphore(3);
            int count = 0;
            public void run()
{
                int time = rand.nextInt(15);
                int num = count++;

                try
                {
                    available.acquire();

                    System.out.println("Executing " + 
                        "long-running action for " + 
                        time + " seconds... #" + num);

                    Thread.sleep(time * 1000);

                    System.out.println("Done with #" + 
                        num + "!");

                    available.release();
                }
                catch (InterruptedException intEx)
                {
                    intEx.printStackTrace();
                }
            }
        };

        for (int i=0; i<10; i++)
            new Thread(limitedCall).start();
    }
}

即便本例中的 10 個線程都在運行(您能夠對運行 SemApp 的 Java 進程執行 jstack 來驗證),但只有 3 個線程是活躍的。在一個信號計數器釋放以前,其餘 7 個線程都處於空閒狀態。(實際上,Semaphore 類支持一次獲取和釋放多個 permit,但這不適用於本場景。)

CyclicBarrier

  java.util.concurrent.CyclicBarrier 一個同步輔助類,它容許 (common barrier point),在在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。一組線程互相等待,直到到達某個公共屏障點。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環的 barrier。
  須要全部的子任務都完成時,才執行主任務,這個時候就能夠選擇使用CyclicBarrier。賽跑時,等待全部人都準備好時,才起跑:

public class CyclicBarrierTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        //若是將參數改成4,可是下面只加入了3個選手,這永遠等待下去
        //Waits until all parties have invoked await on this barrier. 
        CyclicBarrier barrier = new CyclicBarrier(3);

        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(new Thread(new Runner(barrier, "1號選手")));
        executor.submit(new Thread(new Runner(barrier, "2號選手")));
        executor.submit(new Thread(new Runner(barrier, "3號選手")));

        executor.shutdown();
    }
}

class Runner implements Runnable {
    // 一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)
    private CyclicBarrier barrier;

    private String name;

    public Runner(CyclicBarrier barrier, String name) {
        super();
        this.barrier = barrier;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(name + " 準備好了...");
            // barrier的await方法,在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name + " 起跑!");
    }
}

輸出結果:
3號選手 準備好了...
2號選手 準備好了...
1號選手 準備好了...
1號選手 起跑!
2號選手 起跑!
3號選手 起跑!

CountDownLatch

  類 java.util.concurrent.CountDownLatch 是一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。 用給定的數字做爲計數器初始化 CountDownLatch。一個線程調用 await()方法後,在當前計數到達零以前,會一直受阻塞。其餘線程調用 countDown() 方法,會使計數器遞減,因此,計數器的值爲 0 後,會釋放全部等待的線程。其餘後續的 await 調用都將當即返回。
這種現象只出現一次,由於計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier。

CountDownLatch 做爲一個通用同步工具,有不少用途。使用「 1 」初始化的
CountDownLatch 用做一個簡單的開/關鎖存器,或入口:在經過調用 countDown() 的線程
打開入口前,全部調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch
可使一個線程在 N 個線程完成某項操做以前一直等待,或者使其在某項操做完成 N 次以前一直等待。

此類持有全部空閒線程,直到知足特定條件,這時它將會一次釋放全部這些線程。

清單 2. CountDownLatch:讓咱們去賽馬吧!

import java.util.*;
import java.util.concurrent.*;

class Race
{
    private Random rand = new Random();

    private int distance = rand.nextInt(250);
    private CountDownLatch start;
    private CountDownLatch finish;

    private List<String> horses = new ArrayList<String>();

    public Race(String... names)
    {
        this.horses.addAll(Arrays.asList(names));
    }

    public void run()
        throws InterruptedException
    {
        System.out.println("And the horses are stepping up to the gate...");
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch finish = new CountDownLatch(horses.size());
        final List<String> places = 
            Collections.synchronizedList(new ArrayList<String>());

        for (final String h : horses)
        {
            new Thread(new Runnable() {
                public void run() {
                    try
                    {
                        System.out.println(h + 
                            " stepping up to the gate...");
                        start.await();

                        int traveled = 0;
                        while (traveled < distance)
                        {
                            // In a 0-2 second period of time....
                            Thread.sleep(rand.nextInt(3) * 1000);

                            // ... a horse travels 0-14 lengths
                            traveled += rand.nextInt(15);
                            System.out.println(h + 
                                " advanced to " + traveled + "!");
                        }
                        finish.countDown();
                        System.out.println(h + 
                            " crossed the finish!");
                        places.add(h);
                    }
                    catch (InterruptedException intEx)
                    {
                        System.out.println("ABORTING RACE!!!");
                        intEx.printStackTrace();
                    }
                }
            }).start();
        }

        System.out.println("And... they're off!");
        start.countDown();        

        finish.await();
        System.out.println("And we have our winners!");
        System.out.println(places.get(0) + " took the gold...");
        System.out.println(places.get(1) + " got the silver...");
        System.out.println("and " + places.get(2) + " took home the bronze.");
    }
}

public class CDLApp
{
    public static void main(String[] args)
        throws InterruptedException, java.io.IOException
    {
        System.out.println("Prepping...");

        Race r = new Race(
            "Beverly Takes a Bath",
            "RockerHorse",
            "Phineas",
            "Ferb",
            "Tin Cup",
            "I'm Faster Than a Monkey",
            "Glue Factory Reject"
            );

        System.out.println("It's a race of " + r.getDistance() + " lengths");

        System.out.println("Press Enter to run the race....");
        System.in.read();

        r.run();
    }
}

注意,CountDownLatch 有兩個用途:首先,它同時釋放全部線程,模擬馬賽的起點,但隨後會設置一個門閂模擬馬賽的終點。這樣,「主」 線程就能夠輸出結果。 爲了讓馬賽有更多的輸出註釋,能夠在賽場的 「轉彎處」 和 「半程」 點,好比賽馬跨過跑道的四分之1、二分之一和四分之三線時,添加 CountDownLatch。

Exchanger

  類 java.util.concurrent.Exchanger 提供了一個同步點,在這個同步點,一對線程能夠交換
數據。每一個線程經過 exchange()方法的入口提供數據給他的夥伴線程,並接收他的夥伴線程
提供的數據,並返回。

線程間能夠用 Exchanger 來交換數據。當兩個線程經過 Exchanger 交互了對象,這個交換對於兩個線程來講都是安全的。

Future 和 FutureTask

  接口 public interface Future<V> 表示異步計算的結果。它提供了檢查計算是否完成的方法,
以等待計算的完成,並調用get()獲取計算的結果。
FutureTask 類是 Future 的一個實現, 並實現了Runnable ,因此可經過 Executor(線程池) 來執行。
也可傳遞給Thread對象執行。

若是在主線程中須要執行比較耗時的操做時,但又不想阻塞主線程時,能夠把這些做業交給
Future 對象在後臺完成,當主線程未來須要時,就能夠經過 Future 對象得到後臺做業的計算結果或者執行狀態。

第五類:線程管理,

Callable 被執行的任務
Executor 執行任務
Future 異步提交任務的返回數據

一次聊天引起的思考--java併發包實戰
Executor是總的接口,用來執行Runnable任務;
ExecutorService是Executor的擴展接口,主要擴展了執行Runnable或Callable任務的方式,及shutdown的方法;
ScheduledExecutorService是ExecutorService的擴展接口,主要擴展了能夠用任務調度的形式(延遲或按期)執行Runnable或Callable任務;
AbstractExecutorService是ExecutorService接口的實現類,是抽象類,提供一些默認的執行Runnable或Callable任務的方法;
ThreadPoolExecutor是AbstractExecutorService的子類,是線程池的實現;
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類,實現ScheduledExecutorService接口,基於線程池模式的多任務調度,是Timer工具類的高性能版;
Callable與Future是Runnable的另外的形式,用來異步獲取任務執行結果;
最後,Executors是工具類,用於建立上述各類實例。

Q&A

Synchronization vs volatile

Synchronization supports mutual exclusion and visibility. In contrast, the volatile keyword only supports visibility.

參考文獻:

【1】http://niub.iteye.com/blog/1787639

【2】http://www.ibm.com/developerworks/cn/java/j-5things5.html

【3】http://www.itzhai.com/

【4】http://chenzehe.iteye.com/blog/1759884

【5】http://www.javaworld.com/article/2078809/java-concurrency/java-concurrency-java-101-the-next-generation-java-concurrency-without-the-pain-part-1.html

【6】http://www.javaworld.com/article/2078848/java-concurrency/java-concurrency-java-101-the-next-generation-java-concurrency-without-the-pain-part-2.html

【7】http://gaozp.github.io/tec/2015/03/17/reentrantreadwritelock.html

相關文章
相關標籤/搜索