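This article takes a look at the processors of Reactive Streams.
A processor is both a Publisher and a Subscriber. Project Reactor ships several processor implementations, which fall roughly into the following categories: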
direct (DirectProcessor and UnicastProcessor)
synchronous (EmitterProcessor and ReplayProcessor)
asynchronous (TopicProcessor and WorkQueueProcessor)
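DirectProcessor does not support backpressure: if the publisher emits N items and any one of its subscribers has requested fewer than N, an IllegalStateException is thrown (in the log below it appears as OverflowException, a subclass of IllegalStateException).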
    @Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                // No demand is requested here, so the first delivered value overflows
                // s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {
            }
        });
        IntStream.range(1,20)
                .forEach(e -> {
                    directProcessor.onNext(e);
                });
        directProcessor.onComplete();
        directProcessor.blockLast();
    }
The output is as follows:
    16:00:11.201 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    16:00:11.216 [main] ERROR com.example.demo.ProcessorTest - Can't deliver value due to lack of requests
    reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
        at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
        at com.example.demo.ProcessorTest.lambda$testDirectProcessor$5(ProcessorTest.java:82)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
        at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
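The overflow in the trace above comes down to demand tracking: the test subscriber never calls request, so the first value that survives the filter (2, mapped to 3) finds no outstanding demand. The following is a minimal sketch of that rule in plain Java. It is not Reactor's implementation; the class DemandSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model (not Reactor code) of a subscriber slot that tracks outstanding demand. */
final class DemandSketch {
    private final AtomicLong requested = new AtomicLong();
    private final List<Integer> received = new ArrayList<>();

    /** Plays the role of Subscription.request(n): adds n to the outstanding demand. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        requested.addAndGet(n);
    }

    /** Delivers a value if demand is left; otherwise fails instead of buffering. */
    void onNext(int value) {
        if (requested.get() == 0) {
            throw new IllegalStateException("Can't deliver value due to lack of requests");
        }
        requested.decrementAndGet();
        received.add(value);
    }

    List<Integer> received() {
        return received;
    }

    public static void main(String[] args) {
        DemandSketch subscriber = new DemandSketch();
        subscriber.request(2);
        subscriber.onNext(1);
        subscriber.onNext(2);
        try {
            subscriber.onNext(3); // third value exceeds the requested amount
        } catch (IllegalStateException e) {
            System.out.println("overflow after " + subscriber.received() + ": " + e.getMessage());
        }
    }
}
```

With request(2) in place, two values are delivered and the third fails, which mirrors what would happen in the test if the commented-out s.request(2) were enabled and more than two values reached the subscriber.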
    @Test
    public void testUnicastProcessor() throws InterruptedException {
        UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get());
        Flux<Integer> flux = unicastProcessor
                .map(e -> e)
                .doOnError(e -> {
                    LOGGER.error(e.getMessage(),e);
                });

        IntStream.rangeClosed(1,12)
                .forEach(e -> {
                    LOGGER.info("emit:{}",e);
                    unicastProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        //UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e);
        });

        unicastProcessor.onComplete();
        TimeUnit.SECONDS.sleep(10);
        // unicastProcessor.blockLast(); // blockLast is itself a subscriber
    }
Sample output:
    16:31:04.970 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    16:31:04.977 [main] INFO com.example.demo.ProcessorTest - emit:1
    16:31:05.990 [main] INFO com.example.demo.ProcessorTest - emit:2
    16:31:06.991 [main] INFO com.example.demo.ProcessorTest - emit:3
    16:31:07.994 [main] INFO com.example.demo.ProcessorTest - emit:4
    16:31:08.998 [main] INFO com.example.demo.ProcessorTest - emit:5
    16:31:10.002 [main] INFO com.example.demo.ProcessorTest - emit:6
    16:31:11.007 [main] INFO com.example.demo.ProcessorTest - emit:7
    16:31:12.010 [main] INFO com.example.demo.ProcessorTest - emit:8
    16:31:13.014 [main] INFO com.example.demo.ProcessorTest - emit:9
    16:31:14.029 [main] INFO com.example.demo.ProcessorTest - emit:10
    16:31:14.030 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 10
    16:31:15.034 [main] INFO com.example.demo.ProcessorTest - emit:11
    16:31:15.034 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
    16:31:16.038 [main] INFO com.example.demo.ProcessorTest - emit:12
    16:31:16.038 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 12
    16:31:17.043 [main] INFO com.example.demo.ProcessorTest - begin to sleep 7 seconds
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:1
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:2
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:3
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:4
    16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:5
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:6
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:7
    16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:8
    16:31:24.058 [main] ERROR com.example.demo.ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...)
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
        at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
        at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
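The log above is consistent with a bounded buffer that fails on the first rejected value: 1 to 8 fit into the queue of size 8, 9 triggers the overflow error, and 10 to 12 are dropped because the processor has already terminated. Below is a rough model of that behaviour in plain Java, assuming a fail-fast policy. BoundedBufferSketch is a hypothetical class for illustration, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Toy model (not Reactor code) of a processor that buffers into a bounded queue. */
final class BoundedBufferSketch {
    private final Queue<Integer> queue;
    private final List<Integer> dropped = new ArrayList<>();
    private boolean overflowed;

    BoundedBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Buffers the value, or records the overflow when the queue rejects it. */
    void onNext(int value) {
        if (overflowed) {
            dropped.add(value); // already terminated: later values are dropped
            return;
        }
        if (!queue.offer(value)) {
            overflowed = true;  // this value is lost and turns into the error signal
        }
    }

    /** Hands everything buffered so far to the (single) late subscriber. */
    List<Integer> drain() {
        List<Integer> out = new ArrayList<>();
        Integer v;
        while ((v = queue.poll()) != null) {
            out.add(v);
        }
        return out;
    }

    List<Integer> dropped() {
        return dropped;
    }

    boolean overflowed() {
        return overflowed;
    }

    public static void main(String[] args) {
        BoundedBufferSketch processor = new BoundedBufferSketch(8);
        for (int i = 1; i <= 12; i++) {
            processor.onNext(i);
        }
        System.out.println("buffered=" + processor.drain()
                + " dropped=" + processor.dropped()
                + " overflowed=" + processor.overflowed());
    }
}
```

The subscriber in the test then sees the eight buffered values followed by the error, which is the order in which they appear in the log.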
    @Test
    public void testEmitterProcessor() throws InterruptedException {
        int bufferSize = 3; // values below 8 are reset to 8
        FluxProcessor<Integer, Integer> processor = EmitterProcessor.create(bufferSize);
        Flux<Integer> flux1 = processor.map(e -> e);
        Flux<Integer> flux2 = processor.map(e -> e*10);

        IntStream.rangeClosed(1,8).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e); // blocks here if the unconsumed published data exceeds bufferSize
        });

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(9,10).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        // this subscription is added later and only receives data published afterwards
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }
Sample output:
    17:27:01.008 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    17:27:01.044 [main] INFO com.example.demo.ProcessorTest - emit:1
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:2
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:3
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:4
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:5
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:6
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:7
    17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:8
    17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:6
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
    17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:8
    17:27:01.088 [main] INFO com.example.demo.ProcessorTest - emit:9
    17:27:01.088 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:9
    17:27:02.091 [main] INFO com.example.demo.ProcessorTest - emit:10
    17:27:02.092 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:11
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:110
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:12
    17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:120
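ReplayProcessor caches the data produced through its sink, or received from a publisher it subscribes to, and replays it to later subscribers. It can be configured in four ways:
cache only the last item
cache the last N items
timestamp every item and cache only those whose age is within a given TTL
timestamp every item and cache only the last N items whose age is within a given TTL
Example: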
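The four configurations listed above are all combinations of two limits, a maximum history size and a maximum age. The sketch below models that idea in plain Java with an explicit clock value, so the behaviour is deterministic. It is not Reactor's implementation, and ReplayBufferSketch is a hypothetical class; in Reactor itself the corresponding factory methods are, as far as I recall, cacheLast(), create(int), createTimeout(Duration) and createSizeAndTimeout(int, Duration).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (not Reactor code) of a replay history bounded by size and/or age. */
final class ReplayBufferSketch {
    private static final class Stamped {
        final int value;
        final long timestampMillis;

        Stamped(int value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final int maxSize;    // Integer.MAX_VALUE means "no size limit"
    private final long ttlMillis; // Long.MAX_VALUE means "no age limit"
    private final Deque<Stamped> history = new ArrayDeque<>();

    ReplayBufferSketch(int maxSize, long ttlMillis) {
        if (maxSize < 1 || ttlMillis < 0) {
            throw new IllegalArgumentException("maxSize must be >= 1 and ttlMillis >= 0");
        }
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
    }

    /** Records a value with its timestamp and evicts the oldest entries beyond maxSize. */
    void add(int value, long nowMillis) {
        history.addLast(new Stamped(value, nowMillis));
        while (history.size() > maxSize) {
            history.removeFirst();
        }
    }

    /** Returns what a subscriber arriving at nowMillis would get replayed. */
    List<Integer> replay(long nowMillis) {
        List<Integer> out = new ArrayList<>();
        for (Stamped s : history) {
            if (nowMillis - s.timestampMillis <= ttlMillis) {
                out.add(s.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ReplayBufferSketch lastThree = new ReplayBufferSketch(3, Long.MAX_VALUE);
        ReplayBufferSketch lastThreeWithinTtl = new ReplayBufferSketch(3, 1500L);
        for (int i = 1; i <= 5; i++) {
            long emittedAt = (i - 1) * 1000L; // one value per second
            lastThree.add(i, emittedAt);
            lastThreeWithinTtl.add(i, emittedAt);
        }
        System.out.println(lastThree.replay(8000L));
        System.out.println(lastThreeWithinTtl.replay(5000L));
    }
}
```

A size of 1 with no age limit corresponds to "cache only the last item", and an unlimited size with a finite TTL corresponds to the purely time-based variant.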
    @Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,5)
                .forEach(e -> {
                    replayProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });

        LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }
The output is as follows:
    15:13:39.415 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    15:13:39.438 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    15:13:40.445 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
    15:13:41.449 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
    15:13:42.454 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    15:13:43.459 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
    15:13:44.463 [main] INFO com.example.demo.ProcessorTest - finish publish data
    15:13:47.466 [main] INFO com.example.demo.ProcessorTest - begin to subscribe flux2
    15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:3
    15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:4
    15:13:47.468 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:5
    @Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
    //            .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
    //                LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
        System.out.println(count.get());
    }
Two things are worth noting:
Behind the scenes, share sets the multiproducers property of EventLoopProcessor.
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/EventLoopProcessor.java
    EventLoopProcessor(
            int bufferSize,
            @Nullable ThreadFactory threadFactory,
            @Nullable ExecutorService executor,
            ExecutorService requestExecutor,
            boolean autoCancel,
            boolean multiproducers,
            Supplier<Slot<IN>> factory,
            WaitStrategy strategy) {

        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must be strictly positive, " +
                    "was: "+bufferSize);
        }

        this.autoCancel = autoCancel;

        contextClassLoader = new EventLoopContext(multiproducers);

        this.name = defaultName(threadFactory, getClass());

        this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

        if (executor == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        }
        else {
            this.executor = executor;
        }

        if (multiproducers) {
            this.ringBuffer = RingBuffer.createMultiProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
        else {
            this.ringBuffer = RingBuffer.createSingleProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
    }
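If share is true, the ring buffer is created with createMultiProducer.
In practice, if several threads call the processor's onNext method without share enabled, there is a concurrency problem and data can be lost. In the code above, for example, if share(true) is commented out, the final count is not guaranteed to be 100, whereas with share set to true the final count is guaranteed to be 100.
If executor(Executors.newSingleThreadExecutor()) is set, the subscribers of flux1, flux2 and flux3 run sequentially rather than concurrently.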
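Why concurrent onNext calls need a multi-producer buffer can be illustrated with slot claiming. One common technique is to hand out slot sequences from an atomic counter, so that two producers never write to the same slot. Whether Reactor's RingBuffer does exactly this is not visible in the excerpt above, so the sketch below is only an illustration of the idea. SequencerSketch is a hypothetical class, and there is no consumer, so wrap-around is deliberately not handled.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

/** Toy model (not Reactor code) of multi-producer slot claiming in a ring of fixed size. */
final class SequencerSketch {
    private final AtomicLong next = new AtomicLong();
    private final int[] slots;
    private final int mask;

    SequencerSketch(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }
        this.slots = new int[bufferSize];
        this.mask = bufferSize - 1;
    }

    /** Claims the next sequence atomically, then writes into the slot it maps to. */
    void publish(int value) {
        long sequence = next.getAndIncrement();
        slots[(int) (sequence & mask)] = value;
    }

    /** Publishes 1..100 from parallel threads and counts the slots that were filled. */
    static int demo() {
        SequencerSketch ring = new SequencerSketch(128);
        IntStream.rangeClosed(1, 100).parallel().forEach(ring::publish);
        return (int) Arrays.stream(ring.slots).filter(v -> v != 0).count();
    }

    public static void main(String[] args) {
        System.out.println("filled slots: " + demo());
    }
}
```

If the counter were a plain long incremented without synchronisation, two threads could read the same sequence and overwrite each other's slot, which is the kind of loss described for share(false).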
    @Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();

        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    workQueueProcessor.onNext(e);
                });

        workQueueProcessor.onComplete();
        workQueueProcessor.blockLast();
    }
Sample output:
    21:56:58.203 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    21:56:58.214 [main] DEBUG reactor.core.publisher.UnsafeSupport - Starting UnsafeSupport init in Java 1.8
    21:56:58.215 [main] DEBUG reactor.core.publisher.UnsafeSupport - Unsafe is available
    21:56:58.228 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
    21:56:58.228 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:3
    21:56:58.228 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:2
    21:56:58.229 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
    21:56:58.229 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:5
    21:56:58.229 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:6
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:8
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:9
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:11
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:12
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:13
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:14
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:15
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:17
    21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:16
    21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:19
    21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18
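As the output shows, the subscribers of a WorkQueueProcessor behave like Kafka consumers that belong to the same group: the messages they consume add up to exactly the messages the publisher emitted, unlike the broadcast-style delivery of TopicProcessor.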
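The difference between the two delivery styles can be sketched without Reactor: in the work-queue style several workers poll one shared queue, so each message goes to exactly one of them, while in the broadcast style every subscriber gets its own copy of every message. The class WorkQueueSketch and its methods are hypothetical names for this illustration, and which worker receives which message is left to thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model (not Reactor code) contrasting work-queue and broadcast delivery. */
final class WorkQueueSketch {

    /** Work-queue style: workers compete for messages from one shared queue. */
    static List<List<Integer>> consume(int messages, int workers) {
        Queue<Integer> shared = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= messages; i++) {
            shared.add(i);
        }
        List<List<Integer>> perWorker = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            List<Integer> mine = new ArrayList<>(); // touched only by its own worker
            perWorker.add(mine);
            Thread t = new Thread(() -> {
                Integer message;
                while ((message = shared.poll()) != null) {
                    mine.add(message);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return perWorker;
    }

    /** Broadcast style: every subscriber receives every message. */
    static List<List<Integer>> broadcast(int messages, int subscribers) {
        List<List<Integer>> perSubscriber = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            List<Integer> copy = new ArrayList<>();
            for (int i = 1; i <= messages; i++) {
                copy.add(i);
            }
            perSubscriber.add(copy);
        }
        return perSubscriber;
    }

    public static void main(String[] args) {
        int shared = consume(19, 3).stream().mapToInt(List::size).sum();
        int copied = broadcast(19, 3).stream().mapToInt(List::size).sum();
        System.out.println("work queue total=" + shared + ", broadcast total=" + copied);
    }
}
```

With 19 messages and 3 consumers, the work-queue totals add up to 19, matching the log above, while the broadcast totals add up to 57.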