Scalaz(53)- scalaz-stream: 程序運算器-application scenario

    從上面多篇的討論中咱們瞭解到scalaz-stream表明一串連續無窮的數據或者程序。對這個數據流的處理過程就是一個狀態機器(state machine)的狀態轉變過程。這種模式與咱們一般遇到的程序流程很類似:經過程序狀態的變化來推動程序進展。傳統OOP式編程多是經過一些全局變量來記錄當前程序狀態,而FP則是經過函數組合來實現狀態轉變的。這個FP模式講起來有些模糊和抽象,但實際上經過咱們前面長時間對FP編程的學習瞭解到FP編程講究避免使用任何局部中間變量,更不用說全局變量了。FP程序的數據A是包嵌在算法F[A]內的。FP編程模式提供了一整套全新的數據更新方法來實現對F[A]中數據A的操做。對許多編程人員來說,FP的這種編程方式會顯得很彆扭、不容易掌握。若是咱們仔細觀察分析,會發覺scalaz-stream就是一種很好的FP編程工具:它的數據也是不可變的(immutable),而且是包嵌在高階類型結構裏的,是經過Process狀態轉變來標示數據處理過程進展的。scalaz-stream的數據處理是有序流程,這樣能夠使咱們更容易分析理解程序的運算過程,它的三個大環節包括:數據源(source),數據傳換(transducer)及數據終點(Sink/Channel)能夠很形象地描繪一個程序運算的全過程。scalaz-stream在運算過程當中的並行運算方式(parallel computaion)、安全資源使用(resource safety)和異常處理能力(exception handling)是實現泛函多線程編程最好的支持。咱們先來看看scalaz-stream裏的一個典型函數:java

/** * Await the given `F` request and use its result. * If you need to specify fallback, use `awaitOr` */ def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O] = awaitOr(req)(Halt.apply)(rcv) /** * Await a request, and if it fails, use `fb` to determine the next state. * Otherwise, use `rcv` to determine the next state. */ def awaitOr[F[_], A, O](req: F[A])(fb: EarlyCause => Process[F, O])(rcv: A => Process[F, O]): Process[F, O] = Await(req,(r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(fb,rcv))))

這個await函數能夠說是一個表明完整程序流程的典範。注意,awaitOr裏的Await是個數據結構。這樣咱們在遞歸運算await時能夠避免StackOverflowError的發生。req: F[A]表明與外界交互的一個運算,如從外部獲取輸入、函數rcv對這個req產生的運算結果進行處理並設定程序新的狀態。算法

1 import scalaz.stream._ 2 import scalaz.concurrent._ 3 object streamApps { 4 import Process._ 5   def getInput: Task[Int] = Task.delay { 3 }      //> getInput: => scalaz.concurrent.Task[Int]
6   val prg = await(getInput)(i => emit(i * 3))     //> prg : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@4973813a,<function1>,<function1>)
7   prg.runLog.run                                  //> res0: Vector[Int] = Vector(9)
8 }

這是一個一步計算程序。咱們能夠再加一步:編程

1  val add10 = await1[Int].flatMap{i => emit(i + 10)} 2                                                   //> add10 : scalaz.stream.Process[[x]scalaz.stream.Process.Env[Int,Any]#Is[x],Int] = Await(Left,<function1>,<function1>)
3   val prg1 = await(getInput)(i => emit(i * 3) |> add10) 4                                                   //> prg1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@6737fd8f,<function1>,<function1>)
5   prg1.runLog.run                                 //> res0: Vector[Int] = Vector(19)

add10是新增的一個運算步驟,是個transducer因此調用了Process1的函數await1,並用pipe(|>)來鏈接。實際上咱們能夠用組合方式(compose)把add10和prg組合起來:數組

1 val prg3 = prg |> add10                         //> prg3 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End) ,Vector(<function1>))
2   prg3.runLog.run                               //> res1: Vector[Int] = Vector(19)

咱們一樣能夠增長一步輸出運算:安全

