monad(https://en.wikipedia.org/wiki/Monad_%28functional_programming%29)是一個從函數式編程中比較抽象的一個概念,可能對不少程序員來講並不熟悉。它超出了本指南的範圍,在www.introtorx.com中,咱們能夠找到一個簡短的定義:html
Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model.java
Monads 是咱們感興趣的,由於 observable 是 Monad。RX代碼聲明須要作什麼,但實際處理不是在執行Rx語句時發生的,而是在發出值時發生的。讀者可能會發現閱讀更多關於 monads 的內容是頗有趣的。對於本指南,當說起Monad時,讀者只需考慮 observable。react
想要離開Monad有兩個主要緣由,第一個緣由是,一個新的Rx開發人員仍然會更習慣於更傳統的範例。在不一樣的範例中執行部分計算可能會使某些部分工做,而您仍在研究如何使用Rx進行操做。第二個緣由是,咱們一般與沒有考慮到Rx的組件和庫進行交互。當將現有代碼重構爲Rx時,讓Rx以阻塞方式運行多是有用的。ios
以分塊方式從可觀察到的對象中獲取數據,第一步是轉換爲 BlockingObservable.。任何可觀察的對象均可以經過如下兩種方式轉換爲 BlockingObservable:您可使用可觀察對象的 toBlocking 方法。git
或者BlockingObservable的靜態工廠方法程序員
BlockingObservable 沒有繼承 Observable,也不能與咱們一般的Rx操做符一塊兒使用。它有本身的小型函數方法的實現,容許您以阻塞的方式從可觀察到的數據中提取數據。這些方法中的許多都是與咱們已經看到的方法相對應的阻塞方法。github
Observable 有一個 forEach 方法,forEach 被定義爲 subscribe 的別名,主要區別是它不返回 Subscription。來看一個例子編程
輸出:api
這裏的代碼的行爲與 subscribe 相似。首先註冊一個觀察者(forEach接受觀察者沒有重載,但語義是相同的)。而後執行繼續打印「Subscribed」並退出咱們的代碼段。當發出值(第一個延遲100 ms)時,它們被傳遞給咱們的觀察者進行處理。緩存
BlockingObservable 沒有 subscribe 方法,可是有 forEach,讓咱們用 BLockingObservable 改造一下:
輸出:
咱們在這裏看到,對每一個對象的調用被阻塞,直到可觀察到的任務完成。另外一個區別是沒有onError和onCompleted的處理程序。onCompleted是在執行完成時給定的,而異常將被拋入運行時進行捕獲:
輸出:
BlockingObservable 擁有 first ,last 和 single 方法,也實現了firstOrDefault,lastOrDefault和singleOrDefault。在 Observable 中閱讀了它們的 namesakes 以後,您已經知道返回的值是什麼。一樣,不一樣的是方法的阻塞性質。它們不返回可觀察到的值,當值可用時,它就會發出該值。相反,它們會阻塞,直到值可用並返回值自己,而不返回周圍可見的值。
輸出:
正如咱們所看到的,調用首先被阻塞,直到一個值可用,而後才返回一個值。
與forEach同樣,異常也會在運行時中拋出以供捕獲。
輸出:
您能夠經過BlockingViewable上的各類方法將您的可觀察性轉換爲可迭代性。Iterable是基於拉的,與rx不一樣的是,rx是基於推的。這意味着,當使用者準備使用一個值時,會在可迭代的Iterator上使用Next()請求一個值。對Next()的調用要麼當即返回一個值,要麼阻塞,直到一個值準備好爲止。
有幾種方法能夠從BlockingViewable<T>到Iterable<T>,而且每種方法都有不一樣的行爲。
在此實現中,將收集和緩存全部發出的值。因爲緩存,不會漏掉任何項目。迭代器將盡快獲取下一個值,若是下一個值已經發生,則當即獲取,或者阻塞,直到下一個值可用爲止。
輸出:
值得注意的是,在下一個通知可用以前,應該使用hasNext()或Next()塊。若是完成,hasNext返回false,next拋出java.util.NoSuchElementException
在這個實現中,根本不緩存值。迭代器將始終等待下一個值並返回該值。
輸出:
在本例中,使用者比生產者慢,而且老是錯過下一個值。迭代器將得到下一個迭代器。
latest 方法相似於Next,不一樣之處在於它將緩存一個值。迭代器只會在自上一個值被使用後可觀察到的事件沒有發出時阻塞。只要有新的事件發生,迭代器就會當即返回一個值,或者隨着迭代的結束而返回。
輸出:
使用 latest 迭代器時,若是在發出下一個事件以前沒有提取值,則會跳過這些值。若是使用者比生產者快,迭代器將阻塞並等待下一個值。
有趣的是,4從未被消耗掉。那是由於一個onCompleted緊隨其後,致使下一個拉看到一個終止的可觀察到的。隱式iterator.hasNext()方法報了結止的可觀察值,而不檢查是否使用了最後一個值。
mostRecent 迭代器從不阻塞。它緩存這個值,所以,若是使用者速度慢,則能夠跳過值。與最新版本不一樣,最後一個緩存的值老是返回,若是使用者的速度快於生產者,就會致使重複。爲了使最近的迭代器徹底無阻塞,須要一個初始值.。若是可觀察到的對象還沒有發出任何值,則返回該值。
輸出:
BlockingObservable<T>能夠用 toFuture 方法表示爲 Future 。此方法僅建立Future的實例,而不阻止。獲取值時根據須要執行塊。Future 容許使用者決定如何處理異步操做。Future 還可以報告操做中的錯誤
輸出:
以這種方式建立的Future 但願observable 發出單個值,就像單個方法所作的那樣。若是發出多個項,Future將報告java.lang.IllegalArgumentException。
到目前爲止,咱們可以忽略潛在的死鎖。Rx的非阻塞性使得建立沒必要要的死鎖變得更加困難。然而,在本章中,咱們又回到了阻塞方法,從而將死鎖再次提到了最前沿。
下面的例子能夠做爲一個非阻塞案例。可是由於咱們使用了阻塞操做,因此它永遠不會解除阻塞。
輸出:
forEach僅在序列終止後返回。可是,終止事件要求Each在被推送以前返回。所以,每一個人都將永遠不會解除封鎖。
訪問可觀測數據的一些阻塞方法,如 last()
,要求被觀測到的對象終止並解除阻塞。其餘的,好比first(),要求它至少發出一個事件來解除阻塞。將這些方法用於可觀察性並非很大的危險,由於它們只返回一個不可終止的可觀察性.。若是使用者沒有花時間強制執行一些保證,例如超時,BlockingViewable上的這些相同的方法可能會致使一個永久的塊(咱們將看到在[Timeshifter sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md)。
原文連接:
https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/2.%20Leaving%20the%20monad.md