本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本文測試源碼html2.4 調度器與線程模型
在1.3.2節簡單介紹了不一樣類型的調度器Scheduler
,以及如何使用publishOn
和subscribeOn
切換不一樣的線程執行環境。java
下邊使用一個簡單的例子再回憶一下:react
@Test public void testScheduling() { Flux.range(0, 10) // .log() // 1 .publishOn(Schedulers.newParallel("myParallel")) // .log() // 2 .subscribeOn(Schedulers.newElastic("myElastic")) .log() // 3 .blockLast(); }
myElastic-x
線程上的;publishOn
以後數據流是執行在myParallel-x
線程上的;subscribeOn
以後數據流依然是執行在myParallel-x
線程上的。經過以上三個log()
的輸出,能夠發現,對於以下圖所示的操做鏈:git
publishOn
會影響鏈中其後的操做符,好比第一個publishOn調整調度器爲elastic,則filter
的處理操做是在彈性線程池中執行的;同理,flatMap
是執行在固定大小的parallel線程池中的;subscribeOn
不管出如今什麼位置,都隻影響源頭的執行環境,也就是range
方法是執行在單線程中的,直至被第一個publishOn
切換調度器以前,因此range
後的map
也在單線程中執行。這一節咱們瞭解一下它的實現機制。github
調度器至關於Reactor中的ExecutorService,不一樣的調度器定義不一樣的線程執行環境。Schedulers
工具類提供的靜態方法可搭建不一樣的線程執行環境。編程
Schedulers
類已經預先建立了幾種經常使用的不一樣線程池模型的調度器:使用single()
、elastic()
和parallel()
方法建立的調度器能夠分別使用內置的單線程、彈性線程池和固定大小線程池。若是想建立新的調度器,可使用newSingle()
、newElastic()
和newParallel()
方法。這些方法都是返回一個Scheduler
的具體實現。數組
看一下Scheduler
都有哪些行爲:多線程
public interface Scheduler extends Disposable { // 調度執行Runnable任務task。 Disposable schedule(Runnable task); // 延遲一段指定的時間後執行。 Disposable schedule(Runnable task, long delay, TimeUnit unit); // 週期性地執行任務。 Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit); // 建立一個工做線程。 Worker createWorker(); // 啓動調度器 void start(); // 如下兩個方法能夠暫時忽略 void dispose(); long now(TimeUnit unit) // 一個Worker表明調度器可調度的一個工做線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略 interface Worker extends Disposable { // 調度執行Runnable任務task。 Disposable schedule(Runnable task); // 延遲一段指定的時間後執行。 Disposable schedule(Runnable task, long delay, TimeUnit unit); // 週期性地執行任務。 Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit); } }
如圖所示,Scheduler
是領導,Worker
是員工,每一個Scheduler
手中有若干Worker
。接到任務後,Scheduler
負責分派,Worker
負責幹活。架構
在Scheduler
中,每一個Worker
都是一個ScheduledExecutorService
,或一個包裝了ScheduledExecutorService
的對象。因此,Scheduler
擁有的並非線程池,而是一個自行維護的ScheduledExecutorService
池。異步
所謂「自行維護」,主要有三點:
Worker
。好比Schedulers.newParallel()
返回的ParallelScheduler
,其內維護的是一個固定大小的ScheduledExecutorService[]
數組;而ElasticScheduler
由一個ExecutorService的Queue
來維護。ElasticScheduler
和ParallelScheduler
都有一個pick()
方法,用來選出合適的Worker
。Callable
,從而能夠異步地返回一個Future
給調用者。再回到publishOn
和subscribeOn
方法。
在Reactor中,對於數據流的處理,其實是一系列方法調用和基於事件的回調,包括subscribe
、onSubscribe
、request
,以及onNext
、onError
、onComplete
。拿出2.1節的圖幫助理解:
當調用.subscribe()
方法時,會造成從上游向下遊的數據流,數據流中的元素經過onNext* (onError|onComplete)
攜帶「順流而下」。同時,Reactor使用者看不到的是,還有一條從下游向上遊的「訂閱鏈」,request就是沿着這個鏈向上反饋需求的。
publishOn
方法可以將onNext
、onError
、onComplete
調度到給定的Scheduler
的Worker
上執行。因此如上圖場景中,再.map
和.filter
中間增長一個publisheOn(Schedulers.elastic())
的話,.filter
操做的onNext
的過濾處理將會執行在ElasticScheduler
的某個Worker
上。
subscribeOn
方法可以將subscribe
(會調用onSubscribe
)、request
調度到給定的Scheduler
的Worker
上執行。因此在任何位置增長一個subscribeOn(Schedulers.elastic())
的話,都會藉助自下而上的訂閱鏈,經過subscribe()
方法,將線程執行環境傳遞到「源頭」,從而Flux.just
會執行在ElasticScheduler
上。繼而影響到其後的操做符,直至遇到publishOn
改變了執行環境。
此外,有些操做符自己會須要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操做符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration)
使用的是 Schedulers.parallel()
調度器對象:
@Test public void testDelayElements() { Flux.range(0, 10) .delayElements(Duration.ofMillis(10)) .log() .blockLast(); }
從輸出能夠看到onNext
運行在不一樣的線程上:
[ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(0) [ INFO] (parallel-2) onNext(1) [ INFO] (parallel-3) onNext(2) [ INFO] (parallel-4) onNext(3) ...
在Reactor中,基於Scheduler
的線程調度確實很是簡單好用,可是還有個問題須要解決。
咱們以往在編寫多線程的代碼時,若是涉及到只在線程內部使用的值,可能會使用ThreadLocal
進行包裝。
可是在響應式編程中,因爲線程環境常常發生變化,這一用法就失去做用了,而且甚至帶來bug。好比,使用 Logback 的 MDC 來存儲日誌關聯的 ID 就屬於這種狀況。
自從版本 3.1.0,Reactor 引入了一個相似於 ThreadLocal 的高級功能:Context。它做用於一個 Flux 或一個 Mono 上,而不是應用於一個線程(Thread)。也就是其生命週期伴隨整個數據流,而不是線程。
相對來講,用戶使用Context並很少,對此感興趣或有此需求的話,請看我翻譯的相關文檔,能夠對Reactor內部實現尤爲是Subscription
有更深的理解。
現在多核架構已然普及,可以方便的進行並行處理是很重要的。
對於一些可以在一個線程中順序處理的任務,即便調度到ParallelScheduler上,一般也只由一個Worker來執行,好比:
@Test public void testParallelFlux() throws InterruptedException { Flux.range(1, 10) .publishOn(Schedulers.parallel()) .log().subscribe(); TimeUnit.MILLISECONDS.sleep(10); }
輸出以下:
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (parallel-1) | onNext(1) [ INFO] (parallel-1) | onNext(2) [ INFO] (parallel-1) | onNext(3) [ INFO] (parallel-1) | onNext(4) [ INFO] (parallel-1) | onNext(5) [ INFO] (parallel-1) | onNext(6) [ INFO] (parallel-1) | onNext(7) [ INFO] (parallel-1) | onNext(8) [ INFO] (parallel-1) | onNext(9) [ INFO] (parallel-1) | onNext(10) [ INFO] (parallel-1) | onComplete()
有時候,咱們確實須要一些任務可以「均勻」分佈在不一樣的工做線程上執行,這時候就須要用到ParallelFlux
。
你能夠對任何Flux使用parallel()
操做符來獲得一個ParallelFlux
。不過這個操做符自己並不會進行並行處理,而只是將負載劃分到多個執行「軌道」上(默認狀況下,軌道個數與CPU核數相等)。
爲了配置ParallelFlux
如何並行地執行每個軌道,須要使用runOn(Scheduler)
,這裏,Schedulers.parallel() 是比較推薦的專門用於並行處理的調度器。
@Test public void testParallelFlux() throws InterruptedException { Flux.range(1, 10) .parallel(2) .runOn(Schedulers.parallel()) // .publishOn(Schedulers.parallel()) .log() .subscribe(); TimeUnit.MILLISECONDS.sleep(10); }
輸出以下:
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(1) [ INFO] (parallel-2) onNext(2) [ INFO] (parallel-1) onNext(3) [ INFO] (parallel-2) onNext(4) [ INFO] (parallel-1) onNext(5) [ INFO] (parallel-2) onNext(6) [ INFO] (parallel-1) onNext(7) [ INFO] (parallel-2) onNext(8) [ INFO] (parallel-1) onNext(9) [ INFO] (parallel-2) onNext(10) [ INFO] (parallel-1) onComplete() [ INFO] (parallel-2) onComplete()
能夠看到,各個元素的onNext 「均勻」分佈執行在兩個線程上,最後每一個線程上有獨立的onComplete
事件,這與publishOn
調度到ParallelScheduler上的狀況是不一樣的。