多線程併發編程總結(四)

本文基於https://github.com/h2pl/Java-Tutorial的總結html

Java7 HashMap(數組+鏈表)

HashMap 裏面是一個數組,而後數組中每一個元素是一個單向鏈表。

Java7 ConcurrentHashMap(段(數組+鏈表)+ ReentrantLock)

ConcurrentHashMap 是一個 Segment 數組(默認16個),Segment 經過繼承 ReentrantLock 來進行加鎖,
因此每次須要加鎖的操做鎖住的是一個 segment,這樣只要保證每一個 Segment 是線程安全的,也就實現了全局的線程安全。

Segment 內部是由 數組+鏈表 組成的。


Segment 經過繼承 ReentrantLock 來進行加鎖。


在往某個 segment 中 put 的時候,首先會調用:

    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value)

    也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,
    若是失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。

Java8 HashMap (數組+鏈表+紅黑樹) ( 若是鏈表元素過多(遠大於8),查找效率從O(n)變爲O(logN) )

HashMap 介紹java

Java8 ConcurrentHashMap ( CAS+synchronized )

用一次 CAS 操做,若是 CAS 失敗,那就是有併發操做(synchronized)。

JUC 一 ConcurrentHashMapnode

LockSupport

LockSupport 是JDK中比較底層的類,用來建立鎖和其餘同步工具類的基本線程阻塞原語。
能夠作到與 join() 、wait() 功能同樣,使線程自由的阻塞、釋放。

Java 鎖和同步器框架的核心 AQS(AbstractQueuedSynchronizer抽象隊列同步器),
就是經過調用 LockSupport.park() 和 LockSupport.unpark() 實現線程的阻塞和喚醒的。


LockSupport 方法底層都是調用Unsafe的方法實現。全名sun.misc.Unsafe,該類能夠直接操控內存。

LockSupport 提供 park() 和 unpark() 方法實現阻塞線程和解除線程阻塞。


LockSupport() 操做的是線程對象,直接傳入的就是 Thread,
而 wait() 屬於具體對象 ( synchronized(t)這裏的鎖定了t,那麼wait需用t.wait():釋放掉t )

wait/notify 須要獲取對象的監視器,即synchronized修飾,
而park/unpark 不須要獲取對象的監視器。

Fork/Join框架(分治,並行執行任務)

Fork/Join框架和執行器框架(Executor Framework)主要的區別在於:

    工做竊取算法(Work-Stealing Algorithm)。

    與執行器框架不一樣,使用Join操做讓一個主任務等待它所建立的子任務的完成,執行這個任務的線程稱之爲工做者線程(Worker Thread)。
    工做者線程尋找其餘仍未被執行的任務,而後開始執行。
    經過這種方式,提高應用程序的性能。


爲了達到這個目標,經過Fork/Join框架執行的任務有如下限制:

    任務只能使用 fork()和join() 操做看成同步機制。
    若是使用其餘的同步機制,工做者線程就不能執行其餘任務。

    任務不能執行I/O操做,好比文件數據的讀取與寫入。

    任務不能拋出非運行時異常(Checked Exception),必須在代碼中處理掉這些異常。


Fork/Join框架的核心是由下列兩個類組成的:

    ForkJoinPool(執行Task):
        這個類實現了ExecutorService接口和工做竊取算法(Work-Stealing Algorithm)。
        它管理工做者線程,並提供任務的狀態信息,以及任務的執行信息。

    ForkJoinTask(執行具體的分支邏輯):
        這個類是一個將在ForkJoinPool中執行的任務的基類。


Fork/Join 框架提供了在一個任務裏執行 fork()和join() 操做的機制和控制任務狀態的方法。
一般,爲了實現Fork/Join任務,須要實現一個如下兩個類之一的子類:

    RecursiveAction : 用於任務沒有返回結果的場景。
    RecursiveTask : 用於任務有返回結果的場景。


ForkJoinPool 使用 submit 或 invoke 提交的區別:
    invoke 是同步執行,調用以後須要等待任務完成,才能執行後面的代碼。
    submit 是異步執行,只有在 Future 調用 get 的時候會阻塞。


執行子任務調用 fork 方法並非最佳的選擇,最佳的選擇是 invokeAll 方法。

    leftTask.fork();  
    rightTask.fork();

    替換爲

    invokeAll(leftTask, rightTask);

    fork方法至關於A先分工給B,而後A當監工不幹活,B去完成A交代的任務。因此上面的模式至關於浪費了一個線程。
    那麼若是使用invokeAll至關於A分工給B後,A和B都去完成工做。這樣能夠更好的利用線程池,縮短執行的時間。

線程池 一 ForkJoinPoolgit

工做竊取算法

基本思想:

    ForkJoinPool 的每一個工做線程都維護着一個工做隊列(WorkQueue),這是一個雙端隊列(Deque),
    裏面存放的對象是任務(ForkJoinTask)。

    每一個工做線程在運行中產生新的任務 ( 一般是由於調用了 fork() )時,會放入工做隊列的隊尾,
    而且工做線程在處理本身的工做隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。

    每一個工做線程在處理本身的工做隊列同時,會嘗試竊取一個任務
    (或是來自於剛剛提交到 pool 的任務,或是來自於其餘工做線程的工做隊列),
    竊取的任務位於其餘線程的工做隊列的隊首,也就是說工做線程在竊取其餘工做線程的任務時,使用的是 FIFO 方式。

    在遇到 join() 時,若是須要 join 的任務還沒有完成,則會先處理其餘任務,並等待其完成。
    在既沒有本身的任務,也沒有能夠竊取的任務時,進入休眠。

ForkJoinTaskExample

class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //若是任務足夠小就計算任務
        if ((end - start) <= threshold) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 若是任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待任務執行結束合併其結果
            //int leftResult = leftTask.join();
            //int rightResult = rightTask.join();
            int leftResult = leftTask.invoke();
            int rightResult = rightTask.invoke();
            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        long l = System.currentTimeMillis();

        ForkJoinPool forkjoinPool = new ForkJoinPool();
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 1000000);
        //執行一個任務(時間600)
        Future<Integer> result = forkjoinPool.submit(task);

        //時間20
        int result1 = 0;
        for (int i = 1; i <= 1000000; i++) {
            result1 += i;
        }

        try {
            System.out.println(result.get());
            System.out.println(result1);

            long l1 = System.currentTimeMillis();
            System.out.println("time=" + (l1 - l));
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
相關文章
相關標籤/搜索