用 Java 實現 Stream 高效混排與 Spliterator

對 Stream 執行排序操做只要調用排序 API 就行了,要實現相反的效果(混排)卻並不簡單。html


本文介紹瞭如何使用 Java Stream `Collectors` 工廠方法與自定義 `Spliterator` 對 Stream 進行 Shuffle(混排),支持 Eager 與 Lazy 兩種模式。java


1. Eager Shuffle Collectorgit


Heinz [在這篇文章][1]中給出了一種解決方案:將整個 Stream 轉換爲 list,對 list 執行 `Collections#shuffle`,再轉爲 Stream。像下面這樣封裝成一個複合操做:github


[1]:https://www.javaspecialists.eu/archive/Issue258.htmlshell


```java
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
   return Collectors.collectingAndThen(
     toList(),
     list -> {
         Collections.shuffle(list);
         return list.stream();
     });
}
```


這種方法適用於對 Steam 中全部元素進行混排。因爲會提早對集合中全部元素進行 Shuffle,若是隻處理其中一部分則效果不佳,極端狀況好比 Stream 只包含1個元素。數組


讓咱們來看看一個簡單基準測試的運行結果:併發


```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
   private List<String> source;

   @Param({"1", "10", "100", "1000", "10000", "10000"})
   public int limit;

   @Param({"100000"})
   public int size;

   @Setup(Level.Iteration)
   public void setUp() {
       source = IntStream.range(0, size)
         .boxed()
         .map(Object::toString)
         .collect(Collectors.toList());
   }

   @Benchmark
   public List<String> eager() {
       return source.stream()
         .collect(toEagerShuffledStream())
         .limit(limit)
         .collect(Collectors.toList());
   }
```


```shell
           (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
```


從上面的數據能夠看出,儘管運行結果 Stream 中元素不斷增長,運行效果仍是至關不錯。所以,對整個集合提早混排太浪費了,尤爲是元素較少的時候得分不好。app


讓咱們看看來有什麼好辦法。dom


2. Lazy Shuffle Collectoride


爲了節省 CPU 資源,與其對集合中全部元素預處理,不如根據須要只處理其中一部分。


爲了達到這個效果,須要自定義一個 Spliterator 對全部對元素隨機遍歷,而後經過 `StreamSupport.stream` 構造建立一個 Stream 對象:


```java
public class RandomSpliterator<T> implements Spliterator<T> {
   // ...
   public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toList(),
         list -> StreamSupport.stream(
           new ShuffledSpliterator<>(list), false));
   }
}
```


3. 實現細節


即便只取出一個隨機元素,也不能避免計算整個 Steam 中的元素(這意味着不支持無限序列)。所以,能夠用 `List<T>` 初始化 `RandomSpliterator<T>`。「注意,這裏有一個陷阱」。


若是給定 `List` 不支持在常量時間內完成隨機訪問,這種方案要比 Eager 方案慢得多。爲了不這種狀況,能夠在實例化 `Spliterator` 的時候進行簡單檢查:


```java
private RandomSpliterator(
 List<T> source, Supplier<? extends Random> random)
{
   if (source.isEmpty()) { ... } // throw
   this.source = source instanceof RandomAccess
     ? source
     : new ArrayList<>(source);
   this.random = random.get();
}
```


相比隨機訪問時間複雜度不是 O(1) 的實現,建立 `ArrayList` 的成本能夠忽略不計。


如今重寫最重要的 `tryAdvance()` 方法。實現很簡單,每次迭代都從 `source` 集合中隨機挑選並刪除一個元素。


沒必要擔憂 `source` 發生改變。這裏不發佈 `RandomSpliterator`,只返回基於它的一個 `Collector`:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
   int remaining = source.size();
   if (remaining > 0 ) {
       action.accept(source.remove(random.nextInt(remaining)));
       return true;
   } else {
       return false;
   }
}
```


除此以外,還須要實現其它3個方法:


```java
@Override
public Spliterator<T> trySplit() {
   return null; // 表示 split 可不行
}

@Override
public long estimateSize()
{
   return source.size();
}

