Stream是Java8的新特性之一,從名字看出來,它的含義是流,所謂的流就是一個數據渠道,能夠用於操做集合所生成的元素序列。 實際上不光是集合,包括數組、文件等,只要是能夠轉換成流,咱們均可以藉助流式處理,相似於咱們寫SQL語句同樣對其進行操做。java
1)Stream本身不會存儲元素。
2)Stream不會改變源對象。
3)Stream操做是延時執行的,這意味着它們會等到須要結果時才執行。數組
Stream的流式處理能夠分爲三個部分:轉換成流、中間操做、終端操做,以下圖:數據結構
具體語句以下:app
List userIds = userLists.stream().map(UserDO::getId).collect(Collectors.toList()); stream語句將userLists集合轉換成了一個流,map爲中間操做,經過函數式表達式獲取user的Id,最後經過終端操做將其轉換成一個list集合返回。less
能夠發現,Stream極大地簡化了咱們對集合的操做。dom
1) 經過Collection得Stream()方法(串行流)或者 parallelStream()方法(並行流)建立Stream。ide
@Test
public void test1 () {
//1. 經過Collection得Stream()方法(串行流)
//或者 parallelStream()方法(並行流)建立Stream
List<String> list = new ArrayList<String>();
Stream<String> stream1 = list.stream();
Stream<String> stream2 = list.parallelStream();
}
複製代碼
2) 經過Arrays中得靜態方法stream()獲取數組流 。函數
@Test
public void test2 () {
//2. 經過Arrays中得靜態方法stream()獲取數組流
IntStream stream = Arrays.stream(new int[]{3,5});
}
複製代碼
3) 經過Stream類中得 of()靜態方法獲取流 。學習
@Test
public void test3 () {
//3. 經過Stream類中得 of()靜態方法獲取流
Stream<String> stream = Stream.of("4645", "huinnj");
}
複製代碼
4) 建立無限流(迭代、生成) 。ui
@Test
public void test4 () {
//4. 建立無限流
//迭代(須要傳入一個種子,也就是起始值,而後傳入一個一元操做)
Stream<Integer> stream1 = Stream.iterate(2, (x) -> x * 2);
//生成(無限產生對象)
Stream<Double> stream2 = Stream.generate(() -> Math.random());
}
複製代碼
distinct: 對於Stream中包含的元素進行去重操做(去重邏輯依賴元素的equals方法),新生成的Stream中沒有重複的元素。
filter方法是根據設置的條件來過濾、篩選出所須要的元素。
map方法是對於Stream中包含的元素使用給定的轉換函數進行轉換操做,新生成的Stream只包含轉換生成的元素。
limit方法是根據設定的數量從流中獲取對應數量的元素。
skip方法與limt有點區別,此方法是獲取到根據設定的數量n去除掉集合中前n個元素以後的全部數據。
sorted方法是對流中的元素進行排序,降序和升序均可以使用天然排序的方法,也能夠調用Comparator中的方法進行排序
peek 方法是生成一個包含原Stream的全部元素的新Stream,同時會提供一個消費函數(Consumer實例),新Stream每一個元素被消費的時候都會執行給定的消費函數 注:剛開始覺得peek與map的用法同樣,後面經過學習及練習,peek方法實際上是沒有返回值的,多半用於一些輸出,數據的校驗等。
接收一個函數做爲參數,將流中的每一個值都換成一個流,而後把全部流鏈接成一個流。
檢查是否匹配全部元素。
檢查是否至少匹配全部元素 。
檢查是否沒有匹配全部元素 。
返回第一個元素。
@Test
public void test16 () {
Optional<Person> person = persons.stream().findFirst();
System.out.println(person);
person.orElse(new Person("王五", "男", 35, Status.BUSY));
}
複製代碼
返回當前流中任意元素。
返回流中元素總個數。
返回流中最大值。
返回流中最小值。
有reduce(T identitty,BinaryOperator) 與reduce(BinaryOperator),前者須要一個起始值,而後傳入二元運算,後者沒有起始值。
Collector接口中方法得實現決定了如何對流執行收集操做(如收集到List,Set,Map)。可是Collectors實用類提供了不少靜態方法,能夠方便地建立常見得收集器實例。
先來看下列語句:
List<String> list = Arrays.asList("Hello","World","Word");
// 求出以W開頭,長度最長的單詞,並把長度賦值給num
Integer num = list.stream().filter(x -> x.startsWith("W")).mapToInt(String::length).max().getAsInt();
複製代碼
邏輯很簡單,使用一行stream()語句便可完成需求,可是這裏有一個問題,stream()到底是怎麼作到用一行來完成這麼多篩選條件的語句呢?它是每調用一次中間操做就遍歷一遍集合元素嗎?答案是否是的,咱們來看看stream()具體是怎麼操做的。
首先咱們來看看通常的for循環是怎麼在一次迭代的狀況下完成的:
int maxLength = 0;
for (String str : list) {
if (str.startsWith("W")) {
maxLength = Math.max(str.length(), maxLength);
}
}
複製代碼
這種方式不但只用了一次迭代,同時避免了存儲中間結果,可是它有個前提,必須得知道當前的程序意圖。而Stream設計者並不能知道用戶想要實現的功能是什麼。所以,在沒法假設用戶行爲的前提下實現流水線,是設計者首要考慮的問題。
在這裏,你們應該都能想到,若是可以像隊列同樣記錄用戶的每一次操做,在用戶結束操做的時候,再將全部的記錄疊加到一次迭代中所有執行掉,那不就解決問題了嗎。那麼接下來又會有一系列的問題:
1)用戶的操做該怎麼記錄?
2)操做如何疊加?
3)疊加以後的操做如何?
4)執行後的結果在哪裏?
注意,這裏的操做是指中間操做,Stream中會使用Stage概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將具備前後順序的各個Stage連到一塊兒,就構成了整個流水線,其中Stream相關類和接口的繼承關係如上圖所示。
Stream流水線組織結構示意圖以下:
經過Collection.stream()方法獲得Head,也就是stage0,緊接着調用一系列中間操做,不斷產生新的Stream,這些Stream對象以雙向鏈表的形式組織在一塊兒,構成整個流水線,因爲每一個Stage都記錄了前一個Stage和本次的操做以及回調函數,依靠這種結構就能創建起對數據源的全部操做。
Head記錄Stream起始操做。
StatelessOp記錄無狀態的中間操做。
StatefulOp記錄有狀態的中間操做 。
Reference構造方法
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
複製代碼
Reference構造方法調用了AbstractPipeline的構造方法,能夠發現,它的構造函數更像是鏈表的操做,指定了前驅stage的後繼stage爲當前stage。
Head構造方法
AbstractPipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSupplier = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
複製代碼
能夠發現,Head的構造方法指定了sourceStage爲當前stage。
如今咱們知道Stream()是如何記錄每一步的操做了,要想讓流水線將全部的操做疊加在一塊兒,還須要有一種協議來協調相鄰的Stage之間的關係。這個協議就是Sink接口,Sink接口方法以下:
有了此協議,相鄰Stage之間調用就很方便了,每一個Stage都會將本身的操做封裝到一個Sink裏,前一個Stage只需調用後一個Stage的accept()方法便可,並不須要知道內部是如何處理的。對於短路操做,也要實現cancellationRequested()。實際上Stream API內部實現的本質,就是如何重載Sink的這四個接口方法。
有了Sink包裝,流水線調用時只須要從流水線的head開始,對數據源依次調用每一個Stage對應的Sink{begin(),accept(),cancellationRequested(),end()}就能夠了。就相似於[處理->轉發]這種模型。
讓咱們以**Stream.map()**爲例,探索Sink是怎麼實現的:
// 產生一個新的stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
// opWrapSink返回回調函數包裝成的Sink
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// 將處理結果傳遞給流水線下游
downstream.accept(mapper.apply(u));
}
};
}
};
}
複製代碼
上述邏輯總結以下:首先,將回調函數mapper包裝到一個Sink中,因爲Stream.map()是一個無狀態的中間操做,因此返回一個StatelessOp對象,調用這個新Stream的opWripSink()獲得一個包裝了當前回調函數的Sink。
接下來咱們看一看**Stream.sorted()**方法:
// Stream.sort()方法用到的Sink實現
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;// 存放用於排序的元素
RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream, comparator);
}
@Override
public void begin(long size) {
...
// 建立一個存放排序元素的列表
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);// 只有元素所有接收以後才能開始排序
downstream.begin(list.size());
if (!cancellationWasRequested) {// 下游Sink不包含短路操做
list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink
}
else {// 下游Sink包含短路操做
for (T t : list) {// 每次都調用cancellationRequested()詢問是否能夠結束處理。
if (downstream.cancellationRequested()) break;
downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);// 1. 使用當前Sink包裝動做處理t,只是簡單的將元素添加到中間列表當中
}
}
複製代碼
1)首先beging()方法告訴Sink參與排序的元素個數,方便肯定中間結果容器的的大小;
2)以後經過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷全部元素;
3)最後end()方法告訴Sink全部元素遍歷完畢,啓動排序步驟,排序完成後將結果傳遞給下游的Sink;
4)若是下游的Sink是短路操做,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否能夠結束處理。
既然Sink已經將Stream的每一步操做都進行了封裝,就差最後一步執行了,那麼到底是誰來執行呢?沒錯,就是咱們一開始提到的結束操做。
結束操做以後不能再有別的操做,因此結束操做會建立一個包裝了本身操做的Sink,這也是最後一個Sink,它只需處理數據,而不須要轉發,對於Sink的[處理,轉發]模型,結束操做的Sink就是調用鏈的出口。
咱們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage並訪問Sink字段便可。
但Stream類庫的設計者沒有這麼作,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)
方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。
爲何要產生一個新對象而不是返回一個Sink字段?這是由於使用opWrapSink()能夠將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。
試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,由於stage0表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:
// AbstractPipeline.wrapSink()
// 從下游向上遊不斷包裝Sink。若是最初傳入的sink表明結束操做,
// 函數返回時就能夠獲得一個表明了流水線上全部操做的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
...
for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
複製代碼
如今流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:
// AbstractPipeline.copyInto(), 對spliterator表明的數據執行wrappedSink表明的操做。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
...
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
spliterator.forEachRemaining(wrappedSink);// 迭代
wrappedSink.end();// 通知遍歷結束
}
...
}
複製代碼
1)對於表中返回boolean或者Optional的操做(Optional是存放 一個 值的容器)的操做,因爲值返回一個值,只須要在對應的Sink中記錄這個值,等到執行結束時返回就能夠了。
2)對於歸約操做,最終結果放在用戶調用時指定的容器中(容器類型經過收集器指定)。collect(), reduce(), max(), min()都是歸約操做,雖然max()和min()也是返回一個Optional,但事實上底層是經過調用reduce()方法實現的。
3)於返回是數組的狀況,毫無疑問的結果會放在數組當中。這麼說固然是對的,但在最終返回數組以前,結果實際上是存儲在一種叫作Node的數據結構中的。Node是一種多叉樹結構,元素存儲在樹的葉子當中,而且一個葉子節點能夠存放多個元素。這樣作是爲了並行執行方便。