做爲系列文章的第十一篇,本篇將很是全面帶你瞭解 Flutter 中最關鍵的設計之一,深刻原理幫助你理解 Stream 全家桶,這也許是目前 Flutter 中最全面的 Stream 分析了。git
前文:github
Stream
在 Flutter 是屬於很是關鍵的概念,在 Flutter 中,狀態管理除了 InheritedWidget
以外,不管 rxdart
,Bloc
模式,flutter_redux
,fish_redux
都離不開 Stream
的封裝,而事實上 Stream
並非 Flutter 中特有的,而是 Dart 中自帶的邏輯。redux
通俗來講,Stream
就是事件流或者管道,事件流相信你們並不陌生,簡單的說就是:基於事件流驅動設計代碼,而後監聽訂閱事件,並針對事件變換處理響應。緩存
而在 Flutter 中,整個 Stream
設計外部暴露的對象主要以下圖,主要包含了 StreamController
、Sink
、Stream
、StreamSubscription
四個對象。bash
以下代碼所示,Stream
的使用並不複雜,通常咱們只須要:異步
StreamController
,StreamSink
用作事件入口,Stream
對象用於監聽,StreamSubscription
管理事件訂閱,最後在不須要時關閉便可,看起來是否是很簡單?class DataBloc {
///定義一個Controller
StreamController<List<String>> _dataController = StreamController<List<String>>();
///獲取 StreamSink 作 add 入口
StreamSink<List<String>> get _dataSink => _dataController.sink;
///獲取 Stream 用於監聽
Stream<List<String>> get _dataStream => _dataController.stream;
///事件訂閱對象
StreamSubscription _dataSubscription;
init() {
///監聽事件
_dataSubscription = _dataStream.listen((value){
///do change
});
///改變事件
_dataSink.add(["first", "second", "three", "more"]);
}
close() {
///關閉
_dataSubscription.cancel();
_dataController.close();
}
}
複製代碼
在設置好監聽後,以後每次有事件變化時, listen
內的方法就會被調用,同時你還能夠經過操做符對 Stream
進行變換處理。async
以下代碼所示,是否是一股 rx
風撲面而來?ide
_dataStream.where(test).map(convert).transform(streamTransformer).listen(onData);
複製代碼
而在 Flutter 中, 最後結合 StreamBuilder
, 就能夠完成 基於事件流的異步狀態控件 了!佈局
StreamBuilder<List<String>>(
stream: dataStream,
initialData: ["none"],
///這裏的 snapshot 是數據快照的意思
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
///獲取到數據,隨心所欲的更新 UI
var data = snapshot.data;
return Container();
});
複製代碼
那麼問題來了,它們內部到底是若是實現的呢?原理是什麼?各自的做用是什麼?都有哪些特性呢?後面咱們將開始深刻解析這個邏輯 。post
從上面咱們知道,在 Flutter 中使用 Stream
主要有四個對象,那麼這四個對象是如何「勾搭」在一塊兒的?他們各自又擔任什麼責職呢?
首先以下圖,咱們能夠從進階版的流程圖上看出 整個 Stream
的內部工做流程。
Flutter中 Stream
、StreamController
、StreamSink
和 StreamSubscription
都是 abstract
對象,他們對外抽象出接口,而內部實現對象大部分都是 _
開頭的如 _SyncStreamController
、ControllerStream
等私有類,在這基礎上整個流程歸納起來就是:
有一個事件源叫 Stream
,爲了方便控制 Stream
,官方提供了使用 StreamController
做爲管理;同時它對外提供了 StreamSink
對象做爲事件輸入口,可經過 sink
屬性訪問; 又提供 stream
屬性提供 Stream
對象的監聽和變換,最後獲得的 StreamSubscription
能夠管理事件的訂閱。
因此咱們能夠總結出:
Stream
過程的控制,提供各種接口用於建立各類事件流。add
, addStream
等。listen
、 where
。cacenl
、pause
,同時在內部也是事件的中轉關鍵。回到 Stream
的工做流程上,在上圖中咱們知道, 經過 StreamSink.add
添加一個事件時, 事件最後會回調到 listen
中的 onData
方法,這個過程是經過 zone.runUnaryGuarded
執行的,這裏 zone.runUnaryGuarded
是什麼做用後面再說,咱們須要知道這個 onData
是怎麼來的?
如上圖,經過源碼咱們知道:
一、Stream
在 listen
的時候傳入了 onData
回調,這個回調會傳入到 StreamSubscription
中,以後經過 zone.registerUnaryCallback
註冊獲得 _onData
對象( 不是前面的 onData
回調哦 )。
二、StreamSink
在添加事件是,會執行到 StreamSubscription
中的 _sendData
方法,而後經過 _zone.runUnaryGuarded(_onData, data);
執行 1 中獲得的 _onData
對象,觸發 listen
時傳入的回調方法。
能夠看出整個流程都是和 StreamSubscription
相關的,如今咱們已經知道從 事件入口到事件出口 的整個流程時怎麼運做的,那麼這個過程是**怎麼異步執行的呢?其中頻繁出現的 zone
是什麼?
首先咱們須要知道,Stream 是怎麼實現異步的?
這就須要說到 Dart 中的異步實現邏輯了,由於 Dart 是 單線程應用 ,和大多數單線程應用同樣,Dart 是以 消息循環機制 來運行的,而這裏面主要包含兩個任務隊列,一個是 microtask 內部隊列,一個是 event 外部隊列,而 microtask 的優先級又高於 event 。
默認的在 Dart 中,如 點擊、滑動、IO、繪製事件 等事件都屬於 event 外部隊列,microtask 內部隊列主要是由 Dart 內部產生,而 Stream
中的執行異步的模式就是 scheduleMicrotask
了。
由於 microtask 的優先級又高於 event ,因此若是 microtask 太多就可能會對觸摸、繪製等外部事件形成阻塞卡頓哦。
以下圖,就是 Stream 內部在執行異步操做過程執行流程:
那麼 Zone
又是什麼?它是哪裏來的?
在上一篇章中說過,由於 Dart 中 Future
之類的異步操做是沒法被當前代碼 try/cacth
的,而在 Dart 中你能夠給執行對象指定一個 Zone
,相似提供一個沙箱環境 ,而在這個沙箱內,你就能夠所有能夠捕獲、攔截或修改一些代碼行爲,好比全部未被處理的異常。
那麼項目中默認的 Zone
是怎麼來的?在 Flutter 中,Dart 中的 Zone
啓動是在 _runMainZoned
方法 ,以下代碼所示 _runMainZoned
的 @pragma("vm:entry-point")
註解表示該方式是給 Engine 調用的,到這裏咱們知道了 Zone
是怎麼來的了。
///Dart 中
@pragma('vm:entry-point')
// ignore: unused_element
void _runMainZoned(Function startMainIsolateFunction, Function userMainFunction) {
startMainIsolateFunction((){
runZoned<Future<void>>(····);
}, null);
}
///C++ 中
if (tonic::LogIfError(tonic::DartInvokeField(
Dart_LookupLibrary(tonic::ToDart("dart:ui")), "_runMainZoned",
{start_main_isolate_function, user_entrypoint_function}))) {
FML_LOG(ERROR) << "Could not invoke the main entrypoint.";
return false;
}
複製代碼
那麼 zone.runUnaryGuarded
的做用是什麼?相較於 scheduleMicrotask
的異步操做,官方的解釋是:在此區域中使用參數執行給定操做並捕獲同步錯誤。 相似的還有 runUnary
、 runBinaryGuarded
等,因此咱們知道前面提到的 zone.runUnaryGuarded
就是 Flutter 在運行的這個 zone 裏執行已經註冊的 _onData
,並捕獲異常。
前面咱們說了 Stream
的內部執行流程,那麼同步和異步操做時又有什麼區別?具體實現時怎麼樣的呢?
咱們以默認 Stream
流程爲例子, StreamController
的工廠建立能夠經過 sync
指定同步仍是異步,默認是異步模式的。 而不管異步仍是同步,他們都是繼承了 _StreamController
對象,區別仍是在於 mixins
的是哪一個 _EventDispatch
實現:
_AsyncStreamControllerDispatch
_SyncStreamControllerDispatch
上面這兩個 _EventDispatch
最大的不一樣就是在調用 sendData
提交事件時,是直接調用 StreamSubscription
的 _add
方法,仍是調用 _addPending(new _DelayedData<T>(data));
方法的區別。
以下圖, 異步執行的邏輯就是上面說過的 scheduleMicrotask
, 在 _StreamImplEvents
中 scheduleMicrotask
執行後,會調用 _DelayedData
的 perform
,最後經過 _sendData
觸發 StreamSubscription
去回調數據 。
在 Stream
中又非爲廣播和非廣播模式,若是是廣播模式中,StreamControlle
的實現是由以下所示實現的,他們的基礎關係以下圖所示:
_SyncBroadcastStreamController
_AsyncBroadcastStreamController
廣播和非廣播的區別在於調用 _createSubscription
時,內部對接口類 _StreamControllerLifecycle
的實現,同時它們的差別在於:
在 _StreamController
裏判斷了若是 Stream
是 _isInitialState
的,也就是訂閱過的,就直接報錯 "Stream has already been listened to." ,只有未訂閱的才建立 StreamSubscription
。
在 _BroadcastStreamController
中,_isInitialState
的判斷被去掉了,取而代之的是 isClosed
判斷,而且在廣播中, _sendData
是一個 forEach
執行:
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
複製代碼
Stream
是支持變換處理的,針對 Stream
咱們能夠通過屢次變化來獲得咱們須要的結果。那麼這些變化是怎麼實現的呢?
以下圖所示,通常操做符變換的 Stream
實現類,都是繼承了 _ForwardingStream
, 在它的內部的_ForwardingStreamSubscription
裏,會經過上一個 Pre A Stream
的 listen
添加 _handleData
回調,以後在回調裏再次調用新的 Current B Stream
的 _handleData
。
因此事件變化的本質就是,變換都是對 Stream
的 listen
嵌套調用組成的。
同時 Stream
還有轉換爲 Future
, 如 firstWhere
、 elementAt
、 reduce
等操做符方法,基本都是建立一個內部 _Future
實例,而後再 listen
的回調用調用 Future
方法返回。
以下代碼所示, 在 Flutter 中經過 StreamBuilder
構建 Widget ,只需提供一個 Stream
實例便可,其中 AsyncSnapshot
對象爲數據快照,經過 data
緩存了當前數據和狀態,那 StreamBuilder
是如何與 Stream
關聯起來的呢?
StreamBuilder<List<String>>(
stream: dataStream,
initialData: ["none"],
///這裏的 snapshot 是數據快照的意思
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
///獲取到數據,隨心所欲的更新 UI
var data = snapshot.data;
return Container();
});
複製代碼
如上圖所示, StreamBuilder
的調用邏輯主要在 _StreamBuilderBaseState
中,_StreamBuilderBaseState
在 initState
、didUpdateWidget
中會調用 _subscribe
方法,從而調用 Stream
的 listen
,而後經過 setState
更新UI,就是這麼簡單有木有?
咱們經常使用的
setState
中實際上是調用了markNeedsBuild
,markNeedsBuild
內部標記element
爲diry
,而後在下一幀WidgetsBinding.drawFrame
纔會被繪製,這能夠看出setState
並非當即生效的哦。
其實不管從訂閱或者變換均可以看出, Dart 中的 Stream
已經自帶了相似 rx
的效果,可是爲了讓 rx
的用戶們更方便的使用,ReactiveX 就封裝了 rxdart
來知足用戶的熟悉感,以下圖所示爲它們的對應關係:
在 rxdart
中, Observable
是一個 Stream
,而 Subject
繼承了 Observable
也是一個 Stream
,而且 Subject
實現了 StreamController
的接口,因此它也具備 Controller 的做用。
以下代碼所示是 rxdart
的簡單使用,能夠看出它屏蔽了外界須要對 StreamSubscription
和 StreamSink
等的認知,更符合 rx
歷史用戶的理解。
final subject = PublishSubject<String>();
subject.stream.listen(observerA);
subject.add("AAAA1");
subject.add("AAAA2"));
subject.stream.listen(observeB);
subject.add("BBBB1");
subject.close();
複製代碼
這裏咱們簡單分析下,以上方代碼爲例,
PublishSubject
內部實際建立是建立了一個廣播 StreamController<T>.broadcast
。
當咱們調用 add
或者 addStream
時,最終會調用到的仍是咱們建立的 StreamController.add
。
當咱們調用 onListen
時,也是將回調設置到 StreamController
中。
rxdart
在作變換時,咱們獲取到的 Observable
就是 this,也就是 PublishSubject
自身這個 Stream
,而 Observable
一系列的變換,也是基於建立時傳入的 stream
對象,好比:
@override
Observable<S> asyncMap<S>(FutureOr<S> convert(T value)) =>
Observable<S>(_stream.asyncMap(convert));
複製代碼
因此咱們能夠看出來,rxdart
只是對 Stream
進行了概念變換,變成了咱們熟悉的對象和操做符,而這也是爲何 rxdart
能夠在 StreamBuilder
中直接使用的緣由。
因此,到這裏你對 Flutter 中 Stream 有全面的理解了沒?
自此,第十一篇終於結束了!(///▽///)
《Flutter完整開發實戰詳解(1、Dart語言和Flutter基礎)》
《Flutter完整開發實戰詳解(4、Redux、主題、國際化)》
《Flutter完整開發實戰詳解(6、 深刻Widget原理)》
《Flutter完整開發實戰詳解(10、 深刻圖片加載流程)》