Java 併發方案全面學習總結

併發與並行的概念

  • 併發(Concurrency): 問題域中的概念—— 程序須要被設計成可以處理多個同時(或者幾乎同時)發生的事件
  • 並行(Parallelism): 方法域中的概念——經過將問題中的多個部分 並行執行,來加速解決問題。

進程、線程與協程

進程與線程

它們都是並行機制的解決方案。html

  • 進程: 進程是什麼呢?直白地講,進程就是應用程序的啓動實例。好比咱們運行一個遊戲,打開一個軟件,就是開啓了一個進程。進程擁有代碼和打開的文件資源、數據資源、獨立的內存空間。啓動一個進程很是消耗資源,通常一臺機器最多啓動數百個進程。
  • 線程: 線程從屬於進程,是程序的實際執行者。一個進程至少包含一個主線程,也能夠有更多的子線程。線程擁有本身的棧空間。在進程內啓動線程也要消耗必定的資源,通常一個進程最多啓動數千個線程。操做系統可以調度的最小單位就是線程了。
  • 協程: 協程又從屬於線程,它不屬於操做系統管轄,徹底由程序控制,一個線程內能夠啓動數萬甚至數百萬協程。但也正是由於它由程序控制,它對編寫代碼的風格改變也最多。

Java的並行執行實現

JVM中的線程

  • 主線程: 獨立生命週期的線程
  • 守護線程: 被主線程建立,隨着建立線程結束而結束

線程狀態

Java中的線程狀態

要注意的是,線程不是調用start以後立刻進入運行中的狀態,而是在"可運行"狀態,由操做系統來決定調度哪一個線程來運行。java

Jetty中的線程

Web服務器都有本身管理的線程池, 好比輕量級的Jetty, 就有如下三種類型的線程:python

  • Acceptor
  • Selector
  • Worker

Jetty線程模型

最原始的多線程——Thread類

繼承類 vs 實現接口

  • 繼承Thread類
  • 實現Runnable接口

實際使用中顯然實現接口更好, 避免了單繼承限制。c++

Runnable vs Callable

  • Runnable:實現run方法,沒法拋出受檢查的異常,運行時異常會中斷主線程,但主線程沒法捕獲,因此子線程應該本身處理全部異常
  • Callable:實現call方法,能夠拋出受檢查的異常,能夠被主線程捕獲,但主線程沒法捕獲運行時異常,也不會被打斷。

須要返回值的話,就用Callable接口
一個實現了Callable接口的對象,須要被包裝爲RunnableFuture對象, 而後才能被新線程執行, 而RunnableFuture其實仍是實現了Runnable接口。git

Future, Runnable 和FutureTask的關係以下:github

Future, Runnable 和FutureTask的關係

能夠看出FutureTask實際上是RunnableFuture接口的實現類,下面是使用Future的示例代碼算法

public class Callee implements Callable {
    AtomicInteger counter = new AtomicInteger(0);

    private Integer seq=null;

    public Callee()
    {
        super();
    }

    public  Callee(int seq)
    {
        this.seq = seq;
    }

    /**
     * call接口能夠拋出受檢查的異常
     * @return
     * @throws InterruptedException
     */
    @Override
    public Person call() throws InterruptedException {
        Person p = new Person("person"+ counter.incrementAndGet(), RandomUtil.random(0,150));
        System.out.println("In thread("+seq+"), create a Person: "+p.toString());
        Thread.sleep(1000);
        return  p;
    }
}
Callee callee1 = new Callee();
FutureTask<Person> ft= new FutureTask<Person>(callee1);
Thread thread = new Thread(ft);
thread.start();

try {
    thread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
    return;
}

System.out.println("ft.isDone: "+ft.isDone());

Person result1;
try {
    result1 = ((Future<Person>) ft).get();
} catch (InterruptedException e) {
    e.printStackTrace();
    result1 = null;
} catch (ExecutionException e) {
    e.printStackTrace();
    result1 = null;
}
Person result = result1;
System.out.println("main thread get result: "+result.toString());

線程調度

  • Thread.yield() 方法:調用這個方法,會讓當前線程退回到可運行狀態,而不是阻塞狀態,這樣就留給其餘同級線程一些運行機會
  • Thread.sleep(long millis):調用這個方法,真的會讓當前線程進入阻塞狀態,直到時間結束
  • 線程對象的join():這個方法讓當前線程進入阻塞狀態,直到要等待的線程結束。
  • 線程對象的interrupt():不要覺得它是中斷某個線程!它只是線線程發送一箇中斷信號,讓線程在無限等待時(如死鎖時)能拋出異常,從而結束線程,可是若是你吃掉了這個異常,那麼這個線程仍是不會中斷的!
  • Object類中的wait():線程進入等待狀態,直到其餘線程調用此對象的 notify() 方法或 notifyAll() 喚醒方法。這個狀態跟加鎖有關,因此是Object的方法。
  • Object類中的notify():喚醒在此對象監視器上等待的單個線程。若是全部線程都在此對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的,並在對實現作出決定時發生。線程經過調用其中一個 wait 方法,在對象的監視器上等待。 直到當前的線程放棄此對象上的鎖定,才能繼續執行被喚醒的線程。被喚醒的線程將以常規方式與在該對象上主動同步的其餘全部線程進行競爭;相似的方法還有一個notifyAll(),喚醒在此對象監視器上等待的全部線程。

