關於Dart 語言的Stream 部分,應該回到語言自己去尋找答案,許多資料在Flutter框架中囫圇吞棗式的解釋Stream
,總有一種讓人云山霧罩的感受,事實上從Dart語言自己去了解Stream並不複雜,接下來就花點時間好好學習一下Stream
吧!java
Stream
和 Future
都是Dart中異步編程的核心內容,在以前的文章中已經詳細敘述了關於Future
的知識,請查看 Dart 異步編程詳解之一文全懂,本篇文章則主要基於 Dart2.5 介紹Stream
的知識。編程
Stream
是Dart語言中的所謂異步數據序列的東西,簡單理解,其實就是一個異步數據隊列而已。咱們知道隊列的特色是先進先出的,Stream
也正是如此 bash
Stream
就像一個傳送帶。能夠將一側的物品自動運送到另外一側。如上圖,在另外一側,若是沒有人去抓取,物品就會掉落消失。
但若是咱們在末尾設置一個監聽,當物品到達末端時,就能夠觸發相應的響應行爲。
在Dart語言中,
Stream
有兩種類型,一種是點對點的單訂閱流(Single-subscription),另外一種則是廣播流。框架
單訂閱流的特色是隻容許存在一個監聽器,即便該監聽器被取消後,也不容許再次註冊監聽器。異步
建立一個Stream
有9個構造方法,其中一個是構造廣播流的,這裏主要看一下其中5個構造單訂閱流的方法async
void main(){
test();
}
test() async{
// 使用 periodic 建立流,第一個參數爲間隔時間,第二個參數爲回調函數
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
// await for循環從流中讀取
await for(var i in stream){
print(i);
}
}
// 能夠在回調函數中對值進行處理,這裏直接返回了
int callback(int value){
return value;
}
複製代碼
打印結果:異步編程
0
1
2
3
4
...
複製代碼
該方法從整數0開始,在指定的間隔時間內生成一個天然數列,以上設置爲每一秒生成一次,callback
函數用於對生成的整數進行處理,處理後再放入Stream
中。這裏並未處理,直接返回了。要注意,這個流是無限的,它沒有任何一個約束條件使之中止。在後面會介紹如何給流設置條件。函數
void main(){
test();
}
test() async{
print("test start");
Future<String> fut = Future((){
return "async task";
});
// 從Future建立Stream
Stream<String> stream = Stream<String>.fromFuture(fut);
await for(var s in stream){
print(s);
}
print("test end");
}
複製代碼
打印結果:post
test start
async task
test end
複製代碼
該方法從一個Future
建立Stream
,當Future
執行完成時,就會放入Stream
中,然後從Stream
中將任務完成的結果取出。這種用法,很像異步任務隊列。學習
從多個Future
建立Stream
,即將一系列的異步任務放入Stream
中,每一個Future
按順序執行,執行完成後放入Stream
import 'dart:io';
void main() {
test();
}
test() async{
print("test start");
Future<String> fut1 = Future((){
// 模擬耗時5秒
sleep(Duration(seconds:5));
return "async task1";
});
Future<String> fut2 = Future((){
return "async task2";
});
// 將多個Future放入一個列表中,將該列表傳入
Stream<String> stream = Stream<String>.fromFutures([fut1,fut2]);
await for(var s in stream){
print(s);
}
print("test end");
}
複製代碼
該方法從一個集合建立Stream
,用法與上面例子大體相同
// 從一個列表建立`Stream`
Stream<int> stream = Stream<int>.fromIterable([1,2,3]);
複製代碼
這是Dart2.5 新增的方法,用於從單個值建立Stream
test() async{
Stream<bool> stream = Stream<bool>.value(false);
// await for循環從流中讀取
await for(var i in stream){
print(i);
}
}
複製代碼
監聽Stream
,並從中獲取數據也有三種方式,一種就是咱們上文中使用的await for
循環,這也是官方推薦的方式,看起來更簡潔友好,除此以外,另兩種方式分別是使用forEach
方法或listen
方法
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
// 使用forEach,傳入一個函數進去獲取並處理數據
stream.forEach((int x){
print(x);
});
複製代碼
使用 listen
監聽 StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError})
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream.listen((x){
print(x);
});
複製代碼
還可使用幾個可選的參數
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream = stream.take(5);
stream.listen(
(x)=>print(x),
onError: (e)=>print(e),
onDone: ()=>print("onDone"));
}
複製代碼
onError
:發生Error時觸發onDone
:完成時觸發unsubscribeOnError
:遇到第一個Error時是否取消監聽,默認爲false
Stream<T> take(int count)
用於限制Stream
中的元素數量
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
// 當放入三個元素後,監聽會中止,Stream會關閉
stream = stream.take(3);
await for(var i in stream){
print(i);
}
}
複製代碼
打印結果:
0
1
2
複製代碼
Stream<T>.takeWhile(bool test(T element))
與 take
做用類似,只是它的參數是一個函數類型,且返回值必須是一個bool
值
stream = stream.takeWhile((x){
// 對當前元素進行判斷,不知足條件則取消監聽
return x <= 3;
});
複製代碼
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream = stream.take(5);
// 表示從Stream中跳過兩個元素
stream = stream.skip(2);
await for(var i in stream){
print(i);
}
}
複製代碼
打印結果:
2
3
4
複製代碼
請注意,該方法只是從Stream
中獲取元素時跳過,被跳過的元素依然是被執行了的,所耗費的時間依然存在,其實只是跳過了執行完的結果而已。
Stream<T> skipWhile(bool test(T element))
方法與takeWhile
用法是相同的,傳入一個函數對結果進行判斷,表示跳過知足條件的。
Future<List<T>> toList()
表示將Stream
中全部數據存儲在List中
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream = stream.take(5);
List <int> data = await stream.toList();
for(var i in data){
print(i);
}
}
複製代碼
等待並獲取流中全部數據的數量
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream = stream.take(5);
var len = await stream.length;
print(len);
}
複製代碼
它實際上就是Stream
的一個幫助類,可用於整個 Stream
過程的控制。
import 'dart:async';
void main() {
test();
}
test() async{
// 建立
StreamController streamController = StreamController();
// 放入事件
streamController.add('element_1');
streamController.addError("this is error");
streamController.sink.add('element_2');
streamController.stream.listen(
print,
onError: print,
onDone: ()=>print("onDone"));
}
複製代碼
使用該類時,須要導入'dart:async'
,其add
方法和sink.add
方法是相同的,都是用於放入一個元素,addError
方法用於產生一個錯誤,監聽方法中的onError
可獲取錯誤。
還能夠在StreamController
中傳入一個指定的stream
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e);
stream = stream.take(5);
StreamController sc = StreamController();
// 將 Stream 傳入
sc.addStream(stream);
// 監聽
sc.stream.listen(
print,
onDone: ()=>print("onDone"));
}
複製代碼
如今來看一下StreamController
的原型,它有5個可選參數
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false})
複製代碼
onListen
註冊監聽時回調onPause
當流暫停時回調onResume
當流恢復時回調onCancel
當監聽器被取消時回調sync
當值爲true
時表示同步控制器SynchronousStreamController
,默認值爲false
,表示異步控制器test() async{
// 建立
StreamController sc = StreamController(
onListen: ()=>print("onListen"),
onPause: ()=>print("onPause"),
onResume: ()=>print("onResume"),
onCancel: ()=>print("onCancel"),
sync:false
);
StreamSubscription ss = sc.stream.listen(print);
sc.add('element_1');
// 暫停
ss.pause();
// 恢復
ss.resume();
// 取消
ss.cancel();
// 關閉流
sc.close();
}
複製代碼
打印結果:
onListen
onPause
onCancel
複製代碼
由於監聽器被取消了,且關閉了流,致使"element_1"
未被輸出,"onResume"
亦未輸出
以下,在普通的單訂閱流中調用兩次listen
會報錯
test() async{
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e);
stream = stream.take(5);
stream.listen(print);
stream.listen(print);
}
複製代碼
Unhandled exception:
Bad state: Stream has already been listened to.
複製代碼
前面已經說了單訂閱流的特色,而廣播流則能夠容許多個監聽器存在,就如同廣播同樣,凡是監聽了廣播流,每一個監聽器都能獲取到數據。要注意,若是在觸發事件時將監聽者正添加到廣播流,則該監聽器將不會接收當前正在觸發的事件。若是取消監聽,監聽者會當即中止接收事件。
有兩種方式建立廣播流,一種直接從Stream
建立,另外一種使用StreamController
建立
test() async{
// 調用 Stream 的 asBroadcastStream 方法建立
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (e)=>e)
.asBroadcastStream();
stream = stream.take(5);
stream.listen(print);
stream.listen(print);
}
複製代碼
使用StreamController
test() async{
// 建立廣播流
StreamController sc = StreamController.broadcast();
sc.stream.listen(print);
sc.stream.listen(print);
sc.add("event1");
sc.add("event2");
}
複製代碼
該類可使咱們在Stream
上執行數據轉換。而後,這些轉換被推回到流中,以便該流注冊的全部監聽器能夠接收
構造方法原型
factory StreamTransformer.fromHandlers({
void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)
})
複製代碼
handleData
:響應從流中發出的任何數據事件。提供的參數是來自發出事件的數據,以及EventSink<T>
,表示正在進行此轉換的當前流的實例handleError
:響應從流中發出的任何錯誤事件handleDone
:當流再也不有數據要處理時調用。一般在流的close()
方法被調用時回調void test() {
StreamController sc = StreamController<int>();
// 建立 StreamTransformer對象
StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
handleData: (int data, EventSink sink) {
// 操做數據後,轉換爲 double 類型
sink.add((data * 2).toDouble());
},
handleError: (error, stacktrace, sink) {
sink.addError('wrong: $error');
},
handleDone: (sink) {
sink.close();
},
);
// 調用流的transform方法,傳入轉換對象
Stream stream = sc.stream.transform(stf);
stream.listen(print);
// 添加數據,這裏的類型是int
sc.add(1);
sc.add(2);
sc.add(3);
// 調用後,觸發handleDone回調
// sc.close();
}
複製代碼
打印結果:
2.0
4.0
6.0
複製代碼
與流相關的操做,主要有四個類
Stream
StreamController
StreamSink
StreamSubscription
Stream
是基礎,爲了更方便控制和管理Stream
,出現了StreamController
類。在StreamController
類中, 提供了StreamSink
做爲事件輸入口,當咱們調用add
時,其實是調用的sink.add
,經過sink
屬性能夠獲取StreamController
類中的StreamSink
,而StreamSubscription
類則用於管理事件的註冊、暫停與取消等,經過調用stream.listen
方法返回一個StreamSubscription
對象。