❝最近Caffe在嘗試優化業務中發現,服務的心跳信息中有不少線程都是處於
❞waiting
,以下圖所示:前端
thread count="608"
daemon-count="420"
peak-count="611"
total-started-count="13722"
deadlocked="0" new="0" runnable="169" blocked="0"
waiting="314"
複製代碼
❝而後看了CPU的使用率,從左到右分別表示
❞CPU的任務等待數/CPU核數
,CPU的執行時間佔比總時間(CPU執行時間+CPU空閒時間+ CPU等待時間)
,當前JAVA進程執行時間佔比總時間
java
❝圖中能夠清晰地看到,並非計算型業務致使了線程等待,而是極大可能因爲服務到底層數據查詢的網絡IO等待使得排隊的線程增長,所以決定考慮優化這一部分。優化的目標,在保證服務和底層存儲的心跳信息在一個安全的範圍內,儘量的增長服務吞吐能力。node
❞
當時優化的第一時間就想到了大名鼎鼎的quasar
三方庫。quasar
能夠理解爲輕量級的線程實現,熟悉go語言必定知道goroutine
,咱們知道Java語言中不支持協程,業務中不少場景都須要用線程池進行優化,可是使用線程池的成本也很高,不管是內存佔用仍是線程之間的切換消耗,都限制了一個應用不能無限制的建立線程。android
好在社區開源了一款Java coroutine
框架quasar
,容我先吐槽一下,這個框架真的是直男程序員寫的(已經被拉去寫JDK的協程,十分期待JDK能早點支持協程),文檔十分匱乏,致使我本地開始搞得時候就報錯了一把,開局體驗不是很舒服。程序員
固然優勢也十分突出,應用中網絡IO耗時佔比比較突出的場景中,使用quasar
能夠極大的提升CPU
的吞吐率。簡單描述就是能夠在更短的時間內處理更多的請求。不會由於一個線程中的網絡IO
堵塞而讓後面的線程處於waiting
中,堵塞的時候CPU
是不幹活的,所以將整個系統的吞吐率拉胯。web
官網的文檔中提供了兩種使用方式,爲了節約篇幅先用第1種方式示範一下使用方式:編程
這裏我先用Gradle
項目做爲🌰來詳解一下怎麼使用。安全
Gradle
配置模塊服務器
configurations { quasar } // tasks.withType(JavaExec) { jvmArgs "-javaagent:${configurations.quasar.iterator().next()}" } // dependencies { compile "org.antlr:antlr4:4.7.2" compile "co.paralleluniverse:quasar-core:0.7.5" quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar" testCompile group: 'junit', name: 'junit', version: '4.12' } 複製代碼複製代碼
echo
服務器兩個Fiber
(至關因而Java的Thread
)相互通訊,increasing
發送一個int
數字給echo
,echo
收到以後返回給increasing
,increasing
接收到echo
返回的消息,先打印,在執行++
操做,而後打印出最後的結果。代碼示例以下:網絡
increasing
final IntChannel increasingToEcho = Channels.newIntChannel(0);
final IntChannel echoToIncreasing = Channels.newIntChannel(0);
//
Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
@Override
public Integer run() throws SuspendExecution, InterruptedException {
int curr = 0;
for (int i = 0; i < 10; ++i) {
Fiber.sleep(10);
System.out.println("INCREASING sending curr = " + curr);
increasingToEcho.send(curr);
curr = echoToIncreasing.receive();
System.out.println("INCREASING received curr = " + curr);
curr++;
System.out.println("INCREASING now curr = " + curr);
}
//
System.out.println("INCREASING closing channel and exiting");
increasingToEcho.close();
return curr;
}
}).start();
複製代碼
echo
Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
Integer curr;
while(true) {
Fiber.sleep(1000);
curr = increasingToEcho.receive();
System.out.println("ECHO received curr = " + curr);
//
if (curr != null) {
System.out.println("ECHO sending curr = " + curr);
echoToIncreasing.send(curr);
} else {
System.out.println("ECHO 檢測到關閉channel,closing and existing");
echoToIncreasing.close();
return;
}
}
}
}).start();
複製代碼
increasing
和
increasing
try {
increasing.join();
echo.join();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
複製代碼
能夠看到,使用起來和Java中的Thread
比較類似,API
的語義清晰明瞭,減少了使用人員的成本。
顧名思義,經過修改javaagent
的方式,原理就是在classloading
階段動態的修改字節碼。好比熟悉的AspectJ
框架,核心就是ajc(編譯器)
和織入器(weaver)
達到不修改業務邏輯而修改字節碼,ajc
在java編譯器的基礎上,定義了一些aop語法,將符合這些語法的方法進行從新編譯。分爲「預編譯(CTW)」、「加載期(LTW)」、「後編譯期(PTW)」 3種織入方式。
quasar
的jvaagent
就屬於「加載期(LTW)」 織入的方式。一樣也是在不影響正常編譯的狀況下,增長一些代碼檢測,當檢測到某一個「方法」須要支持「暫停」功能的時候,進行從新編譯,從而達到掛起方法保存上下文,阻塞完成後恢復執行下面的代碼。
好比一個方法,在運行的時候須要請求網絡,此時這個方法就被阻塞了,須要等網絡請求返回在執行下面的代碼。那麼,當方法被阻塞的時候,就須要交給協程去控制,須要保存此時方法運行的上下文。當網絡請求完成的時候,在執行方法的下文。粗略的歸納一下協程的工做方式大體就是如此。
可是實際的場景會無比複雜。實際的一段阻塞的代碼中,裏面可會有多個阻塞的代碼塊,所以最頂端須要一個調度中心
,只有當裏面的阻塞代碼塊執行完了以後,纔會執行外面的代碼塊,否則最後都亂套了。
這就很像java中的ForkJoinPool
,一個大任務能夠Fork出不少子任務,只有當子任務
都完成執行,纔會去執行父任務
。quasar
也是如此,運行過程當中將須要被掛起的方法和方法內的代碼塊交給調度中心,調度中心中存儲任務之間的父子兄弟
關係,而後按照任務層次關係執行代碼。
quasar會將知足下面條件的方法進行織入:
@Suspendable
註解
SuspendExecution
/META-INF/suspendables
、
/META-INF/suspendable-supers
指定了一些類或者接口,
quasar
會對這些類或者接口的方法進行分析,符合上面任意一種的方法進行
「織入」
MethodHandle.invoke
動態調用的方法先後進行
「織入」
咱們也能夠從quasar
的官網文檔中看到依賴項,其中就有ASM — Java bytecode manipulation and analysis framework, by the ASM team,所以更多想了解織入的細節,你們能夠去了解下ASM
框架。Caffe
有空的時候也會單獨出一篇文章科普下,由於這塊的東西比較偏虛擬機
底層。
若是業務中一個方法中有不少阻塞性業務,那麼就要將這些阻塞性業務放入不一樣的Fiber
執行,能夠看到上文中的echo
和increasing
就屬於兩個阻塞型業務同時又相互依賴,邏輯上的依賴經過Channels
解決。
不管是經過javaagent
仍是AOT
(預編譯織入)的方式進行織入,本質上都是經過對字節碼先後進行插入特定的指令。可是這種很容易帶來一些兼容性問題,好比不少大廠都會經過pt-tracer
這種染色技術,來對java線程進行着色,進行全鏈路的調用監控或者壓測流量的區分。因此caffe
思來想去就放棄了使用quasar
這款偉大的協程框架,擔憂這種織入方式會不兼容線程中的染色
。
不事後面會嘗試解決,畢竟quasar
的性能讓人看了不起不流口水。
❝這塊你們都應該很熟悉了吧,最出名的就數
❞ReactiveX/RxJava
,這款在android
中最爲被普遍使用,caffe
在這使用RxJava3
進行舉例說明。
❝RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
❞
翻譯過來就是使用事件驅動實現異步的一款響應式框架。事件驅動
相信寫過前端
很熟悉,這也是node.js社區所吹噓的高併發
。RxJava
底層利用發佈訂閱模式
(與node.js底層模式類似)而且支持線程切換
來完成在有限的時間內支持更高的併發。
既然是發佈訂閱
模式,那必不可少的三要素發佈者
,訂閱者
, 事件類型
。
主要分爲下面3種事件類型:
Next
,發佈者能夠發佈多個
Next
事件,訂閱者也能夠訂閱多個
Next
事件;
Complete
,訂閱者接收到
Complete
事件便再也不訂閱發佈者的事件;
Error
,發佈者發佈
Error
事件以後,便再也不發佈事件。訂閱者接受
Error
事件也不會繼續訂閱事件。
increasing
充當發佈者的角色,每隔一段時間向echo
推送一個數字類型的消息。echo
服務接收到消息以後打印出來。
increasing
// 發佈者發送事件
Observable increasing = Observable.create((emitter) -> {
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
Thread.sleep(new Random().nextInt(1000));
emitter.onComplete();
});
複製代碼
echo
// 建立訂閱者
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
//
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("觀察者開始訂閱");
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("觀察者接受到消息: " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("觀察者接收到報錯: " + e.getMessage());
}
//
@Override
public void onComplete() {
System.out.println("觀察者訂閱完成,再也不繼續訂閱消息");
}
};
複製代碼
echo
訂閱increasing
increasing.subscribe(echo);
複製代碼
能夠看到最後的執行結果:
觀察者開始訂閱
觀察者接受到消息: 0
觀察者接受到消息: 1
觀察者接受到消息: 2
觀察者訂閱完成,再也不繼續訂閱消息
複製代碼
所以,在RxJava中,Observable
扮演發佈者,Observer
扮演訂閱者,ObservableOnSubscribe.subscribe
方法來完成事件
的發佈。發佈和訂閱之間的關聯是經過Observable.subscribe
來完成的。
線程切換
上面的實例代碼全部的發佈者和訂閱者的代碼都是在一個主線程中進行的。可是實際的業務場景中須要將發佈者的業務,和訂閱者的業務用不一樣的線程去完成,以減少業務的耗時。
線程切換
的代碼以下:
increasing
:
CountDownLatch latch = new CountDownLatch(3);
// 發佈者發送事件
ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
//
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
System.out.println("發佈者開始發佈事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
emitter.onComplete();
}
};
Observable<Integer> increasing = Observable.create(onSubscribe);
複製代碼
echo
:
// 訂閱者接受事件
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("訂閱者開始訂閱事件-" + Thread.currentThread().getName());
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
latch.countDown();
System.out.println("訂閱者接收到事件-" + Thread.currentThread().getName() + " " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("訂閱者接收到報錯,中止接受訂閱事件-" + Thread.currentThread().getName());
}
//
@Override
public void onComplete() {
System.out.println("訂閱者接收到complete事件,中止接受訂閱事件-" + Thread.currentThread().getName());
}
};
複製代碼
線程切換
:
// 訂閱者和發佈者切換線程訂閱
increasing
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.trampoline())
.subscribe(echo);
//
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
複製代碼
最後的結果:
訂閱者開始訂閱事件-main
發佈者開始發佈事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
訂閱者接收到事件-RxCachedThreadScheduler-1 0
訂閱者接收到事件-RxCachedThreadScheduler-1 1
訂閱者接收到事件-RxCachedThreadScheduler-1 2
訂閱者接收到complete事件,中止接受訂閱事件-RxCachedThreadScheduler-1
複製代碼
能夠看到,發佈者和訂閱者分別在不一樣的線程中執行。其中Observable.subscribeOn(@NonNull Scheduler scheduler)
是定義發佈者方法執行的調度器,Observable。observeOn(@NonNull Scheduler scheduler)
定義了訂閱者方法的調度器。
而調度器
有不少種類別,好比IoScheduler
、NewThreadScheduler
、SingleScheduler
、ComputationScheduler
等,須要根據不一樣的業務場景,合理的選擇Scheduler
。
所以,須要更深層次的理解RxJava,就須要再去扒Scheduler
的具體實現,這裏caffe準備以後的文章中進行深度分析。
❝
Caffe
手寫辛苦,麻煩各位大佬點贊關注留言,很是感謝大家的鼓勵和支持。下週咱們再見!