Reactor響應式編程,你只差這個!

哈哈哈哈哈,題目有點猖狂。可是既然你都來了,那就看看吧,畢竟響應式編程隨着高併發對於性能的吃緊,愈來愈重要了。react

哦對了,這是一篇Java文章。編程

廢話很少說,直接步入正題。安全

響應式編程核心組件

步入正題以前,我但願你對發佈者/訂閱者模型有一些瞭解。多線程

直接看圖:
併發

Talk is cheap, show you the code!框架

public class Main {

    public static void main(String[] args) {
        Flux<Integer> flux = Flux.range(0, 10);
        flux.subscribe(i -> {
            System.out.println("run1: " + i);
        });
        flux.subscribe(i -> {
            System.out.println("run2: " + i);
        });
    }
}

輸出:dom

run1: 0
run1: 1
run1: 2
run1: 3
run1: 4
run1: 5
run1: 6
run1: 7
run1: 8
run1: 9
run2: 0
run2: 1
run2: 2
run2: 3
run2: 4
run2: 5
run2: 6
run2: 7
run2: 8
run2: 9

Process finished with exit code 0

Flux

Flux是一個多元素的生產者,言外之意,它能夠生產多個元素,組成元素序列,供訂閱者使用。異步

Mono

Mono和Flux的區別在於,它只能生產一個元素供生產者訂閱,也就是數量的不一樣。ide

Mono的一個常見的應用就是Mono<ServerResponse\>做爲WebFlux的返回值。畢竟每次請求只有一個Response對象,因此Mono剛恰好。高併發

快速建立一個Flux/Mono並訂閱它

來看一些官方文檔演示的方法。

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

subscribe()方法(Lambda形式)

  • subscribe()方法默認接受一個Lambda表達式做爲訂閱者來使用。它有四個變種形式。
  • 在這裏說明一下subscribe()第四個參數,指出了當訂閱信號到達,初次請求的個數,若是是null則所有請求(Long.MAX_VALUE)
public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        }, p -> {
            p.request(2);
        });
    }
}

若是去掉初次請求,那麼會請求最大值:

public class FluxIntegerWithSubscribe {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        // 在這裏說明一下subscribe()第四個參數,指出了當訂閱信號到達,初次請求的個數,若是是null則所有請求(Long.MAX_VALUE)
        // 其他subscribe()詳見源碼或文檔:https://projectreactor.io/docs/core/release/reference/#flux
        integerFlux.subscribe(i -> {
            System.out.println("run");
            System.out.println(i);
        }, error -> {
            System.out.println("error");
        }, () -> {
            System.out.println("done");
        });
    }
}

輸出:

run
0
run
1
run
2
run
3
run
4
run
5
run
6
run
7
run
8
run
9
done

Process finished with exit code 0

繼承BaseSubscriber(非Lambda形式)

  • 這種方式更多像是對於Lambda表達式的一種替換表達。
  • 對於基於此方法的訂閱,有一些注意事項,好比初次訂閱時,要至少請求一次。不然會致使程序沒法繼續得到新的元素。
public class FluxWithBaseSubscriber {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(new MySubscriber());
    }

    /**
     * 通常來講,經過繼承BaseSubscriber<T>來實現,並且通常自定義hookOnSubscribe()和hookOnNext()方法
     */
    private static class MySubscriber extends BaseSubscriber<Integer> {

        /**
         * 初次訂閱時被調用
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("開始啦!");
            // 記得至少請求一次,不然不會執行hookOnNext()方法
            request(1);
        }

        /**
         * 每次讀取新值調用
         */
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("開始讀取...");
            System.out.println(value);
            // 指出下一次讀取多少個
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("結束啦");
        }
    }
}

輸出:

開始啦!
開始讀取...
0
開始讀取...
1
開始讀取...
2
開始讀取...
3
開始讀取...
4
開始讀取...
5
開始讀取...
6
開始讀取...
7
開始讀取...
8
開始讀取...
9
結束啦

Process finished with exit code 0