同步與鎖

內存一致性錯誤

因爲線程在並行時,可能會"同時"訪問一個變量, 因此共享變量的時候,會出現值處於一個不肯定的情況, 例以下面的代碼, c是一個實例變量, 多個線程同時訪問increment或decrement方法時,就可能出現一致性錯誤,最終讓c變成"奇怪"的值。數據庫

public class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

volatile

public class Foo {
    private int x = -1;
    private volatile boolean v = false;
    public void setX(int x) {
        this.x = x;
        v = true;
    }
    public int getX() {
        if (v == true) {
            return x;
        }
        return 0;
    }
}

volatile關鍵字實際上指定了變量不使用寄存器, 而且對變量的訪問不會亂序執行,從而避免了並行訪問的不一致問題。但這個方案僅僅對原始類型變量自己生效,若是是++或者--這種「非原子」操做,則不能保證多線程操做的正確性了編程

原子類型

JDK提供了一系列對基本類型的封裝,造成原子類型(Atomic Variables),特別適合用來作計數器api

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}

原子操做的實現原理,在Java8以前和以後不一樣

  • Java7
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}
  • Java8
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

至於Compare-and-Swap,以及Fetch-and-Add兩種算法,是依賴機器底層機制實現的。

線程安全的集合類

  • BlockingQueue: 定義了一個先進先出的數據結構,當你嘗試往滿隊列中添加元素,或者從空隊列中獲取元素時,將會阻塞或者超時
  • ConcurrentMap: 是 java.util.Map 的子接口,定義了一些有用的原子操做。移除或者替換鍵值對的操做只有當 key 存在時才能進行,而新增操做只有當 key 不存在時。使這些操做原子化,能夠避免同步。ConcurrentMap 的標準實現是 ConcurrentHashMap,它是 HashMap 的併發模式。
  • ConcurrentNavigableMap: 是 ConcurrentMap 的子接口,支持近似匹配。ConcurrentNavigableMap 的標準實現是 ConcurrentSkipListMap,它是 TreeMap 的併發模式。

ThreadLocal-只有本線程才能訪問的變量

ThreadLoal 變量,它的基本原理是,同一個 ThreadLocal 所包含的對象(對ThreadLocal< String >而言即爲 String 類型變量),在不一樣的 Thread 中有不一樣的副本(實際是不一樣的實例,後文會詳細闡述)。這裏有幾點須要注意

由於每一個 Thread 內有本身的實例副本,且該副本只能由當前 Thread 使用。這是也是 ThreadLocal 命名的由來
既然每一個 Thread 有本身的實例副本,且其它 Thread 不可訪問,那就不存在多線程間共享的問題。

它與普通變量的區別在於,每一個使用該變量的線程都會初始化一個徹底獨立的實例副本。ThreadLocal 變量一般被private static修飾。當一個線程結束時,它所使用的全部 ThreadLocal 相對的實例副本均可被回收。

總的來講,ThreadLocal 適用於每一個線程須要本身獨立的實例且該實例須要在多個方法中被使用,也即變量在線程間隔離而在方法或類間共享的場景。後文會經過實例詳細闡述該觀點。另外,該場景下,並不是必須使用 ThreadLocal ,其它方式徹底能夠實現一樣的效果,只是 ThreadLocal 使得實現更簡潔。

synchronized關鍵字

  • 方法加鎖:其實不是加在指定的方法上,而是在指定的對象上,只不過在方法開始前會檢查這個鎖
  • 靜態方法鎖:加在類上,它和加在對象上的鎖互補干擾
  • 代碼區塊鎖:其實不是加在指定的代碼塊上,而是加在指定的對象上,只不過在代碼塊開始前會檢查這個鎖。一個對象只會有一個鎖,因此代碼塊鎖和實例方法鎖是會互相影響的

須要注意的是:不管synchronized關鍵字加在方法上仍是對象上,它取得的鎖都是對象,而不是把一段代碼或函數看成鎖――並且同步方法極可能還會被其餘線程的對象訪問,每一個對象只有一個鎖(lock)與之相關聯

加鎖不慎可能會形成死鎖

線程池(Java 5)

用途

真正的多線程使用,是從線程池開始的,Callable接口,基本上也是被線程池調用的。

線程池全景圖

線程池全景圖
線程池類圖

線程池的使用

ExecutorService pool = Executors.newFixedThreadPool(3);

        Callable<Person> worker1 = new Callee();
        Future ft1 = pool.submit(worker1);

        Callable<Person> worker2 = new Callee();
        Future ft2 = pool.submit(worker2);

        Callable<Person> worker3 = new Callee();
        Future ft3 = pool.submit(worker3);

        System.out.println("準備通知線程池shutdown...");
        pool.shutdown();
        System.out.println("已通知線程池shutdown");
        try {
            pool.awaitTermination(2L, TimeUnit.SECONDS);
            System.out.println("線程池徹底結束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

線程池要解決的問題

  • 任務排隊:當前能併發執行的線程數老是有限的,但任務數能夠很大
  • 線程調度:線程的建立是比較消耗資源的,須要一個池來維持活躍線程
  • 結果收集:每一個任務完成之後,其結果須要統一採集

線程池類型

  • newSingleThreadExecutor:建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
  • newFixedThreadPool:建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
  • newCachedThreadPool:建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
  • newScheduledThreadPool:建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
  • newSingleThreadScheduledExecutor:建立一個單線程的線程池。此線程池支持定時以及週期性執行任務的需求。

線程池狀態

線程池狀態遷移圖

  • 線程池在構造前(new操做)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING。嚴格意義上講線程池構造完成後並無線程被當即啓動,只有進行「預啓動」或者接收到任務的時候纔會啓動線程。這個會後面線程池的原理會詳細分析。可是線程池是出於運行狀態,隨時準備接受任務來執行。
  • 線程池運行中能夠經過shutdown()和shutdownNow()來改變運行狀態。shutdown()是一個平緩的關閉過程,線程池中止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列尚未開始的任務,這時候線程池處於SHUTDOWN狀態;shutdownNow()是一個當即關閉過程,線程池中止接受新的任務,同時線程池取消全部執行的任務和已經進入隊列可是尚未執行的任務,這時候線程池處於STOP狀態。
  • 一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED狀態,此時線程池就結束了。
  • isTerminating()描述的是SHUTDOWN和STOP兩種狀態。
  • isShutdown()描述的是非RUNNING狀態,也就是SHUTDOWN/STOP/TERMINATED三種狀態。

任務拒絕策略

線程池任務拒絕策略

Fork/Join模型(Java7)

用途

計算密集型的任務,最好不多有IO等待,也沒有Sleep之類的,最好是自己就適合遞歸處理的算法

分析

在給定的線程數內,儘量地最大化利用CPU資源,但又不會致使其餘資源過載(好比內存),或者大量空線程等待。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用好比快速排序算法。

這裏的要點在於,ForkJoinPool須要使用相對少的線程來處理大量的任務。

好比要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會作出一樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,中止這樣的分割處理。好比,當元素的數量小於10時,會中止分割,轉而使用插入排序對它們進行排序。

那麼到最後,全部的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它全部的子任務完成以後,它纔可以被執行。

因此當使用ThreadPoolExecutor時,使用分治法會存在問題,由於ThreadPoolExecutor中的線程沒法像任務隊列中再添加一個任務而且在等待該任務完成以後再繼續執行。而使用ForkJoinPool時,就可以讓其中的線程建立新的任務,並掛起當前的任務,此時線程就可以從隊列中選擇子任務執行。

以上程序的關鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會使用一個內部隊列來對須要執行的任務以及子任務進行操做來保證它們的執行順序。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼性能的差別呢?

首先,使用ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,好比使用4個線程來完成超過200萬個任務。可是,使用ThreadPoolExecutor時,是不可能完成的,由於ThreadPoolExecutor中的Thread沒法選擇優先執行子任務,須要完成200萬個具備父子關係的任務時,也須要200萬個線程,顯然這是不可行的。

ps:ForkJoinPool在執行過程當中,會建立大量的子任務,致使GC進行垃圾回收,這些是須要注意的。

原理與使用

ForkJoinPool首先是ExecutorService的實現類,所以是特殊的線程池。

建立了ForkJoinPool實例以後,就能夠調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。

其中ForkJoinTask表明一個能夠並行、合併的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask表明有返回值的任務,而RecusiveAction表明沒有返回值的任務。

ForkJoin Pool 類圖

我的認爲ForkJoinPool設計不太好的地方在於,ForkJoinTask不是個接口,而是抽象類,實際使用時基本上不是繼承RecursiveAction就是繼承RecursiveTask,對業務類有限制。

示例

典型的一個例子,就是一串數組求和

public interface Calculator {
    long sumUp(long[] numbers);
}
public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當須要計算的數字小於6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 不然,把任務一分爲二,遞歸計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

這個例子展現了當數組被拆分得足夠小(<6)以後,就不須要並行處理了,而更大的數組就拆爲兩半,分別處理。

Stream(Java 8)

概念

別搞混了,跟IO的Stream徹底不是一回事,能夠把它看作是集合處理的聲明式語法,相似數據庫操做語言SQL。固然也有跟IO相似的地方,就是Stream只能消費一次,不能重複使用。

看個例子:

int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
 .mapToInt(w -> w.getWeight())
 .sum();

流提供了一個能力,任何一個流,只要獲取一次並行流,後面的操做就均可以並行了。
例如:

Stream<String> stream = Stream.of("a", "b", "c","d","e","f","g");
String str = stream.parallel().reduce((a, b) -> a + "," + b).get();
System.out.println(str);

流操做

流操做示意圖

生成流

  • Collection.stream()
  • Collection.parallelStream()
  • Arrays.stream(T array) or Stream.of()
  • java.io.BufferedReader.lines()
  • java.util.stream.IntStream.range()
  • java.nio.file.Files.walk()
  • java.util.Spliterator
  • Random.ints()
  • BitSet.stream()
  • Pattern.splitAsStream(java.lang.CharSequence)
  • JarFile.stream()

示例

// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();

須要注意的是,對於基本數值型,目前有三種對應的包裝類型 Stream:

IntStream、LongStream、DoubleStream。固然咱們也能夠用 Stream<Integer>、Stream<Long> >、Stream<Double>,可是 boxing 和 unboxing 會很耗時,因此特別爲這三種基本數值型提供了對應的 Stream。

Intermediate

一個流能夠後面跟隨零個或多個 intermediate 操做。其目的主要是打開流,作出某種程度的數據映射/過濾,而後返回一個新的流,交給下一個操做使用。這類操做都是惰性化的(lazy),就是說,僅僅調用到這類方法,並無真正開始流的遍歷。

已知的Intermediate操做包括:map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered。

Terminal

一個流只能有一個 terminal操做,當這個操做執行後,流就被使用「光」了,沒法再被操做。因此這一定是流的最後一個操做。Terminal 操做的執行,纔會真正開始流的遍歷,而且會生成一個結果,或者一個 side effect。

已知的Terminal操做包括:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

  • reduce解析: reduce本質上是個聚合方法,它的做用是用流裏面的元素生成一個結果,因此用來作累加,字符串拼接之類的都很是合適。它有三個參數

    • 初始值:最終結果的初始化值,能夠是一個空的對象
    • 聚合函數:一個二元函數(有兩個參數),第一個參數是上一次聚合的結果,第二個參數是某個元素
    • 多個部分結果的合併函數:若是流併發了,那麼聚合操做會分爲多段進行,這裏顯示了多段之間如何配合
  • collect: collect比reduce更強大:reduce最終只能獲得一個跟流裏數據類型相同的值, 但collect的結果能夠是任何對象。簡單的collect也有三個參數:

    • 最終要返回的數據容器
    • 把元素併入返回值的方法
    • 多個部分結果的合併

兩個collect示例

//和reduce相同的合併字符操做
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,StringBuilder::append).toString();
//等價於上面,這樣看起來應該更加清晰
String concat = stringStream.collect(() -> new StringBuilder(),(l, x) -> l.append(x), (r1, r2) -> r1.append(r2)).toString();
//把stream轉成map
Stream stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2);

