RxJava 學習筆記 <四> Sequence 基礎

 Sequence basics

既然你已經知道了什麼是 Rx,如今是開始建立和操做 sequences 的時候了。操做 sequences 的最初實現是基於c#linq,然後者則是從函數式編程中獲得啓發的。對於這些知識並非必要的,可是它會使讀者更容易學習。根據原始的www.introtorx.com,咱們也將操做劃分爲一般從簡單到更高級的主題。大多數 Rx 運算符操做現有序列。可是首先,咱們將看到如何建立一個 Observable 開始。java

Creating a sequence

在前面的示例中,咱們使用了 Subject 並手動將值推入其中以建立序列。咱們使用該序列演示了一些關鍵概念和第一個也是最重要的 Rx 方法--Subject。在大多數狀況下,Subject 並非創造一個新的 Observable 最佳方式。咱們如今將看到更整潔的方法來創造可觀察的序列。ios

Simple factory methods

Observable.just

Just方法建立一個 Observable,它將發出預先定義的值序列,在建立時提供,而後終止。git

輸出:程序員

Observable.empty

observable 會調用單一的 onCompleted 方法,沒有其餘的。github

輸出:編程

Observable.never

這個 observable 不會輸出任何值c#

輸出:緩存

Observable.error

該 observable 會發出 error 事件和終止服務器

輸出:併發

Observable.defer

defer 並不定義一種新的 observable ,而是容許您在每次 subscriber 到達時都聲明如何建立 observable。考慮如何建立一個 observable ,返回當前時間並終止。您正在發出一個值,所以它聽起來就像一個 just。

輸出:

注意 subscribers 分開1秒執行,可是看看同一時間。這是由於時間的值只請求了一次:當執行到達 just時。您想要的是當訂閱者經過 subscribing 請求時要從新定義的時間。延遲將執行一個函數來建立和返回 Observable 。函數返回 Observable 延遲的,這裏最重要的一點是,這個函數將再次執行每個新 subscription 。

Observable.create

creat 是一個很是強大的功能來建立 observables,讓咱們看看簽名。

static <T> Observable<T> create(Observable.OnSubscribe<T> f)

O使用nSubscribe<T>比它看起來簡單。它基本上是一個函數,對於T類型,它接接受 Subscriber<T> ,咱們能夠手動肯定推送事件到 subscriber 。

輸出:

當有人訂閱 observable (value)時,相應的 Subscriber 實例傳遞給您的函數。當代碼執行時,值被推送到訂 Subscriber。注意,若是您想要 sequence 來指示其完成,您要本身調用 onCompleted。

當現有的快速方法都不符合您的目的時,這個方法應該是您首選的建立自定義的 observable 。代碼相似於咱們如何建立一個 subject 並將 value 推送給它,可是有幾個重要的區別。首先,事件的源被整齊地封裝,並與無關的代碼分離。第二,subject 所包含的危險並不明顯:對於管理狀態的 subject ,任何訪問實例的人均可以將值推入其中並更改順序,稍後咱們會看到更多關於這個問題的內容。

使用 subject 的另外一個關鍵區別在於代碼是惰性地執行的,當 observer 訂閱時,代碼執行。在上面的示例中,當建立 observable 時,代碼並沒運行(由於還沒有訂閱服務器),可是每次調用訂閱都是這樣。這意味着每一個訂閱服務器都會再次生成每一個值,相似於ReplaySubject。最終結果相似於一個ReplaySubject,但沒有發生緩存。可是,若是咱們習慣使用使用 ReplaySubject,建立方法比較耗時,那麼就會阻塞執行建立的線程。您必須手動建立一個新線程來將值推送到 subject中。咱們尚未提出併發性的方法,可是也有一些方便的方法來並行執行中的函數。

您可能已經注意到,您可使用 Observable.create 來實現之前的任何一個 observables。實際上,咱們的建立示例至關於 Observable.just("hello")。

Functional unfolds

在函數式編程中,一般會建立長度不受限制或無限長的序列。RxJava有工廠方法來建立這樣的序列。

Observable.range

對程序員來講都是直接的和熟悉的方法,它發出指定的整數範圍。

Observable<Integer> values = Observable.range(10, 15);

這個例子的發出值的整數範圍 10 到24 之間。

Observable.interval

此函數將建立一個無限的序列,以指定的持續時間分隔。

輸出:

直到咱們再也不 unsubscribe 啦,這個 sequence 纔會終止。

咱們應該注意爲何在結尾的 read()方法是必要的。沒有它,程序就會終止,而不會打印內容。這是由於咱們的操做是非阻塞的:咱們建立了一個 observable ,隨着時間的推移,咱們會發出值,而後,若是值到達的話,咱們會註冊執行這些操做。全部這些都不是阻塞的,主線程終止,計時器在本身的線程上運行,這並不阻止JVM終止。

Observable.timer

對於Observable.timer有兩個重載。第一個示例建立一個observable 等待給定時間,而後發出 0L 並終止。

輸出:

第二個會等待必定的時間,而後以給定的頻率開始像間隔同樣發出。

輸出:

上面的例子等待兩秒後,每秒開始計數。

Observable.from

與咱們迄今所看到的大部分功能同樣,您能夠將任何類型的輸入使用 create 轉換爲Rx observable。

Future 是 java 框架的功能,當你使用併發性的框架的時候可能使用他們。它們是一個和 RX 差很少強大的併發概念,由於它們只返回一個值。天然地,您可能但願將它們變成 observables 。

輸出:

observable 在可用時發出futureTask 的結果,而後終止。若是任務被取消,可觀察到的將發出一個java.util.concurrent.CancerationException error。

若是您對 Future 的結果返回時間限制感興趣,您能夠提供這樣的超時機制。

Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);

若是 Future 沒有在指定的時間內完成,observable 將忽略它,以 TimeoutException 失敗。

您還可使用 Observable.from 將任何集合轉換成 observable,這樣每一個值都會發出,執行最後的 onCompleted 事件。

輸出:

Observable 不是能夠用Iterable或stream 替換的。Observables 是基於 push-based, 也就是說,調用onNext 會致使處理程序堆棧所有執行到最終訂閱服務器方法(除非另有指定)。其餘模型是基於  pull-based,這意味着在返回結果以前,儘量快地請求值和執行塊。

參考:

https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%202%20-%20Sequence%20Basics/1.%20Creating%20a%20sequence.md

相關文章
相關標籤/搜索