終止訂閱:Disposable

  • Disposable是一個訂閱時返回的接口,裏面包含不少能夠操做訂閱的方法。
  • 好比取消訂閱。

在這裏使用多線程模擬生產者生產的很快,而後立馬取消訂閱(雖然馬上取消可是因爲生產者實在太快了,因此訂閱者仍是接收到了一些元素)。

其餘的方法,好比Disposables.composite()會獲得一個Disposable的集合,調用它的dispose()方法會把集合裏的全部Disposable的dispose()方法都調用。

public class FluxWithDisposable {

    public static void main(String[] args) {
        Disposable disposable = getDis();
        // 每次打印數量通常不一樣,由於調用了disposable的dispose()方法進行了取消,不過若是生產者產地太快了,那麼可能來不及終止。
        disposable.dispose();
    }

    private static Disposable getDis() {
        class Add implements Runnable {

            private final FluxSink<Integer> fluxSink;

            public Add(FluxSink<Integer> fluxSink) {
                this.fluxSink = fluxSink;
            }

            @Override
            public synchronized void run() {
                fluxSink.next(new Random().nextInt());
            }
        }
        Flux<Integer> integerFlux = Flux.create(integerFluxSink -> {
            Add add = new Add(integerFluxSink);
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
        });
        return integerFlux.subscribe(System.out::println);
    }
}

輸出:

這裏的輸出每次調用可能都會不一樣,由於訂閱以後取消了,因此能打印多少取決於那一瞬間CPU的速度。

調整發布者發佈速率

  • 爲了緩解訂閱者壓力,訂閱者能夠經過負壓流回溯進行重塑發佈者發佈的速率。最典型的用法就是下面這個——經過繼承BaseSubscriber來設置本身的請求速率。可是有一點必須明確,就是hookOnSubscribe()方法必須至少請求一次,否則你的發佈者可能會「卡住」。
public class FluxWithLimitRate1 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        integerFlux.subscribe(new MySubscriber());
    }

    private static class MySubscriber extends BaseSubscriber<Integer> {

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("開始啦!");
            // 記得至少請求一次,不然不會執行hookOnNext()方法
            request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("開始讀取...");
            System.out.println(value);
            // 指出下一次讀取多少個
            request(2);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("結束啦!");
        }
    }
}
  • 或者使用limitRate()實例方法進行限制,它返回一個被限制了速率的Flux或Mono。某些上流的操做能夠更改下流訂閱者的請求速率,有一些操做有一個prefetch整型做爲輸入,能夠獲取大於下流訂閱者請求的數量的序列元素,這樣作是爲了處理它們本身的內部序列。這些預獲取的操做方法通常默認預獲取32個,不過爲了優化;每次已經獲取了預獲取數量的75%的時候,會再獲取75%。這叫「補充優化」。
public class FluxWithLimitRate2 {

    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.range(0, 100);
        // 最後,來看一些Flux提供的預獲取方法:
        // 指出預取數量
        integerFlux.limitRate(10);
        // lowTide指出預獲取操做的補充優化的值,即修改75%的默認值;highTide指出預獲取數量。
        integerFlux.limitRate(10, 15);
        // 哎~最典型的就是,請求無數:request(Long.MAX_VALUE)可是我給你limitRate(2);那你也只能乖乖每次獲得兩個哈哈哈哈!
        // 還有一個就是limitRequest(N),它會把下流總請求限制爲N。若是下流請求超過了N,那麼只返回N個,不然返回實際數量。而後認爲請求完成,向下流發送onComplete信號。
        integerFlux.limitRequest(5).subscribe(new MySubscriber());
        // 上面這個只會輸出5個。
    }
}

程序化地建立一個序列

靜態同步方法:generate()

如今到了程序化生成Flux/Mono的時候。首先介紹generate()方法,這是一個同步的方法。言外之意就是,它是線程不安全的,且它的接收器只能一次一個的接受輸入來生成Flux/Mono。也就是說,它在任意時刻只能被調用一次且只接受一個輸入。

或者這麼說,它生成的元素序列的順序,取決於代碼編寫的方式。

public class FluxWithGenerate {

