泛函編程(12)-數據流-Stream

    在前面的章節中咱們介紹了List,也討論了List的數據結構和操做函數。List這個東西從外表看上去挺美,但在現實中使用起來卻可能很不實在。爲何?有兩方面:其一,咱們能夠發現全部List的操做都是在內存中進行的,要求List中的全部元素都必須在操做時存在於內存裏。若是必須針對大型數據集進行List操做的話就明顯不切實際了。其二,List的抽象算法如摺疊算法、map, flatMap等是沒法中途跳出的,不管如何都一直進行到底;只有經過遞歸算法在才能在中途中止運算。但遞歸算法不夠抽象,常常出現重複的代碼。最要命的是遞歸算法會隨着數據量增長堆棧內存佔用(non-tail-recursive),處理大型數據集一樣不實際。以上缺陷使List的應用被侷限在小規模的數據集處理範圍。es6

    矛盾的是,List因爲內存佔用問題不適合大數據集處理,但它的計算模式又是排列數據模式必須的選擇。Stream數據類型具有了List的排列數據計算模式但有不須要將所有數據搬到內存裏,能夠解決以上提到的大數據集處理問題。Stream的特性是經過「延後計算」(lazy evaluation)來實現的。能夠想象一下可能的原理:Stream內元素讀取是在具體使用時才進行的。不用說,Stream是典型的只讀數據類型。既然要繼承List的計算模式,那麼在結構設計上是否相同呢?咱們先看看Stream的結構設計:算法

1   trait Stream[+A] 2   case object Empty extends Stream[Nothing] 3   case class Cons[+A](head: () => A, tail: () => Stream[A]) extends Stream[A]

天啊,簡直是活脫脫的List結構嘛。不過Stream的頭元素(head)和無頭尾(tail)是延後計算的(non-strict)。因爲Cons不是普通函數而是一個類,不允許延後計算類參數,因此傳入的是一個函數 () => ???。數據結構

以上Stream結構設計與List相同;兩種狀態是用子類來表示的。如下咱們探索如下另一種設計方案:app

 1   trait Stream[+A] {  2  def uncons: Option[(A, Stream[A])]  3       def isEmpty: Boolean = uncons.isEmpty  4  }  5  object Stream {  6       def empty[A]: Stream[A] = new Stream[A] {  7           def uncons = None  8  }  9       def cons[A](h: => A, t: => Stream[A]): Stream[A] = new Stream[A] { 10           def uncons = Some((h,t)) 11  } 12       def apply[A](as: A*): Stream[A] = { 13           if (as.isEmpty) empty 14           else cons(as.head, apply(as.tail: _*)) 15  } 16   }

以上的設計方案採用告終構封裝形式:數據結構uncons,兩種狀態empty, cons都被封裝在類結構裏。最起碼咱們如今能夠直接使用=> A 來表達延後計算參數了。函數

實際上Stream就是對一個List的描述,一個類型的聲明。它的實例生成延後到了具體使用的時候,此時須要的元素已經搬入內存,成了貨真價實的List了:大數據

 

 1      //tail recursive
 2       def toList_1: List[A] = {  3  @annotation.tailrec  4           def go(s: Stream[A], acc: List[A]): List[A] = {  5  s.uncons match {  6                   case None => acc  7                   case Some((h,t)) => go(t,h :: acc)  8  }  9  } 10           go(this,Nil).reverse    // h :: acc 產生相反順序
11  } 12       //省去reverse
13       def toListFast: List[A] = { 14           val buf = new collection.mutable.ListBuffer[A] 15  @annotation.tailrec 16           def go(s: Stream[A]): List[A] ={ 17  s.uncons match { 18                   case Some((h,t)) => { 19                       buf += h 20  go(t) 21  } 22                   case _ => buf.toList 23  } 24  } 25           go(this) 26  } 27 Stream(1,2.3)                                     //> res0: ch5.stream.Stream[Double] = ch5.stream$Stream$$anon$2@1e643faf
28 Stream(1,2,3).toList                              //> res1: List[Int] = List(1, 2, 3)
29 Stream(1,2,3).toList_1                            //> res2: List[Int] = List(1, 2, 3)
30 Stream(1,2,3).toListFast                          //> res3: List[Int] = List(1, 2, 3)

 

看看,Stream(1,2,3)就是一個聲明。咱們經過List轉換才真正產生了實例。this

再看看Stream最基本的一些操做函數:lua

 

 1    def take(n: Int): Stream[A] = {  2       if ( n == 0 ) empty  3       else
 4  uncons match {  5            case None => empty  6            case Some((h,t)) => cons(h,t.take(n-1))  7  }  8  }  9     def drop(n: Int): Stream[A] = { 10         if (n == 0) this
11         else { 12  uncons match { 13                 case Some((h,t)) => t.drop(n-1) 14                 case _ => this
15  } 16  } 17  } 18 Stream(1,2,3) take 2                              //> res4: ch5.stream.Stream[Int] = ch5.stream$Stream$$anon$2@3dd3bcd
19 (Stream(1,2,3) take 2).toList                     //> res5: List[Int] = List(1, 2)
20 Stream(1,2,3) drop 2                              //> res6: ch5.stream.Stream[Int] = ch5.stream$Stream$$anon$2@97e1986
21 (Stream(1,2,3) drop 2).toList                     //> res7: List[Int] = List(3)

 

從操做結果能夠肯定:Stream的操做也都是對操做的描述,是延後計算的。當元素被搬到List時系統纔回真正計算這些Stream元素的值。

不過這些操做函數的實現方式與List基本相像:
es5

 

 1    def takeWhile(f: A => Boolean): Stream[A] = {  2  uncons match {  3             case None => empty  4             case Some((h,t)) => if ( f(h) ) cons(h,t.takeWhile(f)) else empty  5  }  6  }  7     def dropWhile(f: A => Boolean): Stream[A] = {  8  uncons match {  9             case None => empty 10             case Some((h,t)) => if ( f(h) ) t.dropWhile(f) else t 11  } 12  } 13     def headOption: Option[A] = uncons match { 14             case Some((h,t)) => Some(h) 15             case _ => None 16  } 17     def tail: Stream[A] = uncons match { 18         case Some((h,t)) => t 19         case _ => empty 20  } 21   
22 (Stream(1,2,3,4,5) takeWhile {_ < 3}).toList      //> res8: List[Int] = List(1, 2)
23 (Stream(1,2,3,4,5) dropWhile {_ < 3}).toList      //> res9: List[Int] = List(4, 5)
24 Stream(1,2,3,4,5).tail                            //> res10: ch5.stream.Stream[Int] = ch5.stream$Stream$$anon$2@337d0578
25 (Stream(1,2,3,4,5).tail).toList                   //> res11: List[Int] = List(2, 3, 4, 5)
26 Stream(1,2,3,4,5).headOption                      //> res12: Option[Int] = Some(1)

前面提到過List的摺疊算法沒法着中途跳出,而Stream經過「延後計算」(lazy evaluation)是能夠實現提前終結計算的。咱們先看看Stream的右摺疊(foldRight)算法:spa

1     def foldRight[B](z: B)(op: (A, => B) => B): B = { 2  uncons match { 3             case None => z 4             case Some((h,t)) => op(h,t.foldRight(z)(op)) 5  } 6     }

這個與List的foldRight簡直一模樣嘛,不一樣的只有op函數的第二個參數是延後計算的 => B。祕密就在這個延後計算的B上。看看下面圖示:

 

因爲op的第二個參數B是延後計算的,那麼t.foldRight(z)(op)這個表達式的計算就是延後的,系統能夠決定先不計算這個表達式從而獲得了一箇中間停頓的結果。

函數exists是在碰到第一個符合條件的元素時立刻終止的。咱們一般使用遞歸算法來實現exists的這個特性。如今咱們也能夠用右摺疊算法達到一樣效果:

1     def exists(p: A => Boolean): Boolean = { 2         foldRight(false){(a,b) => p(a) || b } 3     }

注意:當p(a)=true時系統再也不運算b,因此整個運算停了下來。

一樣,用foldRight來實現forAll:

1     def forAll(p: A => Boolean): Boolean = { 2         foldRight(true){(a,b) => p(a) && b} 3     }

當咱們遇到數據結構只能存一個元素如Option,Either時咱們用map2來對接兩個結構。當咱們遇到能存多個元素的數據結構如List,Tree時咱們就會用append來對接。Stream是一個多元素的數據結構,咱們須要實現append:

1     //把兩個Stream鏈接起來
2     def append[B >: A](b: Stream[B]): Stream[B] = { 3  uncons match { 4             case None => b 5             case Some((h,t)) => cons(h, t.append(b)) 6  } 7  } 8     //append簡寫
9     def #++[B >: A](b: Stream[B]): Stream[B] = append(b)
1 (Stream(1,2) #++ Stream(3,4,5)).toList            //> res14: List[Int] = List(1, 2, 3, 4, 5)

標準裝備函數實現:

 1     //用遞歸算法
 2     def flatMap[B](f: A => Stream[B]): Stream[B] = {  3  uncons match {  4             case None => empty  5             case Some((h,t)) => f(h) #++ t.flatMap(f)  6  }  7  }  8     //用foldRight實現
 9     def flatMap_1[B](f: A => Stream[B]): Stream[B] = { 10         foldRight(empty[B]){(h,t) => f(h) #++ t} 11  } 12     //用遞歸算法
13     def filter(p: A => Boolean): Stream[A] = { 14  uncons match { 15             case None => empty 16             case Some((h,t)) => if(p(h)) cons(h,t.filter(p)) else t.filter(p) 17  } 18  } 19     //用foldRight實現
20     def filter_1(p: A => Boolean): Stream[A] = { 21         foldRight(empty[A]){(h,t) => if(p(h)) cons(h,t) else t} 22  } 23 (Stream(1,2,3,4,5) map {_ + 10}).toList           //> res15: List[Int] = List(11, 12, 13, 14, 15)
24 (Stream(1,2,3,4,5) flatMap {x => Stream(x+10)}).toList 25                                                   //> res16: List[Int] = List(11, 12, 13, 14, 15)
26 (Stream(1,2,3,4,5) flatMap_1 {x => Stream(x+10)}).toList 27                                                   //> res17: List[Int] = List(11, 12, 13, 14, 15)
28 (Stream(1,2,3,4,5) filter {_ < 3}).toList         //> res18: List[Int] = List(1, 2)
29 (Stream(1,2,3,4,5) filter_1 {_ < 3}).toList       //> res19: List[Int] = List(1, 2)

看來都備齊了。

咱們再看看List與Stream還有什麼別的值得關注的區別。先從一個List操做的例子開始:

1 scala> List(1,2,3,4) map (_ + 10) filter (_ % 2 == 0) map (_ * 3) 2 List(36,42)

根據List的特性,每一個操做都會當即完成,產生一個結果List,而後接着下一個操做。咱們試着約化:

1 List(1,2,3,4) map (_ + 10) filter (_ % 2 == 0) map (_ * 3) 2 List(11,12,13,14) filter (_ % 2 == 0) map (_ * 3) 3 List(12,14) map (_ * 3) 4 List(36,42)

實際上這個運算遍歷(traverse)了List三次。一次map操做產生了中間List(11,12,13,14),二次操做filter產生了List(12,14),三次操做map產生最終結果List(36,42)。實際上咱們若是把遍歷這個List的方式變一下:變成每次走一個元素,連續對這個元素進行三次操做,直到走完整個List。這樣咱們在一個遍歷過程就能夠完成所有三個操做。Stream剛好是一個元素一個元素走的,由於下面的元素處於延後計算狀態。咱們試着用Stream來證實:

 1 Stream(1,2,3,4).map(_ + 10).filter(_ % 2 == 0)  2 (11 #:: Stream(2,3,4).map(_ + 10)).filter(_ % 2 == 0)  3 Stream(2,3,4).map(_ + 10).filter(_ % 2 == 0)  4 (12 #:: Stream(3,4).map(_ + 10)).filter(_ % 2 == 0)  5 12 #:: Stream(3,4).map(_ + 10).filter(_ % 2 == 0)  6 12 #:: (13 #:: Stream(4).map(_ + 10)).filter(_ % 2 == 0)  7 12 #:: Stream(4).map(_ + 10).filter(_ % 2 == 0)  8 12 #:: (14 #:: Stream().map(_ + 10)).filter(_ % 2 == 0)  9 12 #:: 14 #:: Stream().map(_ + 10).filter(_ % 2 == 0) 10 12 #:: 14 #:: Stream()

以上的#::是cons的操做符號。

相關文章
相關標籤/搜索