Flutter 異步機制:Future(二),發送任務

Flutter 基於 dart 語言,dart 自己是一個單線程模型,Future 也是基於單線程的異步機制,即基於事件循環實現的異步,與多線程實現的異步並不同,比較相似於 Android 中的 Handler 機制,而所謂的異步,就是向事件循環中心發送一條消息,等待調度,在以後的某個時刻執行代碼,可是這段代碼仍是在當前線程執行的,因此,若是使用 Future 執行耗時任務,它可能不會阻塞當前的 UI 流程,不事後續的一些 UI 操做仍是會受到影響。 使用 Future 異步執行代碼須要四個步驟:c++

  1. 建立任務
  2. 發送任務
  3. 執行任務
  4. 執行功能代碼

建立任務

關於 Future 任務的建立,在應用層通常是這麼寫:api

new Future(() {
    doSomething();
});
複製代碼

那麼,從 Future 的構造函數開始,能夠一窺 Future 任務建立的全過程。markdown

factory Future(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  Timer.run(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}
複製代碼

result._complete(computation()) 即最後的執行功能代碼的部分,參見第四小節,Timer.run 則會一步步建立任務。多線程

static void run(void callback()) {
  new Timer(Duration.zero, callback);
}

// third_party/sdk/sdk/lib/async/timer.dart
factory Timer(Duration duration, void callback()) {
  if (Zone.current == Zone.root) {
    // No need to bind the callback. We know that the root's timer will
    // be invoked in the root zone.
    return Zone.current.createTimer(duration, callback);
  }
  return Zone.current
      .createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}

// third_party/sdk/runtime/lib/timer_patch.dart
static Timer _createTimer(Duration duration, void callback()) {
  // TODO(iposva): Remove _TimerFactory and use VMLibraryHooks exclusively.
  if (_TimerFactory._factory == null) {
    _TimerFactory._factory = VMLibraryHooks.timerFactory;
  }
  if (_TimerFactory._factory == null) {
    throw new UnsupportedError("Timer interface not supported.");
  }
  int milliseconds = duration.inMilliseconds;
  if (milliseconds < 0) milliseconds = 0;
  return _TimerFactory._factory(milliseconds, (_) {
    callback();
  }, false);
}
複製代碼

這裏最後調用 _TimerFactory._factory 建立 Timer 實例,_TimerFactory._factory 來自於 VMLibraryHooks.timerFactory ,而 VMLibraryHooks.timerFactory 的設置時機能夠一步步回溯至 InitDartInternal :app

// lib/ui/dart_runtime_hooks.cc
static void InitDartInternal(Dart_Handle builtin_library, bool is_ui_isolate) {
  Dart_Handle print = GetFunction(builtin_library, "_getPrintClosure");

  Dart_Handle internal_library = Dart_LookupLibrary(ToDart("dart:_internal"));

  Dart_Handle result =
      Dart_SetField(internal_library, ToDart("_printClosure"), print);
  PropagateIfError(result);

  if (is_ui_isolate) {
    // Call |_setupHooks| to configure |VMLibraryHooks|.
    Dart_Handle method_name = Dart_NewStringFromCString("_setupHooks");
    result = Dart_Invoke(builtin_library, method_name, 0, NULL);
    PropagateIfError(result);
  }

  Dart_Handle setup_hooks = Dart_NewStringFromCString("_setupHooks");

  Dart_Handle io_lib = Dart_LookupLibrary(ToDart("dart:io"));
  result = Dart_Invoke(io_lib, setup_hooks, 0, NULL);
  PropagateIfError(result);

  Dart_Handle isolate_lib = Dart_LookupLibrary(ToDart("dart:isolate"));
  result = Dart_Invoke(isolate_lib, setup_hooks, 0, NULL);
  PropagateIfError(result);
}

// third_party/sdk/runtime/lib/timer_impl.dart
@pragma("vm:entry-point", "call")
_setupHooks() {
  VMLibraryHooks.timerFactory = _Timer._factory;
}

static Timer _factory(
    int milliSeconds, void callback(Timer timer), bool repeating) {
  if (repeating) {
    return new _Timer.periodic(milliSeconds, callback);
  }
  return new _Timer(milliSeconds, callback);
}
複製代碼

_Timer 是 Timer 的實現類,重複執行與不重複執行的 Timer 會調用不一樣的構造函數,可是兩者異曲同工。less

factory _Timer(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, false);
}

factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, true);
}

static Timer _createTimer(
    void callback(Timer timer), int milliSeconds, bool repeating) {
  // Negative timeouts are treated as if 0 timeout.
  if (milliSeconds < 0) {
    milliSeconds = 0;
  }
  // Add one because DateTime.now() is assumed to round down
  // to nearest millisecond, not up, so that time + duration is before
  // duration milliseconds from now. Using microsecond timers like
  // Stopwatch allows detecting that the timer fires early.
  int now = VMLibraryHooks.timerMillisecondClock();
  int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);

  _Timer timer =
      new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
  // Enqueue this newly created timer in the appropriate structure and
  // notify if necessary.
  timer._enqueue();
  return timer;
}
複製代碼

_internal 函數是 _Timer 的構造函數,_enqueue 函數將 Timer 放入隊列等待執行:異步

// third_party/sdk/runtime/lib/timer_impl.dart
void _enqueue() {
  if (_milliSeconds == 0) {
    if (_firstZeroTimer == null) {
      _lastZeroTimer = this;
      _firstZeroTimer = this;
    } else {
      _lastZeroTimer._indexOrNext = this;
      _lastZeroTimer = this;
    }
    // Every zero timer gets its own event.
    _notifyZeroHandler();
  } else {
    _heap.add(this);
    if (_heap.isFirst(this)) {
      _notifyEventHandler();
    }
  }
}
複製代碼

Timer 的延遲是否爲 0 是一個分界線,它會將 Timer 分別插入 _lastZeroTimer 和 _heap 中,而後調用 _notifyZeroHandler 或 _notifyEventHandler 通知目標線程處理任務,接下來就是發送任務的過程了。async

發送任務

以 _notifyZeroHandler 爲例,ide

// third_party/sdk/runtime/lib/timer_impl.dart
static void _notifyZeroHandler() {
  if (_sendPort == null) {
    _createTimerHandler();
  }
  _sendPort.send(_ZERO_EVENT);
}
複製代碼

首先,確保 _sendPort 的存在,而後,使用 _sendPort 發送一條 _ZERO_EVENT 消息。函數

// third_party/sdk/runtime/lib/timer_impl.dart
static void _createTimerHandler() {
  assert(_receivePort == null);
  assert(_sendPort == null);
  _receivePort = new RawReceivePort(_handleMessage);
  _sendPort = _receivePort.sendPort;
  _scheduledWakeupTime = null;
}
複製代碼

_receivePort 與 _sendPort 是一對用於通訊的接口,首先調用 RawReceivePort 構造函數建立 _receivePort,而且傳遞了回調函數 _handleMessage ,而後從 _receivePort 中取出 _sendPort ,可見這個通訊模型的重點就是 _receivePort 的構造過程。

// third_party/sdk/runtime/lib/isolate_patch.dart
@patch
factory RawReceivePort([Function handler]) {
  _RawReceivePortImpl result = new _RawReceivePortImpl();
  result.handler = handler;
  return result;
}

factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";

// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
  ASSERT(
      TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
  Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
  return ReceivePort::New(port_id, false /* not control port */);
}
複製代碼

構造函數是一個 native 函數,在 native 中,首先調用 PortMap::CreatePort 建立出 Dart_Port ,而後調用 ReceivePort::New 建立 ReceivePort 實例,實例化以後,將回調函數 handler 保存到了 map 中,key 爲 _get_id,這也是一個 native 函數。

建立 Dart_Port 的參數 isolate->message_handler() 的設置時機爲 InitIsolate:

// third_party/sdk/runtime/vm/isolate.cc
Isolate* Isolate::InitIsolate(const char* name_prefix, IsolateGroup* isolate_group, const Dart_IsolateFlags& api_flags, bool is_vm_isolate) {


  // Setup the isolate message handler.
  MessageHandler* handler = new IsolateMessageHandler(result);
  ASSERT(handler != nullptr);
  result->set_message_handler(handler);
}
複製代碼
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  ASSERT(handler != NULL);
  MutexLocker ml(mutex_);
#if defined(DEBUG)
  handler->CheckAccess();
