責任鏈模式Scala的7種實現

責任鏈模式是經典的GoF 23種設計模式之一,也許你已經瞭解這種模式。無論你是否熟悉,建議讀者在閱讀本文以前,不妨先思考下面三個問題:react

(1) 如何用多種風格迥異的編程範式來實現責任鏈模式?
(2) 可否讓責任鏈上的結點多任務併發執行?
(3) 可否把責任鏈部署到分佈式環境下,分佈在世界各地的多臺計算機,經過某種方式構成一條責任鏈,協同工做,可否作到呢?

引言程序員

責任鏈鐵索連環
無模式的實現方式面向過程風格
面向對象風格OOP
1 解法一用模板方法模式實現責任鏈模式
2 解法二用策略模式實現責任鏈模式
函數式風格Functional Programming
1 解法三用一等公民函數替換策略模式實現責任鏈模式
2 解法四用偏應用函數實現責任鏈模式
21 替換Node類
22 爲函數升階函數的Curry化爲更高階的函數
23 爲函數降階偏應用函數
3 解法五用偏函數實現責任鏈模式
31 偏函數我不完美但咱們很完美
32 用偏函數實現責任鏈模式
響應式風格Reactive Programming
1 響應式思惟別拉進來推出去
2 解法六用Actor模型實現責任鏈模式
21 Actor模型簡介
22 Akka一個支持高併發分佈式計算消息驅動的Actor庫
23 爲責任鏈上的處理者結點定義Actor類
24 建立actor對象
25 連環計建立基於actor的責任鏈
26 如何禮貌地向別人提問題
27 對將來值的聰明響應我不理你但當你有結果的時候必定要告訴我
28 併發世界也有秩序
3 解法七用RXReactive eXtension響應式擴展實現責任鏈模式
4 兩種響應式編程方式的比較
回顧與總結

 

本文對責任鏈模式給出7種不一樣實現方式,其中:編程

OOP(面向對象編程):有2種採用面向對象編程範式,即:
(1)用模板方法模式實現責任鏈模式(第3.1節)
(2)用策略模式實現責任鏈模式(第3.2節)設計模式

一種採用混合OOP和FP的編程範式,即:
(3)用一等公民函數替換策略模式實現責任鏈模式(第4.1節)數組

FP(函數式編程):有2種採用純函數式編程範式,即:
(4)用偏應用函數實現責任鏈模式(第4.2節)
(5)用偏函數實現責任鏈模式(第4.3節)安全

RP(響應式編程):最後2種採用響應式編程範式,即:
(6)用Actor模型實現責任鏈模式(第5.2節)
(7)用RX響應式擴展實現責任鏈模式(第5.3節)性能優化

1. 責任鏈:鐵索連環


某用戶購買了一款軟件,用了幾天就crash,重要在數據丟失。用戶很生氣,後果很嚴重。因而打電話給軟件公司客服投訴。客服若是不能解決,會問經理,若經理也不能解決或者認爲不規他解決,會轉給工程師解決,這樣客服、經理、工程師就構成了一條責任鏈。bash

記用戶反映在問題爲Input,每一個問題Input都用各自不一樣的業務值value(可由一個字符表示),用Scala代碼描述爲:網絡

case class Input(value: Char)

記解答爲輸出Output,帶字符串信息value。多線程

case class Output(value: String)

舒適提示:Scala中的case類經常使用於定義數據類和消息事件。別小看上面這一行代碼,它定義了不少東西: 

(1)定義了類Output,實現序列化接口 
(2)定義類裏有個不可變成員value,類型爲String 
(3)提供帶參數的構造函數,參數爲value,類型爲String,實現方式是爲不可變成員value賦值 
(4)提供不可變成員value的get方法,方法名爲value。 
(5)覆蓋toString方法並提供可讀信息 
(6)覆蓋hashCode方法並實現 
(7)覆蓋equals方法並實現 
(8)還有其餘方法,如copy, apply, unapply… 

  

若用Java寫,要寫很長的代碼,以下:

public class Output implements Serializable {
private final String value;
public Output(String value) { this.value = value; }
public String value() { return value; }
@Override public String toString() { ... }
@Override public int hashCode() { ... }
@Override public boolean equals(Output that) { ... }
還有其餘方法,如copy, apply, unapply...
}

假設客服只負責處理業務值爲a, w;處理問題在方式是返回信息:」Customer Service handles 」 + 業務值。即:

def canHandleByCustomService(request: Input) = "aw" contains request.value
def handleByCustomService(request: Input) = Output("Customer Service handles " + request.value)

 

相似的,經理只負責處理業務值爲b, w, z;工程師只負責處理業務值爲c, z。即:

def canHandleByManager(request: Input) = "bwz" contains request.value
def handleByManager(request: Input) = Output("Manager handles " + request.value)
def canHandleByEngineer(request: Input) = "cz" contains request.value
def handleByEngineer(request: Input) = Output("Engineer handles " + request.value)

2. 無模式的實現方式(面向過程風格)

根據前面描述,處理過程可由一系列條件分支來描述:

object NoDp {
def handle(request: Input): Option[Output] =
if (canHandleByCustomService(request)) {
Some(handleByCustomService(request))
} else if (canHandleByManager(request)) {
Some(handleByManager(request))
} else if (canHandleByEngineer(request)) {
Some(handleByEngineer(request))
} else {
None
}
}

返回值爲Option類型,當有解答時爲其子類Some並給出結果,不然是None。

這種實現方式雖然簡單,但存在如下缺點:

擴展性差,擴展須要增長else if語句,違反開閉原則。
靈活性差,不能隨意動態調整調用順序。
給用戶代碼暴露太多細節,用戶須要知道哪些人能處理哪些事,以及處理的順序。

下面給出7種不一樣責任鏈模式在實現,都能解決以上問題。

3. 面向對象風格(OOP)


3.1 解法一:用模板方法模式實現責任鏈模式
定義責任鏈上在結點類:

abstract class HandlerNode 

存在可變成員變量指向後續結點

var next: HandlerNode = _

處理過程就是能處理則處理,不然遞歸向後傳:

@tailrec final def handle(request: Input): Option[Output] = {
 if (canHandle(request)) Some(doHandle(request))
 else if (next == null) None
 else next handle request
}

 

舒適提示:@tailrec代表該函數使用了尾遞歸優化。雖然不上加@tailrec,Scala編譯器也會對任何能夠尾遞歸優化的代碼進行尾遞歸優化,可是加上@tailrec以後,若是這個函數不能作尾遞歸優化,那是編譯不過的。有了@tailrec,媽媽不再用擔憂這個函數能不能作了尾遞歸優化了。

上面在處理過程是個模板(用到模板方式模式),用到兩個抽象方法canHandle和doHandle:

protected def canHandle(request: Input): Boolean
protected def doHandle(request: Input): Output

客服,經理,工程師,屬於不一樣處理結點,也就是繼承HandlerNode類,並提供各自不一樣的實現:

class CustomerService extends HandlerNode {
override protected def canHandle(request: Input): Boolean = canHandleByCustomService(request)
override protected def doHandle(request: Input): Output = handleByCustomService(request)
}

class Manager extends HandlerNode {
override protected def canHandle(request: Input): Boolean = canHandleByManager(request)
override protected def doHandle(request: Input): Output = handleByManager(request)
}

class Engineer extends HandlerNode {
override protected def canHandle(request: Input): Boolean = canHandleByEngineer(request)
override protected def doHandle(request: Input): Output = handleByEngineer(request)
}

類圖以下:


建立上面3個處理結點:

val customerService = new CustomerService
val manager = new Manager
val engineer = new Engineer

由多個結點的不一樣順序構成不一樣的責任鏈:

customerService.next = manager
manager.next = engineer

如今用戶只需投訴給客服就行,無需知道處理細節:

def handle(request: Input): Option[Output] = {
 customerService handle request
}

假如連續收到一連串投訴碼」cafebabe wenzhe」,對於每一個投訴,打印處理結果,若是沒人處理的業務則不打印:

val cafebabe = "cafebabe wenzhe"
 println("----- Oop1: inherit -----")
 for (ch <- cafebabe) {
  Oop1 handle Input(ch) map (_.value) foreach println
}

 

輸出正確結果:

Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

  

3.2 解法二:用策略模式實現責任鏈模式
上面的實現中,用到模板方法模式,用到繼承,處理者與結點是同一個類層次中;如今換一種方式:用策略模式,用聚合。
將處理者與結點分開。

咱們首先定義處理者接口:

trait Handler {
 def canHandle(request: Input): Boolean
 def handle(request: Input): Output
}

客服,經理,工程師分別給出不一樣的處理者實現:

class CustomerService extends Handler {
 def canHandle(request: Input): Boolean =     canHandleByCustomService(request)
 def handle(request: Input): Output = handleByCustomService(request)
}

class Manager extends Handler {
 def canHandle(request: Input): Boolean = canHandleByManager(request)
 def handle(request: Input): Output = handleByManager(request)
}

class Engineer extends Handler {
 def canHandle(request: Input): Boolean = canHandleByEngineer(request)
 def handle(request: Input): Output = handleByEngineer(request)
}

 

結點除了帶有可變在nextNode外,還聚合了一個處理者(不可變,構造時傳入):

class Node(handler: Handler) {
var nextNode: Node = _

處理過程與上相似:

@tailrec final def handle(request: Input): Option[Output] = {
 if (handler canHandle request) Some(handler handle request)
 else if (nextNode == null) None
 else nextNode handle request
 }
}

類圖以下:

 


建立3個處理結點: 客服,經理,工程師

val customerService = new Node(new CustomerService)
val manager = new Node(new Manager)
val engineer = new Node(new Engineer)

構成責任鏈:

customerService.nextNode = manager
manager.nextNode = engineer

 

如今用戶只需投訴給A(客服)就行,無需知道處理細節:

def handle(request: Input): Option[Output] = {
 customerService handle request
}

 

假如連續收到一連串投訴碼」cafebabe wenzhe」,對於每一個投訴,打印處理結果,若是沒人處理的業務則不打印:

val cafebabe = "cafebabe wenzhe"
 println("----- Oop2: aggregation -----")
 for (ch <- cafebabe) {
 Oop2 handle Input(ch) map (_.value) foreach println
}

輸出正確結果:

Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

 

4. 函數式風格(Functional Programming)


大部分程序員都熟悉面對對象,而不熟悉函數式編程,爲了讓面向對象程序員容易理解,咱們先從上面3.2節在OOP實現方式出發,一步一步地重構爲FP。

4.1 解法三:用一等公民函數替換策略模式實現責任鏈模式
OOP經過抽取接口,構建具體子類,利用多態實現各類設計模式。從FP在角度,接口Handler的做用也可看做是提供兩個函數的東西。那麼,能夠將3.2節代碼中接口Handler及其子類去掉,把接口中的兩個方法做爲類Node的不可變成員變量(構造函數傳入),那麼類Node變爲:

class Node(canHandle: Input => Boolean, doHandle: Input => Output) {
 var nextNode: Node = _
 @tailrec final def handle(request: Input): Option[Output] = {
  if (canHandle(request)) Some(doHandle(request))
  else if (nextNode == null) None
 else nextNode handle request
 }
}

類圖以下,只有一個Node類,簡單不少吧:

 


相應的,修改構造責任鏈的代碼:

val customerService = new Node(canHandleByCustomService, handleByCustomService)
val manager = new Node(canHandleByManager, handleByManager)
val engineer = new Node(canHandleByEngineer, handleByEngineer)

其餘代碼與3.2節同樣,這裏不在提供。比較3.2節的純OOP方式,這裏的實現不只變短了,並且擴展起來更方便,不須要爲新的不一樣的處理策略再定義類,只需傳如不一樣的函數便可。

4.2 解法四:用偏應用函數實現責任鏈模式
4.2.1 替換Node類
上一小節咱們已經成功地用「一等公民」函數替換Handler接口及其子類。如今就只剩下Node類了,可否也用函數替換呢?
分析一下,Node類有3個成員變量:

(1)canHandle: Input => Boolean,它是一個函數變量 
(2)doHandle: Input => Output,它也是一個函數變量 
(3)nextNode: Node,指向下一結點的變量。

另外Node類在主要方法是handle,它有一個參數request:Input,返回輸出結果Option[Output],表明可能有結果,可能沒結果。

從FP角度,Node類沒有存在的必要,能夠寫一個函數代替它,這個函數具備下面的特色:

(1)函數的內容就是Node類handle方法的內容,須要的外部環境所有由參數提供 
(2)返回值是Option[Output] 
(3)有4個輸入參數(canHandle, doHandle, nextNode, request)

咱們能夠寫成下面的函數:

def handleByChainNode(canHandle: Input => Boolean, 
handle: Input => Output, 
nextHandle: Input => Option[Output], 
request: Input): Option[Output] = {
 if (canHandle(request)) Some(handle(request))
 else if (nextHandle == null) None
 else nextHandle(request)
}

 

4.2.2 爲函數升階:函數的Curry化爲更高階的函數
上面的函數有太多參數(4個),是一種壞味道(Code Smell),由於這樣的函數很差使用。每一個處理者(客服、經理、工程師),咱們只須要賦予canHandle,handle兩個參數;把處理者鏈起來的連接順序,只須要傳遞nextHandle參數;處理業務請求,只須要傳入request參數。那麼,如今這麼多參數,咱們很難一次性提供,那麼咱們應該怎麼用它來解決問題呢?

解決方法首先就是要減小參數個數,每減小一個參數,函數的返回值是另外一個函數(做爲返回值的函數是一等公民),那個被減小的參數做爲返回值函數的參數。好比把request參數從handleByChainNode的參數中去掉,那麼handleByChainNode須要返回另外一個函數(參數就是request),代碼以下:

def handleByChainNode(canHandle: Input => Boolean, 
handle: Input => Output, 
nextHandle: Input => Option[Output]): Input => Option[Output] = {
(request: Input) => {
 if (canHandle(request)) Some(handle(request))
 else if (nextHandle == null) None
 else nextHandle(request)
 }
}

這是一個返回函數的函數,咱們叫作高階函數。這樣寫的代碼是否是有的繞?爲了方便閱讀,scala提供語法糖,上面的代碼也能夠寫出這樣:

def handleByChainNode(canHandle: Input => Boolean, 
handle: Input => Output, 
nextHandle: Input => Option[Output])
(request: Input): Option[Output] = {
 if (canHandle(request)) Some(handle(request))
 else if (nextHandle == null) None
 else nextHandle(request)
}

這是一個二階函數,擁有兩個參數列表,第一個參數列表包含三個參數(canHandle,handle,nextHandle),另外一個參數列表只有一個參數(request)。這裏雖然返回值不是函數,而是原來的Option[Output],但從普通函數(一階函數)的角度看它的返回值就是一個函數。

如今,第一個參數列表的參數仍是太多,怎麼辦?一樣辦法,咱們能夠繼續增長參數列表,減小每一個參數列表中參數個數,最終咱們獲得下面的4階函數(有4個參數列表):

def handleByChainNode(canHandle: Input => Boolean)
(handle: Input => Output)
(nextHandle: Input => Option[Output])
(request: Input): Option[Output] = {
 if (canHandle(request)) Some(handle(request))
 else if (nextHandle == null) None
 else nextHandle(request)
}

上面這個經過增長函數階數的辦法來減小參數個數,直到每一個參數列表只有一個參數的過程,叫作函數的Curry化。那麼,這又有什麼用呢?接下來你就知道!

4.2.3 爲函數降階:偏應用函數
有了上面的Curry化後的高階函數,建立每一個處理結點就很容易了,就是調用上面「半」個handleByChainNode方法:

val customerService = handleByChainNode(canHandleByCustomService)(handleByCustomService) _
val manager = handleByChainNode(canHandleByManager)(handleByManager) _
val engineer = handleByChainNode(canHandleByEngineer)(handleByEngineer) _

之因此說「半」個方法,是由於還沒給handler函數後面兩個參數列表賦值(函數調用後面的下劃線表示沒提供數據),即沒有給參數nextHandle賦值,也沒有給request參數賦值。這在函數式編程中叫作「偏應用函數」,方便把一個高階的函數降階爲更低階的函數。這個低階函數雖然也是函數,當實際上已經包含了一些不可變狀態(就是調用高階函數時那些已經賦值的參數,這裏是canHandleByXX, handleByXX)。

如今customerService,manager,engineer都是2階函數,其第一個參數列表是(nextHandle: Input => Option[Output]),第二個參數列表是(request: Input)。咱們能夠對nextHandle參數賦值從而把這些結點鏈起來:

val handleChain = customerService(manager(engineer(null)))

若要調整順序就很簡單了,只要換一下調用順序便可。如今handleChain是一個一階函數,也就是咱們熟悉的普通函數,參數爲request,返回Option[Output],處理用戶業務請求也就是調用handleChain而已:

def handle(request: Input): Option[Output] = {
 handleChain(request)
}

測試下, 假如連續收到一連串投訴碼」cafebabe wenzhe」,對於每一個投訴,打印處理結果,若是沒人處理的業務則不打印:

val cafebabe = "cafebabe wenzhe"
 println("----- Fp1: partial apply function -----")
 for (ch <- cafebabe) {
 Fp1 handle Input(ch) map (_.value) foreach println
}

輸出正確結果:

Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

 

如今責任鏈的實現徹底沒有類了,也不存在可變狀態(OOP中可變成員變量nextHandler在多線程環境下不是線程安全的,而如今徹底不存在可變狀態),這是本文給出的第一個純函數式實現。從代碼來看,更短更簡潔!