1  val outResult: Sink[Task,Int] = sink.lift { i => Task.delay{println(s"the result is: $i")}} 2                                                   //> outResult : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))
3   val prg4 = prg1 to outResult                    //> prg4 : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],Unit] = Append(Halt(End),Vector(<function1>, <function1>))
4   prg4.run.run                                    //> the result is: 19

 scalaz-stream的輸出類型是Sink,咱們用to來鏈接。那麼若是須要不斷重複運算呢:數據結構

 1 import scalaz._  2 import Scalaz._  3 import scalaz.concurrent._  4 import scalaz.stream._  5 import Process._  6 object streamAppsDemo extends App {  7   def putLine(line: String) = Task.delay { println(line) }  8   def getLine = Task.delay { Console.readLine }  9   val readL = putLine("Enter:>").flatMap {_ => getLine} 10   val readLines = repeatEval(readL) 11   val echoLine = readLines.flatMap {line => eval(putLine(line))} 12  echoLine.run.run 13 }

這是一個無窮運算程序:不停地把鍵盤輸入迴響到顯示器上。下面是一些測試結果:多線程

1 Enter:>
2 hello world!
3 hello world!
4 Enter:>
5 how are you?
6 how are you?
7 Enter:>

固然,咱們也能夠把上面的程序表達的更形象些:app

1   val outLine: Sink[Task,String] = constant(putLine _).toSource 2   val echoInput: Process[Task,Unit] = readLines to outLine 3   //echoLine.run.run
4   echoInput.run.run 

用to Sink來表述可能更形象。這個程序沒有任何控制:甚至沒法有意識地退出。咱們試着加一些控制機制:框架

 1   def lines: Process[Task,String] = {  2     def go(line: String): Process[Task,String] = 
 3  line.toUpperCase match {  4           case "QUIT" => halt  5           case _ => emit(line) ++ await(readL)(go)  6  }  7     await(readL)(go)  8  }  9   
10   val prg = lines to outLine 11   prg.run.run 

在rcv函數裏檢查輸入是否quit,若是是就halt,不然重複運算await。如今能夠控制終止程序了。函數

下面再示範一下異常處理機制:看看能不能有效的捕捉到運行時的錯誤:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2   val prg = (lines |> mul(5)) to outLine 3   prg.run.run 

加了個transducer mul(5),若是輸入是可轉變爲數字類型的就乘5否者會異常退出。下面是一些測試場景:

 1 Enter:>
 2 5
 3 25.0
 4 Enter:>
 5 6
 6 30.0
 7 Enter:>
 8 six  9 Exception in thread "main" java.lang.NumberFormatException: For input string: "six"
10     at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)

咱們能夠用onFailure來捕捉任何錯誤:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2 //val prg = (lines |> mul(5)) to outLine 
3   val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") } to outLine 4   prg.run.run 

如今運算結果變成了下面這樣:

1 Enter:>
2 5
3 25.0
4 Enter:>
5 6
6 30.0
7 Enter:>
8 six 9 invalid input!!!

 證實咱們捕捉並處理了錯誤。一個完整安全的程序還必須具有自動過後清理的功能。這項能夠經過onComplete來實現:

1   def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2 //val prg = (lines |> mul(5)) to outLine 
3   val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") } 4   val prg1 = prg.onComplete{ Process.eval(Task.delay {println("end of program"); ""}) } to outLine 5   prg1.run.run 

測試結果以下:

 1 Enter:>
 2 5
 3 25.0
 4 Enter:>
 5 6
 6 30.0
 7 Enter:>
 8 six  9 invalid input!!!
10 end of program

