Flutter 詳解 (6、深刻了解Stream)

Future

Future有三種狀態未完成完成帶有值完成帶有異常,使用Future能夠簡化事件任務。 假如你有一個按鈕,點擊以後開始下載圖片,首先事件循環機制會處理你的點擊事件,而後開始下載圖片,當下載完成,你可使用then來註冊回調,而後獲取到圖片並顯示出來。html

一般咱們不會直接建立,網絡下載圖片會返回一個Future,文件I/O會返回一個Future,那咱們怎麼建立一個呢?只須要關鍵字async就表示該函數異步執行,返回類型是Future<T>git

Future<String> getStr()async{
  var str = HttpRequest.getString('www.fgyong.cn');
  return str;
}

使用http請求地址www.fgyong.cn獲取數據,而後返回。github

如何接收文本呢?數組

其實很簡單,只須要使用await關鍵字便可,用來註冊then回調。瀏覽器

main(List<String> args) async {
  String string = await getStr();
  print(string);
}

等同於:服務器

main(List<String> args) async {
  getStr().then((value) {
    print(value);
  });
}

官方比較推薦前者,由於前者看起來很像同步函數,少了層層嵌套,方便開發者理解代碼。
網絡下載想延遲動畫隱藏時間,可使用Future.delayed()網絡

await Future.delayed(Duration(seconds: 2), () {
hideAnimation();
});

若是已經帶有值的想異步去執行,那麼可使用Future.value()多線程

Stream

dart:async庫包含對許多Dart API很重要的兩種類型:StreamFuture。若是Future表示單個計算的結果,則流是一系列結果。您偵聽流以獲取有關結果(數據和錯誤)以及流關閉的通知。您還能夠在收聽流時暫停播放或在流完成以前中止收聽。閉包

如何使用Stream

流能夠經過多種方式建立,後續在仔細講解,可是它們均可以以相同的方式使用:異步for循環(一般僅稱爲await for)遍歷流的事件,如for循環迭代遍歷。例如:異步

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

此代碼僅接收整數事件流中的每一個事件,將它們相加,而後返回(和)其和。當循環主體結束時,函數將暫停,直到下一個事件到達或流完成爲止。 該函數標記有async關鍵字,在使用await for循環時須要此關鍵字。 如下示例經過使用async *函數生成簡單的整數流來測試前面的代碼:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

當流中沒有更多事件時,就完成流,並通知接收事件的代碼,就像通知新事件到來同樣。使用await for循環讀取事件時,流完成後循環中止。 在某些狀況下,流完成以前會發生錯誤;多是網絡從遠程服務器上獲取文件時發生故障,或者建立事件的代碼存在錯誤,可是有人須要瞭解它。 流還能夠傳遞錯誤事件,就像傳遞數據事件同樣。大多數流將在出現第一個錯誤後中止,但有可能傳遞多個錯誤的流以及在發生錯誤事件後傳遞更多數據的流。在本文檔中,咱們僅討論最多產生一個錯誤的流。 當使用await for讀取流時,循環語句會引起錯誤。這也結束了循環。您可使用try-catch捕獲錯誤。如下示例在循環迭代器等於時引起錯誤 4:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

兩種Stream

單一訂閱流 最多見的流包含一系列事件,這些事件是較大總體的一部分。事件必須以正確的順序傳遞,而且不能丟失任何事件。這是您在讀取文件或接收Web請求時得到的流。 這樣的流只能被收聽一次。稍後再次收聽可能意味着錯過了最初的事件,而後其他部分毫無心義。當您開始收聽時,數據將被提取並以塊的形式提供。 廣播流 另外一種流是針對能夠一次處理的單個消息的。例如,這種流可用於瀏覽器中的鼠標事件。 您能夠隨時開始收聽這樣的流,而且在收聽時會觸發事件。多個收聽者能夠同時收聽,而且您能夠在取消上一個訂閱以後稍後再次收聽。

Stream的方法

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

這麼多方法基本均可以使用await for來循環

Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

修改Stream

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

這些基本都是使用閉包過濾流的內容,固然也能夠轉換內容。

監聽Stream

最後是監聽,當流改變時會觸發監聽listen(),全部的流均可以被監聽。

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

建立本身的Stream

建立Stream大概有三種,以下所示:

  • 轉換Stram
  • 使用async*建立Stream
  • 使用StreamController建立

轉換Stram

常常有些Stream包含的值不是咱們想要的,那麼就須要咱們轉換一下了,例如咱們把數字轉成字符串.

Future<String> _toString() async {
    var s = await _stream().map((event) => event.toString()).join('|');
    return s;
  }

也能夠轉成數組

Future<List> _toList() async {
    var s = await _stream().toList();
    return s;
  }

使用async*建立Stream

建立新流的一種方法是使用異步生成器(async *)函數。在調用該函數時建立該流,而且在偵聽該流時該函數的主體開始運行。函數返回時,流關閉。在函數返回以前,它可使用yieldyield *語句在流上發出事件。

這是一個原始示例,該示例會按期發射數字:

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函數返回一個Stream。當收聽該流時,主體開始運行。它反覆延遲請求的時間間隔,而後產生下一個數字。若是忽略count參數,則循環上沒有中止條件,所以流永遠輸出愈來愈大的數字-或直到偵聽器取消其訂閱爲止。 當偵聽器取消時(經過在listen()方法返回的StreamSubscription對象上調用cancel()),則主體下一次到達yield語句時,yield將充當return語句。執行全部封閉的finally塊,而後函數退出。若是函數嘗試在退出前產生一個值,則該操做將失敗並充當返回值。 當函數最終退出時,由cancel()方法返回的Future完成。若是函數以錯誤退出,則Future會以該錯誤結束;不然,它以null結束。 另外一個更有用的示例是一個轉換序列的函數:

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (var future in futures) {
    var result = await future;
    yield result;
  }
}