4.3 解法五:用偏函數實現責任鏈模式
4.3.1 偏函數:我不完美,但咱們很完美!
若一個函數只能處理其參數的某些取值範圍,而對其它取值範圍在處理沒有定義,這樣的函數不是一個完整的函數,而像是一部分函數,咱們稱之爲「部分實現函數」,或者叫「偏函數」(Partial Function)。

有點抽象吧,那我舉個例子,定義這樣一個函數:輸入參數類型是整數類型,對偶數有定義,行爲是對偶數平方,但沒爲奇數定義行爲,這顯然不是個完整的函數,而是部分實現的函數,即偏函數。

舒適提示:上一節中「偏應用函數」與本節的「偏函數」,儘管名字很像,都是不完整的函數,但倒是徹底不一樣的概念,不一樣之處在於缺乏的部分不同。函數能夠有多個參數列表,若缺乏了某個參數列表,就是「偏應用函數」,側重參數列表的不完整性;於此不一樣的是,若一個函數有且只有一個的參數,而且沒有實現對該參數有部分取值範圍的處理,就是「偏函數」,側重實現的不完整性。「偏應用函數」的好處在於將大函數(高階函數)分解爲一系列可重用的小函數(低階函數)並得到某些不可變狀態,側重於「分」;而「偏函數」的好處在於靈活組合一系列功能簡單的小函數構成功能強大的大函數,側重於「合」。

val squareEven = new PartialFunction[Int, Int] {
 def apply(x: Int) = x * x
 def isDefinedAt(x: Int) = x % 2 == 0
}

 

更方便的是用模式匹配來描述:

val squareEven: PartialFunction[Int, Int] = {
 case x: Int if x % 2 == 0 => x * x
}

case語句很容易讀,即:」當整數x對2取模爲0時,返回x的平方」。

偏函數比普通函數更小,所以代碼的可重用粒度也就更細。經過定義一些列「不完美」的偏函數,經過不一樣的組合方式,能夠產生各類不一樣的「更完美」的偏函數。下面介紹偏函數如何組合實現責任鏈模式。

4.3.2 用偏函數實現責任鏈模式
因爲客服,經理,工程師每一個人都只負責處理某些業務,他們都是部分業務的處理者,能夠用偏函數來描述:

def handler(canHandle: Input => Boolean, handle: Input => Output): PartialFunction[Input, Output] = {
 case request: Input if canHandle(request) => handle(request)
}

將客服,經理,工程師各自不一樣的責任範圍和處理方法傳入,從而產生責任鏈上的結點:

val customService = handler(canHandleByCustomService, handleByCustomService)
val manager = handler(canHandleByManager, handleByManager)
val engineer = handler(canHandleByEngineer, handleByEngineer)

 

如今customService,manager,engineer都是偏函數。偏函數雖然不完整,但能夠組合,多個不完整的偏函數能夠組合成一個更完整一點的偏函數。責任鏈的構造能夠由一系列偏函數組合而成:

val handleChain = customService orElse manager orElse engineer

能夠看到咱們很容易調整執行順序。handleChain也是偏函數,可是處理能力更強了。向handleChain輸入業務請求,直接調用這個函數:

def handle(request: Input): Output = {
  handleChain(request) // throw MatchError exception when no handler can   handle the request
}

這樣調用的話可能會拋出異常:當request取值在偏函數handleChain定義範圍以外(即責任鏈上全部結點都不能處理)時,會拋出MatchError異常。爲了跟前面其餘實現的調用方法一致,咱們但願返回一個Option[Output],若是可以處理,就返回結果Some[Output];若不能處理,返回None。

有兩種方法能夠實現:

(1)使用Try類封裝異常(無異常返回Success[Output],拋出異常返回Failure),而後再轉成Option,代碼以下:

def handle(request: Input): Option[Output] = {
 Try(handleChain(request)) toOption
}

這是一種通用的解決方式。

(2)對於Scala的偏函數,還有更專用的方式,能夠經過偏函數的lift方法把處理結果轉成Option,

def handle(request: Input): Option[Output] = {
 handleChain lift request
}

測試下, 假如連續收到一連串投訴碼」cafebabe wenzhe」,對於每一個投訴,打印處理結果,若是沒人處理的業務則不打印:

val cafebabe = "cafebabe wenzhe"
 println("----- Fp2: partial function -----")
 for (ch <- cafebabe) {
 Fp2 handle Input(ch) map (_.value) foreach println
}

輸出正確結果:

Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

利用偏函數間的組合(orElse)方式,取代了nextHandler狀態。比起前面4種實現方式,偏函數的實現方式更加簡潔,優雅。

到目前爲止的5種責任鏈模式的實現方式,都是單線程的。那麼,請讀者考慮下面兩個問題:

(1) 可否讓責任鏈上的結點多任務併發執行? 
(2) 可否把責任鏈部署到分佈式環境下,分佈在世界各地的多臺計算機,經過某種方式構成一條責任鏈,協同工做,可否作到呢?

5. 響應式風格(Reactive Programming)


5.1 響應式思惟:別拉進來,推出去!
面向對象設計模式通常經過接口(或抽象類)對代碼進行隔離,減小代碼間的耦合度(見第3節);函數式風格實現的設計模式也是經過「接口」隔離,只不過這個「接口」更加通用,其實就是「一等公民」函數(4.1節)或者偏函數(4.2節)。不管哪一種方式,說到底都是對象間直接的方法調用,都是「拉」式的。用戶向客服投訴,客服處理,其實就是用戶代碼調用了客服類(AHandler)的handle方法(OOP,見第3節),或者調用客服處理函數(FP,見第4節),這些都是「拉」的思惟方式。用戶搞不定,因而把客服「拉」進來;客服也搞不定時,把經理「拉」進來;經理也搞不定時,把工程師「拉」進來。他們經過直接調用對方的服務接口實現交互。

