對 Stream 執行排序操做只要調用排序 API 就行了,要實現相反的效果(混排)卻並不簡單。html
本文介紹瞭如何使用 Java Stream `Collectors` 工廠方法與自定義 `Spliterator` 對 Stream 進行 Shuffle(混排),支持 Eager 與 Lazy 兩種模式。java
1. Eager Shuffle Collectorgit
Heinz [在這篇文章][1]中給出了一種解決方案:將整個 Stream 轉換爲 list,對 list 執行 `Collections#shuffle`,再轉爲 Stream。像下面這樣封裝成一個複合操做:github
[1]:https://www.javaspecialists.eu/archive/Issue258.htmlshell
```java
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
```
這種方法適用於對 Steam 中全部元素進行混排。因爲會提早對集合中全部元素進行 Shuffle,若是隻處理其中一部分則效果不佳,極端狀況好比 Stream 只包含1個元素。數組
讓咱們來看看一個簡單基準測試的運行結果:併發
```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
private List<String> source;
@Param({"1", "10", "100", "1000", "10000", "10000"})
public int limit;
@Param({"100000"})
public int size;
@Setup(Level.Iteration)
public void setUp() {
source = IntStream.range(0, size)
.boxed()
.map(Object::toString)
.collect(Collectors.toList());
}
@Benchmark
public List<String> eager() {
return source.stream()
.collect(toEagerShuffledStream())
.limit(limit)
.collect(Collectors.toList());
}
```
```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
```
從上面的數據能夠看出,儘管運行結果 Stream 中元素不斷增長,運行效果仍是至關不錯。所以,對整個集合提早混排太浪費了,尤爲是元素較少的時候得分不好。app
讓咱們看看來有什麼好辦法。dom
2. Lazy Shuffle Collectoride
爲了節省 CPU 資源,與其對集合中全部元素預處理,不如根據須要只處理其中一部分。
爲了達到這個效果,須要自定義一個 Spliterator 對全部對元素隨機遍歷,而後經過 `StreamSupport.stream` 構造建立一個 Stream 對象:
```java
public class RandomSpliterator<T> implements Spliterator<T> {
// ...
public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> StreamSupport.stream(
new ShuffledSpliterator<>(list), false));
}
}
```
3. 實現細節
即便只取出一個隨機元素,也不能避免計算整個 Steam 中的元素(這意味着不支持無限序列)。所以,能夠用 `List<T>` 初始化 `RandomSpliterator<T>`。「注意,這裏有一個陷阱」。
若是給定 `List` 不支持在常量時間內完成隨機訪問,這種方案要比 Eager 方案慢得多。爲了不這種狀況,能夠在實例化 `Spliterator` 的時候進行簡單檢查:
```java
private RandomSpliterator(
List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) { ... } // throw
this.source = source instanceof RandomAccess
? source
: new ArrayList<>(source);
this.random = random.get();
}
```
相比隨機訪問時間複雜度不是 O(1) 的實現,建立 `ArrayList` 的成本能夠忽略不計。
如今重寫最重要的 `tryAdvance()` 方法。實現很簡單,每次迭代都從 `source` 集合中隨機挑選並刪除一個元素。
沒必要擔憂 `source` 發生改變。這裏不發佈 `RandomSpliterator`,只返回基於它的一個 `Collector`:
```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
int remaining = source.size();
if (remaining > 0 ) {
action.accept(source.remove(random.nextInt(remaining)));
return true;
} else {
return false;
}
}
```
除此以外,還須要實現其它3個方法:
```java
@Override
public Spliterator<T> trySplit() {
return null; // 表示 split 可不行
}
@Override
public long estimateSize() {
return source.size();
}
@Override
public int characteristics() {
return SIZED;
}
```
如今檢查一下是否有效果:
```java
IntStream.range(0, 10).boxed()
.collect(toLazyShuffledStream())
.forEach(System.out::println);
```
結果以下:
```shell
3
4
8
1
7
6
5
0
2
9
```
4. 性能考慮
在這個實現中,咱們把大小爲 N 的數組換成 M 查找或刪除:
N:集合大小
M:挑選元素的數量
從 `ArrayList` 中查找或刪除單個元素一般比交換開銷大,所以方案的可擴展性不夠好。可是對於 M 值較小的時候性能會好不少。
如今對比 Eager 方案(都包含100000個對象):
```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
lazy 1 thrpt 5 1530.763 ± 72.096 ops/s
lazy 10 thrpt 5 1462.305 ± 23.860 ops/s
lazy 100 thrpt 5 823.212 ± 119.771 ops/s
lazy 1000 thrpt 5 166.786 ± 16.306 ops/s
lazy 10000 thrpt 5 19.475 ± 4.052 ops/s
lazy 100000 thrpt 5 4.097 ± 0.416 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)
能夠明顯看到,若是數據流元素較少,新方案的性能優於前者。但隨着「處理數量/集合大小」增長,吞吐量急劇降低。
這是由於從 `ArrayList` 中移除元素會帶來額外開銷,每次移除都會調用 `System#arraycopy` 對內部數組執行移位操做,開銷較大。
對於較大的集合(1000000個元素)能夠看到相似的模式:
```shell
(limit) (size) Mode Cnt Score Err Units
eager 1 10000000 thrpt 5 0.915 ops/s
eager 10 10000000 thrpt 5 0.783 ops/s
eager 100 10000000 thrpt 5 0.965 ops/s
eager 1000 10000000 thrpt 5 0.936 ops/s
eager 10000 10000000 thrpt 5 0.860 ops/s
lazy 1 10000000 thrpt 5 4.338 ops/s
lazy 10 10000000 thrpt 5 3.149 ops/s
lazy 100 10000000 thrpt 5 2.060 ops/s
lazy 1000 10000000 thrpt 5 0.370 ops/s
lazy 10000 10000000 thrpt 5 0.05 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)
在更小集合(128個元素)上的表現:
```shell
(limit) (size) Mode Cnt Score Error Units
eager 2 128 thrpt 5 246439.459 ops/s
eager 4 128 thrpt 5 333866.936 ops/s
eager 8 128 thrpt 5 340296.188 ops/s
eager 16 128 thrpt 5 345533.673 ops/s
eager 32 128 thrpt 5 231725.156 ops/s
eager 64 128 thrpt 5 314324.265 ops/s
eager 128 128 thrpt 5 270451.992 ops/s
lazy 2 128 thrpt 5 765989.718 ops/s
lazy 4 128 thrpt 5 659421.041 ops/s
lazy 8 128 thrpt 5 652685.515 ops/s
lazy 16 128 thrpt 5 470346.570 ops/s
lazy 32 128 thrpt 5 324174.691 ops/s
lazy 64 128 thrpt 5 186472.090 ops/s
lazy 128 128 thrpt 5 108105.699 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)
能不能進一步優化?
5. 進一步提升性能
不幸的是,現有的解決方案擴展性不盡如人意,讓咱們試着改進。但在此以前,先對現有操做進行測評:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)
不出意外,`Arraylist#remove` 是開銷最大的操做之一。換句話說,從 `ArrayList` 中刪除元素耗費了大量 CPU 資源。
爲何呢?從 `ArrayList` 中刪除元素會對底層實現的數組執行移除操做。問題是,Java 數組不會自動調整大小,每次移除都會建立一個更小的新數組:
```java
private void fastRemove(Object[] es, int i) {
modCount++;
final int newSize;
if ((newSize = size - 1) > i)
System.arraycopy(es, i + 1, es, i, newSize - i);
es[size = newSize] = null;
}
```
接下來該怎麼辦?避免從 `ArrayList` 中移除元素。
爲了達到這個效果,能夠用一個數組存儲剩餘的元素並記錄它的大小:
```java
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
private ImprovedRandomSpliterator(
List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException(...);
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
}
```
幸運的是,因爲 `Spliterator` 的實例不會在線程之間共享,所以不會遇到併發問題。
如今嘗試移除元素時,實際上不須要建立縮小後的新數組。相反,只要減少 `size` 並忽略數組的其他部分便可。
在此以前,把最後一個元素與返回的元素交換:
```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
```
對改進後的方案進行評測,能夠看到開銷最大的調用已經消失了:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)
準備在此運行基準測試進行比較:
```shell
(limit) (size) Mode Cnt Score Error Units
eager 1 100000 thrpt 3 456.811 ± 20.585 ops/s
eager 10 100000 thrpt 3 469.635 ± 23.281 ops/s
eager 100 100000 thrpt 3 466.486 ± 68.820 ops/s
eager 1000 100000 thrpt 3 454.459 ± 13.103 ops/s
eager 10000 100000 thrpt 3 443.640 ± 96.929 ops/s
eager 100000 100000 thrpt 3 335.134 ± 21.944 ops/s
lazy 1 100000 thrpt 3 1587.536 ± 389.128 ops/s
lazy 10 100000 thrpt 3 1452.855 ± 406.879 ops/s
lazy 100 100000 thrpt 3 814.978 ± 242.077 ops/s
lazy 1000 100000 thrpt 3 167.825 ± 129.559 ops/s
lazy 10000 100000 thrpt 3 19.782 ± 8.596 ops/s
lazy 100000 100000 thrpt 3 3.970 ± 0.408 ops/s
lazy_improved 1 100000 thrpt 3 1509.264 ± 170.423 ops/s
lazy_improved 10 100000 thrpt 3 1512.150 ± 143.927 ops/s
lazy_improved 100 100000 thrpt 3 1463.093 ± 593.370 ops/s
lazy_improved 1000 100000 thrpt 3 1451.007 ± 58.948 ops/s
lazy_improved 10000 100000 thrpt 3 1148.581 ± 232.218 ops/s
lazy_improved 100000 100000 thrpt 3 383.022 ± 97.082 ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)
從上面的結果能夠看出,改進後的方案性能受元素數量變化影響顯著減少。
實際上,即便遇到最差狀況,改進方案的性能也比基於 `Collections#shuffle` 的方案略好一些。
6. 完整示例
完整示例能夠在 [GitHub][2] 上找到。
[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream
```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return source.length;
}
@Override
public int characteristics() {
return SIZED;
}
}
```
```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {
private RandomCollectors() {
}
public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> {
Collections.shuffle(list);
return list.stream(); }); }}```