This article looks at backpressure in reactive streams.

@Test
public void testShowReactiveStreams() throws InterruptedException {
    Flux.interval(Duration.ofMillis(1000))
        .take(500)
        .subscribe(e -> LOGGER.info("get {}", e));
    Thread.sleep(5 * 60 * 1000);
}

Sample output:

18:52:34.118 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
18:52:35.157 [parallel-2] INFO com.example.demo.FluxTest - get 0
18:52:36.156 [parallel-2] INFO com.example.demo.FluxTest - get 1
18:52:37.156 [parallel-2] INFO com.example.demo.FluxTest - get 2
18:52:38.159 [parallel-2] INFO com.example.demo.FluxTest - get 3
18:52:39.157 [parallel-2] INFO com.example.demo.FluxTest - get 4
18:52:40.155 [parallel-2] INFO com.example.demo.FluxTest - get 5
18:52:41.154 [parallel-2] INFO com.example.demo.FluxTest - get 6
18:52:42.158 [parallel-2] INFO com.example.demo.FluxTest - get 7
18:52:43.157 [parallel-2] INFO com.example.demo.FluxTest - get 8
18:52:44.156 [parallel-2] INFO com.example.demo.FluxTest - get 9
18:52:45.154 [parallel-2] INFO com.example.demo.FluxTest - get 10

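Traditional list streams are not asynchronous. Take a batch of 500 semi-finished parts: every part has to finish stage A before any of them can move on to stage B. Reactive streams are "reactive" because each part is pushed to stage B as soon as stage A has finished with it, in a continuous flow, rather than waiting for the whole batch to clear stage A. It is a textbook production line.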
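The difference in ordering can be sketched in plain Java. This is only an illustration of the two hand-off styles, not Reactor code: the class name PipelineOrderSketch and the "A1"/"B1" event labels are made up for the example, and both variants run synchronously on one thread, whereas a real reactive pipeline would usually run the stages asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: records the order in which stage A and stage B touch each item.
class PipelineOrderSketch {

    /** Batch style: stage A finishes the whole batch before stage B starts. */
    static List<String> batch(int itemCount) {
        List<String> events = new ArrayList<>();
        List<Integer> afterStageA = new ArrayList<>();
        for (int item = 1; item <= itemCount; item++) {
            events.add("A" + item);
            afterStageA.add(item);
        }
        for (int item : afterStageA) {
            events.add("B" + item);
        }
        return events;
    }

    /** Per-item style: each item moves to stage B as soon as stage A is done with it. */
    static List<String> perItem(int itemCount) {
        List<String> events = new ArrayList<>();
        for (int item = 1; item <= itemCount; item++) {
            events.add("A" + item);
            events.add("B" + item);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(batch(3));   // [A1, A2, A3, B1, B2, B3]
        System.out.println(perItem(3)); // [A1, B1, A2, B2, A3, B3]
    }
}
```

In the per-item variant the first finished item leaves stage B after two steps instead of after the whole batch has cleared stage A, which is where the time saving comes from.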
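A production line like this has one requirement: the stages must be able to coordinate their pace. In the film 起跑線 (Hindi Medium), the male lead takes a job in a factory; the line keeps pushing goods toward him, he cannot keep up, the goods fall on the floor, and in the end someone has to shut the line down by hand.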
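The same thing happens in an application: if the publisher is too fast and the subscriber is slow, data piles up, and without proper control that easily leads to an out-of-memory error. Backpressure exists to solve exactly this problem.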
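To make the pile-up concrete, here is a minimal sketch that uses only the JDK, not Reactor. It assumes a hypothetical setup in which a producer writes into a bounded buffer and nobody consumes; the class name BoundedBufferSketch and its method are invented for the example. A bounded buffer cannot grow without limit, so the producer is told as soon as it gets ahead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of a fast producer in front of a consumer that never drains.
class BoundedBufferSketch {

    /** Returns how many of the produced items were rejected because the buffer was full. */
    static int rejected(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int item = 1; item <= produced; item++) {
            // offer() returns false when the queue is full instead of growing it
            if (!buffer.offer(item)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejected(3, 5)); // 2
    }
}
```

What to do with the rejected items (block the producer, drop them, or request fewer in the first place) is the policy decision that a backpressure strategy makes explicit.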

@Test
public void testPullBackpressure() {
    Flux.just(1, 2, 3, 4)
        .log()
        // Subscriber and Subscription are the org.reactivestreams interfaces
        .subscribe(new Subscriber<Integer>() {
            private Subscription s;
            int onNextAmount;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                s.request(2); // initial demand: two elements
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
                onNextAmount++;
                if (onNextAmount % 2 == 0) {
                    s.request(2); // pull the next two once both have been handled
                }
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onComplete() {}
        });
    try {
        Thread.sleep(10 * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

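The same request-two-at-a-time pattern can be tried without any dependency, using the JDK's own java.util.concurrent.Flow interfaces and SubmissionPublisher in place of Reactor's Flux. The sketch below is an adaptation rather than the test above: the class names FlowPullSketch and BatchSubscriber and the recorded event strings are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only variant of the pull example: the subscriber asks for two items at a time.
class FlowPullSketch {

    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int count;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            events.add("request(2)");
            s.request(2); // initial demand: at most two items may be delivered
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext(" + item + ")");
            received.add(item);
            if (++count % 2 == 0) {
                events.add("request(2)");
                subscription.request(2); // ask for the next pair only after handling this one
            }
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    static BatchSubscriber run(int itemCount) {
        BatchSubscriber subscriber = new BatchSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i); // held in the publisher's buffer until there is demand
            }
        } // close() completes the stream after the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber;
    }

    public static void main(String[] args) {
        run(4).events.forEach(System.out::println);
    }
}
```

The publisher never delivers more than the subscriber has requested, so the subscriber sets the pace; items it has not asked for yet wait in the publisher's buffer.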
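Alternatively, use the timing operators that run on scheduler threads, such as timeout(), delayElements(), buffer(), skip(), and take(), to control the rate at which data is produced.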

@Test
public void testPushBackpressure() throws InterruptedException {
    Flux.range(1, 1000)
        .delayElements(Duration.ofMillis(200))
        .subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
    Thread.sleep(100 * 1000);
}

Sample output:

19:37:00.870 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:37:01.117 [parallel-1] INFO com.example.demo.FluxTest - subscribe:1
19:37:03.326 [parallel-2] INFO com.example.demo.FluxTest - subscribe:2
19:37:05.535 [parallel-3] INFO com.example.demo.FluxTest - subscribe:3
19:37:07.743 [parallel-4] INFO com.example.demo.FluxTest - subscribe:4
19:37:09.953 [parallel-5] INFO com.example.demo.FluxTest - subscribe:5
19:37:12.156 [parallel-6] INFO com.example.demo.FluxTest - subscribe:6
19:37:14.363 [parallel-7] INFO com.example.demo.FluxTest - subscribe:7
19:37:16.568 [parallel-8] INFO com.example.demo.FluxTest - subscribe:8
19:37:18.775 [parallel-1] INFO com.example.demo.FluxTest - subscribe:9

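This is a delayElements example. No data is lost, but the delay per element is the production delay plus the consumption delay: here 200 ms + 2000 ms, which matches the gap of roughly 2.2 s between log lines.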

@Test
public void testSampleBackpressure() throws InterruptedException {
    Flux.range(1, 1000)
        .log()
        .delayElements(Duration.ofMillis(200))
        .sample(Duration.ofMillis(1000))
        .subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
    Thread.sleep(100 * 1000);
}

Sample output:

19:48:40.516 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:48:40.544 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:48:40.546 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:48:40.770 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:48:40.974 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:48:41.175 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:48:41.378 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:48:41.543 [parallel-1] INFO com.example.demo.FluxTest - subscribe:4
19:48:41.583 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:48:41.785 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:48:41.989 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:48:43.547 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:48:43.548 [parallel-1] INFO com.example.demo.FluxTest - subscribe:8
19:48:43.751 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:48:43.952 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)

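As the output shows, because the subscriber is slow, some of the data is dropped: in this excerpt only values 4 and 8 reach the subscriber.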
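The "keep only the latest value per period" idea can be modelled in a few lines of plain Java. This is a simplified model, not Reactor's implementation: it assumes value i arrives exactly i × gap milliseconds after subscription, it ignores how slow the subscriber is, and the class name SampleModelSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of time-based sampling: one surviving value per period.
class SampleModelSketch {

    /**
     * Values 1..count arrive at gapMillis, 2 * gapMillis, and so on.
     * For every period that saw at least one value, only the last one is kept.
     */
    static List<Integer> sample(int count, long gapMillis, long periodMillis) {
        Map<Long, Integer> latestPerPeriod = new TreeMap<>();
        for (int value = 1; value <= count; value++) {
            long period = (value * gapMillis) / periodMillis;
            latestPerPeriod.put(period, value); // a later value overwrites an earlier one
        }
        return new ArrayList<>(latestPerPeriod.values());
    }

    public static void main(String[] args) {
        // Arrivals at 220, 440, ..., 2200 ms, sampled every 1000 ms
        System.out.println(sample(10, 220, 1000)); // [4, 9, 10]
    }
}
```

In the real run the subscriber also sleeps for 2 seconds per element, so even fewer values get through than this model predicts.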

@Test
public void testBufferBackpressure() throws InterruptedException {
    Flux.range(1, 1000)
        // .log()  (commented out here, although the sample output below contains log() lines)
        .delayElements(Duration.ofMillis(200))
        .buffer(Duration.ofMillis(800))
        .subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
    Thread.sleep(100 * 1000);
}

Sample output:

19:55:06.680 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:55:06.712 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:55:06.714 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:55:06.940 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:55:07.141 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:55:07.343 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:55:07.509 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[1, 2, 3]
19:55:07.545 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:55:07.748 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:55:07.951 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:55:08.156 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:55:09.512 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[4, 5, 6, 7]
19:55:11.515 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:55:11.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[8]
19:55:11.719 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:55:11.923 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)
19:55:12.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(12)
19:55:12.330 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(13)
19:55:12.533 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(14)
19:55:12.735 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(15)
19:55:12.941 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(16)
19:55:13.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[9, 10, 11, 12, 13, 14, 15]
19:55:15.517 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(17)
19:55:15.517 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[16]
19:55:15.721 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(18)
19:55:15.925 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(19)
19:55:16.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(20)
19:55:16.331 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(21)
19:55:16.537 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(22)
19:55:16.738 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(23)
19:55:16.942 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(24)
19:55:17.519 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[17, 18, 19, 20, 21, 22, 23]
19:55:19.522 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(25)
19:55:19.522 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[24]

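The data produced within each 800 ms window is collected into one batch and pushed to the subscriber. Nothing is dropped, but once the subscriber falls behind the batch sizes become uneven, as with [8] followed by [9, 10, 11, 12, 13, 14, 15] above.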
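Time-based batching can be modelled the same way in plain Java. Again this is a simplified model under stated assumptions, not Reactor's implementation: value i is assumed to arrive exactly i × gap milliseconds after subscription, the subscriber's own 2-second delay is ignored, and the class name BufferModelSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of time-based buffering: every value is kept, grouped by period.
class BufferModelSketch {

    /** Values 1..count arrive at gapMillis, 2 * gapMillis, and so on; one list per period. */
    static List<List<Integer>> buffer(int count, long gapMillis, long periodMillis) {
        Map<Long, List<Integer>> batches = new TreeMap<>();
        for (int value = 1; value <= count; value++) {
            long period = (value * gapMillis) / periodMillis;
            batches.computeIfAbsent(period, k -> new ArrayList<>()).add(value);
        }
        return new ArrayList<>(batches.values());
    }

    public static void main(String[] args) {
        // Arrivals at 220, 440, ..., 2200 ms, batched every 800 ms
        System.out.println(buffer(10, 220, 800)); // [[1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]
    }
}
```

The first two batches of the model, [1, 2, 3] and [4, 5, 6, 7], line up with the first two batches in the log; after that the real run diverges because the slow subscriber holds up delivery.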

@Test
public void testSkip() throws InterruptedException {
    Flux.range(1, 1000)
        .log()
        .delayElements(Duration.ofMillis(200))
        .skip(Duration.ofMillis(800))
        .subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
    Thread.sleep(100 * 1000);
}

Sample output:

20:02:07.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:02:07.606 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:02:07.608 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:02:07.815 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:02:08.016 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
20:02:08.218 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
20:02:08.421 [parallel-5] INFO com.example.demo.FluxTest - subscribe:4
20:02:10.425 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
20:02:10.631 [parallel-6] INFO com.example.demo.FluxTest - subscribe:5
20:02:12.635 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
20:02:12.840 [parallel-7] INFO com.example.demo.FluxTest - subscribe:6
20:02:14.843 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
20:02:15.049 [parallel-8] INFO com.example.demo.FluxTest - subscribe:7

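skip with a Duration skips the data produced during an initial time window; here values 1 to 3, produced in the first 800 ms, never reach the subscriber.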

@Test
public void testTakeBackpressure() throws InterruptedException {
    Flux.range(1, 1000)
        .log()
        .delayElements(Duration.ofMillis(200))
        .take(Duration.ofMillis(4000))
        .subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
    Thread.sleep(100 * 1000);
}

Sample output:

20:05:08.366 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:05:08.419 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:05:08.422 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:05:08.629 [parallel-2] INFO com.example.demo.FluxTest - subscribe:1
20:05:10.633 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:05:10.835 [parallel-3] INFO com.example.demo.FluxTest - subscribe:2
20:05:12.418 [parallel-1] INFO reactor.Flux.Range.1 - | cancel()

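take pushes only the first few items, or only the items produced during an initial time window, to the subscriber; here the source is cancelled once 4000 ms have elapsed.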
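The time-window behaviour of skip and take can be modelled side by side in plain Java. As before, this is a simplified model rather than Reactor's implementation: value i is assumed to arrive exactly i × gap milliseconds after subscription, the subscriber's own delay is ignored, and the class name SkipTakeModelSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of time-based skip and take over values arriving at a fixed gap.
class SkipTakeModelSketch {

    /** Drops every value that arrives before skipMillis has elapsed. */
    static List<Integer> skip(int count, long gapMillis, long skipMillis) {
        List<Integer> kept = new ArrayList<>();
        for (int value = 1; value <= count; value++) {
            if (value * gapMillis >= skipMillis) {
                kept.add(value);
            }
        }
        return kept;
    }

    /** Keeps only the values that arrive before takeMillis has elapsed. */
    static List<Integer> take(int count, long gapMillis, long takeMillis) {
        List<Integer> kept = new ArrayList<>();
        for (int value = 1; value <= count; value++) {
            if (value * gapMillis < takeMillis) {
                kept.add(value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Arrivals at 220, 440, ..., 2200 ms
        System.out.println(skip(10, 220, 800));  // [4, 5, 6, 7, 8, 9, 10]
        System.out.println(take(10, 220, 1000)); // [1, 2, 3, 4]
    }
}
```

The skip result starts at 4, like the log of the skip test. In the take test the subscriber's 2-second sleep slows delivery, so only two values arrive before the 4000 ms window closes, fewer than a model without that delay would give.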
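Reactive streams are very useful for data processing that has multiple stages and can save a lot of time. Backpressure, in turn, keeps a slow subscriber under control, which makes them well worth using.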