#endif

  Entry entry;
  entry.port = AllocatePort();
  entry.handler = handler;
  entry.state = kNewPort;

  // Search for the first unused slot. Make use of the knowledge that here is
  // currently no port with this id in the port map.
  ASSERT(FindPort(entry.port) < 0);
  intptr_t index = entry.port % capacity_;
  Entry cur = map_[index];
  // Stop the search at the first found unused (free or deleted) slot.
  while (cur.port != 0) {
    index = (index + 1) % capacity_;
    cur = map_[index];
  }

  // Insert the newly created port at the index.
  ASSERT(index >= 0);
  ASSERT(index < capacity_);
  ASSERT(map_[index].port == 0);
  ASSERT((map_[index].handler == NULL) ||
         (map_[index].handler == deleted_entry_));
  if (map_[index].handler == deleted_entry_) {
    // Consuming a deleted entry.
    deleted_--;
  }
  map_[index] = entry;

  // Increment number of used slots and grow if necessary.
  used_++;
  MaintainInvariants();

  if (FLAG_trace_isolates) {
    OS::PrintErr(
        "[+] Opening port: \n"
        "\thandler: %s\n"
        "\tport: %" Pd64 "\n",
        handler->name(), entry.port);
  }

  return entry.port;
}
複製代碼

在 CreatePort 中,先是調用 AllocatePort 建立一個端口(先隨機生成一個,再判斷這個端口沒有被使用,就能夠返回),而後構建出 Entry 並將其存在一個哈希表中。

RawReceivePort* ReceivePort::New(Dart_Port id, bool is_control_port, Heap::Space space) {
  ASSERT(id != ILLEGAL_PORT);
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  const SendPort& send_port =
      SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));

  ReceivePort& result = ReceivePort::Handle(zone);
  {
    RawObject* raw = Object::Allocate(ReceivePort::kClassId,
                                      ReceivePort::InstanceSize(), space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
  }
  if (is_control_port) {
    PortMap::SetPortState(id, PortMap::kControlPort);
  } else {
    PortMap::SetPortState(id, PortMap::kLivePort);
  }
  return result.raw();
}
複製代碼

在這個函數裏面,先是建立了 SendPort ,而後對 ReceivePort 進行了一些初始化操做,並將 ReceivePort 的 RawObject 返回,接着在 third_party/sdk/runtime/lib/timer_impl.dart#_createTimerHandler 中還要經過 _receivePort 取得 _sendPort,最終會調用 _get_sendport,這也是一個 native 函數:

DEFINE_NATIVE_ENTRY(RawReceivePortImpl_get_sendport, 0, 1) {
  GET_NON_NULL_NATIVE_ARGUMENT(ReceivePort, port, arguments->NativeArgAt(0));
  return port.send_port();
}
複製代碼

GET_NON_NULL_NATIVE_ARGUMENT 取出 ReceivePort 實例,經過 arguments->NativeArgAt(0) ,可是 _get_sendport 並無傳遞參數,也就是說這個參數就是 _RawReceivePortImpl 自身,也就是上面返回的 RawObject,而後返回它的 send_port 。

再回到 third_party/sdk/runtime/lib/timer_impl.dart#_notifyZeroHandler,_sendPort 調用 send 函數發送了一條消息,最終會調用 _sendInternal ,再轉到 c++ 層:

// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
  GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
  // TODO(iposva): Allow for arbitrary messages to be sent.
  GET_NON_NULL_NATIVE_ARGUMENT(Instance, obj, arguments->NativeArgAt(1));

  const Dart_Port destination_port_id = port.Id();
  const bool can_send_any_object = isolate->origin_id() == port.origin_id();

  if (ApiObjectConverter::CanConvert(obj.raw())) {
    PortMap::PostMessage(
        Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
  } else {
    MessageWriter writer(can_send_any_object);
    // TODO(turnidge): Throw an exception when the return value is false?
    PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
                                             Message::kNormalPriority));
  }
  return Object::null();
}
複製代碼

首先取出 SendPort 和 Instance 實例,而後建立出 Message 實例,最後調用 PortMap::PostMessage 發送消息。

// third_party/sdk/runtime/vm/port.cc
bool PortMap::PostMessage(std::unique_ptr<Message> message, bool before_events) {
  MutexLocker ml(mutex_);
  intptr_t index = FindPort(message->dest_port());
  if (index < 0) {
    return false;
  }
  ASSERT(index >= 0);
  ASSERT(index < capacity_);
  MessageHandler* handler = map_[index].handler;
  ASSERT(map_[index].port != 0);
  ASSERT((handler != NULL) && (handler != deleted_entry_));
  handler->PostMessage(std::move(message), before_events);
  return true;
}
複製代碼

