繁雜網絡IO型業務的分析及探索--協程和響應式

最近Caffe在嘗試優化業務中發現,服務的心跳信息中有不少線程都是處於waiting,以下圖所示:前端

thread count="608" 
daemon-count="420" 
peak-count="611" 
total-started-count="13722" 
deadlocked="0" new="0" runnable="169" blocked="0" 
waiting="314"
複製代碼

而後看了CPU的使用率,從左到右分別表示CPU的任務等待數/CPU核數CPU的執行時間佔比總時間(CPU執行時間+CPU空閒時間+ CPU等待時間)當前JAVA進程執行時間佔比總時間java

圖中能夠清晰地看到,並非計算型業務致使了線程等待,而是極大可能因爲服務到底層數據查詢的網絡IO等待使得排隊的線程增長,所以決定考慮優化這一部分。優化的目標,在保證服務和底層存儲的心跳信息在一個安全的範圍內,儘量的增長服務吞吐能力。node

思路一協程

當時優化的第一時間就想到了大名鼎鼎的quasar三方庫。quasar能夠理解爲輕量級的線程實現,熟悉go語言必定知道goroutine,咱們知道Java語言中不支持協程,業務中不少場景都須要用線程池進行優化,可是使用線程池的成本也很高,不管是內存佔用仍是線程之間的切換消耗,都限制了一個應用不能無限制的建立線程。android

好在社區開源了一款Java coroutine框架quasar,容我先吐槽一下,這個框架真的是直男程序員寫的(已經被拉去寫JDK的協程,十分期待JDK能早點支持協程),文檔十分匱乏,致使我本地開始搞得時候就報錯了一把,開局體驗不是很舒服。程序員

固然優勢也十分突出,應用中網絡IO耗時佔比比較突出的場景中,使用quasar能夠極大的提升CPU的吞吐率。簡單描述就是能夠在更短的時間內處理更多的請求。不會由於一個線程中的網絡IO堵塞而讓後面的線程處於waiting中,堵塞的時候CPU是不幹活的,所以將整個系統的吞吐率拉胯。web

官網的文檔中提供了兩種使用方式,爲了節約篇幅先用第1種方式示範一下使用方式:編程

  1. Running the Instrumentation Java Agent(加載器織入)
  2. Ahead-of-Time (AOT) Instrumentation(預編譯織入)

這裏我先用Gradle項目做爲🌰來詳解一下怎麼使用。安全

1、Gradle配置模塊

服務器

configurations { quasar } // tasks.withType(JavaExec) { jvmArgs "-javaagent:${configurations.quasar.iterator().next()}" } // dependencies { compile "org.antlr:antlr4:4.7.2" compile "co.paralleluniverse:quasar-core:0.7.5" quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar" testCompile group: 'junit', name: 'junit', version: '4.12' } 複製代碼複製代碼

2、實現一個耳熟能詳的echo服務器

兩個Fiber(至關因而Java的Thread)相互通訊,increasing發送一個int數字給echoecho收到以後返回給increasingincreasing接收到echo返回的消息,先打印,在執行++操做,而後打印出最後的結果。代碼示例以下:網絡

  1. increasing
final IntChannel increasingToEcho = Channels.newIntChannel(0);
        final IntChannel echoToIncreasing = Channels.newIntChannel(0);
        //
        Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
            @Override
            public Integer run() throws SuspendExecution, InterruptedException {
                int curr = 0;
                for (int i = 0; i < 10; ++i) {
                    Fiber.sleep(10);
                    System.out.println("INCREASING sending curr = " + curr);
                    increasingToEcho.send(curr);
                    curr = echoToIncreasing.receive();
                    System.out.println("INCREASING received curr = " + curr);
                    curr++;
                    System.out.println("INCREASING now curr = " + curr);
                }
                //
                System.out.println("INCREASING closing channel and exiting");
                increasingToEcho.close();
                return curr;
            }
        }).start();
複製代碼
  1. echo
Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                Integer curr;
                while(true) {
                    Fiber.sleep(1000);
                    curr = increasingToEcho.receive();
                    System.out.println("ECHO received curr = " + curr);
                    //
                    if (curr != null) {
                        System.out.println("ECHO sending curr = " + curr);
                        echoToIncreasing.send(curr);
                    } else {
                        System.out.println("ECHO 檢測到關閉channel,closing and existing");
                        echoToIncreasing.close();
                        return;
                    }
                }
            }
        }).start();
