探索 Flutter 異步消息的實現

1、簡介

咱們在進行 Android 開發的時候,會經過建立一個 Handler 並調用其 sendMessage  或 Post 方法來進行異步消息調用,其背後涉及到了三個面試常常被問的類:Handler,Looper,MessageQueue,內部原理我想作過 Android 開發的基本都瞭解。
Flutter 使用 dart 開發,其也有相似的異步消息機制,具體參見這篇文章:https://dart.dev/articles/arc...,按這個文章的說法,在 Dart 應用中有一個事件循環(message loop)和兩個隊列(Event queue 和 Microtask queue)。java

  1. Event queue : 包含了全部的外部事件:I/O,鼠標點擊,繪製,定時器,Dart isolate 的消息等,其實這塊又根據消息的優先級細分紅了兩個隊列,後面會有介紹。
  2. Microtask queue :事件處理代碼有時須要在當前 event 以後,且在下一個 event 以前作一些任務。
  • 兩種隊列的消息處理流程大體如圖所示:

dart 提供了 dart:async 庫來對這兩個隊列進行操做,主要是以下兩個API:android

  1. Future 類,建立一個定時器事件到 Event queue 尾部。
  2. scheduleMicrotask() 方法,添加一個事件到 Microtask queue 尾部

下面將會分析這兩個 API 背後的實現,涉及的是 flutter engine 的源碼,能夠參考官方的 wiki 來下載:
https://github.com/flutter/fl...git

2、初始化

1.  建立 MessageLoop

  • 在啓動 Flutter 的時候,引擎會額外建立三個線程:UI Thread,IO Thread, GPU Thread,併爲每一個線程都建立一個 MessageLoop,以後各個線程就進入的消息循環的狀態,等待新的消息來處理,具體流程以下:
    /src/flutter/shell/platform/android/android_shell_holder.cc
ThreadHost thread_host_; 
AndroidShellHolder::AndroidShellHolder(
    flutter::Settings settings,
    fml::jni::JavaObjectWeakGlobalRef java_object,
    bool is_background_view)
    : settings_(std::move(settings)), java_object_(java_object) {
......
  if (is_background_view) {
    thread_host_ = {thread_label, ThreadHost::Type::UI};
  } else {
    // 建立三個線程
    thread_host_ = {thread_label, ThreadHost::Type::UI | ThreadHost::Type::GPU |
                                      ThreadHost::Type::IO};
  }
......
}
  • 進一步分析 ThreadHost 的建立,最後來到了建立 Thread,每一個 Thread 都會建立一個 MessageLoop 並獲取其 TaskRunner,TaskRunner 是用來外部向 MessageLoop 中 Post Task 的。順帶說一下,在 Android 上 MessageLoop 的實現仍是使用的系統自身的 Looper 機制,這裏是經過 NDK 的 ALooper 相關接口來實現的。具體代碼以下:
    /src/flutter/fml/thread.cc
Thread::Thread(const std::string& name) : joined_(false) {
  fml::AutoResetWaitableEvent latch;
  fml::RefPtr<fml::TaskRunner> runner;
  thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {
    SetCurrentThreadName(name);
    // 建立 MessageLoop
    fml::MessageLoop::EnsureInitializedForCurrentThread();
    auto& loop = MessageLoop::GetCurrent();
    // 獲取 TaskRunner
    runner = loop.GetTaskRunner();
    latch.Signal();
    loop.Run();
  });
  latch.Wait();
  task_runner_ = runner;
}
  • MessageLoop 建立好後,咱們就能夠經過 TaskRunner 向其發送 Task 了,這裏須要注意 MessageLoop 執行的 Task 僅是一個 無參的閉包 。相似這樣:
auto jni_exit_task([key = thread_destruct_key_]() {
  FML_CHECK(pthread_setspecific(key, reinterpret_cast<void*>(1)) == 0);
});
thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);

2.  建立 Root Isolate