根據 Message 的 dest_port 找到 index ,再從哈希表中取出 handler,這裏的 handler 就是在初始化 Dart_Port 傳入的 isolate->message_handler() ,也就是 IsolateMessageHandler 實例,不過 IsolateMessageHandler 並無重寫 PostMessage 函數。

// third_party/sdk/runtime/vm/message_handler.cc
void MessageHandler::PostMessage(std::unique_ptr<Message> message, bool before_events) {
  Message::Priority saved_priority;

  {
    saved_priority = message->priority();
    if (message->IsOOB()) {
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    if (pool_ != nullptr && !task_running_) {
      ASSERT(!delete_me_);
      task_running_ = true;
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
      ASSERT(launched_successfully);
    }
  }

  // Invoke any custom message notification.
  MessageNotify(saved_priority);
}
複製代碼

message 分爲兩種,oob 和非 oob,實際上就是優先級的區分:

// third_party/sdk/runtime/vm/message.h
typedef enum {
  kNormalPriority = 0,  // Deliver message when idle.
  kOOBPriority = 1,     // Deliver message asap.

  // Iteration.
  kFirstPriority = 0,
  kNumPriorities = 2,
} Priority;
複製代碼

不一樣優先級的 message 會被加入不一樣的隊列,oob_queue_ 和 queue_,完了調用 MessageNotify :

// third_party/sdk/runtime/vm/isolate.cc
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
  if (priority >= Message::kOOBPriority) {
    // Handle out of band messages even if the mutator thread is busy.
    I->ScheduleInterrupts(Thread::kMessageInterrupt);
  }
  Dart_MessageNotifyCallback callback = I->message_notify_callback();
  if (callback != nullptr) {
    // Allow the embedder to handle message notification.
    (*callback)(Api::CastIsolate(I));
  }
}
複製代碼

此處的 priority 是 message 的 priority ,當優先級爲 kOOBPriority 時,會中斷當前的任務去處理這個 message 。而後從 isolate 中取出 callback 執行,再看 callback 究竟是誰。這要從 DartIsolate::Initialize 開始:

// runtime/dart_isolate.cc
bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {

  SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
                               is_root_isolate);

}

void DartIsolate::SetMessageHandlingTaskRunner( fml::RefPtr<fml::TaskRunner> runner, bool is_root_isolate) {
  if (!is_root_isolate || !runner) {
    return;
  }

  message_handling_task_runner_ = runner;

  message_handler().Initialize(
      [runner](std::function<void()> task) { runner->PostTask(task); });
}
複製代碼

從這裏就能夠看出,後續的 callback 會在 message_handling_task_runner_ 中運行,可是它實際上仍是 UITaskRunner ,因此說,Future 的異步是單線程的異步,而後再看:

// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {1
  // Only can be called once.
  TONIC_CHECK(!task_dispatcher_ && dispatcher);
  task_dispatcher_ = dispatcher;
  Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}

void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  auto dart_state = DartState::From(dest_isolate);
  TONIC_CHECK(dart_state);
  dart_state->message_handler().OnMessage(dart_state);
}
複製代碼
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT void Dart_SetMessageNotifyCallback( Dart_MessageNotifyCallback message_notify_callback) {
  Isolate* isolate = Isolate::Current();
  CHECK_ISOLATE(isolate);

  {
    NoSafepointScope no_safepoint_scope;
    isolate->set_message_notify_callback(message_notify_callback);
  }

  if (message_notify_callback != nullptr && isolate->HasPendingMessages()) {
    ::Dart_ExitIsolate();

    // If a new handler gets installed and there are pending messages in the
    // queue (e.g. OOB messages for doing vm service work) we need to notify
    // the newly registered callback, otherwise the embedder might never get
    // notified about the pending messages.
    message_notify_callback(Api::CastIsolate(isolate));

    ::Dart_EnterIsolate(Api::CastIsolate(isolate));
  }
}
複製代碼

從以上代碼能夠看出,message_notify_callback 就是 MessageNotifyCallback 函數,這個函數調用了 OnMessage ,而 task_dispatcher_ ,就是上面給出的 UITaskRunner 的 dispatcher 。

// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  auto dart_state = DartState::From(dest_isolate);
  TONIC_CHECK(dart_state);
  dart_state->message_handler().OnMessage(dart_state);
}

