Flutter | 狀態管理拓展篇——RxDart(四)

前言

在前一篇文章向你們介紹了一種新的狀態管理方式——BLoC,它在分離咱們的ui邏輯與業務邏輯上表現十分優秀。可是在最後咱們發現了一個問題。react

bloc是一個典型的觀察者模式,咱們以counter bloc舉例,在A,B頁面都存在觀察者,它們監聽的是同一個廣播流,當咱們pop B頁面,回到A頁面這個操做不會出現任何問題,而當咱們再次進入B頁面的時候卻發現,它顯示了初始值0,而不是咱們想要的value,只有等咱們再次按下按鈕時,它才能刷新得到實際的value。

Stream很棒,可是還不夠強大git

因此今天要給你們簡單介紹下ReactiveX的dart 實現——RxDart,它極大的擴展了Stream的功能,可以讓咱們在使用bloc的時候更加遊刃有餘。github

在正式開始介紹前,我但願您已經閱讀並理解了stream的相關知識,後面的內容都基於此。若是您還未了解過dart:stream 的話,我建議您先閱讀這篇文章:Dart:什麼是Streamapi

RxDart

ReactiveX是什麼

ReactiveX是一個強大的庫,用於經過使用可觀察序列來編寫異步基於事件的程序。它突破了語言平臺的限制,讓咱們編寫異步程序就像在自家花園散步那樣 easy。我相信你必定會愛上它!緩存

基本概念

Dart:什麼是Stream這篇文章中,我用到一個模型來理解stream裏面到底發生了什麼。今天咱們仍是利用這個模型來看看,在rxdart中它是什麼樣的。app

這個模式的關鍵思惟在於觀察者的無狀態。咱們平時調用方法的時候必定是很清楚咱們何時調用,並馬上會返回一個預想的結果。

可是在這裏,咱們中間進行處理的時候,徹底是處於異步狀態的,也就是說沒法馬上返回一個值。咱們不知道stream何時會「吐」出處理結果,因此必需要一個觀察者來守着這個出口。異步

當有事件/數據流出時,觀察者捕捉到了這個事件並解析處理。ide

  • Subject實現並擴展了StreamController,它符合StreamController的全部規範。假如您以前使用的StreamController,那麼你能夠直接替換爲Subject。你能夠把它想像成streamController。
  • Observable實現並擴展了Stream。它將經常使用的stream和streamTransformer組合成了很是好用的api。你能夠把它想像成stream。

可觀察對象——Observable

建立Observavle

你能夠把stream直接包裝成Observablepost

var obs = Observable(Stream.fromIterable([1,2,3,4,5]));
  
  obs.listen(print);
複製代碼

輸出:1 2 3 4 5ui

經過Future建立:fromFuture

var obs = Observable.fromFuture(new Future.value("Hello"));
 
  obs.listen(print); 
複製代碼

輸出:Hello

經過Iterable建立:fromIterable

var obs = Observable.fromInterable([1,2,3,4,5]);

obs.listen(print);
複製代碼

輸出:1 2 3 4 5

讓流的「吐」出間隔一段時間:interval

interval方法可以讓流「吐出數據」後間隔一段時間再吐下一個數據。

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .interval(new Duration(seconds: 1));

  obs.listen(print);
複製代碼

輸出:1 ... 2 ... 3 ... 4 ... 5

其中...表明停頓了一秒。

迭代地處理數據:map

map方法可以讓咱們迭代的處理每個數據並返回一個新的數據

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .map((item)=>++item);
    
obs.listen(print);
複製代碼

輸出:2 3 4 5 6

擴展流:expand

expand方法可以讓咱們把把每一個item擴展至多個流

var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
   .expand((item)=> [item,item.toDouble()]);

 obs.listen(print);
複製代碼

輸出:1 1.0 2 2.0 3 3.0 4 4.0 5 5.0

這裏咱們將每一個數據擴展成【item,item.toDouble】你能夠擴展成任意組的流。假如這是一個廣播Observable,並被屢次收聽,那麼他能夠單獨調用expand並擴展本身。

合併流:merge

merge方法可以讓咱們合併多個流,請注意輸出。

