這東西其實有不少名詞,好比有的人喜歡稱爲纖程(Fiber),或者綠色線程(GreenThread)。其實最直觀的解釋能夠定義爲線程的線程。有點拗口,但本質上就是這樣。 前端
咱們先回憶一下線程的定義,操做系統產生一個進程,進程再產生若干個線程並行
的處理邏輯,線程的切換由操做系統負責調度。傳統語言C++ Java等線程其實與操做系統線程是1:1的關係,每一個線程都有本身的Stack, Java在64位系統默認Stack大小是1024KB,因此期望一個進程開啓上萬個線程是不現實的。可是實際上咱們也不會這麼幹,由於起這麼多線程並不能充分的利用CPU,大部分線程處於等待狀態,CPU也沒有這麼核讓線程使用。因此通常線程數目都是CPU的核數。java
傳統的J2EE系統都是基於每一個請求佔用一個線程去完成完整的業務邏輯,(包括事務)。因此係統的吞吐能力取決於每一個線程的操做耗時。若是遇到很耗時的I/O行爲,則整個系統的吞吐馬上降低,好比JDBC是同步阻塞的,這也是爲何不少人都說數據庫是瓶頸的緣由。這裏的耗時實際上是讓CPU一直在等待I/O返回,說白了線程根本沒有利用CPU去作運算,而是處於空轉狀態。暴殄天物啊。另外過多的線程,也會帶來更多的ContextSwitch開銷。node
Java的JDK裏有封裝很好的ThreadPool,能夠用來管理大量的線程生命週期,可是本質上仍是不能很好的解決線程數量的問題,以及線程空轉佔用CPU資源的問題。 git
先階段行業裏的比較流行的解決方案之一就是單線程加上異步回調。其表明派是node.js
以及Java裏的新秀Vert.x
。他們的核心思想是同樣的,遇到須要進行I/O操做的地方,就直接讓出CPU資源,而後註冊一個回調函數,其餘邏輯則繼續往下走,I/O結束後帶着結果向事件隊列裏插入執行結果,而後由事件調度器調度回調函數,傳入結果。這時候執行的地方可能就不是你原來的代碼區塊了,具體表如今代碼層面上,你會發現你的局部變量所有丟失,畢竟相關的棧已經被覆蓋了,因此爲了保存以前的棧上數據,你要麼選擇帶着一塊兒放入回調函數裏,要麼就不停的嵌套,從而引發反人類的Callback hell. 程序員
所以相關的Promise,CompletableFuture等技術都是爲解決相關的問題而產生的。可是本質上仍是不能解決業務邏輯的割裂。github
說了這麼多,終於能夠提一下協程了,協程的本質上其實仍是和上面的方法同樣,只不過他的核心點在於調度那塊由他來負責解決,遇到阻塞操做,馬上yield掉,而且記錄當前棧上的數據,阻塞完後馬上再找一個線程恢復棧並把阻塞的結果放到這個線程上去跑,這樣看上去好像跟寫同步代碼沒有任何差異,這整個流程能夠稱爲coroutine
,而跑在由coroutine負責調度的線程稱爲Fiber
。好比Golang裏的 go
關鍵字其實就是負責開啓一個Fiber
,讓func
邏輯跑在上面。而這一切都是發生的用戶態上,沒有發生在內核態上,也就是說沒有ContextSwitch上的開銷。golang
既然咱們的標題叫Java裏的協程,天然咱們會討論JVM上的實現,JVM上早期有kilim
以及如今比較成熟的Quasar
。而本文章會所有基於Quasar
,由於kilim
已經好久不更新了。數據庫
上面已經說明了什麼是Fiber
,什麼是coroutine
。這裏嘗試經過Quasar
來實現相似於golang的coroutine
以及channel
。這裏假設各位已經大體瞭解golang。編程
爲了對比,這裏先用golang實現一個對於10之內天然數分別求平方的例子,固然了能夠直接單線程for循環就完事了,可是爲了凸顯coroutine的高逼格,咱們仍是要稍微複雜化一點的。後端
func counter(out chan<- int) { for x := 0; x < 10; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { //定義兩個int類型的channel naturals := make(chan int) squares := make(chan int) //產生兩個Fiber,用go關鍵字 go counter(naturals) go squarer(squares, naturals) //獲取計算結果 printer(squares) }
上面的例子,有點相似生產消費者模式,經過channel兩解耦兩邊的數據共享。你們能夠將channel理解爲Java裏的SynchronousQueue
。那傳統的基於線程模型的Java實現方式,想必你們都知道怎麼作,這裏就不囉嗦了,我直接上Quasar
版的,幾乎能夠原封不動的copy golang的代碼。
public class Example { private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException { Integer v; while ((v = in.receive()) != null) { System.out.println(v); } } public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { //定義兩個Channel Channel<Integer> naturals = Channels.newChannel(-1); Channel<Integer> squares = Channels.newChannel(-1); //運行兩個Fiber實現. new Fiber(() -> { for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> { Integer v; while ((v = naturals.receive()) != null) squares.send(v * v); squares.close(); }).start(); printer(squares); } }
看起來Java彷佛要囉嗦一點,沒辦法這是Java的風格,並且畢竟不是語言上支持coroutine,是經過第三方的庫。到後面我會考慮用其餘JVM上的語言去實現,這樣會顯得更精簡一點。
說到這裏各位確定對Fiber很好奇了。也許你會表示懷疑Fiber是否是如上面所描述的那樣,下面咱們嘗試用Quasar創建一百萬個Fiber,看看內存佔用多少,我先嚐試了建立百萬個Thread
。
for (int i = 0; i < 1_000_000; i++) { new Thread(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
很不幸,直接報Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
,這是情理之中的。下面是經過Quasar
創建百萬個Fiber
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { int FiberNumber = 1_000_000; CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(0); for (int i = 0; i < FiberNumber; i++) { new Fiber(() -> { counter.incrementAndGet(); if (counter.get() == FiberNumber) { System.out.println("done"); } Strand.sleep(1000000); }).start(); } latch.await(); }
我這裏加了latch,阻止程序跑完就關閉,Strand.sleep
其實跟Thread.sleep
同樣,只是這裏針對的是Fiber
最終控制檯是能夠輸出done
的,說明程序已經建立了百萬個Fiber,設置Sleep是爲了讓Fiber
一直運行,從而方便計算內存佔用。官方宣稱一個空閒的Fiber
大約佔用400Byte
,那這裏應該是佔用400MB
堆內存,可是這裏經過jmap -heap pid
顯示大約佔用了1000MB
,也就是說一個Fiber
佔用1KB。
其實Quasar實現的coroutine的方式與Golang很像,只不過一個是框架級別實現,一個是語言內置機制而已。
若是你熟悉了Golang的調度機制,那理解Quasar的調度機制就會簡單不少,由於二者是差很少的。
Quasar裏的Fiber實際上是一個continuation,他能夠被Quasar定義的scheduler調度,一個continuation記錄着運行實例的狀態,並且會被隨時中斷,而且也會隨後在他被中斷的地方恢復。Quasar實際上是經過修改bytecode來達到這個目的,因此運行Quasar程序的時候,你須要先經過java-agent在運行時修改你的代碼,固然也能夠在編譯期間這麼幹。golang的內置了本身的調度器,Quasar則默認使用ForkJoinPool
這個JDK7之後纔有的,具備work-stealing
功能的線程池來當調度器。work-stealing很是重要,由於你不清楚哪一個Fiber會先執行完,而work-stealing能夠動態的從其餘的等等隊列偷一個context過來,這樣能夠最大化使用CPU資源。
那這裏你會問了,Quasar怎麼知道修改哪些字節碼呢,其實也很簡單,Quasar會經過java-agent在運行時掃描哪些方法是能夠中斷的,同時會在方法被調用前和調度後的方法內插入一些continuation
邏輯,若是你在方法上定義了@Suspendable
註解,那Quasar會對調用該註解的方法作相似下面的事情。
這裏假設你在方法f
上定義了@Suspendable
,同時去調用了有一樣註解的方法g
,那麼全部調用f
的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在將來能夠動態的恢復。(Fiber相似線程也有本身的棧)。在suspendable方法鏈內
Fiber的父類會調用Fiber.park
,這樣會拋出SuspendExecution
異常,從而來中止線程
的運行,好讓Quasar的調度器執行調度。這裏的SuspendExecution
會被Fiber本身捕獲,業務層面上不該該捕獲到。若是Fiber被喚醒了(調度器層面會去調用Fiber.unpark
),那麼f
會在被中斷的地方從新被調用(這裏Fiber會知道本身在哪裏被中斷),同時會把g
的調用結果(g
會return結果)插入到f
的恢復點,這樣看上去就好像g
的return是f
的local variables
了,從而避免了callback嵌套。
上面囉嗦了一大堆,其實簡單點講就是,想辦法讓運行中的線程棧停下來,好讓Quasar的調度器介入。JVM線程中斷的條件只有兩個,一個是拋異常,另一個就是return。這裏Quasar就是經過拋異常的方式來達到的,因此你會看到我上面的代碼會拋出SuspendExecution
。可是若是你真捕獲到這個異常,那就說明有問題了,因此通常會這麼寫。
@Suspendable public int f() { try { // do some stuff return g() * 2; } catch(SuspendExecution s) { //這裏不該該捕獲到異常. throw new AssertionError(s); } }
在github上無心中發現一個有趣的benchmark,大體是測試各類語言在生成百萬actor/Fiber的開銷skynet。
大體的邏輯是先生成10個Fiber,每一個Fiber再生成10個Fiber,直到生成1百萬個Fiber,而後每一個Fiber作加法累積計算,並把結果發到channel裏,這樣一直遞歸到根Fiber。後將最終結果發到channel。若是邏輯沒有錯的話結果應該是499999500000。咱們搞個Quasar版的,來測試一下性能。
全部的測試都是基於個人Macbook Pro Retina 2013later。Quasar-0.7.5:JDK8,JDK 1.8.0_91,Golang 1.6
public class Skynet { private static final int RUNS = 4; private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited static void skynet(Channel<Long> c, long num, int size, int div) throws SuspendExecution, InterruptedException { if (size == 1) { c.send(num); return; } Channel<Long> rc = newChannel(BUFFER); long sum = 0L; for (int i = 0; i < div; i++) { long subNum = num + i * (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start(); } for (int i = 0; i < div; i++) sum += rc.receive(); c.send(sum); } public static void main(String[] args) throws Exception { //這裏跑4次,是爲了讓JVM預熱好作優化,因此咱們以最後一個結果爲準。 for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel<Long> c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start(); long result = c.receive(); long elapsed = (System.nanoTime() - start) / 1_000_000; System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)"); } } }
golang的代碼我就不貼了,你們能夠從github上拿到,我這裏直接貼出結果。
platform | time |
---|---|
Golang | 261ms |
Quasar | 612ms |
從Skynet測試中能夠看出,Quasar的性能對比Golang仍是有差距的,可是不該該達到兩倍多吧,通過向Quasar做者求證才得知這個測試並無測試出實際性能,只是測試調度開銷而已。
由於skynet方法內部幾乎沒有作任何事情,只是簡單的作了一個加法而後進一步的遞歸生成新的Fiber而已,至關於只是測試了Quasar生成並調度百萬Fiber所須要的時間而已。而Java裏的加法操做開銷遠比生成Fiber的開銷要低,所以感受總體性能不如golang(golang的coroutine是語言級別的)。
實際上咱們在實際項目中生成的Fiber中不可能只作一下簡單的加法就退出,至少要花費1ms作一些簡單的事情吧,(Quasar裏Fiber的調度差很少在us級別),因此咱們考慮在skynet里加一些比較耗時的操做,好比隨機生成1000個整數並對其進行排序,這樣Fiber裏算是有了相應的性能開銷,與調度的開銷相比,調度的開銷就能夠忽略不計了。(你們能夠把調度開銷想象成不定積分的常數)。
下面我分別爲兩種語言了加了數組排序邏輯,並插在響應的Fiber裏。
public class Skynet { private static Random random = new Random(); private static final int NUMBER_COUNT = 1000; private static final int RUNS = 4; private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited private static void numberSort() { int[] nums = new int[NUMBER_COUNT]; for (int i = 0; i < NUMBER_COUNT; i++) nums[i] = random.nextInt(NUMBER_COUNT); Arrays.sort(nums); } static void skynet(Channel<Long> c, long num, int size, int div) throws SuspendExecution, InterruptedException { if (size == 1) { c.send(num); return; } //加入排序邏輯 numberSort(); Channel<Long> rc = newChannel(BUFFER); long sum = 0L; for (int i = 0; i < div; i++) { long subNum = num + i * (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start(); } for (int i = 0; i < div; i++) sum += rc.receive(); c.send(sum); } public static void main(String[] args) throws Exception { for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel<Long> c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start(); long result = c.receive(); long elapsed = (System.nanoTime() - start) / 1_000_000; System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)"); } } }
const ( numberCount = 1000 loopCount = 1000000 ) //排序函數 func numberSort() { nums := make([]int, numberCount) for i := 0; i < numberCount; i++ { nums[i] = rand.Intn(numberCount) } sort.Ints(nums) } func skynet(c chan int, num int, size int, div int) { if size == 1 { c <- num return } //加了排序邏輯 numberSort() rc := make(chan int) var sum int for i := 0; i < div; i++ { subNum := num + i*(size/div) go skynet(rc, subNum, size/div, div) } for i := 0; i < div; i++ { sum += <-rc } c <- sum } func main() { c := make(chan int) start := time.Now() go skynet(c, 0, loopCount, 10) result := <-c took := time.Since(start) fmt.Printf("Result: %d in %d ms.\n", result, took.Nanoseconds()/1e6) }
platform | time |
---|---|
Golang | 23615ms |
Quasar | 15448ms |
最後再進行一次測試,發現Java的性能優點體現出來了。幾乎是golang的1.5倍,這也許是JVM/JDK通過多年優化的優點。由於加了業務邏輯後,對比的就是各類庫以及編譯器對語言的優化了,協程調度開銷幾乎能夠忽略不計。
其實早在JDK1的時代,Java的線程被稱爲GreenThread
,那個時候就已經有了Fiber,可是當時不能與操做系統實現N:M綁定,因此放棄了。如今Quasar憑藉ForkJoinPool
這個成熟的線程調度庫。
另外,若是你但願你的代碼可以跑在Fiber裏面,須要一個很大的前提條件,那就是你全部的庫,必須是異步無阻塞的,也就說必須相似於node.js上的庫,全部的邏輯都是異步回調,而自Java裏基本上全部的庫都是同步阻塞的,不多見到異步無阻塞的。並且得益於J2EE,以及Java上的三大框架(SSH)洗腦,大部分Java程序員都已經習慣了基於線程,線性的完成一個業務邏輯,很難讓他們接受一種將邏輯割裂的異步編程模型。
可是隨着異步無阻塞這股風氣起來,以及相關的coroutine
語言Golang大力推廣,人們愈來愈知道如何更好的榨乾CPU性能(讓CPU避免沒必要要的等待,減小上下文切換),阻塞的行爲基本發生在I/O上,若是能有一個庫能把全部的I/O行爲都包裝成異步阻塞的話,那麼Quasar就會有用武之地,JVM上公認的是異步網絡通訊庫是Netty,經過Netty基本解決了網絡I/O問題,另外還有一個是文件I/O,而這個JDK7提供的NIO2就能夠知足,經過AsynchronousFileChannel
便可。
剩下的就是如何將他們封裝成更友好的API了。目前能達到生產級別的這種異步工具庫,JVM上只有Vert.x3
,封裝了Netty4,封裝了AsynchronousFileChannel
,並且Vert.x官方也出了一個相對應的封裝了Quasar
的庫vertx-sync
。
Quasar目前是由一家商業公司Parallel Universe控制着,且有本身的一套體系,包括Quasar-actor,Quasar-galaxy等各個模塊,可是Quasar-core是開源的,此外Quasar本身也經過Fiber封裝了不少的第三方庫,目前全都在comsat這個項目裏。隨便找一個項目看看,你會發現其實經過Quasar的Fiber去封裝第三方的同步庫仍是很簡單的。
異步無阻塞的編碼方式其實有不少種實現,好比node.js的提倡的Promise,對應到Java8的就是CompletableFuture。
另外事件響應式也算是一個比較流行的作法,好比ReactiveX系列,RxJava,Rxjs,RxSwift,等。我我的以爲RxJava是一個很是好的函數式響應實現(JDK9會有對應的JDK實現),可是咱們不能要求全部的程序員一眼就提煉出業務裏的functor,monad(這些能力須要長期浸淫在函數式編程思想裏),反而RxJava特別適合用在前端與用戶交互的部分,由於用戶的點擊滑動行爲是一個個真實的事件流,這也是爲何RxJava在Android端很是火的緣由,然後端基本上都是經過Rest請求過來,每個請求其實已經限定了業務範圍,不會再有複雜的事件邏輯,因此基本上RxJava在Vert.x這端只是作了一堆的flatmap,再加上微服務化,全部的業務邏輯都已經作了最小的邊界,因此順序的同步的編碼方式更適合寫業務邏輯的後端程序員。
因此這裏Golang開了個好頭,可是Golang也有其自身的限制,好比不支持泛型,固然這個仁者見仁智者見智了,包的依賴管理比較弱,此外Golang沒有線程池的概念,若是coroutine裏的邏輯發生了阻塞,那麼整個程序會hang死。而這點Vert.x提供了一個Worker Pool的概念,能夠將須要耗時執行的邏輯包到線程池裏面,執行完後異步返回給EventLoop線程。
下一篇咱們來研究一下vertx-sync
,讓vert.x裏全部的異步編碼方式同步化,完全解決Vert.x
裏的Callback Hell。
本文系力譜宿雲LeapCloud旗下 MaxLeap 團隊成員:劉小溪【原創】,轉載請務必註明做者及原創地址
原創首發地址:https://blog.maxleap.cn/archi...