在 Dart
中的異步函數返回 Future
或 Stream
對象, await
和 async
關鍵字用於異步編程, 使得編寫異步代碼就像同步代碼同樣html
使用 async
關鍵字標記一個函數爲異步函數, 如:編程
Future<String> fetchVersion() async {
return "1.0.0";
}
複製代碼
經過 await
關鍵獲取異步函數 Future
的結果:bash
main() async {
var v = await fetchVersion();
print(v);
}
複製代碼
除了經過 await
關鍵字獲取 Future
的值, 還能夠經過 then
函數來獲取:異步
fetchVersion().then((version){
print(version);
});
複製代碼
使用 try/catch/finally
來捕獲異步函數 Future
的異常信息:async
try {
var v = await fetchVersion();
print(v);
} catch(e) {
print(e);
}
複製代碼
若是是經過 then
函數來獲取的 Future
的結果, 能夠經過 catchError
函數來捕獲異常:ide
fetchVersion().then((version) {
print(version);
}).catchError((e) {
print(e);
});
複製代碼
經過 then
函數來實現異步函數的調用鏈:異步編程
Future result = costlyQuery(url);
result
.then((value) => expensiveWork(value))
.then((_) => lengthyComputation())
.then((_) => print('Done!'))
.catchError((exception) {
//Handle exception...
});
複製代碼
也能夠經過 await
關鍵字來實現上面的功能:函數
try {
final value = await costlyQuery(url);
await expensiveWork(value);
await lengthyComputation();
print('Done!');
} catch (e) {
//Handle exception...
}
複製代碼
可使用 Future.wait
函數來等待多個異步函數返回, 該函數返回一個集合, 集合元素就是異步函數的返回結果, 集合元素的順序就是異步函數的順序:fetch
main() async {
Future deleteLotsOfFiles() async {
return Future.delayed(Duration(seconds: 5), () {
return "deleteLotsOfFiles";
});
}
Future copyLotsOfFiles() async {
return ("copyLotsOfFiles");
};
Future checksumLotsOfOtherFiles() async {
return ("checksumLotsOfOtherFiles");
};
var result = await Future.wait([
deleteLotsOfFiles(),
copyLotsOfFiles(),
checksumLotsOfOtherFiles(),
]);
print(result);
print('Done with all the long steps!');
}
// 輸出結果
[deleteLotsOfFiles, copyLotsOfFiles, checksumLotsOfOtherFiles]
Done with all the long steps!
複製代碼
異步函數返回 Future
, 也能夠返回 Stream
, Stream
表明的是數據序列ui
能夠經過 await for
來獲取 stream
裏的值, 如:
await for (varOrType identifier in stream) {
// Executes each time the stream emits a value.
}
複製代碼
同理, 使用了 await for
的函數也必須是 async
:
Future main() async {
await for (var request in requestServer) {
handleRequest(request);
}
}
複製代碼
在 await for
中可使用 return
或 break
來中斷流
除了使用 await for
還可使用 listen
函數來監聽 stream
裏的值:
main() {
new HttpClient().getUrl(Uri.parse('http://www.baidu.com'))
.then((HttpClientRequest request) => request.close())
.then((HttpClientResponse response) => response.transform(new Utf8Decoder()).listen(print));
}
複製代碼
有的時候咱們還須要對流數據進行轉換, 例以下面的代碼讀取文件的內容案例, 對內容進行 utf8 的反編碼, 加上行分割, 這樣就能原樣輸出內容:
main() async {
var config = File('hello.dart');
var inputStream = config.openRead();
var lines = inputStream.transform(utf8.decoder).transform(LineSplitter());
await for (var line in lines) {
print(line);
}
}
複製代碼
如何要捕獲上面讀取文件的例子的異常, 若是是使用 await for
, 可使用 try catch
main() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
var lines = inputStream.transform(utf8.decoder).transform(LineSplitter());
try {
await for (var line in lines) {
print(line);
}
print('file is now closed');
} catch (e) {
print(e);
}
}
複製代碼
若是使用的是 then
函數來實現的, 可使用 onDone
和 onError
來監聽:
main() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
inputStream.transform(utf8.decoder).transform(LineSplitter()).listen(
(String line) {
print(line);
}, onDone: () {
print('file is now closed');
}, onError: (e) {
print(e);
});
}
複製代碼
上面的案例都是使用 SDK
爲咱們提供好的 Stream
, 那麼咱們如何本身建立 Stream
呢?
新建一個返回 Stream
的異步函數須要使用 async*
來標記, 使用 yield
或 yield*
來發射數據:
Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
main() async {
timedCounter(Duration(seconds: 2), 5).listen(print);
}
複製代碼
上面的代碼大概的意思就是每隔 interval = 2
秒發射一次數據, 數據從 0
開始累加, 直到數據等於 maxCount=5
時中止發射. 須要注意的是隻有調用了 listen
異步函數體纔會被執行, 該函數返回的是一個 Subscription
能夠經過 pause
函數來暫停一個 Subscription
發射數據:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
var resumeSignal = Future.delayed(Duration(seconds: 2));
r.pause(resumeSignal);
}
複製代碼
執行了 pause
函數會致使 Subscription
暫停發射數據, resumeSignal
執行完畢後 Subscription
將繼續發射數據
pause
函數參數是能夠選的, resumeSignal
參數能夠不傳遞, 如:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
r.pause();
}
複製代碼
這個時候程序 2
秒後就會退出, 由於 執行完 listen
函數後,異步函數體就會被執行, 當執行完
await Future.delayed(interval);
複製代碼
程序便退·出了, 由於 main 函數已經執行完畢. 咱們能夠經過 resume
函數來恢復 Subscription
數據的發射:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
r.pause();
await Future.delayed(Duration(seconds: 3));
r.resume();
}
複製代碼
還能夠經過 cancel
函數來取消一個 Subscription
:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
//上面的代碼意思4秒以後取消stream
Future.delayed(Duration(seconds: 4),(){
r.cancel();
});
}
複製代碼
有的時候 Stream
來程序的不一樣地方, 不單單是來自已異步函數
咱們能夠經過 StreamController
來建立流, 而且往這個流裏發射數據. 也就是說 StreamController
會爲咱們建立一個新的 Stream
, 能夠在任什麼時候候任何地方爲 Stream
添加 Event
例如咱們使用 StreamController
來改造下 timedCounter
函數 :
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
// Ask stream to send counter values as event.
controller.add(counter);
if (maxCount != null && counter >= maxCount) {
timer.cancel();
// Ask stream to shut down and tell listeners.
controller.close();
}
}
// BAD: Starts before it has subscribers.
Timer.periodic(interval, tick);
return controller.stream;
}
複製代碼
可是這個改造程序有兩個問題:
listen
函數, timedCounter
函數體會被執行, 也就是定時器 Timer.periodic()
會被執行, 由於 timedCounter
並無被 async*
標記, 若是被標記程序會自動幫咱們處理.pause
狀況, 哪怕執行了 pause
函數, 程序也會不斷的產生事件因此當咱們使用 StreamController
的時候必定要避免出現上面的兩個問題, 不然會致使內存泄漏問題
對於第一個問題, 讀者可能會問:咱們沒有執行 listen
函數, 程序也沒有輸出數據啊. 這是由於這些事件被 StreamController
緩衝起來了:
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (int n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}
複製代碼
調用完 timedCounter
函數後, 咱們沒有調用 listen
函數, 而是 delay
了 5
秒 也就是說 StreamController
在這 5
秒裏緩衝了 5
個 Event
而後經過 await for
來獲取值, 會馬上輸出 1 ~ 5
的數字, 由於這些數據已經緩衝好了, 等待被獲取. 最後每隔一秒輸出一次. 因此這個程序證實了哪怕沒有調用 listen
函數 發送的 Event
會被 StreamController
緩衝起來
咱們再來看下第二個問題: 沒有處理 pause
事件, 哪怕調用管理 pause
函數, 程序也會發送 Event , 這些 Event
會被 StreamController
buffer 起來. 咱們經過一個程序來驗證下:
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
複製代碼
這個程序會輸出 1 ~ 5
的數, 而後會觸發 pause
函數, 暫停 5
秒. 其實在暫停的期間 Event
依然會發送, 並無真正的暫停. 5
秒後 Subscription
恢復, 會馬上輸出 6 ~ 10
, 說明在這 5
秒產生了 5
個 Event
, 這些事件被 StreamController
buffer 起來了.
那麼在使用 StreamController
的時候如何避免上述問題呢?
Stream<int> timedCounter(Duration interval, [int maxCount]) {
StreamController<int> controller;
Timer timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
複製代碼
也就是說須要把 pause/resume
等回調交給 StreamController
, 它會自動幫咱們處理好生命週期
關於 Dart
的異步操做就介紹到這裏了.
下面是個人公衆號,乾貨文章不錯過,有須要的能夠關注下,有任何問題能夠聯繫我: