nodejs源碼分析之線程

以前分析過線程的代碼,最近在使用線程,繼續分析一下。咱們先看一下通常的使用例子。node

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
else {
  // 作點耗時的事情
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

咱們先分析一下這個代碼的意思。由於上面的代碼在主線程和子線程都會被執行一遍。因此首先經過isMainThread判斷當前是主線程仍是子線程。主線程的話,就建立一個子線程,而後監聽子線程發過來的消息。子線程的話,首先執行業務相關的代碼,還能夠監聽主線程傳過來的消息。下面咱們開始分析源碼。分析完,會對上面的代碼有更多的理解。
   首先咱們從worker_threads模塊開始分析。這是一個c++模塊。咱們看一下他導出的功能。require("work_threads")的時候就是引用了InitWorker函數導出的功能。c++

void InitWorker(Local<Object> target,
                Local<Value> unused,
                Local<Context> context,
                void* priv) {
  Environment* env = Environment::GetCurrent(context);

  {
    // 執行下面的方法時,入參都是w->GetFunction() new出來的對象
    // 新建一個函數模板,Worker::New是對w->GetFunction()執行new的時候會執行的回調
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
    // 設置須要拓展的內存,由於c++對象的內存是固定的
    w->InstanceTemplate()->SetInternalFieldCount(1);
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
    // 設置一系列原型方法,就不一一列舉
    env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
    // 一系列原型方法
    // 導出函數模塊對應的函數,即咱們代碼中const { Worker } = require("worker_threads");中的Worker
    Local<String> workerString =
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
    w->SetClassName(workerString);
    target->Set(env->context(),
                workerString,
                w->GetFunction(env->context()).ToLocalChecked()).Check();
  }
  // 導出getEnvMessagePort方法,const { getEnvMessagePort } = require("worker_threads");
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
  /*
      線程id,這個不是操做系統分配的那個,而是nodejs分配的,在新開線程的時候設置
      const { threadId } = require("worker_threads");
  */
  target
      ->Set(env->context(),
            env->thread_id_string(),
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
      .Check();
  /*
      是不是主線程,const { isMainThread } = require("worker_threads");
      這邊變量在nodejs啓動的時候設置爲true,新開子線程的時候,沒有設置,因此是false
  */
  target
      ->Set(env->context(),
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
            Boolean::New(env->isolate(), env->is_main_thread()))
      .Check();
  /*
      若是不是主線程,導出資源限制的配置,
      即在子線程中調用const { resourceLimits } = require("worker_threads");
  */
  if (!env->is_main_thread()) {
    target
        ->Set(env->context(),
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
              env->worker_context()->GetResourceLimits(env->isolate()))
        .Check();
  }
  // 導出幾個常量
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}

翻譯成js大概是web

function c++Worker(object) {
    // 關聯起來,後續在js層調用c++層函數時,取出來,拿到c++層真正的worker對象 
    object[0] = this;
    ...
}
function New(object) {
    const worker = new c++Worker(object);
}
function Worker() {
    New(this);
}
Worker.prototype = {
    startThread,StartThread,
    StopThread: StopThread,
    ...
}
module.exports = {
    Worker: Worker,
    getEnvMessagePort: GetEnvMessagePort,
    isMainThread: true | false
    ...
}

瞭解work_threads模塊導出的功能後,咱們看new Worker的時候的邏輯。根據上面代碼導出的邏輯,咱們知道這時候首先會新建一個c++對象。對應上面的Worker函數中的this。而後執行New回調,並傳入tihs。咱們看New函數的邏輯。咱們省略一系列的參數處理,主要代碼以下。編程

// args.This()就是咱們剛纔傳進來的this
Worker* worker = new Worker(env, args.This(), 
                            url, per_isolate_opts,
                             std::move(exec_argv_out));

咱們再看Worker類。緩存

Worker::Worker(Environment* env,
               Local<Object> wrap,...)
    // 在父類構造函數中完成對象的Worker對象和args.This()對象的關聯
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      ...
      // 分配線程id
      thread_id_(Environment::AllocateThreadId()),
      env_vars_(env->env_vars()) {

  // 新建一個端口和子線程通訊
  parent_port_ = MessagePort::New(env, env->context());
  /*
    關聯起來,用於通訊
    const parent_port_ = {data: {sibling: null}};
    const child_port_data_  = {sibling: null};
    parent_port_.data.sibling = child_port_data_;
    child_port_data_.sibling = parent_port_.data;
  */
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());
  // 設置Worker對象的messagePort屬性爲parent_port_
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();
  // 設置Worker對象的線程id,即threadId屬性
  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
      .Check();
}

新建一個Worker,結構以下
微信


瞭解了new Worker的邏輯後,咱們看在js層是如何使用的。咱們看js層Worker類的構造函數。
constructor(filename, options = {}) {
    super();
    // 忽略一系列參數處理,new Worker就是上面提到的c++層的
    this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
    // messagePort就是上面圖中的messagePort,指向_parent_port
    this[kPort] = this[kHandle].messagePort;
    this[kPort].on('message', (data) => this[kOnMessage](data));
    // 開始接收消息,咱們這裏不深刻messagePort,後續單獨分析
    this[kPort].start();
    // 申請一個通訊管道,兩個端口
    const { port1, port2 } = new MessageChannel();
    this[kPublicPort] = port1;
    this[kPublicPort].on('message', (message) => this.emit('message', message));
    // 向另外一端發送消息
    this[kPort].postMessage({
      argv,
      type: messageTypes.LOAD_SCRIPT,
      filename,
      doEval: !!options.eval,
      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
      workerData: options.workerData,
      publicPort: port2,
      manifestSrc: getOptionValue('--experimental-policy') ?
        require('internal/process/policy').src :
        null,
      hasStdin: !!options.stdin
    }, [port2]);
    // 開啓線程
    this[kHandle].startThread();
  }

上面的代碼主要邏輯以下
1 保存messagePort,而後給messagePort的對端(看上面的圖)發送消息,可是這時候尚未接收者,因此消息會緩存到MessagePortData,即child_port_data_ 中。
2 申請一個通訊管道,用於主線程和子線程通訊。_parent_port和child_port是給nodejs使用的,新申請的管道是給用戶使用的。
3 建立子線程。
咱們看建立線程的時候,作了什麼。架構

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
  Worker* w;
  // 解包出對應的Worker對象
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
  // 新建一個子線程,而後執行Run函數,今後在子線程裏執行
  uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
    w->Run();
  }, static_cast<void*>(w))
}

