(14)Reactor調度器與線程模型——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本文測試源碼html

2.4 調度器與線程模型

1.3.2節簡單介紹了不一樣類型的調度器Scheduler,以及如何使用publishOnsubscribeOn切換不一樣的線程執行環境。java

下邊使用一個簡單的例子再回憶一下:react

@Test
    public void testScheduling() {
        Flux.range(0, 10)
//                .log()    // 1
                .publishOn(Schedulers.newParallel("myParallel"))
//                .log()    // 2
                .subscribeOn(Schedulers.newElastic("myElastic"))
                .log()    // 3
                .blockLast();
    }
  1. 只保留這個log()的話,能夠看到,源頭數據流是執行在myElastic-x線程上的;
  2. 只保留這個log()的話,能夠看到,publishOn以後數據流是執行在myParallel-x線程上的;
  3. 只保留這個log()的話,能夠看到,subscribeOn以後數據流依然是執行在myParallel-x線程上的。

經過以上三個log()的輸出,能夠發現,對於以下圖所示的操做鏈:git

調度

  • publishOn會影響鏈中其後的操做符,好比第一個publishOn調整調度器爲elastic,則filter的處理操做是在彈性線程池中執行的;同理,flatMap是執行在固定大小的parallel線程池中的;
  • subscribeOn不管出如今什麼位置,都隻影響源頭的執行環境,也就是range方法是執行在單線程中的,直至被第一個publishOn切換調度器以前,因此range後的map也在單線程中執行。

這一節咱們瞭解一下它的實現機制。github

2.4.1 調度器

調度器至關於Reactor中的ExecutorService,不一樣的調度器定義不一樣的線程執行環境。Schedulers工具類提供的靜態方法可搭建不一樣的線程執行環境。編程

Schedulers類已經預先建立了幾種經常使用的不一樣線程池模型的調度器:使用single()elastic()parallel()方法建立的調度器能夠分別使用內置的單線程、彈性線程池和固定大小線程池。若是想建立新的調度器,可使用newSingle()newElastic()newParallel()方法。這些方法都是返回一個Scheduler的具體實現。數組

看一下Scheduler都有哪些行爲:多線程

public interface Scheduler extends Disposable {
    // 調度執行Runnable任務task。
    Disposable schedule(Runnable task);
    // 延遲一段指定的時間後執行。
    Disposable schedule(Runnable task, long delay, TimeUnit unit);
    // 週期性地執行任務。
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
    // 建立一個工做線程。
    Worker createWorker();
    // 啓動調度器
    void start();
    // 如下兩個方法能夠暫時忽略
    void dispose();
    long now(TimeUnit unit)

    // 一個Worker表明調度器可調度的一個工做線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略
    interface Worker extends Disposable {
        // 調度執行Runnable任務task。
        Disposable schedule(Runnable task);
        // 延遲一段指定的時間後執行。
        Disposable schedule(Runnable task, long delay, TimeUnit unit);
        // 週期性地執行任務。
        Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
    }
}

title

如圖所示,Scheduler是領導,Worker是員工,每一個Scheduler手中有若干Worker。接到任務後,Scheduler負責分派,Worker負責幹活。架構

Scheduler中,每一個Worker都是一個ScheduledExecutorService,或一個包裝了ScheduledExecutorService的對象。因此,Scheduler擁有的並非線程池,而是一個自行維護的ScheduledExecutorService池。異步

所謂「自行維護」,主要有三點:

  1. 可供調遣的Worker。好比Schedulers.newParallel()返回的ParallelScheduler,其內維護的是一個固定大小的ScheduledExecutorService[]數組;而ElasticScheduler由一個ExecutorService的Queue來維護。
  2. 任務分派策略。ElasticSchedulerParallelScheduler都有一個pick()方法,用來選出合適的Worker
  3. 對於要處理的任務,包裝爲Callable,從而能夠異步地返回一個Future給調用者。

2.4.2 切換執行環境

再回到publishOnsubscribeOn方法。

在Reactor中,對於數據流的處理,其實是一系列方法調用和基於事件的回調,包括subscribeonSubscriberequest,以及onNextonErroronComplete。拿出2.1節的圖幫助理解:

title

當調用.subscribe()方法時,會造成從上游向下遊的數據流,數據流中的元素經過onNext* (onError|onComplete)攜帶「順流而下」。同時,Reactor使用者看不到的是,還有一條從下游向上遊的「訂閱鏈」,request就是沿着這個鏈向上反饋需求的。

