泛函編程(18)-泛函庫設計-並行運算組件庫

    做爲專業的編程人員,咱們常常會由於工做須要創建一些工具庫。所謂工具庫就是針對工做上常常會遇到的一些共性問題預先編制的由一整套函數所組成的函數庫。一般這些工具庫的功能都是在特別定製的一些數據類型支持下由一系列函數圍繞着這些數據類型進行運算而實現的。在泛函編程範疇內也不例外。但在泛函工具庫裏的函數則更重視函數的組合能力(functional composition);於是泛函的工具庫通常稱爲組件庫(combinator library),庫內函數則被稱之爲組件(combinator)。組件庫的設計者對函數設計有着共通的最基本目標:經過對組件進行各類函數組合能夠實現更大的功能。泛函組件庫設計通常針對特別的功能需求或課題:首先嚐試用一些數據類型來表述課題需求,而後圍繞這些特製的數據類型設計一系列函數針對課題各個最基本需求範疇提供解決方法。咱們在這節討論中從一個並行運算組件庫的設計過程來介紹泛函組件庫設計模式。java

    咱們設計這個並行運算組件庫的目的:能夠把一個普通運算放到另一個獨立的線程(thread)中去運行。這樣咱們能夠同時把多個運算分別放到多個線程中同時運行從而達到並行運算的目的。問題簡單明確,但如何對這些在各自獨立運行空間的運算進行組合(composition)、變形(transformation)則值得仔細思量。程序員

先從數據類型着手:一個並行運算應該像是一個容器,把一個普通運算封裝在裏面。咱們來隨便造個結構出來:Par[A],A是普通運算返回的結果類型。這個Par類型很像咱們前面接觸的高階類型,那個承載A類型元素的管子類型。若是這樣去想的話,咱們能夠用前面全部針對高階類型的函數對管子內的元素A進行操做處理。那麼若是一個運算是封裝在Par裏在另外一個線程中運算完成後老是須要一個方法把結果取出來。這樣咱們能夠先得出兩個最基本的函數:算法

1 def unit[A](a: A): Par[A]    //把一個普通運算注入Par。把A升格到一個並行運算
2 def get[A](pa: Par[A]): A    //把並行運行結果抽取出來

下一個問題是運行線程控制:是由程序員來決定一個運算該放到一個新的線程裏仍是固定每個運算都用新的獨立線程?假設咱們選擇用由程序員調用一個函數來肯定產生新線程。這樣有兩個優越:一、能夠有更靈活的並行運算策略(有些已經肯定很快完成的運算可能沒有必要用新的線程,獨立線程運算可能消耗更多的資源);二、獨立線程機制和並行運算是鬆散耦合的:Par的實現中不須要了解線程管理機制。這個函數的款式以下:編程

def fork[A](pa: Par[A]): Par[A]  //爲pa設定一個新的運行空間。並不改變pa,仍是返回Par[A]

那麼把一個運算放到一個新的線程裏運行能夠用這個函數表達:設計模式

def async[A](a: => A): Par[A] = fork(unit(a))  //不須要了解任何關於Par的信息。知道fork會爲這個運算設定新的運行空間。注意仍是返回Par[A]

由於咱們追求的是線程機制和並行運算的鬆散耦合,那麼咱們就不會在Par裏實際進行並行運算的運行,那麼Par就只是對一個並行運算的描述。fork的返回仍是Par,只是增長了對運算環境的描述,也不會真正運行算法。這樣來講Par若是是一個運算描述,那麼咱們就須要一個真正的運行機制來獲取運算結果了:數組

1 def run[A](pa: Par[A]): A    //因爲Par的意義從容器變成運算描述,咱們把get從新命名爲run

咱們就須要在run的函數實現方法裏進行線程管理、計算運行等真正Par的運行了。多線程

如今Par的表達形式包括以下:async

1 def unit[A](a: A): Par[A]                      //把一個普通運算注入Par。把A升格到一個並行運算描述
2 def fork[A](pa: Par[A]): Par[A]                //爲pa設定一個新的運行空間。返回的結果Par必須經run來運行並獲取結果
3 def async[A](a: => A): Par[A] = fork(unit(a))  //不須要了解任何關於Par的信息。注意仍是返回Par[A]
4 def run[A](pa: Par[A]): A                      //運行pa並抽取運算結果

應該是在v1.6之後吧,java API包含了java.util.concurrent包,其中包括了ExecutorService類提供線程管理方面的支持。ExecutorService和Future類翻譯成scala以下: 函數

class ExecutorService { def submit[A](a: Callable[A]): Future[A] } trait Future[A] { def get: A def get(timeout: Long, unit: TimeUnit): A def cancel(evenIfRunning: Boolean): Boolean def isDone: Boolean def isCancelled: Boolean }

咱們不須要進入多線程編程底層細節,用java Concurrent ExecutorService足夠了。ExecutorService提供了以Callable形式向系統提交需運算任務方式;系統當即返回Future,咱們能夠用Future.get以鎖定線程方式讀取運算。因爲運算結果讀取是以鎖定線程(blocking)形式進行的,那麼使用get的時間節點就很重要了:若是提交一個運算後下一步直接get就會當即鎖定線程直至運算完成,那咱們就沒法獲得任何並行運算效果了。Future還提供了運行狀態和中斷運行等功能爲編程人員提供更強大靈活的運算控制。爲了獲取更靈活的控制,Par的返回值應該從直接鎖定線程讀取A改爲不會產生鎖定線程效果的Future:工具

1 type Par[A] = ExecutorService => Future[A] 2 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)

如今Par的含義又從一個數據類型變成了一個函數描述了:傳入一個ExecutorService,返回Future。咱們能夠用run來運行這個函數,系統會當即返回Future,無需任何等待。

下面讓咱們把這些最基本的函數都實現了:

 

 1 object par {  2 import java.util.concurrent._  3 
 4 type Par[A] = ExecutorService => Future[A]  5 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)  6                                                   //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch7.par.Par[A])java.u  7                                                   //| til.concurrent.Future[A]
 8 
 9 def unit[A](a: A): Par[A] = es => { 10     new Future[A] { 11         def get: A = a 12         def isDone = true
13         def isCancelled = false
14         def get(timeOut: Long, timeUnit: TimeUnit): A = get
15         def cancel(evenIfRunning: Boolean): Boolean = false
16  } 17 }                                                 //> unit: [A](a: A)ch7.par.Par[A]
18 def fork[A](pa: Par[A]): Par[A] = es => { 19     es.submit[A](new Callable[A] { 20       def call: A = run(es)(pa).get
21  }) 22 }                                                 //> fork: [A](pa: ch7.par.Par[A])ch7.par.Par[A]
23 def async[A](a: => A): Par[A] = fork(unit(a))     //> async: [A](a: => A)ch7.par.Par[A]
24 
25 val a = unit(4+7)                                 //> a : ch7.par.Par[Int] = <function1>
26 val b = async(2+1)                                //> b : ch7.par.Par[Int] = <function1>
27 val es = Executors.newCachedThreadPool()          //> es : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool 28                                                   //| Executor@71be98f5[Running, pool size = 0, active threads = 0, queued tasks = 29                                                   //| 0, completed tasks = 0]
30 run(es)(b).get                                    //> res0: Int = 3
31 run(es)(a).get                                    //> res1: Int = 11
32 es.shutdown() 33 
34 }

 

從應用例子裏咱們能夠了解線程的管理是由現有的java工具提供的(Executors.newCachedThreadPool),咱們無須瞭解線程管理細節。咱們同時肯定了線程的管理機制與咱們設計的並行運算Par是鬆散耦合的。

注意:unit並無使用ExecutorService es, 而是直接返回一個註明完成運算(isDone=true)的Future,這個Future的get就是unit的傳入參數a。若是咱們再用這個Future的get來得取表達式的運算結果的話,這個運算是在當前主線程中運行的。async經過fork選擇新的線程;並向新的運行環境提交了運算任務。咱們來分析一下運算流程:

一、val a = unit(4+7),unit構建了一個完成的 new Future; isDone=true,設置了 Future.get = 4 + 7,run(es)(a)在主線程中對錶達式 4+7 進行了運算並得取結果 11。

二、val b = async(2+1) >>> fork(unit(2+1)), run(es)(b) >>> submit(new Callable), 注意 def call = run(es)(b).get : 這裏提交的運算run(es)(b).get實際上又提交了一次運算並直接鎖定線程(blocking)等待讀取運算結果。第一次提交Callable又須要鎖定線程等待提交運算完成計算。若是線程池只能提供一個線程的話,第一次提交了Callable會佔用這個惟一的線程並等待第二次提交運算得出的結果,因爲沒有線程能夠提供給二次提交運算,這個運算永遠沒法獲得結果,那麼run(es)(b).get就會產生死鎖了(dead lock)。


    咱們在這節介紹了一個簡單的泛函並行組件庫設計,能夠把一個運算放到主線程以外的另外一個新的線程中計算。可是抽取運算結果卻仍是會鎖定線程(blocking)。咱們下一節將會討論如何經過一些算法函數來實現並行運算。

相關文章
相關標籤/搜索