Scalaz (57) - scalaz-stream: fs2 Multithreading, fs2 concurrency

    fs2's concurrency model not only provides non-blocking I/O (java nio) capability, it also provides good tools for parallel computation. Before we get into parallelism, let's first demonstrate some of the stream-merging functions in fs2's pipe2 object. We start by designing two helper functions, one to trace computations and one to simulate a computing environment:

```scala
def log[A](prompt: String): Pipe[Task, A, A] =
  _.evalMap { a => Task.delay { println(prompt + a); a } }

Stream(1, 2, 3).through(log(">")).run.unsafeRun
// >1
// >2
// >3
```

log is a tracing function.

```scala
implicit val strategy = Strategy.fromFixedDaemonPool(4)
implicit val scheduler = Scheduler.fromFixedDaemonPool(2)

def randomDelay[A](max: FiniteDuration): Pipe[Task, A, A] = _.evalMap { a =>
  val delay: Task[Int] = Task.delay {
    scala.util.Random.nextInt(max.toMillis.toInt)
  }
  delay.flatMap { d => Task.now(a).schedule(d.millis) }
}

Stream(1, 2, 3).through(randomDelay(1.second)).through(log("delayed>")).run.unsafeRun
// delayed>1
// delayed>2
// delayed>3
```

randomDelay is a function that simulates an environment with arbitrary delays. We can also trace before and after passing through randomDelay:

```scala
Stream(1, 2, 3).through(log("before delay>"))
  .through(randomDelay(1.second))
  .through(log("after delay>")).run.unsafeRun
// before delay>1
// after delay>1
// before delay>2
// after delay>2
// before delay>3
// after delay>3
```

Note that randomDelay does not block the current computation.

Next let's look at the interleave merge function in the pipe2 object:

```scala
val sa = Stream(1, 2, 3).through(randomDelay(1.second)).through(log("A>"))
val sb = Stream(1, 2, 3).through(randomDelay(1.second)).through(log("B>"))

(sa interleave sb).through(log("AB>")).run.unsafeRun
// A>1
// B>1
// AB>1
// AB>1
// A>2
// B>2
// AB>2
// AB>2
// A>3
// B>3
// AB>3
// AB>3
```

We can see that the merged stream emits only after both sa and sb have emitted their elements. This is a fixed-order merge. merge, by contrast, merges in no fixed order; here is a demonstration:

```scala
(sa merge sb).through(log("AB>")).run.unsafeRun
// B>1
// AB>1
// B>2
// AB>2
// B>3
// AB>3
// A>1
// AB>1
// A>2
// AB>2
// A>3
// AB>3
```

We can see that merge does not wait for both sa and sb to finish before emitting; it emits as soon as either side produces an element. In other words, merge basically follows whichever stream runs faster, so the output order is nondeterministic. In terms of running time: interleave takes the deterministic sum of sa and sb, while merge follows the faster of the two. The total amount of work is of course comparable, but with merge we can process the emitted elements in parallel, which can greatly reduce the overall time. One problem with merge is that we cannot tell which source the current element came from; we can solve this with either:

```scala
(sa either sb).through(log("AB>")).run.unsafeRun
// A>1
// AB>Left(1)
// B>1
// AB>Right(1)
// A>2
// AB>Left(2)
// B>2
// AB>Right(2)
// B>3
// AB>Right(3)
// A>3
// AB>Left(3)
```

We distinguish the data's source by Left and Right. If we add one more Stream source, we can still use merge to combine all three Streams:

```scala
val sc = Stream.range(1, 10).through(randomDelay(1.second)).through(log("C>"))

((sa merge sb) merge sc).through(log("ABC>")).run.unsafeRun
// B>1
// ABC>1
// C>1
// ABC>1
// A>1
// ABC>1
// B>2
// ABC>2
// A>2
// ABC>2
// B>3
// ABC>3
// C>2
// ABC>2
// A>3
// ABC>3
// C>3
// ABC>3
// C>4
// ABC>4
// C>5
// ABC>5
// C>6
// ABC>6
// C>7
// ABC>7
// C>8
// ABC>8
// C>9
// ABC>9
```

If we cannot determine the number of data sources in advance, we can represent them with the following type:

```scala
Stream[Task, Stream[Task, A]]
```

This type represents a stream of streams: the outer Stream contains an indeterminate number of inner Streams. A concrete example makes this clear: the outer Stream represents client connections, and each inner Stream represents the data read from one client. Expressed in this type, the three streams above become:

```scala
val streams: Stream[Task, Stream[Task, Int]] = Stream(sa, sb, sc)
// streams: fs2.Stream[fs2.Task, fs2.Stream[fs2.Task, Int]]
```

Now we need not only to run the inner Streams but also to flatten the results into Stream[Task,A]. The fs2.concurrent package has exactly such a combinator:

```scala
def join[F[_], O](maxOpen: Int)(outer: Stream[F, Stream[F, O]])(
    implicit F: Async[F]): Stream[F, O] = {...}
```

The outer parameter and the result type both match what we need. maxOpen is the maximum number of concurrent computations. We can use join to redo the example above that merges sa, sb, and sc:

```scala
val ms = concurrent.join(3)(streams)

ms.through(log("ABC>")).run.unsafeRun
// C>1
// ABC>1
// A>1
// ABC>1
// C>2
// ABC>2
// B>1
// ABC>1
// C>3
// ABC>3
// A>2
// ABC>2
// B>2
// ABC>2
// C>4
// ABC>4
// A>3
// ABC>3
// B>3
// ABC>3
// C>5
// ABC>5
// C>6
// ABC>6
// C>7
// ABC>7
// C>8
// ABC>8
// C>9
// ABC>9
```

The result is just what we expected. As mentioned above, maxOpen is the maximum degree of parallelism. Let's observe it with another example:

```scala
val rangedStreams = Stream.range(0, 5).map { id =>
  Stream.range(1, 5).through(randomDelay(1.second))
    .through(log((('A' + id).toChar).toString + ">"))
}

concurrent.join(3)(rangedStreams).run.unsafeRun
// B>1
// A>1
// C>1
// B>2
// C>2
// A>2
// B>3
// C>3
// C>4
// D>1
// A>3
// A>4
// B>4
// E>1
// E>2
// E>3
// D>2
// D>3
// E>4
// D>4
```

We can see that only three computations exist at the same time: first A, B, C, then D and E as earlier streams complete.
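The effect of maxOpen can be sketched outside fs2 with the plain JVM standard library: submit five tasks to a fixed pool of three threads and record the peak number running at once. The pool size, task count, and sleep time here are illustrative, not fs2 API:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val pool    = Executors.newFixedThreadPool(3)   // plays the role of maxOpen = 3
val running = new AtomicInteger(0)              // tasks currently executing
val peak    = new AtomicInteger(0)              // highest concurrency observed

(1 to 5).foreach { _ =>
  pool.execute { () =>
    val now = running.incrementAndGet()
    peak.getAndUpdate(m => math.max(m, now))    // record the peak
    Thread.sleep(100)                           // simulate some work
    running.decrementAndGet()
  }
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
println(s"peak concurrency: ${peak.get}")       // never more than 3
```

Just as with join(3), no matter how many tasks are submitted, at most three are ever in flight.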

When our program needs to interact with external programs, the interaction usually takes one of the following forms:

1. The side-effecting computation is synchronous. This is the easiest case to handle, since we can obtain the result directly.

2. The side-effecting computation is asynchronous: it delivers its result by invoking a callback function once.

3. The side-effecting computation is asynchronous, but the results must be delivered in batches through repeated invocations of the callback function.

Let's analyze these cases one at a time:

1. Synchronous computation is the easiest to handle: we simply wrap the computation in Stream.eval:

```scala
def destroyUniverse: Unit = println("BOOOOM!!!")

val s = Stream.eval_(Task.delay(destroyUniverse)) ++ Stream("...move on")
s.runLog.unsafeRun
// BOOOOM!!!
// res8: Vector[String] = Vector(...move on)
```

2. For the second case: fs2's Async trait has an async method for registering a callback function:

```scala
trait Async[F[_]] extends Effect[F] { self =>
  /**
   * Create an `F[A]` from an asynchronous computation, which takes the form
   * of a function with which we can register a callback. This can be used
   * to translate from a callback-based API to a straightforward monadic version.
   */
  def async[A](register: (Either[Throwable, A] => Unit) => F[Unit]): F[A] =
    bind(ref[A]) { ref =>
      bind(register { e => runSet(ref)(e) }) { _ => get(ref) }
    }
  ...
}
```

Let's use a concrete example. Suppose we have a callback-based function readBytes:

```scala
trait Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit
}
```

This Connection is an interface to the outside world. Suppose it is instantiated like this:

```scala
val conn = new Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
    Thread.sleep(1000)
    onSuccess(Array(1, 2, 3, 4, 5))
  }
}
```

We can register this callback with async, turning it into a pure, composable (monadic) Task[Array[Byte]]:

```scala
val bytes = T.async[Array[Byte]] { (cb: Either[Throwable, Array[Byte]] => Unit) =>
  Task.delay {
    conn.readBytes(
      ready => cb(Right(ready)),
      fail => cb(Left(fail))
    )
  }
}
// bytes: fs2.Task[Array[Byte]] = Task
```

Now we can run bytes with Stream.eval:

```scala
Stream.eval(bytes).map(_.toList).runLog.unsafeRun
// res9: Vector[List[Byte]] = Vector(List(1, 2, 3, 4, 5))
```
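The same register-a-callback idea can be sketched with the standard library alone (scala.concurrent, not fs2 API): the callback completes a Promise, which turns the callback interface into a composable Future value. The Connection stub is repeated here so the sketch is self-contained, with a shorter illustrative delay:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

trait Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit
}

val conn = new Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
    Thread.sleep(100)                           // simulate a slow read
    onSuccess(Array[Byte](1, 2, 3, 4, 5))
  }
}

// Completing the Promise plays the role of cb(Right(...)) / cb(Left(...))
def readBytesAsync(c: Connection): scala.concurrent.Future[Array[Byte]] = {
  val p = Promise[Array[Byte]]()
  c.readBytes(bytes => p.success(bytes), err => p.failure(err))
  p.future
}

val result = Await.result(readBytesAsync(conn), 5.seconds)
println(result.toList)                          // List(1, 2, 3, 4, 5)
```

fs2's async does essentially this registration step, but produces a lazy Task instead of an eagerly running Future.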

The case where the callback is invoked only once is also relatively easy to handle: when we can't keep up with the data, we simply stop reading. It is harder when the callback is invoked repeatedly, for example when the external program is itself a streaming API that fires the callback each time data is ready. In that situation our program may fail to keep up with the incoming data. We can solve this with the queue provided in the fs2.async package:

 1 import fs2.async
 2  import fs2.util.Async  3 
 4   type Row = List[String]  5   // defined type alias Row
 6 
 7  trait CSVHandle {  8     def withRows(cb: Either[Throwable,Row] => Unit): Unit  9  } 10   // defined trait CSVHandle
11 
12   def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] =
13     for { 14       q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]]) 15       _ <- Stream.suspend { h.withRows { e => F.unsafeRunAsync(q.enqueue1(e))(_ => ()) }; Stream.emit(()) } 16       row <- q.dequeue through pipe.rethrow 17     } yield row 18   // rows: [F[_]](h: CSVHandle)(implicit F: fs2.util.Async[F])fs2.Stream[F,Row]

enqueue1 and dequeue are defined in the Queue trait as follows:

```scala
/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_], A] {
  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a` has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /** Repeatedly call `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.repeatEval(dequeue1)

  /** Dequeue one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
  ...
}
```

We use enqueue1 to store each callback invocation in the queue. dequeue produces a Stream[F,Row], so we use it to pull the stored data back out of the queue.
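This enqueue1/dequeue bridge can be sketched with the standard library alone: a producer thread fires a row callback repeatedly, each invocation enqueues one element into a blocking queue, and the consumer drains the queue at its own pace. The withRows stub and row data below are illustrative, not the fs2 Queue API:

```scala
import java.util.concurrent.LinkedBlockingQueue

type Row = List[String]

// Stub push-based API: fires the callback once per row, like CSVHandle.withRows
def withRows(cb: Either[Throwable, Row] => Unit): Unit =
  List(List("a", "1"), List("b", "2"), List("c", "3")).foreach(r => cb(Right(r)))

val q = new LinkedBlockingQueue[Either[Throwable, Row]]()

// producer: each callback invocation becomes one queued element (enqueue1's role)
new Thread(() => withRows(e => q.put(e))).start()

// consumer: pull rows one at a time at our own pace (dequeue's role)
val rows = List.fill(3)(q.take()).collect { case Right(r) => r }
println(rows)   // List(List(a, 1), List(b, 2), List(c, 3))
```

The queue decouples the callback's pace from the consumer's pace, which is exactly the backpressure problem the fs2 version solves, except fs2's queue is nonblocking and effect-typed.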

fs2 provides data types such as signal, queue, and semaphore. Below are some usage examples, starting with async.signal:

```scala
Stream.eval(async.signalOf[Task, Int](0)).flatMap { s =>
  val monitor: Stream[Task, Nothing] =
    s.discrete.through(log("s updated>")).drain
  val data: Stream[Task, Int] =
    Stream.range(10, 16).through(randomDelay(1.second))
  val writer: Stream[Task, Unit] =
    data.evalMap { d => s.set(d) }
  monitor merge writer
}.run.unsafeRun
// s updated>0
// s updated>10
// s updated>11
// s updated>12
// s updated>13
// s updated>14
// s updated>15
```

A usage example for async.queue:

```scala
Stream.eval(async.boundedQueue[Task, Int](5)).flatMap { q =>
  val monitor: Stream[Task, Nothing] =
    q.dequeue.through(log("dequeued>")).drain
  val data: Stream[Task, Int] =
    Stream.range(10, 16).through(randomDelay(1.second))
  val writer: Stream[Task, Unit] =
    data.to(q.enqueue)
  monitor mergeHaltBoth writer
}.run.unsafeRun
// dequeued>10
// dequeued>11
// dequeued>12
// dequeued>13
// dequeued>14
// dequeued>15
```

fs2's time package also provides functions and types that produce data on a schedule. Some code to demonstrate their use:

```scala
time.awakeEvery[Task](1.second)
  .through(log("time:"))
  .take(5).run.unsafeRun
// time:1002983266 nanoseconds
// time:2005972864 nanoseconds
// time:3004831159 nanoseconds
// time:4002104307 nanoseconds
// time:5005091850 nanoseconds
```

awakeEvery produces an infinite stream, so we use take(5) to take the first five elements. We can also let it run for five seconds:

```scala
val tick = time.awakeEvery[Task](1.second).through(log("time:"))

tick.run.unsafeRunFor(5.seconds)
// time:1005685270 nanoseconds
// time:2004331473 nanoseconds
// time:3005046945 nanoseconds
// time:4002795227 nanoseconds
// time:5002807816 nanoseconds
// java.util.concurrent.TimeoutException
```

If we want to avoid the TimeoutException, we can use Task.schedule:

```scala
tick.interruptWhen(Stream.eval(Task.schedule(true, 5.seconds)))
  .run.unsafeRun
// time:1004963839 nanoseconds
// time:2005325025 nanoseconds
// time:3005238921 nanoseconds
// time:4004240985 nanoseconds
// time:5001334732 nanoseconds
// time:6003586673 nanoseconds
// time:7004728267 nanoseconds
// time:8004333608 nanoseconds
// time:9003907670 nanoseconds
// time:10002624970 nanoseconds
```

The most direct way is to use fs2's time.sleep:

```scala
(time.sleep[Task](5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
// res14: Vector[Boolean] = Vector(true)

tick.interruptWhen(time.sleep[Task](5.seconds) ++ Stream.emit(true))
  .run.unsafeRun
// time:1002078506 nanoseconds
// time:2005144318 nanoseconds
// time:3004049135 nanoseconds
// time:4002963861 nanoseconds
// time:5000088103 nanoseconds
```