咱們繼續看Run函數

void Worker::Run() {
    {
        // 新建一個env
        env_.reset(new Environment(data.isolate_data_.get(),
                                   context,
                                   std::move(argv_),
                                   std::move(exec_argv_),
                                   Environment::kNoFlags,
                                   thread_id_));
        // 初始化libuv,往libuv註冊
        env_->InitializeLibuv(start_profiler_idle_notifier_);
        // 建立一個MessagePort
        CreateEnvMessagePort(env_.get());
        // 執行internal/main/worker_thread.js
        StartExecution(env_.get(), "internal/main/worker_thread");
        // 開始事件循環
        do {
          uv_run(&data.loop_, UV_RUN_DEFAULT);
          platform_->DrainTasks(isolate_);
          more = uv_loop_alive(&data.loop_);
          if (more && !is_stopped()) continue;
          more = uv_loop_alive(&data.loop_);
        } while (more == true && !is_stopped());
     }
}

咱們分步驟分析上面的代碼
1 CreateEnvMessagePortoop

void Worker::CreateEnvMessagePort(Environment* env) {
  child_port_ = MessagePort::New(env,
                                 env->context(),
                                 std::move(child_port_data_));

  if (child_port_ != nullptr)
    env->set_message_port(child_port_->object(isolate_));
}

child_port_data_這個變量咱們應該很熟悉,在這裏首先申請一個新的端口。負責端口中數據管理的對象是child_port_data_。而後在env緩存起來。一會要用。
源碼分析


2 執行internal/main/worker_thread.js
// 設置process對象
patchProcessObject();
// 獲取剛纔緩存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
  // 加載腳本
  if (message.type === LOAD_SCRIPT) {
    const {
      argv,
      cwdCounter,
      filename,
      doEval,
      workerData,
      publicPort,
      manifestSrc,
      manifestURL,
      hasStdin
    } = message;

    const CJSLoader = require('internal/modules/cjs/loader');
    loadPreloadModules();
    /*
        由主線程申請的MessageChannel管道中,某一端的端口,
        設置publicWorker的parentPort字段,publicWorker就是worker_threads導出的對象,後面須要用
    */
    publicWorker.parentPort = publicPort;
    // 執行時使用的數據
    publicWorker.workerData = workerData;
    // 通知主線程,正在執行腳本
    port.postMessage({ type: UP_AND_RUNNING });
    // 執行new Worker(filename)時傳入的文件
    CJSLoader.Module.runMain(filename);
})
// 開始接收消息
port.start()

這時候咱們再回頭看一下,咱們調用new Worker(filename),而後在子線程裏執行咱們的filename時的場景。咱們再次回顧前面的代碼。

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
else {
  // 作點耗時的事情
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

咱們知道isMainThread在子線程裏是false,parentPort 就是就是messageChannel中的一端。因此parentPort.postMessage給對端發送消息,就是給主線程發送消息,咱們再看看worker.postMessage('Hello, world!')。

 postMessage(...args) {
    this[kPublicPort].postMessage(...args);
 }

kPublicPort指向的就是messageChannel的另外一端。即給子線程發送消息。那麼on('message')就是接收對端發過來的消息。
總結,以上就是nodejs中關於線程的基本原理,線程的實現也很是複雜,大體瞭解他的原理,才能更好地使用他。

更多閱讀
1 經過源碼分析nodejs線程架構
2 nodejs源碼分析之c++層的通用邏輯

本文分享自微信公衆號 - 編程雜技(theanarkh)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索