實戰SpringCloud響應式微服務系列教程(第七章)

本章節繼續介紹:Flux和Mono操做符(二)java

1.條件操做符

Reactor中經常使用的條件操做符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。app

一、defaultIfRmpty

defaultIfRmpty操做符返回來自原始數據流的元素,若是原始數據流中沒有元素,則返回一個默認元素。函數

defaultIfRmpty操做符在實際開發過程當中應用普遍,一般用在對方法返回值的處理上。以下controller層對service層返回值的處理。微服務

@GetMapper("/article/{id}")
public Mono<ResponseEntity<Article>> findById(@PathVariable String id){
     return articleService.findOne(id)
               .map(ResponseEntity::ok)
               .defaultIfRmpty(ResponseEntity.status(404).body(null));
}

 

二、takeUntil

takeUntil操做符的基本用法是takeUntil(Predicate<? super T>> predicate),其中Predicate表明一種斷言條件,takeUntil將提取元素直到斷言條件返回true。工具

示例代碼以下:url

Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);

 

三、takeWhile

takeWhile操做符的基本用法是takeWhile(Predicate<? super T>> continuePredicate),其中continuePredicate也表明一種斷言條件。與takeUntil不一樣的是,takeWhile會在continuePredicate條件返回true時才進行元素的提取。spa

示例代碼以下:3d

Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);

 

四、skipUntil

與takeUntil相對應,skipUntil的基本用法是skipUntil(Predicate<? super T>> predicate)。skipUntil將丟棄原始數據中的元素,直到Predicate返回true。code

五、skipWhile

與takeWhile相對應,skipWhile操做符的基本用法是skipWhile(Predicate<? super T>> continuePredicate)。當continuePredicate返回true時才進行元素的丟棄。對象

2.數學操做符

Reactor中經常使用的數學操做符有concat、count、reduce等。

一、concat

concat用來合併來自不一樣Flux的數據,這種合併採用的是順序的方式。

二、count

count操做符比較簡單,用來統計Flux中元素的個數。

三、reduce

reduce操做符對流中包含的全部元素進行累積操做,獲得一個包含計算結果的Mono序列。具體的累計操做也是經過一個BiFunction來實現的。

示例代碼以下:

Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);

 

這裏BiFunction就是一個求和函數,用來對1到10的數字進行求和,運行結果爲55。

與其相似的還有一個reduceWith。

示例代碼以下:

Flux.range(1,10).reduceWith(() - >5,(x,y) -> x+y).subscribe(System.out::println);

 

這裏使用5來初始化求和過程,獲得的結果是60。

3.Observable工具操做符

Reactor中經常使用的Observable操做符有delay、subscribe、timeout等。

一、delay

delay將時間的傳遞向後延遲一段時間。

二、subscribe

在前面的代碼演示了subscribe操做符的用法,咱們能夠經過subscribe()方法來添加相應的訂閱邏輯。

在前面章節中咱們提到了Reactor中的消息類型有三種,即正常消息,異常消息和完成消息。subscribe操做符能夠只處理其中包含的正常消息,也能夠同時處理異常消息和完成消息。當咱們用subscribe處理異常消息時能夠採用如下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .subscribe(System.out::println,System.err::println);

 

以上代碼執行結果以下,咱們獲得了一個100,同時也獲取了IllegalStateExxeption這個異常。

100
java.lang.IllegalStateExxeption

有時候咱們不想直接拋出異常,而是想採用一個容錯策略來返回一個默認值,就能夠採用如下方式。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .onErrorReturn(0)
         .subscribe(System.out::println);

 

以上代碼執行結果以下。當產生異常時,使用onErrorReturn()方法返回一個默認值0.

100
0

另外容錯策略也是經過switchOnError()方法使用另外的流產生元素。如下代碼示例演示了這種策略。

與上面的執行結果相同。

Mono.just(100)
         .conacatWith(Mono.error(new IllegalStateException()))
         .switchOnError(Mono.just(0))
         .subscribe(System.out::println);

 

三、timeout

timeout操做符維持原始被觀察者的狀態,在特定時間內沒有產生任何事件時,將生成一個異常。

四、block

block操做符在沒有接收到下一個元素以前一直被阻塞。block操做符一般用來把響應式的數據流轉換成傳統的數據流。

例如,使用以下方法時,咱們分別將Flux數據流和Mono數據流轉變成了普通的List<Order>對象和單個Order對象,一樣也能夠設置block的等待時間。

public List<Order> getAllOrder(){
      return orderService.getAllOrders().block(Duration.ofSecond(5));
}

public Order getOrderById(Long orderId){
      return orderService.getOrderById(orderId).block(Duration.ofSecond(2));
}

往期

實戰SpringCloud響應式微服務系列教程(第一章)

實戰SpringCloud響應式微服務系列教程(第二章)

實戰SpringCloud響應式微服務系列教程(第三章)

實戰SpringCloud響應式微服務系列教程(第四章)

實戰SpringCloud響應式微服務系列教程(第五章)

實戰SpringCloud響應式微服務系列教程(第六章)

相關文章
相關標籤/搜索