Java併發編程中的若干核心技術,向高手進階!

來源:簡書
http://www.jianshu.com/p/5f49...

引言java

本文試圖從一個更高的視角來總結Java語言中的併發編程內容,但願閱讀完本文以後,能夠收穫一些內容,至少應該知道在Java中作併發編程實踐的時候應該注意什麼,應該關注什麼,如何保證線程安全,以及如何選擇合適的工具來知足需求。程序員

固然,更深層次的內容就會涉及到JVM層面的知識,包括底層對Java內存的管理,對線程的管理等較爲核心的問題,固然,本文的定位在於抽象與總結,更爲具體而深刻的內容就須要本身去實踐,考慮到可能篇幅過長、重複描述某些內容,以及自身技術深度等緣由,本文將在深度和廣度上作一些權衡,某些內容會作一些深刻的分析,而有些內容會一帶而過,點到爲止。面試

總之,本文就當是對學習Java併發編程內容的一個總結,以及給那些但願快速瞭解Java併發編程內容的讀者拋磚引玉,不足之處還望指正。算法

Java線程編程

通常來講,在java中實現高併發是基於多線程編程的,所謂併發,也就是多個線程同時工做,來處理咱們的業務,在機器廣泛多核心的今天,併發編程的意義極爲重大,由於咱們有多個cpu供線程使用,若是咱們的應用依然只使用單線程模式來工做的話,對極度浪費機器資源的。因此,學習java併發知識的首要問題是:如何建立一個線程,而且讓這個線程作一些事情?後端

這是java併發編程內容的起點,下面將分別介紹多個建立線程,而且讓線程作一些事情的方法。數組

繼承Thread類緩存

繼承Thread類,而後重寫run方法,這是第一種建立線程的方法。run方法裏面就是咱們要作的事情,能夠在run方法裏面寫咱們想要在新的線程裏面運行的任務,下面是一個小例子,咱們繼承了Thread類,而且在run方法裏面打印出了固然線程的名字,而後sleep1秒中以後就退出了:安全

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of thread
 */
public class ThreadDemo {

    public static void main(String ... args) {數據結構

        AThread aThread = new AThread();

        //start the thread
        aThread.start();

    }

}

class AThread extends Thread {
    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

若是咱們想要啓動這個線程,只須要像上面代碼中那樣,調用Thread類的start方法就能夠了。

實現Runnable接口

啓動一個線程的第二種方法是實現Runnable接口,而後實現其run方法,將你想要在新線程裏面執行的業務代碼寫在run方法裏面,下面的例子展現了這種方法啓動線程的示例,實現的功能和上面的第一種示例是同樣的:

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Runnable
 */
public class ARunnableaDemo {

    public static void main(String ... args) {

        ARunnanle aRunnanle = new ARunnanle();
        Thread thread = new Thread(aRunnanle);

        thread.start();

    }

}

class ARunnanle implements Runnable {

    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在啓動線程的時候,依然仍是使用了Thread這個類,只是咱們在構造函數中將咱們實現的Runnable對象傳遞進去了,因此在咱們執行Thread類的start方法的時候,實際執行的內容是咱們的Runnable的run方法。

使用FutureTask

啓動一個新的線程的第三種方法是使用FutureTask,下面來看一下FutureTask的類圖,就能夠明白爲何可使用FutureTask來啓動一個新的線程了:


FutureTask的類圖

從FutureTask的類圖中能夠看出,FutureTask實現了Runnable接口和Future接口,因此它兼備Runnable和Future兩種特性,下面先來看看如何使用FutureTask來啓動一個新的線程:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of FutureTask
 */
public class FutureTaskDemo {

    public static void main(String ... args) {

        ACallAble callAble = new ACallAble();

        FutureTask<String> futureTask = new FutureTask<>(callAble);

        Thread thread = new Thread(futureTask);

        thread.start();

        do {

        }while (!futureTask.isDone());

        try {
            String result = futureTask.get();

            System.out.println("Result:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

class ACallAble implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Thread-Name:" +
                Thread.currentThread().getName();
    }
}

能夠看到,使用FutureTask來啓動一個線程以後,咱們能夠監控這個線程是否完成,上面的示例中主線程會一直等待這個新建立的線程直到它返回,其實只要是Future提供的接口,咱們在FutureTask中均可以使用,這極大的方便了咱們,Future在併發編程中的意義極爲重要,Future表明一個將來會發生的東西,它是一種暗示,一種佔位符,它示意咱們它可能不會當即獲得結果,由於它的任務還在運行,可是咱們能夠獲得一個對這個線程的監控對象。

咱們能夠對線程的執行作一些判斷,甚至是控制,好比,若是咱們以爲咱們等了過久,而且咱們以爲沒有必要再等待下去的時候,就能夠將這個Task取消,還有一點須要提到的是,Future表明它可能正在運行,也可能已經返回,固然Future更多的暗示你能夠在等待這個結果的同時可使用其餘的線程作一些其餘的事情,當你真的須要這個結果的時候再來獲取就能夠了,這就是併發,理解這一點很是重要。

本小節經過介紹三種建立並啓動一個新線程的方法,爲進行併發編程開了一個頭,目前,咱們還只是在能建立多個線程,而後讓多個線程作不一樣個的事情的階段,固然,這是學習併發編程最爲基礎的,不管如何,如今,咱們可讓咱們的應用運行多個線程了,下面的文章將會基於這個假設(一個應用開啓了多個線程)討論一些併發編程中值得關注的內容。

線程模型

咱們如今能夠啓動多個線程,可是好像並無造成一種相似於模型的東西,很是混亂,而且到目前爲止咱們的多個線程依然只是各自作各自的事情,互不相干,多個線程之間並無交互(通訊),這是最簡單的模型,也是最基礎的模型,本小節試圖介紹線程模型,一種指導咱們的代碼組織的思想,線程模型肯定了咱們須要處理那些多線程的問題,在一個系統中,多個線程之間沒有通訊是不太可能的,更爲通常的狀況是,多個線程共享一些資源,而後相互競爭來獲取資源權限,多個線程相互配合,來提升系統的處理能力。

正由於多個線程之間會有通訊交互,因此本文接下來的討論纔有了意義,若是咱們的系統裏面有幾百個線程在工做,可是這些線程互不相干,那麼這樣的系統要麼實現的功能很是單一,要麼毫無心義(固然不是絕對的,好比Netty的線程模型)。

繼續來討論線程模型,上面說到線程模型是一種指導代碼組織的思想,這是我本身的理解,不一樣的線程模型須要咱們使用不一樣的代碼組織,好的線程模型能夠提升系統的併發度,而且可使得系統的複雜度下降,這裏須要提一下Netty 4的線程模型,Netty 4的線程模型使得咱們能夠很容易的理解Netty的事件處理機制,這種優秀的設計基於Reactor線程模型,Reactor線程模型分爲單線程模型、多線程模型以及主從多線程模型,Netty的線程模型相似於Reactor主從多線程模型。

固然線程模型是一種更高級別的併發編程內容,它是一種編程指導思想,尤爲在咱們進行底層框架設計的時候特別須要注意線程模型,由於一旦線程模型設計不合理,可能會致使後面框架代碼過於複雜,而且可能由於線程同步等問題形成問題不可控,最終致使系統運行失控。相似於Netty的線程模型是一種好的線程模型,下面展現了這種模型:

Netty線程模型

簡單來講,Netty爲每一個新創建的Channel分配一個NioEventLoop,而每一個NioEventLoop內部僅使用一個線程,這就避免了多線程併發的同步問題,由於爲每一個Channel處理的線程僅有一個,因此不須要使用鎖等線程同步手段來作線程同步,在咱們的系統設計的時候應該借鑑這種線程模型的設計思路,能夠避免咱們走不少彎路。

Java線程池

池化技術是一種很是有用的技術,對於線程來講,建立一個線程的代價是很高的,若是咱們在建立了一個線程,而且讓這個線程作一個任務以後就回收的話,那麼下次要使用線程來執行咱們的任務的時候又須要建立一個新的線程,是否能夠在建立完成一個線程以後一直緩衝,直到系統關閉的時候再進行回收呢?教你如何監控 Java 線程池運行狀態

線程池就是這樣的組件,使用線程池,就不必頻繁建立線程,線程池會爲咱們管理線程,當咱們須要一個新的線程來執行咱們的任務的時候,就向線程池申請,而線程池會從池子裏面找到一個空閒的線程返回給請求者,若是池子裏面沒有可用的線程,那麼線程池會根據一些參數指標來建立一個新的線程,或者將咱們的任務提交到任務隊列中去,等待一個空閒的線程來執行這個任務。

細節內容在下文中進行分析,目前咱們只須要明白,線程池裏面有不少線程,這些線程會一直到系統關係纔會被回收,不然一直會處於處理任務或者等待處理任務的狀態。

首先,如何使用線程池呢?下面的代碼展現瞭如何使用java線程池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Executors
 */
public class ExecutorsDemo {

    public static void main(String ... args) {

        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        AThreadFactory threadFactory = new AThreadFactory();
        ARunnanle runnanle = new ARunnanle();

        ExecutorService fixedThreadPool=
                Executors.newFixedThreadPool(cpuCoreCount, threadFactory);

        ExecutorService cachedThreadPool =
                Executors.newCachedThreadPool(threadFactory);

        ScheduledExecutorService newScheduledThreadPool =
                Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);

        ScheduledExecutorService singleThreadExecutor =
                Executors.newSingleThreadScheduledExecutor(threadFactory);

        fixedThreadPool.submit(runnanle);
        cachedThreadPool.submit(runnanle);
        newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS);
        singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        fixedThreadPool.shutdownNow();
        cachedThreadPool.shutdownNow();
        newScheduledThreadPool.shutdownNow();
        singleThreadExecutor.shutdownNow();
    }

}

class ARunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
    }
}

/**
 * the thread factory
 */
class AThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread("aThread-" + threadNumber.incrementAndGet());
    }
}

更爲豐富的應用應該本身去探索,結合自身的需求來藉助線程池來實現,下面來分析一下Java線程池實現中幾個較爲重要的內容。

ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor和ScheduledThreadPoolExecutor是java實現線程池的核心類,不一樣類型的線程池其實就是在使用不一樣的構造函數,以及不一樣的參數來構造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor,因此,學習java線程池的重點也在於學習這兩個核心類。

前者適用於構造通常的線程池,然後者繼承了前者,而且不少內容是通用的,可是ScheduledThreadPoolExecutor增長了schedule功能,也就是說,ScheduledThreadPoolExecutor使用於構造具備調度功能的線程池,在須要週期性調度執行的場景下就可使用ScheduledThreadPoolExecutor。

下面展現了ThreadPoolExecutor和ScheduledThreadPoolExecutor的類圖,能夠看出他們的關係,以及他們的繼承關係:

ThreadPoolExecutor類圖

ScheduledThreadPoolExecutor類圖

關於較爲細節的內容再也不本文的敘述範圍以內,若是想要了解這些內容的詳細內容,能夠參考文章中給出的連接,這些文章較爲深刻的分析和總結了相關的內容。

上文中提到,線程池會管理着一些線程,這些線程要麼處於運行狀態,要麼處於等待任務的狀態,固然這只是咱們較爲形象的描述,一個線程的狀態不只有運行態與等待狀態,還有其餘的狀態,可是對我咱們來講,線程池裏面的線程確實是要麼處於運行狀態,要麼處於等待任務的狀態,這體如今,當咱們向一個線程池提交一個任務的時候,可能會被等待任務的線程當即執行,可是可能線程池裏面的線程都處於忙碌狀態,那麼咱們提交的任務就會被加入到等待運行的任務隊列中去,當有空閒線程了,或者隊列也滿了,那麼線程池就會採用一些策略來執行任務,而且在某些時刻會拒絕提交的任務,這些細節均可以在ThreadPoolExecutor的實現中找到。

在線程池的實現中,有一個角色特別重要,那就是任務隊列,當線程池裏面沒有空閒的線程來執行咱們的任務的時候,咱們的任務就會被添加到任務隊列中去等待執行,而這個任務隊列可能會被多個線程併發讀寫,因此須要支持多線程安全訪問,java提供了一類支持併發環境的隊列,稱爲阻塞隊列,這是一類特殊的隊列,他們的使用時很是普遍的,特別是在jdk自身的類庫建設上,固然在咱們實際的工做中也是有不少使用場景的。

關於ThreadPoolExecutor是如何處理一個提交的任務的細節,能夠參考下面的代碼:

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

下面來看一下java中藉助ThreadPoolExecutor來構造的幾個線程池的特性:

一、newFixedThreadPool

使用ThreadPoolExecutor構造一個newCachedThreadPool的流程以下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

在任意時刻,newFixedThreadPool構造出來的線程池中最多隻可能存活着nThreads個線程,若是全部的線程都在運行任務,那麼這個時候提交的任務將會被添加到任務隊列中去等待執行。

咱們能夠控制corePoolSize和maximumPoolSize來使得經過ThreadPoolExecutor構造出來的線程池具備一些不同的特性,可是須要注意的是,當咱們設置的maximumPoolSize大於corePoolSize的時候,若是當前線程池裏面的線程數量已經達到了corePoolSize了,而且當前因此線程都處於運行任務的狀態,那麼在這個時候提交的任務會被添加到任務隊列中去,只有在任務隊列滿了的時候,纔會去建立新的線程,若是線程數量已經達到了maximumPoolSize了,那麼到此就會拒絕提交的任務,這些流程能夠參考上面展現出來的execute方法的實現。該類型的線程池使用的任務隊列是LinkedBlockingQueue類型的阻塞隊列。

二、newCachedThreadPool

經過ThreadPoolExecutor構造一個newCachedThreadPool線程池的流程以下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

newCachedThreadPool適合於相似秒殺系統中,它能夠按需建立線程。每一個線程在空閒了一段時間以後會被回收,而後須要建立的時候再建立出來,在使用的時候應該使用合適的構造參數。

