This article takes a brief look at Mono and Flux, the two abstract classes in Reactor that implement the Reactive Streams Publisher interface.
reactive-streams-1.0.1-sources.jar!/org/reactivestreams/Publisher.java
    /**
     * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
     * the demand received from its {@link Subscriber}(s).
     * <p>
     * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
     * at various points in time.
     *
     * @param <T> the type of element signaled.
     */
    public interface Publisher<T> {

        /**
         * Request {@link Publisher} to start streaming data.
         * <p>
         * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
         * <p>
         * Each {@link Subscription} will work for only a single {@link Subscriber}.
         * <p>
         * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
         * <p>
         * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
         * signal the error via {@link Subscriber#onError}.
         *
         * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
         */
        public void subscribe(Subscriber<? super T> s);
    }
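The demand-driven contract described in the Javadoc above can be sketched without any third-party library. The following is an illustration only, not the Reactive Streams reference code: it assumes JDK 9 or later and uses the `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. `DemandSketch`, `RangePublisher`, and `Collector` are hypothetical names, and the sketch skips parts of the specification (it does not reject non-positive `request` values and is not thread-safe).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSketch {

    /** Hypothetical publisher emitting 1..count, only as fast as demand allows. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // Every subscribe call starts a new Subscription with its own state.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, then wait for further demand.
                    for (long i = 0; i < n && next <= count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber that records every signal it receives. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s; // no demand yet, so nothing is emitted
        }

        @Override
        public void onNext(Integer item) {
            events.add("next:" + item);
        }

        @Override
        public void onError(Throwable t) {
            events.add("error");
        }

        @Override
        public void onComplete() {
            events.add("complete");
        }
    }

    static List<String> run() {
        Collector collector = new Collector();
        new RangePublisher(3).subscribe(collector);
        collector.subscription.request(2); // delivers 1 and 2
        collector.subscription.request(5); // delivers 3, then onComplete
        return collector.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:2, next:3, complete]
    }
}
```

Nothing is emitted until the subscriber calls `request`, which is the point the Javadoc makes when it says elements are published "according to the demand received".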
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Mono.java
    public abstract class Mono<T> implements Publisher<T> {
        //...

        /**
         * Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item.
         * The source emitter will be cancelled on the first `onNext`.
         * <p>
         * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from1.png" alt="">
         * <p>
         * @param source the {@link Publisher} source
         * @param <T> the source type
         *
         * @return the next item emitted as a {@link Mono}
         */
        public static <T> Mono<T> from(Publisher<? extends T> source) {
            if (source instanceof Mono) {
                @SuppressWarnings("unchecked")
                Mono<T> casted = (Mono<T>) source;
                return casted;
            }
            if (source instanceof Flux) {
                @SuppressWarnings("unchecked")
                Flux<T> casted = (Flux<T>) source;
                return casted.next();
            }
            return onAssembly(new MonoFromPublisher<>(source));
        }

        /**
         * Create a new {@link Mono} that emits the specified item, which is captured at
         * instantiation time.
         *
         * <p>
         * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/just.png" alt="">
         * <p>
         * @param data the only item to onNext
         * @param <T> the type of the produced item
         *
         * @return a {@link Mono}.
         */
        public static <T> Mono<T> just(T data) {
            return onAssembly(new MonoJust<>(data));
        }

        //...
    }
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Flux.java
    public abstract class Flux<T> implements Publisher<T> {
        //......

        /**
         * Programmatically create a {@link Flux} with the capability of emitting multiple
         * elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
         * <p>
         * This Flux factory is useful if one wants to adapt some other multi-valued async API
         * and not worry about cancellation and backpressure (which is handled by buffering
         * all signals if the downstream can't keep up).
         * <p>
         * For example:
         *
         * <pre><code>
         * Flux.<String>create(emitter -> {
         *
         *     ActionListener al = e -> {
         *         emitter.next(textField.getText());
         *     };
         *     // without cleanup support:
         *
         *     button.addActionListener(al);
         *
         *     // with cleanup support:
         *
         *     button.addActionListener(al);
         *     emitter.onDispose(() -> {
         *         button.removeListener(al);
         *     });
         * }, FluxSink.OverflowStrategy.LATEST);
         * </code></pre>
         *
         * @param <T> The type of values in the sequence
         * @param backpressure the backpressure mode, see {@link OverflowStrategy} for the
         * available backpressure modes
         * @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
         * @return a {@link Flux}
         */
        public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
            return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
        }

        /**
         * Decorate the specified {@link Publisher} with the {@link Flux} API.
         * <p>
         * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from.png" alt="">
         * <p>
         * @param source the source to decorate
         * @param <T> The type of values in both source and output sequences
         *
         * @return a new {@link Flux}
         */
        public static <T> Flux<T> from(Publisher<? extends T> source) {
            if (source instanceof Flux) {
                @SuppressWarnings("unchecked")
                Flux<T> casted = (Flux<T>) source;
                return casted;
            }

            if (source instanceof Fuseable.ScalarCallable) {
                try {
                    @SuppressWarnings("unchecked")
                    T t = ((Fuseable.ScalarCallable<T>) source).call();
                    if (t != null) {
                        return just(t);
                    }
                    return empty();
                }
                catch (Exception e) {
                    return error(e);
                }
            }
            return wrap(source);
        }

        /**
         * Programmatically create a {@link Flux} by generating signals one-by-one via a
         * consumer callback and some state, with a final cleanup callback. The
         * {@code stateSupplier} may return {@literal null} but your cleanup {@code stateConsumer}
         * will need to handle the null case.
         * <p>
         * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/generate.png" alt="">
         * <p>
         *
         * @param <T> the value type emitted
         * @param <S> the per-subscriber custom state type
         * @param stateSupplier called for each incoming Subscriber to provide the initial state for the generator bifunction
         * @param generator Consume the {@link SynchronousSink} provided per-subscriber by Reactor
         * as well as the current state to generate a <strong>single</strong> signal on each pass
         * and return a (new) state.
         * @param stateConsumer called after the generator has terminated or the downstream cancelled, receiving the last
         * state to be handled (i.e., release resources or do other cleanup).
         *
         * @return a {@link Flux}
         */
        public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
            return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
        }
    }
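To make the `generate` contract more concrete (one signal per pass, with state threaded from one pass to the next), here is a minimal sketch. It assumes reactor-core is on the classpath and uses the two-argument overload of `Flux.generate`, which takes no cleanup callback. `GenerateSketch` and `squares` are hypothetical names chosen for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class GenerateSketch {

    /** Emits 1, 4, 9, 16, 25 by carrying a counter as the generator state. */
    static Flux<Integer> squares() {
        return Flux.<Integer, Integer>generate(
                () -> 1,                       // initial state, created per subscriber
                (state, sink) -> {
                    sink.next(state * state);  // exactly one onNext per pass
                    if (state == 5) {
                        sink.complete();       // terminate after the fifth value
                    }
                    return state + 1;          // state handed to the next pass
                });
    }

    public static void main(String[] args) {
        // block() is used only to keep the example synchronous.
        List<Integer> values = squares().collectList().block();
        System.out.println(values); // prints [1, 4, 9, 16, 25]
    }
}
```

Because the state is supplied per subscriber, each new subscription restarts the sequence from 1.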
    @Test
    public void testMonoBasic() {
        // Three ways to build a Mono that emits "Hello".
        Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
        Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
        Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
    }
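A Mono emits at most one item. It corresponds to the Single and Maybe types in RxJava, or loosely to Java's Optional. An asynchronous task that only needs to signal that it has finished can therefore be modeled as a Mono<Void>.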
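As an illustration of the completion-only case, the sketch below models a task as a `Mono<Void>`. It assumes reactor-core is on the classpath. `CompletionOnlySketch` and `flush` are hypothetical names, and the "work" is just a list append standing in for a real side effect.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

final class CompletionOnlySketch {

    /** Hypothetical task: performs a side effect and reports only that it finished. */
    static Mono<Void> flush(List<String> log) {
        return Mono.fromRunnable(() -> log.add("flushed"));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        flush(log).subscribe(
                value -> log.add("value"),      // never called: a Mono<Void> carries no item
                error -> log.add("error"),
                () -> log.add("completed"));    // the only signal the subscriber sees
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [flushed, completed]
    }
}
```

The work runs only when the Mono is subscribed to, so calling `flush` alone has no effect.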
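Calling single() on a Flux<T> returns a Mono<T> (which signals an error unless the Flux emits exactly one item), while concatenating two Monos with concatWith produces a Flux.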
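The two conversions can be sketched as follows, assuming reactor-core is on the classpath. `ConversionSketch` and its method names are hypothetical, and `block()` is used only to keep the example synchronous.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ConversionSketch {

    /** Flux -> Mono: single() succeeds when the Flux emits exactly one item. */
    static String onlyElement() {
        Mono<String> only = Flux.just("Hello").single();
        return only.block();
    }

    /** Mono + Mono -> Flux: concatWith emits the first Mono's item, then the second's. */
    static List<String> joined() {
        Flux<String> both = Mono.just("Hello").concatWith(Mono.just("World"));
        return both.collectList().block();
    }

    /** single() on a two-element Flux signals IndexOutOfBoundsException. */
    static boolean singleRejectsTwoElements() {
        try {
            Flux.just("Hello", "World").single().block();
            return false;
        } catch (IndexOutOfBoundsException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(onlyElement());              // prints Hello
        System.out.println(joined());                   // prints [Hello, World]
        System.out.println(singleRejectsTwoElements()); // prints true
    }
}
```

When the source may hold more than one item and only the first is wanted, `next()` (the call `Mono.from` makes on a Flux in the source shown earlier) is the more forgiving choice.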
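    @Test
    public void testBasic() {
        Flux.just("Hello", "World").subscribe(System.out::println);
        Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
        // An empty Flux completes without emitting, so nothing is printed.
        Flux.empty().subscribe(System.out::println);
        Flux.range(1, 10).subscribe(System.out::println);
        // interval emits on a background scheduler; the test method returns
        // before the first tick (after 10 seconds) unless the thread is kept alive.
        Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
    }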
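A Flux is the counterpart of an RxJava Observable: it emits 0 to N items and then (optionally) terminates by completing or by signaling an error. It is the type to use when multiple items are handled as a stream.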
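The two ways a Flux can terminate can be observed with the three-callback form of `subscribe`. This is a minimal sketch assuming reactor-core is on the classpath. `TerminalSignalSketch` and its helper methods are hypothetical names, and the sources are synchronous so the recorded events are complete by the time `subscribe` returns.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class TerminalSignalSketch {

    /** Subscribes to the source and records every signal as a string. */
    static List<String> events(Flux<Integer> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(
                value -> log.add("next:" + value),
                error -> log.add("error:" + error.getMessage()),
                () -> log.add("complete"));
        return log;
    }

    /** Two items followed by a normal completion. */
    static List<String> completing() {
        return events(Flux.just(1, 2));
    }

    /** Two items followed by an error instead of a completion. */
    static List<String> erroring() {
        return events(Flux.just(1, 2)
                .concatWith(Flux.error(new IllegalStateException("boom"))));
    }

    public static void main(String[] args) {
        System.out.println(completing()); // prints [next:1, next:2, complete]
        System.out.println(erroring());   // prints [next:1, next:2, error:boom]
    }
}
```

A sequence ends with at most one terminal signal, so the erroring case never reports "complete".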
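Mono and Flux are both abstract classes that implement the Publisher interface: one behaves roughly like an Optional, the other like a stream of 0..N items. Both are fundamental concepts in Spring 5 reactive programming.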