你們應該都吃過轉盤小火鍋吧,情形是這樣的的:好多我的坐在一塊,圍着一條傳送帶,每一個人的位置上都會有一個小火鍋,廚師將菜品放到傳送帶上,這些菜品會隨着傳送帶通過每一個人的位置,若是看到你想吃的菜品,則直接拿着放到本身的小火鍋裏;若是沒碰到想吃的,則直接濾過,但傳送帶會繼續將他們傳遞到下一我的的身邊。html
上述狀況咱們須要注意這樣幾個問題:java
1. 咱們只有坐在這條傳送帶旁邊,才能吃到上面的菜。web
2. 咱們想吃的菜品不是立馬就能出現到你面前的。若是你想吃肥牛,前提是須要廚師在傳送帶上面放一盤肥牛,肥牛纔會被傳送到你旁邊,可是你並不知道肥牛何時才能到你身邊。api
3. 傳送帶上的菜品都是按照廚師放置的順序依次被傳送到你身邊的。websocket
那麼什麼是 Stream 呢?上面的傳送帶就是 Stream,傳送帶上面能夠傳遞任何菜品,相應的 Stream 也就能夠傳遞任何數據類型。markdown
爲了方便的控制 Stream ,咱們一般使用 StreamController,StreamController
中提供了兩個屬性,一個是用來往 Stream
中添加數據的 sink
,一個是用於接收數據的 stream
。app
上面說了,只有坐在傳送帶旁邊才能吃到傳送帶上的菜品,這裏,咱們也只須要使用streamController.stream.listen(...)
就能夠收到通知 (當 Stream上有數據的時候)。socket
只要咱們坐在傳送帶前,咱們就成了消費者,對應的,當咱們 listen
了一個 Stream
的時候,咱們就成了一個 StreamSubscription (訂閱者)。async
最後,當咱們不須要這個 Stream
的時候,咱們須要將其 close
掉,使用streamController.close()
ide
下面咱們用一個例子來演示下:
import 'dart:async';
void main() {
// 聲明一個 StreamController
StreamController controller = StreamController();
// 監聽此 Stream
StreamSubscription subscription1 =
controller.stream.listen((value) => print('$value'));
// 往 Stream 中添加數據
controller.sink.add(0);
controller.sink.add('a, b, c, d');
controller.sink.add(3.14);
// 關閉 StreamController
controller.close();
}
複製代碼
輸出:
0
a, b, c, d
3.14
複製代碼
上述代碼中,咱們聲明瞭一個 StreamController,而後每每裏面放置了三種不一樣的數據類型,固然,咱們也可使用泛型的方式來限制裏面的數據類型:
StreamController<int> controller = StreamController<int>();
複製代碼
接下來咱們看看 stream.listne(...)
這個方法,源碼以下:
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError});
複製代碼
能夠看出 listen(...)
方法接受一個必選參數和三個可選參數,咱們上一個例子中只是傳遞了必選參數 onData(...)
,其餘額三個參數並無傳遞,下面咱們來舉例說明 listen(...)
方法中各個參數的用途。
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.listen(onData, onError: onError, onDone: onDone, cancelOnError: true);
controller.sink.add(0);
controller.sink.add(1);
// 發送一個 Error
controller.sink.addError(-1);
controller.sink.add(2);
controller.close();
}
void onData(int data) {
print('The value is $data');
}
void onError(err) {
print('The err is $err');
}
void onDone() {
print('The stream is done !');
}
複製代碼
輸出:
The value is 0
The value is 1
The err is -1
複製代碼
從上述代碼中,咱們能夠看到,stream.listen(...)
,接受 4 個參數,這 4 個參數的做用分別以下:
onData(T data)
: 用來接收 Stream
中的每個事件。
onError(...)
: 註釋上是這樣說的 The [onError] callback must be of type void onError(error)
,也就是所他須要一個接受一個參數的方法。但它還支持接受兩個參數的方法 void onError(error, StackTrace stackTrace)
。
onDone()
: 當一個 Stream
關閉了,也就是執行了 stream.close()
方法而且發送了 done
事件,這個方法會被調用。
cancelOnError
: 這是一個 bool
類型的值,意思也很簡單,就是當 Stream
碰到 Error
事件的時候,是否關閉這個 Stream
。
咱們上述代碼中 cancelOnError
參數傳遞的是 true
,也就是說當Stream
遇到 Error
的時候,Stream
就關閉了,下面的事件就不會再發送出去了。咱們把上面代碼中的 cancelOnError
參數改成 false
其餘的代碼不變,輸出以下:
The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !
複製代碼
能夠看出,這個狀況下,即便 Stream
中遇到了 Error
,下面的事件依然會接着發送,而且最後的 done
事件也執行了。
Stream 類型分爲兩種,分別爲 Single-subscription Streams 和 Broadcast Streams
這種類型的 Stream
只容許一個訂閱者,也就是隻能 listen
一次。咱們上一小節中的例子就是一個 Single-subscription Streams。接下來咱們看看若是咱們對這種類型的 Stream
訂閱兩次會發生什麼狀況。
import 'dart:async';
void main() {
// Single-subscription Streams
StreamController<int> controller = StreamController<int>();
// 第一個訂閱者
StreamSubscription subscription1 =
controller.stream.listen((value) => print('subscription1 $value'));
// 第二個訂閱者
StreamSubscription subscription2 =
controller.stream.listen((value) => print('subscription2 $value'));
controller.sink.add(0);
controller.sink.add(1);
controller.close();
}
複製代碼
輸出:
Unhandled exception:
Bad state: Stream has already been listened to.
...
複製代碼
這種類型的 Stream
容許任意數量的訂閱者,只是新的訂閱者只能從它開始訂閱的時候接收事件。也就是訂閱以前 Stream
中的事件是接受不到的。
Broadcast Streams 的聲明方式以下:
StreamController<int> controller = StreamController<int>.broadcast();
複製代碼
接下來咱們寫個示例看看:
import 'dart:async';
void main() {
// Broadcast Streams
StreamController<int> controller = StreamController<int>.broadcast();
StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));
controller.sink.add(0);
controller.sink.add(1);
StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));
controller.sink.add(2);
controller.sink.add(3);
controller.close();
}
複製代碼
輸出:
sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3
複製代碼
能夠看出 sub1
能夠接收到 Stream
中全部的數據,而 sub2
只能接收到從訂閱這個 Stream
以後發送的數據。
當數據經過 Stream
傳遞的時候,咱們能夠按需來轉換裏面的數據,Dart 中給咱們提供了 StreamTransformer
來對數據作出一些特定轉換。
咱們能夠經過三種方式來實現數據轉換:
Stream
自帶的方法,如 map
,where
等StreamTransformer.fromHandlers(...)
來轉換StreamTransformer
來定義一個轉換器import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.where((value) => value % 2 == 0) // where
.map((value) => 'The value is $value') // map
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
複製代碼
輸出:
The value is 0
The value is 2
The value is 4
複製代碼
上述代碼中,咱們使用了 where
和 map
轉換符。where
將知足條件的值過濾出來,而後 map
將整型的數字轉換成字符串類型的值。
import 'dart:async';
// 轉換方法
void handleData(data, EventSink sink) {
if (data % 2 == 0) {
sink.add(data);
}
}
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(StreamTransformer.fromHandlers(handleData: handleData))
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
複製代碼
輸出:
0
2
4
複製代碼
在自定義咱們本身的 Transformer 以前,咱們先來看看 stream.transform(...)
作了什麼事情,畢竟咱們是經過 transform(...)
方法傳入的 Transformer, transform(...)
源碼以下:
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
複製代碼
能夠看出 transform 方法返回的依然回一個 Stream ,只不過這個 Stream 是通過轉換後的Stream。 相似於 Java8 Stream 中的中間流,就是不停的返回 Stream 的那種。而後該方法是一個泛型方法,泛型類型分別爲 T,和 S,能夠這樣理解,T 爲該方法的入參類型,S 爲該方法的出參類型,相似於 Java8 中的 Function,咱們能夠看下 Java 中的 Function 接口的部分源碼:
/** * ... * @param <T> the type of the input to the function * @param <R> the type of the result of the function * * @since 1.8 */
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
...
}
複製代碼
最後,transform(...)
方法調用了 streamTransformer.bind(this);
該方法返回的是一個新的 Stream
,也就是轉換後的 Stream
,固然,bind()
方法也是咱們自定義 StreamTransformer
時須要實現的方法。
咱們在以前的例子中聲明一個 StreamController
的時候,都是沒有傳遞參數的,其實,StreamController 的構造方法是這樣的:
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false}) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
複製代碼
能夠看出,StreamController 是有參數的,而且都是可選參數,在這些參數中,咱們重點實現 onListen()
,關於 onListen
的源碼解釋以下:
A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.
重點就是 onListen
方法是用來生產事件的。
接下來,來實現一個自定義的 StreamTransformer:
/// 自定義一個 StreamTransformer ,
/// 泛型類型 S 爲入參類型,T 爲出參類型
/// 這些類型都是 Stream 中傳遞的數據類型
class MyTransformer<S, T> implements StreamTransformer<S, T> {
// 用來生成一個新的 Stream 而且控制符合條件的數據
StreamController _controller;
StreamSubscription _subscription;
bool cancelOrError;
// 轉換以前的 Stream
Stream<S> _stream;
MyTransformer({bool sync: false, this.cancelOrError}) {
_controller = new StreamController<T>(
onListen: _onListen,
onCancel: _onCancel,
onPause: () {
_subscription.pause();
},
onResume: () {
_subscription.resume();
},
sync: sync);
}
MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
// 定義一個 StreamController,注意泛型類型爲 T,也就是出參類型,由於
// 咱們是使用該 _controller 生成一個用來返回的新的 Stream<T>
_controller = new StreamController<T>.broadcast(
onListen: _onListen, onCancel: _onCancel, sync: sync);
}
void _onListen() {
// _stream 爲轉換以前的 Stream<S>
_subscription = _stream.listen(onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: cancelOrError);
}
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
// 數據轉換
void onData(S data) {
if ((data as int) % 2 == 0) {
// 將符合條件的數據添加到新的 Stream 中
_controller.sink.add(data);
}
}
// 參數爲轉換以前的 Stream<S>
// 返回的是一個新的 Stream<T> (轉換以後的 Stream)
@override
Stream<T> bind(Stream<S> stream) {
this._stream = stream;
return _controller.stream;
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() {
// TODO: implement cast
return null;
}
}
複製代碼
使用以下:
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(new MyTransformer()) // 自定義的 StreamTransformer
.listen((value) => print('$value'));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
複製代碼
輸出:
0
2
4
複製代碼
若有錯誤,還請指出。謝謝!!!