與「拉」不一樣的方式是「推」,用戶搞不定,「推送」消息給客服反映問題;客服對此事件做出響應,若能搞定就把解決方案「推送」回給用戶,不然就找幫手,把消息forward給經理;一樣的,經理作出響應後若搞不定則再forward給工程師,工程師再對消息事件作出響應。每一個個體之間都是獨立的、事件消息驅動的、響應式的,他們之間經過把事件消息「推」出去來實現交互,經過響應不一樣的事件消息來作各類具體不一樣的事。這種思惟方式就是響應式思惟,更加符合天然。在天然界中,人與人的交互都是事件消息驅動、響應式處理的,人與人都是獨立個體,都能並行處理,都是分佈式的。基於這種思想的響應式編程(RP)更容易處理併發問題,更適合分佈式計算。對象之間能夠徹底獨立解耦,甚至能夠分佈在不一樣的機器上,就好像咱們能夠經過手機給地球另外一端的人交流消息同樣。

5.2 解法六:用Actor模型實現責任鏈模式
5.2.1 Actor模型簡介
一個Actor能夠比作一我的,或者比作一臺單核單任務計算機,或者比作一個企業裏的某個工做角色,雖然它只是一個很輕量級的對象而言。具備以下特性:

(1) 靈活部署:多個Actor能夠在同一進程內(這時候是併發),也能夠跨進程,還能夠跨機器跨網絡,分佈式應用。

(2) Actor內代碼執行方式:同一時間一個Actor內永遠只有一條線程在執行,所以保證了Actor內部線程安全。Actor就比如一臺單核單任務計算機。

(3) Actor間的交互方式:Actor與Actor之間的交流,就像人與人之間的交流,發送事件消息(無類型限制),響應事件並處理。不能直接調用Actor的方法,由於不一樣Actor極可能不在同一臺機器上。

(4) 有序的樹狀組織結構:多個Actor能夠構成一個社會,每一個Actor就像一個公司裏不一樣的職位,有做爲老闆的Actor管理多個部門經理Actor,經理Actor管理多個員工Actor。Actor間有監管機制,如父Actor監管子Actor。

(5) 出錯處理:不怕,讓他掛!(Let it crash!)當某一Actor掛了,其監控者(另外一Actor)會收到消息,響應方式能夠是恢復那個Actor,重啓Actor,中止Actor,也能夠把本身也掛起,或者也能夠繼續向它的上級Actor彙報,等等,Actor具備很高的容錯性,怎麼處理,徹底取決於你!

(6) Actor很是輕量級,一個應用程序能夠建立幾百萬個Actor,就像你的計算機瞬間就變成幾百萬臺計算機同樣。

(7) Actor很是適合描述現實世界中的對象,相似OOP,只是每一個對象都是Actor並具備Actor的一切優勢:線程安全、併發、分佈式、高容錯性。

  

5.2.2 Akka,一個支持高併發、分佈式計算、消息驅動的Actor庫
Akka Actor是使用scala實現的Actor模型,目前已經成爲scala的標準Actor模型。Scala之因此在併發編程方面有強大的優點,Akka Actor是其重要緣由。Akka Actor是一個分佈式計算庫,著名的大數據框架Spark底層就是用它來實現分佈式計算的。能夠在build.sbt增長依賴(相似Maven,SBT編譯時會從Maven中心倉庫遞歸下載依賴):

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.17"

5.2.3 爲責任鏈上的處理者結點定義Actor類
從面向對象的角度,責任鏈上的每一個結點,即客服,經理,工程師,都是描述現實世界的對象。在Actor響應式編程中,這種描述現實世界的對象很適合用Actor來定義。所以,咱們爲這些結點定義一個Actor的子類:HandlerActor(準確講是混入Actor特質的類,這裏說成子類是便於理解)。不一樣的結點,除了如下3點不一樣,其餘都是相同的:

判斷有責任處理輸入請求的函數:canHandle,類型爲:Input => Boolean
處理輸入請求的函數:handle,類型爲: Input => Output
對下一個處理者結點的引用:nextHandler,類型爲:ActorRef,由於每一個結點都是Actor。

咱們能夠把這些不一樣點做爲HandlerActor類的成員變量,對於不一樣的結點它們有不一樣的值。其中,canHandle和handle兩個成員變量能夠定義爲不可變的,做爲構造函數的輸入參數,結點構造時須要外部指定;而nextHandler,爲了方便在運行時更換結點順序,設計爲可變的。

