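Normally, a stream is finished once a single terminal operation has run on it. This article works around that limit by copying the stream's data, which makes it possible, in a roundabout way, to run several operations over the same stream.
The demo only illustrates the idea. It is not necessarily efficient, and when all the data is processed in memory the cost of copying is significant. If the stream involves a lot of I/O, however, performance may improve.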
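To make the starting point concrete, here is a minimal sketch of the limitation itself. The class and method names (`SingleUseStreamDemo`, `secondTerminalOpFails`) are made up for this illustration and are not part of the article's code. It runs one terminal operation, then tries a second one on the same stream and checks that the JDK rejects it with an `IllegalStateException`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: shows that a stream accepts only one terminal operation.
class SingleUseStreamDemo {

    // Returns true if the second terminal operation on the same stream is rejected.
    static boolean secondTerminalOpFails() {
        Stream<String> words = Stream.of("a", "b", "c");
        List<String> first = words.collect(Collectors.toList()); // first terminal operation
        try {
            words.count(); // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            // The stream has already been operated upon, so it cannot be reused.
            return first.size() == 3;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOpFails());
    }
}
```

This is the behaviour the forking approach has to work around: the original stream is still traversed only once, and each extra operation gets its own copy of the data.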
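import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamForker<T> {

    private final Stream<T> stream;
    // One function per fork, keyed by an identifier chosen by the caller.
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer<T> consumer = build();
        try {
            // Traverse the original stream once, feeding every queue.
            stream.sequential().forEach(consumer);
        } finally {
            // Always signal end of data so the forked streams can terminate.
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer<T> build() {
        List<BlockingQueue<T>> queues = new ArrayList<>();
        // Register one queue and one asynchronous task per fork.
        Map<Object, Future<?>> actions = new HashMap<>();
        forks.forEach((key, f) -> actions.put(key, getOperationResult(queues, f)));
        return new ForkingStreamConsumer<>(queues, actions);
    }

    private Future<?> getOperationResult(List<BlockingQueue<T>> queues,
                                         Function<Stream<T>, ?> f) {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        // Expose the queue as a sequential stream and run the fork on it asynchronously.
        Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
        Stream<T> source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}

// Gives access to the result of each fork by its key.
interface Results {
    <R> R get(Object key);
}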
The accept method adds every element of the original stream to each BlockingQueue. This is where the copying happens.
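import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class ForkingStreamConsumer<T> implements Consumer<T>, Results {

    // Sentinel placed in every queue to mark the end of the data.
    static final Object END_OF_STREAM = new Object();

    private final List<BlockingQueue<T>> queues;
    private final Map<Object, Future<?>> actions;

    public ForkingStreamConsumer(List<BlockingQueue<T>> queues,
                                 Map<Object, Future<?>> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        // Copy the element into every fork's queue.
        queues.forEach(q -> q.add(t));
    }

    @SuppressWarnings("unchecked")
    void finish() {
        // The cast is unchecked; the sentinel is only ever compared by identity.
        accept((T) END_OF_STREAM);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R> R get(Object key) {
        try {
            // Blocks until the fork registered under this key has finished.
            return ((Future<R>) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}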
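The copying step can be looked at in isolation. The following is a small sketch, not the article's class: `FanOutDemo` and `fanOut` are hypothetical names, and it uses the same `queues.forEach(q -> q.add(t))` idea as accept to show that every queue ends up with a full copy of the data, in the original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical demo class: the fan-out step on its own, without forks or futures.
class FanOutDemo {

    // Pushes every element of data into `copies` separate queues and
    // returns the contents of each queue in arrival order.
    static List<List<Integer>> fanOut(List<Integer> data, int copies) {
        List<BlockingQueue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < copies; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }

        // Same shape as accept: one element in, one copy per queue out.
        Consumer<Integer> consumer = t -> queues.forEach(q -> q.add(t));
        data.stream().sequential().forEach(consumer);

        List<List<Integer>> result = new ArrayList<>();
        for (BlockingQueue<Integer> q : queues) {
            result.add(new ArrayList<>(q));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fanOut(Arrays.asList(1, 2, 3), 2));
    }
}
```

Because each queue holds its own copy, memory use grows with the number of forks, which is the copying overhead mentioned at the start of the article.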
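The tryAdvance method is implemented here. It simply takes an element from the BlockingQueue and passes it to action, where action is whatever the business logic wanted to do with the copied stream. ForkingStreamConsumer.END_OF_STREAM marks the end of the data in the queue.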
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class BlockingQueueSpliterator<T> implements Spliterator<T> {

    private final BlockingQueue<T> q;

    BlockingQueueSpliterator(BlockingQueue<T> q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        T t;
        while (true) {
            try {
                t = q.take(); // blocks until an element is available
                break;
            } catch (InterruptedException e) {
                // Interrupted while waiting: ignore and try again.
            }
        }
        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }
        return false; // sentinel reached, no more elements
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // the queue cannot be split
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}
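The article does not show how the forks are actually used, so here is a condensed, self-contained sketch of the same technique together with a usage example. It is not the three classes above. `ForkSketch`, `streamOf`, `fork`, `demo` and `END` are hypothetical names chosen for this illustration, the queue-backed stream is built on `Spliterators.AbstractSpliterator` for brevity, and results are returned as a list in fork order instead of being looked up by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical condensed variant of the forking idea, for illustration only.
class ForkSketch {

    private static final Object END = new Object(); // end-of-data sentinel

    // Wraps a queue as a sequential stream that ends when the sentinel arrives.
    static <T> Stream<T> streamOf(BlockingQueue<Object> q) {
        Spliterator<T> sp = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
            @Override
            @SuppressWarnings("unchecked")
            public boolean tryAdvance(Consumer<? super T> action) {
                try {
                    Object o = q.take();
                    if (o == END) {
                        return false;
                    }
                    action.accept((T) o);
                    return true;
                } catch (InterruptedException e) {
                    // Assumption of this sketch: an interrupt ends the stream early.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        };
        return StreamSupport.stream(sp, false);
    }

    // Traverses source once and returns the result of each operation, in order.
    static <T> List<Object> fork(Stream<T> source, List<Function<Stream<T>, ?>> ops) {
        List<BlockingQueue<Object>> queues = new ArrayList<>();
        List<CompletableFuture<Object>> futures = new ArrayList<>();
        for (Function<Stream<T>, ?> op : ops) {
            BlockingQueue<Object> q = new LinkedBlockingQueue<>();
            queues.add(q);
            Stream<T> copy = streamOf(q);
            futures.add(CompletableFuture.supplyAsync(() -> op.apply(copy)));
        }
        try {
            source.sequential().forEach(t -> queues.forEach(q -> q.add(t)));
        } finally {
            queues.forEach(q -> q.add(END)); // let every forked stream terminate
        }
        List<Object> results = new ArrayList<>();
        for (CompletableFuture<Object> f : futures) {
            results.add(f.join());
        }
        return results;
    }

    // Usage: three different terminal operations over one pass of the source.
    static List<Object> demo() {
        List<Function<Stream<Integer>, ?>> ops = new ArrayList<>();
        ops.add(s -> s.mapToInt(Integer::intValue).sum());
        ops.add(s -> s.count());
        ops.add(s -> s.map(String::valueOf).collect(Collectors.joining(",")));
        return fork(Stream.of(1, 2, 3, 4), ops);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The queues are unbounded, so the producer never blocks and the forks cannot deadlock against it, at the price of holding a full copy of the data per fork. With StreamForker from the article, the equivalent calls would be fork(key, function) for each operation, followed by getResults().get(key).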