var obs = Observable.merge([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);
複製代碼

輸出:1 4 7 2 5 8 3 6 9

順序執行多個流:concat

concat方法可以讓咱們按照順序執行一組流,當一組流執行完畢後,再開始執行下一組。

var obs = Observable.concat([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);
複製代碼

輸出:1 2 3 4 5 6 7 8 9

檢查每個item:every

every會檢查每一個item是否符合要求,而後它將會返回一個可以被轉化爲 Observable 的 AsObservableFuture< bool>。

var obs = Observable.fromIterable([1,2,3,4,5]);

  obs.every((x)=> x < 10).asObservable().listen(print);
複製代碼

輸出結果:true

關於Observable你還須要知道這些

  • Dart中 Observables 默認是單一訂閱。若是您嘗試兩次收聽它,則會拋出 StateError 。你可使用工廠方法或者 asBroadcastStream 將其轉化爲多訂閱流。
var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();
複製代碼
  • 不少方法的返回值並非一個 Single 也不是一個 Observable 而是必須返回一個Dart的 Future。幸運的是你很容易找到一些方法,把他們轉化成回 stream
  • 出現錯誤時,Dart中的Stream不會默認關閉。可是在Rxdart中,Error會致使Observable終止,除非它被運算符攔截。
  • 默認狀況下Dart中Stream是異步的,而Observables默認是同步的。
  • 在處理多訂閱Observable的時候,onListen方法只有在第一次會被調用。且各個訂閱者之間不會互相干涉。
var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();

//第一個訂閱者
  obs.interval(Duration(seconds: 1)).map((item) => ++item).listen(print);
//第二個訂閱者
  obs.listen(print);
複製代碼

輸出:1 2 3 4 5 2 3 4 5 6

以上是一些比較常見的Observable的使用方法,它並不完整,我將會在之後持續的更新這篇文章,並完整介紹它的功能

加強版StreamController——Subject

普通廣播流控制器:PublishSubject

PublishSubject就是一個普通廣播版StreamController,你能夠屢次收聽,默認是sync是false,也就是說裏面是一個AsyncBroadcastStreamController 異步廣播流。

緩存最新一次事件的廣播流控制器:BehaviorSubject

BehaviorSubject也是一個廣播流,可是它能記錄下最新一次的事件,並在新的收聽者收聽的時候將記錄下的事件做爲第一幀發送給收聽者。

還記得咱們文章開頭的那個小問題嗎?在B頁面從新收聽的時候,獲取不到最新的事件,必須等咱們從新觸發流才能夠獲得正確的值。

我發誓我絕對不是爲了湊篇幅🤣

ok,咱們如今用BehaviorSubject替換掉咱們的StreamCroller

//var _countController = StreamController.broadcast<int>();

var _subject = BehaviorSubject<int>();
複製代碼

真的就是這麼簡單,無縫替換😆

代碼已上傳github,讓咱們來看看效果

再來看兩個例子,相信你們會對BehaviorSubject理解更深入

例1

final subject = new BehaviorSubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 3
  subject.stream.listen(print); // prints 3
  subject.stream.listen(print);
複製代碼

輸出:3 3 3

因爲咱們在add(3)以後纔開始收聽,因此將會收到最新的value。

例2

final subject = new BehaviorSubject<int>(seedValue: 1);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
複製代碼

輸出:1 1 1

seedValue做爲初始值,在後面有收聽者的時候一樣會把它當成最後一次的value發送給收聽者。

緩存更多事件的廣播流控制器:ReplaySubject

ReplaySubject可以緩存更多的值,默認狀況下將會緩存全部值,並在新的收聽的時候將記錄下的事件做爲第一幀發送給收聽者。

final subject = ReplaySubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);
  
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
複製代碼

輸出:1 1 1 2 2 2 3 3 3

你還能夠經過maxSize控制緩存個數

final subject = ReplaySubject<int>(maxSize: 2);

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
複製代碼

輸出:2 2 2 3 3 3

自定義你的Subject

你能夠經過自定義一個新的subject繼承至Subject類來得到更加個性化的功能。這裏就不舉栗子了。😝

Subject的釋放

當你再也不收聽Subject,或者Subject再也不使用時,請務必釋放它。你能夠調用subscription的cancel()方法讓某個聽衆取消收聽,或者Subject.close(),關閉整個流。

瞭解更多

下面有一些優秀的文章可以給您更多參考

寫在最後

以上即是RxDart篇的所有內容,它只是介紹了部分RxDart的功能,我在以後會逐漸完善它,最終整理完整。

RxDart十分強大,它讓你在處理大量異步事件的時候感受很是溫馨。我相信每個開發者在瞭解過它以後必定會喜歡上這個好用的庫。

若是你在使用rxdart時候有任何好的idea,或是query,歡迎在下方評論區以及個人郵箱1652219550a@gmail.com留言,我會在24小時內與您聯繫!

下一篇文章將會是flutter狀態管理總結篇,敬請關注。

相關文章
相關標籤/搜索