響應式編程總覽

引子:被譽爲「中國大數據第一人」的塗子沛先生在其成名做《數據之巔》裏提到,摩爾定律、社交媒體、數據挖掘是大數據的三大成因。IBM的研究稱,整我的類文明所得到的所有數據中,有90%是過去兩年內產生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在內的大批新技術應運而生。其中以RxJavaReactor爲表明的響應式(Reactive)編程技術針對的就是經典的大數據4V定義(Volume,Variety,Velocity,Value)中的Velocity,即高併發問題,而在即將發佈的Spring 5中,也引入了響應式編程的支持。在接下來的幾周,我會圍繞響應式編程分三期與你分享個人一些學習心得。本篇是第二篇,以Reactor框架爲例介紹響應式編程的幾個關鍵特性。java

前情概要:react

1 響應式編程總覽

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipediagit

在上述響應式編程(後面簡稱RP)的定義中,除了異步編程,還包含兩個重要的關鍵詞:github

  • Data streams: 即數據流,分爲靜態數據流(好比數組,文件)和動態數據流(好比事件流,日誌流)兩種。基於數據流模型,RP得以提供一套統一的Stream風格的數據處理接口。和Java 8中的Stream API相比,RP API除了支持靜態數據流,還支持動態數據流,而且容許複用和同時接入多個訂閱者。
  • The propagation of change: 變化傳播,簡單來講就是以一個數據流爲輸入,通過一連串操做轉化爲另外一個數據流,而後分發給各個訂閱者的過程。這就有點像函數式編程中的組合函數,將多個函數串聯起來,把一組輸入數據轉化爲格式迥異的輸出數據。

一個容易混淆的概念是響應式設計,雖然它的名字中也包含了「響應式」三個字,但其實和RP徹底是兩碼事。響應式設計是指網頁可以自動調整佈局和樣式以適配不一樣尺寸的屏幕,屬於網站設計的範疇,而RP是一種關注系統可響應性,面向數據流的編程思想或者說編程框架。web

特性

從本質上說,RP是一種異步編程框架,和其餘框架相比,RP至少包含了如下三個特性:spring

  • 描述而非執行:在你最終調用subscribe()方法以前,從發佈端到訂閱端,沒有任何事會發生。就比如不管多長的水管,只要水龍頭不打開,水管裏的水就不會流動。爲了提升描述能力,RP提供了比Stream豐富的多的多的API,好比buffer(), merge(), onErrorMap()等。
  • 提升吞吐量: 相似於HTTP/2中的鏈接複用,RP經過線程複用來提升吞吐量。在傳統的Servlet容器中,每來一個請求就會發起一個線程進行處理。受限於機器硬件資源,單臺服務器所能支撐的線程數是存在一個上限的,假設爲T,那麼應用同時能處理的請求數(吞吐量)必然也不會超過T。但對於一個使用Spring 5開發的RP應用,若是運行在像Netty這樣的異步容器中,不管有多少個請求,用於處理請求的線程數是相對固定的,所以最大吞吐量就有可能超過T。
  • 背壓(Backpressure)支持:簡單來講,背壓就是一種反饋機制。在通常的Push模型中,發佈者既不知道也不關心訂閱者的處理速度,當數據的發佈速度超過處理速度時,須要訂閱者本身決定是緩存仍是丟棄。若是使用RP,決定權就交回給發佈者,訂閱者只須要根據本身的處理能力問發佈者請求相應數量的數據。你可能會問這不就是Pull模型嗎?實際上是不一樣的。在Pull模型中,訂閱者每次處理完數據,都要從新發起一次請求拉取新的數據,而使用背壓,訂閱者只須要發起一次請求,就能接二連三的重複請求數據。

適用場景

瞭解了RP的這些特性,你可能已經猜測到RP有哪些適用場景了。通常來講,RP適用於高併發、帶延遲操做的場景,好比如下這些狀況(的組合):編程

  • 一次請求涉及屢次外部服務調用
  • 非可靠的網絡傳輸
  • 高併發下的消息處理
  • 彈性計算網絡

代價

Every coin has two sides.api

和任何框架同樣,有優點必然就有劣勢。RP的兩個比較大的問題是:數組

  • 雖然複用線程有助於提升吞吐量,但一旦在某個回調函數中線程被卡住,那麼這個線程上全部的請求都會被阻塞,最嚴重的狀況,整個應用會被拖垮。
  • 難以調試。因爲RP強大的描述能力,在一個典型的RP應用中,大部分代碼都是以鏈式表達式的形式出現,好比flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出錯,你將很難定位到具體是哪一個環節出了問題。所幸的是,RP框架通常都會提供一些工具方法來輔助進行調試。

2 Reactor實戰

爲了幫助你理解上面說的一些概念,下面我就經過幾個測試用例,演示RP的兩個關鍵特性:提升吞吐量和背壓。完整的代碼可參見我GitHub上的示例工程緩存

提升吞吐量

@Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
            }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }複製代碼

用例解讀:

  • 第一個測試用例使用的是多線程+MongoDB Driver,同時起100個線程,每一個線程往MongoDB中插入10000條數據,總共100萬條數據,平均用時15秒左右。
  • 第二個測試用例使用的是Reactor+MongoDB Reactive Streams Driver,一樣是插入100萬條數據,平均用時不到10秒,吞吐量提升了50%!

背壓

在演示測試用例以前,先看兩張圖,幫助你更形象的理解什麼是背壓。

圖片出處:Dataflow and simplified reactive programming

兩張圖乍一看沒啥區別,但實際上是徹底兩種不一樣的背壓策略。第一張圖,發佈速度(100/s)遠大於訂閱速度(1/s),但因爲背壓的關係,發佈者嚴格按照訂閱者的請求數量發送數據。第二張圖,發佈速度(1/s)小於訂閱速度(100/s),當訂閱者請求100個數據時,發佈者會積滿所需個數的數據再開始發送。能夠看到,經過背壓機制,發佈者能夠根據各個訂閱者的能力動態調整發布速度。

@BeforeEach
    public void beforeEach() {
        // initialize publisher
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            s.complete();
                        }
                    }
                }, 100, 100)
        );
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }複製代碼

用例解讀:

  • 第一個測試用例演示了在理想狀況下,即訂閱者的處理速度可以跟上發佈者的發佈速度(以100ms爲間隔產生10個數字),控制檯從0打印到9,一共10個數字,和發佈端一致。
  • 第二個測試用例故意調慢了訂閱者的處理速度(每200ms處理一個數字),同時發佈者採用了Drop的背壓策略,結果控制檯只打印了一半的數字(0,2,4,6,8),另一半的數字因爲背壓的緣由被髮布者Drop掉了,並無發給訂閱者。

3 小結

經過上面的介紹,不難看出RP其實是一種內置了發佈者訂閱者模型的異步編程框架,包含了線程複用,背壓等高級特性,特別適用於高併發、有延遲的場景。

以上就是我對響應式編程的一些簡單介紹,歡迎你到個人留言板分享,和你們一塊兒過過招。下一篇我將綜合前兩篇的內容,詳解一個完整的Spring 5示例應用,敬請期待。

4 參考

相關文章
相關標籤/搜索