Spring5-Reactor函數式編程

前言

反應式編程是一種能夠替代命令式編程的編程範式。這種可替代性存在的緣由在於反應式編程解決了命令式編程中的一些限制。理解這些限制,有助於你更好地理解反應式編程模型的優勢java

反應式流規範

對比 Java 中的流react

Java的流和反應式流Java的流和反應式流之間有不少類似之處。首先,它們的名字中都有流(Stream)這個詞。編程

它們還提供了用於處理數據的函數式API。事實上,正如你稍後將會在介紹Reactor時看到的那樣,它們甚至能夠共享許多相同的操做。數組

Java的流一般都是同步的,而且只能處理有限的數據集。從本質上來講,它們只是使用函數來對集合進行迭代的一種方式。微信

反應式流支持異步處理任意大小的數據集,一樣也包括無限數據集。只要數據就緒,它們就能實時地處理數據,而且可以經過回壓來避免壓垮數據的消費者。app

反應式流規範異步

反應式流規範能夠總結爲4個接口:Publisher、Subscriber、Subscription和Processor。函數

Publisher負責生成數據,並將數據發送給Subscription(每一個Subscriber對應一個Subscription)。測試

Publisher接口聲明瞭一個方法subscribe(),Subscriber能夠經過該方法向Publisher發起訂閱。flex


public interface Publisher<T> { void subscribe(Subscriber<? super T> var1); }

public interface Publisher<T> { void subscribe(Subscriber<? super T> var1); }
public interface Subscriber<T> { void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete(); }
public interface Subscription { void request(long var1);
void cancel(); }
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

初識Reactor

Reactor項目是反應式流規範的一個實現,提供了一組用於組裝反應式流的函數式API。

反應式編程要求咱們採起和命令式編程不同的思惟方式。此時咱們不會再描述每一步要進行的步驟,反應式編程意味着要構建數據將要流經的管道。當數據流經管道時,能夠對它們進行某種形式的修改或者使用。

命令式編程:

 String a = "Apple"; String s = a.toUpperCase(); String s1 = "hello" + s + "!"; System.out.println(s1);

反應式編程:

 Mono.just("Apple") .map(String::toUpperCase) .map(x-> "hello" + x + "!") .subscribe(System.out::println);

控制檯:

helloAPPLE!14:36:38.685 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkhelloAPPLE!

這個例子中的Mono是Reactor的兩種核心類型之一,另外一個類型是Flux。二者都實現了反應式流的Publisher接口。Flux表明具備零個、一個或者多個(多是無限個)數據項的管道.

添加Reactor依賴

要開始使用Reactor,請將下面的依賴項添加到項目的構建文件中:

 <!--reactor core--> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <!--reactor test--> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.6.RELEASE</version> <scope>compile</scope> </dependency>

使用常見的反應式操做

Flux和Mono是Reactor提供的最基礎的構建塊,而這兩種反應式類型所提供的操做符則是組合使用它們以構建數據流動管線的黏合劑。

Flux和Mono共有500多個操做,這些操做均可以大體歸類爲:

•建立操做;

•組合操做;

•轉換操做;

•邏輯操做。

1 建立


Flux<String> fruitFlux = Flux.just("Apple","Orange");
fruitFlux.subscribe(System.out::println);

這裏傳遞給subscribe()方法的lambda表達式其實是一個java.util.Consumer,用來建立反應式流的Subscriber。在調用subscribe()以後,數據會開始流動。在這個例子中,沒有中間操做,因此數據從Flux直接流向訂閱者

要驗證預約義的數據是否流經了fruitFlux,咱們能夠編寫以下所示的測試代碼:


StepVerifier.create(fruitFlux) .expectNext("Apple") .expectNext("Orange") .verifyComplete();

在這個例子中,StepVerifier訂閱了fruitFlux,而後斷言Flux中的每一個數據項是否與預期的水果名稱相匹配。最後,它驗證Flux在發佈完「Strawberry」以後,整個fruitFlux正常完成。

還能夠從數組 集合 Java Stream來做爲Flux的源。

 List<String> list = Lists.newArrayList(); list.add("Apple"); list.add("Orange"); Flux<String> stringFlux = Flux.fromIterable(list);

