All files / lib/internal/worker io.js

100% Statements 271/271
100% Branches 42/42
100% Functions 15/15
100% Lines 271/271

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 41483x 41483x 41483x 41483x 119x 119x 119x 119x 119x 119x 119x 119x 119x 82974x 119x 119x 9x 9x 8x 8x 8x 1x 1x 1x 9x 119x 119x 119x 32254x 32254x 32254x 119x 119x 119x 119x 119x 119x 119x 119x 31604x 31604x 31604x 119x 119x 119x 119x 119x 119x 119x 119x 10030x 10030x 10030x 119x 119x 119x 119x 119x 119x 14x 14x 14x 14x 14x 14x 7x 14x 2x 14x 5x 5x 14x 14x 14x 119x 119x 32783x 32783x 32783x 32783x 32783x 32783x 32783x 75074x 21760x 21760x 21760x 32783x 32783x 41412x 20398x 20398x 20398x 32783x 32783x 119x 119x 119x 119x 1333x 1333x 1333x 1333x 1333x 1333x 975x 27x 27x 27x 1333x 1333x 119x 119x 5597x 27x 27x 27x 27x 5597x 5597x 5597x 5597x 5597x 
5597x 119x 119x 119x 119x 552x 552x 552x 552x 552x 119x 119x 4583x 4583x 4583x 4583x 4583x 4583x 4583x 4583x 4583x 4583x 119x 119x 1x 1x 1x 1x 1x 1x 1x 119x 119x 5010x 5010x 5010x 5010x 5010x 5010x 5010x 119x 119x 275x 275x 275x 275x 275x 275x 275x 275x 275x 119x 11x 11x 11x 3x 11x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x 119x  
'use strict';
 
const {
  ObjectAssign,
  ObjectCreate,
  ObjectDefineProperty,
  ObjectGetOwnPropertyDescriptors,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  Symbol,
} = primordials;
 
const {
  handle_onclose: handleOnCloseSymbol,
  oninit: onInitSymbol,
  no_message_symbol: noMessageSymbol
} = internalBinding('symbols');
const {
  MessagePort,
  MessageChannel,
  drainMessagePort,
  moveMessagePortToContext,
  receiveMessageOnPort: receiveMessageOnPort_,
  stopMessagePort
} = internalBinding('messaging');
const {
  threadId,
  getEnvMessagePort
} = internalBinding('worker');
 
const { Readable, Writable } = require('stream');
const EventEmitter = require('events');
const { inspect } = require('internal/util/inspect');
const debug = require('internal/util/debuglog').debuglog('worker');
 
// Private symbol keys used in this file to attach internal state to ports and
// stdio streams.

// Flag on a ReadableWorkerStdio (initialised to true): when truthy, the
// stream participates in the port's kWaitingStreams accounting.
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
// Stream name stored on a stdio stream and sent as the `stream` field of the
// messages it posts.
const kName = Symbol('kName');
// Key under which the function backing the `onmessage` accessor is stored.
const kOnMessageListener = Symbol('kOnMessageListener');
// The port a stdio stream posts its messages to.
const kPort = Symbol('kPort');
// Counter stored on the port itself; the port is ref()'d when it leaves 0 and
// unref()'d when it returns to 0.
const kWaitingStreams = Symbol('kWaitingStreams');
// Array of write callbacks a WritableWorkerStdio has not invoked yet.
const kWritableCallbacks = Symbol('kWritableCallbacks');
// Whether a ReadableWorkerStdio has already had its first counted _read().
const kStartedReading = Symbol('kStartedReading');
// Method key on WritableWorkerStdio that runs and clears the pending write
// callbacks.
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
 
// String tags placed in the `type` field of messages sent over a port.
// Within this file only STDIO_PAYLOAD and STDIO_WANTS_MORE_DATA are used; the
// object is exported, so the remaining tags are presumably consumed by other
// modules (verify against callers).
const messageTypes = {
  UP_AND_RUNNING: 'upAndRunning',
  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
  ERROR_MESSAGE: 'errorMessage',
  STDIO_PAYLOAD: 'stdioPayload',
  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
  LOAD_SCRIPT: 'loadScript'
};
 
// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone. The clone
// keeps the original parent prototype as its own prototype, so the native
// methods stay reachable through `MessagePortPrototype` (used elsewhere in
// this file via `.call()` for start, close and hasRef) once the public chain
// is rewired. This must happen BEFORE the ObjectSetPrototypeOf() calls below.
const MessagePortPrototype = ObjectCreate(
  ObjectGetPrototypeOf(MessagePort.prototype),
  ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain: both the constructor and its prototype
// object now inherit from their EventEmitter counterparts.
ObjectSetPrototypeOf(MessagePort, EventEmitter);
ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
// Copy methods that are inherited from HandleWrap, because
// changing the prototype of MessagePort.prototype implicitly removed them.
// They are read through the clone, whose prototype chain is still the
// original one.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
 
// A MessagePort is a communication channel: a handle (wrapping a uv_async_t)
// that receives information from other threads and surfaces it as .onmessage
// events, paired with a function for sending data to a MessagePort in some
// other thread.
//
// Default message listener: optionally writes the incoming event to the
// 'worker' debug log, then re-emits its payload as a 'message' event.
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
  // Nothing is logged for events without a truthy payload or for
  // STDIO_WANTS_MORE_DATA notifications.
  const skipDebugLog =
    !event.data || event.data.type === messageTypes.STDIO_WANTS_MORE_DATA;
  if (!skipDebugLog) {
    debug(`[${threadId}] received message`, event);
  }
  // Hand the deserialized payload over to userland listeners.
  this.emit('message', event.data);
};
 
