Scalaz(52)- scalaz-stream: 並行運算-parallel processing concurrently by merging

   若是scalaz-stream真的是一個實用的數據流編程工具庫的話,那它應該能處理同時從多個數據源獲取數據以及把數據同時送到多個終點(Sink),最重要的是它應該能夠實現高度靈活的多線程運算。可是:咱們說Process表明了一串多是無窮的元素。這個一串的意思是多個按序排列的元素。也就是說若是咱們有一個Process(a,b,c),那麼咱們只能按順序來進行運算:咱們只能在完成了對a的運算後才能運算b。這樣也說得過去:它讓咱們更容易理解scalaz-stream Process的運算過程。面對scalaz-stream這樣的特性咱們應該怎樣去實現它的並行運算呢?實際上在不少應用場景中咱們對運算結果的排列順序並不關心,咱們只對運算結果內容感興趣。如:從數據庫庫存表中查詢商品價格大於100的全部商品,這時咱們對讀出商品記錄的順序並不關心,咱們只對每條記錄的價格感興趣。若是咱們從不少源頭(數據表)讀取商品信息的話,能夠同時對這些源頭進行並行讀取。scalaz-stream是經過merge來實現並行運算的。merge能夠同時讀取多個數據源而後產生一個合併的數據流。因爲各個源頭的滯後狀況有所不一樣,因此merge產生結果的順序是不可預測的(nondeterministic)。咱們用個例子來示範有那些方法能夠同時從三個文件中逐行讀取文字而後再合併成一個多行文件:數據庫

1 al p1 = io linesR s"/Users/Tiger/Process.scala"  
2 //> p1 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
3 val p2 = io linesR s"/Users/Tiger/Wye.scala"      
4 //> p2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val p3 = io linesR s"/Users/Tiger/Tee.scala"     
6 //> p3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)

p1,p2,p3是三個Source。它們分別從Process.scala, Wye.scala, Tee.scala中讀取數據。咱們能夠模擬讀取數據時可能遇到的延遲:編程

1 //假定讀取數據形成不肯定延遲
2 def readDelay(i: Int) = Thread.sleep( i/10 )      //> readDelay: (i: Int)Unit
3 val pa = p1.map{ s => readDelay(s.length); s}     //> pa : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
4 val pb = p2.map{ s => readDelay(s.length); s}     //> pb : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val pc = p3.map{ s => readDelay(s.length); s}     //> pc : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)

如今pa,pb,pc都按照所讀文件中每行文字長度來產生滯延。下面咱們先統計一下每一個Process運算所須要的時間:多線程

 1 val pa_start = System.currentTimeMillis           //> pa_start : Long = 1470051661503
 2 val palines= pa.runFoldMap(_ => 1).run            //> palines : Int = 1616
 3 println(s"reading p1 $palines lines in ${System.currentTimeMillis - pa_start}ms")  4                                                   //> reading p1 1616 lines in 6413ms
 5 val pb_start = System.currentTimeMillis           //> pb_start : Long = 1470051667917
 6 val pblines=pb.runFoldMap(_ => 1).run             //> pblines : Int = 901
 7 println(s"reading p2 $pblines lines in ${System.currentTimeMillis - pb_start}ms")  8                                                   //> reading p2 901 lines in 3275ms
 9 val pc_start = System.currentTimeMillis           //> pc_start : Long = 1470051671192
10 val pclines=pc.runFoldMap(_ => 1).run             //> pclines : Int = 306
11 println(s"reading p3 $pclines lines in ${System.currentTimeMillis - pc_start}ms") 12                                                   //> reading p3 306 lines in 1181ms
13 println(s"reading all ${palines+pblines+pclines} lines in ${System.currentTimeMillis - pa_start}ms") 14                                                   //> reading all 2823 lines in 10870ms

三個文件總共有2823行,讀取時間爲10870ms。咱們用append方式來連續運算:app

1 val pl_start = System.currentTimeMillis           //> pl_start : Long = 1470051672373
2 val plines = (pa ++ pb ++ pc).runFoldMap(_ => 1).run 3                                                   //> plines : Int = 2823
4 println(s"continue reading $plines in ${System.currentTimeMillis - pl_start}ms") 5                                                   //> continue reading 2823 in 10501ms

連續運算所需時間10501ms,稍微短於分開運算結果。那麼若是咱們用merge來並行運算呢?ide

1 val par_start = System.currentTimeMillis          //> par_start : Long = 1470051682874
2 val parlines = (pa merge pb merge pc).runFoldMap(_ => 1).run 3                                                   //> parlines : Int = 2823
4 println(s"parallel reading $parlines in ${System.currentTimeMillis - par_start}ms") 5                                                   //> parallel reading 2823 in 6278ms

