既然你已經知道了什麼是 Rx,如今是開始建立和操做 sequences 的時候了。操做 sequences 的最初實現是基於c#linq,然後者則是從函數式編程中獲得啓發的。對於這些知識並非必要的,可是它會使讀者更容易學習。根據原始的www.introtorx.com,咱們也將操做劃分爲一般從簡單到更高級的主題。大多數 Rx 運算符操做現有序列。可是首先,咱們將看到如何建立一個 Observable 開始。java
在前面的示例中,咱們使用了 Subject 並手動將值推入其中以建立序列。咱們使用該序列演示了一些關鍵概念和第一個也是最重要的 Rx 方法--Subject。在大多數狀況下,Subject 並非創造一個新的 Observable 最佳方式。咱們如今將看到更整潔的方法來創造可觀察的序列。ios
Just方法建立一個 Observable,它將發出預先定義的值序列,在建立時提供,而後終止。git
輸出:程序員
observable 會調用單一的 onCompleted 方法,沒有其餘的。github
輸出:編程
這個 observable 不會輸出任何值c#
輸出:緩存
該 observable 會發出 error 事件和終止服務器
輸出:併發
defer 並不定義一種新的 observable ,而是容許您在每次 subscriber 到達時都聲明如何建立 observable。考慮如何建立一個 observable ,返回當前時間並終止。您正在發出一個值,所以它聽起來就像一個 just。
輸出:
注意 subscribers 分開1秒執行,可是看看同一時間。這是由於時間的值只請求了一次:當執行到達 just時。您想要的是當訂閱者經過 subscribing 請求時要從新定義的時間。延遲將執行一個函數來建立和返回 Observable 。函數返回 Observable 延遲的,這裏最重要的一點是,這個函數將再次執行每個新 subscription 。
creat 是一個很是強大的功能來建立 observables,讓咱們看看簽名。
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
O使用nSubscribe<T>比它看起來簡單。它基本上是一個函數,對於T類型,它接接受 Subscriber<T> ,咱們能夠手動肯定推送事件到 subscriber 。
輸出:
當有人訂閱 observable (value)時,相應的 Subscriber 實例傳遞給您的函數。當代碼執行時,值被推送到訂 Subscriber。注意,若是您想要 sequence 來指示其完成,您要本身調用 onCompleted。
當現有的快速方法都不符合您的目的時,這個方法應該是您首選的建立自定義的 observable 。代碼相似於咱們如何建立一個 subject 並將 value 推送給它,可是有幾個重要的區別。首先,事件的源被整齊地封裝,並與無關的代碼分離。第二,subject 所包含的危險並不明顯:對於管理狀態的 subject ,任何訪問實例的人均可以將值推入其中並更改順序,稍後咱們會看到更多關於這個問題的內容。
使用 subject 的另外一個關鍵區別在於代碼是惰性地執行的,當 observer 訂閱時,代碼執行。在上面的示例中,當建立 observable 時,代碼並沒運行(由於還沒有訂閱服務器),可是每次調用訂閱都是這樣。這意味着每一個訂閱服務器都會再次生成每一個值,相似於ReplaySubject。最終結果相似於一個ReplaySubject,但沒有發生緩存。可是,若是咱們習慣使用使用 ReplaySubject,建立方法比較耗時,那麼就會阻塞執行建立的線程。您必須手動建立一個新線程來將值推送到 subject中。咱們尚未提出併發性的方法,可是也有一些方便的方法來並行執行中的函數。
您可能已經注意到,您可使用 Observable.create 來實現之前的任何一個 observables。實際上,咱們的建立示例至關於 Observable.just("hello")。
在函數式編程中,一般會建立長度不受限制或無限長的序列。RxJava有工廠方法來建立這樣的序列。
對程序員來講都是直接的和熟悉的方法,它發出指定的整數範圍。
Observable<Integer> values = Observable.range(10, 15);
這個例子的發出值的整數範圍 10 到24 之間。
此函數將建立一個無限的序列,以指定的持續時間分隔。
輸出:
直到咱們再也不 unsubscribe 啦,這個 sequence 纔會終止。
咱們應該注意爲何在結尾的 read()方法是必要的。沒有它,程序就會終止,而不會打印內容。這是由於咱們的操做是非阻塞的:咱們建立了一個 observable ,隨着時間的推移,咱們會發出值,而後,若是值到達的話,咱們會註冊執行這些操做。全部這些都不是阻塞的,主線程終止,計時器在本身的線程上運行,這並不阻止JVM終止。
對於Observable.timer有兩個重載。第一個示例建立一個observable 等待給定時間,而後發出 0L 並終止。
輸出:
第二個會等待必定的時間,而後以給定的頻率開始像間隔同樣發出。
輸出:
上面的例子等待兩秒後,每秒開始計數。
與咱們迄今所看到的大部分功能同樣,您能夠將任何類型的輸入使用 create 轉換爲Rx observable。
Future 是 java 框架的功能,當你使用併發性的框架的時候可能使用他們。它們是一個和 RX 差很少強大的併發概念,由於它們只返回一個值。天然地,您可能但願將它們變成 observables 。
輸出:
observable 在可用時發出futureTask 的結果,而後終止。若是任務被取消,可觀察到的將發出一個java.util.concurrent.CancerationException error。
若是您對 Future 的結果返回時間限制感興趣,您能夠提供這樣的超時機制。
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
若是 Future 沒有在指定的時間內完成,observable 將忽略它,以 TimeoutException 失敗。
您還可使用 Observable.from 將任何集合轉換成 observable,這樣每一個值都會發出,執行最後的 onCompleted 事件。
輸出:
Observable 不是能夠用Iterable或stream 替換的。Observables 是基於 push-based, 也就是說,調用onNext 會致使處理程序堆棧所有執行到最終訂閱服務器方法(除非另有指定)。其餘模型是基於 pull-based,這意味着在返回結果以前,儘量快地請求值和執行塊。
參考:
https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%202%20-%20Sequence%20Basics/1.%20Creating%20a%20sequence.md