先說結論,V8引擎初始化一個默認Platform,會建立一個線程池,而其中有個線程是用於處理相似於setTimeout的延時任務
另外附一些圖,包括繼承樹、關鍵屬性歸屬、純邏輯工做流程,對代碼木得興趣的看完圖能夠X掉了。
上一篇講了V8初始化默認Platform對象時會作三件事,其中生成空白DefaultPlatform、獲取線程池大小已經講過了,剩下線程啓動相關的內容。
寫以前花了10幾分鐘學了下mac下C++的線程,對API有一個初步瞭解,給一個簡單的例子,大概流程以下。
const int stack_size = 1 * 1024 * 512;
int tmp = 0;
void* add(void* number){
tmp = tmp + *(int*)number;
printf("tmp: %i\n", tmp);
return nullptr;
};
int main(int argc, const char * argv[]) {
pthread_t pt;
pthread_attr_t attr;
memset(&attr, 0, sizeof(attr));
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, stack_size);
int num = 5;
int* ptr = #
int ret = pthread_create(&pt, &attr, add, ptr);
if(ret != 0) printf("cannot create thread");
return 0;
}複製代碼
經過幾個步驟,就能夠建立一條線程來處理任務,啓動後的輸出就懶得截圖了,反正就是打印一個5。
有了上面的例子,能夠慢慢來看V8初始化時多線程的啓動過程,首先是入門方法。
void DefaultPlatform::EnsureBackgroundTaskRunnerInitialized() {
base::MutexGuard guard(&lock_);
if (!worker_threads_task_runner_) {
worker_threads_task_runner_ =
std::make_shared<DefaultWorkerThreadsTaskRunner>(
thread_pool_size_, time_function_for_testing_
? time_function_for_testing_
: DefaultTimeFunction);
}
}
double DefaultTimeFunction() {
return base::TimeTicks::HighResolutionNow().ToInternalValue() /
static_cast<double>(base::Time::kMicrosecondsPerSecond);
}複製代碼
if中的worker_threads_task_runner是DefaultPlatform的私有屬性,因爲初始化時默認值爲NULL,這裏作一個定義賦值。第一個參數是在第二步獲取的線程池大小,第二個參數是一個計數方法,默認引用以前Time模塊裏的東西,返回硬件時間戳,具體實現能夠看我以前寫的。
接下來看DefaultWorkerThreadsTaskRunner類的構造函數,接受2個參數。
DefaultWorkerThreadsTaskRunner::DefaultWorkerThreadsTaskRunner(
uint32_t thread_pool_size, TimeFunction time_function)
: queue_(time_function),
time_function_(time_function),
thread_pool_size_(thread_pool_size) {
for (uint32_t i = 0; i < thread_pool_size; ++i) {
thread_pool_.push_back(base::make_unique<WorkerThread>(this));
}
}複製代碼
用2個參數初始化了3個屬性,而且根據size往線程池中添加線程,thread_pool_這個屬性用vector在管理,push_back至關於JS的push,當成數組來理解就好了。
添加的WorkerThread類是在DefaultWorkerThreadsTaskRunner裏面的一個私有內部類,繼承於Thread,單純的用來管理線程。C++的this比較簡單,沒有JS那麼多概念,就是一個指向當前對象的指針,來看一下線程類的構造函數。
DefaultWorkerThreadsTaskRunner::WorkerThread::WorkerThread(DefaultWorkerThreadsTaskRunner* runner)
: Thread(Options("V8 DefaultWorkerThreadsTaskRunner WorkerThread")),
runner_(runner) {
Start();
}複製代碼
這裏同時調用了父類構造函數並初始化自己的屬性,runner就是上面那個對象自己。這個構造函數長得比較奇怪,其中Options類是Thread的內部類,有一個接受一個類型爲字符串的構造函數,而Thread的構造函數只接受Options類型,因此會這樣,代碼以下。
class Thread {
public:
// Opaque data type for thread-local storage keys.
using LocalStorageKey = int32_t;
class Options {
public:
Options() : name_("v8:<unknown>"), stack_size_(0) {}
explicit Options(const char* name, int stack_size = 0)
: name_(name), stack_size_(stack_size) {}
// ...
};
// Create new thread.
explicit Thread(const Options& options);
// ...
}複製代碼
能夠簡單理解這裏給線程取了一個名字,在給Options命名的同時,其實也給Thread命名了,以下。
Thread::Thread(const Options& options)
: data_(new PlatformData),
stack_size_(options.stack_size()),
start_semaphore_(nullptr) {
if (stack_size_ > 0 && static_cast<size_t>(stack_size_) < PTHREAD_STACK_MIN) {
stack_size_ = PTHREAD_STACK_MIN;
}
set_name(options.name());
}
class Thread {
static const int kMaxThreadNameLength = 16;
char name_[kMaxThreadNameLength];
}
void Thread::set_name(const char* name) {
strncpy(name_, name, sizeof(name_));
name_[sizeof(name_) - 1] = '\0';
}複製代碼
看註釋說,因爲Linux的prctl方法限制了長度,因此這裏的name也最多隻能保存16位,並且C++的字符串的最後一位還要留給結束符,因此理論上傳入Options的超長字符串
"V8 DefaultWorkerThreadsTaskRunner WorkerThread"只有前15位做爲Thread的name保存下來了,也就是"V8 Defaultworke",很是戲劇性的把r給砍掉了。。。
初始化完成後,會調用Start方法啓動線程,這個方法並不須要子類實現,而是基類已經定義好了,保留關鍵代碼以下。
void Thread::Start() {
int result;
pthread_attr_t attr;
memset(&attr, 0, sizeof(attr));
result = pthread_attr_init(&attr);
size_t stack_size = stack_size_;
if (stack_size == 0) {
stack_size = 1 * 1024 * 1024;
}
if (stack_size > 0) {
result = pthread_attr_setstacksize(&attr, stack_size);
}
{
result = pthread_create(&data_->thread_, &attr, ThreadEntry, this);
}
result = pthread_attr_destroy(&attr);
}複製代碼
參照一下文章開始的demo,能夠看出去掉了合法性檢測和宏以後,在初始化和啓動線程基本上V8的形式是同樣的。
簡單總結一下,V8初始化了一個DefaultPlatform類,計算了一下可用線程池大小,生成了幾條線程弄進線程池,而每條線程的任務就是那個ThreadEntry,這篇所有寫完算了。
// 3-5
static void* ThreadEntry(void* arg) {
Thread* thread = reinterpret_cast<Thread*>(arg);
// We take the lock here to make sure that pthread_create finished first since
// we don't know which thread will run first (the original thread or the new
// one).
{ MutexGuard lock_guard(&thread->data()->thread_creation_mutex_); }
// 3-6
SetThreadName(thread->name());
// 3-7
thread->NotifyStartedAndRun();
return nullptr;
}複製代碼
因爲線程任務的參數定義與返回值都是void*,這裏直接作一個強轉。隨後會加一個線程鎖,由於這幾個線程在初始化的時候並不須要同時執行這個任務。執行的第一個方法雖然從名字來看只是簡單的給線程設置名字,可是內容卻不簡單。
傳入SetThreadName方法的參數是以前那個被截斷的字符串,看一下這個方法。
static void SetThreadName(const char* name) {
int (*dynamic_pthread_setname_np)(const char*);
*reinterpret_cast<void**>(&dynamic_pthread_setname_np) =
dlsym(RTLD_DEFAULT, "pthread_setname_np");
if (dynamic_pthread_setname_np == nullptr) return;
static const int kMaxNameLength = 63;
dynamic_pthread_setname_np(name);
}複製代碼
裏面用了一個很玄的api的叫dlsym,官方解釋以下。
The function dlsym() takes a "handle" of a dynamic library returned by dlopen() and the null-terminated symbol name, returning the address where that symbol is loaded into memory.
大概就是根據句柄讀取一個動態連接庫,名字就是那個字符串,返回其在內存中的地址,因此這塊的調試全是機器碼,根本看不懂,最後返回的一個函數。
知道這是個函數就好了,至於怎麼設置線程名字我也不太想知道。
第二步的方法名就是運行線程的任務,調用鏈比較長,會來回在幾個類之間穿梭,調用各自屬性的方法。
void NotifyStartedAndRun() {
if (start_semaphore_) start_semaphore_->Signal();
Run();
}
void DefaultWorkerThreadsTaskRunner::WorkerThread::Run() {
runner_->single_worker_thread_id_.store(base::OS::GetCurrentThreadId(), std::memory_order_relaxed);
while (std::unique_ptr<Task> task = runner_->GetNext()) {
task->Run();
}
}
std::unique_ptr<Task> DefaultWorkerThreadsTaskRunner::GetNext() {
return queue_.GetNext();
}複製代碼
不理清楚,這個地方真的很麻煩,繞得很,能夠看頂部的繼承圖。總之,最後調用的是DefaultWorkerThreadsTaskRunner類上一個類型爲DelayedTaskQueue類的GetNext方法,返回類型是Task類,V8只是簡單定義了一個基類,實際運行時的因此task都須要繼承這個類並實現其Run方法以便線程執行。
最後的最後,GetNext的邏輯其實能夠參考libuv的邏輯,機制都大同小異,方法的源碼以下。
std::unique_ptr<Task> DelayedTaskQueue::GetNext() {
base::MutexGuard guard(&lock_);
for (;;) {
double now = MonotonicallyIncreasingTime();
std::unique_ptr<Task> task = PopTaskFromDelayedQueue(now);
while (task) {
task_queue_.push(std::move(task));
task = PopTaskFromDelayedQueue(now);
}
if (!task_queue_.empty()) {
std::unique_ptr<Task> result = std::move(task_queue_.front());
task_queue_.pop();
return result;
}
if (terminated_) {
queues_condition_var_.NotifyAll();
return nullptr;
}
if (task_queue_.empty() && !delayed_task_queue_.empty()) {
double wait_in_seconds = delayed_task_queue_.begin()->first - now;
base::TimeDelta wait_delta = base::TimeDelta::FromMicroseconds(base::TimeConstants::kMicrosecondsPerSecond * wait_in_seconds);
bool notified = queues_condition_var_.WaitFor(&lock_, wait_delta);
USE(notified);
} else {
queues_condition_var_.Wait(&lock_);
}
}
}複製代碼