publishOn方法可以將onNextonErroronComplete調度到給定的SchedulerWorker上執行。因此如上圖場景中,再.map.filter中間增長一個publisheOn(Schedulers.elastic())的話,.filter操做的onNext的過濾處理將會執行在ElasticScheduler的某個Worker上。

subscribeOn方法可以將subscribe(會調用onSubscribe)、request調度到給定的SchedulerWorker上執行。因此在任何位置增長一個subscribeOn(Schedulers.elastic())的話,都會藉助自下而上的訂閱鏈,經過subscribe()方法,將線程執行環境傳遞到「源頭」,從而Flux.just會執行在ElasticScheduler上。繼而影響到其後的操做符,直至遇到publishOn改變了執行環境。

此外,有些操做符自己會須要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操做符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration) 使用的是 Schedulers.parallel()調度器對象:

@Test
    public void testDelayElements() {
        Flux.range(0, 10)
                .delayElements(Duration.ofMillis(10))
                .log()
                .blockLast();
    }

從輸出能夠看到onNext運行在不一樣的線程上:

[ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-2) onNext(1)
[ INFO] (parallel-3) onNext(2)
[ INFO] (parallel-4) onNext(3)
...

2.4.3 爲數據流配置Context

在Reactor中,基於Scheduler的線程調度確實很是簡單好用,可是還有個問題須要解決。

咱們以往在編寫多線程的代碼時,若是涉及到只在線程內部使用的值,可能會使用ThreadLocal進行包裝。

可是在響應式編程中,因爲線程環境常常發生變化,這一用法就失去做用了,而且甚至帶來bug。好比,使用 Logback 的 MDC 來存儲日誌關聯的 ID 就屬於這種狀況。

自從版本 3.1.0,Reactor 引入了一個相似於 ThreadLocal 的高級功能:Context。它做用於一個 Flux 或一個 Mono 上,而不是應用於一個線程(Thread)。也就是其生命週期伴隨整個數據流,而不是線程。

相對來講,用戶使用Context並很少,對此感興趣或有此需求的話,請看我翻譯的相關文檔,能夠對Reactor內部實現尤爲是Subscription有更深的理解。

2.4.4 並行執行

現在多核架構已然普及,可以方便的進行並行處理是很重要的。

對於一些可以在一個線程中順序處理的任務,即便調度到ParallelScheduler上,一般也只由一個Worker來執行,好比:

@Test
    public void testParallelFlux() throws InterruptedException {
        Flux.range(1, 10)
                .publishOn(Schedulers.parallel())
                .log().subscribe();
        TimeUnit.MILLISECONDS.sleep(10);
    }

輸出以下:

[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (parallel-1) | onNext(1)
[ INFO] (parallel-1) | onNext(2)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-1) | onNext(4)
[ INFO] (parallel-1) | onNext(5)
[ INFO] (parallel-1) | onNext(6)
[ INFO] (parallel-1) | onNext(7)
[ INFO] (parallel-1) | onNext(8)
[ INFO] (parallel-1) | onNext(9)
[ INFO] (parallel-1) | onNext(10)
[ INFO] (parallel-1) | onComplete()

有時候,咱們確實須要一些任務可以「均勻」分佈在不一樣的工做線程上執行,這時候就須要用到ParallelFlux

你能夠對任何Flux使用parallel()操做符來獲得一個ParallelFlux。不過這個操做符自己並不會進行並行處理,而只是將負載劃分到多個執行「軌道」上(默認狀況下,軌道個數與CPU核數相等)。

爲了配置ParallelFlux如何並行地執行每個軌道,須要使用runOn(Scheduler),這裏,Schedulers.parallel() 是比較推薦的專門用於並行處理的調度器。

@Test
    public void testParallelFlux() throws InterruptedException {
        Flux.range(1, 10)
                .parallel(2)
                .runOn(Schedulers.parallel())
//                .publishOn(Schedulers.parallel())
                .log()
                .subscribe();

        TimeUnit.MILLISECONDS.sleep(10);
    }

輸出以下:

[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-2) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-2) onNext(4)
[ INFO] (parallel-1) onNext(5)
[ INFO] (parallel-2) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-2) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-2) onNext(10)
[ INFO] (parallel-1) onComplete()
[ INFO] (parallel-2) onComplete()

能夠看到,各個元素的onNext 「均勻」分佈執行在兩個線程上,最後每一個線程上有獨立的onComplete事件,這與publishOn調度到ParallelScheduler上的狀況是不一樣的。

相關文章
相關標籤/搜索