本章節繼續介紹:Flux和Mono操做符(二)java
Reactor中經常使用的條件操做符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。app
defaultIfRmpty操做符返回來自原始數據流的元素,若是原始數據流中沒有元素,則返回一個默認元素。函數
defaultIfRmpty操做符在實際開發過程當中應用普遍,一般用在對方法返回值的處理上。以下controller層對service層返回值的處理。微服務
@GetMapper("/article/{id}") public Mono<ResponseEntity<Article>> findById(@PathVariable String id){ return articleService.findOne(id) .map(ResponseEntity::ok) .defaultIfRmpty(ResponseEntity.status(404).body(null)); }
takeUntil操做符的基本用法是takeUntil(Predicate<? super T>> predicate)
,其中Predicate表明一種斷言條件,takeUntil將提取元素直到斷言條件返回true。工具
示例代碼以下:url
Flux.range(1,100).takeUntil(i -> i == 10).subscribe(System.out::println);
takeWhile操做符的基本用法是takeWhile(Predicate<? super T>> continuePredicate)
,其中continuePredicate也表明一種斷言條件。與takeUntil不一樣的是,takeWhile會在continuePredicate條件返回true時才進行元素的提取。spa
示例代碼以下:3d
Flux.range(1,100).takeWhile(i -> i <= 10).subscribe(System.out::println);
與takeUntil相對應,skipUntil的基本用法是skipUntil(Predicate<? super T>> predicate)
。skipUntil將丟棄原始數據中的元素,直到Predicate返回true。code
與takeWhile相對應,skipWhile操做符的基本用法是skipWhile(Predicate<? super T>> continuePredicate)
。當continuePredicate返回true時才進行元素的丟棄。對象
Reactor中經常使用的數學操做符有concat、count、reduce等。
concat用來合併來自不一樣Flux的數據,這種合併採用的是順序的方式。
count操做符比較簡單,用來統計Flux中元素的個數。
reduce操做符對流中包含的全部元素進行累積操做,獲得一個包含計算結果的Mono序列。具體的累計操做也是經過一個BiFunction來實現的。
示例代碼以下:
Flux.range(1,10).reduce((x,y) -> x+y).subscribe(System.out::println);
這裏BiFunction就是一個求和函數,用來對1到10的數字進行求和,運行結果爲55。
與其相似的還有一個reduceWith。
示例代碼以下:
Flux.range(1,10).reduceWith(() - >5,(x,y) -> x+y).subscribe(System.out::println);
這裏使用5來初始化求和過程,獲得的結果是60。
Reactor中經常使用的Observable操做符有delay、subscribe、timeout等。
delay將時間的傳遞向後延遲一段時間。
在前面的代碼演示了subscribe操做符的用法,咱們能夠經過subscribe()方法來添加相應的訂閱邏輯。
在前面章節中咱們提到了Reactor中的消息類型有三種,即正常消息,異常消息和完成消息。subscribe操做符能夠只處理其中包含的正常消息,也能夠同時處理異常消息和完成消息。當咱們用subscribe處理異常消息時能夠採用如下方式。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .subscribe(System.out::println,System.err::println);
以上代碼執行結果以下,咱們獲得了一個100,同時也獲取了IllegalStateExxeption這個異常。
100
java.lang.IllegalStateExxeption
有時候咱們不想直接拋出異常,而是想採用一個容錯策略來返回一個默認值,就能夠採用如下方式。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .onErrorReturn(0) .subscribe(System.out::println);
以上代碼執行結果以下。當產生異常時,使用onErrorReturn()方法返回一個默認值0.
100
0
另外容錯策略也是經過switchOnError()方法使用另外的流產生元素。如下代碼示例演示了這種策略。
與上面的執行結果相同。
Mono.just(100) .conacatWith(Mono.error(new IllegalStateException())) .switchOnError(Mono.just(0)) .subscribe(System.out::println);
timeout操做符維持原始被觀察者的狀態,在特定時間內沒有產生任何事件時,將生成一個異常。
block操做符在沒有接收到下一個元素以前一直被阻塞。block操做符一般用來把響應式的數據流轉換成傳統的數據流。
例如,使用以下方法時,咱們分別將Flux數據流和Mono數據流轉變成了普通的List<Order>
對象和單個Order對象,一樣也能夠設置block的等待時間。
public List<Order> getAllOrder(){ return orderService.getAllOrders().block(Duration.ofSecond(5)); } public Order getOrderById(Long orderId){ return orderService.getOrderById(orderId).block(Duration.ofSecond(2)); }
實戰SpringCloud響應式微服務系列教程(第五章)