本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等併發編程模型。本人經驗有限,不免粗陋,還請高手多多指教。
咱們知道程序分爲同步風格和異步風格。java
異步的目的:充分利用計算資源。數據庫
同步使線程阻塞,致使等待。編程
異步是非阻塞的,無需等待。設計模式
若是發生了沒必要要的等待,就會浪費資源,使程序變慢。服務器
好比這樣的程序:網絡
val res1 = get("http://server1") val res2 = get("http://server2") compute(res1, res2)
按照同步編程風格,必定要先拿到res1,才能開始拿res2。session
按照異步編程風格,res1和res2互不依賴,發起對res1的獲取後,沒必要等待結果,而是立刻發起對res2的獲取,到了compute的時候,才須要阻塞等待兩個數據。數據結構
這是一種「順序解耦」。有時候咱們並不要求某些操做按順序執行!那麼爲何要強制其順序呢?異步風格讓咱們能放棄強制,解放資源,減小沒必要要的等待。多線程
若是異步操做能並行,程序性能就提高了,若是不能並行,程序性能就沒有提高。在當今的硬件條件下,通常都能並行,因此異步成爲了趨勢。閉包
怎麼個並行法?這要從計算機架構提及了。讓咱們把任何有處理能力的硬件看作一個處理單元——CPU顯然是主要的處理單元,I/O設備也是處理單元,好比說網卡、內存控制器、硬盤控制器。CPU能夠向一或多個I/O設備發出請求,當設備在準備數據時,CPU能夠作其餘事情(設備就緒後會用中斷通知CPU),這時就有n個硬件在並行了!何況CPU本就是多核的,能作並行計算。除此以外,在分佈式系統中,能同時調動多臺計算機配合完成任務,也是並行。
所以,讓CPU等待、每次只請求一個I/O設備、不利用多核、不利用其餘空閒的計算機,都是比較浪費的。
下面咱們來分析常見的併發編程模型。
這是最簡單的模型,建立線程來執行一個任務,完畢後銷燬線程。當任務數量大時,會建立大量的線程。
你們都知道大量的線程會下降性能,可是你真的清楚性能開銷在哪裏嗎?我試列舉一下:
建立線程
建立一個線程是比較耗時間的。須要請求操做系統、分配棧空間、初始化等工做。
上下文切換
你們都知道的,操做系統基本概念,再也不贅述。值得注意的是,WAITING狀態的線程(多見於I/O等待)幾乎不會被調度,所以並不致使過多的上下文切換。
CPU cache miss
大量線程頻繁切換,勢必要訪問不一樣的數據,打亂了空間局部性,致使CPU cache miss增長,須要常常訪問更慢的內存,會明顯影響CPU密集型程序的性能,這點你們恐怕沒想到吧。
內存佔用
線程會增長內存佔用,線程的棧空間一般佔1MB,1000個就是1GB。並且在棧上引用了不少對象,暫時不能回收,你說有多少個GB?
資源佔用
一些有限的資源,如鎖、數據庫鏈接、文件句柄等,當線程被掛起或阻塞,就暫時無人可用了,浪費!還有死鎖風險!
那麼分配多少線程好呢?
傳統的網絡程序是每一個會話佔用一個鏈接、一個線程。I/O多路複用(I/O multiplexing:多個會話共用一個鏈接)是應C10K問題而生的,C10K就是1萬個鏈接。1萬個鏈接是很耗系統資源的,況且還有1萬個線程。從上文的分析可知,C1K的時候就能夠開始運用I/O多路複用了。
預留一些可反覆使用的線程在一個池裏,反覆地接受任務。線程數量多是固定的,也多是必定範圍內變更的,依所選擇的線程池的實現而定。
這個模型是極其經常使用的,例如Tomcat就是用線程池來處理請求的。
注意——儘可能不要阻塞任務線程;若實在沒法避免,多開一些線程——每阻塞一個線程,線程池就少一個可用的線程。
Java典型的線程池有Executors.newFixedThreadPool
Executors.newFixedThreadPool
Executors.newFixedThreadPool
Executors.newScheduledThreadPool
等等,也能夠直接new ThreadPoolExecutor
(可指定線程數的上限和下限)。
Scala沒有增長新的線程池種類,但有個blocking方法能告訴線程池某個調用會阻塞,須要臨時增長1個線程。
Future是一個將來將會有值的對象,至關於一個佔位符(提貨憑證!)。
將任務投入線程池執行時,可爲任務綁定一個Future,憑此Future便可在將來取得任務執行結果。將來是何時呢?要經過檢查Future內部的狀態來獲知——任務完成時會修改這個狀態,將執行結果存進去。
最初的代碼示例可改寫爲:
// 兩個future是並行的 val f1 = Future { get("http://server1") } val f2 = Future { get("http://server2") } compute(f1.get(), f2.get())
Rx (Reactive Extensions)是響應式編程的一種具體形式。響應式編程是一種面向數據流和變化傳播的編程模式。
咱們知道Java 8提供了Stream類型,表明一個有限或無限的數據流,可應用map, filter, collect等操做。Rx相似於Stream,也是有限或無限的數據流,只不過數據操做能夠委託給線程池異步執行。(Rx也像是生產者/消費者模型的延伸,增長了分發和轉換的能力。對數據流進行鏈接組合,這邊生產,那邊分發和轉換,源源不斷交給消費者。)
以RxJava爲例:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
以Reactor爲例:
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println);
由代碼可見,對數據流的操做很像是對集合的函數式操做,subscribe就是異步的forEach,doOnNext就是有返回值的異步的forEach。
主流實現有RxJava、Reactor、Akka Streams,API各有不一樣。可是它們都在靠攏Reactive Streams規範,想必會變得愈來愈類似。
async-await是一種特殊語法,能自動把同步風格代碼轉換成異步風格代碼。正確運用,就能使代碼在阻塞時自動讓出控制權。
C#內置的async-await是最完整的實現。Scala經過Async庫提供這個語法,代碼大概是這樣:
val future = async { println("Begin blocking") await { async {Thread.sleep(1000)} } println("End blocking") }
代碼會被自動轉換成多種future的組合形式。無需特地處理,能並行的部分都會自動並行。
Fiber是協程的仿製品。通常多線程是搶佔式調度,你一個任務跑得好好的忽然把你暫停;協程是協做式的,你一個任務阻塞或完成時要主動讓出控制權,讓調度器換入另外一個任務。
async-await自動把代碼轉換成可自動讓出控制權的形式,已經有協程的雛形了。Fiber更加智能,連async-await語法都不用了,只要把代碼寫在Fiber裏面,就像寫在Thread裏面同樣,自動異步化了。
async-await只能暫存當前做用域(轉換成閉包),Fiber則能暫存整個執行棧(每一個做用域只是一個棧幀)。固然了,運用嵌套的async-await也能暫存整個執行棧,我更贊同如此,由於能更好地控制內存佔用。
JVM上主流的實現是Quasar,經過java-agent改寫字節碼來實現,在須要讓出控制權時拋出異常打斷控制流(沒必要擔憂異常方面的性能開銷),保存執行棧,而後換入另外一個任務。
Java示例:
new Fiber<V>() { @Override protected V run() throws SuspendExecution, InterruptedException { // your code } }.start();
Kotlin示例:
fiber @Suspendable { // your code }
代碼中調用的任何會阻塞的方法都要標記@Suspendable,讓Quasar知道調這個方法時要暫停當前Fiber並執行另外一個Fiber,同時用另外的線程池執行會阻塞的方法。
起源於電信領域的Erlang的編程模型。actor是任務處理單元:每一個actor只處理一個任務,每一個任務同時只有一個actor處理(若是有大任務,就要分解成小任務),actor之間用消息來通訊。
在Erlang中,每一個actor是一個輕量級進程,有獨立的內存空間(因此通訊只能靠消息),所以有獨立的垃圾回收,不會stop the world。
actor能夠發了消息就無論了(tell),這是典型的異步;也能夠發了消息等迴應(ask),返回值是一個Future,其實是建立了一個新的actor在悄悄等待迴應,仍然是異步。
actor能夠透明地分佈在不一樣機器上,消息能夠發給本機的actor,也能夠發給遠程的actor。
JVM上惟一成熟的實現是Akka,JVM不能給每一個actor獨立的內存,垃圾回收仍可能stop the world。
actor顯然是一個對象,擁有狀態和行爲。
actor也可被視爲一個閉包,擁有函數和上下文(整個對象的狀態都是上下文)。
actor每次能接收並處理一個消息,處理中能夠發送消息給本身或另外一個actor,而後掛起或結束。
爲何要發送消息給本身呢?由於正在處理消息時是不能掛起的,只能在「一個消息以後,下一個消息以前」的間隙中掛起。
假設你收到一個A消息,執行前半段業務邏輯,要作一次I/O再執行後半段業務邏輯。作I/O時應當結束當前處理,當IO完成時給本身發一個B消息,下次再讓你在處理B消息時完成剩餘業務邏輯。先後邏輯要分開寫,共享變量要聲明爲actor的對象字段。
僞代碼以下:
class MyActor extends BasicActor { var halfDoneResult: XXX = None def receive(): Receive = { case A => { halfDoneResult = 前半段邏輯() doIO(halfDoneResult).onComplete { self ! B() } } case B => 後半段邏輯(halfDoneResult) } }
當actor的狀態要完全改變時,能夠用become操做完全改變actor的行爲。從面向對象編程的設計模式來看,這是state pattern,從函數式編程來看,這是把一個函數變換成另外一個函數。
因而可知,actor模型就是把函數表示成了更容易控制的對象,以便於知足一些併發或分佈式方面的架構約束。
這段邏輯假如改寫成async-await或fiber,僞代碼以下所示,簡單多了:
def logicInAsync() = async { val halfDoneResult = 前半段邏輯() await { doIO(halfDoneResult) } 後半段邏輯(halfDoneResult) } def logicInFiber() = fiber { val halfDoneResult = 前半段邏輯() doIO(halfDoneResult) 後半段邏輯(halfDoneResult) }
能夠看出,相比於async-await或Fiber,actor就是一種狀態機,是較爲底層、不易用的編程模型。可是actor附帶了成熟的分佈式能力。
我感受actor很像異步版的EJB。EJB中有stateless session bean和stateful session bean,actor也可按stateless和stateful來分類。
PayPal的支付系統就是基於Akka的,還爲此編寫並開源了一個Squbs框架。業務邏輯還是用actor實現,Squbs只增長了集成和運維方面的支持(這個也重要)。然而我對此技術路線(業務邏輯基於actor)持審慎態度,接下來就分類說明個人意見:
我認爲,此架構只須要三種通訊模型:消息隊列、同步RPC、異步RPC。
消息隊列、同步RPC都不須要Akka出場,自有各類MQ、RPC框架來解決。至於異步RPC,GRPC是一個跨語言的RPC框架,也可建造一個基於WebSocket協議的RPC框架。若是無需跨語言,也可以讓Akka出場,但不是直接基於Akka編程——而是在Akka之上構建一個RPC層。若是功力較高,可直接基於Netty構建RPC層。
actor進行「請求-響應」往返通訊時,在收到響應以前,請求端的actor要掛起、暫存在內存中。協程進行這種通訊時,則是請求端的執行棧要掛起、暫存在內存中。
這是actor的龍興之地, 也是最合適的用武之地。
以即時聊天(IM)爲例,用actor怎麼實現呢?
所以選擇第一種實現:每一個actor對應一我的,actor要記得它對應哪一個人、消息往來狀況如何,這就是「狀態」!若是10萬用戶在線,就要10萬鏈接(這與IO多路複用無關,對吧?),單機顯然hold不住,須要多機。若是用actor A和actor B不在同一臺機器,就要遠程通訊了。對基於Akka的程序來講,本地通訊或遠程通訊是透明的,贊!
其實不用actor也能實現,一切狀態和關係都能用數據結構來表達,只不過actor可能更方便一些。
總而言之,Akka模仿Erlang,精心設計了業務無關的actor的概念,然而越是精心設計的業務無關的概念越有可能不符合多變的業務需求:)。若是問我用不用actor,我只能說,看狀況吧。也但願有哪位英雄能介紹一兩個非actor不可的場景。
如今,假設有一個微服務架構,在衆多服務中有A、B、C三個服務,調用順序是A->B->C。RPC只能以A->B->C的方向請求,再以C->B->A的方向響應;actor則能讓C直接發送響應給A。但若是C要直接回復A,就要與A創建鏈接,使網絡拓撲和依賴管理都變複雜了——如非必要,勿增複雜。
爲了不,利用MQ來發送響應?MQ就像一個聊天服務,讓分佈各處的服務能彼此聊天。IM、actor、MQ,一切都聯繫起來了,有沒有感覺到妙趣橫生的意境?
可是壓力集中到了MQ的broker,網絡也多了一跳(publisher->broker->consumer),對性能有所影響。
本文介紹、點評了JVM上多種常見的併發模型,並試圖創建模型之間的聯繫,最後以分佈式架構爲例加以分析。
那麼應用程序要怎麼寫呢?看文檔吧,各類庫或框架都但願有人來用,知足它們吧!