以前分析過線程的代碼,最近在使用線程,繼續分析一下。咱們先看一下通常的使用例子。node
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 作點耗時的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
咱們先分析一下這個代碼的意思。由於上面的代碼在主線程和子線程都會被執行一遍。因此首先經過isMainThread判斷當前是主線程仍是子線程。主線程的話,就建立一個子線程,而後監聽子線程發過來的消息。子線程的話,首先執行業務相關的代碼,還能夠監聽主線程傳過來的消息。下面咱們開始分析源碼。分析完,會對上面的代碼有更多的理解。
首先咱們從worker_threads模塊開始分析。這是一個c++模塊。咱們看一下他導出的功能。require("work_threads")的時候就是引用了InitWorker函數導出的功能。c++
void InitWorker(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
{
// 執行下面的方法時,入參都是w->GetFunction() new出來的對象
// 新建一個函數模板,Worker::New是對w->GetFunction()執行new的時候會執行的回調
Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
// 設置須要拓展的內存,由於c++對象的內存是固定的
w->InstanceTemplate()->SetInternalFieldCount(1);
w->Inherit(AsyncWrap::GetConstructorTemplate(env));
// 設置一系列原型方法,就不一一列舉
env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
// 一系列原型方法
// 導出函數模塊對應的函數,即咱們代碼中const { Worker } = require("worker_threads");中的Worker
Local<String> workerString =
FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
w->SetClassName(workerString);
target->Set(env->context(),
workerString,
w->GetFunction(env->context()).ToLocalChecked()).Check();
}
// 導出getEnvMessagePort方法,const { getEnvMessagePort } = require("worker_threads");
env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
/*
線程id,這個不是操做系統分配的那個,而是nodejs分配的,在新開線程的時候設置
const { threadId } = require("worker_threads");
*/
target
->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(env->thread_id())))
.Check();
/*
是不是主線程,const { isMainThread } = require("worker_threads");
這邊變量在nodejs啓動的時候設置爲true,新開子線程的時候,沒有設置,因此是false
*/
target
->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
Boolean::New(env->isolate(), env->is_main_thread()))
.Check();
/*
若是不是主線程,導出資源限制的配置,
即在子線程中調用const { resourceLimits } = require("worker_threads");
*/
if (!env->is_main_thread()) {
target
->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
env->worker_context()->GetResourceLimits(env->isolate()))
.Check();
}
// 導出幾個常量
NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}
翻譯成js大概是web
function c++Worker(object) {
// 關聯起來,後續在js層調用c++層函數時,取出來,拿到c++層真正的worker對象
object[0] = this;
...
}
function New(object) {
const worker = new c++Worker(object);
}
function Worker() {
New(this);
}
Worker.prototype = {
startThread,StartThread,
StopThread: StopThread,
...
}
module.exports = {
Worker: Worker,
getEnvMessagePort: GetEnvMessagePort,
isMainThread: true | false
...
}
瞭解work_threads模塊導出的功能後,咱們看new Worker的時候的邏輯。根據上面代碼導出的邏輯,咱們知道這時候首先會新建一個c++對象。對應上面的Worker函數中的this。而後執行New回調,並傳入tihs。咱們看New函數的邏輯。咱們省略一系列的參數處理,主要代碼以下。編程
// args.This()就是咱們剛纔傳進來的this
Worker* worker = new Worker(env, args.This(),
url, per_isolate_opts,
std::move(exec_argv_out));
咱們再看Worker類。緩存
Worker::Worker(Environment* env,
Local<Object> wrap,...)
// 在父類構造函數中完成對象的Worker對象和args.This()對象的關聯
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
...
// 分配線程id
thread_id_(Environment::AllocateThreadId()),
env_vars_(env->env_vars()) {
// 新建一個端口和子線程通訊
parent_port_ = MessagePort::New(env, env->context());
/*
關聯起來,用於通訊
const parent_port_ = {data: {sibling: null}};
const child_port_data_ = {sibling: null};
parent_port_.data.sibling = child_port_data_;
child_port_data_.sibling = parent_port_.data;
*/
child_port_data_ = std::make_unique<MessagePortData>(nullptr);
MessagePort::Entangle(parent_port_, child_port_data_.get());
// 設置Worker對象的messagePort屬性爲parent_port_
object()->Set(env->context(),
env->message_port_string(),
parent_port_->object()).Check();
// 設置Worker對象的線程id,即threadId屬性
object()->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
.Check();
}
新建一個Worker,結構以下
微信
![](http://static.javashuo.com/static/loading.gif)
瞭解了new Worker的邏輯後,咱們看在js層是如何使用的。咱們看js層Worker類的構造函數。
constructor(filename, options = {}) {
super();
// 忽略一系列參數處理,new Worker就是上面提到的c++層的
this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
// messagePort就是上面圖中的messagePort,指向_parent_port
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
// 開始接收消息,咱們這裏不深刻messagePort,後續單獨分析
this[kPort].start();
// 申請一個通訊管道,兩個端口
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
// 向另外一端發送消息
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// 開啓線程
this[kHandle].startThread();
}
上面的代碼主要邏輯以下
1 保存messagePort,而後給messagePort的對端(看上面的圖)發送消息,可是這時候尚未接收者,因此消息會緩存到MessagePortData,即child_port_data_ 中。
2 申請一個通訊管道,用於主線程和子線程通訊。_parent_port和child_port是給nodejs使用的,新申請的管道是給用戶使用的。
3 建立子線程。
咱們看建立線程的時候,作了什麼。架構
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Worker* w;
// 解包出對應的Worker對象
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
// 新建一個子線程,而後執行Run函數,今後在子線程裏執行
uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
w->Run();
}, static_cast<void*>(w))
}
咱們繼續看Run函數
void Worker::Run() {
{
// 新建一個env
env_.reset(new Environment(data.isolate_data_.get(),
context,
std::move(argv_),
std::move(exec_argv_),
Environment::kNoFlags,
thread_id_));
// 初始化libuv,往libuv註冊
env_->InitializeLibuv(start_profiler_idle_notifier_);
// 建立一個MessagePort
CreateEnvMessagePort(env_.get());
// 執行internal/main/worker_thread.js
StartExecution(env_.get(), "internal/main/worker_thread");
// 開始事件循環
do {
uv_run(&data.loop_, UV_RUN_DEFAULT);
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped()) continue;
more = uv_loop_alive(&data.loop_);
} while (more == true && !is_stopped());
}
}
咱們分步驟分析上面的代碼
1 CreateEnvMessagePortoop
void Worker::CreateEnvMessagePort(Environment* env) {
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}
child_port_data_這個變量咱們應該很熟悉,在這裏首先申請一個新的端口。負責端口中數據管理的對象是child_port_data_。而後在env緩存起來。一會要用。
源碼分析
![](http://static.javashuo.com/static/loading.gif)
2 執行internal/main/worker_thread.js
// 設置process對象
patchProcessObject();
// 獲取剛纔緩存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
// 加載腳本
if (message.type === LOAD_SCRIPT) {
const {
argv,
cwdCounter,
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
const CJSLoader = require('internal/modules/cjs/loader');
loadPreloadModules();
/*
由主線程申請的MessageChannel管道中,某一端的端口,
設置publicWorker的parentPort字段,publicWorker就是worker_threads導出的對象,後面須要用
*/
publicWorker.parentPort = publicPort;
// 執行時使用的數據
publicWorker.workerData = workerData;
// 通知主線程,正在執行腳本
port.postMessage({ type: UP_AND_RUNNING });
// 執行new Worker(filename)時傳入的文件
CJSLoader.Module.runMain(filename);
})
// 開始接收消息
port.start()
這時候咱們再回頭看一下,咱們調用new Worker(filename),而後在子線程裏執行咱們的filename時的場景。咱們再次回顧前面的代碼。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 作點耗時的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
咱們知道isMainThread在子線程裏是false,parentPort 就是就是messageChannel中的一端。因此parentPort.postMessage給對端發送消息,就是給主線程發送消息,咱們再看看worker.postMessage('Hello, world!')。
postMessage(...args) {
this[kPublicPort].postMessage(...args);
}
kPublicPort指向的就是messageChannel的另外一端。即給子線程發送消息。那麼on('message')就是接收對端發過來的消息。
總結,以上就是nodejs中關於線程的基本原理,線程的實現也很是複雜,大體瞭解他的原理,才能更好地使用他。
本文分享自微信公衆號 - 編程雜技(theanarkh)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。