響應式編程最重要的是解決生產者和消費者之間的關係。若是生產者產生的數據過大,而消費者消費不過來,就會壓垮消費者。因此就須要有一個重要的概念——流控。java
解決流控有幾種方式react
背壓機制web
若是生產者發出的數據比消費者可以處理數據的最大量還要多,消費者可能會被迫一直在獲取和處理數據,消耗愈來愈多的資源,從而埋下潛在的崩潰風險。爲了防止這一點,須要有一種機制使消費者能夠通知生產者下降數據的生成速度。生產者能夠採用多種策略來實現這一要求,這就是背壓。spring
背壓機制應該以非阻塞的方式工做。實現非阻塞背壓的方法是放棄推策略而採用拉策略。編程
響應式流緩存
響應式流規範是提供非阻塞背壓的異步流處理標準的一種倡議。app
響應式流接口框架
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
發佈者(Publisher)是潛在的包含無限數量的有序元素的生產者,它根據收到的請求向當前訂閱者發送元素。dom
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
訂閱者(Subscriber)從發佈者那裏訂閱並接收元素。發佈者向訂閱者發送訂閱令牌(Subscription Token)。使用訂閱令牌,訂閱者向發佈者請求多個元素。當元素準備就緒時,發佈者就會向訂閱者發送合適數量的元素。異步
當執行發佈者的subscribe()方法時,發佈者會回調訂閱者的onSubscribe()方法。在這個方法中,訂閱者一般會藉助傳入的Subscription對象向發佈者請求n個數據。而後發佈者經過不斷調用訂閱者的onNext()方法向訂閱者發出最多n個數據。若是數據所有發完,則會調用onComplete()方法告知訂閱者流已經發完;若是有錯誤發生,則經過onError()方法發出錯誤數據,這一樣也會終止數據流。
public interface Subscription { public void request(long n); public void cancel(); }
訂閱(Subscription)表示訂閱者訂閱的一個發佈者的令牌。當訂閱請求成功時,發佈者將其傳遞給訂閱者。訂閱者使用訂閱令牌與發佈者進行交互。例如,請求更多的元素或取消訂閱。
當發佈者調用subscribe()方法註冊訂閱者時,會經過訂閱者的回調方法onSubscribe()傳入Subscription對象,以後訂閱者就可使用這個Subscription對象的request()方法向發佈者請求數據。背壓機制的實現正是基於這一點。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
處理器(Processor)充當訂閱者和發佈者之間的處理媒介。它用於轉換髮布者/訂閱者管道中的元素。Processor<T, R>訂閱類型T的數據元素,接收並轉換爲類型R的元素,而後發佈該元素。處理器在發佈/訂閱管道中充當轉換器(Transformer)的角色。
異步非阻塞的兩種實現方式
public class A { public String methodA() { B b = new B(); try { return b.methodB(this); } catch (InterruptedException e) { e.printStackTrace(); return null; } } public String callback(B b) { return b.getResult(); } public static void main(String[] args) throws InterruptedException { List<A> aList = new ArrayList<>(); for (int i = 0;i < 10;i++) { aList.add(new A()); } aList.parallelStream().map(A::methodA) .forEach(System.out::println); } }
@Data public class B { private String result; public String methodB(A a) throws InterruptedException { Thread.sleep(2000); int i = ThreadLocalRandom.current().nextInt(1, 10); this.result = "ABC" + i; return a.callback(this); } }
運行main方法,兩秒後同時打印
ABC3
ABC8
ABC8
ABC4
ABC2
ABC6
ABC6
ABC7
ABC6
ABC4
有關Future的經常使用模式能夠參考Fork/Join框架原理和使用探祕
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future模式最主要的問題是雖然能夠獲取異步執行結果的需求,可是它沒有提供通知機制,咱們沒法得知Future何時完成。爲了獲取結果,咱們要麼使用阻塞的兩種get()方法等待Future結果的返回,這時至關於執行同步操做;要麼使用isDone()方法輪詢地判斷Future是否完成,這樣會消耗CPU資源。
JAVA 8的CompletableFuture在必定程度上彌補了普通Future的缺點。在異步任務完成後,咱們使用任務結果時就不須要等待,能夠直接經過thenAccept(),thenApply(),thenCompose()等方法將前面異步處理的結果交給另一個異步事件處理線程來處理。具體能夠參考Springboot2吞吐量優化的一些解決方案
響應式編程的進步
相比以上異步實現的方式,響應式編程可以支持將來更輕鬆地維護異步處理代碼。
在Spring環境中,爲了支持響應式編程,能夠加入以下依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>
webflux自己包含了Reactor框架
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency>
在該框架中,最爲核心的就是Flux組件和Mono組件。
Flux
public abstract class Flux<T> implements Publisher<T>
接口實現方法
@Override public final void subscribe(Subscriber<? super T> actual) { onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual)); }
Flux是一個抽象類,實現了Publisher接口。Flux表明了包含0~n個元素的異步序列,該序列遵循訂閱者的
public void onNext(T t); public void onError(Throwable t); public void onComplete();
傳輸機制。
just()
just()方法能夠指定序列中包含的所有元素,建立出來的Flux序列在發佈這些元素以後會自動結束。通常狀況下,在已知元素數量和內容時,使用just()方法是建立Flux的最簡單的作法。
@SafeVarargs public static <T> Flux<T> just(T... data) { return fromArray(data); }
使用樣例
public class FluxTest { public static void main(String[] args) { Flux.just("A","B","C").subscribe(System.out::println); } }
public final Disposable subscribe(Consumer<? super T> consumer) { Objects.requireNonNull(consumer, "consumer"); return subscribe(consumer, null, null); }
這裏的subscribe是Publisher接口方法的重載實現,Cosumer爲一個無返回類型的函數式接口。這裏須要說明的是just()方法是異步的,可是subscribe()方法是阻塞同步的。
運行結果
A
B
C
fromArray(),fromIterable(),fromStream()
public class FluxTest { public static void main(String[] args) { Flux.fromArray(new Integer[] {1,2,3}) .subscribe(System.out::println); List<Order> orders = new ArrayList<>(); orders.add(new Order(1,"100", LocalDate.now(),23,new BigDecimal(12))); Flux.fromIterable(orders) .subscribe(System.out::println); Flux.fromStream(orders.stream()) .subscribe(System.out::println); } }
public static <T> Flux<T> fromArray(T[] array) { if (array.length == 0) { return empty(); } if (array.length == 1) { return just(array[0]); } return onAssembly(new FluxArray<>(array)); }
public static <T> Flux<T> fromIterable(Iterable<? extends T> it) { return onAssembly(new FluxIterable<>(it)); }
public static <T> Flux<T> fromStream(Stream<? extends T> s) { Objects.requireNonNull(s, "Stream s must be provided"); return onAssembly(new FluxStream<>(() -> s)); }
這裏能夠看到這三個方法最後都是返回的是一個onAssembly()的方法
protected static <T> Flux<T> onAssembly(Flux<T> source) { Function<Publisher, Publisher> hook = Hooks.onEachOperatorHook; if(hook != null) { source = (Flux<T>) hook.apply(source); } if (Hooks.GLOBAL_TRACE) { AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get()); source = (Flux<T>) Hooks.addAssemblyInfo(source, stacktrace); } return source; }
而FluxArray,FluxIterable,FluxStream都是抽象類Flux的子類。
運行結果
1
2
3
Order(id=1, no=100, date=2019-12-25, number=23, amount=12)
Order(id=1, no=100, date=2019-12-25, number=23, amount=12)
empty(),error(),never()
可使用empty()方法建立一個不包含任何元素而只發布結束消息的序列,也可使用error()方法建立一個只包含錯誤消息的序列,還可使用never()方法建立一個不包含任何消息通知的序列。
public class FluxTest { public static void main(String[] args) { Flux.empty().subscribe(System.out::println); Flux.error(new RuntimeException("錯誤")).subscribe(System.out::println); Flux.never().subscribe(System.out::println); } }
public static <T> Flux<T> empty() { return FluxEmpty.instance(); }
public static <T> Flux<T> error(Throwable error) { return error(error, false); }
public static <T> Flux<T> never() { return FluxNever.instance(); }
運行結果
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 錯誤
Caused by: java.lang.RuntimeException: 錯誤
at com.guanjian.reflux.fluxtest.FluxTest.main(FluxTest.java:8)
range()
使用range(int start,int count)方法能夠建立包含從start起始的count個對象的序列,序列中的全部對象類型都是Integer。
public class FluxTest { public static void main(String[] args) { Flux.range(1,5).subscribe(System.out::println); } }
public static Flux<Integer> range(int start, int count) { if (count == 1) { return just(start); } if (count == 0) { return empty(); } return onAssembly(new FluxRange(start, count)); }
FluxRange爲Flux的子類。
運行結果
1
2
3
4
5
咱們來修改一下FluxTest的實現
public class FluxTest { public static void main(String[] args) { Flux.range(1,5) .subscribe(i -> System.out.println(i), error -> System.err.println(error), () -> System.out.println("Done")); } }
運行結果
1
2
3
4
5
Done
public static Flux<Integer> range(int start, int count) { if (count == 1) { return just(start); } if (count == 0) { return empty(); } return onAssembly(new FluxRange(start, count)); }
由此處能夠看到,Flux.range(1,5)建立的是一個FluxRange對象,而如下調用是其父類Flux中的方法。
public final Disposable subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) { return subscribe(consumer, errorConsumer, completeConsumer, null); }
這是一個三參的subscribe()方法
public final Disposable subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) { return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer, completeConsumer, subscriptionConsumer)); }
其中LambdaSubscriber是一個適配器
/** * 一個無邊界Java Lambda的Subscriber適配器 **/ final class LambdaSubscriber<T> implements InnerConsumer<T>, Disposable { final Consumer<? super T> consumer; final Consumer<? super Throwable> errorConsumer; final Runnable completeConsumer; final Consumer<? super Subscription> subscriptionConsumer; volatile Subscription subscription; static final AtomicReferenceFieldUpdater<LambdaSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(LambdaSubscriber.class, Subscription.class, "subscription"); LambdaSubscriber( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; } @Override public final void onSubscribe(Subscription s) { if (Operators.validate(subscription, s)) { this.subscription = s; if (subscriptionConsumer != null) { try { subscriptionConsumer.accept(s); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); onError(t); } } else { //根據傳入的參數,咱們能夠知道subscriptionConsumer爲null,令牌s向發佈者請求的量爲Long.MAX_VALUE s.request(Long.MAX_VALUE); } } } @Override public final void onComplete() { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { return; } //根據定義可知,completeConsumer爲() -> System.out.println("Done") if (completeConsumer != null) { try { //該consumer爲Runnable,傳輸完成後執行 completeConsumer.run(); } catch (Throwable t) { Exceptions.throwIfFatal(t); onError(t); } } } @Override public final void onError(Throwable t) { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onErrorDropped(t, Context.empty()); return; } //根據定義可知,errorConsumer爲error -> System.err.println(error) if (errorConsumer != null) { errorConsumer.accept(t); } else { throw Exceptions.errorCallbackNotImplemented(t); } } @Override public final void onNext(T x) { try { //由定義可知,此處consumer爲i -> System.out.println(i) if (consumer != null) { //此處x爲i,即執行打印i consumer.accept(x); } } catch (Throwable t) { Exceptions.throwIfFatal(t); //若是發生異常,令牌執行終止發送 this.subscription.cancel(); //跳轉到錯誤處理 onError(t); } } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return subscription; if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); return null; } @Override public boolean isDisposed() { return subscription == Operators.cancelledSubscription(); } @Override public void dispose() { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s != null && s != Operators.cancelledSubscription()) { s.cancel(); } } }
由如下subscribeWith()方法能夠看出LambdaSubscriber對象其實就是一個訂閱者
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) { subscribe(subscriber); return subscriber; }
LambdaSubscriber類實現了Subscriber(訂閱者)接口。
在LambdaSubscriber源碼中,咱們能夠看到
//根據傳入的參數,咱們能夠知道subscriptionConsumer爲null,令牌s向發佈者請求的量爲Long.MAX_VALUE s.request(Long.MAX_VALUE);
這個s是一個Subscription(令牌),request()方法爲令牌發起的訂閱,而令牌Subscription接口的實現類爲FluxRange中的內部類RangeSubscription,其繼承圖以下。
咱們來看它的request()方法
@Override public void request(long n) { if (Operators.validate(n)) { if (Operators.addCap(REQUESTED, this, n) == 0) { if (n == Long.MAX_VALUE) { //因爲n爲Long.MAX_VALUE,因此調用fastPath() fastPath(); } else { slowPath(n); } } } }
fastPath()包含了全部的響應式的處理流程,全方法以下
void fastPath() { final long e = end; //此處訂閱者爲LambdaSubscriber final Subscriber<? super Integer> a = actual; for (long i = index; i != e; i++) { if (cancelled) { return; } a.onNext((int) i); } if (cancelled) { return; } a.onComplete(); }
interval()
interval()方法表現爲一個方法系列,其中interval(Duration period)方法用來建立一個包含從0開始遞增的Long對象的序列,序列中的元素按照指定的時間間隔來發布。而interval(Duration delay,Duration period)方法除了能夠指定時間間隔,還能夠指定起始元素髮布以前的延遲時間。intervalMillis(long period)和intervalMillis(long delay,long period)與前面兩個方法的做用相同,只不過這兩個方法經過毫秒數來指定時間間隔和延遲時間。
Duration爲java 8新增的關於時間間隔的類型。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.interval(Duration.ofSeconds(1)).subscribe(System.out::println); Thread.sleep(2000); } }
這裏須要注意的是Flux的建立元素的方法都是異步的守護線程,因此這裏主線程結束,Flux.interval就結束了。
運行結果(2秒後)
0
1
若是不設置主線程睡眠,則立刻會退出,無任何打印內容。固然在WebFlux應用中,主線程都是Springboot來啓動的,因此不準如此操做,主線程不會隨便退出。
public static Flux<Long> interval(Duration period) { return interval(period, Schedulers.parallel()); }
generate()
generate()方法經過同步和逐一的方式來產生Flux序列,序列的產生依賴於Reactor所提供的SynchronousSink組件。
public class FluxTest { public static void main(String[] args) { Flux.generate(sink -> { sink.next("Hello"); sink.complete(); }).subscribe(System.out::println); } }
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) { Objects.requireNonNull(generator, "generator"); return onAssembly(new FluxGenerate<>(generator)); }
運行結果
Hello
create()
create()使用的是FluxSink組件。
public class FluxTest { public static void main(String[] args) { Flux.create(sink -> { for (int i = 0;i < 10;i++) { sink.next(i); } sink.complete(); }).subscribe(System.out::println); } }
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) { return create(emitter, OverflowStrategy.BUFFER); }
運行結果
0
1
2
3
4
5
6
7
8
9
Mono
public abstract class Mono<T> implements Publisher<T>
接口實現方法
@Override public final void subscribe(Subscriber<? super T> actual) { onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual)); }
Mono也是一個抽象類,實現了Publisher接口,Mono表示包含0個或1個元素的異步序列。該序列遵循訂閱者的
public void onNext(T t); public void onError(Throwable t); public void onComplete();
傳輸機制。Mono也能夠用來表示一個空的異步序列,該序列沒有任何元素,僅僅包含該序列結束的概念。咱們能夠用Mono<Void>表明一個空的異步序列。
Mono包含Flux的靜態方法,有以下不一樣的方法
delay()
delay(Duration duration)和delayMilis(long duration)方法能夠建立一個Mono序列。它們的特色是,在指定的延遲時間以後會產生數字0作爲惟一值。
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.delay(Duration.ofSeconds(2)) .subscribe(System.out::println); Thread.sleep(2500); } }
public static Mono<Long> delay(Duration duration) { return delay(duration, Schedulers.parallel()); }
運行結果(2.5秒後)
0
justOrEmpty()
justOrEmpty(Optional<? extends T> data)方法從一個Optional對象建立Mono,只有當Optional對象包含值時,Mono序列才產生對應的元素。關於Optional的說明請參考Java函數式編程整理
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.justOrEmpty(Optional.of("Hello")) .subscribe(System.out::println); Mono.justOrEmpty(Optional.empty()) .subscribe(System.out::println); } }
public static <T> Mono<T> justOrEmpty(@Nullable Optional<? extends T> data) { return data != null && data.isPresent() ? just(data.get()) : empty(); }
運行結果
Hello
create()
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.create(sink -> sink.success("Hello")) .subscribe(System.out::println); } }
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) { return onAssembly(new MonoCreate<>(callback)); }
運行結果
Hello
操做符
buffer() 把當前流中的元素收集到集合中,並把集合對象作爲流中的新元素。使用buffer操做符在進行元素收集時能夠指定集合對象所包含的元素對最大數量。
public class FluxTest { public static void main(String[] args) { Flux.range(1,50).buffer(10) .subscribe(System.out::println); } }
public final Flux<List<T>> buffer(int maxSize) { return buffer(maxSize, listSupplier()); }
運行結果
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
由結果可知,buffer操做符從包含50個元素的流中構建集合,每一個集合包含10個元素,共構建5個集合。
bufferTimeout() 能夠指定時間間隔爲一個Duration對象或毫秒數,即便用bufferMillis()或bufferTimeoutMillis()這兩個方法
Flux.range(1,50).bufferTimeout(10, Duration.ofSeconds(2)) .subscribe(System.out::println);
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) { return bufferTimeout(maxSize, maxTime, listSupplier()); }
bufferUntil()和bufferWhile() bufferUntile會一直收集,直到謂詞(Predicate)條件返回true,使得謂詞條件返回true的那個元素能夠選擇添加到當前集合或下一個集合中。bufferWhile則只有當謂詞條件返回true時纔會收集,一旦值爲false,會當即開始下一次收集。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).bufferUntil(i -> i % 2 == 0) .subscribe(System.out::println); } }
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate) { return onAssembly(new FluxBufferPredicate<>(this, predicate, listSupplier(), FluxBufferPredicate.Mode.UNTIL)); }
運行結果
[1, 2]
[3, 4]
[5, 6]
[7, 8]
[9, 10]
由結果可知,當收集元素爲偶數時,被添加到當前集合中,不爲偶數的元素被添加到下一個集合中。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).bufferWhile(i -> i % 2 == 0) .subscribe(System.out::println); } }
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate) { return onAssembly(new FluxBufferPredicate<>(this, predicate, listSupplier(), FluxBufferPredicate.Mode.WHILE)); }
運行結果
[2]
[4]
[6]
[8]
[10]
由結果可知,只有當元素爲偶數時纔會收集,不然會當即開始下一次收集。
map()
map()操做符至關於一種映射操做,它對流中的每一個元素應用一個映射函數,從而達到變換效果。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).map(i -> String.valueOf(i) + "a") .subscribe(System.out::println); } }
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) { if (this instanceof Fuseable) { return onAssembly(new FluxMapFuseable<>(this, mapper)); } return onAssembly(new FluxMap<>(this, mapper)); }
運行結果
1a
2a
3a
4a
5a
6a
7a
8a
9a
10a
flatMap()
flatMap操做符把流中的每一個元素轉換成一個流,再把轉換以後獲得的全部流中的元素進行合併。
public class FluxTest { public static void main(String[] args) { Flux.just(1,5).flatMap(x -> Mono.just(x * x)) .subscribe(System.out::println); } }
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) { return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues .XS_BUFFER_SIZE); }
運行結果
1
25
window()
window操做符是把當前流中的元素收集到另外的Flux序列中。所以返回類型是Flux<Flux<T>>。
public class FluxTest { public static void main(String[] args) { Flux.range(1,5).window(2).toIterable().forEach(w -> { w.subscribe(System.out::println); System.out.println("-----------"); }); } }
public final Flux<Flux<T>> window(int maxSize) { return onAssembly(new FluxWindow<>(this, maxSize, Queues.get(maxSize))); }
public final Iterable<T> toIterable() { return toIterable(Queues.SMALL_BUFFER_SIZE); }
運行結果
1
2
-----------
3
4
-----------
5
-----------
filter()
filter操做符的含義與普通的過濾器相似,就是對流中包含的元素進行過濾,只留下知足指定過濾條件的元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).filter(i -> i % 2 == 0) .subscribe(System.out::println); } }
運行結果
2
4
6
8
10
first()
first操做符挑選出第一個發佈者,由其提供事件。能有效避免多個源的衝突。
public class FluxTest { public static void main(String[] args) { List<String> list = new ArrayList<>(); list.add("a"); list.add("b"); Flux.first(Flux.fromIterable(list),Flux.just("c","d")) .subscribe(System.out::println); } }
@SafeVarargs public static <I> Flux<I> first(Publisher<? extends I>... sources) { return onAssembly(new FluxFirstEmitting<>(sources)); }
運行結果
a
b
由結果可知,這裏挑選的是Flux.fromIterable(list)作爲數據源。
last()
last操做符返回流中最後一個元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).last() .subscribe(System.out::println); } }
public final Mono<T> last() { if (this instanceof Callable) { @SuppressWarnings("unchecked") Callable<T> thiz = (Callable<T>) this; Mono<T> callableMono = convertToMono(thiz); if (callableMono == Mono.empty()) { return Mono.error(new NoSuchElementException("Flux#last() didn't observe any onNext signal from Callable flux")); } return callableMono; } return Mono.onAssembly(new MonoTakeLastOne<>(this)); }
運行結果
10
skip()
skip操做符會忽略數據流的前n個元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).skip(4) .subscribe(System.out::println); } }
public final Flux<T> skip(long skipped) { if (skipped == 0L) { return this; } else { return onAssembly(new FluxSkip<>(this, skipped)); } }
運行結果
5
6
7
8
9
10
skipLast()
skipLast操做符會忽略流的最後n個元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).skipLast(4) .subscribe(System.out::println); } }
public final Flux<T> skipLast(int n) { if (n == 0) { return this; } return onAssembly(new FluxSkipLast<>(this, n)); }
運行結果
1
2
3
4
5
6
take()
take操做符從當前流中提取元素。能夠按照指定的數量來提取元素,take(long n);也能夠按照指定的時間間隔來提取元素,take(Duration timespan)和takeMillis(long timespan)。
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).take(10) .subscribe(System.out::println); } }
public final Flux<T> take(long n) { if (this instanceof Fuseable) { return onAssembly(new FluxTakeFuseable<>(this, n)); } return onAssembly(new FluxTake<>(this, n)); }
運行結果
1
2
3
4
5
6
7
8
9
10
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).take(Duration.ofSeconds(1)) .subscribe(System.out::println); } }
public final Flux<T> take(Duration timespan) { return take(timespan, Schedulers.parallel()); }
運行結果
.
.
434349
434350
434351
434352
434353
434354
434355
434356
434357
由結果可知,1秒內能夠提取到434357,而不到100萬。
takeLast()
takeLast操做符用來從當前流中尾部提取元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).takeLast(10) .subscribe(System.out::println); } }
public final Flux<T> takeLast(int n) { if(n == 1){ return onAssembly(new FluxTakeLastOne<>(this)); } return onAssembly(new FluxTakeLast<>(this, n)); }
運行結果
999991
999992
999993
999994
999995
999996
999997
999998
999999
1000000
then()
then操做符的含義是等到上一個操做完成再作下一個。
public class FluxTest { public static void main(String[] args) { Flux.range(1,10).then(Mono.fromRunnable(() -> System.out.println("Done"))) .subscribe(); } }
public final <V> Mono<V> then(Mono<V> other) { return Mono.onAssembly(new MonoIgnoreThen<>(new Publisher[] { this }, other)); }
運行結果
Done
when()
when操做符爲等到多個操做一塊兒完成。該操做符爲Mono專有的靜態方法
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.when(Flux.range(1,10),Mono.just("a")).then(Mono.just("b")) .subscribe(System.out::println); } }
public static Mono<Void> when(Publisher<?>... sources) { if (sources.length == 0) { return empty(); } if (sources.length == 1) { return empty(sources[0]); } return onAssembly(new MonoWhen(false, sources)); }
由代碼可知,when是一個Mono<Void>的,它不會返回任何數據,但做用就是等待Flux.range(1,10),Mono.just("a")完成。因爲有then
運行結果
b
defer()
defer操做符提供了一種惰性策略,發佈者不會一開始發佈消息,直到訂閱者建立實例。
public class FluxTest { public static void main(String[] args) { Flux.defer(() -> Flux.range(1,5)) .subscribe(System.out::println); } }
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier) { return onAssembly(new FluxDefer<>(supplier)); }
運行結果
1
2
3
4
5
startWith()
startWith操做符是數據元素序列的開頭插入指定的元素項。
public class FluxTest { public static void main(String[] args) { Flux.range(1,5).startWith(Mono.just(0)) .subscribe(System.out::println); } }
public final Flux<T> startWith(Publisher<? extends T> publisher) { if (this instanceof FluxConcatArray) { FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this; return fluxConcatArray.concatAdditionalSourceFirst(publisher); } return concat(publisher, this); }
運行結果
0
1
2
3
4
5
merge()
merge操做符用來把多個流合併成一個Flux序列,該操做按照全部流中元素的實際產生順序來合併。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.merge(Flux.interval(Duration.ofSeconds(1)).take(3) ,Flux.interval(Duration.ofSeconds(1)).take(3)) .toStream().forEach(System.out::println); } }
@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources) { return merge(Queues.XS_BUFFER_SIZE, sources); }
運行結果(每秒打印一對數)
0
0
1
1
2
2
mergeSequential()
mergeSequential操做符按照全部流被訂閱的順序以流爲單位進行合併。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.mergeSequential(Flux.interval(Duration.ofSeconds(1)).take(3) ,Flux.interval(Duration.ofSeconds(1)).take(3)) .toStream().forEach(System.out::println); } }
@SafeVarargs public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources) { return mergeSequential(Queues.XS_BUFFER_SIZE, false, sources); }
運行結果(每秒打印第一個流的0,1,2;然後一次性打印第二個流的0,1,2)
0
1
2
0
1
2
zipWith()
zipWith操做符把當前流中的元素與另一個流中的元素按照一對一的方式進行合併。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.just("a","b","f").zipWith(Flux.just("c","d","e")) .subscribe(System.out::println); } }
public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2) { return zipWith(source2, tuple2Function()); }
這裏Tuple2爲一個兩元素的元組,爲Scala的基本類型。
運行結果
[a,c]
[b,d]
[f,e]
同時咱們也能夠經過一個BiFunction函數(給定任意兩個參數對象,返回一個任意結果對象)對合並的元素進行處理,所獲得的流的元素類型爲該函數的返回值。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.just("a","b","f").zipWith(Flux.just("c","d","e"),(s1,s2) -> String.format("%s + %s",s1,s2)) .subscribe(System.out::println); } }
public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2, final BiFunction<? super T, ? super T2, ? extends V> combinator) { if (this instanceof FluxZip) { @SuppressWarnings("unchecked") FluxZip<T, V> o = (FluxZip<T, V>) this; Flux<V> result = o.zipAdditionalSource(source2, combinator); if (result != null) { return result; } } return zip(this, source2, combinator); }
運行結果
a + c
b + d
f + e
defaultIfEmpty()
defaultEmpty操做符返回來自原始數據流的元素,若是原始數據流中沒有元素,則返回一個默認元素。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.just("a","b","c").defaultIfEmpty("g") .subscribe(System.out::println); System.out.println("---------"); Flux.empty().defaultIfEmpty("g") .subscribe(System.out::println); } }
public final Flux<T> defaultIfEmpty(T defaultV) { return onAssembly(new FluxDefaultIfEmpty<>(this, defaultV)); }
運行結果
a
b
c
---------
g
takeUntil()
takeUnitl操做符的基本用法是takeUntil(Predicate<? super T> predicate),takeUntil將提取元素直到謂詞條件返回true。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,100).takeUntil(i -> i == 10) .subscribe(System.out::println); } }
public final Flux<T> takeUntil(Predicate<? super T> predicate) { return onAssembly(new FluxTakeUntil<>(this, predicate)); }
運行結果
1
2
3
4
5
6
7
8
9
10
由結果可知,從1到100中,只提取到知足等於10時就結束了。
takeWhile()
takeWhile操做符爲takeWhile(Predicate<? super T> continuePredicate),takeWhile會在謂詞continuePredicate返回true時才進行元素的提取。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,10).takeWhile(i -> i <= 5) .subscribe(System.out::println); } }
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate) { return onAssembly(new FluxTakeWhile<>(this, continuePredicate)); }
運行結果
1
2
3
4
5
經測試,這裏只有設小於或小於等於纔有效,直接等於或者大於都無效
skipUntil()
skipUntil操做符的基本用法是skipUntil(Predicate<? super T> predicate)。skipUntil將丟棄原始數據流中的元素,直到謂詞Predicate返回true.
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,10).skipUntil(i -> i > 5) .subscribe(System.out::println); } }
public final Flux<T> skipUntil(Predicate<? super T> untilPredicate) { return onAssembly(new FluxSkipUntil<>(this, untilPredicate)); }
運行結果
6
7
8
9
10
由結果可知,不知足條件的1到5所有被丟棄了。
skipWhile()
skipWhile操做符爲skipWhile(Predicate<? super T> continuePredicate)。當謂詞continuePredicate返回true時才進行元素的丟棄。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,10).skipWhile(i -> i < 5) .subscribe(System.out::println); } }
public final Flux<T> skipWhile(Predicate<? super T> skipPredicate) { return onAssembly(new FluxSkipWhile<>(this, skipPredicate)); }
運行結果
5
6
7
8
9
10
經測試,這裏只有設小於或小於等於纔有效,直接等於或者大於都無效
concat()
concat操做符用來合併來自不一樣Flux的數據,這種合併採用的是順序的方式。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.concat(Flux.range(1,5), Mono.just("a")) .subscribe(System.out::println); } }
@SafeVarargs public static <T> Flux<T> concat(Publisher<? extends T>... sources) { return onAssembly(new FluxConcatArray<>(false, sources)); }
運行結果
1
2
3
4
5
a
count()
count操做符統計Flux中全部元素的個數。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,5).count() .subscribe(System.out::println); } }
public final Mono<Long> count() { return Mono.onAssembly(new MonoCount<>(this)); }
運行結果
5
reduce()
reduce操做符對流中的全部元素進行累計操做,獲得一個包含計算結果的Mono序列。具體的累計操做也是經過BiFunction(給出任意兩種類型,獲得任意一種類型,不過這裏面都是同一種類型)來實現的。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,5).reduce((x,y) -> x + y) .subscribe(System.out::println); } }
public final Mono<T> reduce(BiFunction<T, T, T> aggregator) { if (this instanceof Callable){ @SuppressWarnings("unchecked") Callable<T> thiz = (Callable<T>)this; return convertToMono(thiz); } return Mono.onAssembly(new MonoReduce<>(this, aggregator)); }
運行結果
15
reduceWith()
reduceWith操做符,用來在進行reduce操做時指定一個初始值。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,5).reduceWith(() -> 6,(x,y) -> x + y) .subscribe(System.out::println); } }
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator) { return Mono.onAssembly(new MonoReduceSeed<>(this, initial, accumulator)); }
運行結果
21
subscribe()
能夠經過subscribe()方法來添加相應的訂閱邏輯。在調用subscribe()方法時能夠指定須要處理的消息類型。
Reactor中的消息類型有三種,即正常消息、錯誤消息和完成消息。subscribe操做符能夠只處理其中包含的正常消息,也能夠同時處理錯誤消息和完成消息。
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.just(100).concatWith(Mono.error(new IllegalStateException())) .subscribe(System.out::println,System.err::println); } }
運行結果
100
java.lang.IllegalStateException
有時候咱們不想直接拋出異常,而是但願採用一種容錯策略來返回一個默認值,就能夠採用以下方式
public class MonoTest { public static void main(String[] args) throws InterruptedException { Mono.just(100).concatWith(Mono.error(new IllegalStateException())) .onErrorReturn(0) .subscribe(System.out::println,System.err::println); } }
運行結果
100
0
timeout()
timeout操做符維持原始被觀察者的狀態,在特定時間段內沒有產生任何事件時,將生成一個異常。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.never().timeout(Duration.ofSeconds(1)) .subscribe(System.out::println); Thread.sleep(1500); } }
public final Flux<T> timeout(Duration timeout) { return timeout(timeout, null, Schedulers.parallel()); }
運行結果(1.5秒後)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'source(FluxNever)' (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'source(FluxNever)' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273)
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:390)
at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
block()
block操做符在接收到下一個元素以前一直阻塞。
block操做符經常使用來把響應式數據流轉換爲傳統的數據流。
public class MonoTest { public static void main(String[] args) throws InterruptedException { Integer i = Mono.just(100).block(); System.out.println(i); } }
@Nullable public T block() { BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(); onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber)); return subscriber.blockingGet(); }
運行結果
100
block()方法也能夠帶一個Duration的參數
public class MonoTest { public static void main(String[] args) throws InterruptedException { Integer i = Mono.just(100).block(Duration.ofSeconds(1)); System.out.println(i); } }
@Nullable public T block(Duration timeout) { BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(); onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber)); return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS); }
log()
log操做符用於觀察全部的數據並使用日誌工具進行跟蹤。
public class FluxTest { public static void main(String[] args) throws InterruptedException { Flux.range(1,2).log() .subscribe(System.out::println); } }
public final Flux<T> log() { return log(null, Level.INFO); }
運行結果
00:06:13.347 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
00:06:13.350 [main] INFO reactor.Flux.Range.1 - | request(unbounded)
00:06:13.350 [main] INFO reactor.Flux.Range.1 - | onNext(1)
1
00:06:13.350 [main] INFO reactor.Flux.Range.1 - | onNext(2)
2
00:06:13.351 [main] INFO reactor.Flux.Range.1 - | onComplete()
在Reactor框架中,針對背壓有如下4種策略
Reactor框架提供了相應的onBackpressureXxx操做符來設置背壓
onBackpressureBuff()
對來自下游的請求採起緩衝策略。
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).onBackpressureBuffer() .subscribe(System.out::println); } }
public final Flux<T> onBackpressureBuffer() { return onAssembly(new FluxOnBackpressureBuffer<>(this, Queues .SMALL_BUFFER_SIZE, true, null)); }
onBackpressureDrop()
元素就緒時,根據下游是否有未響應的請求來判斷是否發出當前元素。
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).onBackpressureDrop() .subscribe(System.out::println); } }
public final Flux<T> onBackpressureDrop() { return onAssembly(new FluxOnBackpressureDrop<>(this)); }
onBackpressureLatest()
當有新的請求到來時,將最新的元素髮出。
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).onBackpressureLatest() .subscribe(System.out::println); } }
public final Flux<T> onBackpressureLatest() { return onAssembly(new FluxOnBackpressureLatest<>(this)); }
onBackpressureError()
當有多餘元素就緒時,發出錯誤信號
public class FluxTest { public static void main(String[] args) { Flux.range(1,1000000).onBackpressureError() .subscribe(System.out::println); } }
public final Flux<T> onBackpressureError() { return onBackpressureDrop(t -> { throw Exceptions.failWithOverflow();}); }