Flutter 基於 dart 語言,dart 自己是一個單線程模型,Future 也是基於單線程的異步機制,即基於事件循環實現的異步,與多線程實現的異步並不同,比較相似於 Android 中的 Handler 機制,而所謂的異步,就是向事件循環中心發送一條消息,等待調度,在以後的某個時刻執行代碼,可是這段代碼仍是在當前線程執行的,因此,若是使用 Future 執行耗時任務,它可能不會阻塞當前的 UI 流程,不事後續的一些 UI 操做仍是會受到影響。 使用 Future 異步執行代碼須要四個步驟:c++
關於 Future 任務的建立,在應用層通常是這麼寫:api
new Future(() {
doSomething();
});
複製代碼
那麼,從 Future 的構造函數開始,能夠一窺 Future 任務建立的全過程。markdown
factory Future(FutureOr<T> computation()) {
_Future<T> result = new _Future<T>();
Timer.run(() {
try {
result._complete(computation());
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
}
});
return result;
}
複製代碼
result._complete(computation()) 即最後的執行功能代碼的部分,參見第四小節,Timer.run 則會一步步建立任務。多線程
static void run(void callback()) {
new Timer(Duration.zero, callback);
}
// third_party/sdk/sdk/lib/async/timer.dart
factory Timer(Duration duration, void callback()) {
if (Zone.current == Zone.root) {
// No need to bind the callback. We know that the root's timer will
// be invoked in the root zone.
return Zone.current.createTimer(duration, callback);
}
return Zone.current
.createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}
// third_party/sdk/runtime/lib/timer_patch.dart
static Timer _createTimer(Duration duration, void callback()) {
// TODO(iposva): Remove _TimerFactory and use VMLibraryHooks exclusively.
if (_TimerFactory._factory == null) {
_TimerFactory._factory = VMLibraryHooks.timerFactory;
}
if (_TimerFactory._factory == null) {
throw new UnsupportedError("Timer interface not supported.");
}
int milliseconds = duration.inMilliseconds;
if (milliseconds < 0) milliseconds = 0;
return _TimerFactory._factory(milliseconds, (_) {
callback();
}, false);
}
複製代碼
這裏最後調用 _TimerFactory._factory 建立 Timer 實例,_TimerFactory._factory 來自於 VMLibraryHooks.timerFactory ,而 VMLibraryHooks.timerFactory 的設置時機能夠一步步回溯至 InitDartInternal :app
// lib/ui/dart_runtime_hooks.cc
static void InitDartInternal(Dart_Handle builtin_library, bool is_ui_isolate) {
Dart_Handle print = GetFunction(builtin_library, "_getPrintClosure");
Dart_Handle internal_library = Dart_LookupLibrary(ToDart("dart:_internal"));
Dart_Handle result =
Dart_SetField(internal_library, ToDart("_printClosure"), print);
PropagateIfError(result);
if (is_ui_isolate) {
// Call |_setupHooks| to configure |VMLibraryHooks|.
Dart_Handle method_name = Dart_NewStringFromCString("_setupHooks");
result = Dart_Invoke(builtin_library, method_name, 0, NULL);
PropagateIfError(result);
}
Dart_Handle setup_hooks = Dart_NewStringFromCString("_setupHooks");
Dart_Handle io_lib = Dart_LookupLibrary(ToDart("dart:io"));
result = Dart_Invoke(io_lib, setup_hooks, 0, NULL);
PropagateIfError(result);
Dart_Handle isolate_lib = Dart_LookupLibrary(ToDart("dart:isolate"));
result = Dart_Invoke(isolate_lib, setup_hooks, 0, NULL);
PropagateIfError(result);
}
// third_party/sdk/runtime/lib/timer_impl.dart
@pragma("vm:entry-point", "call")
_setupHooks() {
VMLibraryHooks.timerFactory = _Timer._factory;
}
static Timer _factory(
int milliSeconds, void callback(Timer timer), bool repeating) {
if (repeating) {
return new _Timer.periodic(milliSeconds, callback);
}
return new _Timer(milliSeconds, callback);
}
複製代碼
_Timer 是 Timer 的實現類,重複執行與不重複執行的 Timer 會調用不一樣的構造函數,可是兩者異曲同工。less
factory _Timer(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, false);
}
factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, true);
}
static Timer _createTimer(
void callback(Timer timer), int milliSeconds, bool repeating) {
// Negative timeouts are treated as if 0 timeout.
if (milliSeconds < 0) {
milliSeconds = 0;
}
// Add one because DateTime.now() is assumed to round down
// to nearest millisecond, not up, so that time + duration is before
// duration milliseconds from now. Using microsecond timers like
// Stopwatch allows detecting that the timer fires early.
int now = VMLibraryHooks.timerMillisecondClock();
int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);
_Timer timer =
new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
// Enqueue this newly created timer in the appropriate structure and
// notify if necessary.
timer._enqueue();
return timer;
}
複製代碼
_internal 函數是 _Timer 的構造函數,_enqueue 函數將 Timer 放入隊列等待執行:異步
// third_party/sdk/runtime/lib/timer_impl.dart
void _enqueue() {
if (_milliSeconds == 0) {
if (_firstZeroTimer == null) {
_lastZeroTimer = this;
_firstZeroTimer = this;
} else {
_lastZeroTimer._indexOrNext = this;
_lastZeroTimer = this;
}
// Every zero timer gets its own event.
_notifyZeroHandler();
} else {
_heap.add(this);
if (_heap.isFirst(this)) {
_notifyEventHandler();
}
}
}
複製代碼
Timer 的延遲是否爲 0 是一個分界線,它會將 Timer 分別插入 _lastZeroTimer 和 _heap 中,而後調用 _notifyZeroHandler 或 _notifyEventHandler 通知目標線程處理任務,接下來就是發送任務的過程了。async
以 _notifyZeroHandler 爲例,ide
// third_party/sdk/runtime/lib/timer_impl.dart
static void _notifyZeroHandler() {
if (_sendPort == null) {
_createTimerHandler();
}
_sendPort.send(_ZERO_EVENT);
}
複製代碼
首先,確保 _sendPort 的存在,而後,使用 _sendPort 發送一條 _ZERO_EVENT 消息。函數
// third_party/sdk/runtime/lib/timer_impl.dart
static void _createTimerHandler() {
assert(_receivePort == null);
assert(_sendPort == null);
_receivePort = new RawReceivePort(_handleMessage);
_sendPort = _receivePort.sendPort;
_scheduledWakeupTime = null;
}
複製代碼
_receivePort 與 _sendPort 是一對用於通訊的接口,首先調用 RawReceivePort 構造函數建立 _receivePort,而且傳遞了回調函數 _handleMessage ,而後從 _receivePort 中取出 _sendPort ,可見這個通訊模型的重點就是 _receivePort 的構造過程。
// third_party/sdk/runtime/lib/isolate_patch.dart
@patch
factory RawReceivePort([Function handler]) {
_RawReceivePortImpl result = new _RawReceivePortImpl();
result.handler = handler;
return result;
}
factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
ASSERT(
TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
return ReceivePort::New(port_id, false /* not control port */);
}
複製代碼
構造函數是一個 native 函數,在 native 中,首先調用 PortMap::CreatePort 建立出 Dart_Port ,而後調用 ReceivePort::New 建立 ReceivePort 實例,實例化以後,將回調函數 handler 保存到了 map 中,key 爲 _get_id,這也是一個 native 函數。
建立 Dart_Port 的參數 isolate->message_handler() 的設置時機爲 InitIsolate:
// third_party/sdk/runtime/vm/isolate.cc
Isolate* Isolate::InitIsolate(const char* name_prefix, IsolateGroup* isolate_group, const Dart_IsolateFlags& api_flags, bool is_vm_isolate) {
// Setup the isolate message handler.
MessageHandler* handler = new IsolateMessageHandler(result);
ASSERT(handler != nullptr);
result->set_message_handler(handler);
}
複製代碼
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
ASSERT(handler != NULL);
MutexLocker ml(mutex_);
#if defined(DEBUG)
handler->CheckAccess();
#endif
Entry entry;
entry.port = AllocatePort();
entry.handler = handler;
entry.state = kNewPort;
// Search for the first unused slot. Make use of the knowledge that here is
// currently no port with this id in the port map.
ASSERT(FindPort(entry.port) < 0);
intptr_t index = entry.port % capacity_;
Entry cur = map_[index];
// Stop the search at the first found unused (free or deleted) slot.
while (cur.port != 0) {
index = (index + 1) % capacity_;
cur = map_[index];
}
// Insert the newly created port at the index.
ASSERT(index >= 0);
ASSERT(index < capacity_);
ASSERT(map_[index].port == 0);
ASSERT((map_[index].handler == NULL) ||
(map_[index].handler == deleted_entry_));
if (map_[index].handler == deleted_entry_) {
// Consuming a deleted entry.
deleted_--;
}
map_[index] = entry;
// Increment number of used slots and grow if necessary.
used_++;
MaintainInvariants();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Opening port: \n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
handler->name(), entry.port);
}
return entry.port;
}
複製代碼
在 CreatePort 中,先是調用 AllocatePort 建立一個端口(先隨機生成一個,再判斷這個端口沒有被使用,就能夠返回),而後構建出 Entry 並將其存在一個哈希表中。
RawReceivePort* ReceivePort::New(Dart_Port id, bool is_control_port, Heap::Space space) {
ASSERT(id != ILLEGAL_PORT);
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
const SendPort& send_port =
SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));
ReceivePort& result = ReceivePort::Handle(zone);
{
RawObject* raw = Object::Allocate(ReceivePort::kClassId,
ReceivePort::InstanceSize(), space);
NoSafepointScope no_safepoint;
result ^= raw;
result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
}
if (is_control_port) {
PortMap::SetPortState(id, PortMap::kControlPort);
} else {
PortMap::SetPortState(id, PortMap::kLivePort);
}
return result.raw();
}
複製代碼
在這個函數裏面,先是建立了 SendPort ,而後對 ReceivePort 進行了一些初始化操做,並將 ReceivePort 的 RawObject 返回,接着在 third_party/sdk/runtime/lib/timer_impl.dart#_createTimerHandler 中還要經過 _receivePort 取得 _sendPort,最終會調用 _get_sendport,這也是一個 native 函數:
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_get_sendport, 0, 1) {
GET_NON_NULL_NATIVE_ARGUMENT(ReceivePort, port, arguments->NativeArgAt(0));
return port.send_port();
}
複製代碼
GET_NON_NULL_NATIVE_ARGUMENT 取出 ReceivePort 實例,經過 arguments->NativeArgAt(0) ,可是 _get_sendport 並無傳遞參數,也就是說這個參數就是 _RawReceivePortImpl 自身,也就是上面返回的 RawObject,而後返回它的 send_port 。
再回到 third_party/sdk/runtime/lib/timer_impl.dart#_notifyZeroHandler,_sendPort 調用 send 函數發送了一條消息,最終會調用 _sendInternal ,再轉到 c++ 層:
// third_party/sdk/runtime/lib/isolate.cc
DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
// TODO(iposva): Allow for arbitrary messages to be sent.
GET_NON_NULL_NATIVE_ARGUMENT(Instance, obj, arguments->NativeArgAt(1));
const Dart_Port destination_port_id = port.Id();
const bool can_send_any_object = isolate->origin_id() == port.origin_id();
if (ApiObjectConverter::CanConvert(obj.raw())) {
PortMap::PostMessage(
Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
} else {
MessageWriter writer(can_send_any_object);
// TODO(turnidge): Throw an exception when the return value is false?
PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
Message::kNormalPriority));
}
return Object::null();
}
複製代碼
首先取出 SendPort 和 Instance 實例,而後建立出 Message 實例,最後調用 PortMap::PostMessage 發送消息。
// third_party/sdk/runtime/vm/port.cc
bool PortMap::PostMessage(std::unique_ptr<Message> message, bool before_events) {
MutexLocker ml(mutex_);
intptr_t index = FindPort(message->dest_port());
if (index < 0) {
return false;
}
ASSERT(index >= 0);
ASSERT(index < capacity_);
MessageHandler* handler = map_[index].handler;
ASSERT(map_[index].port != 0);
ASSERT((handler != NULL) && (handler != deleted_entry_));
handler->PostMessage(std::move(message), before_events);
return true;
}
複製代碼
根據 Message 的 dest_port 找到 index ,再從哈希表中取出 handler,這裏的 handler 就是在初始化 Dart_Port 傳入的 isolate->message_handler() ,也就是 IsolateMessageHandler 實例,不過 IsolateMessageHandler 並無重寫 PostMessage 函數。
// third_party/sdk/runtime/vm/message_handler.cc
void MessageHandler::PostMessage(std::unique_ptr<Message> message, bool before_events) {
Message::Priority saved_priority;
{
saved_priority = message->priority();
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
if (pool_ != nullptr && !task_running_) {
ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
}
// Invoke any custom message notification.
MessageNotify(saved_priority);
}
複製代碼
message 分爲兩種,oob 和非 oob,實際上就是優先級的區分:
// third_party/sdk/runtime/vm/message.h
typedef enum {
kNormalPriority = 0, // Deliver message when idle.
kOOBPriority = 1, // Deliver message asap.
// Iteration.
kFirstPriority = 0,
kNumPriorities = 2,
} Priority;
複製代碼
不一樣優先級的 message 會被加入不一樣的隊列,oob_queue_ 和 queue_,完了調用 MessageNotify :
// third_party/sdk/runtime/vm/isolate.cc
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
if (priority >= Message::kOOBPriority) {
// Handle out of band messages even if the mutator thread is busy.
I->ScheduleInterrupts(Thread::kMessageInterrupt);
}
Dart_MessageNotifyCallback callback = I->message_notify_callback();
if (callback != nullptr) {
// Allow the embedder to handle message notification.
(*callback)(Api::CastIsolate(I));
}
}
複製代碼
此處的 priority 是 message 的 priority ,當優先級爲 kOOBPriority 時,會中斷當前的任務去處理這個 message 。而後從 isolate 中取出 callback 執行,再看 callback 究竟是誰。這要從 DartIsolate::Initialize 開始:
// runtime/dart_isolate.cc
bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {
SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
is_root_isolate);
}
void DartIsolate::SetMessageHandlingTaskRunner( fml::RefPtr<fml::TaskRunner> runner, bool is_root_isolate) {
if (!is_root_isolate || !runner) {
return;
}
message_handling_task_runner_ = runner;
message_handler().Initialize(
[runner](std::function<void()> task) { runner->PostTask(task); });
}
複製代碼
從這裏就能夠看出,後續的 callback 會在 message_handling_task_runner_ 中運行,可是它實際上仍是 UITaskRunner ,因此說,Future 的異步是單線程的異步,而後再看:
// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {1
// Only can be called once.
TONIC_CHECK(!task_dispatcher_ && dispatcher);
task_dispatcher_ = dispatcher;
Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
auto dart_state = DartState::From(dest_isolate);
TONIC_CHECK(dart_state);
dart_state->message_handler().OnMessage(dart_state);
}
複製代碼
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT void Dart_SetMessageNotifyCallback( Dart_MessageNotifyCallback message_notify_callback) {
Isolate* isolate = Isolate::Current();
CHECK_ISOLATE(isolate);
{
NoSafepointScope no_safepoint_scope;
isolate->set_message_notify_callback(message_notify_callback);
}
if (message_notify_callback != nullptr && isolate->HasPendingMessages()) {
::Dart_ExitIsolate();
// If a new handler gets installed and there are pending messages in the
// queue (e.g. OOB messages for doing vm service work) we need to notify
// the newly registered callback, otherwise the embedder might never get
// notified about the pending messages.
message_notify_callback(Api::CastIsolate(isolate));
::Dart_EnterIsolate(Api::CastIsolate(isolate));
}
}
複製代碼
從以上代碼能夠看出,message_notify_callback 就是 MessageNotifyCallback 函數,這個函數調用了 OnMessage ,而 task_dispatcher_ ,就是上面給出的 UITaskRunner 的 dispatcher 。
// third_party/tonic/dart_message_handler.cc
void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
auto dart_state = DartState::From(dest_isolate);
TONIC_CHECK(dart_state);
dart_state->message_handler().OnMessage(dart_state);
}
void DartMessageHandler::OnMessage(DartState* dart_state) {
auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;
// Schedule a task to run on the message loop thread.
auto weak_dart_state = dart_state->GetWeakPtr();
task_dispatcher_([weak_dart_state]() {
if (auto dart_state = weak_dart_state.lock()) {
dart_state->message_handler().OnHandleMessage(dart_state.get());
}
});
}
複製代碼
從 OnHandleMessage 開始,剩下的代碼開始在 task_dispatcher_ 中執行,也能夠說從這裏開始,開始了接收任務階段。
void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
if (isolate_had_fatal_error_) {
// Don't handle any more messages.
return;
}
DartIsolateScope scope(dart_state->isolate());
DartApiScope dart_api_scope;
Dart_Handle result = Dart_Null();
bool error = false;
// On the first message, check if we should pause on isolate start.
if (!handled_first_message()) {
set_handled_first_message(true);
if (Dart_ShouldPauseOnStart()) {
// Mark that we are paused on isolate start.
Dart_SetPausedOnStart(true);
}
}
if (Dart_IsPausedOnStart()) {
} else if (Dart_IsPausedOnExit()) {
} else {
// We are processing messages normally.
result = Dart_HandleMessage();
// If the Dart program has set a return code, then it is intending to shut
// down by way of a fatal error, and so there is no need to emit a log
// message.
if (dart_state->has_set_return_code() && Dart_IsError(result) &&
Dart_IsFatalError(result)) {
error = true;
} else {
error = LogIfError(result);
}
dart_state->MessageEpilogue(result);
if (!Dart_CurrentIsolate()) {
isolate_exited_ = true;
return;
}
}
}
複製代碼
正常狀況下會調用 Dart_HandleMessage:
// third_party/sdk/runtime/vm/dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_HandleMessage() {
Thread* T = Thread::Current();
Isolate* I = T->isolate();
CHECK_API_SCOPE(T);
CHECK_CALLBACK_STATE(T);
API_TIMELINE_BEGIN_END_BASIC(T);
TransitionNativeToVM transition(T);
if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
return Api::NewHandle(T, T->StealStickyError());
}
return Api::Success();
}
// third_party/sdk/runtime/vm/message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
// We can only call HandleNextMessage when this handler is not
// assigned to a thread pool.
MonitorLocker ml(&monitor_);
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
return HandleMessages(&ml, true, false);
}
MessageHandler::MessageStatus MessageHandler::HandleMessages( MonitorLocker* ml, bool allow_normal_messages, bool allow_multiple_normal_messages) {
ASSERT(monitor_.IsOwnedByCurrentThread());
// Scheduling of the mutator thread during the isolate start can cause this
// thread to safepoint.
// We want to avoid holding the message handler monitor during the safepoint
// operation to avoid possible deadlocks, which can occur if other threads are
// sending messages to this message handler.
//
// If isolate() returns nullptr [StartIsolateScope] does nothing.
ml->Exit();
StartIsolateScope start_isolate(isolate());
ml->Enter();
MessageStatus max_status = kOK;
Message::Priority min_priority =
((allow_normal_messages && !paused()) ? Message::kNormalPriority
: Message::kOOBPriority);
std::unique_ptr<Message> message = DequeueMessage(min_priority);
while (message != nullptr) {
intptr_t message_len = message->Size();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[<] Handling message:\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
message_len, name(), message->dest_port());
}
// Release the monitor_ temporarily while we handle the message.
// The monitor was acquired in MessageHandler::TaskCallback().
ml->Exit();
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = HandleMessage(std::move(message));
if (status > max_status) {
max_status = status;
}
ml->Enter();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[.] Message handled (%s):\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
MessageStatusString(status), message_len, name(), saved_dest_port);
}
// If we are shutting down, do not process any more messages.
if (status == kShutdown) {
ClearOOBQueue();
break;
}
// Remember time since the last message. Don't consider OOB messages so
// using Observatory doesn't trigger additional idle tasks.
if ((FLAG_idle_timeout_micros != 0) &&
(saved_priority == Message::kNormalPriority)) {
idle_start_time_ = OS::GetCurrentMonotonicMicros();
}
// Some callers want to process only one normal message and then quit. At
// the same time it is OK to process multiple OOB messages.
if ((saved_priority == Message::kNormalPriority) &&
!allow_multiple_normal_messages) {
// We processed one normal message. Allow no more.
allow_normal_messages = false;
}
// Reevaluate the minimum allowable priority. The paused state
// may have changed as part of handling the message. We may also
// have encountered an error during message processing.
//
// Even if we encounter an error, we still process pending OOB
// messages so that we don't lose the message notification.
min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
? Message::kNormalPriority
: Message::kOOBPriority);
message = DequeueMessage(min_priority);
}
return max_status;
}
複製代碼
直到 MessageHandler::HandleMessages 爲止,這裏又是一個 while 循環,不斷調用 DequeueMessage 取出 message ,直到全部的 message 執行完畢,單個 message 的處理,則是調用 HandleMessage ,
在 HandleMessage 中首先作的是獲取 msg_handler ,調用的是 DartLibraryCalls::LookupHandler ,
RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
Function& function = Function::Handle(
zone, thread->isolate()->object_store()->lookup_port_handler());
const int kTypeArgsLen = 0;
const int kNumArguments = 1;
if (function.IsNull()) {
Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
ASSERT(!isolate_lib.IsNull());
const String& class_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
const String& function_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
kTypeArgsLen, kNumArguments,
Object::empty_array());
ASSERT(!function.IsNull());
thread->isolate()->object_store()->set_lookup_port_handler(function);
}
const Array& args = Array::Handle(zone, Array::New(kNumArguments));
args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
return result.raw();
}
複製代碼
能夠看出,這就是一個典型的 c++ 調用 dart 的流程,先找到 function ,而後構建參數,最後 DartEntry::InvokeFunction 調用這個函數。從 8~20 行得知,這是 _RawReceivePortImpl 的 _lookupHandler 函數:
@pragma("vm:entry-point", "call")
static _lookupHandler(int id) {
var result = _handlerMap[id];
return result;
}
複製代碼
根據 id 從 _handlerMap 中找到一個值返回,這個值,其實就是會調函數,設置時機以下:
void set handler(Function value) {
_handlerMap[this._get_id()] = value;
}
複製代碼
這就是在初始化 _RawReceivePortImpl 以後調用的,_get_id 返回的也正是 dest_id 。而後,HandleMessage 中對 message 分爲三種狀況進行處理:
正常的處理以下:
const Object& result =
Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
status = ProcessUnhandledException(Error::Cast(result));
} else {
ASSERT(result.IsNull());
}
複製代碼
DartLibraryCalls::HandleMessage:
// third_party/sdk/runtime/vm/dart_entry.cc
RawObject* DartLibraryCalls::HandleMessage(const Object& handler, const Instance& message) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
Isolate* isolate = thread->isolate();
Function& function = Function::Handle(
zone, isolate->object_store()->handle_message_function());
const int kTypeArgsLen = 0;
const int kNumArguments = 2;
if (function.IsNull()) {
Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
ASSERT(!isolate_lib.IsNull());
const String& class_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
const String& function_name = String::Handle(
zone, isolate_lib.PrivateName(Symbols::_handleMessage()));
function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
kTypeArgsLen, kNumArguments,
Object::empty_array());
ASSERT(!function.IsNull());
isolate->object_store()->set_handle_message_function(function);
}
const Array& args = Array::Handle(zone, Array::New(kNumArguments));
args.SetAt(0, handler);
args.SetAt(1, message);
#if !defined(PRODUCT)
if (isolate->debugger()->IsStepping()) {
// If the isolate is being debugged and the debugger was stepping
// through code, enable single stepping so debugger will stop
// at the first location the user is interested in.
isolate->debugger()->SetResumeAction(Debugger::kStepInto);
}
#endif
const Object& result =
Object::Handle(zone, DartEntry::InvokeFunction(function, args));
ASSERT(result.IsNull() || result.IsError());
return result.raw();
}
複製代碼
這又是從 c++ 中調用 dart 函數,從代碼可知,調用的是 _RawReceivePortImpl 的 _handleMessage 函數:
@pragma("vm:entry-point", "call")
static void _handleMessage(Function handler, var message) {
// TODO(floitsch): this relies on the fact that any exception aborts the
// VM. Once we have non-fatal global exceptions we need to catch errors
// so that we can run the immediate callbacks.
handler(message);
_runPendingImmediateCallback();
}
複製代碼
以 message 爲參數,調用 handler 函數,即最初傳進來的 _Timer 的 _handleMessage 函數:
// third_party/sdk/runtime/lib/timer_impl.dart
static void _handleMessage(msg) {
var pendingTimers;
if (msg == _ZERO_EVENT) {
pendingTimers = _queueFromZeroEvent();
assert(pendingTimers.length > 0);
} else {
assert(msg == _TIMEOUT_EVENT);
_scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
pendingTimers = _queueFromTimeoutEvent();
}
_runTimers(pendingTimers);
// Notify the event handler or shutdown the port if no more pending
// timers are present.
_notifyEventHandler();
}
複製代碼
這裏根據 msg 的不一樣從不一樣的隊列中取 pendingTimers ,兩者分別實現以下:
static List _queueFromZeroEvent() {
var pendingTimers = new List();
assert(_firstZeroTimer != null);
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
return pendingTimers;
}
static List _queueFromTimeoutEvent() {
var pendingTimers = new List();
if (_firstZeroTimer != null) {
// Collect pending timers from the timer heap that have an expiration
// prior to the next zero timer.
// By definition the first zero timer has been scheduled before the
// current time, meaning all timers which are "less than" the first zero
// timer are expired. The first zero timer will be dispatched when its
// corresponding message is delivered.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
} else {
// Collect pending timers from the timer heap which have expired at this
// time.
var currentTime = VMLibraryHooks.timerMillisecondClock();
var timer;
while (!_heap.isEmpty && (_heap.first._wakeupTime <= currentTime)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
}
return pendingTimers;
}
複製代碼
當 msg 爲 _ZERO_EVENT 時,會取出一個 _firstZeroTimer 隊列中的任務和 n 個 _heap 隊列中達到執行時間的任務,而當 msg 不爲 _ZERO_EVENT 時,則會取出 n 個執行時間先於 _firstZeroTimer 第一個任務的的任務,或者是執行時間先於當前時間的任務。取完以後則是調用 _runTimers 執行任務。
static void _runTimers(List pendingTimers) {
// If there are no pending timers currently reset the id space before we
// have a chance to enqueue new timers.
if (_heap.isEmpty && (_firstZeroTimer == null)) {
_idCount = 0;
}
// Fast exit if no pending timers.
if (pendingTimers.length == 0) {
return;
}
// Trigger all of the pending timers. New timers added as part of the
// callbacks will be enqueued now and notified in the next spin at the
// earliest.
_handlingCallbacks = true;
var i = 0;
try {
for (; i < pendingTimers.length; i++) {
// Next pending timer.
var timer = pendingTimers[i];
timer._indexOrNext = null;
// One of the timers in the pending_timers list can cancel
// one of the later timers which will set the callback to
// null. Or the pending zero timer has been canceled earlier.
if (timer._callback != null) {
var callback = timer._callback;
if (!timer._repeating) {
// Mark timer as inactive.
timer._callback = null;
} else if (timer._milliSeconds > 0) {
var ms = timer._milliSeconds;
int overdue =
VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
if (overdue > ms) {
int missedTicks = overdue ~/ ms;
timer._wakeupTime += missedTicks * ms;
timer._tick += missedTicks;
}
}
timer._tick += 1;
callback(timer);
// Re-insert repeating timer if not canceled.
if (timer._repeating && (timer._callback != null)) {
timer._advanceWakeupTime();
timer._enqueue();
}
// Execute pending micro tasks.
var immediateCallback = _removePendingImmediateCallback();
if (immediateCallback != null) {
immediateCallback();
}
}
}
} finally {
_handlingCallbacks = false;
// Re-queue timers we didn't get to.
for (i++; i < pendingTimers.length; i++) {
var timer = pendingTimers[i];
timer._enqueue();
}
_notifyEventHandler();
}
}
複製代碼
基本邏輯就是一個 for 循環,對於每個 timer,執行其 callback 函數,一步步回溯回去,就會調用到 result._complete(computation()) 函數,今後處開始,就到了上層邏輯代碼。