本文主要展現一下reactive streams的一些transform操做java
@Test public void testMerge(){ Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux1]:"+e); Flux<String> mergeFlux = Flux.interval(Duration.ofSeconds(1)) .delayElements(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux2]:"+e) .mergeWith(flux1); mergeFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); mergeFlux.blockLast(); }
輸出實例react
21:18:07.583 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:18:08.618 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0 21:18:09.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1 21:18:09.645 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:0 21:18:10.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2 21:18:10.649 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:1 21:18:11.654 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux2]:2
能夠發現,他們是交叉合併的。
@Test public void testConcat(){ Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux1]:"+e); Flux<String> concatFlux = Flux.interval(Duration.ofSeconds(1)) .delayElements(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux2]:"+e) .concatWith(flux1); concatFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); concatFlux.blockLast(); }
輸出app
21:19:00.779 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:19:02.832 [parallel-4] INFO com.example.demo.TransformTest - subscribe:[flux2]:0 21:19:03.836 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:1 21:19:04.840 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:2 21:19:05.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0 21:19:06.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1 21:19:07.844 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2
能夠發現concatWith只是鏈接兩個flux的數據,並非按emit的順序交叉來
@Test public void testZip(){ List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b"); List<String> secondList = Lists.newArrayList("1","2","3","4","5"); Flux<Tuple2<String,String>> zipFlux = Flux.fromIterable(firstList) .zipWith(Flux.fromIterable(secondList)); zipFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }
輸出以下異步
21:20:59.506 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:20:59.516 [main] INFO com.example.demo.TransformTest - subscribe:[a,1] 21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[b,2] 21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[c,3] 21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[d,4] 21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[e,5]
能夠發現flux1相比flux2多餘的數據沒有被zip
@Test public void testFlatMap(){ List<String> secondList = Lists.newArrayList("1","2","3","4","5"); Flux<String> flatMapFlux = Flux.fromIterable(secondList) .flatMap((str) ->{ return Mono.just(str).repeat(2).map(String::toUpperCase).delayElements(Duration.ofMillis(1)); }); flatMapFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); flatMapFlux.blockLast(); Flux<String> mapFlux = Flux.fromIterable(secondList) .repeat(2) .map(String::toUpperCase); mapFlux.subscribe(e -> { LOGGER.info("map subscribe:{}",e); }); mapFlux.blockLast(); }
輸出3d
21:33:46.904 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:33:46.958 [parallel-1] INFO com.example.demo.TransformTest - subscribe:1 21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2 21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:3 21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:4 21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5 21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2 21:33:46.960 [parallel-7] INFO com.example.demo.TransformTest - subscribe:3 21:33:46.960 [parallel-8] INFO com.example.demo.TransformTest - subscribe:4 21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5 21:33:46.961 [parallel-6] INFO com.example.demo.TransformTest - subscribe:1 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4 21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5
flatMap是異步的
@Test public void testReduce(){ List<String> secondList = Lists.newArrayList("1","2","3","4","5"); Mono<Integer> reduceMono = Flux.fromIterable(secondList) .flatMap(e -> Mono.just(e).map(item -> Integer.valueOf(item))) .reduce((total, e) -> total + e); reduceMono.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }
輸出code
21:36:29.978 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:36:30.014 [main] INFO com.example.demo.TransformTest - subscribe:15
@Test public void testGroup(){ List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b"); Flux<GroupedFlux<String, String>> groupFlux = Flux.fromIterable(firstList) .map(String::toUpperCase) .groupBy(key -> key); groupFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e.collectList().subscribe(item -> { LOGGER.info("item:{}",item); })); }); }
輸出orm
21:37:00.912 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:37:00.949 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@5faeada1 21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@1563da5 21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@2bbf4b8b 21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@30a3107a 21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@33c7e1bb 21:37:00.951 [main] INFO com.example.demo.TransformTest - item:[A, A] 21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[B, B] 21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[C] 21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[D] 21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[E]
@Test public void testFirst(){ List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b"); List<String> secondList = Lists.newArrayList("1","2","3","4","5"); Flux<String> firstFlux = Flux.fromIterable(firstList) .delayElements(Duration.ofMillis(200)); Flux<String> secondFlux = Flux.fromIterable(secondList) .take(2); Flux<String> result = Flux.first(firstFlux, secondFlux); result.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }
@Test public void testToIterable(){ List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b"); Iterable<String> itr = Flux.fromIterable(firstList) .map(String::toUpperCase) .toIterable(); itr.forEach(e -> LOGGER.info(e)); }
輸出flux
21:39:35.031 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 21:39:35.045 [main] INFO com.example.demo.TransformTest - A 21:39:35.045 [main] INFO com.example.demo.TransformTest - B 21:39:35.045 [main] INFO com.example.demo.TransformTest - C 21:39:35.045 [main] INFO com.example.demo.TransformTest - D 21:39:35.045 [main] INFO com.example.demo.TransformTest - E 21:39:35.045 [main] INFO com.example.demo.TransformTest - A 21:39:35.045 [main] INFO com.example.demo.TransformTest - B
reactive streams的操做至關於在jdk的streams的基礎上實現了reactive化,能夠參照着瞭解。ip