http://www.open-open.com/lib/view/open1468892776625.htmlhtml
最近遇到的一個問題大概是微服務架構中常常會遇到的一個問題:java
服務 A 是咱們開發的系統,它的業務須要調用 B 、 C 、 D 等多個服務,這些服務是經過http的訪問提供的。 問題是 B 、 C 、 D 這些服務都是第三方提供的,不能保證它們的響應時間,快的話十幾毫秒,慢的話甚至1秒多,因此這些服務的Latency比較長。幸運地是這些服務都是集羣部署的,容錯率和併發支持都比較高,因此不擔憂它們的併發性能,惟一不爽的就是就是它們的Latency過高了。react
系統A會從Client接收Request, 每一個Request的處理都須要屢次調用B、C、D的服務,因此完成一個Request可能須要1到2秒的時間。爲了讓A能更好地支持併發數,系統中使用線程池處理這些Request。固然這是一個很是簡化的模型,實際的業務處理比較複雜。git
能夠預見,由於系統B、C、D的延遲,致使整個業務處理都很慢,即便使用線程池,可是每一個線程仍是會阻塞在B、C、D的調用上,致使I/O阻塞了這些線程, CPU利用率相對來講不是那麼高。github
固然在測試的時候使用的是B、C、D的模擬器,沒有預想到它們的響應是那麼慢,所以測試數據的結果還不錯,吞吐率還能夠,可是在實際環境中問題就暴露出來了。web
最開始線程池設置的是200,而後用HttpUrlConnection做爲http client發送請求到B、C、D。固然HttpUrlConnection也有一些坑,好比 Persistent Connections 、 Caveats of HttpURLConnection ,跳出坑後性能依然不行。算法
經過測試,若是B、C、D等服務延遲接近0毫秒,則HttpUrlConnection的吞吐率(線程池的大小爲200)能到40000 requests/秒,可是隨着第三方服務的響應時間變慢,它的吞吐率急劇降低,B、C、D的服務的延遲爲100毫秒的時候,則HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服務的延遲爲100毫秒的時候HttpUrlConnection的吞吐率降到550 requests/秒。數據庫
增長 http.maxConnections 系統屬性並不能顯著增長吞吐率。apache
若是增長調用HttpUrlConnection的線程池的大小,好比增長到2000,性能會好一些,可是B、C、D的服務的延遲爲500毫秒的時候,吞吐率爲3800 requests/秒,延遲爲1秒的時候,吞吐率爲1900 requests/秒。編程
雖然線程池的增大能帶來性能的提高,可是線程池也不能無限制的增大,由於每一個線程都會佔用必定的資源,並且隨着線程的增多,線程之間的切換也更加的頻繁,對CPU等資源也是一種浪費。
切換成其它http的實現,好比netty,也沒法帶來更好的性能。系統A 嚴重依賴Http協議,而Http協議又是一個 blocking 協議,只有等到Response返回才能夠發送下一個請求(雖然有些http server支持http pipelining,可是咱們沒法保證/控制第三方的B、C、D支持這個特性)。
下面列出了一些經常使用的http client:
這個列表摘自 High-Concurrency HTTP Clients on the JVM ,不止於此,這篇文章重點介紹基於java纖程庫quasar的實現的http client庫,並比較了性能。咱們待會再說。
回到我前面所說的系統,如何能更好的提供性能?有一種方案是藉助其它語言的優點,好比Go,讓Go來代理完成和B、C、D的請求,系統A經過一個TCP鏈接與Go程序交流。第三方服務B、C、D的Response結果能夠異步地返回給系統A。
Go的優點在於能夠實現request-per-goroutine,整個系統中能夠有成千上萬個goroutine。 goroutine是輕量級的,並且在I/O阻塞的時候能夠不佔用線程,這讓Go能夠輕鬆地處理上萬個連接,即便I/O阻塞也沒問題。Go和Java之間的通信協議能夠經過Protobuffer來實現,並且它們之間只保留一個TCP鏈接便可。
固然這種架構的修改帶來系統穩定性的下降,服務A和服務B、C、D之間的通信增長了複雜性。同時,由於是異步方式,服務A的業務也要實現異步方式,不然200個線程依然等待Response的話,仍是一個阻塞的架構。
經過測試,這種架構能夠帶來穩定的吞吐率。 無論服務B、C、D的延遲有多久,A的吞吐率能維持15000 requests/秒。固然Go到B、C、D的併發鏈接數也有限制,我把最大值調高到20000。
這種曲折的方案的最大的兩個弊病就是架構的複雜性以及對原有系統須要進行大的重構。 高複雜性帶來的是系統的穩定性的下降,包括部署、維護、網絡情況、系統資源等。同時系統要改爲異步模型,由於系統業務線程發送Request後不能等待Go返回Response,它須要從Client接收更多的Request,而收到Response以後它才繼續執行剩下的業務,只有這樣纔不會阻塞,進而提到系統的吞吐率。
將系統A改爲異步,而後使用HttpUrlConnection線程池行不行?HttpUrlConnection線程池仍是致使和B、C、D通信的吞吐率降低,可是Go這種方案和B、C、D通信的吞吐率能夠維持一個較高的水平。
考慮到Go的優點,那麼能不能在Java中使用相似Go的這種goroutine模型呢?那就是本文要介紹的Java纖程庫: [Quasar]( http://docs.paralleluniverse.co/quasar/)。
Java官方並無纖程庫。可是偉大的社區提供了一個優秀的庫,它就是Quasar。
創始人是 Ron Pressler和Dafna Pressler ,由Y Combinator孵化。
Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.
Quasar提供了高性能輕量級的線程,提供了相似Go的channel,Erlang風格的actor,以及其它的異步編程的工具,能夠用在Java和Kotlin編程語言中。Scala目前的支持還不完善,我想若是這個公司能快速的發展壯大,或者被一些大公司收購的話,對Scala的支持才能提上日程。
你須要把下面的包加入到你的依賴中:
Quasar fiber依賴java instrumentation修改你的代碼,能夠在運行時經過java Agent實現,也能夠在編譯時使用ant task實現。
經過java agent很簡單,在程序啓動的時候將下面的指令加入到命令行:
-javaagent:path-to-quasar-jar.jar
對於maven來講,你可使用插件 maven-dependency-plugin ,它會爲你的每一個依賴設置一個屬性,以便在其它地方引用,咱們主要想使用 ${co.paralleluniverse:quasar-core:jar} :
<plugin> <artifactId>maven-dependency-plugin</artifactId> <version>2.5.1</version> <executions> <execution> <id>getClasspathFilenames</id> <goals> <goal>properties</goal> </goals> </execution> </executions> </plugin>
而後你能夠配置 exec-maven-plugin 或者 maven-surefire-plugin 加上agent參數,在執行maven任務的時候久可使用Quasar了。
官方提供了一個 Quasar Maven archetype ,你能夠經過下面的命令生成一個quasar應用原型:
git clone https://github.com/puniverse/quasar-mvn-archetype cdquasar-mvn-archetype mvn install cd.. mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=0.7.4-DgroupId=testgrp -DartifactId=testprj cdtestprj mvn test mvn clean compile dependency:properties exec:exec
若是你使用gradle,能夠看一下gradle項目模版: Quasar Gradle template project 。
最容易使用Quasar的方案就是使用Java Agent,它能夠在運行時instrument程序。若是你想編譯的時候就使用AOT instrumentation(Ahead-of-Time),可使用Ant任務 co.paralleluniverse.fibers.instrument.InstrumentationTask ,它包含在quasar-core.jar中。
Quasar最主要的貢獻就是提供了輕量級線程的實現,叫作fiber(纖程)。Fiber的功能和使用相似Thread, API接口也相似,因此使用起來沒有違和感,可是它們不是被操做系統管理的,它們是由一個或者多個ForkJoinPool調度。一個idle fiber只佔用400K內存,切換的時候佔用更少的CPU,你的應用中能夠有上百萬的fiber,顯然Thread作不到這一點。這一點和Go的goroutine相似。
Fiber並不意味着它能夠在全部的場景中均可以替換Thread。當fiber的代碼常常會被等待其它fiber阻塞的時候,就應該使用fiber。
對於那些須要CPU長時間計算的代碼,不多遇到阻塞的時候,就應該首選thread
以上兩條是選擇fiber仍是thread的判斷條件,主要仍是看任務是I/O blocking相關仍是CPU相關。幸運地是,fiber API使用和thread使用相似,因此代碼略微修改久就能夠兼容。
Fiber特別適合替換哪些異步回調的代碼。使用 FiberAsync 異步回調很簡單,並且性能很好,擴展性也更高。
相似Thread, fiber也是用Fiber類表示:
newFiber<V>() { @Override protectedVrun()throwsSuspendExecution, InterruptedException { // your code } }.start();
與Thread相似,但也有些不一樣。Fiber能夠有一個返回值,類型爲泛型V,也能夠爲空Void。 run 也能夠拋出異常 InterruptedException 。
你能夠傳遞 SuspendableRunnable 或 SuspendableCallable 給Fiber的構造函數:
newFiber<Void>(newSuspendableRunnable() { publicvoidrun()throwsSuspendExecution, InterruptedException { // your code } }).start();
甚至你能夠調用Fiber的join方法等待它完成,調用 get 方法獲得它的結果。
Fiber繼承Strand類。Strand類表明一個Fiber或者Thread,提供了一些底層的方法。
逃逸的Fiber(Runaway Fiber)是指那些陷入循環而沒有block、或者block fiber自己運行的線程的Fiber。偶爾有逃逸的fiber沒有問題,可是太頻繁會致使性能的降低,由於須要調度器的線程可能都忙於逃逸fiber了。Quasar會監控這些逃逸fiber,你能夠經過JMX監控。若是你不想監控,能夠設置系統屬性 co.paralleluniverse.fibers.detectRunawayFibers 爲 false 。
fiber中的 ThreadLocal 是fiber local的。 InheritableThreadLocal 繼承父fiber的值。
Fiber、SuspendableRunnable 、SuspendableCallable 的 run 方法會拋出SuspendExecution異常。但這並非真正意義的異常,而是fiber內部工做的機制,經過這個異常暫停因block而須要暫停的fiber。
任何在Fiber中運行的方法,須要聲明這個異常(或者標記@Suspendable),都被稱爲suspendable method。
反射調用一般都被認爲是suspendable, Java8 lambda 也被認爲是suspendable。不該該將類構造函數或類初始化器標記爲suspendable。
synchronized 語句塊或者方法會阻塞操做系統線程,因此它們不該該標記爲suspendable。Blocking線程調用默認也不被quasar容許。可是這兩種狀況均可以被quasar處理,你須要在Quasar javaagent中分別加上 m 和 b 參數,或者ant任務中加上 allowMonitors 和 allowBlocking 屬性。
Quasar最初fork自 Continuations Library 。
若是你瞭解其它語言的coroutine, 好比Lua,你久比較容易理解quasar的fiber了。 Fiber實質上是continuation , continuation能夠捕獲一個計算的狀態,能夠暫停當前的計算,等隔一段時間能夠繼續執行。Quasar經過instrument修改suspendable方法。Quasar的調度器使用 ForkJoinPool 調度這些fiber。
Fiber調度器FiberScheduler是一個高效的、work-stealing、多線程的調度器。
默認的調度器是 FiberForkJoinScheduler ,可是你可使用本身的線程池去調度,請參考 FiberExecutorScheduler 。
當一個類被加載時,Quasar的instrumentation模塊 (使用 Java agent時) 搜索suspendable 方法。每個suspendable 方法 f 經過下面的方式 instrument:
它搜索對其它suspendable方法的調用。對suspendable方法 g 的調用,一些代碼會在這個調用 g 的先後被插入,它們會保存和恢復fiber棧本地變量的狀態,記錄這個暫停點。在這個「suspendable function chain」的最後,咱們會發現對 Fiber.park 的調用。 park 暫停這個fiber,扔出 SuspendExecution異常。
當 g block的時候,SuspendExecution異常會被Fiber捕獲。 當Fiber被喚醒(使用unpark), 方法 f 會被調用, 執行記錄顯示它被block在g的調用上,因此程序會當即跳到 f 調用 g 的那一行,而後調用它。最終咱們會到達暫停點,而後繼續執行。當 g 返回時, f 中插入的代碼會恢復f的本地變量。
過程聽起來很複雜,可是它只會帶來3% ~ 5%的性能的損失。
下面看一個簡單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調用m2和m3,因此也聲明拋出這個異常,最後這個異常會被Fiber所捕獲:
publicclassHelloworld{ staticvoidm1()throwsSuspendExecution, InterruptedException { String m = "m1"; System.out.println("m1 begin"); m = m2(); m = m3(); System.out.println("m1 end"); System.out.println(m); } staticString m2()throwsSuspendExecution, InterruptedException { return"m2"; } staticString m3()throwsSuspendExecution, InterruptedException { return"m3"; } staticpublicvoidmain(String[] args)throwsExecutionException, InterruptedException { newFiber<Void>("Caller",newSuspendableRunnable() { @Override publicvoidrun()throwsSuspendExecution, InterruptedException { m1(); } }).start(); } }
反編譯這段代碼 (通常的反編譯軟件如jd-gui不能把這段代碼反編譯java文件, Procyon 雖然能反編譯,可是感受反編譯有錯。因此咱們仍是看字節碼吧):
@Instrumented(suspendableCallSites={16,17}, methodStart=13, methodEnd=21, methodOptimized=false) staticvoidm1() throwsSuspendExecution, InterruptedException { // Byte code: // 0: aconst_null // 1: astore_3 // 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack; // 5: dup // 6: astore_1 // 7: ifnull +42 -> 49 // 10: aload_1 // 11: iconst_1 // 12: istore_2 // 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I // 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111 // 40: aload_1 // 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z // 44: ifne +5 -> 49 // 47: aconst_null // 48: astore_1 // 49: iconst_0 // 50: istore_2 // 51: ldc 2 // 53: astore_0 // 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream; // 57: ldc 4 // 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V // 62: aload_1 // 63: ifnull +26 -> 89 // 66: aload_1 // 67: iconst_1 // 68: iconst_1 // 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V // 72: aload_0 // 73: aload_1 // 74: iconst_0 // 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V // 78: iconst_0 // 79: istore_2 // 80: aload_1 // 81: iconst_0 // 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object; // 85: checkcast 110 java/lang/String // 88: astore_0 // 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String; // 92: astore_0 // 93: aload_1 // 94: ifnull +26 -> 120 // 97: aload_1 // 98: iconst_2 // 99: iconst_1 // 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V // 103: aload_0 // 104: aload_1 // 105: iconst_0 // 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V // 109: iconst_0 // 110: istore_2 // 111: aload_1 // 112: iconst_0 // 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object; // 116: checkcast 110 java/lang/String // 119: astore_0 // 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String; // 123: astore_0 // 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream; // 127: ldc 8 // 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V // 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream; // 135: aload_0 // 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V // 139: aload_1 // 140: ifnull +7 -> 147 // 143: aload_1 // 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V // 147: return // 148: aload_1 // 149: ifnull +7 -> 156 // 152: aload_1 // 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V // 156: athrow // Line number table: // Java source line #13 -> byte code offset #51 // Java source line #15 -> byte code offset #54 // Java source line #16 -> byte code offset #62 // Java source line #17 -> byte code offset #93 // Java source line #18 -> byte code offset #124 // Java source line #19 -> byte code offset #132 // Java source line #21 -> byte code offset #139 // Local variable table: // start length slot name signature // 53 83 0 m String // 6 147 1 localStack co.paralleluniverse.fibers.Stack // 12 99 2 i int // 1 1 3 localObject Object // 156 1 4 localSuspendExecution SuspendExecution // Exception table: // from to target type // 49 148 148 finally // 49 148 156 co/paralleluniverse/fibers/SuspendExecution // 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution }
這段反編譯的代碼顯示了方法m被instrument後的樣子,雖然咱們不能很清楚的看到代碼執行的樣子,可是也能夠大概地看到它實際在方法的最開始加入了此方法的棧信息的檢查(#0 ~ #49,若是是第一次運行這個方法,則直接運行,而後在一些暫停點上加上一些棧壓入的處理,而且能夠在下次執行的時候直接跳到上次的暫停點上。
官方的工程師關於Quasar的instrument操做以下:
我並無更深刻的去了解Quasar的實現細節以及調度算法,有興趣的讀者能夠翻翻它的代碼。若是你有更深刻的剖析,請留下相關的地址,以便我加到參考文檔中。
曾經, 陸陸續續也有一些Java coroutine的實現( coroutine-libraries ), 可是目前來講最好的應該仍是Quasar。
Oracle會實現一個官方的纖程庫嗎?目前來講沒有看到這方面的計劃,並且從Java的開發進度上來看,這個特性多是遙遙無期的,因此目前還只能藉助社區的力量,從第三方庫如Quasar中尋找解決方案。
更多的Quasar知識,好比Channel、Actor、Reactive Stream 的使用能夠參考官方的文檔,官方也提供了多個 例子 。
Comsat又是什麼?
Comsat仍是Parallel Universe提供的集成Quasar的一套開源庫,能夠提供web或者企業級的技術,如HTTP服務和數據庫訪問。
Comsat並非一套web框架。它並不提供新的API,只是爲現有的技術如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。
它包含很是多的庫,好比Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。
劉小溪在CSDN上寫了一篇關於Quasar的文章: 次時代Java編程(一):Java裏的協程 ,寫的挺好,建議讀者讀一讀。
它參考 Skynet 的測試寫了代碼進行對比,這個測試是併發執行整數的累加:
測試結果是Golang花了261毫秒,Quasar花了612毫秒。其實結果還不錯,可是文中指出這個測試沒有發揮Quasar的性能。由於quasar的性能主要在於阻塞代碼的調度上。
雖然文中加入了排序的功能,顯示Java要比Golang要好,可是我以爲這又陷入了另一種錯誤的比較, Java的排序算法使用TimSort,排序效果至關好,Go的排序效果顯然比不上Java的實現,因此最後的測試主要測試排序算法上。 真正要體現Quasar的性能仍是測試在有阻塞的狀況下fiber的調度性能。
HttpClient
話題扯的愈來愈遠了,拉回來。我最初的目的是要解決的是在第三方服務響應慢的狀況下提升系統A 的吞吐率。最初A是使用200個線程處理業務邏輯,調用第三方服務。由於線程老是被第三方服務阻塞,因此係統A的吞吐率老是很低。
雖然使用Go能夠解決這個問題,可是對於系統A的改造比較大,還增長了系統的複雜性。
這正是Quasar fiber適合的場景,若是一個Fiber被阻塞,它能夠暫時放棄線程,以便線程能夠用來執行其它的Fiber。雖然整個集成系統的吞吐率依然很低,這是沒法避免的,可是系統的吞吐率確很高。
Comsat提供了Apache Http Client的實現: FiberHttpClientBuilder :
finalCloseableHttpClient client = FiberHttpClientBuilder. create(2).// use 2 io threads setMaxConnPerRoute(concurrencyLevel). setMaxConnTotal(concurrencyLevel).build();
而後在Fiber中久能夠調用:
String response = client.execute(newHttpGet("http://localhost:8080"), BASIC_RESPONSE_HANDLER);
你也可使用異步的HttpClient:
finalCloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients. custom(). setMaxConnPerRoute(concurrencyLevel). setMaxConnTotal(concurrencyLevel). build()); client.start();
Comsat還提供了Jersey Http Client: AsyncClientBuilder.newClient() 。
甚至提供了 Retrofit 、 OkHttp 的實現。
通過測試,雖然隨着系統B、C、D的響應時間的拉長,吞吐率有所下降,可是在latency爲100毫秒的時候吞吐率依然能達到9900 requests/秒,能夠知足咱們的需求,而咱們的代碼改動的比較小。