    public static void main(String[] args) {
        // 下面這個是它的變種方法之一:第一個參數是提供初始狀態的,第二個參數是一個向接收器寫入數據的生成器,入參爲state(通常爲整數,用來記錄狀態),和接收器。
        // 其餘變種請看源碼
        Flux.generate(() -> 0, (state, sink) -> {
            sink.next(state+"asdf");
            // 加上對於sink.complete()的調用便可終止生成;不然就是無限序列。
            return state+1;
        }).subscribe(System.out::println);
        // generate方法的第三個參數用於結束生成時被調用,消耗state。
        Flux.generate(AtomicInteger::new, (state, sink) -> {
            sink.next(state.getAndIncrement()+"qwer");
            return state;
        }).subscribe(System.out::println);
        // generate()的工做流看起來就像:next()->next()->next()->...
    }
}
  • 經過上述代碼不難看到,每次的接收器接受的值來自於上一次生成方法的返回值,也就是state=上一個迭代的返回值(其實稱爲上一個流才準確,這麼說只是爲了方便理解)。
  • 不過這個state每次都是一個全新的(每次都+1固然是新的),那麼有沒有什麼方法能夠作到先後兩次迭代的state是同一個引用且還能夠更新值呢?答案就是原子類型。也就是上面的第二種方式。

靜態異步多線程方法:create()

說完了同步生成,接下來就是異步生成,仍是多線程的!讓咱們有請:create()閃亮登場!!!

  • create()方法對外暴露出一個FluxSink對象,經過它咱們能夠訪問並生成須要的序列。除此以外,它還能夠觸發回調中的多線程事件。
  • create另外一特性就是很容易把其餘的接口與響應式橋接起來。注意,它是異步多線程並不意味着create能夠並行化你寫的代碼或者異步執行;怎麼理解呢?就是,create方法裏面的Lambda表達式代碼仍是單線程阻塞的。若是你在建立序列的地方阻塞了代碼,那麼可能形成訂閱者即便請求了數據,也得不到,由於序列被阻塞了,無法生成新的。
  • 其實經過上面的現象能夠猜想,默認狀況下訂閱者使用的線程和create使用的是一個線程,固然阻塞create就會致使訂閱者無法運行咯!
  • 上述問題能夠經過Scheduler解決,後面會提到。
public class FluxWithCreate {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> stringTestListener) {
                this.testListener = stringTestListener;
            }

            @Override
            public TestListener<String> get() {
                return testListener;
            }
        };
        Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() {
            @Override
            public void onChunk(List<String> chunk) {
                for (String s : chunk) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        System.out.println("如今是2020/10/22 22:58;我好睏");
        TestListener<String> testListener = testProcessor.get();
        Runnable1<String> runnable1 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run1");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable2 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run2");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable3 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {
                    list.add(i+"-run3");
                }
                testListener.onChunk(list);
            }
        };
        runnable1.set(testListener);
        runnable2.set(testListener);
        runnable3.set(testListener);
        // create所謂的"異步","多線程"指的是在多線程中調用sink.next()方法。這一點在下面的push對比中能夠看到
        new Thread(runnable1).start();
        new Thread(runnable2).start();
        new Thread(runnable3).start();
        Thread.sleep(1000);
        testListener.onComplete();
        // 另外一方面,create的另外一個變體能夠設置參數來實現負壓控制,具體看源碼。
    }
    public interface TestListener<T> {

        void onChunk(List<T> chunk);

        void onComplete();
    }

    public interface TestProcessor<T> {

        void register(TestListener<T> tTestListener);

        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
         void set(TestListener<T> testListener);
    }
}

靜態異步單線程方法:push()

說完了異步多線程,同步的生成方法,接下來就是異步單線程:push()。

其實說到push和create的對比,我我的理解以下:

  • reate容許多線程環境下調用.next()方法,只管生成元素,元素序列的順序取決於...算了,隨機的,畢竟多線程;
  • 可是push只容許一個線程生產元素,因此是有序的,至於異步指的是在新的線程中也能夠,而沒必要非得在當前線程。
  • 順帶一提,push和create都支持onCancel()和onDispose()操做。通常來講,onCancel只響應於cancel操做,而onDispose響應於error,cancel,complete等操做。
