This article looks at the concurrency and prefetch parameters of FluxFlatMap.
@Test
public void testConcurrencyAndPrefetch() {
    int concurrency = 3;
    int prefetch = 6;
    Flux.range(1, 100)
        .log()
        // each outer element is mapped to an inner Flux of 10 elements
        .flatMap(i -> Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).log(), concurrency, prefetch)
        .subscribe();
}
Partial output:
23:29:38.515 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
23:29:38.534 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | request(3)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | onNext(1)
23:29:38.538 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(1)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(2)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(3)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(4)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(7)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(8)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(9)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(10)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.540 [main] INFO reactor.Flux.Array.2 - | onComplete()
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | request(1)
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | onNext(2)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | request(6)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(1)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(2)
Looking only at the first request of the outer and the inner Flux, we can already see that they are concurrency (3) and prefetch (6), respectively.
reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/Flux.java
/**
 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux} through merging,
 * which allow them to interleave.
 * <p>
 * There are three dimensions to this operator that can be compared with
 * {@link #flatMapSequential(Function) flatMapSequential} and {@link #concatMap(Function) concatMap}:
 * <ul>
 * <li><b>Generation of inners and subscription</b>: this operator is eagerly
 * subscribing to its inners.</li>
 * <li><b>Ordering of the flattened values</b>: this operator does not necessarily preserve
 * original ordering, as inner element are flattened as they arrive.</li>
 * <li><b>Interleaving</b>: this operator lets values from different inners interleave
 * (similar to merging the inner sequences).</li>
 * </ul>
 * The concurrency argument allows to control how many {@link Publisher} can be
 * subscribed to and merged in parallel. The prefetch argument allows to give an
 * arbitrary prefetch size to the merged {@link Publisher}.
 *
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flatmapc.png" alt="">
 *
 * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}
 * @param concurrency the maximum number of in-flight inner sequences
 * @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
 * @param <V> the merged output sequence type
 *
 * @return a merged {@link Flux}
 */
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper,
        int concurrency, int prefetch) {
    return flatMap(mapper, false, concurrency, prefetch);
}

final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper,
        boolean delayError, int concurrency, int prefetch) {
    return onAssembly(new FluxFlatMap<>(
            this,
            mapper,
            delayError,
            concurrency,
            Queues.get(concurrency),
            prefetch,
            Queues.get(prefetch)
    ));
}
As shown here, flatMap delegates to FluxFlatMap.
reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/FluxFlatMap.java
FluxFlatMap(Flux<? extends T> source,
        Function<? super T, ? extends Publisher<? extends R>> mapper,
        boolean delayError,
        int maxConcurrency,
        Supplier<? extends Queue<R>> mainQueueSupplier,
        int prefetch,
        Supplier<? extends Queue<R>> innerQueueSupplier) {
    super(source);
    if (prefetch <= 0) {
        throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
    }
    if (maxConcurrency <= 0) {
        throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
    }
    this.mapper = Objects.requireNonNull(mapper, "mapper");
    this.delayError = delayError;
    this.prefetch = prefetch;
    this.maxConcurrency = maxConcurrency;
    this.mainQueueSupplier =
            Objects.requireNonNull(mainQueueSupplier, "mainQueueSupplier");
    this.innerQueueSupplier =
            Objects.requireNonNull(innerQueueSupplier, "innerQueueSupplier");
}

@Override
public void subscribe(CoreSubscriber<? super R> actual) {
    if (trySubscribeScalarMap(source, actual, mapper, false)) {
        return;
    }
    source.subscribe(new FlatMapMain<>(actual,
            mapper,
            delayError,
            maxConcurrency,
            mainQueueSupplier,
            prefetch,
            innerQueueSupplier));
}
Here we can see that subscribe wraps the downstream subscriber in a FlatMapMain.
static final class FlatMapMain<T, R> extends FlatMapTracker<FlatMapInner<R>>
        implements InnerOperator<T, R> {

    FlatMapMain(CoreSubscriber<? super R> actual,
            Function<? super T, ? extends Publisher<? extends R>> mapper,
            boolean delayError,
            int maxConcurrency,
            Supplier<? extends Queue<R>> mainQueueSupplier,
            int prefetch,
            Supplier<? extends Queue<R>> innerQueueSupplier) {
        this.actual = actual;
        this.mapper = mapper;
        this.delayError = delayError;
        this.maxConcurrency = maxConcurrency;
        this.mainQueueSupplier = mainQueueSupplier;
        this.prefetch = prefetch;
        this.innerQueueSupplier = innerQueueSupplier;
        this.limit = Operators.unboundedOrLimit(maxConcurrency);
    }

    @Override
    public void request(long n) {
        if (Operators.validate(n)) {
            Operators.addCap(REQUESTED, this, n);
            drain();
        }
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.validate(this.s, s)) {
            this.s = s;
            actual.onSubscribe(this);
            s.request(Operators.unboundedOrPrefetch(maxConcurrency));
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void onNext(T t) {
        if (done) {
            Operators.onNextDropped(t, actual.currentContext());
            return;
        }

        Publisher<? extends R> p;

        try {
            p = Objects.requireNonNull(mapper.apply(t),
                    "The mapper returned a null Publisher");
        }
        catch (Throwable e) {
            onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
            return;
        }

        if (p instanceof Callable) {
            R v;
            try {
                v = ((Callable<R>) p).call();
            }
            catch (Throwable e) {
                if (!delayError || !Exceptions.addThrowable(ERROR, this, e)) {
                    onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
                }
                return;
            }
            tryEmitScalar(v);
        }
        else {
            FlatMapInner<R> inner = new FlatMapInner<>(this, prefetch);
            if (add(inner)) {
                p.subscribe(inner);
            }
        }
    }

    //...
}
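FlatMapMain can be understood as the subscriber for the outer Flux. In onSubscribe, the amount it requests from the source is Operators.unboundedOrPrefetch(maxConcurrency), that is, the first parameter, concurrency. In onNext, each inner Flux produced by the mapper is subscribed to with a FlatMapInner.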
static final class FlatMapInner<R>
        implements InnerConsumer<R>, Subscription {

    FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {
        this.parent = parent;
        this.prefetch = prefetch;
        // this.limit = prefetch >> 2;
        this.limit = Operators.unboundedOrLimit(prefetch);
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, this, s)) {
            if (s instanceof Fuseable.QueueSubscription) {
                @SuppressWarnings("unchecked")
                Fuseable.QueueSubscription<R> f = (Fuseable.QueueSubscription<R>) s;
                int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
                if (m == Fuseable.SYNC) {
                    sourceMode = Fuseable.SYNC;
                    queue = f;
                    done = true;
                    parent.drain();
                    return;
                }
                if (m == Fuseable.ASYNC) {
                    sourceMode = Fuseable.ASYNC;
                    queue = f;
                }
                // NONE is just fall-through as the queue will be created on demand
            }
            s.request(Operators.unboundedOrPrefetch(prefetch));
        }
    }

    @Override
    public void request(long n) {
        long p = produced + n;
        if (p >= limit) {
            produced = 0L;
            s.request(p);
        }
        else {
            produced = p;
        }
    }
}
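In onSubscribe, unless the source is fused in SYNC mode (in which case it returns early without requesting), FlatMapInner requests Operators.unboundedOrPrefetch(prefetch) elements from the inner Flux.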
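The source of Operators.unboundedOrPrefetch is not quoted in this article. As a minimal sketch, assuming it passes the value through unchanged and only treats Integer.MAX_VALUE as "unbounded" (Long.MAX_VALUE), the first request sizes seen in the log can be reproduced as follows. The class RequestSizeSketch and its method are hypothetical stand-ins written for illustration, not the library's code.

```java
// Hypothetical stand-in for the helper used by FlatMapMain and FlatMapInner.
// Assumption: Integer.MAX_VALUE means "unbounded", anything else is used as-is.
final class RequestSizeSketch {

    static long unboundedOrPrefetch(int prefetch) {
        return prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch;
    }

    public static void main(String[] args) {
        int concurrency = 3;
        int prefetch = 6;
        // First request sent to the outer Flux (compare with request(3) in the log)
        System.out.println("outer first request = " + unboundedOrPrefetch(concurrency));
        // First request sent to each inner Flux (compare with request(6) in the log)
        System.out.println("inner first request = " + unboundedOrPrefetch(prefetch));
        // An "unbounded" prefetch turns into an unbounded request
        System.out.println("unbounded = "
                + (unboundedOrPrefetch(Integer.MAX_VALUE) == Long.MAX_VALUE));
    }
}
```

Under this assumption, the values 3 and 6 from the test go straight through to the first request on each side, which is what the log shows.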
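In the FlatMapInner constructor, the commented-out line shows that limit used to be prefetch >> 2 (a right shift by 2, roughly prefetch divided by 4). The active code sets limit = Operators.unboundedOrLimit(prefetch) instead. Judging from the log, where prefetch = 6 leads to replenishing calls of request(5), this works out to prefetch - (prefetch >> 2), about three quarters of prefetch. limit serves as a threshold in request(n): consumed counts accumulate in produced, and only once they reach limit is a request actually forwarded to the inner Flux, so replenishment happens in batches.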
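The batching can be sketched outside Reactor. The class below is a hypothetical simulation, not library code. It assumes that unboundedOrLimit computes prefetch - (prefetch >> 2), that the parent calls request(1) on the inner subscriber for every element it delivers, and that prefetch is less than Integer.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the replenishing logic in FlatMapInner.request(n).
final class InnerReplenishSketch {

    // Assumed formula: about 75% of prefetch, or unbounded.
    static int unboundedOrLimit(int prefetch) {
        return prefetch == Integer.MAX_VALUE
                ? Integer.MAX_VALUE
                : prefetch - (prefetch >> 2);
    }

    // Returns the sizes of the requests sent upstream while `items`
    // elements are consumed one at a time.
    static List<Long> upstreamRequests(int prefetch, int items) {
        List<Long> requests = new ArrayList<>();
        int limit = unboundedOrLimit(prefetch);
        requests.add((long) prefetch);      // initial request in onSubscribe
        long produced = 0L;
        for (int i = 0; i < items; i++) {
            long p = produced + 1;          // mirrors request(1) per element
            if (p >= limit) {
                produced = 0L;
                requests.add(p);            // forward the accumulated batch
            } else {
                produced = p;               // keep accumulating
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // prefetch = 6 and 10 inner elements, as in the test above
        System.out.println(upstreamRequests(6, 10)); // prints [6, 5, 5]
    }
}
```

With prefetch = 6 the limit is 5, so the simulated sequence is request(6), request(5), request(5), which matches the reactor.Flux.Array.2 lines in the log.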
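The two flatMap parameters, concurrency and prefetch, apply to the outer and the inner Flux respectively. Each is used as the size of the first request; for subsequent requests, the operator checks and adjusts the amount internally.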