在 dart 語言中是沒有線程的,而是使用相似於線程但相互之間不共享堆內存的 isolate,表明一個獨立的 dart 程序執行環境。一樣 Flutter 的 dart 代碼是運行在一個叫 root isolate 的 isolate 中,下面簡要列下 root isolate 的建立過程。github

a.啓動 dart vm面試

這個步驟的具體流程,你們能夠順着 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 構造方法去跟進分析。在 dart vm 啓動過程當中會建立 vm isolate 和 PortMap,這兩個的具體做用下面有介紹。shell

b.建立 root isolateapi

root isolate 是在 UI 線程中建立的,具體流程見 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。因爲 isolate 是對當前線程執行環境的一個抽象表示,因此其內部存儲了不少信息,對於異步消息這塊有四個關鍵的信息是須要注意的。閉包

下面三個字段是定義在 dart vm 層的 Isolate 類中,具體見 /src/third_party/dart/runtime/vm/isolate.h。架構

  • main_port :能夠看作該 isolate 的標識,是一個整數;
  • message_handler :顧名思義,用來管理每一個 isolate 的 Event queue,其內部根據 message 的優先級將消息分爲了兩個隊列:普通優先級的 queue 和 OOB 優先級的 oob_queue。瞭解 TCP 協議的應該瞭解 TCP 中有帶外數據(即優先數據),isolate 的 OOB 優先級也是相似的意思,OOB 優先級的消息會被優先處理,目前看到有這麼幾種 OOB 消息:
// 這些主要和 isolate 的生命週期相關
kPauseMsg = 1,
kResumeMsg = 2,
kPingMsg = 3,
kKillMsg = 4,
kAddExitMsg = 5,
kDelExitMsg = 6,
kAddErrorMsg = 7,
kDelErrorMsg = 8,
kErrorFatalMsg = 9,
// 能夠注意下 kLowMemoryMsg ,若是有大量 OOB 懷疑是內存不夠了
kInterruptMsg = 10,     // Break in the debugger.
kInternalKillMsg = 11,  // Like kill, but does not run exit listeners, etc.
kLowMemoryMsg = 12,     // Run compactor, etc.
kDrainServiceExtensionsMsg = 13,  // Invoke pending service extensions
  • message_notify_callback :message_handler 收到消息後會調用該變量指向的函數去處理;

Flutter 引擎層會對 dart vm 的 isolate 實例作一層包裝( DartIsolate 類),其內部定義了:app

  • microtask_queue: 用來存儲 microtask 消息,可見在 Flutter 引擎中,Microtask queue 並非由 dart vm 層來管理的

    前面已經說過 isolate 之間是不能直接互相訪問,如圖:

能夠看出 isolate 之間是共享 vm isolate 的堆內存區域的,有點相似於操做系統的內核空間,vm isolate 的堆內存存儲了 dart vm 內部的核心數據(內置類,內置對象)。除了 vm isolate,不一樣 isolate 之間的堆內存是不能直接訪問的,爲此 dart vm 提供了 isolate 之間的通訊機制,負責通訊路由的大管家就是 PortMap,其內部實現就是一張 Hash 表,Key 爲 isolate 的 main_port,Value 爲 isolate 的 message_handler。

3、建立 Future

使用 dart 開發必定會用到 Future,當咱們經過 Future 構造方法建立一個實例的時候,就會建立一個定時器消息到 Event queue,下面咱們將分析這個流程。總體架構圖:

1.  dart 層建立 Future

建立 Future 的時候,內部會經過 Timer 構造一個定時器,具體代碼以下:

/src/out/host_release/dart-sdk/lib/async/future.dart

factory Future(FutureOr<T> computation()) {
  _Future<T> result = new _Future<T>();
  Timer.run(() {
    try {
      result._complete(computation());
    } catch (e, s) {
      _completeWithErrorCallback(result, e, s);
    }
  });
  return result;
}

跟進 Timer 的實現,具體代碼以下:

/src/out/host_release/dart-sdk/lib/async/timer.dart

static void run(void callback()) {
  new Timer(Duration.zero, callback);
}

factory Timer(Duration duration, void callback()) {
  if (Zone.current == Zone.root) {
    return Zone.current.createTimer(duration, callback);
  }
......
}

這裏假定 duration == Duration.zero,Zone.current == Zone.root ,進而到 rootZone 的 createTimer 方法,裏面又調用了 Timer 的 _createTimer 方法:

/src/out/host_release/dart-sdk/lib/async/zone.dart

class _RootZone extends _Zone {
......
  Timer createTimer(Duration duration, void f()) {
    return Timer._createTimer(duration, f);
  }
......
}

/src/out/host_release/dart-sdk/lib/async/timer.dart

external static Timer _createTimer(Duration duration, void callback());

能夠看到 _createTimer 方法是個 external 的,按照 dart 語言的規範,external 方法的實現都在對應的 patch 文件中( timer_patch.dart),內部經過 _TimerFactory._factory 來建立 Timer,具體代碼以下:

/src/third_party/dart/runtime/lib/timer_patch.dart

@patch
class Timer {
......
  @patch
  static Timer _createTimer(Duration duration, void callback()) {
    if (_TimerFactory._factory == null) {
      _TimerFactory._factory = VMLibraryHooks.timerFactory;
    }
......
    int milliseconds = duration.inMilliseconds;
    if (milliseconds < 0) milliseconds = 0;
   // 注意此處將外部的 callback 又包了一層
    return _TimerFactory._factory(milliseconds, (_) {
      callback();
    }, false);
  }
......
}

經過上面的代碼,咱們知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化時經過調用 _setupHooks 方法設置的,具體代碼以下:

/src/third_party/dart/runtime/lib/timer_impl.dart

@pragma("vm:entry-point", "call")
_setupHooks() {
  VMLibraryHooks.timerFactory = _Timer._factory;
}
// VMLibraryHooks.timerFactory 指向的該方法
// 咱們假設建立的是非 repeating 消息,而且 milliSeconds 爲 0
static Timer _factory(
      int milliSeconds, void callback(Timer timer), bool repeating) {
......
    return new _Timer(milliSeconds, callback);
  }
}

factory _Timer(int milliSeconds, void callback(Timer timer)) {
  return _createTimer(callback, milliSeconds, false);
}
// 建立一個 Timer 實例並調用 _enqueue 將其加入到隊列
static Timer _createTimer(
    void callback(Timer timer), int milliSeconds, bool repeating) {
......
  _Timer timer =
      new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
  timer._enqueue();
  return timer;
}

_Timer._internal(
    this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
    : _id = _nextId();

// 這裏 _milliSeconds == 0,會向 ZeroTimer 隊列插入消息,而後調用 _notifyZeroHandler
void _enqueue() {
  if (_milliSeconds == 0) {
    if (_firstZeroTimer == null) {
      _lastZeroTimer = this;
      _firstZeroTimer = this;
    } else {
      _lastZeroTimer._indexOrNext = this;
      _lastZeroTimer = this;
    }
    // Every zero timer gets its own event.
    _notifyZeroHandler();
  } else {
   ......
    // 延遲消息這裏先不分析
  }
}

折騰了一大圈,最後只是構造了一個 _Timer 實例並把其加入到 ZeroTimer 隊列中,若是是延遲消息則會加入到 TimeoutTimerHeap 中,最後調用 _notifyZeroHandler 方法, 其主要作以下操做:

  • 建立 RawReceivePort 並設置一個叫 _handleMessage 方法作爲引擎層的回調方法
  • 向引擎層 Event queue 發送一個普通優先級的  _ZERO_EVENT ,引擎層處理該消息的時候會最終回調到上面設置的 _handleMessage 方法。

具體代碼以下:

/src/third_party/dart/runtime/lib/timer_impl.dart

static void _notifyZeroHandler() {
  if (_sendPort == null) {
    _createTimerHandler();
  }
// 底層會調到 PortMap 的 PostMessage 方法,進而喚醒消息處理,後面會分析這個流程
  _sendPort.send(_ZERO_EVENT);
}
// 建立和引擎層通訊的 RawReceivePort,並設置引擎層的回調方法 _handleMessage
static void _createTimerHandler() {
  assert(_receivePort == null);
  assert(_sendPort == null);
  _receivePort = new RawReceivePort(_handleMessage);
  _sendPort = _receivePort.sendPort;
  _scheduledWakeupTime = null;
}

/src/third_party/dart/runtime/lib/isolate_patch.dart

@patch
class RawReceivePort {
  @patch
  factory RawReceivePort([Function handler]) {
    _RawReceivePortImpl result = new _RawReceivePortImpl();
    result.handler = handler;
    return result;
  }
}

// 最終將回調設置到 _RawReceivePortImpl 的 _handlerMap 中,引擎層會從這個 map 尋找消息的 handler
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {
    void set handler(Function value) {
      _handlerMap[this._get_id()] = value;
    }
}

_handleMessage 回調方法會收集 Timer 並執行,具體代碼實現以下:

/src/third_party/dart/runtime/lib/timer_impl.dart

static void _handleMessage(msg) {
  var pendingTimers;
  if (msg == _ZERO_EVENT) {
    // 找到全部的待處理 Timers
    pendingTimers = _queueFromZeroEvent();
    assert(pendingTimers.length > 0);
  } else {
    ......
    // 延時消息這裏不分析
  }
// 處理Timer,即調用設置的 callback
  _runTimers(pendingTimers);
......
}

2.  向 Event Queue 發送消息

前面說到 RawReceiverPort 會向引擎層 Event queue 發送一個 _ZERO_EVENT  ,其內部是經過調用 PortMap 的 PostMessage 方法將消息發送到 Event queue,該方法首先會根據接收方的 port id 找到對應的 message_handler,而後將消息根據優先級保存到相應的 queue 中,最後喚醒 message_notify_callback 回調函數 ,具體代碼以下:

/src/third_party/dart/runtime/vm/port.cc

bool PortMap::PostMessage(Message* message, bool before_events) {
  ......
  intptr_t index = FindPort(message->dest_port());
  ......
  MessageHandler* handler = map_[index].handler;
 ......
  handler->PostMessage(message, before_events);
  return true;
}

/src/third_party/dart/runtime/vm/message_handler.cc

void MessageHandler::PostMessage(Message* message, bool before_events) {
  Message::Priority saved_priority;
  bool task_running = true;
 ......
  // 根據消息優先級進入不一樣的隊列
    if (message->IsOOB()) {
      oob_queue_->Enqueue(message, before_events);
    } else {
      queue_->Enqueue(message, before_events);
    }
   ......
//喚醒並處理消息
  MessageNotify(saved_priority);
}

/src/third_party/dart/runtime/vm/isolate.cc

void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
  if (priority >= Message::kOOBPriority) {
    I->ScheduleInterrupts(Thread::kMessageInterrupt);
  }
// 最後調用的 message_notify_callback 所指向的函數
  Dart_MessageNotifyCallback callback = I->message_notify_callback();
  if (callback) {
    (*callback)(Api::CastIsolate(I));
  }
}

3.   Event Queue 消息處理

前面消息已經發送成功並調用了消息處理喚醒的操做,下面咱們須要知道 message_notify_callback 所指向的函數的實現, root isolate 在初始化時會設置該變量,具體代碼以下:

/src/flutter/runtime/dart_isolate.cc

bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {
  ......
  // 設置 message handler 的 task runner 爲 UI Task Runner
  SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
                               is_root_isolate);
  ......
  return true;
}
void DartIsolate::SetMessageHandlingTaskRunner(
    fml::RefPtr<fml::TaskRunner> runner,
    bool is_root_isolate) {
 ......
  message_handler().Initialize(
      [runner](std::function<void()> task) { runner->PostTask(task); });
}

進一步跟進分析發現經過 Dart_SetMessageNotifyCallback 將  root isolate 的 message_notify_callback 設置爲 MessageNotifyCallback 方法,具體代碼以下:

/src/third_party/tonic/dart_message_handler.cc

void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {
  TONIC_CHECK(!task_dispatcher_ && dispatcher);
  task_dispatcher_ = dispatcher;
  Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}

MessageNotifyCallback 會在 Event queue 收到消息後執行,但其執行過程當中並無拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一個 Task 閉包,這個 Task 閉包會經過調用 Dart_HandleMessage 來處理 Event queue 中的消息,具體代碼流程以下:

/src/third_party/tonic/dart_message_handler.cc

void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  auto dart_state = DartState::From(dest_isolate);
  TONIC_CHECK(dart_state);
  dart_state->message_handler().OnMessage(dart_state);
}

void DartMessageHandler::OnMessage(DartState* dart_state) {
  auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;
  // 往 ui 線程 MessageLoop Post 了一個 Task
  auto weak_dart_state = dart_state->GetWeakPtr();
  task_dispatcher_([weak_dart_state]() {
    if (auto dart_state = weak_dart_state.lock()) {
      dart_state->message_handler().OnHandleMessage(dart_state.get());
    }
  });
}

void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
  ......
  if (Dart_IsPausedOnStart()) {
    ......
  } else if (Dart_IsPausedOnExit()) {
   ......
  } else {
     // 調用 Dart_HandleMessage 方法處理消息
    result = Dart_HandleMessage();
   ......
  }
 ......
}

Dart_HandleMessage 的實現很簡單,只是調用 message_handler 的 HandleNextMessage 方法,具體代碼實現以下:

/src/third_party/dart/runtime/vm/dart_api_impl.cc

DART_EXPORT Dart_Handle Dart_HandleMessage() {
......
  if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
    return Api::NewHandle(T, T->StealStickyError());
  }
  return Api::Success();
}

咱們進一步跟進  HandleNextMessage 方法的實現,最終來到以下代碼:

/src/third_party/dart/runtime/vm/message_handler.cc

// 依次遍歷 message_handler 的消息隊列,對每一個消息進程處理
MessageHandler::MessageStatus MessageHandler::HandleMessages(
    MonitorLocker* ml,
    bool allow_normal_messages,
    bool allow_multiple_normal_messages) {
......
  Message* message = DequeueMessage(min_priority);
  while (message != NULL) {
    ......
    MessageStatus status = HandleMessage(message);
   ......
    message = DequeueMessage(min_priority);
  }
  return max_status;
}

// 取消息的時候會優先處理 OOB Message
Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
  Message* message = oob_queue_->Dequeue();
  if ((message == NULL) && (min_priority < Message::kOOBPriority)) {
    message = queue_->Dequeue();
  }
  return message;
}

每一個消息的處理都是在 HandleMessage 方法中,該方法會根據不一樣的消息優先級作相應的處理,具體代碼以下:

/src/third_party/dart/runtime/vm/isolate.cc

MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
    Message* message) {
......
  Object& msg_handler = Object::Handle(zone);
// 非 OOB 消息,須要獲取 dart 層的  handler 函數
  if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
    msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
    ......
  }
......
  MessageStatus status = kOK;
  if (message->IsOOB()) {
   // 處理 OOB 消息,詳細實現可本身看代碼,這裏不分析OOB消息
   ......
  } else if (message->dest_port() == Message::kIllegalPort) {
    ......
  } else {
    ......
    // 調用前面找到的 msg_handler 來處理普通消息
    const Object& result =
        Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
   ......
  }
  delete message;
  return status;
}

這裏咱們主要看普通消息的處理邏輯,首先會經過調用 DartLibraryCalls::LookupHandler 方法來從 dart 層尋找相應的 handler 函數,而後經過 DartLibraryCalls::HandleMessage 執行相應的處理函數,具體實現代碼以下:

/src/third_party/dart/runtime/vm/dart_entry.cc

RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  Function& function = Function::Handle(
      zone, thread->isolate()->object_store()->lookup_port_handler());
  const int kTypeArgsLen = 0;
  const int kNumArguments = 1;
// 若是沒有消息處理方法,則進行查找,最終找到的是 RawReceivePortImpl 的 _lookupHandler 方法。
  if (function.IsNull()) {
    Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
    ASSERT(!isolate_lib.IsNull());
    const String& class_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    const String& function_name = String::Handle(
        zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
    function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
                                       kTypeArgsLen, kNumArguments,
                                       Object::empty_array());
    ASSERT(!function.IsNull());
    thread->isolate()->object_store()->set_lookup_port_handler(function);
  }
// 執行消息處理函數
  const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));
  const Object& result =
      Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  return result.raw();
}

最終執行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在建立 Future 的時候咱們已經設置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法會從 _handlerMap 中找到設置的回調方法,最後執行回調方法。具體代碼以下:

/src/third_party/dart/runtime/lib/isolate_patch.dart

@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {
 ......
  // Called from the VM to retrieve the handler for a message.
  @pragma("vm:entry-point", "call")
  static _lookupHandler(int id) {
    var result = _handlerMap[id];
    return result;
  }
......
}

Future 的建立到這就分析完了,整個過程涉及到了 EventQueue 的消息收發。
上面主要分析了非延遲消息的處理,若是是延遲的 Timer 會怎麼處理呢?這裏簡單提一下,在 dart vm 內部還有個 EventHandler 線程,若是是延遲消息則會經過管道向這個線程寫入延遲數據,這個線程會負責延遲計數,到時間了就往引擎層 Post 喚醒消息,具體代碼可參見/src/third_party/dart/runtime/bin/eventhandler.cc,這裏就再也不贅述,感興趣的能夠自行分析。

至此,經過分析 Future 咱們已經把 Event Queue 的消息處理流程瞭解了。

4、Microtask

1.  向  Microtask queue 發送消息

假設 Zone. current 爲 rootZone, 直接看 scheduleMicrotask 方法的實現:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

void scheduleMicrotask(void callback()) {
  _Zone currentZone = Zone.current;
  if (identical(_rootZone, currentZone)) {
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
......
}

跟進 _rootScheduleMicrotask 方法的實現,最終來到 _scheduleAsyncCallback 方法,該方法作了兩件事情:

  • 將傳入的 callback 加入 callback 隊列
  • 將 _startMicrotaskLoop 做爲閉包參數調用  _AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中會依次執行 callback 隊列保存的回調。

具體代碼以下:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  if (_nextCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    _lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}
// 該方法被做爲回調設置到引擎,會在處理全部的 Microtask 的時候執行 
void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}

class _AsyncRun {
  external static void _scheduleImmediate(void callback());
}

根據前面的經驗,會在對應的 patch 文件中找到 _AsyncRun._scheduleImmediate 的實現,其內部調用了 _ScheduleImmediate._closure 指向的方法。

具體代碼以下:

/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart

@patch
class _AsyncRun {
  @patch
  static void _scheduleImmediate(void callback()) {
    if (_ScheduleImmediate._closure == null) {
      throw new UnsupportedError("Microtasks are not supported");
    }
    _ScheduleImmediate._closure(callback);
  }
}
// 經過該方法設置 _ScheduleImmediate._closure
@pragma("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
  _ScheduleImmediate._closure = closure;
}

那麼 _ScheduleImmediate._closure 指向的是什麼呢?咱們須要找到 _setScheduleImmediateClosure 的調用方。root isolate 初始化時會執行一系列的 vm hook 調用,咱們從中找到了 _setScheduleImmediateClosure 的調用,具體代碼以下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {
  Dart_Handle schedule_microtask;
  if (is_ui_isolate) {
// 這裏的 builtin_library 是 Flutter 擴展的 ui library
    schedule_microtask =
        GetFunction(builtin_library, "_getScheduleMicrotaskClosure");
  } else {
    ......
  }
  Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));
  Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");
  Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,
                                   &schedule_microtask);
  PropagateIfError(result);
}

進一步跟進,最終找到了 _ScheduleImmediate._closure 指向的方法,是一個 native 實現的函數,具體代碼以下:

/src/flutter/lib/ui/natives.dart

Function _getScheduleMicrotaskClosure() => _scheduleMicrotask; 

void _scheduleMicrotask(void callback()) native 'ScheduleMicrotask';

跟進 _scheduleMicrotask 的 native 實現,發現其會把傳入的 _startMicrotaskLoop 方法加入到底層的Microtask queue,具體代碼以下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

void ScheduleMicrotask(Dart_NativeArguments args) {
  Dart_Handle closure = Dart_GetNativeArgument(args, 0);
  UIDartState::Current()->ScheduleMicrotask(closure);
}

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::ScheduleMicrotask(Dart_Handle closure) {  if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {    return;  }  microtask_queue_.ScheduleMicrotask(closure);}

2.  Microtask queue 消息處理

前面已經將 _startMicrotaskLoop 方法加入到了 Microtask queue ,那麼 Microtask queue 內的方法什麼時候執行呢?咱們經過跟進 Microtask queue  的 RunMicrotasks 方法的調用方,最終找到 Microtask queue 內方法的執行時機 FlushMicrotasksNow,具體代碼以下:

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::ScheduleMicrotask(Dart_Handle closure) {
  if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {
    return;
  }
  microtask_queue_.ScheduleMicrotask(closure);
}

再跟進 FlushMicrotasksNow 方法的調用方,發現有兩處調用:

  • 這裏是在每一幀開始的時候去執行 Microtask

/src/flutter/lib/ui/window/window.cc

void UIDartState::FlushMicrotasksNow() {
  microtask_queue_.RunMicrotasks();
}
  • 另一處調用是經過 TaskObserve 的形式,具體代碼以下:

/src/flutter/lib/ui/ui_dart_state.cc

void UIDartState::AddOrRemoveTaskObserver(bool add) {
......
  if (add) {
// 這個 add_callback_ 是啥呢?
    add_callback_(reinterpret_cast<intptr_t>(this),
                  [this]() { this->FlushMicrotasksNow(); });
  } else {
    remove_callback_(reinterpret_cast<intptr_t>(this));
  }
}

跟進 add_callback_ 的賦值,這裏是android的實現

/src/flutter/shell/platform/android/flutter_main.cc

void FlutterMain::Init(JNIEnv* env,
                       jclass clazz,
                       jobject context,
                       jobjectArray jargs,
                       jstring bundlePath,
                       jstring appStoragePath,
                       jstring engineCachesPath) {
 ......
  settings.task_observer_add = [](intptr_t key, fml::closure callback) {
    fml::MessageLoop::GetCurrent().AddTaskObserver(key, std::move(callback));
  };
......
}

FlushMicrotasksNow() 是做爲 MessageLoop 的 TaskObserver 來執行的, TaskObserver 會在處理完task以後把該 Task 建立的 MicroTask 所有執行,也就是說在下一個 Task 運行前執行。代碼以下:

/src/flutter/fml/message_loop_impl.cc

void MessageLoopImpl::FlushTasks(FlushType type) {
......
  for (const auto& invocation : invocations) {
    invocation();
    for (const auto& observer : task_observers_) {
      observer.second();
    }
  }
}

5、結束

經過前面的分析,Flutter 的異步消息處理流程仍是挺複雜的,主要是代碼寫的比較亂,跳轉層次太多,能夠經過對整個流程的掌控來尋找UI 線程的優化及監控點,進而下降 UI 線程的處理時間,但願本篇文章讓你們對 Flutter 的異步消息的總體處理流程有更深的理解。

Android開發資料+面試架構資料 免費分享 點擊連接 便可領取

《Android架構師必備學習資源免費領取(架構視頻+面試專題文檔+學習筆記)》

相關文章
相關標籤/搜索