void DartMessageHandler::OnMessage(DartState* dart_state) {
  auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;

  // Schedule a task to run on the message loop thread.
  auto weak_dart_state = dart_state->GetWeakPtr();
  task_dispatcher_([weak_dart_state]() {
    if (auto dart_state = weak_dart_state.lock()) {
      dart_state->message_handler().OnHandleMessage(dart_state.get());
    }
  });
}
複製代碼

從 OnHandleMessage 開始,剩下的代碼開始在 task_dispatcher_ 中執行,也能夠說從這裏開始,開始了接收任務階段。

接收任務

void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
  if (isolate_had_fatal_error_) {
    // Don't handle any more messages.
    return;
  }

  DartIsolateScope scope(dart_state->isolate());
  DartApiScope dart_api_scope;
  Dart_Handle result = Dart_Null();
  bool error = false;

  // On the first message, check if we should pause on isolate start.
  if (!handled_first_message()) {
    set_handled_first_message(true);
    if (Dart_ShouldPauseOnStart()) {
      // Mark that we are paused on isolate start.
      Dart_SetPausedOnStart(true);
    }
  }

  if (Dart_IsPausedOnStart()) {
      
  } else if (Dart_IsPausedOnExit()) {
      
  } else {
    // We are processing messages normally.
    result = Dart_HandleMessage();
    // If the Dart program has set a return code, then it is intending to shut
    // down by way of a fatal error, and so there is no need to emit a log
    // message.
    if (dart_state->has_set_return_code() && Dart_IsError(result) &&
        Dart_IsFatalError(result)) {
      error = true;
    } else {
      error = LogIfError(result);
    }
    dart_state->MessageEpilogue(result);
    if (!Dart_CurrentIsolate()) {
      isolate_exited_ = true;
      return;
    }
  }

}
複製代碼

正常狀況下會調用 Dart_HandleMessage:

// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
  Thread* T = Thread::Current();
  Isolate* I = T->isolate();
  CHECK_API_SCOPE(T);
  CHECK_CALLBACK_STATE(T);
  API_TIMELINE_BEGIN_END_BASIC(T);
  TransitionNativeToVM transition(T);
  if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
    return Api::NewHandle(T, T->StealStickyError());
  }
  return Api::Success();
}

// third_party/sdk/runtime/vm/message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
  // We can only call HandleNextMessage when this handler is not
  // assigned to a thread pool.
  MonitorLocker ml(&monitor_);
  ASSERT(pool_ == NULL);
  ASSERT(!delete_me_);
#if defined(DEBUG)
  CheckAccess();
#endif
  return HandleMessages(&ml, true, false);
}

MessageHandler::MessageStatus MessageHandler::HandleMessages( MonitorLocker* ml, bool allow_normal_messages, bool allow_multiple_normal_messages) {
  ASSERT(monitor_.IsOwnedByCurrentThread());

  // Scheduling of the mutator thread during the isolate start can cause this
  // thread to safepoint.
  // We want to avoid holding the message handler monitor during the safepoint
  // operation to avoid possible deadlocks, which can occur if other threads are
  // sending messages to this message handler.
  //
  // If isolate() returns nullptr [StartIsolateScope] does nothing.
  ml->Exit();
  StartIsolateScope start_isolate(isolate());
  ml->Enter();

  MessageStatus max_status = kOK;
  Message::Priority min_priority =
      ((allow_normal_messages && !paused()) ? Message::kNormalPriority
                                            : Message::kOOBPriority);
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  while (message != nullptr) {
    intptr_t message_len = message->Size();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[<] Handling message:\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          message_len, name(), message->dest_port());
    }

    // Release the monitor_ temporarily while we handle the message.
    // The monitor was acquired in MessageHandler::TaskCallback().
    ml->Exit();
    Message::Priority saved_priority = message->priority();
    Dart_Port saved_dest_port = message->dest_port();
    MessageStatus status = HandleMessage(std::move(message));
    if (status > max_status) {
      max_status = status;
    }
    ml->Enter();
    if (FLAG_trace_isolates) {
      OS::PrintErr(
          "[.] Message handled (%s):\n"
          "\tlen: %" Pd
          "\n"
          "\thandler: %s\n"
          "\tport: %" Pd64 "\n",
          MessageStatusString(status), message_len, name(), saved_dest_port);
    }
    // If we are shutting down, do not process any more messages.
    if (status == kShutdown) {
      ClearOOBQueue();
      break;
    }

    // Remember time since the last message. Don't consider OOB messages so
    // using Observatory doesn't trigger additional idle tasks.
    if ((FLAG_idle_timeout_micros != 0) &&
        (saved_priority == Message::kNormalPriority)) {
      idle_start_time_ = OS::GetCurrentMonotonicMicros();
    }

    // Some callers want to process only one normal message and then quit. At
    // the same time it is OK to process multiple OOB messages.
    if ((saved_priority == Message::kNormalPriority) &&
        !allow_multiple_normal_messages) {
      // We processed one normal message. Allow no more.
      allow_normal_messages = false;
    }

    // Reevaluate the minimum allowable priority. The paused state
    // may have changed as part of handling the message. We may also
    // have encountered an error during message processing.
    //
    // Even if we encounter an error, we still process pending OOB
    // messages so that we don't lose the message notification.
    min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
                        ? Message::kNormalPriority
                        : Message::kOOBPriority);
    message = DequeueMessage(min_priority);
  }
  return max_status;
}
複製代碼