複製代碼
  1. 運行 increasingincreasing
try {
            increasing.join();
            echo.join();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
複製代碼

能夠看到,使用起來和Java中的Thread比較類似,API的語義清晰明瞭,減少了使用人員的成本。

3、原理及使用注意事項

1. Running the Instrumentation Java Agent

顧名思義,經過修改javaagent的方式,原理就是在classloading階段動態的修改字節碼。好比熟悉的AspectJ框架,核心就是ajc(編譯器)織入器(weaver)達到不修改業務邏輯而修改字節碼,ajc在java編譯器的基礎上,定義了一些aop語法,將符合這些語法的方法進行從新編譯。分爲預編譯(CTW)加載期(LTW)後編譯期(PTW) 3種織入方式。

quasarjvaagent就屬於加載期(LTW) 織入的方式。一樣也是在不影響正常編譯的狀況下,增長一些代碼檢測,當檢測到某一個方法須要支持暫停功能的時候,進行從新編譯,從而達到掛起方法保存上下文,阻塞完成後恢復執行下面的代碼。

好比一個方法,在運行的時候須要請求網絡,此時這個方法就被阻塞了,須要等網絡請求返回在執行下面的代碼。那麼,當方法被阻塞的時候,就須要交給協程去控制,須要保存此時方法運行的上下文。當網絡請求完成的時候,在執行方法的下文。粗略的歸納一下協程的工做方式大體就是如此。

可是實際的場景會無比複雜。實際的一段阻塞的代碼中,裏面可會有多個阻塞的代碼塊,所以最頂端須要一個調度中心,只有當裏面的阻塞代碼塊執行完了以後,纔會執行外面的代碼塊,否則最後都亂套了。

這就很像java中的ForkJoinPool,一個大任務能夠Fork出不少子任務,只有當子任務都完成執行,纔會去執行父任務quasar也是如此,運行過程當中將須要被掛起的方法和方法內的代碼塊交給調度中心,調度中心中存儲任務之間的父子兄弟關係,而後按照任務層次關係執行代碼。

1.1 quasar織入的條件

quasar會將知足下面條件的方法進行織入:

  1. 方法帶有 @Suspendable註解
  2. 方法拋出了異常 SuspendExecution
  3. classpath下 /META-INF/suspendables/META-INF/suspendable-supers指定了一些類或者接口, quasar會對這些類或者接口的方法進行分析,符合上面任意一種的方法進行 織入
  4. 方法內部經過反射調用的方法,先後也會進行 織入
  5. MethodHandle.invoke動態調用的方法先後進行 織入
  6. JDK動態代理執行的代碼塊先後 織入
  7. Java 8 lambdas調用先後 織入

咱們也能夠從quasar的官網文檔中看到依賴項,其中就有ASM — Java bytecode manipulation and analysis framework, by the ASM team,所以更多想了解織入的細節,你們能夠去了解下ASM 框架。Caffe有空的時候也會單獨出一篇文章科普下,由於這塊的東西比較偏虛擬機底層。

1.2 quasar實際使用中須要對業務怎麼改造?

若是業務中一個方法中有不少阻塞性業務,那麼就要將這些阻塞性業務放入不一樣的Fiber執行,能夠看到上文中的echoincreasing就屬於兩個阻塞型業務同時又相互依賴,邏輯上的依賴經過Channels解決。

1.3 兼容性問題

不管是經過javaagent仍是AOT(預編譯織入)的方式進行織入,本質上都是經過對字節碼先後進行插入特定的指令。可是這種很容易帶來一些兼容性問題,好比不少大廠都會經過pt-tracer這種染色技術,來對java線程進行着色,進行全鏈路的調用監控或者壓測流量的區分。因此caffe思來想去就放棄了使用quasar這款偉大的協程框架,擔憂這種織入方式會不兼容線程中的染色

不事後面會嘗試解決,畢竟quasar的性能讓人看了不起不流口水。

思路二響應式編程

這塊你們都應該很熟悉了吧,最出名的就數ReactiveX/RxJava,這款在android中最爲被普遍使用,caffe在這使用RxJava3進行舉例說明。

1、RxJava簡介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻譯過來就是使用事件驅動實現異步的一款響應式框架。事件驅動相信寫過前端很熟悉,這也是node.js社區所吹噓的高併發RxJava底層利用發佈訂閱模式(與node.js底層模式類似)而且支持線程切換來完成在有限的時間內支持更高的併發。

1.RxJava相關概念介紹

既然是發佈訂閱模式,那必不可少的三要素發佈者訂閱者事件類型

1.1 事件類型

主要分爲下面3種事件類型:

  1. Next,發佈者能夠發佈多個 Next事件,訂閱者也能夠訂閱多個 Next事件;
  2. Complete,訂閱者接收到 Complete事件便再也不訂閱發佈者的事件;
  3. Error,發佈者發佈 Error事件以後,便再也不發佈事件。訂閱者接受 Error事件也不會繼續訂閱事件。
1.2 echo服務中的發佈、訂閱、事件

increasing充當發佈者的角色,每隔一段時間向echo推送一個數字類型的消息。echo服務接收到消息以後打印出來。

increasing

// 發佈者發送事件
        Observable increasing = Observable.create((emitter) -> {
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(0);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(1);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(2);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onComplete();
        });
複製代碼

echo

// 建立訂閱者
          Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            //
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("觀察者開始訂閱");
                disposable = d;
            }
            //
            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("觀察者接受到消息: " + integer);
            }
            // 
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("觀察者接收到報錯: " + e.getMessage());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("觀察者訂閱完成,再也不繼續訂閱消息");
            }
        };
複製代碼

echo訂閱increasing

increasing.subscribe(echo);
複製代碼

能夠看到最後的執行結果:

觀察者開始訂閱
觀察者接受到消息: 0
觀察者接受到消息: 1
觀察者接受到消息: 2
觀察者訂閱完成,再也不繼續訂閱消息
複製代碼

所以,在RxJava中,Observable扮演發佈者,Observer扮演訂閱者,ObservableOnSubscribe.subscribe方法來完成事件的發佈。發佈和訂閱之間的關聯是經過Observable.subscribe來完成的。

1.3 發佈者和訂閱者線程切換

上面的實例代碼全部的發佈者和訂閱者的代碼都是在一個主線程中進行的。可是實際的業務場景中須要將發佈者的業務,和訂閱者的業務用不一樣的線程去完成,以減少業務的耗時。

線程切換的代碼以下:

increasing

CountDownLatch latch = new CountDownLatch(3);
            // 發佈者發送事件
            ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
            //
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                System.out.println("發佈者開始發佈事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(0);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(1);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(2);
                emitter.onComplete();
            }
          };
          Observable<Integer> increasing = Observable.create(onSubscribe);
複製代碼

echo

// 訂閱者接受事件
            Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("訂閱者開始訂閱事件-" + Thread.currentThread().getName());
                disposable = d;
            }
            // 
            @Override
            public void onNext(@NonNull Integer integer) {
                latch.countDown();
                System.out.println("訂閱者接收到事件-" + Thread.currentThread().getName() + "   " + integer);
            }
            //
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("訂閱者接收到報錯,中止接受訂閱事件-" + Thread.currentThread().getName());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("訂閱者接收到complete事件,中止接受訂閱事件-" + Thread.currentThread().getName());
            }
        };
複製代碼

線程切換

// 訂閱者和發佈者切換線程訂閱
        increasing
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.trampoline())
                .subscribe(echo);
        //
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
複製代碼

最後的結果:

訂閱者開始訂閱事件-main
發佈者開始發佈事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
訂閱者接收到事件-RxCachedThreadScheduler-1   0
訂閱者接收到事件-RxCachedThreadScheduler-1   1
訂閱者接收到事件-RxCachedThreadScheduler-1   2
訂閱者接收到complete事件,中止接受訂閱事件-RxCachedThreadScheduler-1
複製代碼

能夠看到,發佈者和訂閱者分別在不一樣的線程中執行。其中Observable.subscribeOn(@NonNull Scheduler scheduler)是定義發佈者方法執行的調度器,Observable。observeOn(@NonNull Scheduler scheduler)定義了訂閱者方法的調度器。

調度器有不少種類別,好比IoSchedulerNewThreadSchedulerSingleSchedulerComputationScheduler等,須要根據不一樣的業務場景,合理的選擇Scheduler

所以,須要更深層次的理解RxJava,就須要再去扒Scheduler的具體實現,這裏caffe準備以後的文章中進行深度分析。

Caffe手寫辛苦,麻煩各位大佬點贊關注留言,很是感謝大家的鼓勵和支持。下週咱們再見!

相關文章
相關標籤/搜索