今天咱們要介紹的是Reactor中的多線程模型和定時器模型,Reactor以前咱們已經介紹過了,它其實是觀察者模式的延伸。java
因此從本質上來講,Reactor是和多線程無關的。你能夠把它用在多線程或者不用在多線程。react
今天將會給你們介紹一下如何在Reactor中使用多線程和定時器模型。git
先看一下以前舉的Flux的建立的例子:github
Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println);
能夠看到,不論是Flux generator仍是subscriber,他們實際上都是運行在同一個線程中的。多線程
若是咱們想讓subscribe發生在一個新的線程中,咱們須要新啓動一個線程,而後在線程內部進行subscribe操做。工具
Mono<String> mono = Mono.just("hello "); Thread t = new Thread(() -> mono .map(msg -> msg + "thread ") .subscribe(v -> System.out.println(v + Thread.currentThread().getName()) ) ); t.start(); t.join();
上面的例子中,Mono在主線程中建立,而subscribe發生在新啓動的Thread中。線程
不少狀況下,咱們的publisher是須要定時去調用一些方法,來產生元素的。Reactor提供了一個新的Schedule類來負責定時任務的生成和管理。code
Scheduler是一個接口:教程
public interface Scheduler extends Disposable
它定義了一些定時器中必需要實現的方法:接口
好比當即執行的:
Disposable schedule(Runnable task);
延時執行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
和按期執行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Schedule有一個工具類叫作Schedules,它提供了多個建立Scheduler的方法,它的本質就是對ExecutorService和ScheduledExecutorService進行封裝,將其作爲Supplier來建立Schedule。
簡單點看Schedule就是對ExecutorService的封裝。
Schedulers工具類提供了不少個有用的工具類,咱們來詳細介紹一下:
Schedulers.immediate():
提交的Runnable將會立馬在當前線程執行。
Schedulers.single():
使用同一個線程來執行全部的任務。
Schedulers.boundedElastic():
建立一個可重用的線程池,若是線程池中的線程在長時間內都沒有被使用,那麼將會被回收。boundedElastic會有一個最大的線程個數,通常來講是CPU cores x 10。 若是目前沒有可用的worker線程,提交的任務將會被放入隊列等待。
Schedulers.parallel():
建立固定個數的工做線程,個數和CPU的核數相關。
Schedulers.fromExecutorService(ExecutorService):
從一個現有的線程池建立Scheduler。
Schedulers.newXXX:
Schedulers提供了不少new開頭的方法,來建立各類各樣的Scheduler。
咱們看一個Schedulers的具體應用,咱們能夠指定特定的Scheduler來產生元素:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
publishOn和subscribeOn主要用來進行切換Scheduler的執行上下文。
先講一個結論,就是在鏈式調用中,publishOn能夠切換Scheduler,可是subscribeOn並不會起做用。
這是由於真正的publish-subscribe關係只有在subscriber開始subscribe的時候才創建。
下面咱們來具體看一下這兩個方法的使用狀況:
publishOn能夠在鏈式調用的過程當中,進行publish的切換:
@Test public void usePublishOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":"+ Thread.currentThread()) .publishOn(s) .map(i -> "value " + i+":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start(); System.out.println(Thread.currentThread()); Thread.sleep(5000); }
上面咱們建立了一個名字爲parallel-scheduler的scheduler。
而後建立了一個Flux,Flux先作了一個map操做,而後切換執行上下文到parallel-scheduler,最後右執行了一次map操做。
最後,咱們採用一個新的線程來進行subscribe的輸出。
先看下輸出結果:
Thread[main,5,main] value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
能夠看到,主線程的名字是Thread。Subscriber線程的名字是ThreadA。
那麼在publishOn以前,map使用的線程就是ThreadA。 而在publishOn以後,map使用的線程就切換到了parallel-scheduler線程池。
subscribeOn是用來切換Subscriber的執行上下文,無論subscribeOn出如今調用鏈的哪一個部分,最終都會應用到整個調用鏈上。
咱們看一個例子:
@Test public void useSubscribeOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":" + Thread.currentThread()) .subscribeOn(s) .map(i -> "value " + i + ":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start(); Thread.sleep(5000); }
一樣的,上面的例子中,咱們使用了兩個map,而後在兩個map中使用了一個subscribeOn用來切換subscribe執行上下文。
看下輸出結果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
能夠看到,無論哪一個map,都是用的是切換過的parallel-scheduler。
本文的例子learn-reactive
本文做者:flydean程序那些事本文連接:http://www.flydean.com/reactor-thread-scheduler/
本文來源:flydean的博客
歡迎關注個人公衆號:「程序那些事」最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!