直到 MessageHandler::HandleMessages 爲止,這裏又是一個 while 循環,不斷調用 DequeueMessage 取出 message ,直到全部的 message 執行完畢,單個 message 的處理,則是調用 HandleMessage ,

在 HandleMessage 中首先作的是獲取 msg_handler ,調用的是 DartLibraryCalls::LookupHandler ,

RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Function& function = Function::Handle(
      zone, thread->isolate()->object_store()->lookup_port_handler());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 1;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    thread->isolate()->object_store()->set_lookup_port_handler(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  return result.raw();
}
複製代碼

能夠看出,這就是一個典型的 c++ 調用 dart 的流程,先找到 function ,而後構建參數,最後 DartEntry::InvokeFunction 調用這個函數。從 8~20 行得知,這是 _RawReceivePortImpl 的 _lookupHandler 函數:

@pragma("vm:entry-point", "call")
static _lookupHandler(int id) {
  var result = _handlerMap[id];
  return result;
}
複製代碼

根據 id 從 _handlerMap 中找到一個值返回,這個值,其實就是會調函數,設置時機以下:

void set handler(Function value) {
  _handlerMap[this._get_id()] = value;
}
複製代碼

這就是在初始化 _RawReceivePortImpl 以後調用的,_get_id 返回的也正是 dest_id 。而後,HandleMessage 中對 message 分爲三種狀況進行處理:

  1. message 優先級爲 oob
  2. message dest_port 爲 kIllegalPort
  3. 正常狀況

正常的處理以下:

const Object& result =
    Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
  status = ProcessUnhandledException(Error::Cast(result));
} else {
  ASSERT(result.IsNull());
}
複製代碼

DartLibraryCalls::HandleMessage:

// third_party/sdk/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::HandleMessage(const Object& handler, const Instance& message) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Isolate* isolate = thread->isolate();
  Function& function = Function::Handle(
      zone, isolate->object_store()->handle_message_function());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 2;
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_handleMessage()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    isolate->object_store()->set_handle_message_function(function);
  }
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, handler);
  args.SetAt(1, message);
#if !defined(PRODUCT)
  if (isolate->debugger()->IsStepping()) {
    // If the isolate is being debugged and the debugger was stepping
    // through code, enable single stepping so debugger will stop
    // at the first location the user is interested in.
    isolate->debugger()->SetResumeAction(Debugger::kStepInto);
  }
#endif
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  ASSERT(result.IsNull() || result.IsError());
  return result.raw();
}
複製代碼

這又是從 c++ 中調用 dart 函數,從代碼可知,調用的是 _RawReceivePortImpl 的 _handleMessage 函數:

@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
  // TODO(floitsch): this relies on the fact that any exception aborts the
  // VM. Once we have non-fatal global exceptions we need to catch errors
  // so that we can run the immediate callbacks.
  handler(message);
  _runPendingImmediateCallback();
}
複製代碼

以 message 爲參數,調用 handler 函數,即最初傳進來的 _Timer 的 _handleMessage 函數:

// third_party/sdk/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {
  var pendingTimers;
  if (msg == _ZERO_EVENT) {
    pendingTimers = _queueFromZeroEvent();
    assert(pendingTimers.length > 0);
  } else {
    assert(msg == _TIMEOUT_EVENT);
    _scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
    pendingTimers = _queueFromTimeoutEvent();
  }
  _runTimers(pendingTimers);
  // Notify the event handler or shutdown the port if no more pending
  // timers are present.
  _notifyEventHandler();
}
複製代碼

這裏根據 msg 的不一樣從不一樣的隊列中取 pendingTimers ,兩者分別實現以下:

static List _queueFromZeroEvent() {
  var pendingTimers = new List();
  assert(_firstZeroTimer != null);
  // Collect pending timers from the timer heap that have an expiration prior
  // to the currently notified zero timer.
  var timer;
  while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
    timer = _heap.removeFirst();
    pendingTimers.add(timer);
  }
  // Append the first zero timer to the pending timers.
  timer = _firstZeroTimer;
  _firstZeroTimer = timer._indexOrNext;
  timer._indexOrNext = null;
  pendingTimers.add(timer);
  return pendingTimers;
}

static List _queueFromTimeoutEvent() {
  var pendingTimers = new List();
  if (_firstZeroTimer != null) {
    // Collect pending timers from the timer heap that have an expiration
    // prior to the next zero timer.
    // By definition the first zero timer has been scheduled before the
    // current time, meaning all timers which are "less than" the first zero
    // timer are expired. The first zero timer will be dispatched when its
    // corresponding message is delivered.
    var timer;
    while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  } else {
    // Collect pending timers from the timer heap which have expired at this
    // time.
    var currentTime = VMLibraryHooks.timerMillisecondClock();
    var timer;
    while (!_heap.isEmpty && (_heap.first._wakeupTime <= currentTime)) {
      timer = _heap.removeFirst();
      pendingTimers.add(timer);
    }
  }
  return pendingTimers;
}
複製代碼

當 msg 爲 _ZERO_EVENT 時,會取出一個 _firstZeroTimer 隊列中的任務和 n 個 _heap 隊列中達到執行時間的任務,而當 msg 不爲 _ZERO_EVENT 時,則會取出 n 個執行時間先於 _firstZeroTimer 第一個任務的的任務,或者是執行時間先於當前時間的任務。取完以後則是調用 _runTimers 執行任務。

static void _runTimers(List pendingTimers) {
  // If there are no pending timers currently reset the id space before we
  // have a chance to enqueue new timers.
  if (_heap.isEmpty && (_firstZeroTimer == null)) {
    _idCount = 0;
  }

  // Fast exit if no pending timers.
  if (pendingTimers.length == 0) {
    return;
  }

  // Trigger all of the pending timers. New timers added as part of the
  // callbacks will be enqueued now and notified in the next spin at the
  // earliest.
  _handlingCallbacks = true;
  var i = 0;
  try {
    for (; i < pendingTimers.length; i++) {
      // Next pending timer.
      var timer = pendingTimers[i];
      timer._indexOrNext = null;

      // One of the timers in the pending_timers list can cancel
      // one of the later timers which will set the callback to
      // null. Or the pending zero timer has been canceled earlier.
      if (timer._callback != null) {
        var callback = timer._callback;
        if (!timer._repeating) {
          // Mark timer as inactive.
          timer._callback = null;
        } else if (timer._milliSeconds > 0) {
          var ms = timer._milliSeconds;
          int overdue =
              VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
          if (overdue > ms) {
            int missedTicks = overdue ~/ ms;
            timer._wakeupTime += missedTicks * ms;
            timer._tick += missedTicks;
          }
        }
        timer._tick += 1;

        callback(timer);
        // Re-insert repeating timer if not canceled.
        if (timer._repeating && (timer._callback != null)) {
          timer._advanceWakeupTime();
          timer._enqueue();
        }
        // Execute pending micro tasks.
        var immediateCallback = _removePendingImmediateCallback();
        if (immediateCallback != null) {
          immediateCallback();
        }
      }
    }
  } finally {
    _handlingCallbacks = false;
    // Re-queue timers we didn't get to.
    for (i++; i < pendingTimers.length; i++) {
      var timer = pendingTimers[i];
      timer._enqueue();
    }
    _notifyEventHandler();
  }
}
複製代碼

基本邏輯就是一個 for 循環,對於每個 timer,執行其 callback 函數,一步步回溯回去,就會調用到 result._complete(computation()) 函數,今後處開始,就到了上層邏輯代碼。

相關文章
相關標籤/搜索