再有一個值得探討的就是這些程序的組合集成。scalaz-stream就是存粹的泛函類型,那麼基於scalaz-stream的程序就天然具有組合的能力了。咱們能夠用兩個獨立的程序來示範Process程序組合:

 1 import scalaz._  2 import Scalaz._  3 import scalaz.concurrent._  4 import scalaz.stream._  5 import Process._  6 object prgStream extends App {  7   def prompt(prmpt: String) = Task.delay { print(prmpt) }  8   def putLine(line: String) = Task.delay { println(line) }  9   def getLine = Task.delay { Console.readLine } 10   val readLine1 = prompt("Prg1>:").flatMap {_ => getLine} 11   val readLine2 = prompt("Prg2>:").flatMap {_ => getLine} 12   val stdOutput = constant(putLine _).toSource 13   def multiplyBy(n: Int) = await1[String].flatMap {line => 
14       if (line.isEmpty) halt 15       else emit((line.toDouble * n).toString) 16  }.repeat 17   val prg1: Process[Task,String] = { 18     def go(line: String): Process[Task,String] = line.toUpperCase match { 19       case "QUIT" => halt 20       case _ => emit(line) ++ await(readLine1)(go) 21  } 22     await(readLine1)(go) 23   }.onComplete{ Process.eval(Task.delay {println("end of program1"); ""}) } 24   val prg2: Process[Task,String] = { 25     def go(line: String): Process[Task,String] = line.toUpperCase match { 26       case "QUIT" => halt 27       case _ => emit(line) ++ await(readLine2)(go) 28  } 29     await(readLine2)(go) 30   }.onComplete{ Process.eval(Task.delay {println("end of program2"); ""}) } 31   val program1 = (prg1 |> multiplyBy(3) to stdOutput) 32   val program2 = (prg2 |> multiplyBy(5) to stdOutput) 33   
34   (program1 ++ program2).run.run 35   
36 } 

由於program的類型是Process[Task,String],因此咱們能夠用++把它們鏈接起來。同時咱們應該看到在program的造成過程當中transducer multiplyBy是如何用|>與prg組合的。如今咱們看看測試運算結果:

 1 Prg1>:3
 2 9.0
 3 Prg1>:4
 4 12.0
 5 Prg1>:quit  6 end of program1  7 Prg2>:5
 8 25.0
 9 Prg2>:6
10 30.0
11 Prg2>:quit 12 end of program2

咱們看到程序是按照流程走的。下面再試個流程控制程序分發(dispatching)的例子:

 1  val program1 = (prg1 |> multiplyBy(3) observe stdOutput)  2   val program2 = (prg2 |> multiplyBy(5) observe stdOutput)  3   
 4   //(program1 ++ program2).run.run
 5   val getOption = prompt("Enter your choice>:").flatMap {_ => getLine }  6   val mainPrg: Process[Task,String] = {  7      def go(input: String): Process[Task,String] = input.toUpperCase match {  8        case "QUIT" => halt  9        case "P1" => program1 ++ await(getOption)(go) 10        case "P2" => program2 ++ await(getOption)(go) 11        case _ => await(getOption)(go) 12  } 13      await(getOption)(go) 14   }.onComplete{ Process.eval(Task.delay {println("end of main"); ""}) } 15   
16   mainPrg.run.run

咱們先把program1和program2的終點類型Sink去掉。用observe來實現數據複製分流。這樣program1和program2的結果類型才能與await的類型相匹配。咱們能夠測試運行一下:

 1 Enter your choice>:p2  2 Prg2>:3
 3 15.0
 4 Prg2>:5
 5 25.0
 6 Prg2>:quit  7 end of program2  8 Enter your choice>:p1  9 Prg1>:3
10 9.0
11 Prg1>:6
12 18.0
13 Prg1>:quit 14 end of program1 15 Enter your choice>:wat 16 Enter your choice>:oh no 17 Enter your choice>:quit 18 end of main

scalaz-stream是一種泛函類型。咱們在上面已經示範了它的函數組合能力。固然,若是程序的類型是Process,那麼咱們能夠很容易地用merge來實現並行運算。

scalaz-stream做爲一種程序運算框架能夠輕鬆實現FP程序的組合,那麼它成爲一種安全穩定的泛函多線程編程工具就會是很好的選擇。

相關文章
相關標籤/搜索