class HandlerActor(canHandle: Input => Boolean, handle: Input => Output) extends Actor with ActorLogging {
private var nextHandler: ActorRef = _

 

這個類很像第4.1節中的Node類,只不過它繼承Actor(後面混入ActorLogging特質是爲了方便打log)。

自定義Actor類中惟一一個必須override的方法,是receive方法,它負責接收事件消息而且作出響應。

def receive = {
 case SetNextHandler(nextHandler) => this.nextHandler = nextHandler
 case handleEvent @ Handle(request) => {
  log debug s"${request.value}"
  if (canHandle(request)) sender ! Result(Some(handle(request)))
  else if (nextHandler == null) sender ! Result(None)
 else nextHandler forward handleEvent
 }
}

 

方法receive返回一個偏函數(見第4.3節),一般咱們用模式匹配來描述偏函數。上面代碼中匹配了兩個case,代表收到這兩種消息事件(無類型限制)以及各自的響應邏輯,下面對這兩種事件進行解釋:

(1) 第一種事件:SetNextHandler,顧名思義,它請求HandlerActor去設置下一個處理者結點,事件中附帶着指望設置爲下一處理者的引用。SetNextHandler事件由下面的case類來定義:

case class SetNextHandler(nextHandler: ActorRef)

 

對這類事件的響應,就是把可變成員變量nextHandler設置爲事件要求的值:

case SetNextHandler(nextHandler) => this.nextHandler = nextHandler


舒適提示:能夠這樣理解模式匹配,即把每一個case語句都當成一個方法(實際不是),會容易理解得多。好比上面的代碼,能夠想象爲:方法名爲SetNextHandler,參數爲nextHandler(類型爲ActorRef,可從case類SetNextHandler中推斷出來),方法體爲this.nextHandler = nextHandler,返回值可從方法體推斷(面向表達式編程,把方法體當成表達式)。對比一下,通常面向對象的思路會定義方法:

def setNextHandler(nextHandler: ActorRef) = this.nextHandler = nextHandler

 

對比一下,是否是很類似呢?這樣對比,可讓模式匹配很是容易理解。複雜一點,你能夠試試用這種辦法理解後面的case Handle(request) => { … }

(2) 第二種事件:Handle,顧名思義,它請求HandlerActor去處理輸入,事件中附帶着輸入請求request。Handle事件可下面的case類來定義:

case class Handle(request: Input)

對該類事件的響應,就是先判斷可否處理輸入請求,能夠的話,就處理該請求,並將處理結果加個信封,做爲表示結果(Result)的消息事件,做爲回覆,告知(tell)原信息的發送者(sender)。

if (canHandle(request)) sender tell Result(Some(handle(request)))

上面的結果Result,對處理結果進行包裝,它有下面的case類定義:

case class Result(value: Option[Output])


方法tell是actor中一個很是經常使用的方法(我估計其經常使用度排名第一),它告訴其餘actor一個消息(也能夠告訴本身,那樣能夠實現狀態模式),也就是向其餘actor發送消息。多是它太經常使用了,Akka Actor專門爲它定義操做符方法:!,讀做tell,表示發送消息。上面的代碼,更多時候是這樣寫的:

if (canHandle(request)) sender ! Result(Some(handle(request)))

若是該結點不能處理,則判斷下一結點是否存在,若不存在,就向原消息的發送者回復告知沒人能處理該請求。

else if (nextHandler == null) sender ! Result(None)

不然,把請求處理的消息事件forward給下一處理者:

else nextHandler forward handleEvent


5.2.4 建立actor對象
在建立具體Actor對象以前,須要先建立Actor系統(當不再用的時候要關閉它,不然程序不會結束,也不能放在Shutdown hook中關閉),它爲咱們提供Actor模型所需的上下文。

val system = ActorSystem("ActorSystem")


接下來,咱們能夠爲每一個處理者建立actor對象:

val customService = system actorOf (Props(new HandlerActor(canHandleByCustomService, handleByCustomService)), "customService")
val manager = system actorOf (Props(new HandlerActor(canHandleByManager, handleByManager)), "manager")
val engineer = system actorOf (Props(new HandlerActor(canHandleByEngineer, handleByEngineer)), "engineer")

注意這裏customService,manager,engineer的類型不是HandlerActor,而是ActorRef,表明對actor的引用。在Actor編程中,咱們不提倡直接引用Actor類的對象,由於這樣很容易直接調用Actor類的方法,而響應式思惟是經過發消息來通知Actor使其作出響應。ActorRef是Akka Actor爲咱們提供的抽象,它所引用的actor能夠是本地的,也能夠是遠程的,經過ActorRef的抽象讓咱們沒必要關注這些底層通訊細節,咱們只要專一於所要處理的業務就行。

雖然這裏代碼中把customService,manager,engineer這三個actor都在同一個進程裏建立了,實際上,它們也能夠在不一樣的進程、不一樣的機器上建立,在代碼中咱們能夠經過其邏輯路徑找到其餘機器上(網絡上)的actor,持有它的引用(ActorRef),使用起來的代碼更本地建立的代碼是沒有區別的。

5.2.5 連環計,建立基於actor的責任鏈
接着是把這3個處理者連接起來,構造責任鏈。不一樣於OOP,咱們不能直接調用HandlerActor的setNextHandler方法(固然咱們也沒有提供這個方法,即便有也不推薦直接調用),而應該向這些結點發消息,好比向customService發送消息,告訴他若是搞不定能夠找manager幫忙:

customService ! SetNextHandler(manager)

 

一樣的,向manager發送消息,告訴他若是連他也搞不定的話能夠找engineer幫忙:

manager ! SetNextHandler(engineer)


這樣,責任鏈就造成了。

舒適提示:要更好地理解發送消息的符號「!」以及actor間基於消息傳遞的交互方式,能夠與面向對象中直接方法調用的「.」符號作對比,即把「!」想象成「.」。好比上面的代碼,如果直接方法調用,則爲:

manager.setNextHandler(engineer)


表示直接調用bHandler的setNextHandler方法,傳遞參數cHandler。而

manager ! SetNextHandler(engineer)

表示向manager發送消息事件SetNextHandler,附帶參數engineer。

其實目的都是同樣的,很是類似吧,只是把符號」.」換成符號」!」,把普通對象換成actor對象,把方法名換成事件名,其餘都同樣,就把方法直接調用變成異步的消息事件發送的了。

5.2.6 如何禮貌地向別人提問題
因爲消息處理是異步的,咱們定義的處理函數應該返回一個表明將來值的Future,而不能等待結果處理完才返回。

def handle(request: Input): Future[Option[Output]] = {

如今用戶詢問客服,但願客服在5秒以內進行回覆:

val future = customService ask (Handle(request), Timeout(5 seconds))

這裏ask與前面的tell都是發消息,不一樣之處在於tell是說完就忘,不期待別人回覆,而ask是期待別人回覆的。可是別人可能不會馬上就回復你,也可能永遠都不給你回覆,而你也不會一直傻傻地等着他回覆,誰都不能阻塞你,不過你心理有一個超時時間,超過這個時間你就認爲他再也不回覆了。

咱們已經知道,操做符!與tell是經過意思,使用起來就好像函數直接調用。相似的,ask也有一個同義的操做符,你猜猜看是哪一個?相信你能夠猜到,就是問號操做符「?」。

另外,若是每次向人家問問題時老是加上這麼一句:「給你5秒鐘回答個人問題」,顯得很不禮貌,是否是?所以咱們把這個超時時間記在心理就行,幹嗎非得說出來呢?咱們能夠定義把它成隱式變量:

implicit val timeout = Timeout(5 seconds)

接着,問問題就禮貌不少吧:

val future = customService ? Handle(request)

當調用問號方法操做符,它會在上下文查下有沒有隱式的Timeout。沒有的話,休想編譯過!

這裏返回值future是Future[Any]類型,表明將來值,這樣不至於人家不回答你而讓你白白等上一段時間,這樣你纔不會被阻塞。

惋惜返回值future的泛型是Any,而不是咱們指望的輸出結果類型Option[Output]。Any類型能夠類比地理解爲Java裏面的Object(其實Any更強,由於它還包括基本類型,而Java的Object是不行的,因此Scala是一門徹底面向對象的語言,而Java不是)。那麼咱們須要把future轉化爲咱們但願的類型:Future[Option[Output]]。根據前面代碼中HandleActor類中對Handle事件的處理,咱們知道HandleActor會把處理結果Option[Output]封裝在信封(Result類)裏,所以這裏的Any實際上就是Result,轉成Result就能夠拿到它的value,也就是咱們指望的輸出結果了。

future.mapTo[Result] map (_.value)

如今就能夠獲得充滿期待的將來值Future[Option[Output]]了。

5.2.7 對將來值的聰明響應:「我不理你,但當你有結果的時候必定要告訴我!」
測試下, 假如連續收到一連串投訴碼」cafebabe wenzhe」,對於每一個投訴,打印處理結果,若是沒人處理的業務則不打印:

val cafebabe = "cafebabe wenzhe"
println("----- 6. Rp1: akka actor Reactive Programming -----")
val futures = cafebabe map (Rp1 handle Input(_))


問了一連串的問題,會獲得一連串的答覆。不一樣於前面5種實現方式,如今的狀況是說有的消息處理都是併發的、異步的,handle方法只給你返回不是最終結果,而是表明結果的將來值,就好像有人告訴你「之後你能賺到一個億」同樣,是否是有點忽悠人?那麼何時能拿到結果?拿到怎樣的結果呢?真的「賺到一個億」?

這裏的futures是一個可索引的序列,類型是:IndexedSeq[Future[Option[Output]]],其中每一個元素是表明每一個輸入請求處理結果的將來值。

有兩種方式拿到結果:一種是傻傻地等待;另外一種是聰明的響應式思惟:「我不理你,但當你有結果的時候必定要告訴我!」。

具體的,用聰明的響應式思惟,就是對於每一個將來值,當它的任務完成的時候,若是成功,就打印處理結果,若是沒人處理的業務則不打印;若是一直等不到回覆,超時了,或者其餘緣由的問題,就會收到失敗的信息,並附帶異常(超時的異常爲AskTimeoutException),那麼咱們就打印異常堆棧信息。

futures foreach (_.onComplete {
 case Success(output) => output map (_.value) foreach println
 case Failure(exception) => exception printStackTrace
})

 

運行結果以下:(每次運行順序都不同)

Customer Service handles w
Customer Service handles a
Manager handles b
Customer Service handles a
Engineer handles c
Manager handles b
Manager handles z

  

處理結果不是順序的了,並且每次運行都不同,代表事件響應的過程是併發執行的。

5.2.8 併發世界也有秩序
若是咱們想讓運行結果有序,怎麼辦?這有何難,把序列futures經過Future類的sequence方法合併成一個將來值:

val mergedFuture = Future sequence futures

這個合併的將來值mergedFuture,類型是Future[IndexedSeq[Option[Output]]],即它只是一個Future,將來值的結果是一個可索引的序列。這個將來值mergedFuture,只有當全部的輸入請求所有處理完並拿到全部輸出結果時,把結果按照輸入請求的順序,可索引序列做爲結果輸出。咱們只要拿到這個結果就是有序的了。

前面第5.2.7節已經介紹過聰明的辦法,這裏不重複了,恰恰就用很傻很天真的辦法,傻傻地等待,阻塞當前線程,直到別人把全部結果都告訴你爲止。(固然也不會傻到等上一成天,其實等上5秒就足夠傻了,^_^)

val outputs = Await result (mergedFuture, 5 seconds)

output是咱們想要的有序結果,類型是IndexedSeq[Option[Output]],如今對於每一個投訴,有序地打印處理結果,若是沒人處理的業務則不打印:

outputs.flatten map (_.value) foreach println

輸出與輸入一樣順序的結果:

Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

  

5.3 解法七:用RX(Reactive eXtension,響應式擴展)實現責任鏈模式
RX是基於事件流處理的響應式編程開源庫,目前已經有多種語言的實現,好比RxJava,RxScala(RxScala實際上是在RxJava基礎上增長了一層adapter,使API更友好)。關於RxJava能夠參考我另外幾篇文章:
(1)實驗驅動開發與響應式編程 —- File Watcher的技術實現
(2)性能優化:RxJava異步響應式編程提高響應速度
(3)基於RxJava實現事件總線

本文使用RxScala,須要在build.sbt增長依賴(相似Maven,SBT編譯時會從Maven中心倉庫遞歸下載依賴):

libraryDependencies += "io.reactivex" %% "rxscala" % "0.26.5"

 

對RX進行響應式編程,主要是對事件流(Observable)進行一連串響應,包括過濾,轉換,處理,等等操做,使其流向指望的目的地。咱們能夠把事件流(Observable)比做FP中的高階函數(見4.2節)。 

相似FP,對於客服,經理,工程師,他們響應輸入請求的過程是:爲輸入請求request構造事件流,而後過濾使得只有可以處理(canHandle)的事件經過,而後處理(handle)請求並返回帶有結果的事件。這個過程須要外部提供如下3個參數:輸入請求request,判斷是否可以處理的函數(canHandle),具體處理的函數(handle)。因而咱們能夠定義下面的三階函數(3個參數列表)來表示每一個結點的響應輸入請求的處理過程:

def handler(canHandle: Input => Boolean)(handle: Input => Output)(request: Input) = {
 Observable just request filter canHandle map handle
}

 

handler函數是這樣一個3階函數,它接受輸入request,構造出以Input爲消息的事件流Observable,事件流通過filter過濾,只讓那些canHandle事件日後流,接着事件流到map,經過handle把輸入消息Input轉化爲Output,而後把事件流做爲返回值流向函數的調用者,以便後續控制事件流的流向。

所以咱們能夠構造出每一個結點:客服,經理,工程師,他們的不一樣方式在於判斷是否可以處理的函數(canHandle),具體處理的函數(handle),咱們把這些不一樣的地方傳入上面的3階函數handler,獲得表明每一個處理結點的偏應用函數(見4.2節):

val customService = handler(canHandleByCustomService)(handleByCustomService) _
val manager = handler(canHandleByManager)(handleByManager) _
val engineer = handler(canHandleByEngineer)(handleByEngineer) _

 

上面customService,manager,engineer,已降爲一階的普通函數了,類型是:Input => Observable[Output],即輸入參數是輸入請求Input,輸出是帶有輸出信息Output的事件流。

將結點鏈起來,就能夠處理用戶投訴事件了。當客服不能處理request事件,就switch給經理,經理不能處理就switch給工程師。

def handle(request: Input): Observable[Output] = {
   customService(request) switchIfEmpty manager(request) switchIfEmpty    engineer(request)
}

 

輸入一連串業務,好比cafebabe = 「cafebabe wenzhe」,處理邏輯能夠用下面一條事件流描述,事件流最後流到print,把輸出值打印出來。

println("----- Rp2: RX Reactive Programming (single thread)-----")
Observable from cafebabe map (Input(_)) flatMap (Rp2 handle _) map (_.value) foreach println

 

咱們沒有對事件流進行異步處理,所以上面的處理過程是單線程的,輸出有序的結果:

----- Rp2: RX Reactive Programming (single thread)-----
Engineer handles c
Customer Service handles a
Manager handles b
Customer Service handles a
Manager handles b
Customer Service handles w
Manager handles z

 

咱們很容易把事件流的響應過程異步化,好比讓經理處理的串行事件流manager(request)流過subscribeOn操做符,能夠轉化爲異步事件流asyncHandleByManager:

val asyncHandleByManager = manager(request) subscribeOn ComputationScheduler()

subscribeOn方法後面接受線程池調度器,這裏用的ComputationScheduler使用的線程池裏的線程個數與計算機CPU的核數相同,你也能夠把它替換成你想要的。

相似的辦法能夠建立經理和工程師的異步事件流,再用switchIfEmpty操做符將它們連接起來構成異步的責任鏈:

def asyncHandle(request: Input): Observable[Output] = {
val asyncHandleByCustomService = customService(request) subscribeOn ComputationScheduler()
val asyncHandleByManager = manager(request) subscribeOn ComputationScheduler()
val asyncHandleByEngineer = engineer(request) subscribeOn ComputationScheduler()
asyncHandleByCustomService switchIfEmpty asyncHandleByManager switchIfEmpty asyncHandleByEngineer
}

 

輸入一樣一連串業務cafebabe = 「cafebabe wenzhe」,讓事件流流入能併發處理的asyncHandleByABC:

println("----- Rp2: RX Reactive Programming (multiple thread)-----")
Observable from cafebabe map (Input(_)) flatMap (Rp2 asyncHandle _) map (_.value) foreach println

運行結果以下:(每次運行順序都不同)

----- Rp2: RX Reactive Programming (multiple thread)-----
Customer Service handles a
Engineer handles c
Manager handles b
Manager handles b
Customer Service handles w
Customer Service handles a
Manager handles z

 

處理結果再也不是順序的了,並且每次運行都不同,代表事件響應的過程是併發執行的。

5.4 兩種響應式編程方式的比較
Actor處理可以在進程內使用,還能夠跨進程、跨機器、跨網絡,可以適用於分佈式計算;RX只能在進程內使用。
若是要串行(單線程)執行,或者單線程多線程切換,RX要比Actor更加方便。

從編程風格看,Actor更像面向對象OOP,須要定義一個Actor類,一個無需考慮線程安全問題的類;而RX更像是函數式編程,使用高階函數,經過流式處理,代碼可讀性更好。

 

6. 回顧與總結


因爲Scala語言簡潔易懂,讀起來類似天然語言,開發效率和運行效率都很高,並且支持多種編程範式(便是徹底面向對象語言,又是函數式編程語言),再加上響應式的Akka Actor、RxScala),所以本文采用Scala做爲例子代碼show給讀做,也順帶介紹了一些Scala語言特性,讀者若是熟悉Java 8,會很容易理解,由於Java 8不少特性都借鑑自scala。

本文描述了責任鏈模式的應用場景,而後給出7中不一樣風格的實現方式。第一種是傳統面向對象的基於繼承的實現方式;接着以聚合代替繼承,給出了第二種面向對象實現方式;接下來從OOP逐步過渡到FP,第三種實現就是混合了OOP和FP兩種範式的實現方式;接下來的第四種和第五種都是純FP實現,分別使用了偏應用函數和偏函數;從第一種到第五種風格的演化過程當中,代碼越來短,當可擴展能力和靈活性卻愈來愈好;接着介紹響應式思惟,以及兩種不一樣的實現,第六種實現是基於Actor模型,而第七種實現是基於事件流響應的流式處理,最後比較了這兩種響應式風格。在介紹這7種實現風格的過程當中,考慮到大多數程序員是面向對象出身,本文對函數式編程、響應式編程的概念進行了比較細緻的介紹。原文:https://blog.csdn.net/liuwenzhe2008/article/details/70199520

相關文章
相關標籤/搜索