該類型使用的任務隊列是SynchronousQueue這種同步隊列,這是一種特別的隊列,每一個線程都是有使命的,每一個線程都會等待另一個線程和本身交易,在交易完成以前都會阻塞住線程,他們之間有一種傳遞關係,數據是從一個線程直接傳遞到例外一個線程中去的,SynchronousQueue這種隊列不存儲實際的數據,而是存儲着一些線程的信息,而SynchronousQueue管理着這些線程之間的交易,更爲詳細的細節參考後面的文章。

上面提到,ScheduleThreadPoolExecutor是繼承自ThreadPoolExecutor的,並且從類圖中也能夠看出來這種關係,因此其實ScheduleThreadPoolExecutor是對ThreadPoolExecutor的加強,它增長了schedule功能,使用與那些須要週期性調度執行,或者是延時執行的任務,在ScheduleThreadPoolExecutor中使用了一種阻塞隊列稱爲延時阻塞隊列,這種隊列有能力持有一段時間數據,咱們能夠設定這種時間,時間沒到的時候嘗試獲取數據的線程會被阻塞,直到設定的時間到了,線程纔會被喚醒來消費數據。而關於ScheduleThreadPoolExecutor是如何運做的,包括他的週期性任務調度是如何工做的,能夠參考上面提到的連接。

Future

Future表明一種將來某個時刻會發生的事情,在併發環境下使用Future是很是重要的,使用Future的前提是咱們能夠允許線程執行一段時間來完成這個任務,可是須要在咱們提交了任務的時候就返回一個Future,這樣在接下來的時間程序員能夠根據實際狀況來取消任務或者獲取任務,在多個任務沒有相互依賴關係的時候,使用Future能夠實現多線程的併發執行,多個線程能夠執行在不一樣的處理器上,而後在某個時間點來統一獲取結果就能夠了。

上文中已經提到了FutureTask,FutureTask既是一種Runnable,也是一種Future,而且結合了兩種類型的特性。下面展現了Future提供的一些方法,使用這些方法能夠很方便的進行任務控制:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在java 8中增長了一個新的類CompletableFuture,這是對Future的極大加強,CompletableFuture提供了很是豐富的操做能夠來控制咱們的任務,而且能夠根據多種規則來關聯多個Future。

Fork/Join框架

Fork/Join框架是一種並行框架,它能夠將一個較大的任務切分紅一些小任務來執行,而且多個線程之間會相互配合,每一個線程都會有一個任務隊列,對於某些線程來講它們可能很快完成了本身的任務隊列中的任務,可是其餘的線程尚未完成,那麼這些線程就會去竊取那些尚未完成任務執行的線程的任務來執行,這成爲「工做竊取」算法,關於Fork/Join中的工做竊取,其實現仍是較爲複雜的,下面展現了Fork/Join框架的工做模式:

Fork/Join工做模式

能夠從上面的圖中看出,一個較大的任務會被切分爲一個小任務,而且小任務還會繼續切分,直到符合咱們設定的執行閾值,而後就會執行,執行完成以後會進行join,也就是將小任務的結果組合起來,組裝出咱們提交的整個任務的結果,這是一種很是先進的工做模式,很是有借鑑意義。固然,使用Fork/Join框架的前提是咱們的任務時能夠拆分紅小任務來執行的,而且小人物的結果能夠組裝出整個大任務的結果,歸併排序是一種能夠藉助Fork/Join框架來提供處理速度的算法,下面展現了使用Fork/Join框架來執行歸併排序的代碼,能夠試着調整參數來進行性能測試:

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * Created by hujian06 on 2017/10/23.
 *
 * merge sort by fork/join
 */
public class ForkJoinMergeSortDemo {

    public static void main(String ... args) {
        new Worker().runWork();
    }

}

class Worker {

    private static final boolean isDebug = false;

    public void runWork() {

        int[] array = mockArray(200000000, 1000000); // mock the data

        forkJoinCase(array);
        normalCase(array);

    }

    private void printArray(int[] arr) {

        if (isDebug == false) {
            return;
        }

        for (int i = 0; i < arr.length; i ++) {
            System.out.print(arr[i] + " ");
        }

        System.out.println();
    }

    private void forkJoinCase(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();

        MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);

        long start = System.currentTimeMillis();

        pool.invoke(mergeSortTask);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private void normalCase(int[] array) {

        long start = System.currentTimeMillis();

        new MergeSortWorker().sort(array, 0, array.length - 1);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private static final  int[] mockArray(int length, int up) {
        if (length <= 0) {
            return null;
        }

        int[] array = new int[length];

        Random random = new Random(47);

        for (int i = 0; i < length; i ++) {
            array[i] = random.nextInt(up);
        }

        return array;
    }
}

class MergeSortTask extends RecursiveAction {

    private static final int threshold = 100000;
    private final MergeSortWorker mergeSortWorker = new MergeSortWorker();

    private int[] data;

    private int left;
    private int right;

    public MergeSortTask(int[] array, int l, int r) {
        this.data = array;
        this.left = l;
        this.right = r;
    }

    @Override
    protected void compute() {
        if (right - left < threshold) {
            mergeSortWorker.sort(data, left, right);
        } else {
            int mid = left + (right - left) / 2;
            MergeSortTask l = new MergeSortTask(data, left, mid);
            MergeSortTask r = new MergeSortTask(data, mid + 1, right);

            invokeAll(l, r);

            mergeSortWorker.merge(data, left, mid, right);
        }
    }
}

class MergeSortWorker {

    // Merges two subarrays of arr[].
    // First subarray is arr[l..m]
    // Second subarray is arr[m+1..r]
    void merge(int arr[], int l, int m, int r) {
        // Find sizes of two subarrays to be merged
        int n1 = m - l + 1;
        int n2 = r - m;

        /* Create temp arrays */
        int L[] = new int[n1];
        int R[] = new int[n2];

        /*Copy data to temp arrays*/
        for (int i = 0; i < n1; ++i)
            L[i] = arr[l + i];
        for (int j = 0; j < n2; ++j)
            R[j] = arr[m + 1 + j];

        /* Merge the temp arrays */

        // Initial indexes of first and second subarrays
        int i = 0, j = 0;

        // Initial index of merged subarry array
        int k = l;
        while (i < n1 && j < n2) {
            if (L[i] <= R[j]) {
                arr[k ++] = L[i ++];
            } else {
                arr[k ++] = R[j ++];
            }
        }

        /* Copy remaining elements of L[] if any */
        while (i < n1) {
            arr[k ++] = L[i ++];
        }

        /* Copy remaining elements of R[] if any */
        while (j < n2) {
            arr[k ++] = R[j ++];
        }
    }

    // Main function that sorts arr[l..r] using
    // merge()
    void sort(int arr[], int l, int r) {
        if (l < r) {
            // Find the middle point
            int m = l + (r - l) / 2;

            // Sort first and second halves
            sort(arr, l, m);
            sort(arr, m + 1, r);

            // Merge the sorted halves
            merge(arr, l, m, r);
        }
    }
}

在jdk中,使用Fork/Join框架的一個典型案例是Streams API,Streams API試圖簡化咱們的併發編程,可使用很簡單的流式API來處理咱們的數據流,在咱們無感知的狀態下,其實Streams的實現上藉助了Fork/Join框架來實現了併發計算,因此強烈建議使用Streams API來處理咱們的流式數據,這樣能夠充分的利用機器的多核心資源,來提升數據處理的速度。鑑於Fork/Join框架的先進思想,理解而且學會使用Fork/Join框架來處理咱們的實際問題是很是有必要的。

Java volatile關鍵字

volatile解決的問題是多個線程的內存可見性問題,在併發環境下,每一個線程都會有本身的工做空間,每一個線程只能訪問各自的工做空間,而一些共享變量會被加載到每一個線程的工做空間中,因此這裏面就有一個問題,內存中的數據何時被加載到線程的工做緩存中,而線程工做空間中的內容何時會回寫到內存中去。這兩個步驟處理不當就會形成內存可加性問題,也就是數據的不一致,好比某個共享變量被線程A修改了,可是沒有回寫到內存中去,而線程B在加載了內存中的數據以後讀取到的共享變量是髒數據,正確的作法應該是線程A的修改應該對線程B是可見的,更爲通用一些,就是在併發環境下共享變量對多個線程是一致的。volatile關鍵字解析~高級java必問

對於內存可見性的一點補充是,之因此會形成多個線程看到的共享變量的值不同,是由於線程在佔用CPU時間的時候,cpu爲了提升處理速度不會直接和內存交互,而是會先將內存中的共享內容讀取到內部緩存中(L1,L2),而後cpu在處理的過程當中就只會和內部緩存交互,在多核心的機器中這樣的處理方式就會形成內存可見性問題。

volatile能夠解決併發環境下的內存可見性問題,只須要在共享變量前面加上volatile關鍵字就能夠解決,可是須要說明的是,volatile僅僅是解決內存可見性問題,對於像i++這樣的問題仍是須要使用其餘的方式來保證線程安全。使用volatile解決內存可見性問題的原理是,若是對被volatile修飾的共享變量執行寫操做的話,JVM就會向cpu發送一條Lock前綴的指令,cpu將會這個變量所在的緩存行(緩存中能夠分配的最小緩存單位)寫回到內存中去。可是在多處理器的狀況下,將某個cpu上的緩存行寫回到系統內存以後,其餘cpu上該變量的緩存仍是舊的,這樣再進行後面的操做的時候就會出現問題,因此爲了使得全部線程看到的內容都是一致的,就須要實現緩存一致性協議,cpu將會經過監控總線上傳遞過來的數據來判斷本身的緩存是否過時,若是過時,就須要使得緩存失效,若是cpu再來訪問該緩存的時候,就會發現緩存失效了,這時候就會從新從內存加載緩存。

總結一下,volatile的實現原則有兩條:

一、JVM的Lock前綴的指令將使得cpu緩存寫回到系統內存中去
二、爲了保證緩存一致性原則,在多cpu的情景下,一個cpu的緩存回寫內存會致使其餘的cpu上的緩存都失效,再次訪問會從新從系統內存加載新的緩存內容。

原子操做CAS

原子操做表達的意思是要麼一個操做成功,要麼失敗,中間過程不會被其餘的線程中斷,這一點對於併發編程來講很是重要,在java中使用了大量的CAS來作併發編程,包括jdk的ConcurrentHsahMap的實現,還有AtomicXXX的實現等其餘一些併發工具的實現都使用了CAS這種技術,CAS包括兩部分,也就是Compare and swap,首先是比較,而後再交互,這樣作的緣由是,在併發環境下,可能不止一個線程想要來改變某個共享變量的值,那麼在進行操做以前使用一個比較,而這個比較的值是當前線程認爲(知道)該共享變量最新的值,可是可能其餘線程已經改變了這個值,那麼此時CAS操做就會失敗,只有在共享變量的值等於線程提供的用於比較的值的時候纔會進行原子改變操做。

java中有一個類是專門用於提供CAS操做支持的,那就是Unsafe類,可是咱們不能直接使用Unsafe類,由於Unsafe類提供的一些底層的操做,須要很是專業的人才能使用好,而且Unsafe類可能會形成一些安全問題,因此不建議直接使用Unsafe類,可是若是想使用Unsafe類的話仍是有方法的,那就是經過反射來獲取Unsafe實例,相似於下面的代碼:

class UnsafeHolder {

