This article looks at an example of Spring WebFlux returning application/stream+json.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
/**
 * curl -i localhost:8080/stream
 * @return an unbounded stream that emits one Price every 500 ms
 */
@GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Price> priceStream() {
    return Flux.interval(Duration.ofMillis(500))
            .map(l -> new Price(System.currentTimeMillis(), ThreadLocalRandom.current().nextInt(100, 125)))
            .log();
}
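Note that the mapping sets produces = MediaType.APPLICATION_STREAM_JSON_VALUE.
If the media type is not application/stream+json, the caller cannot receive results incrementally; it blocks until the data stream ends or the request times out.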
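The post does not show the Price type used by the controller. Judging only from the log and JSON output further down (a long timestamp and a value printed as 120.0), it could look roughly like the sketch below. The field types, the getters and the toString format are assumptions inferred from that output, and the random() helper is a hypothetical name that just repeats the construction from the controller's map step.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical reconstruction of Price; the original post does not show this class.
final class Price {
    private final long timestamp; // epoch milliseconds
    private final double value;   // the int from nextInt widens to double, hence "120.0"

    Price(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    // Public getters so a JSON mapper can expose "timestamp" and "value"
    public long getTimestamp() { return timestamp; }
    public double getValue() { return value; }

    @Override
    public String toString() {
        return "Price(timestamp=" + timestamp + ", value=" + value + ")";
    }

    // Hypothetical helper: same construction as in the controller's map step
    static Price random() {
        return new Price(System.currentTimeMillis(),
                ThreadLocalRandom.current().nextInt(100, 125));
    }
}
```

The original may well use Lombok or another mechanism to generate the accessors and toString; the plain class above only mirrors the observable output.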
2018-02-08 21:36:49.701 INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber)
2018-02-08 21:36:49.702 INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(1)
2018-02-08 21:36:50.208 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097010208, value=120.0))
2018-02-08 21:36:50.229 INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(31)
2018-02-08 21:36:50.708 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097010708, value=124.0))
2018-02-08 21:36:51.208 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097011208, value=119.0))
2018-02-08 21:36:51.707 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097011707, value=120.0))
2018-02-08 21:36:52.207 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097012207, value=109.0))
2018-02-08 21:36:52.707 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097012707, value=101.0))
2018-02-08 21:36:53.208 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097013208, value=114.0))
2018-02-08 21:36:53.707 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097013707, value=113.0))
2018-02-08 21:36:54.206 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097014206, value=105.0))
2018-02-08 21:36:54.708 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097014708, value=103.0))
2018-02-08 21:36:55.208 INFO 1270 --- [ parallel-2] reactor.Flux.Map.1 : onNext(Price(timestamp=1518097015207, value=123.0))
2018-02-08 21:36:55.212 INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1 : cancel()
curl -i localhost:8080/stream
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;charset=UTF-8

{"timestamp":1518097010208,"value":120.0}
{"timestamp":1518097010708,"value":124.0}
{"timestamp":1518097011208,"value":119.0}
{"timestamp":1518097011707,"value":120.0}
{"timestamp":1518097012207,"value":109.0}
{"timestamp":1518097012707,"value":101.0}
{"timestamp":1518097013208,"value":114.0}
{"timestamp":1518097013707,"value":113.0}
{"timestamp":1518097014206,"value":105.0}
^C
As the output shows, because application/stream+json is used, the response comes back with transfer-encoding: chunked, so the caller can print each element as it arrives.
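The body arrives as one JSON object per line, but a client that reads raw chunks cannot assume that chunk boundaries line up with object boundaries. A minimal sketch of the reassembly step in plain Java follows; LineBuffer and feed are hypothetical names, not part of Spring or any library, and the sketch assumes the chunks have already been decoded to text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns arbitrarily split text chunks into complete lines.
final class LineBuffer {
    // Holds the tail of the last chunk until its terminating newline arrives
    private final StringBuilder pending = new StringBuilder();

    // Appends a chunk and returns every line completed by it, in order
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, newline).trim();
            pending.delete(0, newline + 1);
            if (!line.isEmpty()) {
                lines.add(line); // one complete JSON document, ready for a parser
            }
        }
        return lines;
    }
}
```

Each returned line can then be handed to a JSON parser on its own, which is what makes rolling output possible without waiting for the end of the stream.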
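After moving to WebFlux, you may wonder what happens to the pagination calls you used before. Reactive Streams treats data as a stream, so Spring Data Reactive does not support returning a Page; a Pageable can still be passed as a method parameter, though.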
public interface StocDao extends ReactiveCrudRepository<Stock, String> {
    Flux<Stock> findByName(String name, Pageable pageable);
}
Note that this returns Flux<Stock> rather than Page<Stock>.
In other words, the total count is lost.
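To make the trade-off concrete: a Pageable only selects an offset/limit window, so the total has to come from somewhere else, for example a separate count query (ReactiveCrudRepository does provide count()). The sketch below imitates this on an in-memory list in plain Java; PageWindow, slice and totalPages are hypothetical names, and the zero-based page index is chosen to match Spring Data's convention.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of windowing without a built-in total count.
final class PageWindow {
    // Returns the elements of the given zero-based page; knows nothing about the total
    static <T> List<T> slice(List<T> all, int page, int size) {
        return all.stream()
                .skip((long) page * size)
                .limit(size)
                .collect(Collectors.toList());
    }

    // The page count can only be derived from a total obtained separately
    static long totalPages(long totalCount, int size) {
        return (totalCount + size - 1) / size; // ceiling division
    }
}
```

For five elements and a page size of two, slice(list, 1, 2) yields the third and fourth elements, while totalPages(5, 2) is 3; the second number is exactly the information a Flux alone does not carry.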
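For the Flux stream data that WebFlux returns, the endpoint needs to produce MediaType.APPLICATION_STREAM_JSON_VALUE, and the caller must also support this media type (WebClient does); only then do you get the effect of reactive streams.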