Flutter Stream 簡介及部分操做符使用

前言

    對於剛接觸Flutter的同窗來講,Stream(流)是一個相對比較抽象,也相對比較難以理解的東西。準確的來講Stream並非Flutter的特性,而是Dart語言自身所帶庫。StreamFuture都位於dart:async核心庫,是Dart中異步操做的兩大高手。因此不只僅能夠用於Flutter,而是能夠用於任何Dart語言上的實現。bash

    在咱們剛開始學習Flutter的時候基本都是使用 StatefulWidgetsetState((){})來刷新界面的數據,當我熟練使用流以後就能夠基本徹底使用StatelessWidget告別 StatefulWidget一樣達到數據刷新效果。markdown

Stream分類

流能夠分爲兩類:app

  • 單訂閱流(Single Subscription),這種流最多隻能有一個監聽器(listener)less

  • 多訂閱流(Broadcast),這種流能夠有多個監聽器監聽(listener)異步

Stream建立方式

  • Stream.fromFuture 接收一個Future對象做爲參數async

    Stream<String>.fromFuture(getData());
    
    Future<String> getData() async{
        await Future.delayed(Duration(seconds: 5));
        return "返回一個Future對象";
      }
    複製代碼

  • Stream.fromIterable 接收一個集合對象做爲參數ide

    Stream<String>.fromIterable(['A','B','C']);
    複製代碼

  • Stream.fromFutures 接收一個Future集合對象做爲參數學習

    Stream<String>.fromFutures([getData()]);
    複製代碼

  • Stream.periodic 接收一個 Duration對象做爲參數ui

    Duration interval = Duration(seconds: 1);
    Stream<int> stream = Stream<int>.periodic(interval);
    複製代碼

操做方式及使用

使用Stream .periodic 方式建立一個Stream對象

用於建立一個週期發送事件的流,以下this

///用於建立一個每隔一秒發送一次無限事件的流,並打印出值
void _stream() async{
    Duration interval = Duration(seconds: 1);
    Stream<int> stream = Stream.periodic(interval, (data) => data);
    await for(int i in stream ){
      print(i); 
    }
}

//(data){return data;} 上面的這句是用來回來值,若是省略這句,打印的值都爲null
複製代碼

.

  • Stream.take(int count)

    上面建立了一個無限每隔一秒發送一次事件的流,若是咱們想指定只發送10個事件則,用take。下面就只會打印出0-9

    void _stream() async{
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10); //指定發送事件個數
        await for(int i in stream ){
         print(i);
        }
    }
    複製代碼

  • Stream.takeWhile

    上面這種方式咱們是隻制定了發送事件的個數,若是咱們也不知道發送多少個事件,咱們能夠從返回的結果上作一個返回值的限制,上面結果也能夠用如下方式實現

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
    // stream = stream.take(10);
        stream = stream.takeWhile((data) {
          return data < 10;
        });
        await for (int i in stream) {
          print(i);
        }
      }
    複製代碼

  • Stream.skip(int count)

    skip能夠指定跳過前面的幾個事件,以下會跳過0和1,輸出 2-9;

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        stream = stream.skip(2);
        await for (int i in stream) {
          print(i);
        }
      }
    複製代碼

  • Stream.skipWhile

    能夠指定跳過不發送事件的指定條件,以下跳過0-4的輸出,輸出5-9

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        stream = stream.skipWhile((data) => data<5);
        await for (int i in stream) {
          print(i);
        }
      }
    複製代碼

  • Stream.toList()

    將流中全部的數據收集存放在List中,並返回 Future<List>對象,listData裏面 0-9

    1.這個是一個異步方法,要結果則須要使用await關鍵字

    2.這個是等待Stream當流結束時,一次返回結果

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        List<int> listData = await stream.toList();
        for (int i in listData) {
          print(i);
        }
      }
    複製代碼

  • Stream. listen()

    這是一種特定的能夠用於監聽數據流的方式,和 forEach循環的效果一致,可是返回的是StreamSubscription<T>對象,以下也會輸出0-9,同時打印出 」流已完成「

    看一下源碼這種方式能夠接收

    StreamSubscription<T> listen(void onData(T event),
          {Function onError, void onDone(), bool cancelOnError});
    複製代碼

    1.onData是接收到數據的處理,必需要實現的方法

    2.onError流發生錯誤時候的處理

    3.onDone流完成時候調取

    4.cancelOnError發生錯誤的時候是否立馬終止

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        stream.listen((data) {
          print(data);
        }, onError: (error) {
          print("流發生錯誤");
        }, onDone: () {
          print("流已完成");
        }, cancelOnError: false);
      }
    複製代碼

  • Stream. forEach()

    這中操做和listen()的方式基本差很少,也是一種監聽流的方式,這只是監聽了onData,下面代碼也會輸出0-9

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        stream.forEach((data) {
          print(data);
        });
      }
    複製代碼

  • Stream .length

    用於獲取等待流中全部事件發射完成以後統計事件的總數量,下面代碼會輸出 10

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream.periodic(interval, (data) => data);
        stream = stream.take(10);
        var allEvents = await stream.length;
        print(allEvents);
      }
    複製代碼

  • Stream.where

    在流中添加篩選條件,過濾掉一些不想要的數據,知足條件返回true,不知足條件返回false,以下咱們篩選出流中大於5小於10的數據

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream<int>.periodic(interval, (data) => data);
        stream = stream.where((data)=>data>5);
        stream = stream.where((data)=> data<10);
        await for(int i in stream){
          print(i);
        }
      }
    複製代碼

  • stream.map

    對流中的數據進行一些變換,如下是我對Stream的每一個數據都加1

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream<int> stream = Stream<int>.periodic(interval, (data) => data);
        stream = stream.map((data) => data + 1);
        await for (int i in stream) {
          print(i);
        }
      }
    複製代碼

  • Stream.expand

    對流中的數據進行一個擴展,以下,會輸出1,1,2,2,3,3….

    void _stream() async {
        Duration interval = Duration(seconds: 1);
        Stream stream = Stream.periodic(interval, (data) => data);
        stream = stream.expand((data)=>[data,data]);
        stream.listen((data)=>print(data),onError:(error)=> print("發生錯誤") );
      }
    複製代碼
  • Stream.transform

    若是咱們在在流流轉的過程當中須要進行一些轉換和控制咱們則須要使用到transform,接收一個

    StreamTransformer<S,T>,S表示轉換以前的類型,T表示轉換後的輸入類型,以下代碼咱們會接收到三組數字模擬輸入了三次密碼,並判斷真確的密碼,同時輸出密碼正確和密碼錯誤:

    void _stream() async {
        var stream = Stream<int>.fromIterable([123456,234567,678901]);
        var st = StreamTransformer<int, String>.fromHandlers(
            handleData: (int data, sink) {
          if (data == 678901) {
            sink.add("密碼輸入正確,正在開鎖。。。");
          } else {
            sink.add("密碼輸入錯誤...");
          }
        });
        stream.transform(st).listen((String data) => print(data),
            onError: (error) => print("發生錯誤"));
      }
    複製代碼

    輸入以下結果

    I/flutter (18980): 密碼輸入錯誤...
    I/flutter (18980): 密碼輸入錯誤...
    I/flutter (18980): 密碼輸入正確,正在開鎖。。。
    複製代碼

StreamController使用

​ 介紹完了Stream的基本概念和基本用法,上面直接建立流的方式,對咱們自己開發來講,用途不是很大,咱們在實際的開發過程當中,基本都是使用的StreamContoller來建立流。經過源碼咱們能夠知道Stream的幾種構造方法,最終都是經過StreamController進行了包裝。

建立StreamController對象及使用

  • 構建單訂閱的Streamcontroller
//StreamController裏面會建立一個Stream,咱們實際操控的Stream
StreamController<String> streamController = StreamController();
streamController.stream.listen((data)=> print(data));
streamController.sink.add("aaa");
streamController.add("bbb");
streamController.add("ccc");
streamController.close();

//上面代碼咱們會輸出 aaa,bbb,ccc
複製代碼

注意:若是咱們給上面的代碼再加一個listen會報以下異常,因此單訂閱流,只能有一個listen。通常狀況下咱們多數都是使用的單訂閱流,咱們也能夠將單訂閱流轉成多訂閱流。

  • 構建多監聽器的StreamController有兩種方式

    1.直接建立多訂閱Stream

    StreamController<String> streamController = StreamController.broadcast();
        streamController.stream.listen((data){
          print(data);
        },onError: (error){
          print(error.toString());
        });
        streamController.stream.listen((data) => print(data));
        streamController.add("bbb");
    
    	//上面代碼回輸出 bbb,bbb
    複製代碼

    2.將單訂閱流轉成多訂閱流

    StreamController<String> streamController = StreamController();
      Stream stream =streamController.stream.asBroadcastStream();
      stream.listen((data) => print(data));
      stream.listen((data) => print(data));
      streamController.sink.add("aaa");
      streamController.close();
      
      //上面代碼會輸出 aaa,aaa
    複製代碼

注意:在流用完了以後記得關閉,調用streamController.close()

StreamBuilder使用

​    前面我把Stream的經常使用方式作了簡單的介紹和演示,咱們怎麼結合Flutter使用呢?在Flutter裏面提供了一個WidgetStreamBuilderStreamBuilder實際上是個StatefulWidget它一直記錄着流中最新的數據,當數據流發生變化時,會自動調用builder方法進行重建。

  • StreamBuilder的源碼以下,須要接受一個流,咱們能夠傳入一個StreamControllerStream

    const StreamBuilder({
        Key key,
        this.initialData,
        Stream<T> stream,
        @required this.builder,
      }) : assert(builder != null),
           super(key: key, stream: stream);
    複製代碼

  • 使用StreamController 結合 StreamBuider對官方的計數器進行改進,取代setState刷新頁面,代碼以下

    class MyHomePage extends StatefulWidget {
      @override
      _MyHomePageState createState() => _MyHomePageState();
    }
    
    class _MyHomePageState extends State<MyHomePage> {
      int _count = 0;
      final StreamController<int> _streamController = StreamController();
    
      @override
      Widget build(BuildContext context) {
        return Scaffold(
          body: Container(
            child: Center(
              child: StreamBuilder<int>(
                  stream: _streamController.stream,
                  builder: (BuildContext context, AsyncSnapshot snapshot) {
                    return snapshot.data == null
                        ? Text("0")
                        : Text("${snapshot.data}");
                  }),
            ),
          ),
          floatingActionButton: FloatingActionButton(
              child: const Icon(Icons.add),
              onPressed: () {
                _streamController.sink.add(++_count);
              }),
        );
      }
    
      @override
      void dispose() {
        _streamController.close();
        super.dispose();
      }
    }
    複製代碼

源碼相關

    從StreamController源碼咱們能夠看出來裏面建立的得過程 默認建立一個_SyncStreamController,具體能夠去讀取下源碼,看看StreamController是怎麼建立StreamStreamSink

factory StreamController(
      {void onListen(),
      void onPause(),
      void onResume(),
      onCancel(),
      bool sync: false}) {
    return sync
        ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
        : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  }
複製代碼

    注意:上面我既使用了streamController.sink.add("aaa");添加數據,也使用了streamController.add("bbb");方式添加數據。實際效果是同樣的,查看源碼可知 sink以下,實際上sink是對StreamController的一種包裝,最終都是調取的StreamController.add方法;

class _StreamSinkWrapper<T> implements StreamSink<T> {
  final StreamController _target;
  _StreamSinkWrapper(this._target);
  void add(T data) {
    _target.add(data);
  }

  void addError(Object error, [StackTrace stackTrace]) {
    _target.addError(error, stackTrace);
  }

  Future close() => _target.close();

  Future addStream(Stream<T> source) => _target.addStream(source);

  Future get done => _target.done;
複製代碼

總結

    以上是我在開發過程當中,對FlutterStream使用的一些簡單的認識和經常使用方法的總結,StreamBuilderStreamController結合使用,實現局部刷新效果(比喻一個頁面有多個接口,各自應該刷新本身控制的UI的部分而不該該整個頁面刷新,時候咱們可使用StreamControllerStreamBuilder達到這種效果)若有不到之處或者有誤差的地方,望指正~

相關文章
相關標籤/搜索