// Web-compatible `onmessage` accessor. In Node.js the port is primarily an
// `EventEmitter`, but as soon as somebody assigns `onmessage` we switch over
// to the Web API model.
//
// get: returns whatever is currently stored under kOnMessageListener.
// set: stores the value; a function ref()s and starts the port, anything
//      else unref()s and stops it.
ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
  enumerable: true,
  configurable: true,
  get() {
    return this[kOnMessageListener];
  },
  set(value) {
    this[kOnMessageListener] = value;
    if (typeof value !== 'function') {
      // No usable handler: release the event loop and stop receiving.
      this.unref();
      stopMessagePort(this);
      return;
    }
    // A handler was installed: keep the loop alive and start receiving.
    this.ref();
    MessagePortPrototype.start.call(this);
  }
});
 
// Invoked from inside the `MessagePort` constructor with the new port as
// `this`. The port acts as its own event emitter, so its ref state follows
// its 'message' listeners.
function oninit() {
  const port = this;
  setupPortReferencing(port, port, 'message');
}

// Expose the hook under the binding-provided symbol as a read-only property.
ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
  value: oninit,
  writable: false,
  enumerable: true
});
 
// Invoked once the underlying `uv_async_t` has been closed; surfaces that
// to listeners as a 'close' event on the port.
function onclose() {
  const port = this;
  port.emit('close');
}

// Expose the hook under the binding-provided symbol as a hidden, read-only
// property.
ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
  value: onclose,
  writable: false,
  enumerable: false
});
 
// Closes the port. If `cb` is a function it is registered as a one-shot
// 'close' listener before the native close() is invoked; any other value is
// ignored.
MessagePort.prototype.close = function(cb) {
  const hasCallback = typeof cb === 'function';
  if (hasCallback) {
    this.once('close', cb);
  }
  MessagePortPrototype.close.call(this);
};
 
// Custom util.inspect() hook: returns a plain copy of the port decorated with
// `active` (and `refed` when known) so the handle state shows up in output.
ObjectDefineProperty(MessagePort.prototype, inspect.custom, {
  value: function inspect() {  // eslint-disable-line func-name-matching
    let refed;
    try {
      // hasRef() may throw when `this` is not backed by a native object,
      // e.g. when the prototype itself is being inspected; in that case
      // inspect `this` as-is.
      refed = MessagePortPrototype.hasRef.call(this);
    } catch {
      return this;
    }

    // An undefined result is reported as an inactive port; otherwise the
    // port is active and its ref state is included.
    let state;
    if (refed === undefined) {
      state = { active: false };
    } else {
      state = { active: true, refed };
    }

    // The port's own enumerable properties are copied last, on top of the
    // computed state.
    return ObjectAssign(ObjectCreate(MessagePort.prototype), state, this);
  },
  writable: false,
  enumerable: false
});
 
// Ties the ref state of `port` to the presence of `eventName` listeners on
// `eventEmitter`:
//  - the port starts out unref()'d;
//  - when the first `eventName` listener is about to be added, the port is
//    ref()'d and started so it keeps the event loop alive;
//  - when the last `eventName` listener has been removed, the port is stopped
//    and unref()'d again so the worker can shut down gracefully.
function setupPortReferencing(port, eventEmitter, eventName) {
  // True while nobody is listening for `eventName`. During 'newListener' the
  // listener has not been added yet; during 'removeListener' it is already
  // gone.
  const isIdle = () => eventEmitter.listenerCount(eventName) === 0;

  port.unref();

  eventEmitter.on('newListener', (name) => {
    if (name !== eventName || !isIdle()) return;
    port.ref();
    MessagePortPrototype.start.call(port);
  });

  eventEmitter.on('removeListener', (name) => {
    if (name !== eventName || !isIdle()) return;
    stopMessagePort(port);
    port.unref();
  });
}
 
 
// Readable stream that requests its data over a message port.
//
// Each _read() posts a STDIO_WANTS_MORE_DATA message for this stream's name.
// While kIncrementsPortRef is truthy, the first _read() adds the stream to the
// port's kWaitingStreams count (ref()'ing the port when the count leaves 0)
// and 'end' removes it again (unref()'ing the port when the count returns
// to 0).
class ReadableWorkerStdio extends Readable {
  // port: object providing postMessage()/ref()/unref() and a kWaitingStreams
  //       counter.
  // name: value sent as the `stream` field of every posted message.
  constructor(port, name) {
    super();
    this[kPort] = port;
    this[kName] = name;
    this[kIncrementsPortRef] = true;
    this[kStartedReading] = false;

    this.on('end', () => {
      // Only undo the accounting if _read() actually performed it.
      if (!this[kStartedReading] || !this[kIncrementsPortRef]) return;
      const waitingPort = this[kPort];
      const remaining = --waitingPort[kWaitingStreams];
      if (remaining === 0) {
        waitingPort.unref();
      }
    });
  }