使用yield*

使用yield*來調用其餘的函數取下一個值,當不獲取的時候,則不運行。

Stream<int> _stream() async* {
    if (_count < 10) {
      yield _count++;
      await Future.delayed(Duration(seconds: 1));
      sleep(Duration(seconds: 1));
      yield* _getDataFromServer();
    }
  }

yield*yield區別是後者直接返回一個固定的值,而前者返回是的是一個函數。前者多用於分流,把一個Stream<T>分爲其餘的Stream<R>Stream<E>

使用StreamController建立stream

_streamController = StreamController();
// 監聽
_streamController.stream.listen((event) { })
/// 添加數據
_streamController.add('data');

StreamControlelr的類圖以下所示,他們從SinkStreamSink都只是到導入接口,並沒有直接繼承關係,在抽象類方面按照功能解耦,最終由StreamController整合,負責添加的是StreamControlelr,負責監聽的是Stream,最終在當前的Zone中執行回調Zone.runUnaryGuarded()Zone相似一個沙盒環境,在APP啓動的時候建立。

原理

StreamController中初始化,直接調用了_SyncStreamController,具體功能所有在_SyncStreamController(同步)或_AsyncStreamController(異步)中實現。

factory StreamController(
  {void onListen(),
  void onPause(),
  void onResume(),
  onCancel(),
  bool sync: false}) {
return sync
    ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
    : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}

最終在_BufferingStreamSubscription類中,實現了
onDoneonError函數,他們分別在類實例化的時候註冊回調函數。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;//默認值
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}
void onDone(void handleDone()) {
  handleDone ??= _nullDoneHandler;
  _onDone = _zone.registerCallback(handleDone);
}

那麼listener函數是在什麼時候添加的呢?
在獲取stream_ControllerStream(),最終listener函數是_StreamImpl extends Steam中實現的,代碼以下:

StreamSubscription<T> listen(void onData(T data),
      {Function onError, void onDone(), bool cancelOnError}) {
    cancelOnError = identical(true, cancelOnError);
    StreamSubscription<T> subscription =
        _createSubscription(onData, onError, onDone, cancelOnError);
    _onListen(subscription);
    return subscription;
  }
  
/// 建立一個訂閱者
StreamSubscription<T> _createSubscription(void onData(T data),
    Function onError, void onDone(), bool cancelOnError) {
  return new _BufferingStreamSubscription<T>(
      onData, onError, onDone, cancelOnError);
}

最終監聽是執行的_BufferingStreamSubscription_onData(void HandleData(T event)),這裏是使用了_zone.registerUnaryCallback來註冊回調函數,不然在調用的時候會報錯。

void onData(void handleData(T event)) {
  handleData ??= _nullDataHandler;
  _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}

stream.add()函數執行了_add(),源碼以下

void _add(T data) {
  assert(!_isClosed);
  if (_isCanceled) return;
  if (_canFire) {
    _sendData(data);
  } else {
    _addPending(new _DelayedData<T>(data));
  }
}

當狀態已關閉,直接斷言,當狀態已取消,返回操做,當能夠發送數據,則執行_sendData()函數,該函數纔是最終發送數據,執行listen操做的關鍵函數,源碼以下:

void _sendData(T data) {
  assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;//取出來第六位
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;// 捨棄第六位
  _checkState(wasInputPaused);
}

經過final Zone _zone = Zone.current;獲取當前的Zone來執行已經註冊的回調_zone.runUnaryGuarded(_onData, data),執行完畢,使用_state&=~_STATE_IN_CALLBACK來保存當前狀態,使用&=~捨棄第6位數字,_STATE_IN_CALLBACK值爲32,那麼低第六位是1,~_STATE_IN_CALLBACK,除了第六位,剩下的都是1,而後&=來取出來其餘的位數值保存下來。

而後在_checkState(wasInutPaused)來肯定在執行callback中間並沒有狀態改變。

那麼再執行完畢_sendDone()的時候也是如此,首先判斷是否已經取消,沒取消的話,執行_onDone回調。

void _sendDone() {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);

    void sendDone() {
      // If the subscription has been canceled while waiting for the cancel
      // future to finish we must not report the done event.
      if (!_waitsForCancel) return;
      _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
      _zone.runGuarded(_onDone);
      _state &= ~_STATE_IN_CALLBACK;
    }

    _cancel();
    _state |= _STATE_WAIT_FOR_CANCEL;
    if (_cancelFuture != null &&
        !identical(_cancelFuture, Future._nullFuture)) {
      _cancelFuture.whenComplete(sendDone);
    } else {
      sendDone();
    }
  }

事件流就是在前期使用Zone註冊,在add的時候使用Zone調用已經註冊好的回調,廣播是循環調用

關鍵函數:

/// 註冊回調
_zone.registerUnaryCallback<dynamic, T>(handleData)

/// 註冊不帶參數的回調

_zone.registerCallback(R callback())

/// 執行 附帶參數的回調

_zone.runUnaryGuarded(_onData, data)

更多API能夠查看官方源碼

參考

文章彙總

<<Dart 異步與多線程>>

<<Flutter 詳解(1、深刻了解狀態管理--ScopeModel)>>

<<Flutter 詳解(2、深刻了解狀態管理--Redux)>>

<<Flutter 詳解(3、深刻了解狀態管理--Provider)>>

<<Flutter 詳解(4、深刻了解狀態管理--BLoC)>>

<<Flutter 詳解 (5、深刻了解--Key)>>

<<Flutter 詳解 (6、深刻了解--Stream>>

相關文章
相關標籤/搜索