如今整個運算只須要6278ms,約莫是連續運算所需時間的60%。固然,若是咱們須要從更多的源頭讀取數據的話,那麼merge方法能夠實現更高的效率提高。可是,因爲stream多是一串無窮的元素,咱們更須要對一個stream無窮的元素實現並行運算。在上面的例子裏咱們用merge把三個源頭的數據合併成爲一個更長的數據串,若是咱們對其中每條記錄進行運算如抽取、對比篩選等的話,那麼運算時間仍然與數據串的長度成直線正比。好比:在以上例子的基礎上,咱們須要對合並的數據進行統計:計算出使用元音(vowl)的頻率的。咱們能夠先把每條記錄中的vowl過濾出來;而後把全部篩選出來的記錄加起來就能得出這個統計結果了:函數

 1 /c 是個vowl  2 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)  3                                                   //> vowls: (c: Char)Boolean  4 
 5 //返回Map表明每一個字符頻率, 測試使用了scalaz.Lens
 6 def vowlCount(text: String): Map[Char,Int] = {  7     text.toUpperCase.toList.filter(vowls).foldLeft(Map[Char,Int]()) { (b,a) =>
 8       if ((Lens.mapVLens(a) get b) == None) Lens.mapVLens(a) set(b,1.some)  9       else Lens.mapVLens(a).set(b, (Lens.mapVLens(a) get b).map(_ + 1)) 10  } 11  }                                                //> vowlCount: (text: String)Map[Char,Int] 12 //直接用scala標準庫實現
13 def stdVowlsCount(text: String): Map[Char,Int] =
14   text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 15                                                   //> stdVowlsCount: (text: String)Map[Char,Int]

咱們先按序運算結果:工具

 1 //爲runFoldMap提供一個Map[Char,Int]Monoid實例
 2 implicit object mapMonoid extends Monoid[Map[Char,Int]] {  3    def zero: Map[Char,Int] = Map()  4    def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {  5      (m1.keySet ++ m2.keySet).map { k =>
 6        (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))  7  }.toMap  8  }  9 } 10 
11 val cnt_start = System.currentTimeMillis          //> cnt_start : Long = 1470197392016
12 val merged = (pa merge pb merge pc) 13  .map(vowlCount) 14   .runFoldMap(identity).run                       //> merged : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O-> 3748)
15 println(s"calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 16                                                   //> calc vowl frequency in 28646ms

整個運算須要28646ms。實際上這些運算不會依賴每條記錄的排列位置,那麼若是可以實現並行運算的話可能會提升效率。scalaz-stream提供了merge.mergeN方法來支持對一順數據流進行並行運算。merge.mergeN函數的款式以下:測試

/** * Merges non-deterministically processes that are output of the `source` process. * * Merging stops when all processes generated by source have stopped, and all source process stopped as well. * Merging will also stop when resulting process terminated. In that case the cleanup of all `source` * processes is run, followed by cleanup of resulting process. * * When one of the source processes fails the mergeN process will fail with that reason. * * Merging is non-deterministic, but is fair in sense that every process is consulted, once it has `A` ready. * That means processes that are `faster` provide it's `A` more often than slower processes. * * Internally mergeN keeps small buffer that reads ahead up to `n` values of `A` where `n` equals to number * of active source streams. That does not mean that every `source` process is consulted in this read-ahead * cache, it just tries to be as much fair as possible when processes provide their `A` on almost the same speed. * */ def mergeN[A](source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(0, 0)(source)(S) /** * MergeN variant, that allows to specify maximum of open `source` processes. * If, the maxOpen is <= 0 it acts like standard mergeN, where the number of processes open is not limited. * However, when the maxOpen > 0, then at any time only `maxOpen` processes will be running at any time * * This allows for limiting the eventual concurrent processing of opened streams not only by supplied strategy, * but also by providing a `maxOpen` value. * * * @param maxOpen Max number of open (running) processes at a time * @param source source of processes to merge */ def mergeN[A](maxOpen: Int)(source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(maxOpen, maxOpen)(source)(S)

mergeN的入參source類型款式是這樣的:Process[Task,Process[Task,A]],意思是在Process裏還有一個Process。這個內部Process是並行運算的。這樣的類型款式也能夠被理解爲:內部的Process是讀取數據庫的記錄(data),咱們能夠同時從多個源頭讀取數據,外部Process是數據庫鏈接(connection)。應用在咱們上面的例子裏:內部Process就是vowlCount做業,由於咱們但願對每條記錄的vowlCount並行處理。那麼咱們先要進行類型款式轉換:從Process[Task,A] 轉換到 Process[Task,Process[Task,A]]:this

1 val merged = (pa merge pb merge pc)               //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Hal 2                                                   //| t(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4           .map {task => Process.eval(task)}       //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))

這個par的類型是咱們但願的了。如今咱們能夠看看mergeN運算的效率:spa

1 val cnt_start = System.currentTimeMillis          //> cnt_start : Long = 1470204623562
2 val merged = (pa merge pb merge pc)               //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4           .map {task => Process.eval(task)}       //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))
5 val resm = merge.mergeN(par).runFoldMap(identity).run 6                                                   //> resm : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O -> 3748)
7 println(s"parallel calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 8                                                   //> parallel calc vowl frequency in 6922ms

看看這個結果:從28646ms降到6922,約莫4倍效率的提升,夠顯著的了。若是咱們把上面這個例子用在實際的數據庫操做上:好比對幾個數據庫表裏的全部在必定價格範圍內商品購買次數進行統計等,咱們是能夠在scalaz-stream裏實現這個場景並行運算的。

相關文章
相關標籤/搜索