public class FluxWithPush {

    public static void main(String[] args) throws InterruptedException {
        TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public TestListener<String> get() {
                return this.testListener;
            }
        };
        Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() {
            @Override
            public void onChunk(List<String> list) {
                for (String s : list) {
                    stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {
                stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        Runnable1<String> runnable = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {
                this.testListener = testListener;
            }

            @Override
            public void run() {
                List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++i) {
                    list.add(UUID.randomUUID().toString());
                }
                testListener.onChunk(list);
            }
        };
        TestListener<String> testListener = testProcessor.get();
        runnable.set(testListener);
        new Thread(runnable).start();
        Thread.sleep(15);
        testListener.onComplete();
    }

    public interface TestListener<T> {
        void onChunk(List<T> list);
        void onComplete();
    }

    public interface TestProcessor<T> {
        void register(TestListener<T> testListener);
        TestListener<T> get();
    }

    public interface Runnable1<T> extends Runnable {
        void set(TestListener<T> testListener);
    }
}

同create同樣,push也支持負壓調節。可是我沒寫出來,我試過的Demo都是直接請求Long.MAX_VALUE,其實就是經過sink.onRequest(LongConsumer)方法調用來實現負壓控制的。原理在這,想深究的請自行探索,鄙人不才,花費一下午沒實現。

實例方法:handle()

在Flux的實例方法裏,handle相似filter和map的操做。

public class FluxWithHandle {

    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.push(stringFluxSink -> {
            for (int i = 0; i < 10; ++ i) {
                stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5));
            }
        });
        // 獲取全部包含'a'的串
        Flux<String> flux = stringFlux.handle((str, sink) -> {
            String s = f(str);
            if (s != null) {
                sink.next(s);
            }
        });
        flux.subscribe(System.out::println);
    }

    private static String f(String str) {
        return str.contains("a") ? str : null;
    }
}

線程和調度

Schedulers的那些靜態方法

通常來講,響應式框架都不支持併發,P.s. create那個是生產者併發,它自己不是併發的。因此也沒有可用的併發庫,須要開發者本身實現。

同時,每個操做通常都是在上一個操做所在的線程裏運行,它們不會擁有本身的線程,而最頂的操做則是和subscribe()在同一個線程。好比Flux.create(...).handle(...).subscribe(...)都在主線程運行的。

在響應式框架裏,Scheduler決定了操做在哪一個線程被怎麼執行,它的做用相似於ExecutorService。不過功能稍微多點。若是你想實現一些併發操做,那麼能夠考慮使用Schedulers提供的靜態方法,來看看有哪些可用的:

Schedulers.immediate(): 直接在當前線程提交Runnable任務,並當即執行。

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        // Schedulers.immediate(): 直接在當前線程提交Runnable任務,並當即執行。
        System.out.println("當前線程:" + Thread.currentThread().getName());
        System.out.println("zxcv");
        Schedulers.immediate().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("qwer");
        });
        System.out.println("asdf");
        // 確保異步任務能夠打印出來
        Thread.sleep(1000);
    }
}

經過上面看得出,immediate()其實就是在執行位置插入須要執行的Runnable來實現的。和直接把代碼寫在這裏沒什麼區別。

Schedulers.newSingle():保證每次執行的操做都使用的是一個新的線程。

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        // 若是你想讓每次調用都是一個新的線程的話,可使用Schedulers.newSingle(),它能夠保證每次執行的操做都使用的是一個新的線程。
        Schedulers.single().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("bnmp");
        });
        Schedulers.single().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("ghjk");
        });
        Schedulers.newSingle("線程1").schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("1234");
        });
        Schedulers.newSingle("線程1").schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("5678");
        });
        Schedulers.newSingle("線程2").schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("0100");
        });
        Thread.sleep(1000);
    }
}

Schedulers.single(),它的做用是爲當前操做開闢一個新的線程,可是記住,全部使用這個方法的操做都共用一個線程;

