Flutter's Async Mechanism: Future (Part 3), Receiving Tasks

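Flutter is built on Dart, and Dart itself uses a single-threaded model. Future is likewise an asynchronous mechanism built on a single thread: asynchrony implemented with an event loop, which differs from asynchrony implemented with multiple threads and is closer to the Handler mechanism in Android. "Asynchronous" here means posting a message to the event loop and waiting for it to be scheduled, so that the code runs at some later time. That code still runs on the current thread. As a result, if you use Future to run a time-consuming task, it may not block the current UI flow, but subsequent UI operations will still be affected. Running code asynchronously with Future takes four steps: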

  1. Create the task
  2. Send the task
  3. Execute the task
  4. Execute the functional code
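
The single-threaded event-loop idea described above can be illustrated with a small C++ sketch. This is a toy model, not the Dart VM's implementation: `EventLoop`, `Post`, `Run`, and `RunDemo` are hypothetical names introduced only for illustration. It shows that posting a task returns immediately, and that a slow task still delays whatever is queued behind it because everything runs on the same thread.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded event loop (illustration only).
class EventLoop {
 public:
  // "Sending a task" only appends it to the queue; nothing runs yet.
  void Post(std::function<void()> task) { queue_.push_back(std::move(task)); }

  // Runs queued tasks one by one on the calling thread until the queue is empty.
  void Run() {
    while (!queue_.empty()) {
      std::function<void()> task = std::move(queue_.front());
      queue_.pop_front();
      task();  // A slow task here delays every task queued behind it.
    }
  }

 private:
  std::deque<std::function<void()>> queue_;
};

// Records the order in which things happen.
std::vector<std::string> RunDemo() {
  std::vector<std::string> log;
  EventLoop loop;
  loop.Post([&] { log.push_back("slow task"); });
  loop.Post([&] { log.push_back("ui update"); });
  log.push_back("posted");  // Posting returned before any task ran.
  loop.Run();
  return log;
}
```

Calling `RunDemo()` records "posted" first and the two tasks afterwards, in the order they were queued, which is the behavior the paragraph above attributes to Future-based code.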
void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
  if (isolate_had_fatal_error_) {
    // Don't handle any more messages.
    return;
  }

  DartIsolateScope scope(dart_state->isolate());
  DartApiScope dart_api_scope;
  Dart_Handle result = Dart_Null();
  bool error = false;

  // On the first message, check if we should pause on isolate start.
  if (!handled_first_message()) {
    set_handled_first_message(true);
    if (Dart_ShouldPauseOnStart()) {
      // Mark that we are paused on isolate start.
      Dart_SetPausedOnStart(true);
    }
  }

  if (Dart_IsPausedOnStart()) {
      
  } else if (Dart_IsPausedOnExit()) {
      
  } else {
    // We are processing messages normally.
    result = Dart_HandleMessage();
    // If the Dart program has set a return code, then it is intending to shut
    // down by way of a fatal error, and so there is no need to emit a log
    // message.
    if (dart_state->has_set_return_code() && Dart_IsError(result) &&
        Dart_IsFatalError(result)) {
      error = true;
    } else {
      error = LogIfError(result);
    }
    dart_state->MessageEpilogue(result);
    if (!Dart_CurrentIsolate()) {
      isolate_exited_ = true;
      return;
    }
  }

}

In the normal case, Dart_HandleMessage is called:

// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
  Thread* T = Thread::Current();
  Isolate* I = T->isolate();
  CHECK_API_SCOPE(T);
  CHECK_CALLBACK_STATE(T);
  API_TIMELINE_BEGIN_END_BASIC(T);
  TransitionNativeToVM transition(T);
  if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
    return Api::NewHandle(T, T->StealStickyError());
  }
  return Api::Success();
}

// third_party/sdk/runtime/vm/message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
  // We can only call HandleNextMessage when this handler is not
  // assigned to a thread pool.
  MonitorLocker ml(&monitor_);
  ASSERT(pool_ == NULL);
  ASSERT(!delete_me_);
#if defined(DEBUG)
  CheckAccess();
#endif
  return HandleMessages(&ml, true, false);
}

MessageHandler::MessageStatus MessageHandler::HandleMessages( MonitorLocker* ml, bool allow_normal_messages, bool allow_multiple_normal_messages) {
  ASSERT(monitor_.IsOwnedByCurrentThread());

  // Scheduling of the mutator thread during the isolate start can cause this
  // thread to safepoint.
  // We want to avoid holding the message handler monitor during the safepoint
  // operation to avoid possible deadlocks, which can occur if other threads are
  // sending messages to this message handler.
  //
  // If isolate() returns nullptr [StartIsolateScope] does nothing.
  ml->Exit();
  StartIsolateScope start_isolate(isolate());
  ml->Enter();

  MessageStatus max_status = kOK;
  Message::Priority min_priority =
      ((allow_normal_messages && !paused()) ? Message::kNormalPriority
                                            : Message::kOOBPriority);
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  while (message != nullptr) {
    intptr_t message_len = message->Size();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[<] Handling message:\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          message_len, name(), message->dest_port());
    }

    // Release the monitor_ temporarily while we handle the message.
    // The monitor was acquired in MessageHandler::TaskCallback().
    ml->Exit();
    Message::Priority saved_priority = message->priority();
    Dart_Port saved_dest_port = message->dest_port();
    MessageStatus status = HandleMessage(std::move(message));
    if (status > max_status) {
      max_status = status;
    }
    ml->Enter();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[.] Message handled (%s):\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          MessageStatusString(status), message_len, name(), saved_dest_port);
    }
    // If we are shutting down, do not process any more messages.
    if (status == kShutdown) {
      ClearOOBQueue();
      break;
    }

    // Remember time since the last message. Don't consider OOB messages so
    // using Observatory doesn't trigger additional idle tasks.
    if ((FLAG_idle_timeout_micros != 0) &&
        (saved_priority == Message::kNormalPriority)) {
      idle_start_time_ = OS::GetCurrentMonotonicMicros();
    }

    // Some callers want to process only one normal message and then quit. At
    // the same time it is OK to process multiple OOB messages.
    if ((saved_priority == Message::kNormalPriority) &&
        !allow_multiple_normal_messages) {
      // We processed one normal message. Allow no more.
      allow_normal_messages = false;
    }

    // Reevaluate the minimum allowable priority. The paused state
    // may have changed as part of handling the message. We may also
    // have encountered an error during message processing.
    //
    // Even if we encounter an error, we still process pending OOB
    // messages so that we don't lose the message notification.
    min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
                        ? Message::kNormalPriority
                        : Message::kOOBPriority);
    message = DequeueMessage(min_priority);
  }
  return max_status;
}

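The call chain leads down to MessageHandler::HandleMessages, which contains another while loop. It keeps calling DequeueMessage to take out a message until no eligible message remains. Note that on this path HandleNextMessage passes allow_multiple_normal_messages = false, so after one normal-priority message has been handled only OOB messages continue to be dequeued. Each individual message is processed by calling HandleMessage.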
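
The priority logic of that loop can be modeled with a short C++ sketch. It is only an approximation under stated assumptions: `ToyHandler`, `Post`, and `Dequeue` are hypothetical, and the rule that OOB messages are dequeued before normal ones is an assumption of this sketch, since the body of DequeueMessage is not shown in the listing above.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

enum class Priority { kNormal, kOOB };

struct Message {
  Priority priority;
  std::string name;
};

// Hypothetical stand-in for a handler with a normal queue and an OOB queue.
class ToyHandler {
 public:
  void Post(Message m) {
    (m.priority == Priority::kOOB ? oob_queue_ : queue_).push_back(std::move(m));
  }

  // Assumption: OOB messages are always eligible and are taken first;
  // normal messages are eligible only when min_priority allows them.
  std::optional<Message> Dequeue(Priority min_priority) {
    if (!oob_queue_.empty()) return Pop(oob_queue_);
    if (min_priority == Priority::kNormal && !queue_.empty()) return Pop(queue_);
    return std::nullopt;
  }

  // Same shape as the loop above: after one normal message, stop accepting
  // normal messages unless multiple normal messages are allowed.
  std::vector<std::string> HandleMessages(bool allow_normal,
                                          bool allow_multiple_normal) {
    std::vector<std::string> handled;
    Priority min_priority = allow_normal ? Priority::kNormal : Priority::kOOB;
    std::optional<Message> message = Dequeue(min_priority);
    while (message.has_value()) {
      handled.push_back(message->name);
      if (message->priority == Priority::kNormal && !allow_multiple_normal) {
        allow_normal = false;
      }
      min_priority = allow_normal ? Priority::kNormal : Priority::kOOB;
      message = Dequeue(min_priority);
    }
    return handled;
  }

 private:
  static Message Pop(std::deque<Message>& q) {
    Message m = std::move(q.front());
    q.pop_front();
    return m;
  }

  std::deque<Message> queue_;
  std::deque<Message> oob_queue_;
};
```

With two normal messages and one OOB message queued, `HandleMessages(true, false)` in this model handles the OOB message and exactly one normal message, leaving the second normal message for the next call.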

The first thing HandleMessage does is obtain msg_handler, by calling DartLibraryCalls::LookupHandler:

RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Function& function = Function::Handle(
      zone, thread->isolate()->object_store()->lookup_port_handler());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 1;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    thread->isolate()->object_store()->set_lookup_port_handler(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  return result.raw();
}

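As you can see, this is a typical flow for calling Dart from C++: first locate the function, then build the arguments, and finally invoke it through DartEntry::InvokeFunction. The resolution code inside the if (function.IsNull()) branch shows that the target is the _lookupHandler function of _RawReceivePortImpl: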

@pragma("vm:entry-point", "call")
static _lookupHandler(int id) {
  var result = _handlerMap[id];
  return result;
}

It looks up a value in _handlerMap by id and returns it. That value is in fact the callback function, which is registered here:

void set handler(Function value) {
  _handlerMap[this._get_id()] = value;
}

This setter is called after the _RawReceivePortImpl has been initialized, and the value returned by _get_id is exactly the dest_id. HandleMessage then handles the message in one of three cases:

  1. The message has OOB priority
  2. The message's dest_port is kIllegalPort
  3. The normal case
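
The handler lookup and the three cases above can be pictured with a small C++ sketch. Everything in it is hypothetical: `PortRegistry`, `Classify`, and `Deliver` are illustration-only names, the order in which the cases are checked and the sentinel value used for `kIllegalPort` are assumptions, and the sketch deliberately does nothing for the OOB and illegal-port cases because their handling is not shown in this article.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

using Port = std::int64_t;
constexpr Port kIllegalPort = 0;  // Assumed sentinel value, for illustration.

struct Message {
  Port dest_port;
  bool is_oob;
  std::string payload;
};

enum class Route { kOOB, kIllegalPort, kNormal };

// Hypothetical counterpart of _handlerMap: port id -> callback.
class PortRegistry {
 public:
  using Handler = std::function<void(const std::string&)>;

  void SetHandler(Port id, Handler handler) { handlers_[id] = std::move(handler); }

  // Returns nullptr when no handler is registered for the id.
  const Handler* Lookup(Port id) const {
    auto it = handlers_.find(id);
    return it == handlers_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_map<Port, Handler> handlers_;
};

// Sorts a message into one of the three cases (check order is assumed).
Route Classify(const Message& m) {
  if (m.is_oob) return Route::kOOB;
  if (m.dest_port == kIllegalPort) return Route::kIllegalPort;
  return Route::kNormal;
}

// Delivers only normal messages; returns true if a handler actually ran.
bool Deliver(const PortRegistry& registry, const Message& m) {
  if (Classify(m) != Route::kNormal) return false;
  const PortRegistry::Handler* handler = registry.Lookup(m.dest_port);
  if (handler == nullptr) return false;
  (*handler)(m.payload);
  return true;
}
```

Registering a callback under a port id and then delivering a normal message to that id invokes the callback, which is the role _handlerMap plays for _RawReceivePortImpl.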

The normal case is handled as follows:

const Object& result =
    Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
  status = ProcessUnhandledException(Error::Cast(result));
} else {
  ASSERT(result.IsNull());
}

DartLibraryCalls::HandleMessage:

// third_party/sdk/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::HandleMessage(const Object& handler, const Instance& message) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Isolate* isolate = thread->isolate();
  Function& function = Function::Handle(
      zone, isolate->object_store()->handle_message_function());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 2;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_handleMessage()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    isolate->object_store()->set_handle_message_function(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, handler);
  args.SetAt(1, message);
#if !defined(PRODUCT)
  if (isolate->debugger()->IsStepping()) {
    // If the isolate is being debugged and the debugger was stepping
    // through code, enable single stepping so debugger will stop
    // at the first location the user is interested in.
    isolate->debugger()->SetResumeAction(Debugger::kStepInto);
  }
#endif
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  ASSERT(result.IsNull() || result.IsError());
  return result.raw();
}

This is again a call from C++ into a Dart function. The code shows that the function being called is _handleMessage of _RawReceivePortImpl:

@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
  // TODO(floitsch): this relies on the fact that any exception aborts the
  // VM. Once we have non-fatal global exceptions we need to catch errors
  // so that we can run the immediate callbacks.
  handler(message);
  _runPendingImmediateCallback();
}

It calls handler with message as the argument. The handler is the _handleMessage function of _Timer that was passed in originally:

// third_party/sdk/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {
  var pendingTimers;
  if (msg == _ZERO_EVENT) {
    pendingTimers = _queueFromZeroEvent();
    assert(pendingTimers.length > 0);
  } else {
    assert(msg == _TIMEOUT_EVENT);
    _scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
    pendingTimers = _queueFromTimeoutEvent();
  }
  _runTimers(pendingTimers);
  // Notify the event handler or shutdown the port if no more pending
  // timers are present.
  _notifyEventHandler();
}

Depending on msg, pendingTimers is collected from different queues. The two functions are implemented as follows:

static List _queueFromZeroEvent() {
  var pendingTimers = new List();
  assert(_firstZeroTimer != null);
  // Collect pending timers from the timer heap that have an expiration prior
  // to the currently notified zero timer.
  var timer;
  while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
    timer = _heap.removeFirst();
    pendingTimers.add(timer);
  }
  // Append the first zero timer to the pending timers.
  timer = _firstZeroTimer;
  _firstZeroTimer = timer._indexOrNext;
  timer._indexOrNext = null;
  pendingTimers.add(timer);
  return pendingTimers;
}

static List _queueFromTimeoutEvent() {
  var pendingTimers = new List();
  if (_firstZeroTimer != null) {
    // Collect pending timers from the timer heap that have an expiration
    // prior to the next zero timer.
    // By definition the first zero timer has been scheduled before the
    // current time, meaning all timers which are "less than" the first zero
    // timer are expired. The first zero timer will be dispatched when its
    // corresponding message is delivered.
    var timer;
    while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  } else {
    // Collect pending timers from the timer heap which have expired at this
    // time.
    var currentTime = VMLibraryHooks.timerMillisecondClock();
    var timer;
    while (!_heap.isEmpty && (_heap.first._wakeupTime <= currentTime)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  }
  return pendingTimers;
}

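When msg is _ZERO_EVENT, it takes one task from the _firstZeroTimer list, plus the n tasks in _heap that are ordered before that zero timer and are therefore already due. When msg is not _ZERO_EVENT, it takes the n tasks in _heap that are ordered before the first _firstZeroTimer task or, if there is no zero timer, those whose wakeup time is no later than the current time. Once the timers have been collected, _runTimers is called to run them.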
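
The two collection rules just described can be modeled in C++ as a sketch. `TimerQueues`, `Timer`, and `Before` are hypothetical names, a `std::deque` stands in for the linked list of zero timers, and the ordering used by `Before` (earlier wakeup time first, ties broken by id) is an assumption, since `_compareTo` is not shown in this article.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <queue>
#include <vector>

struct Timer {
  std::int64_t wakeup_ms;
  int id;
};

// Assumed ordering: earlier wakeup first, ties broken by smaller id.
bool Before(const Timer& a, const Timer& b) {
  return a.wakeup_ms != b.wakeup_ms ? a.wakeup_ms < b.wakeup_ms : a.id < b.id;
}

// Comparator that makes std::priority_queue behave as a min-heap.
struct Later {
  bool operator()(const Timer& a, const Timer& b) const { return Before(b, a); }
};

class TimerQueues {
 public:
  void AddZero(Timer t) { zero_.push_back(t); }
  void AddDelayed(Timer t) { heap_.push(t); }

  // Zero event: heap timers ordered before the first zero timer,
  // followed by that zero timer itself.
  std::vector<int> FromZeroEvent() {
    assert(!zero_.empty());
    std::vector<int> pending = DrainHeapBefore(zero_.front());
    pending.push_back(zero_.front().id);
    zero_.pop_front();
    return pending;
  }

  // Timeout event: heap timers ordered before the first zero timer if one
  // exists; otherwise heap timers whose wakeup time has been reached.
  std::vector<int> FromTimeoutEvent(std::int64_t now_ms) {
    if (!zero_.empty()) return DrainHeapBefore(zero_.front());
    std::vector<int> pending;
    while (!heap_.empty() && heap_.top().wakeup_ms <= now_ms) {
      pending.push_back(heap_.top().id);
      heap_.pop();
    }
    return pending;
  }

 private:
  std::vector<int> DrainHeapBefore(const Timer& limit) {
    std::vector<int> pending;
    while (!heap_.empty() && Before(heap_.top(), limit)) {
      pending.push_back(heap_.top().id);
      heap_.pop();
    }
    return pending;
  }

  std::deque<Timer> zero_;
  std::priority_queue<Timer, std::vector<Timer>, Later> heap_;
};
```

In this model, a zero event with heap timers at 10 ms and 50 ms and a zero timer stamped 20 ms yields the 10 ms timer followed by the zero timer, and the 50 ms timer is only returned by a later timeout event once the clock has reached it.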

static void _runTimers(List pendingTimers) {
  // If there are no pending timers currently reset the id space before we
  // have a chance to enqueue new timers.
  if (_heap.isEmpty && (_firstZeroTimer == null)) {
    _idCount = 0;
  }

  // Fast exit if no pending timers.
  if (pendingTimers.length == 0) {
    return;
  }

  // Trigger all of the pending timers. New timers added as part of the
  // callbacks will be enqueued now and notified in the next spin at the
  // earliest.
  _handlingCallbacks = true;
  var i = 0;
  try {
    for (; i < pendingTimers.length; i++) {
      // Next pending timer.
      var timer = pendingTimers[i];
      timer._indexOrNext = null;

      // One of the timers in the pending_timers list can cancel
      // one of the later timers which will set the callback to
      // null. Or the pending zero timer has been canceled earlier.
      if (timer._callback != null) {
        var callback = timer._callback;
        if (!timer._repeating) {
          // Mark timer as inactive.
          timer._callback = null;
        } else if (timer._milliSeconds > 0) {
          var ms = timer._milliSeconds;
          int overdue =
              VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
          if (overdue > ms) {
            int missedTicks = overdue ~/ ms;
            timer._wakeupTime += missedTicks * ms;
            timer._tick += missedTicks;
          }
        }
        timer._tick += 1;

        callback(timer);
        // Re-insert repeating timer if not canceled.
        if (timer._repeating && (timer._callback != null)) {
          timer._advanceWakeupTime();
          timer._enqueue();
        }
        // Execute pending micro tasks.
        var immediateCallback = _removePendingImmediateCallback();
        if (immediateCallback != null) {
          immediateCallback();
        }
      }
    }
  } finally {
    _handlingCallbacks = false;
    // Re-queue timers we didn't get to.
    for (i++; i < pendingTimers.length; i++) {
      var timer = pendingTimers[i];
      timer._enqueue();
    }
    _notifyEventHandler();
  }
}

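The basic logic is a for loop that runs the callback of each timer. Tracing the callback back step by step, it ends up calling result._complete(computation()), and from that point on execution is in the upper-level application code.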
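
The core of that loop, including the catch-up arithmetic for repeating timers and the pending immediate callback that runs after each timer callback, can be sketched in C++. This is a simplified model with hypothetical names (`ToyTimer`, `CatchUp`, `RunTimers`). It omits re-enqueueing repeating timers and the try/finally re-queue step, and it passes the current time in as a parameter instead of reading a clock.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct ToyTimer {
  std::function<void()> callback;  // Empty means cancelled or inactive.
  bool repeating = false;
  std::int64_t period_ms = 0;
  std::int64_t wakeup_ms = 0;
  std::int64_t tick = 0;
};

// Skips the ticks a repeating timer missed while the loop was busy.
void CatchUp(ToyTimer& t, std::int64_t now_ms) {
  std::int64_t overdue = now_ms - t.wakeup_ms;
  if (t.repeating && t.period_ms > 0 && overdue > t.period_ms) {
    std::int64_t missed = overdue / t.period_ms;
    t.wakeup_ms += missed * t.period_ms;
    t.tick += missed;
  }
}

// Runs each pending timer, then the pending immediate callback if one is set.
void RunTimers(std::vector<ToyTimer*>& pending,
               std::function<void()>& immediate, std::int64_t now_ms) {
  for (ToyTimer* t : pending) {
    if (!t->callback) continue;  // Cancelled before it could run.
    std::function<void()> callback = t->callback;
    if (!t->repeating) {
      t->callback = nullptr;  // One-shot timers become inactive.
    } else {
      CatchUp(*t, now_ms);
    }
    t->tick += 1;
    callback();
    if (immediate) {
      std::function<void()> pending_immediate = std::move(immediate);
      immediate = nullptr;
      pending_immediate();
    }
  }
}
```

For a repeating timer with a 100 ms period that was due at 1000 ms and runs at 1350 ms, the model skips 3 missed ticks, moves the wakeup time to 1300 ms, and ends with a tick count of 4 after the callback's own tick is added.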
