java併發的四種風味:Thread、Executor、ForkJoin和Actor

本文由 ImportNew - shenggordon 翻譯自 Oleg Shelajev。歡迎加入翻譯小組。轉載請見文末要求。html

這篇文章討論了Java應用中並行處理的多種方法。從本身管理Java線程,到各類更好幾的解決方法,Executor服務、ForkJoin 框架以及計算中的Actor模型。java

Java併發編程的4種風格:Threads,Executors,ForkJoin和Actorspython

咱們生活在一個事情並行發生的世界。天然地,咱們編寫的程序也反映了這個特色,它們能夠併發的執行。固然除了Python代碼(譯者注:連接裏面講述了Python的全局解釋器鎖,解釋了緣由),不過你仍然能夠使用Jython在JVM上運行你的程序,來利用多處理器電腦的強大能力。編程

然而,併發程序的複雜程度遠遠超出了人類大腦的處理能力。相比較而言,咱們簡直弱爆了:咱們生來就不是爲了思考多線程程序、評估併發訪問有限資源以及預測哪裏會發生錯誤或者瓶頸。多線程

面對這些困難,人類已經總結了很多併發計算的解決方案和模型。這些模型強調問題的不一樣部分,當咱們實現並行計算時,能夠根據問題作出不一樣的選擇。併發

在這篇文章中,我將會用對同一個問題,用不一樣的代碼來實現併發的解決方案;而後討論這些方案有哪些好的地方,有哪些缺陷,可能會有什麼樣的陷阱在等着你。app

咱們將介紹下面幾種併發處理和異步代碼的方式:框架

• 裸線程異步

• Executors和Servicesjvm

• ForkJoin框架和並行流

• Actor模型

爲了更加有趣一些,我沒有僅僅經過一些代碼來講明這些方法,而是使用了一個共同的任務,所以每一節中的代碼差很少都是等價的。另外,這些代碼僅僅是展現用的,初始化的代碼並無寫出來,而且它們也不是產品級的軟件示例。

對了,最後一件事:在文章最後,有一個小調查,關於你或者你的組織正在使用哪一種併發模式。爲了你的工程師同胞們,請填一下調查!

任務

任務:實現一個方法,它接收一條消息和一組字符串做爲參數,這些字符串與某個搜索引擎的查詢頁面對應。對每一個字符串,這個方法發出一個http請求來查詢消息,並返回第一條可用的結果,越快越好。

若是有錯誤發生,拋出一個異常或者返回空都是能夠的。我只是嘗試避免爲了等待結果而出現無限循環。

簡單說明:此次我不會真正深刻到多線程如何通信的細節,或者深刻到Java內存模型。若是你迫切地想了解這些,你能夠看我前面的文章利用JCStress測試併發

那麼,讓咱們從最直接、最核心的方式來在JVM上實現併發:手動管理裸線程。

方法1:使用「原汁原味」的裸線程

解放你的代碼,迴歸天然,使用裸線程!線程是併發最基本的單元。Java線程本質上被映射到操做系統線程,而且每一個線程對象對應着一個計算機底層線程。

天然地,JVM管理着線程的生存期,並且只要你不須要線程間通信,你也不須要關注線程調度。

每一個線程有本身的棧空間,它佔用了JVM進程空間的指定一部分。

線程的接口至關簡明,你只須要提供一個Runnable,調用.start()開始計算。沒有現成的API來結束線程,你須要本身來實現,經過相似boolean類型的標記來通信。

在下面的例子中,咱們對每一個被查詢的搜索引擎,建立了一個線程。查詢的結果被設置到AtomicReference,它不須要鎖或者其餘機制來保證只出現一次寫操做。開始吧!

private static String getFirstResult(String question, List<String> engines) {
 AtomicReference<String> result = new AtomicReference<>();
 for(String base: engines) {
   String url = base + question;
   new Thread(() -> {
     result.compareAndSet(null, WS.url(url).get());
   }).start();
 }
 while(result.get() == null); // wait for some result to appear
 return result.get();
}

使用裸線程的主要優勢是,你很接近併發計算的操做系統/硬件模型,而且這個模型很是簡單。多個線程運行,經過共享內存通信,就是這樣。

本身管理線程的最大劣勢是,你很容易過度的關注線程的數量。線程是很昂貴的對象,建立它們須要耗費大量的內存和時間。這是一個矛盾,線程太少,你不能得到良好的併發性;線程太多,將極可能致使內存問題,調度也變得更復雜。

