chromium源碼閱讀--進程的Message Loop

上一篇總結了chromium進程的啓動,接下來就看線程的消息處理,這裏的線程包含進程的主線程。java

消息處理是由base::MessageLoop中實現,消息中的任務和定時器都是異步事件的。緩存

主要以下幾點:安全

一、消息的類型分類數據結構

二、延時處理的消息是如何實現閉包

 

1、消息分類異步

     chromium主要將消息類型以下定義:(chromium//src/base/message_loop/message_loop.h  112行)ide

1  enum Type {
2     TYPE_DEFAULT,
3     TYPE_UI,
4     TYPE_CUSTOM,
5     TYPE_IO,
6 #if defined(OS_ANDROID)
7     TYPE_JAVA,
8 #endif  // defined(OS_ANDROID)
9   };

1.TYPE_DEFAULT:函數

      處理chromium定義的Task(閉包代碼塊)和定時器任務oop

2.TYPE_UI:ui

     除了TYPE_DEFAULT定義的範圍,還支持原生的UI事件消息(好比用戶操做的窗口消息),MessageLoopForUI類

3.TYPE_IO:

     除了TYPE_DEFAULT定義的範圍,還支持異步IO的事件消息,MessageLoopForIO類

4.TYPE_JAVA

    是Android平臺的特有的消息消息,由於Android裏,有java消息和native消息分層,native消息與java消息交互,java消息與應用程序交互,能夠看作java消息接管了native消息。

5.TYPE_CUSTOM

    定製消息,比較少見使用。

 

消息類型的不一樣也就會建立不一樣的MessagePump。對於UI消息,不一樣的平臺也會有不一樣的實現。在chromium//src/base/message_loop/message_loop.cc 166行

 1 // static
 2 std::unique_ptr<MessagePump> MessageLoop::CreateMessagePumpForType(Type type) {
 3 // TODO(rvargas): Get rid of the OS guards.
 4 #if defined(USE_GLIB) && !defined(OS_NACL)
 5   using MessagePumpForUI = MessagePumpGlib;
 6 #elif (defined(OS_LINUX) && !defined(OS_NACL)) || defined(OS_BSD)
 7   using MessagePumpForUI = MessagePumpLibevent;
 8 #elif defined(OS_FUCHSIA)
 9   using MessagePumpForUI = MessagePumpFuchsia;
10 #endif
11 
12 #if defined(OS_IOS) || defined(OS_MACOSX)
13 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(MessagePumpMac::Create())
14 #elif defined(OS_NACL) || defined(OS_AIX)
15 // Currently NaCl and AIX don't have a UI MessageLoop.
16 // TODO(abarth): Figure out if we need this.
17 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>()
18 #else
19 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(new MessagePumpForUI())
20 #endif
21 
22 #if defined(OS_MACOSX)
23   // Use an OS native runloop on Mac to support timer coalescing.
24 #define MESSAGE_PUMP_DEFAULT \
25   std::unique_ptr<MessagePump>(new MessagePumpCFRunLoop())
26 #else
27 #define MESSAGE_PUMP_DEFAULT \
28   std::unique_ptr<MessagePump>(new MessagePumpDefault())
29 #endif
30 
31   if (type == MessageLoop::TYPE_UI) {
32     if (message_pump_for_ui_factory_)
33       return message_pump_for_ui_factory_();
34     return MESSAGE_PUMP_UI;
35   }
36   if (type == MessageLoop::TYPE_IO)
37     return std::unique_ptr<MessagePump>(new MessagePumpForIO());
38 
39 #if defined(OS_ANDROID)
40   if (type == MessageLoop::TYPE_JAVA)
41     return std::unique_ptr<MessagePump>(new MessagePumpForUI());
42 #endif
43 
44   DCHECK_EQ(MessageLoop::TYPE_DEFAULT, type);
45   return MESSAGE_PUMP_DEFAULT;
46 }
View Code

 

2、延時消息如何處理

消息的處理與消息隊列密不可分,internal::IncomingTaskQueue實現了一個線程安全的消息隊列。 MessageLoop裏定義了(chromium//src/base/message_loop/message_loop.h 392行)

1 scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;

接收到的消息就緩存在這個隊列裏。那麼咱們先看看這個類的構造函數。

 1 IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
 2     : always_schedule_work_(AlwaysNotifyPump(message_loop->type())),
 3       triage_tasks_(this),
 4       delayed_tasks_(this),
 5       deferred_tasks_(this),
 6       message_loop_(message_loop) {
 7   // The constructing sequence is not necessarily the running sequence in the
 8   // case of base::Thread.
 9   DETACH_FROM_SEQUENCE(sequence_checker_);
10 }

構造函數裏經過MessageLoop的類型來初始化bool成員 always_schedule_work_ ,來判斷是否對消息進行調度, 並保存了message_loop指針。

繼續分析代碼,前面看到消息隊列已經初始化了,那接下來咱們看看是怎麼往隊列裏添加任務的。

bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
                                           OnceClosure task,
                                           TimeDelta delay,
                                           Nestable nestable) {
  ......

  PendingTask pending_task(from_here, std::move(task),
                           CalculateDelayedRuntime(delay), nestable);
  ......
  return PostPendingTask(&pending_task);
}

使用了PendingTask對象,並計算了延遲的時間和是不是嵌套任務。那麼看PostPendingTask函數:

 1 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
 5   bool accept_new_tasks;
 6   bool schedule_work = false;
 7   {
 8     AutoLock auto_lock(incoming_queue_lock_);
 9     accept_new_tasks = accept_new_tasks_;
10     if (accept_new_tasks)
11       schedule_work = PostPendingTaskLockRequired(pending_task);
12   }
13 
14   if (!accept_new_tasks) {
19 pending_task->task.Reset(); 20 return false; 21 } 22 29 if (schedule_work) { 30 // Ensures |message_loop_| isn't destroyed while running. 31 AutoLock auto_lock(message_loop_lock_); 32 if (message_loop_) 33 message_loop_->ScheduleWork(); 34 } 35 36 return true; 37 }

這裏已經開始給線程加鎖了,那麼繼續看PostPendingTaskLockRequired函數:

 1 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
 2   incoming_queue_lock_.AssertAcquired();
 3   ......
 4   
 7   pending_task->sequence_num = next_sequence_num_++;
 8 
 9   task_annotator_.DidQueueTask("MessageLoop::PostTask", *pending_task);
10 
11   bool was_empty = incoming_queue_.empty();
12   incoming_queue_.push(std::move(*pending_task));
13 
14   if (is_ready_for_scheduling_ &&
15       (always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) {
21     message_loop_scheduled_ = true;
22     return true;
23   }
24   return false;
25 }

這裏看到pending_task是保存在incoming_queue_ 這裏使用了std::queue容器(一個FIFO的數據結構),這個隊列裏面的任務尚未添加到MessageLoop中,也能夠看到這裏尚未明確任務的執行方式,使用的是FIFO隊列。

下面的幾個成員變量,則就是在MessageLoop中使用了。

chromium//src/base/message_loop/incoming_task_queue.h 217行

1 // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
2   TriageQueue triage_tasks_;
3 
4   // Queue for delayed tasks on the |sequence_checker_| sequence.
5   DelayedQueue delayed_tasks_;
6 
7   // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
8   DeferredQueue deferred_tasks_;

一、TriageQueue

     這是第一個按默認的任務處理順序(FIFO)接受全部任務的隊列,這個隊列的任務要立刻執行或者放到下面的DelayedQueue 或者 DeferredQueue。

     triage_tasks_隊列的任務是經過 IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue)來實現切換的,能夠將 incoming_queue_ 和 triage_tasks_當作冷熱備份的緩存,在triage_tasks_隊列的任務執行完了,即爲空時,就將待執行的incoming_queue_隊列的任務與之交換。

1 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
2   if (queue_.empty()) {
3     // TODO(robliao): Since these high resolution tasks aren't yet in the
4     // delayed queue, they technically shouldn't trigger high resolution timers
5     // until they are.
6     outer_->pending_high_res_tasks_ += outer_->ReloadWorkQueue(&queue_);
7   }
8 }