例如,要建立一個每秒發佈一個值的Flux,你可使用Flux上的靜態interval() 方法,以下所示:

 Flux<Long> take = Flux.interval(Duration.ofSeconds(1)).take(5);

經過interval()方法建立的Flux會從0開始發佈值,而且後續的條目依次遞增。此外,由於interval()方法沒有指定最大值,因此它可能會永遠運行。咱們也可使用take()方法將結果限制爲前5個條目

2 組合反應式類型

有時候,咱們會須要操做兩種反應式類型,並以某種方式將它們合併在一塊兒。或者,在其餘狀況下,咱們可能須要將Flux拆分爲多種反應式類型

合併:

 Flux<String> fruitFluxA = Flux.just("Apple","Orange");
Flux<String> fruitFluxB = Flux.just("Banana","watermelon");
fruitFluxA.mergeWith(fruitFluxB).subscribe(System.out::println);
 com.ckj.superlearn.superlearn.base.ReactorStrategy16:03:07.343 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkAppleOrangeBananawatermelon
Process finished with exit code 0

mergeWith()方法不能完美地保證源Flux之間的前後順序,因此咱們能夠考慮使用zip()方法


Flux<String> fruitFluxA = Flux.just("Apple","Orange").delayElements(Duration.ofMillis(10));
Flux<String> fruitFluxB = Flux.just("Banana","watermelon").delayElements(Duration.ofMillis(50));
Flux<String> allFlux = fruitFluxA.mergeWith(fruitFluxB);
allFlux.subscribe(x-> System.out.println("allFlux:"+x));
Flux<Tuple2<String, String>> zip = Flux.zip(fruitFluxA, fruitFluxB);
zip.subscribe(x-> System.out.println("zip:"+x));
Thread.sleep(1000);

控制檯:

/com.ckj.superlearn.superlearn.base.ReactorStrategy16:49:44.543 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkallFlux:AppleallFlux:OrangeallFlux:Bananazip:[Apple,Banana]allFlux:watermelonzip:[Orange,watermelon]
Process finished with exit code 0

3 轉換和過濾反應式流

針對具備多個數據項的Flux,skip操做將建立一個新的Flux,它會首先跳過指定數量的數據項,而後從源Flux中發佈剩餘的數據項。下面的測試方法展現如何使用skip()方法:

 Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").skip(2);
fruitFluxA.subscribe(x->{
System.out.println(x);
});
/com.ckj.superlearn.superlearn.base.ReactorStrategy17:05:00.141 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkBananawatermelon
Process finished with exit code 0

與之對應相反的是take()

 Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").take(2);
fruitFluxA.subscribe(x->{
System.out.println(x);
});

com.ckj.superlearn.superlearn.base.ReactorStrategy17:20:59.483 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkAppleOrange
Process finished with exit code 0

filter()的過濾效果

 Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").take(2);
fruitFluxA.filter(x->x.equals("Apple")).subscribe(x->{ System.out.println(x); });
com.ckj.superlearn.superlearn.base.ReactorStrategy17:24:03.242 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging frameworkApple
Process finished with exit code 0

如何使用flatMap()方法和subscribeOn()方法

 Flux<String> fruitFluxA = Flux.just("Apple", "Orange", "Banana", "watermelon", "Apple", "Orange", "Banana", "watermelon", "Apple", "Orange", "Banana", "watermelon", "Apple", "Orange", "Banana", "watermelon");
fruitFluxA.flatMap(Mono::just).map(String::toUpperCase).subscribeOn(Schedulers.parallel());

使用flatMap()和subscribeOn()的好處是:咱們能夠在多個並行線程之間拆分工做,從而增長流的吞吐量。由於工做是並行完成的,沒法保證哪項工做首先完成,因此結果Flux中數據項的發佈順序是未知的

原創不易,若是以爲有點用的話,請絕不留情點個贊,轉發一下,這將是我持續輸出優質文章的最強動力。


本文分享自微信公衆號 - 小技術君(gh_2fd927ba125d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息