Nexus.js介紹:一個多線程的JavaScript運行庫

首先,若是你不熟悉這個項目,建議先閱讀以前寫的一系列文章。若是你不想閱讀這些,不用擔憂。這裏面也會涉及到那些內容。javascript

如今,讓咱們開始吧。html

去年,我開始實現Nexus.js,這是一個基於Webkit/JavaScript內核的多線程服務端JavaScript運行庫。有一段時間我放棄了作這件事,因爲一些我沒法控制的緣由,我不打算在這裏討論,主要是:我沒法讓本身長時間工做。java

因此,讓咱們從討論Nexus的架構開始,以及它是如何工做的。react

事件循環

  • 沒有事件循環
  • 有一個帶有(無鎖)任務對象的線程池
  • 每次調用setTimeout或setImmediate或建立一個Promise時,任務就排隊到任務隊列鍾。
  • 每當計劃任務時,第一個可用的線程將選擇任務並執行它。
  • 在CPU內核上處理Promise。對Promise.all()的調用將並行的解決Promise。

ES6

  • 支持async/await,而且推薦使用
  • 支持for await(...)
  • 支持解構
  • 支持async try/catch/finally

模塊

  • 不支持CommonJS。(require(...)和module.exports)
  • 全部模塊使用ES6的import/export語法
  • 支持動態導入經過import('file-or-packge').then(...)
  • 支持import.meta,例如:import.meta.filename以及import.meta.dirname等等
  • 附加功能:支持直接從URL中導入,例如:
import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';
複製代碼

EventEmitter

  • Nexus實現了基於Promise的EventEmitter類
  • 事件處理程序在全部線程上排序,並將並行處理執行。
  • EventEmitter.emit(...)的返回值是一個Promise,它能夠被解析爲在事件處理器中返回值所構成的數組。

例如:git

class EmitterTest extends Nexus.EventEmitter {
  constructor() {
    super();
    for(let i = 0; i < 4; i++)
      this.on('test', value => { console.log(`fired test ${i}!`); console.inspect(value); });
    for(let i = 0; i < 4; i++)
      this.on('returns-a-value', v => `${v + i}`);
  }
}

const test = new EmitterTest();

async function start() {
  await test.emit('test', { payload: 'test 1' });
  console.log('first test done!');
  await test.emit('test', { payload: 'test 2' });
  console.log('second test done!');
  const values = await test.emit('returns-a-value', 10);
  console.log('third test done, returned values are:'); console.inspect(values);
}

start().catch(console.error);
複製代碼

I/O

  • 全部輸入/輸出都經過三個原語完成:Device,Filter和Stream。
  • 全部輸入/輸出原語都實現了EventEmitter類
  • 要使用Device,你須要在Device之上建立一個ReadableStream或WritableStream
  • 要操做數據,能夠將Filters添加到ReadableStream或WritableStream中。
  • 最後,使用source.pipe(...destinationStreams),而後等待source.resume()來處理數據。
  • 全部的輸入/輸出操做都是使用ArrayBuffer對象完成的。
  • Filter試了了process(buffer)方法來處理數據。

例如:使用2個獨立的輸出文件將UTF-8轉換爲UTF6。es6

const startTime = Date.now();
  try {
    const device = new Nexus.IO.FilePushDevice('enwik8');
    const stream = new Nexus.IO.ReadableStream(device);

    stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));

    const wstreams = [0,1,2,3]
      .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));

    console.log('piping...');

    stream.pipe(...wstreams);

    console.log('streaming...');

    await stream.resume();

    await stream.close();

    await Promise.all(wstreams.map(stream => stream.close()));

    console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
  } catch (e) {
    console.error('An error occurred: ', e);
  }
}

start().catch(console.error);

複製代碼

TCP/UDP

  • Nexus.js提供了一個Acceptor類,負責綁定ip地址/端口和監聽鏈接
  • 每次收到一個鏈接請求,connection事件就會被觸發,而且提供一個Socket設備。
  • 每個Socket實例是全雙工的I/O設備。
  • 你可使用ReadableStream和WritableStream來操做Socket。

最基礎的例子:(向客戶端發送「Hello World」)github

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;

