最近遇到的一個問題大概是微服務架構中常常會遇到的一個問題:html
服務 A 是咱們開發的系統,它的業務須要調用 B、C、D 等多個服務,這些服務是經過http的訪問提供的。 問題是 B、C、D 這些服務都是第三方提供的,不能保證它們的響應時間,快的話十幾毫秒,慢的話甚至1秒多,因此這些服務的Latency比較長。幸運地是這些服務都是集羣部署的,容錯率和併發支持都比較高,因此不擔憂它們的併發性能,惟一不爽的就是就是它們的Latency過高了。java
系統A會從Client接收Request, 每一個Request的處理都須要屢次調用B、C、D的服務,因此完成一個Request可能須要1到2秒的時間。爲了讓A能更好地支持併發數,系統中使用線程池處理這些Request。固然這是一個很是簡化的模型,實際的業務處理比較複雜。react
能夠預見,由於系統B、C、D的延遲,致使整個業務處理都很慢,即便使用線程池,可是每一個線程仍是會阻塞在B、C、D的調用上,致使I/O阻塞了這些線程, CPU利用率相對來講不是那麼高。git
固然在測試的時候使用的是B、C、D的模擬器,沒有預想到它們的響應是那麼慢,所以測試數據的結果還不錯,吞吐率還能夠,可是在實際環境中問題就暴露出來了。github
最開始線程池設置的是200,而後用HttpUrlConnection做爲http client發送請求到B、C、D。固然HttpUrlConnection也有一些坑,好比Persistent Connections、Caveats of HttpURLConnection,跳出坑後性能依然不行。web
經過測試,若是B、C、D等服務延遲接近0毫秒,則HttpUrlConnection的吞吐率(線程池的大小爲200)能到40000 requests/秒,可是隨着第三方服務的響應時間變慢,它的吞吐率急劇降低,B、C、D的服務的延遲爲100毫秒的時候,則HttpUrlConnection的吞吐率降到1800 requests/秒,而B、C、D的服務的延遲爲100毫秒的時候HttpUrlConnection的吞吐率降到550 requests/秒。算法
增長http.maxConnections
系統屬性並不能顯著增長吞吐率。數據庫
若是增長調用HttpUrlConnection的線程池的大小,好比增長到2000,性能會好一些,可是B、C、D的服務的延遲爲500毫秒的時候,吞吐率爲3800 requests/秒,延遲爲1秒的時候,吞吐率爲1900 requests/秒。apache
雖然線程池的增大能帶來性能的提高,可是線程池也不能無限制的增大,由於每一個線程都會佔用必定的資源,並且隨着線程的增多,線程之間的切換也更加的頻繁,對CPU等資源也是一種浪費。
切換成netty(channel pool),與B、C、D通信的性能還不錯, latency爲500ms的時候吞吐率能達到10000 requests/秒,通信不成問題,問題是須要將業務代碼改爲異步的方式,異步地接收到這些response後在一個線程池中處理這些消息。
下面列出了一些經常使用的http client:
這個列表摘自 High-Concurrency HTTP Clients on the JVM,不止於此,這篇文章重點介紹基於java纖程庫quasar的實現的http client庫,並比較了性能。咱們待會再說。
回到我前面所說的系統,如何能更好的提供性能?有一種方案是藉助其它語言的優點,好比Go,讓Go來代理完成和B、C、D的請求,系統A經過一個TCP鏈接與Go程序交流。第三方服務B、C、D的Response結果能夠異步地返回給系統A。
Go的優點在於能夠實現request-per-goroutine,整個系統中能夠有成千上萬個goroutine。 goroutine是輕量級的,並且在I/O阻塞的時候能夠不佔用線程,這讓Go能夠輕鬆地處理上萬個連接,即便I/O阻塞也沒問題。Go和Java之間的通信協議能夠經過Protobuffer來實現,並且它們之間只保留一個TCP鏈接便可。
固然這種架構的修改帶來系統穩定性的下降,服務A和服務B、C、D之間的通信增長了複雜性。同時,由於是異步方式,服務A的業務也要實現異步方式,不然200個線程依然等待Response的話,仍是一個阻塞的架構。
經過測試,這種架構能夠帶來穩定的吞吐率。 無論服務B、C、D的延遲有多久,A的吞吐率能維持15000 requests/秒。固然Go到B、C、D的併發鏈接數也有限制,我把最大值調高到20000。
這種曲折的方案的最大的兩個弊病就是架構的複雜性以及對原有系統須要進行大的重構。 高複雜性帶來的是系統的穩定性的下降,包括部署、維護、網絡情況、系統資源等。同時系統要改爲異步模型,由於系統業務線程發送Request後不能等待Go返回Response,它須要從Client接收更多的Request,而收到Response以後它才繼續執行剩下的業務,只有這樣纔不會阻塞,進而提到系統的吞吐率。
將系統A改爲異步,而後使用HttpUrlConnection線程池行不行?
HttpUrlConnection線程池仍是致使和B、C、D通信的吞吐率降低,可是Go這種方案和B、C、D通信的吞吐率能夠維持一個較高的水平。
考慮到Go的優點,那麼能不能在Java中使用相似Go的這種goroutine模型呢?那就是本文要介紹的Java纖程庫: [Quasar](http://docs.paralleluniverse.co/quasar/)。
實際測試結果代表Go和Netty都是兩種比較好的解決方案,並且Netty的性能驚人的好,很差的地方正如前面所講,咱們須要將代碼改爲異步的處理。線程池中的業務單元用Netty發送完Request以後,不要等待Response, Response的處理交給另外的線程來處理,同時注意不要在Netty的Handler裏面處理業務邏輯。要解決的問題就變成如何更高效的處理Response了,而不是第三方系統阻塞的問題。
如下介紹Java的另外一個解決方案,也就是Java中的coroutine庫,由於最近剛剛看這個庫,感受挺不錯的,並且用它替換Thread改動較少。
Java官方並無纖程庫。可是偉大的社區提供了一個優秀的庫,它就是Quasar。
創始人是Ron Pressler和Dafna Pressler,由Y Combinator孵化。
Quasar is a library that provides high-performance lightweight threads, Go-like channels, Erlang-like actors, and other asynchronous programming tools for Java and Kotlin.
Quasar提供了高性能輕量級的線程,提供了相似Go的channel,Erlang風格的actor,以及其它的異步編程的工具,能夠用在Java和Kotlin編程語言中。Scala目前的支持還不完善,我想若是這個公司能快速的發展壯大,或者被一些大公司收購的話,對Scala的支持才能提上日程。
你須要把下面的包加入到你的依賴中:
Quasar fiber依賴java instrumentation修改你的代碼,能夠在運行時經過java Agent實現,也能夠在編譯時使用ant task實現。
經過java agent很簡單,在程序啓動的時候將下面的指令加入到命令行:
1
|
-javaagent:path-to-quasar-jar.jar
|
對於maven來講,你可使用插件maven-dependency-plugin,它會爲你的每一個依賴設置一個屬性,以便在其它地方引用,咱們主要想使用 ${co.paralleluniverse:quasar-core:jar}
:
1
2
3
4
5
6
7
8
9
10
11
12
|
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
|
而後你能夠配置exec-maven-plugin
或者maven-surefire-plugin
加上agent參數,在執行maven任務的時候久可使用Quasar了。
官方提供了一個Quasar Maven archetype,你能夠經過下面的命令生成一個quasar應用原型:
1
2
3
4
5
6
7
8
|
git clone https://github.com/puniverse/quasar-mvn-archetype
cd quasar-mvn-archetype
mvn install
cd ..
mvn archetype:generate -DarchetypeGroupId=co.paralleluniverse -DarchetypeArtifactId=quasar-mvn-archetype -DarchetypeVersion=
0.7.4 -DgroupId=testgrp -DartifactId=testprj
cd testprj
mvn test
mvn clean compile dependency:properties
exec:exec
|
若是你使用gradle,能夠看一下gradle項目模版:Quasar Gradle template project。
最容易使用Quasar的方案就是使用Java Agent,它能夠在運行時instrument程序。若是你想編譯的時候就使用AOT instrumentation(Ahead-of-Time),可使用Ant任務co.paralleluniverse.fibers.instrument.InstrumentationTask
,它包含在quasar-core.jar中。
Quasar最主要的貢獻就是提供了輕量級線程的實現,叫作fiber(纖程)。Fiber的功能和使用相似Thread, API接口也相似,因此使用起來沒有違和感,可是它們不是被操做系統管理的,它們是由一個或者多個ForkJoinPool調度。一個idle fiber只佔用400字節內存,切換的時候佔用更少的CPU,你的應用中能夠有上百萬的fiber,顯然Thread作不到這一點。這一點和Go的goroutine相似。
Fiber並不意味着它能夠在全部的場景中均可以替換Thread。當fiber的代碼常常會被等待其它fiber阻塞的時候,就應該使用fiber。
對於那些須要CPU長時間計算的代碼,不多遇到阻塞的時候,就應該首選thread
以上兩條是選擇fiber仍是thread的判斷條件,主要仍是看任務是I/O blocking相關仍是CPU相關。幸運地是,fiber API使用和thread使用相似,因此代碼略微修改久就能夠兼容。
Fiber特別適合替換哪些異步回調的代碼。使用FiberAsync
異步回調很簡單,並且性能很好,擴展性也更高。
相似Thread, fiber也是用Fiber類表示:
1
2
3
4
5
6
|
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
|
與Thread相似,但也有些不一樣。Fiber能夠有一個返回值,類型爲泛型V,也能夠爲空Void。run
也能夠拋出異常InterruptedException
。
你能夠傳遞SuspendableRunnable
或 SuspendableCallable
給Fiber的構造函數:
1
2
3
4
5
|
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();
|
甚至你能夠調用Fiber的join方法等待它完成,調用get
方法獲得它的結果。
Fiber繼承Strand類。Strand類表明一個Fiber或者Thread,提供了一些底層的方法。
逃逸的Fiber(Runaway Fiber)是指那些陷入循環而沒有block、或者block fiber自己運行的線程的Fiber。偶爾有逃逸的fiber沒有問題,可是太頻繁會致使性能的降低,由於須要調度器的線程可能都忙於逃逸fiber了。Quasar會監控這些逃逸fiber,你能夠經過JMX監控。若是你不想監控,能夠設置系統屬性co.paralleluniverse.fibers.detectRunawayFibers
爲false
。
fiber中的ThreadLocal
是fiber local的。InheritableThreadLocal
繼承父fiber的值。
Fiber、SuspendableRunnable 、SuspendableCallable 的run
方法會拋出SuspendExecution異常。但這並非真正意義的異常,而是fiber內部工做的機制,經過這個異常暫停因block而須要暫停的fiber。
任何在Fiber中運行的方法,須要聲明這個異常(或者標記@Suspendable),都被稱爲suspendable method。
反射調用一般都被認爲是suspendable, Java8 lambda 也被認爲是suspendable。不該該將類構造函數或類初始化器標記爲suspendable。
synchronized
語句塊或者方法會阻塞操做系統線程,因此它們不該該標記爲suspendable。Blocking線程調用默認也不被quasar容許。可是這兩種狀況均可以被quasar處理,你須要在Quasar javaagent中分別加上m
和b
參數,或者ant任務中加上allowMonitors
和allowBlocking
屬性。
Quasar最初fork自Continuations Library。
若是你瞭解其它語言的coroutine, 好比Lua,你久比較容易理解quasar的fiber了。 Fiber實質上是 continuation, continuation能夠捕獲一個計算的狀態,能夠暫停當前的計算,等隔一段時間能夠繼續執行。Quasar經過instrument修改suspendable方法。Quasar的調度器使用ForkJoinPool
調度這些fiber。
Fiber調度器FiberScheduler是一個高效的、work-stealing、多線程的調度器。
默認的調度器是FiberForkJoinScheduler
,可是你可使用本身的線程池去調度,請參考FiberExecutorScheduler
。
當一個類被加載時,Quasar的instrumentation模塊 (使用 Java agent時) 搜索suspendable 方法。每個suspendable 方法 f
經過下面的方式 instrument:
它搜索對其它suspendable方法的調用。對suspendable方法g
的調用,一些代碼會在這個調用g
的先後被插入,它們會保存和恢復fiber棧本地變量的狀態,記錄這個暫停點。在這個「suspendable function chain」的最後,咱們會發現對Fiber.park
的調用。park
暫停這個fiber,扔出 SuspendExecution異常。
當g
block的時候,SuspendExecution異常會被Fiber捕獲。 當Fiber被喚醒(使用unpark), 方法f
會被調用, 執行記錄顯示它被block在g的調用上,因此程序會當即跳到f
調用g
的那一行,而後調用它。最終咱們會到達暫停點,而後繼續執行。當g
返回時, f
中插入的代碼會恢復f的本地變量。
過程聽起來很複雜,可是它只會帶來3% ~ 5%的性能的損失。
下面看一個簡單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調用m2和m3,因此也聲明拋出這個異常,最後這個異常會被Fiber所捕獲:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m =
"m1";
System.out.println(
"m1 begin");
m = m2();
m = m3();
System.out.println(
"m1 end");
System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return "m2";
}
static String m3() throws SuspendExecution, InterruptedException {
return "m3";
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber<Void>("Caller", new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
m1();
}
}).start();
}
}
|
反編譯這段代碼 (通常的反編譯軟件如jd-gui不能把這段代碼反編譯java文件,Procyon雖然能反編譯,可是感受反編譯有錯。因此咱們仍是看字節碼吧):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
|
這段反編譯的代碼顯示了方法m被instrument後的樣子,雖然咱們不能很清楚的看到代碼執行的樣子,可是也能夠大概地看到它實際在方法的最開始加入了此方法的棧信息的檢查(#0 ~ #49,若是是第一次運行這個方法,則直接運行,
而後在一些暫停點上加上一些棧壓入的處理,而且能夠在下次執行的時候直接跳到上次的暫停點上。
官方的工程師關於Quasar的instrument操做以下:
- Fully analyze the bytecode to find all the calls into suspendable methods. A method that (potentially) calls into other suspendable methods is itself considered suspendable, transitively.
- Inject minimal bytecode in suspendable methods (and only them) that will manage an user-mode stack, in the following places:
- At the beginning we’ll check if we’re resuming the fiber and only in this case we’ll jump into the relevant bytecode index.
- Before a call into another suspendable method we’ll push a snapshot of the current activation frame, including the resume bytecode index; we can do it because we know the structure statically from the analysis phase.
- After a call into another suspendable method we’ll pop the top activation frame and, if resumed, we’ll restore it in the current fiber.
我並無更深刻的去了解Quasar的實現細節以及調度算法,有興趣的讀者能夠翻翻它的代碼。若是你有更深刻的剖析,請留下相關的地址,以便我加到參考文檔中。
曾經, 陸陸續續也有一些Java coroutine的實現(coroutine-libraries), 可是目前來講最好的應該仍是Quasar。
Oracle會實現一個官方的纖程庫嗎?目前來講沒有看到這方面的計劃,並且從Java的開發進度上來看,這個特性多是遙遙無期的,因此目前還只能藉助社區的力量,從第三方庫如Quasar中尋找解決方案。
更多的Quasar知識,好比Channel、Actor、Reactive Stream 的使用能夠參考官方的文檔,官方也提供了多個例子。
Comsat又是什麼?
Comsat仍是Parallel Universe提供的集成Quasar的一套開源庫,能夠提供web或者企業級的技術,如HTTP服務和數據庫訪問。
Comsat並非一套web框架。它並不提供新的API,只是爲現有的技術如Servlet、JAX-RS、JDBC等提供Quasar fiber的集成。
它包含很是多的庫,好比Spring、ApacheHttpClient、OkHttp、Undertow、Netty、Kafka等。
劉小溪在CSDN上寫了一篇關於Quasar的文章:次時代Java編程(一):Java裏的協程,寫的挺好,建議讀者讀一讀。
它參考Skynet的測試寫了代碼進行對比,這個測試是併發執行整數的累加:
測試結果是Golang花了261毫秒,Quasar花了612毫秒。其實結果還不錯,可是文中指出這個測試沒有發揮Quasar的性能。由於quasar的性能主要在於阻塞代碼的調度上。
雖然文中加入了排序的功能,顯示Java要比Golang要好,可是我以爲這又陷入了另一種錯誤的比較, Java的排序算法使用TimSort,排序效果至關好,Go的排序效果顯然比不上Java的實現,因此最後的測試主要測試排序算法上。 真正要體現Quasar的性能仍是測試在有阻塞的狀況下fiber的調度性能。
話題扯的愈來愈遠了,拉回來。我最初的目的是要解決的是在第三方服務響應慢的狀況下提升系統 A 的吞吐率。最初A是使用200個線程處理業務邏輯,調用第三方服務。由於線程老是被第三方服務阻塞,因此係統A的吞吐率老是很低。
雖然使用Go能夠解決這個問題,可是對於系統A的改造比較大,還增長了系統的複雜性。Netty性能好,改動量還能夠接受,可是不妨看一下這個場景,系統的問題是由http阻塞引發。
這正是Quasar fiber適合的場景,若是一個Fiber被阻塞,它能夠暫時放棄線程,以便線程能夠用來執行其它的Fiber。雖然整個集成系統的吞吐率依然很低,這是沒法避免的,可是系統的吞吐率確很高。
Comsat提供了Apache Http Client的實現: FiberHttpClientBuilder
:
1
2
3
4
|
final CloseableHttpClient client = FiberHttpClientBuilder.
create(
2). // use 2 io threads
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).build();
|
而後在Fiber中久能夠調用:
1
|
String response = client.execute(
new HttpGet("http://localhost:8080"), BASIC_RESPONSE_HANDLER);
|
你也可使用異步的HttpClient:
1
2
3
4
5
6
|
final CloseableHttpAsyncClient client = FiberCloseableHttpAsyncClient.wrap(HttpAsyncClients.
custom().
setMaxConnPerRoute(concurrencyLevel).
setMaxConnTotal(concurrencyLevel).
build());
client.start();
|
Comsat還提供了Jersey Http Client: AsyncClientBuilder.newClient()
。
甚至提供了Retrofit
、OkHttp
的實現。
通過測試,雖然隨着系統B、C、D的響應時間的拉長,吞吐率有所下降,可是在latency爲100毫秒的時候吞吐率依然能達到9900 requests/秒,能夠知足咱們的需求,而咱們的代碼改動的比較小。
綜上所述,若是想完全改造系統A,則可使用Go庫重寫,或者使用Netty + Rx的方式去處理,都能達到比較好的效果。若是想改動比較小,能夠考慮使用quasar替換線程對代碼進行維護。
我但願本文不要給讀者形成誤解,覺得Java NIO/Selector這種方式不能解決本文的問題,也就是第三方阻塞的問題。 事實上Java NIO也正是適合解決這樣的問題, 好比Netty性能就不錯,可是你須要當心的是, 不要讓你的這個client對外又變成阻塞的方式,而是程序應該異步的去發送request和處理response。固然本文重點不是介紹這種實現,而是介紹Java的線程庫,它能夠改造傳統的代碼,即便有阻塞,也只是阻塞Fiber,而不是阻塞線程,這是另外一個解決問題的思路。
另外一篇關於Quasar的文檔: 繼續瞭解Java的纖程庫 - Quasar
前一篇文章Java中的纖程庫 - Quasar中我作了簡單的介紹,如今進一步介紹這個纖程庫。
Quasar尚未獲得普遍的應用,搜尋整個github也就pinterest/quasar-thrift這麼一個像樣的使用Quasar的庫,。而且官方的文檔也很簡陋,不少地方並無詳細的介紹,和Maven的集成也不是很好。這些都限制了Quasar的進一步發展。
可是,做爲目前最好用的Java coroutine的實現,它在某些狀況下的性能仍是表現至關出色的,但願這個項目可以獲得更大的支持和快速發展。
由於Quasar文檔的缺少,因此使用起來須要不斷的摸索和在論壇上搜索答案,本文將一些記錄了我在Quasar使用過程當中的一些探索。
雖然Java的線程的API封裝的很好,使用起來很是的方便,可是使用起來也得當心。首先線程須要耗費資源,因此單個的機器上建立上萬個線程很困難,其次線程之間的切換也須要耗費CPU,在線程很是多的狀況下致使不少CPU資源耗費在線程切換上,經過提升線程數來提升系統的性能有時候拔苗助長。你能夠看到如今一些優秀的框架如Netty都不會建立不少的線程,默認2倍的CPU core的線程數就已經應付的很好了,好比node.js可使用單一的進程/線程應付高併發。
纖程使用的資源更少,它主要保存棧信息,因此一個系統中能夠建立上萬的纖程Fiber,而實際的纖程調度器只須要幾個Java線程便可。
咱們看一個性能的比較,直觀的感覺一下Quasar帶來的吞吐率的提升。
下面這個例子中方法m1
調用m2
,m2
調用m3
,可是m2
會暫停1秒鐘,用來模擬實際產品中的阻塞,m3
執行了一個簡單的計算。
經過線程和纖程兩種方式咱們看看系統的吞吐率(throughput)和延遲(latency)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
public class Helloworld {
@Suspendable
static void m1() throws InterruptedException, SuspendExecution {
String m =
"m1";
//System.out.println("m1 begin");
m = m2();
//System.out.println("m1 end");
//System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
String m = m3();
Strand.sleep(
1000);
return m;
}
//or define in META-INF/suspendables
@Suspendable
static String m3() {
List l = Stream.of(
1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
return l.toString();
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
int count = 10000;
testThreadpool(count);
testFiber(count);
}
static void testThreadpool(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count);
ExecutorService es = Executors.newFixedThreadPool(
200);
LongAdder latency =
new LongAdder();
long t = System.currentTimeMillis();
for (int i =0; i< count; i++) {
es.submit(() -> {
long start = System.currentTimeMillis();
try {
m1();
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (SuspendExecution suspendExecution) {
suspendExecution.printStackTrace();
}
start = System.currentTimeMillis() - start;
latency.add(start);
latch.countDown();
});
}
latch.await();
t = System.currentTimeMillis() - t;
long l = latency.longValue() / count;
System.out.println(
"thread pool took: " + t + ", latency: " + l + " ms");
es.shutdownNow();
}
static void testFiber(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count);
LongAdder latency =
new LongAdder();
long t = System.currentTimeMillis();
for (int i =0; i< count; i++) {
new Fiber<Void>("Caller", new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
long start = System.currentTimeMillis();
m1();
start = System.currentTimeMillis() - start;
latency.add(start);
latch.countDown();
}
}).start();
}
latch.await();
t = System.currentTimeMillis() - t;
long l = latency.longValue() / count;
System.out.println(
"fiber took: " + t + ", latency: " + l + " ms");
}
}
|
運行這個程序(須要某種instrument, agent或者AOT或者其它,在下面會介紹),輸出結果爲:
1
2
|
thread pool took: 50341, latency: 1005 ms
fiber took:
1158, latency: 1000 ms
|
若是使用線程,執行完1萬個操做須要50秒,平均延遲爲1秒左右(咱們故意讓延遲至少1秒),線程池數量爲200。(其實總時間50秒能夠計算出來)
可是若是使用纖程,執行完1萬個操做僅須要1.158秒,平均延遲時間爲1秒,線程數量爲CPU core數(缺省使用ForkJoinPool)。
能夠看到,經過使用纖程,盡受限於系統的業務邏輯,咱們沒有辦法提高業務的處理時間, 可是咱們確能夠極大的提升系統的吞吐率,如上面的簡單的例子將10000個操做的處理時間從50秒提升到1秒,非凡的成就。
若是咱們將方法m2
中的Strand.sleep(1000);
註釋掉,這樣這個例子中就沒有什麼阻塞了,咱們看看在這種純計算的狀況下二者的表現:
1
2
|
thread pool took: 114, latency: 0 ms
fiber took:
180, latency: 0 ms
|
能夠看到,纖程非但沒有提高性能,反而會帶來性能的降低。對於這種純計算沒有阻塞的case,Quasar並不適合。
正如官方所說:
Fibers are not meant to replace threads in all circumstances. A fiber should be used when its body (the code it executes) blocks very often waiting on other fibers (e.g. waiting for messages sent by other fibers on a channel, or waiting for the value of a dataflow-variable). For long-running computations that rarely block, traditional threads are preferable. Fortunately, as we shall see, fibers and threads interoperate very well.
Fiber中的run方法,如SuspendableRunnable
和 SuspendableCallable
聲明瞭SuspendExecution
異常。這並非一個真的異常,而是fiber內部工做的機制。任何運行在fiber中的可能阻塞的方法,若是聲明瞭這個異常,就被叫作 suspendable 方法。 若是你的方法調用了一個suspendable
方法,那麼你的方法也是suspendable
方法,因此也須要聲明拋出SuspendExecution
異常。
有時候不能在某個方法上聲明拋出SuspendExecution
異常,好比你實現某個接口,你不能更改接口的方法聲明,你不得不使用其它的方法來指定suspendable
方法。方法之一就是使用@Suspendable
註解,在你須要指定的suspendable
方法上加上這個註解就能夠告訴Quasar這個方法是suspendable
方法。
另外一個狀況就是對於第三的庫,你不可能更改它們的代碼,若是想指定這些庫的某些方法是suspendable
方法,好比java.net.URL.openStream()Ljava/io/InputStream;
, 就須要另一種解決辦法,也就是在META-INF/suspendables
和META-INF/suspendable-supers
定義。
文件中每一個方法佔一行,具體(concrete)的suspendable
方法應該寫在META-INF/suspendables
中,non-suspendable
方法,可是有suspendable override
的類、接口寫在META-INF/suspendable-supers
中(能夠是具體類單不能是final, 接口和抽象類也能夠)。
每一行應該是方法的簽名的全稱「full.class.name.methodName」 以及*
通配符。
使用`SuspendablesScanner`能夠自動增長你的方法到這些文件中,待會介紹它。
java.lang
包下的方法不能標記爲suspendable
,其它的JDK方法則能夠顯示地在文件META-INF/suspendables
和META-INF/suspendable-supers
中標記爲suspendable
,而且設置環境變量co.paralleluniverse.fibers.allowJdkInstrumentation
爲true,可是不多這樣使用。
還有一些特殊的狀況也會被認爲是suspendable
的。
反射調用老是被看做是suspendable
的。
Java 8 lambda也老是被看做suspendable
的。
構造函數/類初始化器不能被標記爲suspendable
。
缺省狀況下synchronized
和blocking thread 調用不能運行在Fiber中。這是由於它們會阻塞Fiber使用的線程,致使系統處理變慢,可是若是你非要在Fiber中使用它們,能夠能夠將allowMonitors
和allowBlocking
傳給instrumentation Ant task,或者將b
、m
傳給Quasar Java agent。
Quasar依賴字節碼的instrumentation, instrumentation用來修改字節碼。 Quasar能夠在運行時或者編譯時修改字節碼,下面介紹這幾種實現。
一、Quasar Java Agent
Quasar java agent能夠在運行時動態修改字節碼,將下面一行加搭配java命令行中便可,注意把path-to-quasar-jar.jar替換成你實際的quasar java的地址。
1
|
-javaagent:path-to-quasar-jar.jar
|
若是你使用maven的exec task,你可使用maven-dependency-plugin
爲依賴設置properties,而後在插件exec-maven-plugin中引用quasar庫便可。
詳細配置能夠參考Specifying the Java Agent with Maven:。
Quasar對gradle的支持比較好,你能夠方便的使用gradle配置。
這是首選的一種方式,由於在某些狀況下,好比你使用第三方的庫,如comsat,它們只能使用這種方式配置。
二、AOT(Ahead-of-Time)
另一種是在編譯時的時候完成instrumentation。
它是經過一個Ant Task來完成的,因此對於Maven管理的項目來講,配置起來有些麻煩。
這個Ant Task是co.paralleluniverse.fibers.instrument.InstrumentationTask
,包含在quasar-core.jar
中。它接受一組(fileset)classes進行instrument,但並非傳給它的全部classes都須要classes進行instrument,只有suspendable
方法纔有可能被instrument。它還會進行優化,有些suspendable
方法可能不須要instrument。
在Maven中配置起來有些複雜,以下面所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>instrument-classes</id>
<phase>compile</phase>
<configuration>
<tasks>
<property name="ant_classpath" refid="maven.dependency.classpath"/>
<taskdef name="instrumentationTask"
classname="co.paralleluniverse.fibers.instrument.InstrumentationTask"
classpath="${co.paralleluniverse:quasar-core:jar:jdk8}"/>
<instrumentationTask allowMonitors="true" allowBlocking="true" check="true" verbose="true" debug="true">
<fileset dir="${project.build.directory}/classes/" includes="**/*"/>
</instrumentationTask>
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
|
Quasar官方並無提供一個maven插件,好心的社區卻是提供了一個quasar-maven-plugin。因此你能夠不用上面的寫法,而是用下面簡單的寫法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<plugin>
<groupId>com.vlkan</groupId>
<artifactId>quasar-maven-plugin</artifactId>
<version>0.7.3</version>
<configuration>
<check>true</check>
<debug>true</debug>
<verbose>true</verbose>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>instrument</goal>
</goals>
</execution>
</executions>
</plugin>
|
三、在Web容器中
若是你使用web容器使用基於Quasar的庫comsat等,好比Tomcat,則比較棘手。由於你不太像將Quasar java agent直接加到tomcat的啓動腳本中,這樣會instrument全部的應用,致使不少的警告。
Comsat提供了Tomcat和Jetty的解決方案。
Tomcat
對於tomcat,你能夠把comsat-tomcat-loader-0.7.0-jdk8.jar
或者comsat-tomcat-loader-0.7.0.jar
加入到tomcat的common/lib
或者lib
中,而後在你的web應用META-INF/context.xml
中加入:
1
|
<Loader loaderClass="co.paralleluniverse.comsat.tomcat.QuasarWebAppClassLoader" />
|
Jetty
若是使用Jetty,則把comsat-jetty-loader-0.7.0-jdk8.jar
或者comsat-jetty-loader-0.7.0.jar
加入到Jetty的lib中,而後在你的context.xml中加入<Set name="classLoader">
:
1
2
3
4
5
6
7
8
9
10
11
|
<Configure id="ctx" class="org.eclipse.jetty.webapp.WebAppContext">
<Set name="war">./build/wars/dep.war</Set>
<!--use custom classloader in order to instrument classes by quasar-->
<Set name="classLoader">
<New class="co.paralleluniverse.comsat.jetty.QuasarWebAppClassLoader">
<Arg>
<Ref id="ctx"/>
</Arg>
</New>
</Set>
</Configure>
|
總之,經過實現一個定製的ClassLoader實現instrumentation。
quasar提供了一個ant task,能夠實現自動偵測suspendable
方法,並能夠把它們寫入到`META-INF/suspendables和
META-INF/suspendable-supers`。
可是官方並無詳細的介紹,並且也沒有相應的maven插件可使用。
咱們能夠看看在gradle如何使用的,咱們能夠把偵測結果複製到maven中使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
apply plugin: 'java'
apply plugin: 'maven'
group = 'com.colobu.fiber'
version = '1.0'
description = """"""
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
maven { url "http://repo.maven.apache.org/maven2" }
}
dependencies {
compile group: 'co.paralleluniverse', name: 'quasar-core', version:'0.7.5', classifier:'jdk8'
compile group: 'co.paralleluniverse', name: 'comsat-httpclient', version:'0.7.0'
testCompile group: 'junit', name: 'junit', version:'4.12'
}
classes {
doFirst {
ant.taskdef(name: 'scanSuspendables',
classname: 'co.paralleluniverse.fibers.instrument.SuspendablesScanner',
classpath: "build/classes/main:build/resources/main:${configurations.runtime.asPath}")
ant.scanSuspendables(auto: true,
suspendablesFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendables",
supersFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendable-supers",
append: true) {
fileset(dir: sourceSets.main.output.classesDir)
}
}
}
|
咱們能夠看一下官方的庫comsat的一些`META-INF/suspendables`例子:
一、comsat-okhttp
/META-INF/suspendables:
1
|
com.squareup.okhttp.apache.OkApacheClient.execute
|
二、comsat-httpclient
/META-INF/suspendables
1
2
|
org.apache.http.
impl.client.CloseableHttpClient.doExecute
org.apache.http.
impl.client.CloseableHttpClient.execute
|
當前quasar依賴字節碼的instrumentation,因此suspendable方法必須在運行以前進行標記。
Quasar開發組和OpenJDK協做,將在JDK9中移除這個限制,將會有效地自動地實現instrumentation。
若是你忘記將一個方法標記爲suspendable
(throws SuspendExecution、@Suspendable或者META-INF/suspendables/META-INF/suspendable-supers),你可能會遇到一些奇怪的錯誤。
環境變量co.paralleluniverse.fibers.verifyInstrumentation
設爲true能夠檢查未標記的方法。可是在生產環境中不要設置它。
UnableToInstrumentException
異常代表quasar不能instrument一些方法如synchronized
或者阻塞的線程調用。verbose(v), debug(d) 和 check(c)能夠打印出詳細信息。
更多的調試能夠參考:troubleshooting。
Fiber能夠序列化。
Fiber也能夠打印它的堆棧進行調試。
Fiber也有Actor和Channel的實現,而且能夠運行在集羣上。