加入如下依賴到項目中:html
到目前爲止,咱們看到的會話模式很簡單,由於它們要求Actor保持不多或沒有狀態。java
在這一部分中,咱們將使用一個更復雜的例子。因爲房主會對他們家中的溫度感興趣,咱們的目標是可以查詢一組中的全部設備Actors。咱們首先來研究一下這樣的查詢API應該如何展示。git
咱們面臨的第一個問題是,一個羣體的成員資格是動態的。每一個傳感器設備由一個能夠隨時中止的Actor表示。在查詢開始時,咱們能夠向全部現有設備actor詢問當前溫度。可是,在查詢的生命週期中:github
這些問題能夠經過許多不一樣的方式解決,但重要的是要解決所指望的行爲。如下適用於咱們的用例:安全
除了設備Actor動態來來每每,一些Actor可能須要很長時間才能回答。例如,它們可能會陷入意外的無限循環中,或者由於錯誤而失敗並放棄咱們的請求。咱們不但願查詢無限期地繼續,所以咱們將在如下任一狀況下認爲它是完整的:網絡
鑑於這些決定,以及快照中的設備可能剛剛啓動但還沒有接收到記錄溫度的事實,咱們能夠針對溫度查詢爲每一個設備actor定義四種狀態:數據結構
在消息類型中總結這些,咱們能夠將如下內容添加到DeviceGroup:ide
實現查詢的一種方法涉及將代碼添加到組設備actor。然而,在實踐中,這可能很是麻煩且容易出錯。請記住,當咱們開始查詢時,咱們須要拍攝存在的設備的快照並啓動計時器,以便咱們能夠強制執行截止日期。與此同時,另外一個查詢能夠到達。對於第二個查詢,咱們須要跟蹤徹底相同的信息,但與先前的查詢隔離。這將要求咱們在查詢和設備角色之間維護單獨的映射。工具
相反,咱們將實施一種更簡單,更優越的方法。咱們將建立一個表明單個查詢的actor,並表明group actor執行完成查詢所需的任務。到目前爲止,咱們已經建立了屬於經典域對象的actor,但如今,咱們將建立一個表明進程或任務而不是實體的actor。咱們經過保持咱們的組設備行爲者簡單而且可以更好地單獨測試查詢功能而受益。
首先,咱們須要設計查詢actor的生命週期。這包括肯定其初始狀態,將採起的第一個操做以及清理 - 若有必要。查詢actor須要如下信息:
調度查詢超時
因爲咱們須要一種方法來代表咱們願意等待多長時間的響應,如今是時候引入一個咱們還沒有使用的新Akka功能,即內置的調度程序工具。使用調度程序很簡單:
咱們須要建立一個表示查詢超時的消息。爲此,咱們建立了一個簡單的消息CollectionTimeout,沒有任何參數。scheduleOnce的返回值是一個可取消的,若是查詢在時間上成功完成,可用於取消計時器。在查詢開始時,咱們須要向每一個設備參與者詢問當前溫度。爲了可以快速檢測在得到ReadTemperature消息以前中止的設備,咱們還將觀察每一個actor。這樣,咱們得到了在查詢生命週期內中止的那些消息的終止消息,所以咱們不須要等到超時才能將這些消息標記爲不可用。
把它們放在一塊兒,咱們的DeviceGroupQuery actor的輪廓以下所示:
除了掛起的計時器以外,查詢actor還有一個有狀態方面,跟蹤已經回覆,已經中止或沒有回覆的actor集。跟蹤此狀態的一種方法是在actor中建立可變字段。一種不一樣的方法利用了改變演員如何響應消息的能力。Receive只是一個能夠從另外一個函數返回的函數(或者一個對象,若是你願意)。默認狀況下,接收塊定義actor的行爲,但能夠在actor的生命週期內屢次更改它。咱們調用context.become(newBehavior),其中newBehavior是任何類型爲Receive的東西。咱們將利用此功能來跟蹤演員的狀態。
對於咱們的用例:
咱們不是直接定義receive,而是委託waitingForReplies函數來建立Receive。
waitingForReplies函數將跟蹤兩個不斷變化的值:已收到回覆的Map,咱們還在等待的Actor Set。
咱們有三個Event能夠採用:
在前兩種狀況下,咱們須要跟蹤回覆,咱們如今委託給一個方法receivedResponse,咱們將在後面討論。在超時的狀況下,咱們須要簡單地接受還沒有回覆的全部actor(該組的成員仍然等待),並將DeviceTimedOut做爲最終回覆中的狀態。而後咱們用收集的結果回覆查詢的提交者並中止查詢actor。
爲此,請將如下內容添加到DeviceGroupQuery源文件中:
目前尚不清楚咱們將如何「改變」repliesSoFar和仍然等待數據結構。須要注意的一件重要事情是waitForReplies函數不直接處理消息。它返回一個將處理消息的Receive函數。這意味着若是咱們再次使用不一樣的參數調用waitingForReplies,那麼它將返回一個全新的Receive,它將使用這些新參數。
咱們已經看到了如何經過從接收返回來安裝初始接收。例如,爲了安裝新的回覆,咱們須要一些機制。這個機制是方法context.become(newReceive),它將actor的消息處理函數改成提供的newReceive函數。你能夠想象,在開始以前,你的actor會自動調用context.become(receive),即安裝從receive返回的Receive函數。這是另外一個重要的觀察:它不是處理消息的接收,它返回一個實際處理消息的Receive函數。
咱們如今必須弄清楚在receivedResponse中要作什麼。首先,咱們須要在地圖repliesSoFar中記錄新結果,並從stillWaiting中刪除actor。下一步是檢查咱們還在等待剩下的演員。若是沒有,咱們將查詢結果發送給原始請求者並中止查詢actor。不然,咱們須要更新repliesSoFar和stillWaiting結構並等待更多消息。
在以前的代碼中,咱們將Terminated視爲隱式響應DeviceNotAvailable,所以receivedResponse不須要作任何特殊的事情。可是,咱們仍然須要完成一項小任務。咱們可能會從設備actor收到適當的響應,但隨後會在查詢的生命週期內中止響應。咱們不但願第二個事件覆蓋已經收到的回覆。換句話說,咱們不但願在記錄響應後收到Terminated。經過調用context.unwatch(ref)能夠很容易地實現。此方法還確保咱們不會收到已存在於actor的郵箱中的已終止事件。屢次調用也是安全的,只有第一次調用纔會有效,其他的都會被忽略。
有了這些知識,咱們能夠建立receivedResponse方法:
在這一點上,咱們很天然地經過使用context.become()技巧得到了什麼,而不是使repliesSoFar和stillWaiting結構成爲actor的可變字段(即vars)?在這個簡單的例子中,並無那麼多。當你忽然有更多種類的狀態時,這種狀態保持的價值變得更加明顯。因爲每一個狀態可能具備相關的臨時數據,所以將這些數據保持爲字段會污染參與者的全局狀態,即不清楚在什麼狀態下使用哪些字段。使用參數化接收「工廠」方法,咱們能夠保持僅與狀態相關的數據私有。使用可變字段而不是context.become()重寫查詢仍然是一個很好的練習。可是,建議您熟悉咱們在此處使用的解決方案,由於它有助於以更清晰,更易於維護的方式構建更復雜的actor代碼。
咱們的查詢Actor如今完成了:
如今讓咱們驗證查詢actor實現的正確性。咱們須要單獨測試各類場景,以確保一切按預期工做。爲了可以作到這一點,咱們須要以某種方式模擬設備演員以執行各類正常或失敗場景。值得慶幸的是,咱們將協做者列表(其實是一個Map)做爲查詢參與者的參數,所以咱們能夠傳入TestKit引用。在咱們的第一次測試中,當有兩個設備而且都報告溫度時,咱們會嘗試這種狀況:
這是一個幸福的案例,但咱們知道有時設備沒法提供溫度測量。此方案與上一個略有不一樣:
咱們也知道,有時設備Actor會在回答以前中止:
若是你還記得,還有另外一個與設備演員中止相關的案例。咱們可能會從設備actor得到正常回復,但以後會收到同一個actor的Terminated。在這種狀況下,咱們但願保留第一個回覆,而不是將設備標記爲DeviceNotAvailable。咱們也應該測試一下:
最後一種狀況是並不是全部設備都能及時響應。爲了使咱們的測試保持相對較快,咱們將構造具備較小超時的DeviceGroupQuery actor
咱們的查詢如今按預期工做,如今是時候在DeviceGroup actor中包含這個新功能了。
在組actor中包含查詢功能如今至關簡單。咱們在查詢actor自己中完成了全部繁重工做,組actor只須要使用正確的初始參數建立它,而不須要其餘任何東西。
重申咱們在本章開頭所說的內容多是值得的。經過在單獨的actor中保持僅與查詢自己相關的臨時狀態,咱們保持組actor實現很是簡單。它將一切都委託給兒童演員,所以沒必要保持與其核心業務無關的狀態。此外,多個查詢如今能夠彼此並行運行,事實上,根據須要能夠多個查詢。在咱們的狀況下,查詢單個設備actor是一種快速操做,但若是不是這種狀況,例如,由於須要經過網絡聯繫遠程傳感器,這種設計將顯着提升吞吐量。
咱們經過測試一切能夠一塊兒工做來結束本章。此測試是之前測試的變體,如今正在執行組查詢功能:
在物聯網系統的背景下,本指南介紹瞭如下概念。若有必要,您能夠按照連接進行審覈:
下節再續!
原文:https://doc.akka.io/docs/akka/2.5/guide/tutorial_5.html
有什麼討論的內容,能夠加我公衆號: