Flutter 異步機制:Future(四)執行功能代碼

Flutter 基於 dart 語言,dart 自己是一個單線程模型,Future 也是基於單線程的異步機制,即基於事件循環實現的異步,與多線程實現的異步並不同,比較相似於 Android 中的 Handler 機制,而所謂的異步,就是向事件循環中心發送一條消息,等待調度,在以後的某個時刻執行代碼,可是這段代碼仍是在當前線程執行的,因此,若是使用 Future 執行耗時任務,它可能不會阻塞當前的 UI 流程,不事後續的一些 UI 操做仍是會受到影響。 使用 Future 異步執行代碼須要四個步驟:markdown

  1. 建立任務
  2. 發送任務
  3. 執行任務
  4. 執行功能代碼

接收到的最終的任務就是一個 callback,參見:多線程

factory Future(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  Timer.run(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}
複製代碼

_complete 函數用於處理 computation 函數執行的結果,分爲三種類型判斷:app

void _complete(FutureOr<T> value) {
  assert(!_isComplete);
  if (value is Future<T>) {
    if (value is _Future<T>) {
      _chainCoreFuture(value, this);
    } else {
      _chainForeignFuture(value, this);
    }
  } else {
    _FutureListener listeners = _removeListeners();
    _setValue(value);
    _propagateToListeners(this, listeners);
  }
}
複製代碼

當返回結果仍是一個 Future 類型時,和當返回結果不是 Future 對象時有這兩種不一樣的處理邏輯,後者,也就是 else 語句中的邏輯,能夠看到就是將返回值傳遞給 listener ,而後調用 _propagateToListeners 將結果分發給 listener 的 listeners,但前者,if 語句中的邏輯,告訴咱們當上一個 Future 返回出一個 Future 時,不能直接將這個 Future 看成 listener 的 value 而後通知 listener ,這涉及到 Future 使用過程當中的一個規則:Listener 的監聽結果不能是一個 Future ,因此就有了以下這種代碼:異步

test4() {
  print("start");
  Future(() {
    print("return future");
    return Future(() {
      print("return test4");
      return "test4";
    });
  }).then((FutureOr<String> s) {
    if (s is Future<String>) {
      s.then((String ss) {
        print(ss + "_");
      });
    } else {
      print(s);
    }
  });
  print("end");
}
複製代碼

這段代碼最終打印的是 test4 而不是 test4_ ,而這種狀況出現的緣由,就在於 _complete 函數中對 Future 對象和非 Future 對象的兩種不一樣的處理。對於非 Future 對象,將返回值經過 _setValue 傳遞給 result ,再分發給其 listener ,這是一個比較常規的過程。而對於 Future 對象也有兩種不一樣的處理,當返回值是一個 _Future 對象時,調用 _chainCoreFuture ,不然調用 _chainForeignFuture ,這兩個函數功能上大同小異,只不過對於 Flutter 默認實現的 _Future,有一些更「本土化」的處理,而對於其餘 Future 的實現,則只能使用 Future 帶有的函數來實現一樣的功能,所謂的處理,就是 Future 不能做爲 listener 的返回值出現,具體看代碼:async

static void _chainForeignFuture(Future source, _Future target) {
  assert(!target._isComplete);
  assert(source is! _Future);

  // Mark the target as chained (and as such half-completed).
  target._setPendingComplete();
  try {
    source.then((value) {
      assert(target._isPendingComplete);
      // The "value" may be another future if the foreign future
      // implementation is mis-behaving,
      // so use _complete instead of _completeWithValue.
      target._clearPendingComplete(); // Clear this first, it's set again.
      target._complete(value);
    },
        // TODO(floitsch): eventually we would like to make this non-optional
        // and dependent on the listeners of the target future. If none of
        // the target future's listeners want to have the stack trace we don't
        // need a trace.
        onError: (error, [StackTrace stackTrace]) {
      assert(target._isPendingComplete);
      target._completeError(error, stackTrace);
    });
  } catch (e, s) {
    // This only happens if the `then` call threw synchronously when given
    // valid arguments.
    // That requires a non-conforming implementation of the Future interface,
    // which should, hopefully, never happen.
    scheduleMicrotask(() {
      target._completeError(e, s);
    });
  }
}
複製代碼

source 是上一個 Future 的返回值,target 是 listener 對應的 Future ,它沒有將 source 設置爲 target 的 value,而是繼續調用 source 的 then 函數,獲得 source 的返回值後再調用 target 的 _complete 函數處理返回值,這裏便會陷入循環,若是 source 嵌套返回了多少個 Future ,這裏就須要調用多少次,總之,直到 source 返回了一個常規的值,纔會結束這種循環,這就是 _chainForeignFuture 中所作的處理,只使用到了 Future 的 then 函數,看起來也比較容易理解,不過 _chainCoreFuture 看着就沒有這麼和善了。ide

static void _chainCoreFuture(_Future source, _Future target) {
  assert(target._mayAddListener); // Not completed, not already chained.
  while (source._isChained) {
    source = source._chainSource;
  }
  if (source._isComplete) {
    _FutureListener listeners = target._removeListeners();
    target._cloneResult(source);
    _propagateToListeners(target, listeners);
  } else {
    _FutureListener listeners = target._resultOrListeners;
    target._setChained(source);
    source._prependListeners(listeners);
  }
}
複製代碼

首先判斷 source 是否處於 Future 鏈中,若是是,則找到鏈尾的 Future ,而後將這個 Future 做爲 source ,若是 source 已經完成了,把 source 的返回值複製過來,而後分發給 listeners,若是沒有,則加入這個 Future 鏈,並將本身的 listeners 都移植到 source 身上。 那麼 Future 鏈是什麼?鏈尾的 source 又是什麼?爲何 target 的 listeners 可以直接交給 source ?歡迎來到。。。。 先看幾個示例代碼:函數

test5() {
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test5");
    return "test5";
  });
  Future(() {
    print("return f");
    return f;
  }).then((String s) => print(s));
}

test6() {
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test6");
    return "test6";
  });
  Future<String> ff = Future(() {
    print("return f");
    return f;
  });
  Future<String> fff = Future(() {
    print("return ff");
    return ff;
  });
  Future(() {
    print("return fff");
    return fff;
  }).then((String s) => print(s));
}
複製代碼

先看 test5 ,首先聲明瞭一個 Future f,延遲 100 ms,接着又定義了一個 Future 會返回 f,它有一個 listener 須要接受 String 參數。當第二個 Future 返回 f 時 f 還未執行完畢,因而此時執行的代碼段就是:oop

_FutureListener listeners = target._resultOrListeners;
target._setChained(source);
source._prependListeners(listeners);
複製代碼

即 Future 加入了 Future 鏈,並將 listner 移交給了 f,因此此時實際上的邏輯應該是這樣的:ui

test5() {
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test5");
    return "test5";
  });
  f.then((String s) => print(s));
}
複製代碼

因而一個雙層嵌套的 Future ,變成了一個沒有嵌套的 Future,當 f 執行完了以後,就會通知 listner 打印 s。 而對於 test6 ,這是一個多層嵌套,ff、fff 以及最後一個 Future ,返回的都是 Future ,那麼當 ff 執行完時,它返回了一個還沒有執行完的 f,因此它會加入到 Future 鏈,此時鏈爲 ff -> f,而後 fff 返回了 ff,而此時 ff 已經在 Future 鏈中,因此 fff 也會加入 Future 鏈,此時 fff -> ff -> f,而後就是 Future -> fff -> ff -> f,而且此時 Future 的 listener 掛在了 f 身上,因而也能夠化簡爲:this

test6() {
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test6");
    return "test6";
  });
  f.then((String s) => print(s));
}
複製代碼

最終,當 f 執行完了以後,會進入 _complete 函數執行以下代碼:

_FutureListener listeners = _removeListeners();
_setValue(value);
_propagateToListeners(this, listeners);
複製代碼

此時的 listeners ,即是包含了 ff,fff 和 Future 這三個 Future 的 listener 集合,雖然如此,但此刻它們都已經移植到了 f 身上,而且這個移植是無縫的,從根本上來講,他們的 listener 實際上監聽的也正是 f 的返回值。 看到這裏,上面幾個問題應該有了解答,Future 鏈是一串 Future ,可是它們中只有一個 Future 返回了真實的數據,也就是鏈尾的那一個,前面的 Future 的 listeners 則是直接或間接地依賴着鏈尾 Future 的返回結果,因此它們的 listeners 也都直接移植到了鏈尾 Future 上,這個鏈尾的 Future ,就是 _chainCoreFuture 中的 source,而對於其它那些沒有返回真正數據的 Future ,是不會調用它們的 _propagateToListeners 函數的。 _propagateToListeners 負責將返回值傳遞給 listeners :

static void _propagateToListeners(_Future source, _FutureListener listeners) {
  while (true) {
    assert(source._isComplete);
    bool hasError = source._hasError;
    if (listeners == null) {
      if (hasError) {
        AsyncError asyncError = source._error;
        source._zone
            .handleUncaughtError(asyncError.error, asyncError.stackTrace);
      }
      return;
    }
    // Usually futures only have one listener. If they have several, we
    // call handle them separately in recursive calls, continuing
    // here only when there is only one listener left.
    while (listeners._nextListener != null) {
      _FutureListener listener = listeners;
      listeners = listener._nextListener;
      listener._nextListener = null;
      _propagateToListeners(source, listener);
    }
    _FutureListener listener = listeners;
    final sourceResult = source._resultOrListeners;
    // Do the actual propagation.
    // Set initial state of listenerHasError and listenerValueOrError. These
    // variables are updated with the outcome of potential callbacks.
    // Non-error results, including futures, are stored in
    // listenerValueOrError and listenerHasError is set to false. Errors
    // are stored in listenerValueOrError as an [AsyncError] and
    // listenerHasError is set to true.
    bool listenerHasError = hasError;
    var listenerValueOrError = sourceResult;

    // Only if we either have an error or callbacks, go into this, somewhat
    // expensive, branch. Here we'll enter/leave the zone. Many futures
    // don't have callbacks, so this is a significant optimization.
    if (hasError || listener.handlesValue || listener.handlesComplete) {
      Zone zone = listener._zone;
      if (hasError && !source._zone.inSameErrorZone(zone)) {
        // Don’t cross zone boundaries with errors.
        AsyncError asyncError = source._error;
        source._zone
            .handleUncaughtError(asyncError.error, asyncError.stackTrace);
        return;
      }

      Zone oldZone;
      if (!identical(Zone.current, zone)) {
        // Change zone if it's not current.
        oldZone = Zone._enter(zone);
      }

      // These callbacks are abstracted to isolate the try/catch blocks
      // from the rest of the code to work around a V8 glass jaw.
      void handleWhenCompleteCallback() {
        // The whenComplete-handler is not combined with normal value/error
        // handling. This means at most one handleX method is called per
        // listener.
        assert(!listener.handlesValue);
        assert(!listener.handlesError);
        var completeResult;
        try {
          completeResult = listener.handleWhenComplete();
        } catch (e, s) {
          if (hasError && identical(source._error.error, e)) {
            listenerValueOrError = source._error;
          } else {
            listenerValueOrError = new AsyncError(e, s);
          }
          listenerHasError = true;
          return;
        }
        if (completeResult is Future) {
          if (completeResult is _Future && completeResult._isComplete) {
            if (completeResult._hasError) {
              listenerValueOrError = completeResult._error;
              listenerHasError = true;
            }
            // Otherwise use the existing result of source.
            return;
          }
          // We have to wait for the completeResult future to complete
          // before knowing if it’s an error or we should use the result
          // of source.
          var originalSource = source;
          listenerValueOrError = completeResult.then((_) => originalSource);
          listenerHasError = false;
        }
      }

      void handleValueCallback() {
        try {
          listenerValueOrError = listener.handleValue(sourceResult);
        } catch (e, s) {
          listenerValueOrError = new AsyncError(e, s);
          listenerHasError = true;
        }
      }

      void handleError() {
        try {
          AsyncError asyncError = source._error;
          if (listener.matchesErrorTest(asyncError) &&
              listener.hasErrorCallback) {
            listenerValueOrError = listener.handleError(asyncError);
            listenerHasError = false;
          }
        } catch (e, s) {
          if (identical(source._error.error, e)) {
            listenerValueOrError = source._error;
          } else {
            listenerValueOrError = new AsyncError(e, s);
          }
          listenerHasError = true;
        }
      }

      if (listener.handlesComplete) {
        handleWhenCompleteCallback();
      } else if (!hasError) {
        if (listener.handlesValue) {
          handleValueCallback();
        }
      } else {
        if (listener.handlesError) {
          handleError();
        }
      }

      // If we changed zone, oldZone will not be null.
      if (oldZone != null) Zone._leave(oldZone);

      // If the listener’s value is a future we need to chain it. Note that
      // this can only happen if there is a callback.
      if (listenerValueOrError is Future) {
        Future chainSource = listenerValueOrError;
        // Shortcut if the chain-source is already completed. Just continue
        // the loop.
        _Future result = listener.result;
        if (chainSource is _Future) {
          if (chainSource._isComplete) {
            listeners = result._removeListeners();
            result._cloneResult(chainSource);
            source = chainSource;
            continue;
          } else {
            _chainCoreFuture(chainSource, result);
          }
        } else {
          _chainForeignFuture(chainSource, result);
        }
        return;
      }
    }
    _Future result = listener.result;
    listeners = result._removeListeners();
    if (!listenerHasError) {
      result._setValue(listenerValueOrError);
    } else {
      AsyncError asyncError = listenerValueOrError;
      result._setErrorObject(asyncError);
    }
    // Prepare for next round.
    source = result;
  }
}
複製代碼

這個函數相對來講比較重量級,其完成的功能也是比較多的,不過結構仍是比較清晰的,換算下來就是:

static void _propagateToListeners(_Future source, _FutureListener listeners) {
  while (true) {
    // 利用遞歸,將 source 的結果分發給 listeners 中的每個 listener
    
    // 若是 listener 具備這三個之一的能力,就去根據狀況執行其中一個
    if (hasError || listener.handlesValue || listener.handlesComplete) {
      
      // 聲明 handleWhenCompleteCallback 函數

      // 聲明 handleValueCallback 函數

      // 聲明 handleError 函數

      // 根據 listener 的能力及 source 的執行結果,選擇上面的其中一個函數執行

      // 若是 listener 的返回結果仍是一個 Future ,那就調用 _chainCoreFuture 或 _chainForeignFuture 進行處理
    }

    // 若是返回的是一個常規值,則將 value 設置給 result ,接着將 result 的結果分發給 result 的 listeners,因此給 source 和 listeners 從新賦值,while 循環
  }
}
複製代碼

脈絡就是這樣。這個函數裏面又出現了兩種分支,即 listener 處理以後的結果是不是一個 Future ,能夠看以下兩個示例:

test7() {
  print("start");
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test7");
    return "test7";
  });
  Future<String> ff = Future(() {
    print("return f");
    return f;
  });
  Future(() {
    print("return ff");
    return ff;
  }).then((String s) {
    return Future.delayed(Duration(milliseconds: 100), () {
      print("return s");
      return s;
    });
  }).then((String s) => print(s));
  print("end");
}

test8() {
  print("start");
  Future.delayed(Duration(milliseconds: 100), () {
    print("return test8");
    return "test8";
  }).then((String s) {
    print("return s1");
    return s;
  }).then((String s) {
    print("return s2");
    return s;
  }).then((String s) {
    print("return s3");
    return s;
  }).then((String s) {
    print(s);
  });
  print("end");
}
複製代碼

在 test7 中,Future 的返回值是 ff ,從上面對 _complete 函數的分析得知,test7 能夠作以下轉換:

test7() {
  print("start");
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test7");
    return "test7";
  });
  f.then((String s) {
    return Future.delayed(Duration(milliseconds: 100), () {
      print("return s");
      return s;
    });
  }).then((String s) => print(s));
  print("end");
}
複製代碼

那麼當 f 執行完畢的時候,便會經過 _propagateToListeners 將返回結果傳遞給 listener ,接着執行 listener 的 callback,可是 listener 的返回結果仍是一個 Future ,則會執行這一段代碼:

if (listenerValueOrError is Future) {
  Future chainSource = listenerValueOrError;
  // Shortcut if the chain-source is already completed. Just continue
  // the loop.
  _Future result = listener.result;
  if (chainSource is _Future) {
    if (chainSource._isComplete) {
      listeners = result._removeListeners();
      result._cloneResult(chainSource);
      source = chainSource;
      continue;
    } else {
      _chainCoreFuture(chainSource, result);
    }
  } else {
    _chainForeignFuture(chainSource, result);
  }
  return;
}
複製代碼

因此,test7 又會變成以下這樣:

test7() {
  print("start");
  Future<String> f = Future.delayed(Duration(milliseconds: 100), () {
    print("return test7");
    return "test7";
  });
  f.then((String s) {
    Future f = Future.delayed(Duration(milliseconds: 100), () {
      print("return s");
      return s;
    });
	  f.then((String s) => print(s));
	  return f;
  });
  print("end");
}
複製代碼

至此,當 f 中的 Future 也執行完畢的時候,就會出發最後的 listener ,打印 s。 而對於 test8 ,多個 then 鏈接在一塊兒,且返回的結果都是常規值,則對應着這段代碼:

_Future result = listener.result;
listeners = result._removeListeners();
if (!listenerHasError) {
  result._setValue(listenerValueOrError);
} else {
  AsyncError asyncError = listenerValueOrError;
  result._setErrorObject(asyncError);
}
// Prepare for next round.
source = result;
複製代碼

result 便是 listener 對應的 Future,listeners 是 result 的 listeners,因此,每當一個 while 循環執行以後,就意味着一個 then 函數的結束,直到全部的 listener 都獲得處理。 再來看下 then 函數:

Future<R> then<R>(FutureOr<R> f(T value), {Function onError}) {
  Zone currentZone = Zone.current;
  if (!identical(currentZone, _rootZone)) {
    f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f);
    if (onError != null) {
      // In checked mode, this checks that onError is assignable to one of:
      // dynamic Function(Object)
      // dynamic Function(Object, StackTrace)
      onError = _registerErrorHandler(onError, currentZone);
    }
  }
  _Future<R> result = new _Future<R>();
  _addListener(new _FutureListener<T, R>.then(result, f, onError));
  return result;
}
複製代碼

_addListener 函數能夠看出,Future 的 listener 就是在這個函數中添加的,listener 對應的 Future ,也是在這裏建立的,result、callback、onError 將會被構形成 _FutureListener 實例,_addListener 負責將 listener 添加到 Future 上。

void _addListener(_FutureListener listener) {
  assert(listener._nextListener == null);
  if (_mayAddListener) {
    listener._nextListener = _resultOrListeners;
    _resultOrListeners = listener;
  } else {
    if (_isChained) {
      // Delegate listeners to chained source future.
      // If the source is complete, instead copy its values and
      // drop the chaining.
      _Future source = _chainSource;
      if (!source._isComplete) {
        source._addListener(listener);
        return;
      }
      _cloneResult(source);
    }
    assert(_isComplete);
    // Handle late listeners asynchronously.
    _zone.scheduleMicrotask(() {
      _propagateToListeners(this, listener);
    });
  }
}
複製代碼

通常狀況下,listener 會被添加到 Future 的 listeners 中,當 Future 已是 Future 鏈中的一員時,listener 則會直接被添加到 source 中去,或者當 Future 已經執行完成時,則直接調用 _propagateToListeners 處理。 再來講說 Future 中經常使用的一些參數,好比 _isChained 、_isComplete 以及 _mayAddListener 這些究竟是怎麼肯定的?_chainSource 、_resultOrListeners 這些變量都是些什麼? 總的來講,這些都指向兩個變量,_state 和 _resultOrListeners,好比:

bool get _mayComplete => _state == _stateIncomplete;
bool get _isPendingComplete => _state == _statePendingComplete;
bool get _mayAddListener => _state <= _statePendingComplete;
bool get _isChained => _state == _stateChained;
bool get _isComplete => _state >= _stateValue;
bool get _hasError => _state == _stateError;
複製代碼

又好比:

AsyncError get _error {
  assert(_hasError);
  return _resultOrListeners;
}
_Future get _chainSource {
  assert(_isChained);
  return _resultOrListeners;
}
_FutureListener _removeListeners() {
  // Reverse listeners before returning them, so the resulting list is in
  // subscription order.
  assert(!_isComplete);
  _FutureListener current = _resultOrListeners;
  _resultOrListeners = null;
  return _reverseListeners(current);
}

void _setChained(_Future source) {
  assert(_mayAddListener);
  _state = _stateChained;
  _resultOrListeners = source;
}
void _setValue(T value) {
  assert(!_isComplete); // But may have a completion pending.
  _state = _stateValue;
  _resultOrListeners = value;
}
void _setErrorObject(AsyncError error) {
  assert(!_isComplete); // But may have a completion pending.
  _state = _stateError;
  _resultOrListeners = error;
}
void _cloneResult(_Future source) {
  assert(!_isComplete);
  assert(source._isComplete);
  _state = source._state;
  _resultOrListeners = source._resultOrListeners;
}
複製代碼

因此,error 、result、chainSource、listeners 實際上都是同一個變量表示的,只不過在不一樣的狀態下,_resultOrListeners 有着不一樣的含義,在 _hasError 狀態下,_resultOrListeners 表示 error,在 !_isComplete 狀態下,_resultOrListeners 表示 listeners,而在給 _resultOrListeners 設置值的時候,通常也會一併給 _state 賦值。

相關文章
相關標籤/搜索