List result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two));
/* 或者使用方法引用 */
result = stream.collect(ArrayList::new, List::add, List::addAll);

協程

協程,英文Coroutines,也叫纖程(Fiber)是一種比線程更加輕量級的存在。正如一個進程能夠擁有多個線程同樣,一個線程也能夠擁有多個協程。
協程概念

協程其實是在語言底層(或者框架)對須要等待的程序進行調度,從而充分利用CPU的方法, 其實這徹底能夠經過回調來實現, 可是深層回調的代碼太變態了,因此發明了協程的寫法。理論上多個協程不會真的"同時"執行,也就不會引發共享變量操做的不肯定性,不須要加鎖(待確認)。

pythone協程示例

Pythone版協程示例

Pythone, Golang和C#都內置了協程的語法,但Java沒有,只能經過框架實現,常見的框架包括:Quasar,kilim和ea-async

Java ea-async 協程示例

import static com.ea.async.Async.await;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class Store
{
    //購物操做, 傳一個商品id和一個價格
    public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost)
    {
        //銀行扣款(長時間操做)
        if(!await(bank.decrement(cost))) {
            return completedFuture(false);
        }
        try {
            //商品出庫(長時間操做)
            await(inventory.giveItem(itemTypeId));
            return completedFuture(true);
        } catch (Exception ex) {
            await(bank.refund(cost));
            throw new AppException(ex);
        }
    }
}

參考資料


相關文章
相關標籤/搜索