然而,若是你須要一個快速和簡單的解決方案,你絕對可使用這個方法,不要猶豫。

方法2:認真對待Executor和CompletionService

另外一個選擇是使用API來管理一組線程。幸運的是,JVM爲咱們提供了這樣的功能,就是Executor接口。Executor接口的定義很是簡單:

public interface Executor {
 
void execute(Runnable command);
 
}

它隱藏瞭如何處理Runnable的細節。它僅僅說,「開發者!你只是一袋肉,給我任務,我會處理它!」

更酷的是,Executors類提供了一組方法,可以建立擁有完善配置的線程池和executor。咱們將使用newFixedThreadPool(),它建立預約義數量的線程,並不容許線程數量超過這個預約義值。這意味着,若是全部的線程都被使用的話,提交的命令將會被放到一個隊列中等待;固然這是由executor來管理的。

在它的上層,有ExecutorService管理executor的生命週期,以及CompletionService會抽象掉更多細節,做爲已完成任務的隊列。得益於此,咱們沒必要擔憂只會獲得第一個結果。

下面service.take()的一次調用將會只返回一個結果。

private static String getFirstResultExecutors(String question, List<String> engines) {
 ExecutorCompletionService<String> service = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(4));
 
 for(String base: engines) {
   String url = base + question;
   service.submit(() -> {
     return WS.url(url).get();
   });
 }
   try {
     return service.take().get();
   }
   catch(InterruptedException | ExecutionException e) {
     return null;
   }
}

若是你須要精確的控制程序產生的線程數量,以及它們的精確行爲,那麼executor和executor服務將是正確的選擇。例如,須要仔細考慮的一個重要問題是,當全部線程都在忙於作其餘事情時,須要什麼樣的策略?增長線程數量或者不作數量限制?把任務放入到隊列等待?若是隊列也滿了呢?無限制的增長隊列大小?

感謝JDK,已經有不少配置項回答了這些問題,而且有着直觀的名字,例如上面的Executors.newFixedThreadPool(4)。

線程和服務的生命週期也能夠經過選項來配置,使資源能夠在恰當的時間關閉。惟一的不便之處是,對新手來講,配置選項能夠更簡單和直觀一些。然而,在併發編程方面,你幾乎找不到更簡單的了。

總之,對於大型系統,我我的認爲使用executor最合適。

方法3:經過並行流,使用ForkJoinPool (FJP)

Java 8中加入了並行流,今後咱們有了一個並行處理集合的簡單方法。它和lambda一塊兒,構成了併發計算的一個強大工具。

若是你打算運用這種方法,那麼有幾點須要注意。首先,你必須掌握一些函數編程的概念,它實際上更有優點。其次,你很難知道並行流其實是否使用了超過一個線程,這要由流的具體實現來決定。若是你沒法控制流的數據源,你就沒法肯定它作了什麼。

另外,你須要記住,默認狀況下是經過ForkJoinPool.commonPool()實現並行的。這個通用池由JVM來管理,而且被JVM進程內的全部線程共享。這簡化了配置項,所以你不用擔憂。

private static String getFirstResult(String question, List<String> engines) {
 // get element as soon as it is available
 Optional<String> result = engines.stream().parallel().map((base) -> {
   String url = base + question;
   return WS.url(url).get();
 }).findAny();
 return result.get();
}

看上面的例子,咱們不關心單獨的任務在哪裏完成,由誰完成。然而,這也意味着,你的應用程序中可能存在一些停滯的任務,而你卻沒法不知道。在另外一篇關於並行流的文章中,我詳細地描述了這個問題。而且有一個變通的解決方案,雖然它並非世界上最直觀的方案。

ForkJoin是一個很好的框架,由比我更聰明的人來編寫和預先配置。所以當我須要寫一個包含並行處理的小型程序時,它是個人第一選擇。

它最大的缺點是,你必須預見到它可能產生的併發症。若是對JVM沒有總體上的深刻了解,這很難作到。這隻能來自於經驗。

方法4:僱用一個Actor

Actor模型是對咱們本文中所探討的方法的一個奇怪的補充。JDK中沒有actor的實現;所以你必須引用一些實現了actor的庫。

簡短地說,在actor模型中,你把一切都看作是一個actor。一個actor是一個計算實體,就像上面第一個例子中的線程,它能夠從其餘actor那裏接收消息,由於一切都是actor。

在應答消息時,它能夠給其餘actor發送消息,或者建立新的actor並與之交互,或者只改變本身的內部狀態。

