它們都是並行機制的解決方案。html
要注意的是,線程不是調用start以後立刻進入運行中的狀態,而是在"可運行"狀態,由操做系統來決定調度哪一個線程來運行。java
Web服務器都有本身管理的線程池, 好比輕量級的Jetty, 就有如下三種類型的線程:python
實際使用中顯然實現接口更好, 避免了單繼承限制。c++
須要返回值的話,就用Callable接口
一個實現了Callable接口的對象,須要被包裝爲RunnableFuture對象, 而後才能被新線程執行, 而RunnableFuture其實仍是實現了Runnable接口。git
Future, Runnable 和FutureTask的關係以下:github
能夠看出FutureTask實際上是RunnableFuture接口的實現類,下面是使用Future的示例代碼算法
public class Callee implements Callable { AtomicInteger counter = new AtomicInteger(0); private Integer seq=null; public Callee() { super(); } public Callee(int seq) { this.seq = seq; } /** * call接口能夠拋出受檢查的異常 * @return * @throws InterruptedException */ @Override public Person call() throws InterruptedException { Person p = new Person("person"+ counter.incrementAndGet(), RandomUtil.random(0,150)); System.out.println("In thread("+seq+"), create a Person: "+p.toString()); Thread.sleep(1000); return p; } }
Callee callee1 = new Callee(); FutureTask<Person> ft= new FutureTask<Person>(callee1); Thread thread = new Thread(ft); thread.start(); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); return; } System.out.println("ft.isDone: "+ft.isDone()); Person result1; try { result1 = ((Future<Person>) ft).get(); } catch (InterruptedException e) { e.printStackTrace(); result1 = null; } catch (ExecutionException e) { e.printStackTrace(); result1 = null; } Person result = result1; System.out.println("main thread get result: "+result.toString());
因爲線程在並行時,可能會"同時"訪問一個變量, 因此共享變量的時候,會出現值處於一個不肯定的情況, 例以下面的代碼, c是一個實例變量, 多個線程同時訪問increment或decrement方法時,就可能出現一致性錯誤,最終讓c變成"奇怪"的值。數據庫
public class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; } }
public class Foo { private int x = -1; private volatile boolean v = false; public void setX(int x) { this.x = x; v = true; } public int getX() { if (v == true) { return x; } return 0; } }
volatile關鍵字實際上指定了變量不使用寄存器, 而且對變量的訪問不會亂序執行,從而避免了並行訪問的不一致問題。但這個方案僅僅對原始類型變量自己生效,若是是++或者--這種「非原子」操做,則不能保證多線程操做的正確性了編程
JDK提供了一系列對基本類型的封裝,造成原子類型(Atomic Variables),特別適合用來作計數器api
import java.util.concurrent.atomic.AtomicInteger; class AtomicCounter { private AtomicInteger c = new AtomicInteger(0); public void increment() { c.incrementAndGet(); } public void decrement() { c.decrementAndGet(); } public int value() { return c.get(); } }
原子操做的實現原理,在Java8以前和以後不一樣
public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } }
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
至於Compare-and-Swap,以及Fetch-and-Add兩種算法,是依賴機器底層機制實現的。
ThreadLoal 變量,它的基本原理是,同一個 ThreadLocal 所包含的對象(對ThreadLocal< String >而言即爲 String 類型變量),在不一樣的 Thread 中有不一樣的副本(實際是不一樣的實例,後文會詳細闡述)。這裏有幾點須要注意
由於每一個 Thread 內有本身的實例副本,且該副本只能由當前 Thread 使用。這是也是 ThreadLocal 命名的由來
既然每一個 Thread 有本身的實例副本,且其它 Thread 不可訪問,那就不存在多線程間共享的問題。
它與普通變量的區別在於,每一個使用該變量的線程都會初始化一個徹底獨立的實例副本。ThreadLocal 變量一般被private static修飾。當一個線程結束時,它所使用的全部 ThreadLocal 相對的實例副本均可被回收。
總的來講,ThreadLocal 適用於每一個線程須要本身獨立的實例且該實例須要在多個方法中被使用,也即變量在線程間隔離而在方法或類間共享的場景。後文會經過實例詳細闡述該觀點。另外,該場景下,並不是必須使用 ThreadLocal ,其它方式徹底能夠實現一樣的效果,只是 ThreadLocal 使得實現更簡潔。
須要注意的是:不管synchronized關鍵字加在方法上仍是對象上,它取得的鎖都是對象,而不是把一段代碼或函數看成鎖――並且同步方法極可能還會被其餘線程的對象訪問,每一個對象只有一個鎖(lock)與之相關聯
加鎖不慎可能會形成死鎖
真正的多線程使用,是從線程池開始的,Callable接口,基本上也是被線程池調用的。
ExecutorService pool = Executors.newFixedThreadPool(3); Callable<Person> worker1 = new Callee(); Future ft1 = pool.submit(worker1); Callable<Person> worker2 = new Callee(); Future ft2 = pool.submit(worker2); Callable<Person> worker3 = new Callee(); Future ft3 = pool.submit(worker3); System.out.println("準備通知線程池shutdown..."); pool.shutdown(); System.out.println("已通知線程池shutdown"); try { pool.awaitTermination(2L, TimeUnit.SECONDS); System.out.println("線程池徹底結束"); } catch (InterruptedException e) { e.printStackTrace(); }
計算密集型的任務,最好不多有IO等待,也沒有Sleep之類的,最好是自己就適合遞歸處理的算法
在給定的線程數內,儘量地最大化利用CPU資源,但又不會致使其餘資源過載(好比內存),或者大量空線程等待。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用好比快速排序算法。
這裏的要點在於,ForkJoinPool須要使用相對少的線程來處理大量的任務。
好比要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會作出一樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,中止這樣的分割處理。好比,當元素的數量小於10時,會中止分割,轉而使用插入排序對它們進行排序。
那麼到最後,全部的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它全部的子任務完成以後,它纔可以被執行。
因此當使用ThreadPoolExecutor時,使用分治法會存在問題,由於ThreadPoolExecutor中的線程沒法像任務隊列中再添加一個任務而且在等待該任務完成以後再繼續執行。而使用ForkJoinPool時,就可以讓其中的線程建立新的任務,並掛起當前的任務,此時線程就可以從隊列中選擇子任務執行。
以上程序的關鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會使用一個內部隊列來對須要執行的任務以及子任務進行操做來保證它們的執行順序。
那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼性能的差別呢?
首先,使用ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,好比使用4個線程來完成超過200萬個任務。可是,使用ThreadPoolExecutor時,是不可能完成的,由於ThreadPoolExecutor中的Thread沒法選擇優先執行子任務,須要完成200萬個具備父子關係的任務時,也須要200萬個線程,顯然這是不可行的。
ps:ForkJoinPool在執行過程當中,會建立大量的子任務,致使GC進行垃圾回收,這些是須要注意的。
ForkJoinPool首先是ExecutorService的實現類,所以是特殊的線程池。
建立了ForkJoinPool實例以後,就能夠調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。
其中ForkJoinTask表明一個能夠並行、合併的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask表明有返回值的任務,而RecusiveAction表明沒有返回值的任務。
我的認爲ForkJoinPool設計不太好的地方在於,ForkJoinTask不是個接口,而是抽象類,實際使用時基本上不是繼承RecursiveAction就是繼承RecursiveTask,對業務類有限制。
典型的一個例子,就是一串數組求和
public interface Calculator { long sumUp(long[] numbers); }
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 當須要計算的數字小於6時,直接計算結果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 不然,把任務一分爲二,遞歸計算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator() { // 也可使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); } }
這個例子展現了當數組被拆分得足夠小(<6)以後,就不須要並行處理了,而更大的數組就拆爲兩半,分別處理。
別搞混了,跟IO的Stream徹底不是一回事,能夠把它看作是集合處理的聲明式語法,相似數據庫操做語言SQL。固然也有跟IO相似的地方,就是Stream只能消費一次,不能重複使用。
看個例子:
int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum();
流提供了一個能力,任何一個流,只要獲取一次並行流,後面的操做就均可以並行了。
例如:
Stream<String> stream = Stream.of("a", "b", "c","d","e","f","g"); String str = stream.parallel().reduce((a, b) -> a + "," + b).get(); System.out.println(str);
示例
// 1. Individual values Stream stream = Stream.of("a", "b", "c"); // 2. Arrays String [] strArray = new String[] {"a", "b", "c"}; stream = Stream.of(strArray); stream = Arrays.stream(strArray); // 3. Collections List<String> list = Arrays.asList(strArray); stream = list.stream();
須要注意的是,對於基本數值型,目前有三種對應的包裝類型 Stream:
IntStream、LongStream、DoubleStream。固然咱們也能夠用 Stream<Integer>、Stream<Long> >、Stream<Double>,可是 boxing 和 unboxing 會很耗時,因此特別爲這三種基本數值型提供了對應的 Stream。
一個流能夠後面跟隨零個或多個 intermediate 操做。其目的主要是打開流,作出某種程度的數據映射/過濾,而後返回一個新的流,交給下一個操做使用。這類操做都是惰性化的(lazy),就是說,僅僅調用到這類方法,並無真正開始流的遍歷。
已知的Intermediate操做包括:map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered。
一個流只能有一個 terminal操做,當這個操做執行後,流就被使用「光」了,沒法再被操做。因此這一定是流的最後一個操做。Terminal 操做的執行,纔會真正開始流的遍歷,而且會生成一個結果,或者一個 side effect。
已知的Terminal操做包括:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
reduce解析: reduce本質上是個聚合方法,它的做用是用流裏面的元素生成一個結果,因此用來作累加,字符串拼接之類的都很是合適。它有三個參數
collect: collect比reduce更強大:reduce最終只能獲得一個跟流裏數據類型相同的值, 但collect的結果能夠是任何對象。簡單的collect也有三個參數:
兩個collect示例
//和reduce相同的合併字符操做 String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,StringBuilder::append).toString(); //等價於上面,這樣看起來應該更加清晰 String concat = stringStream.collect(() -> new StringBuilder(),(l, x) -> l.append(x), (r1, r2) -> r1.append(r2)).toString();
//把stream轉成map Stream stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2); List result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two)); /* 或者使用方法引用 */ result = stream.collect(ArrayList::new, List::add, List::addAll);
協程,英文Coroutines,也叫纖程(Fiber)是一種比線程更加輕量級的存在。正如一個進程能夠擁有多個線程同樣,一個線程也能夠擁有多個協程。
協程其實是在語言底層(或者框架)對須要等待的程序進行調度,從而充分利用CPU的方法, 其實這徹底能夠經過回調來實現, 可是深層回調的代碼太變態了,因此發明了協程的寫法。理論上多個協程不會真的"同時"執行,也就不會引發共享變量操做的不肯定性,不須要加鎖(待確認)。
pythone協程示例
Pythone, Golang和C#都內置了協程的語法,但Java沒有,只能經過框架實現,常見的框架包括:Quasar,kilim和ea-async。
Java ea-async 協程示例
import static com.ea.async.Async.await; import static java.util.concurrent.CompletableFuture.completedFuture; public class Store { //購物操做, 傳一個商品id和一個價格 public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) { //銀行扣款(長時間操做) if(!await(bank.decrement(cost))) { return completedFuture(false); } try { //商品出庫(長時間操做) await(inventory.giveItem(itemTypeId)); return completedFuture(true); } catch (Exception ex) { await(bank.refund(cost)); throw new AppException(ex); } } }