哈哈哈哈哈,題目有點猖狂。可是既然你都來了,那就看看吧,畢竟響應式編程隨着高併發對於性能的吃緊,愈來愈重要了。react
哦對了,這是一篇Java文章。編程
廢話很少說,直接步入正題。安全
步入正題以前,我但願你對發佈者/訂閱者模型有一些瞭解。多線程
直接看圖:併發
Talk is cheap, show you the code!框架
public class Main { public static void main(String[] args) { Flux<Integer> flux = Flux.range(0, 10); flux.subscribe(i -> { System.out.println("run1: " + i); }); flux.subscribe(i -> { System.out.println("run2: " + i); }); } }
輸出:dom
run1: 0 run1: 1 run1: 2 run1: 3 run1: 4 run1: 5 run1: 6 run1: 7 run1: 8 run1: 9 run2: 0 run2: 1 run2: 2 run2: 3 run2: 4 run2: 5 run2: 6 run2: 7 run2: 8 run2: 9 Process finished with exit code 0
Flux是一個多元素的生產者,言外之意,它能夠生產多個元素,組成元素序列,供訂閱者使用。異步
Mono和Flux的區別在於,它只能生產一個元素供生產者訂閱,也就是數量的不一樣。ide
Mono的一個常見的應用就是Mono<ServerResponse\>做爲WebFlux的返回值。畢竟每次請求只有一個Response對象,因此Mono剛恰好。高併發
來看一些官方文檔演示的方法。
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
public class FluxIntegerWithSubscribe { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); integerFlux.subscribe(i -> { System.out.println("run"); System.out.println(i); }, error -> { System.out.println("error"); }, () -> { System.out.println("done"); }, p -> { p.request(2); }); } }
若是去掉初次請求,那麼會請求最大值:
public class FluxIntegerWithSubscribe { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); // 在這裏說明一下subscribe()第四個參數,指出了當訂閱信號到達,初次請求的個數,若是是null則所有請求(Long.MAX_VALUE) // 其他subscribe()詳見源碼或文檔:https://projectreactor.io/docs/core/release/reference/#flux integerFlux.subscribe(i -> { System.out.println("run"); System.out.println(i); }, error -> { System.out.println("error"); }, () -> { System.out.println("done"); }); } }
輸出:
run 0 run 1 run 2 run 3 run 4 run 5 run 6 run 7 run 8 run 9 done Process finished with exit code 0
public class FluxWithBaseSubscriber { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 10); integerFlux.subscribe(new MySubscriber()); } /** * 通常來講,經過繼承BaseSubscriber<T>來實現,並且通常自定義hookOnSubscribe()和hookOnNext()方法 */ private static class MySubscriber extends BaseSubscriber<Integer> { /** * 初次訂閱時被調用 */ @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("開始啦!"); // 記得至少請求一次,不然不會執行hookOnNext()方法 request(1); } /** * 每次讀取新值調用 */ @Override protected void hookOnNext(Integer value) { System.out.println("開始讀取..."); System.out.println(value); // 指出下一次讀取多少個 request(2); } @Override protected void hookOnComplete() { System.out.println("結束啦"); } } }
輸出:
開始啦! 開始讀取... 0 開始讀取... 1 開始讀取... 2 開始讀取... 3 開始讀取... 4 開始讀取... 5 開始讀取... 6 開始讀取... 7 開始讀取... 8 開始讀取... 9 結束啦 Process finished with exit code 0
在這裏使用多線程模擬生產者生產的很快,而後立馬取消訂閱(雖然馬上取消可是因爲生產者實在太快了,因此訂閱者仍是接收到了一些元素)。
其餘的方法,好比Disposables.composite()會獲得一個Disposable的集合,調用它的dispose()方法會把集合裏的全部Disposable的dispose()方法都調用。
public class FluxWithDisposable { public static void main(String[] args) { Disposable disposable = getDis(); // 每次打印數量通常不一樣,由於調用了disposable的dispose()方法進行了取消,不過若是生產者產地太快了,那麼可能來不及終止。 disposable.dispose(); } private static Disposable getDis() { class Add implements Runnable { private final FluxSink<Integer> fluxSink; public Add(FluxSink<Integer> fluxSink) { this.fluxSink = fluxSink; } @Override public synchronized void run() { fluxSink.next(new Random().nextInt()); } } Flux<Integer> integerFlux = Flux.create(integerFluxSink -> { Add add = new Add(integerFluxSink); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); new Thread(add).start(); }); return integerFlux.subscribe(System.out::println); } }
輸出:
這裏的輸出每次調用可能都會不一樣,由於訂閱以後取消了,因此能打印多少取決於那一瞬間CPU的速度。
public class FluxWithLimitRate1 { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 100); integerFlux.subscribe(new MySubscriber()); } private static class MySubscriber extends BaseSubscriber<Integer> { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("開始啦!"); // 記得至少請求一次,不然不會執行hookOnNext()方法 request(1); } @Override protected void hookOnNext(Integer value) { System.out.println("開始讀取..."); System.out.println(value); // 指出下一次讀取多少個 request(2); } @Override protected void hookOnComplete() { System.out.println("結束啦!"); } } }
public class FluxWithLimitRate2 { public static void main(String[] args) { Flux<Integer> integerFlux = Flux.range(0, 100); // 最後,來看一些Flux提供的預獲取方法: // 指出預取數量 integerFlux.limitRate(10); // lowTide指出預獲取操做的補充優化的值,即修改75%的默認值;highTide指出預獲取數量。 integerFlux.limitRate(10, 15); // 哎~最典型的就是,請求無數:request(Long.MAX_VALUE)可是我給你limitRate(2);那你也只能乖乖每次獲得兩個哈哈哈哈! // 還有一個就是limitRequest(N),它會把下流總請求限制爲N。若是下流請求超過了N,那麼只返回N個,不然返回實際數量。而後認爲請求完成,向下流發送onComplete信號。 integerFlux.limitRequest(5).subscribe(new MySubscriber()); // 上面這個只會輸出5個。 } }
如今到了程序化生成Flux/Mono的時候。首先介紹generate()方法,這是一個同步的方法。言外之意就是,它是線程不安全的,且它的接收器只能一次一個的接受輸入來生成Flux/Mono。也就是說,它在任意時刻只能被調用一次且只接受一個輸入。
或者這麼說,它生成的元素序列的順序,取決於代碼編寫的方式。
public class FluxWithGenerate { public static void main(String[] args) { // 下面這個是它的變種方法之一:第一個參數是提供初始狀態的,第二個參數是一個向接收器寫入數據的生成器,入參爲state(通常爲整數,用來記錄狀態),和接收器。 // 其餘變種請看源碼 Flux.generate(() -> 0, (state, sink) -> { sink.next(state+"asdf"); // 加上對於sink.complete()的調用便可終止生成;不然就是無限序列。 return state+1; }).subscribe(System.out::println); // generate方法的第三個參數用於結束生成時被調用,消耗state。 Flux.generate(AtomicInteger::new, (state, sink) -> { sink.next(state.getAndIncrement()+"qwer"); return state; }).subscribe(System.out::println); // generate()的工做流看起來就像:next()->next()->next()->... } }
說完了同步生成,接下來就是異步生成,仍是多線程的!讓咱們有請:create()閃亮登場!!!
public class FluxWithCreate { public static void main(String[] args) throws InterruptedException { TestProcessor<String> testProcessor = new TestProcessor<>() { private TestListener<String> testListener; @Override public void register(TestListener<String> stringTestListener) { this.testListener = stringTestListener; } @Override public TestListener<String> get() { return testListener; } }; Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() { @Override public void onChunk(List<String> chunk) { for (String s : chunk) { stringFluxSink.next(s); } } @Override public void onComplete() { stringFluxSink.complete(); } })); flux.subscribe(System.out::println); System.out.println("如今是2020/10/22 22:58;我好睏"); TestListener<String> testListener = testProcessor.get(); Runnable1<String> runnable1 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run1"); } testListener.onChunk(list); } }; Runnable1<String> runnable2 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run2"); } testListener.onChunk(list); } }; Runnable1<String> runnable3 = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++ i) { list.add(i+"-run3"); } testListener.onChunk(list); } }; runnable1.set(testListener); runnable2.set(testListener); runnable3.set(testListener); // create所謂的"異步","多線程"指的是在多線程中調用sink.next()方法。這一點在下面的push對比中能夠看到 new Thread(runnable1).start(); new Thread(runnable2).start(); new Thread(runnable3).start(); Thread.sleep(1000); testListener.onComplete(); // 另外一方面,create的另外一個變體能夠設置參數來實現負壓控制,具體看源碼。 } public interface TestListener<T> { void onChunk(List<T> chunk); void onComplete(); } public interface TestProcessor<T> { void register(TestListener<T> tTestListener); TestListener<T> get(); } public interface Runnable1<T> extends Runnable { void set(TestListener<T> testListener); } }
說完了異步多線程,同步的生成方法,接下來就是異步單線程:push()。
其實說到push和create的對比,我我的理解以下:
public class FluxWithPush { public static void main(String[] args) throws InterruptedException { TestProcessor<String> testProcessor = new TestProcessor<>() { private TestListener<String> testListener; @Override public void register(TestListener<String> testListener) { this.testListener = testListener; } @Override public TestListener<String> get() { return this.testListener; } }; Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() { @Override public void onChunk(List<String> list) { for (String s : list) { stringFluxSink.next(s); } } @Override public void onComplete() { stringFluxSink.complete(); } })); flux.subscribe(System.out::println); Runnable1<String> runnable = new Runnable1<>() { private TestListener<String> testListener; @Override public void set(TestListener<String> testListener) { this.testListener = testListener; } @Override public void run() { List<String> list = new ArrayList<>(10); for (int i = 0; i < 10; ++i) { list.add(UUID.randomUUID().toString()); } testListener.onChunk(list); } }; TestListener<String> testListener = testProcessor.get(); runnable.set(testListener); new Thread(runnable).start(); Thread.sleep(15); testListener.onComplete(); } public interface TestListener<T> { void onChunk(List<T> list); void onComplete(); } public interface TestProcessor<T> { void register(TestListener<T> testListener); TestListener<T> get(); } public interface Runnable1<T> extends Runnable { void set(TestListener<T> testListener); } }
同create同樣,push也支持負壓調節。可是我沒寫出來,我試過的Demo都是直接請求Long.MAX_VALUE,其實就是經過sink.onRequest(LongConsumer)方法調用來實現負壓控制的。原理在這,想深究的請自行探索,鄙人不才,花費一下午沒實現。
在Flux的實例方法裏,handle相似filter和map的操做。
public class FluxWithHandle { public static void main(String[] args) { Flux<String> stringFlux = Flux.push(stringFluxSink -> { for (int i = 0; i < 10; ++ i) { stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5)); } }); // 獲取全部包含'a'的串 Flux<String> flux = stringFlux.handle((str, sink) -> { String s = f(str); if (s != null) { sink.next(s); } }); flux.subscribe(System.out::println); } private static String f(String str) { return str.contains("a") ? str : null; } }
通常來講,響應式框架都不支持併發,P.s. create那個是生產者併發,它自己不是併發的。因此也沒有可用的併發庫,須要開發者本身實現。
同時,每個操做通常都是在上一個操做所在的線程裏運行,它們不會擁有本身的線程,而最頂的操做則是和subscribe()在同一個線程。好比Flux.create(...).handle(...).subscribe(...)都在主線程運行的。
在響應式框架裏,Scheduler決定了操做在哪一個線程被怎麼執行,它的做用相似於ExecutorService。不過功能稍微多點。若是你想實現一些併發操做,那麼能夠考慮使用Schedulers提供的靜態方法,來看看有哪些可用的:
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { // Schedulers.immediate(): 直接在當前線程提交Runnable任務,並當即執行。 System.out.println("當前線程:" + Thread.currentThread().getName()); System.out.println("zxcv"); Schedulers.immediate().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("qwer"); }); System.out.println("asdf"); // 確保異步任務能夠打印出來 Thread.sleep(1000); } }
經過上面看得出,immediate()其實就是在執行位置插入須要執行的Runnable來實現的。和直接把代碼寫在這裏沒什麼區別。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { // 若是你想讓每次調用都是一個新的線程的話,可使用Schedulers.newSingle(),它能夠保證每次執行的操做都使用的是一個新的線程。 Schedulers.single().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("bnmp"); }); Schedulers.single().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("ghjk"); }); Schedulers.newSingle("線程1").schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("1234"); }); Schedulers.newSingle("線程1").schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("5678"); }); Schedulers.newSingle("線程2").schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("0100"); }); Thread.sleep(1000); } }
Schedulers.single(),它的做用是爲當前操做開闢一個新的線程,可是記住,全部使用這個方法的操做都共用一個線程;
無界通常意味着不可管理,由於它可能會致使負壓問題和過多的線程被建立。因此立刻就要提到它的替代方法。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { Schedulers.boundedElastic().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("1478"); }); Schedulers.boundedElastic().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("2589"); }); Schedulers.boundedElastic().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("0363"); }); Thread.sleep(1000); } }
Schedulers.boundedElastic()是一個更好的選擇,由於它能夠在須要的時候建立工做線程池,並複用空閒的池;同時,某些池若是空閒時間超過一個限定的數值就會被拋棄。
同時,它還有一個容量限制,通常10倍於CPU核心數,這是它後備線程池的最大容量。最多提交10萬條任務,而後會被裝進任務隊列,等到有可用時再調度,若是是延時調度,那麼延時開始時間是在有線程可用時纔開始計算。
因而可知Schedulers.boundedElastic()對於阻塞的I/O操做是一個不錯的選擇,由於它可讓每個操做都有本身的線程。可是記得,太多的線程會讓系統備受壓力。
package com.learn.reactor.flux; import reactor.core.scheduler.Schedulers; /** * @author Mr.M */ public class FluxWithSchedulers { public static void main(String[] args) throws InterruptedException { Schedulers.parallel().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("6541"); }); Schedulers.parallel().schedule(() -> { System.out.println("當前線程是:" + Thread.currentThread().getName()); System.out.println("9874"); }); Thread.sleep(1000); } }
最後,Schedulers.parallel()提供了並行的能力,它會建立數量等於CPU核心數的線程來實現這一功能。
順帶一提,還能夠經過ExecutorService建立新的Scheduler。固然,Schedulers的一堆newXXX方法也能夠。
有一點很重要,就是boundedElastic()方法能夠適用於傳統阻塞式代碼,可是single()和parallel()都不行,若是你非要這麼作那就會拋異常。自定義Schedulers能夠經過設置ThreadFactory屬性來設置接收的線程是不是被NonBlocking接口修飾的Thread實例。
Flux的某些方法會使用默認的Scheduler,好比Flux.interval()方法就默認使用Schedulers.parallel()方法,固然能夠經過設置Scheduler來更改這種默認。
在響應式鏈中,有兩種方式能夠切換執行上下文,分別是publishOn()和subscribeOn()方法,前者在流式鏈中的位置很重要。在Reactor中,能夠以任意形式添加任意數量的訂閱者來知足你的需求,可是,只有在設置了訂閱方法後,才能激活這條訂閱鏈上的所有對象。只有這樣,請求才會上溯到發佈者,進而產生源序列。
publishOn()就和普通操做同樣,添加在操做鏈的中間,它會影響在它下面的全部操做的執行上下文。看個例子:
public class FluxWithPublishOnSubscribeOn { public static void main(String[] args) throws InterruptedException { // 建立一個並行線程 Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) // map確定是跑在T上的。 .map(i -> 10 + i) // 此時的執行上下文被切換到了並行線程 .publishOn(s) // 這個map仍是跑在並行線程上的,由於publishOn()的後面的操做都被切換到了另外一個執行上下文中。 .map(i -> "value " + i); // 假設這個new出來的線程名爲T new Thread(() -> flux.subscribe(System.out::println)); Thread.sleep(1000); } }
public class FluxWithPublishOnSubscribeOn { public static void main(String[] args) throws InterruptedException { // 依舊是建立一個並行線程 Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> fluxflux = Flux .range(1, 2) // 不過這裏的map就已經在ss裏跑了 .map(i -> 10 + i) // 這裏切換,可是切換的是整個鏈 .subscribeOn(s) // 這裏的map也運行在ss上 .map(i -> "value " + i); // 這是一個匿名線程TT new Thread(() -> fluxflux.subscribe(System.out::println)); Thread.sleep(1000); } }
subscribeOn()方法會把訂閱以後的整個訂閱鏈都切換到新的執行上下文中。不管在subscribeOn()哪裏,均可以把最前面的訂閱以後的訂閱序列進行切換,固然了,若是後面還有publishOn(),publishOn()會進行新的切換。