Stream in Dart

你們應該都吃過轉盤小火鍋吧,情形是這樣的的:好多我的坐在一塊,圍着一條傳送帶,每一個人的位置上都會有一個小火鍋,廚師將菜品放到傳送帶上,這些菜品會隨着傳送帶通過每一個人的位置,若是看到你想吃的菜品,則直接拿着放到本身的小火鍋裏;若是沒碰到想吃的,則直接濾過,但傳送帶會繼續將他們傳遞到下一我的的身邊。html

上述狀況咱們須要注意這樣幾個問題:java

1. 咱們只有坐在這條傳送帶旁邊,才能吃到上面的菜。web

2. 咱們想吃的菜品不是立馬就能出現到你面前的。若是你想吃肥牛,前提是須要廚師在傳送帶上面放一盤肥牛,肥牛纔會被傳送到你旁邊,可是你並不知道肥牛何時才能到你身邊。api

3. 傳送帶上的菜品都是按照廚師放置的順序依次被傳送到你身邊的。websocket

Stream

那麼什麼是 Stream 呢?上面的傳送帶就是 Stream,傳送帶上面能夠傳遞任何菜品,相應的 Stream 也就能夠傳遞任何數據類型。markdown

爲了方便的控制 Stream ,咱們一般使用 StreamControllerStreamController 中提供了兩個屬性,一個是用來往 Stream 中添加數據的 sink,一個是用於接收數據的 streamapp

上面說了,只有坐在傳送帶旁邊才能吃到傳送帶上的菜品,這裏,咱們也只須要使用streamController.stream.listen(...) 就能夠收到通知 (當 Stream上有數據的時候)。socket

只要咱們坐在傳送帶前,咱們就成了消費者,對應的,當咱們 listen 了一個 Stream 的時候,咱們就成了一個 StreamSubscription (訂閱者)。async

最後,當咱們不須要這個 Stream 的時候,咱們須要將其 close 掉,使用streamController.close()ide

下面咱們用一個例子來演示下:

import 'dart:async';  
  
void main() {  
  // 聲明一個 StreamController 
  StreamController controller = StreamController();  
  
  // 監聽此 Stream 
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('$value'));  
  
  // 往 Stream 中添加數據 
  controller.sink.add(0);  
  controller.sink.add('a, b, c, d');  
  controller.sink.add(3.14);  
  
  // 關閉 StreamController 
  controller.close();  
}
複製代碼

輸出:

0
a, b, c, d
3.14
複製代碼

上述代碼中,咱們聲明瞭一個 StreamController,而後每每裏面放置了三種不一樣的數據類型,固然,咱們也可使用泛型的方式來限制裏面的數據類型:

StreamController<int> controller = StreamController<int>();
複製代碼

接下來咱們看看 stream.listne(...) 這個方法,源碼以下:

StreamSubscription<T> listen(void onData(T event),  
  {Function onError, void onDone(), bool cancelOnError});
複製代碼

能夠看出 listen(...) 方法接受一個必選參數和三個可選參數,咱們上一個例子中只是傳遞了必選參數 onData(...),其餘額三個參數並無傳遞,下面咱們來舉例說明 listen(...) 方法中各個參數的用途。

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .listen(onData, onError: onError, onDone: onDone, cancelOnError: true);  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  // 發送一個 Error 
  controller.sink.addError(-1);  
  controller.sink.add(2);  
  
  controller.close();  
}  
  
void onData(int data) {  
  print('The value is $data');  
}  
  
void onError(err) {  
  print('The err is $err');  
}  
  
void onDone() {  
  print('The stream is done !');  
}
複製代碼

輸出:

The value is 0
The value is 1
The err is -1
複製代碼

從上述代碼中,咱們能夠看到,stream.listen(...) ,接受 4 個參數,這 4 個參數的做用分別以下:

onData(T data) : 用來接收 Stream 中的每個事件。

onError(...) : 註釋上是這樣說的 The [onError] callback must be of type void onError(error),也就是所他須要一個接受一個參數的方法。但它還支持接受兩個參數的方法 void onError(error, StackTrace stackTrace)

onDone() : 當一個 Stream 關閉了,也就是執行了 stream.close() 方法而且發送了 done 事件,這個方法會被調用。

cancelOnError : 這是一個 bool 類型的值,意思也很簡單,就是當 Stream 碰到 Error 事件的時候,是否關閉這個 Stream

咱們上述代碼中 cancelOnError 參數傳遞的是 true ,也就是說當Stream遇到 Error 的時候,Stream 就關閉了,下面的事件就不會再發送出去了。咱們把上面代碼中的 cancelOnError 參數改成 false 其餘的代碼不變,輸出以下:

The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !
複製代碼

能夠看出,這個狀況下,即便 Stream 中遇到了 Error ,下面的事件依然會接着發送,而且最後的 done 事件也執行了。

Stream 類型

Stream 類型分爲兩種,分別爲 Single-subscription StreamsBroadcast Streams

Single-subscription Streams

這種類型的 Stream 只容許一個訂閱者,也就是隻能 listen 一次。咱們上一小節中的例子就是一個 Single-subscription Streams。接下來咱們看看若是咱們對這種類型的 Stream 訂閱兩次會發生什麼狀況。

import 'dart:async';  
  
void main() {  
  // Single-subscription Streams 
  StreamController<int> controller = StreamController<int>();  
  
  // 第一個訂閱者 
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('subscription1 $value'));  
  
  // 第二個訂閱者 
  StreamSubscription subscription2 =  
      controller.stream.listen((value) => print('subscription2 $value'));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  controller.close();  
}
複製代碼

輸出:

Unhandled exception:
Bad state: Stream has already been listened to.
...
複製代碼