至關簡單,但這是一個很是強大的概念。生命週期和消息傳遞由你的框架來管理,你只須要指定計算單元是什麼就能夠了。另外,actor模型強調避免全局狀態,這會帶來不少便利。你能夠應用監督策略,例如免費重試,更簡單的分佈式系統設計,錯誤容忍度等等。

下面是一個使用Akka Actors的例子。Akka Actors有Java接口,是最流行的JVM Actor庫之一。實際上,它也有Scala接口,而且是Scala目前默認的actor庫。Scala曾經在內部實現了actor。很多JVM語言都實現了actor,好比Fantom。這些說明了Actor模型已經被普遍接受,並被看作是對語言很是有價值的補充。

static class Message {
 String url;
 Message(String url) {this.url = url;}
}
static class Result {
 String html;
 Result(String html) {this.html = html;}
}
 
static class UrlFetcher extends UntypedActor {
 
 @Override
 public void onReceive(Object message) throws Exception {
   if (message instanceof Message) {
     Message work = (Message) message;
     String result = WS.url(work.url).get();
     getSender().tell(new Result(result), getSelf());
   } else {
     unhandled(message);
   }
 }
}
 
static class Querier extends UntypedActor {
 private String question;
 private List<String> engines;
 private AtomicReference<String> result;
 
 public Querier(String question, List<String> engines, AtomicReference<String> result) {
 
   this.question = question;
   this.engines = engines;
   this.result = result;
 }
 
 @Override public void onReceive(Object message) throws Exception {
   if(message instanceof Result) {
     result.compareAndSet(null, ((Result) message).html);
     getContext().stop(self());
   }
   else {
     for(String base: engines) {
       String url = base + question;
       ActorRef fetcher = this.getContext().actorOf(Props.create(UrlFetcher.class), "fetcher-"+base.hashCode());
       Message m = new Message(url);
       fetcher.tell(m, self());
     }
   }
 }
}
 
private static String getFirstResultActors(String question, List<String> engines) {
 ActorSystem system = ActorSystem.create("Search");
 AtomicReference<String> result = new AtomicReference<>();
 
 final ActorRef q = system.actorOf(
   Props.create((UntypedActorFactory) () -> new Querier(question, engines, result)), "master");
 q.tell(new Object(), ActorRef.noSender());
 
 while(result.get() == null);
 return result.get();
}

Akka actor在內部使用ForkJoin框架來處理工做。這裏的代碼很冗長。不要擔憂。大部分代碼是消息類Message和Result的定義,而後是兩個不一樣的actor:Querier用來組織全部的搜索引擎,而URLFetcher用來從給定的URL獲取結果。這裏代碼行比較可能是由於我不肯意把不少東西寫在同一行上。Actor模型的強大之處來自於Props對象的接口,經過接口咱們能夠爲actor定義特定的選擇模式,定製的郵箱地址等。結果系統也是可配置的,只包含了不多的活動件。這是一個很好的跡象!

使用Actor模型的一個劣勢是,它要求你避免全局狀態,所以你必須當心的設計你的應用程序,而這可能會使項目遷移變得很複雜。同時,它也有很多優勢,所以學習一些新的範例和使用新的庫是徹底值得的。

反饋時間:你使用什麼?

你最經常使用的併發方式是什麼?你理解它背後的計算模式是什麼嗎?僅僅使用一個包含Job或者後臺任務對象的框架來自動地爲你的代碼添加異步計算能力?

爲了收集更多信息,以找出我是否應該繼續更深刻地講解一些不一樣的併發模式,例如,寫一篇關於Akka如何工做,以及它Java接口的優勢和缺點,我建立了一個簡單的調查。親愛的讀者,請填一下調查表。我很是感謝你的互動!

總結

這篇文章中咱們討論了在Java應用中添加並行的幾種不一樣方法。從咱們本身管理Java線程開始,咱們逐漸地發現更高級的解決方案,執行不一樣的executor服務、ForkJoin框架和actor計算模型。

不知道當你面臨真實問題時該如何選擇?它們都有各自的優缺點,你須要在直觀和易用性、配置和增長/減小機器性能等方面作出選擇。

原文連接: Oleg Shelajev 翻譯: ImportNew.com shenggordon
譯文連接: http://www.importnew.com/14506.html
轉載請保留原文出處、譯者和譯文連接。]

關於做者: shenggordon

相關文章
相關標籤/搜索