Schedulers.elastic():一個彈性無界線程池。

無界通常意味着不可管理,由於它可能會致使負壓問題和過多的線程被建立。因此立刻就要提到它的替代方法。

Schedulers.bounededElastic():有界可複用線程池

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("1478");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("2589");
        });
        Schedulers.boundedElastic().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("0363");
        });
        Thread.sleep(1000);
    }
}

Schedulers.boundedElastic()是一個更好的選擇,由於它能夠在須要的時候建立工做線程池,並複用空閒的池;同時,某些池若是空閒時間超過一個限定的數值就會被拋棄。

同時,它還有一個容量限制,通常10倍於CPU核心數,這是它後備線程池的最大容量。最多提交10萬條任務,而後會被裝進任務隊列,等到有可用時再調度,若是是延時調度,那麼延時開始時間是在有線程可用時纔開始計算。

因而可知Schedulers.boundedElastic()對於阻塞的I/O操做是一個不錯的選擇,由於它可讓每個操做都有本身的線程。可是記得,太多的線程會讓系統備受壓力。

Schedulers.parallel():提供了系統級並行的能力

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {

    public static void main(String[] args) throws InterruptedException {
        Schedulers.parallel().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("6541");
        });
        Schedulers.parallel().schedule(() -> {
            System.out.println("當前線程是:" + Thread.currentThread().getName());
            System.out.println("9874");
        });
        Thread.sleep(1000);
    }
}

最後,Schedulers.parallel()提供了並行的能力,它會建立數量等於CPU核心數的線程來實現這一功能。

其餘線程操做

順帶一提,還能夠經過ExecutorService建立新的Scheduler。固然,Schedulers的一堆newXXX方法也能夠。

有一點很重要,就是boundedElastic()方法能夠適用於傳統阻塞式代碼,可是single()和parallel()都不行,若是你非要這麼作那就會拋異常。自定義Schedulers能夠經過設置ThreadFactory屬性來設置接收的線程是不是被NonBlocking接口修飾的Thread實例。

Flux的某些方法會使用默認的Scheduler,好比Flux.interval()方法就默認使用Schedulers.parallel()方法,固然能夠經過設置Scheduler來更改這種默認。

在響應式鏈中,有兩種方式能夠切換執行上下文,分別是publishOn()和subscribeOn()方法,前者在流式鏈中的位置很重要。在Reactor中,能夠以任意形式添加任意數量的訂閱者來知足你的需求,可是,只有在設置了訂閱方法後,才能激活這條訂閱鏈上的所有對象。只有這樣,請求才會上溯到發佈者,進而產生源序列。

在訂閱鏈中切換執行上下文

publishOn()

publishOn()就和普通操做同樣,添加在操做鏈的中間,它會影響在它下面的全部操做的執行上下文。看個例子:

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        // 建立一個並行線程
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                // map確定是跑在T上的。
                .map(i -> 10 + i)
                // 此時的執行上下文被切換到了並行線程
                .publishOn(s)
                // 這個map仍是跑在並行線程上的,由於publishOn()的後面的操做都被切換到了另外一個執行上下文中。
                .map(i -> "value " + i);
        // 假設這個new出來的線程名爲T
        new Thread(() -> flux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn()

public class FluxWithPublishOnSubscribeOn {

    public static void main(String[] args) throws InterruptedException {
        // 依舊是建立一個並行線程
        Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> fluxflux = Flux
                .range(1, 2)
                // 不過這裏的map就已經在ss裏跑了
                .map(i -> 10 + i)
                // 這裏切換,可是切換的是整個鏈
                .subscribeOn(s)
                // 這裏的map也運行在ss上
                .map(i -> "value " + i);
        // 這是一個匿名線程TT
        new Thread(() -> fluxflux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn()方法會把訂閱以後的整個訂閱鏈都切換到新的執行上下文中。不管在subscribeOn()哪裏,均可以把最前面的訂閱以後的訂閱序列進行切換,固然了,若是後面還有publishOn(),publishOn()會進行新的切換。

相關文章
相關標籤/搜索