Broadcast Streams

這種類型的 Stream 容許任意數量的訂閱者,只是新的訂閱者只能從它開始訂閱的時候接收事件。也就是訂閱以前 Stream 中的事件是接受不到的。

Broadcast Streams 的聲明方式以下:

StreamController<int> controller = StreamController<int>.broadcast();
複製代碼

接下來咱們寫個示例看看:

import 'dart:async';  
  
void main() {  
  // Broadcast Streams 
  StreamController<int> controller = StreamController<int>.broadcast();  
  
  StreamSubscription sub1 =  controller.stream.listen((value) => print('sub1 value is $value'));  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));  
  controller.sink.add(2);  
  controller.sink.add(3);  
  
  controller.close();  
}
複製代碼

輸出:

sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3
複製代碼

能夠看出 sub1 能夠接收到 Stream 中全部的數據,而 sub2 只能接收到從訂閱這個 Stream 以後發送的數據。

StreamTransformer

當數據經過 Stream 傳遞的時候,咱們能夠按需來轉換裏面的數據,Dart 中給咱們提供了 StreamTransformer 來對數據作出一些特定轉換。

咱們能夠經過三種方式來實現數據轉換:

  • Stream 自帶的方法,如 mapwhere
  • 經過 StreamTransformer.fromHandlers(...) 來轉換
  • 直接實現一個 StreamTransformer 來定義一個轉換器

map、where ...

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .where((value) => value % 2 == 0) // where 
  .map((value) => 'The value is $value') // map 
  .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}
複製代碼

輸出:

The value is 0
The value is 2
The value is 4
複製代碼

上述代碼中,咱們使用了 wheremap 轉換符。where 將知足條件的值過濾出來,而後 map 將整型的數字轉換成字符串類型的值。

StreamTransformer.fromHandlers(...)

import 'dart:async';  
  
// 轉換方法 
void handleData(data, EventSink sink) {  
  if (data % 2 == 0) {  
    sink.add(data);  
  }  
}  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .transform(StreamTransformer.fromHandlers(handleData: handleData))  
      .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}
複製代碼

輸出:

0
2
4
複製代碼

自定義 StreamTransformer

在自定義咱們本身的 Transformer 以前,咱們先來看看 stream.transform(...) 作了什麼事情,畢竟咱們是經過 transform(...) 方法傳入的 Transformer, transform(...) 源碼以下:

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {  
  return streamTransformer.bind(this);  
}
複製代碼

能夠看出 transform 方法返回的依然回一個 Stream ,只不過這個 Stream 是通過轉換後的Stream。 相似於 Java8 Stream 中的中間流,就是不停的返回 Stream 的那種。而後該方法是一個泛型方法,泛型類型分別爲 T,和 S,能夠這樣理解,T 爲該方法的入參類型,S 爲該方法的出參類型,相似於 Java8 中的 Function,咱們能夠看下 Java 中的 Function 接口的部分源碼:

/** * ... * @param <T> the type of the input to the function * @param <R> the type of the result of the function * * @since 1.8 */
@FunctionalInterface  
public interface Function<T, R> {  
 R apply(T t);
 ...
 }
複製代碼

最後,transform(...) 方法調用了 streamTransformer.bind(this); 該方法返回的是一個新的 Stream,也就是轉換後的 Stream,固然,bind() 方法也是咱們自定義 StreamTransformer 時須要實現的方法。

咱們在以前的例子中聲明一個 StreamController 的時候,都是沒有傳遞參數的,其實,StreamController 的構造方法是這樣的:

factory StreamController(  
    {void onListen(),  
  void onPause(),  
  void onResume(),  
  onCancel(),  
  bool sync: false}) {  
  return sync  
      ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)  
      : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);  
}
複製代碼

能夠看出,StreamController 是有參數的,而且都是可選參數,在這些參數中,咱們重點實現 onListen(),關於 onListen 的源碼解釋以下:

A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.

重點就是 onListen 方法是用來生產事件的。

接下來,來實現一個自定義的 StreamTransformer:

/// 自定義一個 StreamTransformer ,
/// 泛型類型 S 爲入參類型,T 爲出參類型
/// 這些類型都是 Stream 中傳遞的數據類型
class MyTransformer<S, T> implements StreamTransformer<S, T> {

  // 用來生成一個新的 Stream 而且控制符合條件的數據
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOrError;

  // 轉換以前的 Stream
  Stream<S> _stream;

  MyTransformer({bool sync: false, this.cancelOrError}) {
    _controller = new StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () {
          _subscription.pause();
        },
        onResume: () {
          _subscription.resume();
        },
        sync: sync);
  }

  MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
    // 定義一個 StreamController,注意泛型類型爲 T,也就是出參類型,由於
    // 咱們是使用該 _controller 生成一個用來返回的新的 Stream<T>
    _controller = new StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    // _stream 爲轉換以前的 Stream<S>
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOrError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  // 數據轉換
  void onData(S data) {
    if ((data as int) % 2 == 0) {
      // 將符合條件的數據添加到新的 Stream 中
      _controller.sink.add(data);
    }
  }

  // 參數爲轉換以前的 Stream<S>
  // 返回的是一個新的 Stream<T> (轉換以後的 Stream)
  @override
  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    // TODO: implement cast
    return null;
  }
}
複製代碼

使用以下:

void main() {
  StreamController<int> controller = StreamController<int>();

  controller.stream
      .transform(new MyTransformer()) // 自定義的 StreamTransformer
      .listen((value) => print('$value'));

  controller.sink.add(0);
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);

  controller.close();
}
複製代碼

輸出:

0
2
4
複製代碼

若有錯誤,還請指出。謝謝!!!

參考連接

相關文章
相關標籤/搜索