  _read() {
    const port = this[kPort];

    const isFirstCountedRead =
      !this[kStartedReading] && this[kIncrementsPortRef];
    if (isFirstCountedRead) {
      this[kStartedReading] = true;
      const previouslyWaiting = port[kWaitingStreams]++;
      if (previouslyWaiting === 0) {
        port.ref();
      }
    }

    // Ask the other side for more data for this stream.
    port.postMessage({
      type: messageTypes.STDIO_WANTS_MORE_DATA,
      stream: this[kName]
    });
  }
}
 
// Writable stream that forwards every chunk over a message port.
//
// _write() does not complete a write by itself: the callback is parked in
// kWritableCallbacks and each parked write adds one to the port's
// kWaitingStreams count. The parked callbacks run when the
// kStdioWantsMoreDataCallback method is invoked (its caller is not in this
// file).
class WritableWorkerStdio extends Writable {
  // port: object providing postMessage()/ref()/unref() and a kWaitingStreams
  //       counter.
  // name: value sent as the `stream` field of every posted message.
  constructor(port, name) {
    // Strings are passed to _write() as-is rather than converted to Buffers.
    super({ decodeStrings: false });
    this[kPort] = port;
    this[kName] = name;
    this[kWritableCallbacks] = [];
  }

  _write(chunk, encoding, cb) {
    const port = this[kPort];
    port.postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunk,
      encoding
    });
    // Park the callback until kStdioWantsMoreDataCallback runs it.
    this[kWritableCallbacks].push(cb);
    const previouslyWaiting = port[kWaitingStreams]++;
    if (previouslyWaiting === 0) {
      port.ref();
    }
  }

  _final(cb) {
    // A payload with a `null` chunk is posted when the stream is finished.
    const port = this[kPort];
    port.postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunk: null
    });
    cb();
  }

  [kStdioWantsMoreDataCallback]() {
    // Swap in a fresh list first, so writes issued from inside a callback are
    // parked for the next round instead of this one.
    const pending = this[kWritableCallbacks];
    this[kWritableCallbacks] = [];
    for (let i = 0; i < pending.length; i++) {
      const done = pending[i];
      done();
    }
    // Each completed write releases one unit of the port's waiting count.
    const port = this[kPort];
    const remaining = (port[kWaitingStreams] -= pending.length);
    if (remaining === 0) {
      port.unref();
    }
  }
}
 
// Builds the three stdio streams on top of the port returned by
// getEnvMessagePort(). The port's kWaitingStreams counter is reset to 0
// before any stream is created.
// Returns { stdin, stdout, stderr }: stdin is a ReadableWorkerStdio, stdout
// and stderr are WritableWorkerStdio instances.
function createWorkerStdio() {
  const port = getEnvMessagePort();
  port[kWaitingStreams] = 0;

  const stdin = new ReadableWorkerStdio(port, 'stdin');
  const stdout = new WritableWorkerStdio(port, 'stdout');
  const stderr = new WritableWorkerStdio(port, 'stderr');
  return { stdin, stdout, stderr };
}
 
// Wrapper around the native receiveMessageOnPort binding.
// Returns `undefined` when the binding yields the "no message" sentinel,
// otherwise `{ message }` wrapping the value the binding returned. The
// wrapper keeps a received `undefined` distinguishable from "nothing
// received".
function receiveMessageOnPort(port) {
  const message = receiveMessageOnPort_(port);
  return message === noMessageSymbol ? undefined : { message };
}
 
// Public surface of this module.
// - drainMessagePort, moveMessagePortToContext and MessageChannel are passed
//   through unchanged from the 'messaging' binding.
// - MessagePort is the binding's class after the prototype changes made in
//   this file.
// - receiveMessageOnPort is the wrapper defined in this file, not the raw
//   binding function.
// - kPort, kIncrementsPortRef, kWaitingStreams and
//   kStdioWantsMoreDataCallback are exported so that other modules can reach
//   the stdio streams' internal state.
module.exports = {
  drainMessagePort,
  messageTypes,
  kPort,
  kIncrementsPortRef,
  kWaitingStreams,
  kStdioWantsMoreDataCallback,
  moveMessagePortToContext,
  MessagePort,
  MessageChannel,
  receiveMessageOnPort,
  setupPortReferencing,
  ReadableWorkerStdio,
  WritableWorkerStdio,
  createWorkerStdio
};