二、DelayedQueue

     這個隊列是存放延遲執行的任務,而且定期望時間排序的

     delayed_tasks_是一個優先級隊列,按delayed_run_time排序,chromium//src/base/pending_task.h 63行

1 // PendingTasks are sorted by their |delayed_run_time| property.
2 using DelayedTaskQueue = std::priority_queue<base::PendingTask>;

三、DeferredQueue

     這個隊列一般是存放哪些由於MessageLoop嵌套而不能執行的任務,這些任務一般會在空閒的時候執行。

OK,看到這裏,咱們回顧一下MessageLoop的執行流程:

1 void MessageLoop::Run() {
2   DCHECK_EQ(this, current());
3   pump_->Run(this);
4 }

由MessagePump來執行,那麼咱們選擇默認的MessagePump來看Run的流程,chromium//src/base/message_loop/message_pump_default.cc 29行:

 1 void MessagePumpDefault::Run(Delegate* delegate) {
 2   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
 3 
 4   for (;;) {
 5 #if defined(OS_MACOSX)
 6     mac::ScopedNSAutoreleasePool autorelease_pool;
 7 #endif
 8 
 9     bool did_work = delegate->DoWork();
10     if (!keep_running_)
11       break;
12 
13     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
14     if (!keep_running_)
15       break;
16 
17     if (did_work)
18       continue;
19 
20     did_work = delegate->DoIdleWork();
21     if (!keep_running_)
22       break;
23 
24     if (did_work)
25       continue;
26 
27     ThreadRestrictions::ScopedAllowWait allow_wait;
28     if (delayed_work_time_.is_null()) {
29       event_.Wait();
30     } else {
31       event_.TimedWaitUntil(delayed_work_time_);
32     }
33   }
34 }
View Code

其中的流程是,delegate->DoWork(), delegate->DoDelayedWork(&delayed_work_time_),delegate->DoIdleWork()。

也能夠看其餘平臺的MessagePump,主要的流程是一致的,都是delegate的函數,而delegate指向一個MessageLoop指針,那麼又回到MessageLoop中。

上面具體的DoWork就不詳述了,接下來看看延遲任務是如何實現的:

chromium//src/base/message_loop/message_loop.cc 473行

TimeTicks next_run_time =
      incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
  if (next_run_time > recent_time_) {
    recent_time_ = TimeTicks::Now();  // Get a better view of Now();
    if (next_run_time > recent_time_) {
      *next_delayed_work_time = next_run_time;
      return false;
    }
  }

  PendingTask pending_task = incoming_task_queue_->delayed_tasks().Pop();

  if (incoming_task_queue_->delayed_tasks().HasTasks()) {
    *next_delayed_work_time =
        incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
  }

  return DeferOrRunPendingTask(std::move(pending_task));

在下一個可執行時間到來了,就會從incoming_task_queue_->delayed_tasks().Pop() 出來一個task, 若是incoming_task_queue_->delayed_tasks()裏還有延遲任務,就取裏面優先級最高的任務的延遲時間做爲下次判斷的時間。

到這裏,消息的處理和延遲任務的執行都完成了。

好了,回到上面的Run()函數流程,在DoIdleWork()完成以後,當前線程開始休眠,直到有新的任務來會從新喚醒。

 

嗯,這篇就到這裏了,下面一篇會總結IPC消息聲明和IPC channel的建立。

相關文章
相關標籤/搜索