@Override
public int characteristics()
{
   return SIZED;
}
```


如今檢查一下是否有效果:


```java
IntStream.range(0, 10).boxed()
 .collect(toLazyShuffledStream())
 .forEach(System.out::println);
```


結果以下:


```shell
3
4
8
1
7
6
5
0
2
9
```


4. 性能考慮


在這個實現中,咱們把大小爲 N 的數組換成 M 查找或刪除:


  • N:集合大小

  • M:挑選元素的數量


從 `ArrayList` 中查找或刪除單個元素一般比交換開銷大,所以方案的可擴展性不夠好。可是對於 M 值較小的時候性能會好不少。


如今對比 Eager 方案(都包含100000個對象):


```shell
           (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
lazy              1  thrpt    5  1530.763 ±  72.096  ops/s
lazy             10  thrpt    5  1462.305 ±  23.860  ops/s
lazy            100  thrpt    5   823.212 ± 119.771  ops/s
lazy           1000  thrpt    5   166.786 ±  16.306  ops/s
lazy          10000  thrpt    5    19.475 ±   4.052  ops/s
lazy         100000  thrpt    5     4.097 ±   0.416  ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)


能夠明顯看到,若是數據流元素較少,新方案的性能優於前者。但隨着「處理數量/集合大小」增長,吞吐量急劇降低。


這是由於從 `ArrayList` 中移除元素會帶來額外開銷,每次移除都會調用 `System#arraycopy` 對內部數組執行移位操做,開銷較大。


對於較大的集合(1000000個元素)能夠看到相似的模式:


```shell
     (limit)    (size)   Mode  Cnt  Score   Err  Units
eager       1  10000000  thrpt    5  0.915        ops/s
eager      10  10000000  thrpt    5  0.783        ops/s
eager     100  10000000  thrpt    5  0.965        ops/s
eager    1000  10000000  thrpt    5  0.936        ops/s
eager   10000  10000000  thrpt    5  0.860        ops/s
lazy        1  10000000  thrpt    5  4.338        ops/s
lazy       10  10000000  thrpt    5  3.149        ops/s
lazy      100  10000000  thrpt    5  2.060        ops/s
lazy     1000  10000000  thrpt    5  0.370        ops/s
lazy    10000  10000000  thrpt    5  0.05         ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)


在更小集合(128個元素)上的表現:


```shell
      (limit)    (size)   Mode  Cnt       Score   Error  Units
eager        2     128    thrpt    5  246439.459          ops/s
eager        4     128    thrpt    5  333866.936          ops/s
eager        8     128    thrpt    5  340296.188          ops/s
eager       16     128    thrpt    5  345533.673          ops/s
eager       32     128    thrpt    5  231725.156          ops/s
eager       64     128    thrpt    5  314324.265          ops/s
eager      128     128    thrpt    5  270451.992          ops/s
lazy         2     128    thrpt    5  765989.718          ops/s
lazy         4     128    thrpt    5  659421.041          ops/s
lazy         8     128    thrpt    5  652685.515          ops/s
lazy        16     128    thrpt    5  470346.570          ops/s
lazy        32     128    thrpt    5  324174.691          ops/s
lazy        64     128    thrpt    5  186472.090          ops/s
lazy       128     128    thrpt    5  108105.699          ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)


能不能進一步優化?


5. 進一步提升性能


不幸的是,現有的解決方案擴展性不盡如人意,讓咱們試着改進。但在此以前,先對現有操做進行測評:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)


不出意外,`Arraylist#remove` 是開銷最大的操做之一。換句話說,從 `ArrayList` 中刪除元素耗費了大量 CPU 資源。


爲何呢?從 `ArrayList` 中刪除元素會對底層實現的數組執行移除操做。問題是,Java 數組不會自動調整大小,每次移除都會建立一個更小的新數組:


```java
private void fastRemove(Object[] es, int i)
{
   modCount++;
   final int newSize;
   if ((newSize = size - 1) > i)
       System.arraycopy(es, i + 1, es, i, newSize - i);
   es[size = newSize] = null;
}
```


接下來該怎麼辦?避免從 `ArrayList` 中移除元素。


爲了達到這個效果,能夠用一個數組存儲剩餘的元素並記錄它的大小:


```java
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
   private final Random random;
   private final T[] source;
   private int size;
   private ImprovedRandomSpliterator(
     List<T> source, Supplier<? extends Random> random)
{
       if (source.isEmpty()) {
           throw new IllegalArgumentException(...);
       }
       this.source = (T[]) source.toArray();
       this.random = random.get();
       this.size = this.source.length;
   }
}
```


幸運的是,因爲 `Spliterator` 的實例不會在線程之間共享,所以不會遇到併發問題。


如今嘗試移除元素時,實際上不須要建立縮小後的新數組。相反,只要減少 `size` 並忽略數組的其他部分便可。


在此以前,把最後一個元素與返回的元素交換:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
   if (size > 0) {
       int nextIdx = random.nextInt(size);
       int lastIdx = size - 1;
       action.accept(source[nextIdx]);
       source[nextIdx] = source[lastIdx];
       source[lastIdx] = null; // let object be GCed
       size--;
       return true;
   } else {
       return false;
   }
}
```


對改進後的方案進行評測,能夠看到開銷最大的調用已經消失了:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)


準備在此運行基準測試進行比較:


```shell
              (limit)  (size)   Mode  Cnt     Score     Error  Units
eager                1  100000  thrpt    3   456.811 ±  20.585  ops/s
eager               10  100000  thrpt    3   469.635 ±  23.281  ops/s
eager              100  100000  thrpt    3   466.486 ±  68.820  ops/s
eager             1000  100000  thrpt    3   454.459 ±  13.103  ops/s
eager            10000  100000  thrpt    3   443.640 ±  96.929  ops/s
eager           100000  100000  thrpt    3   335.134 ±  21.944  ops/s
lazy                 1  100000  thrpt    3  1587.536 ± 389.128  ops/s
lazy                10  100000  thrpt    3  1452.855 ± 406.879  ops/s
lazy               100  100000  thrpt    3   814.978 ± 242.077  ops/s
lazy              1000  100000  thrpt    3   167.825 ± 129.559  ops/s
lazy             10000  100000  thrpt    3    19.782 ±   8.596  ops/s
lazy            100000  100000  thrpt    3     3.970 ±   0.408  ops/s
lazy_improved        1  100000  thrpt    3  1509.264 ± 170.423  ops/s
lazy_improved       10  100000  thrpt    3  1512.150 ± 143.927  ops/s
lazy_improved      100  100000  thrpt    3  1463.093 ± 593.370  ops/s
lazy_improved     1000  100000  thrpt    3  1451.007 ±  58.948  ops/s
lazy_improved    10000  100000  thrpt    3  1148.581 ± 232.218  ops/s
lazy_improved   100000  100000  thrpt    3   383.022 ±  97.082  ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)


從上面的結果能夠看出,改進後的方案性能受元素數量變化影響顯著減少。


實際上,即便遇到最差狀況,改進方案的性能也比基於 `Collections#shuffle` 的方案略好一些。


6. 完整示例


完整示例能夠在 [GitHub][2] 上找到。


[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream


```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
   private final Random random;
   private final T[] source;
   private int size;
   ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
       if (source.isEmpty()) {
           throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
       }
       this.source = (T[]) source.toArray();
       this.random = random.get();
       this.size = this.source.length;
   }
    @Override
   public boolean tryAdvance(Consumer<? super T> action)
{
       if (size > 0) {
           int nextIdx = random.nextInt(size);
           int lastIdx = size - 1;
           action.accept(source[nextIdx]);
           source[nextIdx] = source[lastIdx];
           source[lastIdx] = null; // let object be GCed
           size--;
           return true;
       } else {
           return false;
       }
   }
   @Override
   public Spliterator<T> trySplit() {
       return null;
   }
   @Override
   public long estimateSize()
{
       return source.length;
   }
   @Override
   public int characteristics()
{
       return SIZED;
   }
}
```


```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {

   private RandomCollectors() {
   }

   public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> !list.isEmpty()
           ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
           : Stream.empty());
   }

   public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> !list.isEmpty()
           ? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
           : Stream.empty());
   }

   public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> {
             Collections.shuffle(list);
             return list.stream();          });    }}```
相關文章
相關標籤/搜索