acceptor.on('connection', (socket, endpoint) => {
  const connId = count++;
  console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
  const rstream = new Nexus.IO.ReadableStream(socket);
  const wstream = new Nexus.IO.WritableStream(socket);
  const buffer = new Uint8Array(13);
  const message = 'Hello World!\n';
  for(let i = 0; i < 13; i++)
    buffer[i] = message.charCodeAt(i);
  rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
  rstream.on('data', buffer => console.log(`got message: ${buffer}`));
  rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
  console.log(`sending greeting to #${connId}!`);
  wstream.write(buffer);
});

acceptor.bind('127.0.0.1', 10000);
acceptor.listen();

console.log('server ready');
複製代碼

Http

  • Nexus提供了一個Nexus.Net.HTTP.Server類,該類基本上繼承了TCPAcceptor
  • 一些基礎接口
  • 當服務器端完成了對傳入鏈接的基本的Http頭的解析/校驗時,將使用鏈接和一樣的信息觸發connection事件
  • 每個鏈接實例都又一個request和一個response對象。這些是輸入/輸出設備。
  • 你能夠構造ReadableStream和WritableStream來操縱request/response。
  • 若是你經過管道鏈接到一個Response對象,輸入的流將會使用分塊編碼的模式。否者,你可使用response.write()來寫入一個常規的字符串。

複雜例子:(基本的Http服務器與塊編碼,細節省略)apache

....


/** * Creates an input stream from a path. * @param path * @returns {Promise<ReadableStream>} */
async function createInputStream(path) {
  if (path.startsWith('/')) // If it starts with '/', omit it.
    path = path.substr(1);
  if (path.startsWith('.')) // If it starts with '.', reject it.
    throw new NotFoundError(path);
  if (path === '/' || !path) // If it's empty, set to index.html.
    path = 'index.html';
  /** * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`. */
  const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path);
  try {
    // Stat the target path.
    const {type} = await Nexus.FileSystem.stat(filePath);
    if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html'
      return createInputStream(Nexus.FileSystem.join(filePath, 'index.html'));
    else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
      // If it's not found, throw NotFound.
      throw new NotFoundError(path);
  } catch(e) {
    if (e.code)
      throw e;
    throw new NotFoundError(path);
  }
  try {
    // First, we create a device.
    const fileDevice = new Nexus.IO.FilePushDevice(filePath);
    // Then we return a new ReadableStream created using our source device.
    return new Nexus.IO.ReadableStream(fileDevice);
  } catch(e) {
    throw new InternalServerError(e.message);
  }
}

/** * Connections counter. */
let connections = 0;

/** * Create a new HTTP server. * @type {Nexus.Net.HTTP.Server} */
const server = new Nexus.Net.HTTP.Server();

// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, we display them anyway.
server.on('error', e => {
  console.error(FgRed + Bright + 'Server Error: ' + e.message + '\n' + e.stack, Reset);
});

/** * Listen to connections. */
server.on('connection', async (connection, peer) => {
  // Start with a connection ID of 0, increment with every new connection.
  const connId = connections++;
  // Record the start time for this connection.
  const startTime = Date.now();
  // Destructuring is supported, why not use it?
  const { request, response } = connection;
  // Parse the URL parts.
  const { path } = parseURL(request.url);
  // Here we'll store any errors that occur during the connection.
  const errors = [];
  // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
  let inStream, outStream;
  try {
    // Log the request.
    console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
    // Set the 'Server' header.
    response.set('Server', `nexus.js/0.1.1`);
    // Create our input stream.
    inStream = await createInputStream(path);
    // Create our output stream.
    outStream = new Nexus.IO.WritableStream(response);
    // Hook all `error` events, add any errors to our `errors` array.
    inStream.on('error', e => { errors.push(e); });
    request.on('error', e => { errors.push(e); });
    response.on('error', e => { errors.push(e); });
    outStream.on('error', e => { errors.push(e); });
    // Set content type and request status.
    response
      .set('Content-Type', mimeType(path))
      .status(200);
    // Hook input to output(s).
    const disconnect = inStream.pipe(outStream);
    try {
      // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
      // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
      await inStream.resume();
    } catch (e) {
      // Capture any errors that happen during the streaming.
      errors.push(e);
    }
    // Disconnect all the callbacks created by `.pipe()`.
    return disconnect();
  } catch(e) {
    // If an error occurred, push it to the array.
    errors.push(e);
    // Set the content type, status, and write a basic message.
    response
      .set('Content-Type', 'text/plain')
      .status(e.code || 500)
      .send(e.message || 'An error has occurred.');
  } finally {
    // Close the streams manually. This is important because we may run out of file handles otherwise.
    if (inStream)
      await inStream.close();
    if (outStream)
      await outStream.close();
    // Close the connection, has no real effect with keep-alive connections.
    await connection.close();
    // Grab the response's status.
    let status = response.status();
    // Determine what colour to output to the terminal.
    const statusColors = {
      '200': Bright + FgGreen, // Green for 200 (OK),
      '404': Bright + FgYellow, // Yellow for 404 (Not Found)
      '500': Bright + FgRed // Red for 500 (Internal Server Error)
    };
    let statusColor = statusColors[status];
    if (statusColor)
      status = statusColor + status + Reset;
    // Log the connection (and time to complete) to the console.
    console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
      (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset));
  }
});

/** * IP and port to listen on. */
const ip = '0.0.0.0', port = 3000;
/** * Whether or not to set the `reuse` flag. (optional, default=false) */
const portReuse = true;
/** * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific) * @type {number} */
const maxConcurrentConnections = 1000;
/** * Bind the selected address and port. */
server.bind(ip, port, portReuse);
/** * Start listening to requests. */
server.listen(maxConcurrentConnections);
/** * Happy streaming! */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);
複製代碼

基準

我想我已經涵蓋了到目前爲止所實現的一切。那麼如今咱們來談談性能。api

這裏是上訴Http服務器的當前基準,有100個併發鏈接和總共10000個請求:數組

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done


Server Software:        nexus.js/0.1.1
Server Hostname:        localhost
Server Port:            3000

Document Path:          /
Document Length:        8673 bytes

Concurrency Level:      100
Time taken for tests:   9.991 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      87880000 bytes
HTML transferred:       86730000 bytes
Requests per second:    1000.94 [#/sec] (mean)
Time per request:       99.906 [ms] (mean)
Time per request:       0.999 [ms] (mean, across all concurrent requests)
Transfer rate:          8590.14 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:     6   99  36.6     84     464
Waiting:        5   99  36.4     84     463
Total:          6  100  36.6     84     464

Percentage of the requests served within a certain time (ms)
  50%     84
  66%     97
  75%    105
  80%    112
  90%    134
  95%    188
  98%    233
  99%    238
 100%    464 (longest request)
複製代碼

每秒1000個請求。在一個老的i7上,上面運行了包括這個基準測試軟件,一個佔用了5G內存的IDE,以及服務器自己。

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor   : 0
vendor_id   : GenuineIntel
cpu family  : 6
model       : 60
model name  : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping    : 3
microcode   : 0x22
cpu MHz     : 3392.093
cache size  : 8192 KB
physical id : 0
siblings    : 8
core id     : 0
cpu cores   : 4
apicid      : 0
initial apicid  : 0
fpu     : yes
fpu_exception   : yes
cpuid level : 13
wp      : yes
flags       : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs        :
bogomips    : 6784.18
clflush size    : 64
cache_alignment : 64
address sizes   : 39 bits physical, 48 bits virtual
power management:
複製代碼

圖形表示結果:

我嘗試了1000個併發請求,可是APacheBench因爲許多套接字被打開而超時。我嘗試了httperf,這裏是結果:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262

Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s

Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000

Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0

Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0

CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)

Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0
複製代碼

正如你看到的,它任然能工做。儘管因爲壓力,有些鏈接會超時。我仍在研究致使這個問題的緣由。

這個項目的源代碼就在GitHub上,隨時能夠查看。

原文地址:https://dev.to/voodooattack/introducing-nexusjs-a-multi-threaded-javascript-run-time-3g6

相關文章
相關標籤/搜索