    private static Unsafe U = null;

    public static Unsafe getUnsafe() {
        if (U == null) {
            synchronized (UnsafeHolder.class) {
                if (U == null) {

                    List<Exception> exception = null;
                    try {
                        Field field = Unsafe.class.getDeclaredField("theUnsafe");

                        field.setAccessible(true);

                        try {
                            U = (Unsafe) field.get(null);
                        } catch (IllegalAccessException e) {

                            exception.add(e);
                        }
                    } catch (NoSuchFieldException e) {

                        exception.add(e);
                    } finally {

                        if (exception != null) {
                            reportException(exception);
                        }

                    }
                }
            }
        }

        return U;
    }

    /**
     * handler the exception in this method .
     * @param e The exception
     */
    private static void reportException(List<Exception> e) {
        e.forEach(System.out::println);
    }

}

若是想要了解Unsafe類到底提供了哪些較爲底層的操做,能夠直接參考Unsafe的源碼。CAS操做解決了原子操做問題,只要進行操做,CAS就會保證操做會成功,不會被中斷,這是一種很是好很是強大的特性,下面就java 8中的ConcurrentHashMap的size實現來談談CAS操做在併發環境下的使用案例。

在java 7中,ConcurrentHashMap的實現是基於分段鎖協議的實現,本質上仍是使用了鎖,只是基於一種考慮,就是多個線程訪問哈希桶具備隨機性,基於這種考慮來將數據存儲在不一樣的哈希段上面,而後每個段配有一把鎖,在須要寫某個段的時候須要加鎖,而在這個時候,其餘訪問其餘段的線程是不須要阻塞的,可是對於該段的線程訪問就須要等待,直到這個加鎖的線程釋放了鎖,其餘線程才能進行訪問。在java 8中,ConcurrentHashMap的實現拋棄了這種複雜的架構設計,可是繼承了這種分散線程競爭壓力的思想,其實就提升系統的併發度這一維度來講,分散競爭壓力是一種最爲直接明瞭的解決方案,而java 8在實現ConcurrentHashMap的時候大量使用了CAS操做,減小了使用鎖的頻度來提升系統的響應度,其實使用鎖和使用CAS來作併發在複雜度上不是一個數量級的,使用鎖在很大程度上假設了多個線程的排斥性,而且使用鎖會將線程阻塞等待,也就是說使用鎖來作線程同步的時候,線程的狀態是會改變的,可是使用CAS是不會改變線程的狀態的(不太嚴謹的說),因此使用CAS比起使用synchronized或者使用Lcok來講更爲輕量級。Java Map集合面試題彙總

如今就ConcurrentHashMap的size方法來分析一下如何將線程競爭的壓力分散出去。在java 7的實現上,在調用size方法以後,ConcurrentHashMap會進行兩次對哈希桶中的記錄累加的操做,這兩次累加的操做是不加鎖的,而後判斷兩次結果是否一致,若是一致就說明目前的系統是讀多寫少的場景,而且可能目前沒有線程競爭,因此直接返回就能夠,這就避免了使用鎖,可是若是兩次累加結果不一致,那就說明此時可能寫的線程較多,或者線程競爭較爲嚴重,那麼此時ConcurrentHashMap就會進行一個重量級的操做,對全部段進行加鎖,而後對每個段進行記錄計數,而後求得最終的結果返回。在最有狀況下,size方法須要作兩次累加計數,最壞狀況須要三次,而且會涉及全局加鎖這種重量級的加鎖操做,性能確定是不高的。而在java 8的實現上,ConcurrentHashMap的size方法其實是與ConcurrentHashMap是解耦的,size方法更像是接入了一個額外的併發計數系統,在進行size方法調用的時候是不會影響數據的存取的,這實際上是一種很是先進的思想,就是一個系統模塊化,而後模塊能夠進行更新,系統解耦,好比java 8中接入了併發計數組件Striped64來做爲size方法的支撐,可能將來出現了比Striped64更爲高效的算法來計數,那麼只須要將Striped64模塊換成新的模塊就能夠了,對原來的核心操做是不影響的,這種模塊化系統設定的思想應該在咱們的項目中具體實踐。

上面說到java 8在進行size方法的設計上引入了Striped64這種併發計數組件,這種組件的計數思想其實也是分散競爭,Striped64的實現上使用了volatile和CAS,在Striped64的實現中是看不到鎖的使用的,可是Striped64確實是一種高效的適用於併發環境下的計數組件,它會基於請求計數的線程,Striped64的計數會根據兩部分的內容來獲得最後的結果,相似於java 7中ConcurrentHashMap的size方法的實現,在Striped64的實現上也是借鑑了這種思想的,Striped64會首先嚐試將某個線程的計數請求累加到一個base共享變量上,若是成功了,那麼說明目前的競爭不是很激烈,也就不必後面的操做了,可是不少狀況下,併發環境下的線程競爭是很激烈的,因此嘗試累加到base上的計數請求很大機率是會失敗的,那麼Striped64會維護一個Cell數組,每一個Cell是一個計數組件,Striped64會爲每一個請求計數的線程計算一個哈希值,而後哈希到Cell數組中的某個位置上,而後這個線程的計數就會累加到該Cell上面去。

併發同步框架AQS

AQS是java中實現Lock的基礎,也是實現線程同步的基礎,AQS提供了鎖的語義,而且支持獨佔模式和共享模式,對應於悲觀鎖和樂觀鎖,獨佔模式的含義是說同一時刻只能有一個線程獲取鎖,而其餘試圖獲取鎖的線程都須要阻塞等待,而共享鎖的含義是說能夠有多個線程得到鎖,兩種模式在不一樣的場景下使用。

而鎖在併發編程中的地位不言而喻,多個線程的同步不少時候是須要鎖來作同步的,好比對於某些資源,咱們但願能夠有多個線程得到鎖來讀取,可是隻容許有一個線程得到鎖來執行寫操做,這種鎖稱爲讀寫鎖,它的實現上結合了AQS的共享模式和獨佔模式,共享模式對應於可使得多個線程得到鎖來進行讀操做,獨佔模式對應於只容許有一個線程得到鎖來進行寫操做。該文章詳細講述了多個Lock接口的實現類,以及他們是如何藉助AQS來實現的具體細節。

某些時候,咱們須要定製咱們本身的線程同步策略,個性化的線程同步藉助AQS能夠很容易的實現,好比咱們的需求是容許限定個數的線程得到鎖來進行一些操做,想要實現這樣的語義,只須要實現一個類,繼承AQS,而後重寫方法下面兩個方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

還須要提到的一點是,鎖分爲公平鎖和非公平鎖,java中大多數時候會使用隊列來實現公平鎖,而使用棧來實現非公平鎖,固然這是基於隊列和棧這兩種數據結構的特色來實現的,直觀的來講,使用隊列的FIFO的特性就能夠實現相似排隊的效果,也就保證了公平性,而棧是一個後進先出的數據結構,它的這種結構形成的結果就是,最新進入的線程可能比那些等待過一段時間的線程更早的得到鎖,更爲具體的內容能夠參考上面的文章進行了解。

synchronized(同步鎖)

相對於volatile,synchronized就顯得比較重量級了。

首先,咱們應該知道,在java中,全部的對象均可以做爲鎖。能夠分爲下面三種狀況:

一、普通方法同步,鎖是當前對象
二、靜態方法同步,鎖是當前類的Class對象
三、普通塊同步,鎖是synchronize裏面配置的對象

當一個線程試圖訪問同步代碼時,必需要先得到鎖,退出或者拋出異常時必需要釋放鎖。

JVM基於進入和退出Monitor對象來實現方法同步和代碼塊同步,可使用monitorenter和monitorexit指令實現。monitorenter指令是在編譯後插入到同步代碼塊的開始位置,而monitorexit指令則插入到方法結束和異常處,JVM保證每一個monitorenter都有一個monitorexit閾值相對應。線程執行到monitorenter的時候,會嘗試得到對象所對應的monitor的鎖,而後才能得到訪問權限,synchronize使用的鎖保存在Java對象頭中。

併發隊列(阻塞隊列,同步隊列)

併發隊列,也就是能夠在併發環境下使用的隊列,爲何通常的隊列不能再併發環境下使用呢?由於在併發環境下,可能會有多個線程同時來訪問一個隊列,這個時候由於上下文切換的緣由可能會形成數據不一致的狀況,併發隊列解決了這個問題,而且java中的併發隊列的使用時很是普遍的,好比在java的線程池的實現上使用了多種不一樣特性的阻塞隊列來作任務隊列,對於阻塞隊列來講,它要解決的首要的兩個問題是:

1. 多線程環境支持,多個線程能夠安全的訪問隊列
2. 支持生產和消費等待,多個線程之間互相配合,當隊列爲空的時候,消費線程會阻塞等待隊列不爲空;當隊列滿了的時候,生產線程就會阻塞直到隊列不滿。

Java中提供了豐富的併發隊列實現,下面展現了這些併發隊列的概覽:

java併發隊列概覽

根據上面的圖能夠將java中實現的併發隊列分爲幾類:

1. 通常的阻塞隊列
2. 支持雙端存取的併發隊列
3. 支持延時獲取數據的延時阻塞隊列
4. 支持優先級的阻塞隊列

這些隊列的區別就在於從隊列中存取數據時的具體表現,好比對於延時隊列來講,獲取數據的線程可能被阻塞等待一段時間,也可能馬上返回,對於優先級阻塞隊列,獲取的數據是根據必定的優先級取到的。下面展現了一些隊列操做的具體表現:

  • Throws Exception 類型的插入和取出在不能當即被執行的時候就會拋出異常。
  • Special Value 類型的插入和取出在不能被當即執行的狀況下會返回一個特殊的值(true 或者 false)
  • Blocked 類型的插入和取出操做在不能被當即執行的時候會阻塞線程直到能夠操做的時候會被其餘線程喚醒
  • Timed out 類型的插入和取出操做在不能當即執行的時候會被阻塞必定的時候,若是在指定的時間內沒有被執行,那麼會返回一個特殊值

總結

本文總結了Java併發編程中的若干核心技術,而且對每個核心技術都作了一些分析,並給出了參考連接,能夠在參考連接中查找到更爲具體深刻的分析總結內容。

Java併發編程須要解決一些問題,好比線程間同步問題,如何保證數據可見性問題,以及如何高效的協調多個線程工做等內容,本文在這些維度上都有所設計。

本文做爲對閱讀java.util.Concurrent包的源碼閱讀的一個總結,同時本文也做爲一個起點,一個開始更高層次分析總結的起點,以前的分析都是基於JDK源碼來進行的,而且某些細節的內容尚未徹底搞明白,其實在閱讀了一些源碼以後就會發現。

若是想要深刻分析某個方面的內容,就須要一些底層的知識,不然很難完整的分析總結出來,可是這種不完全的分析又是頗有必要的,至少能夠對這些內容有一些大概的瞭解,而且知道本身的不足,以及將來須要瞭解的底層內容。

對於Java併發包的分析研究,深刻到底層就是對JVM如何管理內容,如何管理線程的分析,在深刻下去,就是操做系統對內存的管理,對線程的管理等內容,從操做系統再深刻下去,就是去理解CPU的指令系統,學習磁盤知識等內容。

固然,知識的關聯是無止境的,學習也是無止境的,目前來講,首要解決的問題是能夠熟練的使用Java提供的併發包內容來進行併發編程,在業務上提升併發處理能力,在出現問題的時候能夠很快找到問題而且解決問題,在達到這個要求以後,能夠去了解一些JVM層次的內容,好比JVM的內存模型,以及線程的實現,而且能夠與學習操做系統的相關內容並行進行。

推薦去個人博客閱讀更多:

1.Java JVM、集合、多線程、新特性系列教程

2.Spring MVC、Spring Boot、Spring Cloud 系列教程

3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程

4.Java、後端、架構、阿里巴巴等大廠最新面試題

以爲不錯,別忘了點贊+轉發哦!

相關文章
相關標籤/搜索