All files / lib/internal/child_process serialization.js

96.95% Statements 159/164
84.61% Branches 22/26
100% Functions 8/8
96.95% Lines 159/164

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 1655x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 10x 10x 10x 10x     10x 5x 5x 5x 5x 10x 10x 10x       10x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 536x 536x 536x 536x 536x 536x 536x 536x 537x 537x 537x 537x 537x 537x 537x 537x 537x 537x 22x 537x 537x 4x 4x 4x 537x 537x 537x 537x 537x 537x 537x 537x 537x 537x 537x 537x 537x 22x 536x 536x 5x 5x 5x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 5x 5x 5x 5x 5x 883x 883x 5x 5x 5x 3226x 3226x 3226x 3226x 3226x 3226x 3226x 3226x 3226x 3226x 37x 3226x 3189x 3189x 3189x 3189x 3189x 3226x 5x 5x 5x 5401x 5401x 5x 5x 5x 5x  
'use strict';
 
const {
  JSONParse,
  JSONStringify,
  StringPrototypeSplit,
  ArrayPrototypePush,
  Symbol,
  TypedArrayPrototypeSubarray,
} = primordials;
const { Buffer } = require('buffer');
const { StringDecoder } = require('string_decoder');
const v8 = require('v8');
const { isArrayBufferView } = require('internal/util/types');
const assert = require('internal/assert');
const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');
 
const kMessageBuffer = Symbol('kMessageBuffer');
const kMessageBufferSize = Symbol('kMessageBufferSize');
const kJSONBuffer = Symbol('kJSONBuffer');
const kStringDecoder = Symbol('kStringDecoder');
 
// Extend V8's serializer APIs to give more JSON-like behaviour in
// some cases; in particular, for native objects this serializes them the same
// way that JSON does rather than throwing an exception.
const kArrayBufferViewTag = 0;
const kNotArrayBufferViewTag = 1;
class ChildProcessSerializer extends v8.DefaultSerializer {
  _writeHostObject(object) {
    if (isArrayBufferView(object)) {
      this.writeUint32(kArrayBufferViewTag);
      return super._writeHostObject(object);
    }
    this.writeUint32(kNotArrayBufferViewTag);
    this.writeValue({ ...object });
  }
}
 
class ChildProcessDeserializer extends v8.DefaultDeserializer {
  _readHostObject() {
    const tag = this.readUint32();
    if (tag === kArrayBufferViewTag)
      return super._readHostObject();

    assert(tag === kNotArrayBufferViewTag);
    return this.readValue();
  }
}
 
// Messages are parsed in either of the following formats:
// - Newline-delimited JSON, or
// - V8-serialized buffers, prefixed with their length as a big endian uint32
//   (aka 'advanced')
const advanced = {
  initMessageChannel(channel) {
    channel[kMessageBuffer] = [];
    channel[kMessageBufferSize] = 0;
    channel.buffering = false;
  },
 
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;
 
    ArrayPrototypePush(channel[kMessageBuffer], readData);
    channel[kMessageBufferSize] += readData.length;
 
    // Index 0 should always be present because we just pushed data into it.
    let messageBufferHead = channel[kMessageBuffer][0];
    while (messageBufferHead.length >= 4) {
      // We call `readUInt32BE` manually here, because this is faster than first converting
      // it to a buffer and using `readUInt32BE` on that.
      const fullMessageSize = (
        messageBufferHead[0] << 24 |
        messageBufferHead[1] << 16 |
        messageBufferHead[2] << 8 |
        messageBufferHead[3]
      ) + 4;
 
      if (channel[kMessageBufferSize] < fullMessageSize) break;
 
      const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
        channel[kMessageBuffer][0] :
        Buffer.concat(
          channel[kMessageBuffer],
          channel[kMessageBufferSize]
        );
 
      const deserializer = new ChildProcessDeserializer(
        TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize)
      );
 
      messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
      channel[kMessageBufferSize] = messageBufferHead.length;
      channel[kMessageBuffer] =
        channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];
 
      deserializer.readHeader();
      yield deserializer.readValue();
    }
 
    channel.buffering = channel[kMessageBufferSize] > 0;
  },
 
  writeChannelMessage(channel, req, message, handle) {
    const ser = new ChildProcessSerializer();
    // Add 4 bytes, to later populate with message length
    ser.writeRawBytes(Buffer.allocUnsafe(4));
    ser.writeHeader();
    ser.writeValue(message);
 
    const serializedMessage = ser.releaseBuffer();
    const serializedMessageLength = serializedMessage.length - 4;
 
    serializedMessage.set([
      serializedMessageLength >> 24 & 0xFF,
      serializedMessageLength >> 16 & 0xFF,
      serializedMessageLength >> 8 & 0xFF,
      serializedMessageLength & 0xFF,
    ], 0);
 
    const result = channel.writeBuffer(req, serializedMessage, handle);
 
    // Mirror what stream_base_commons.js does for Buffer retention.
    if (streamBaseState[kLastWriteWasAsync])
      req.buffer = serializedMessage;
 
    return result;
  },
};
 
const json = {
  initMessageChannel(channel) {
    channel[kJSONBuffer] = '';
    channel[kStringDecoder] = undefined;
  },
 
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;
 
    if (channel[kStringDecoder] === undefined)
      channel[kStringDecoder] = new StringDecoder('utf8');
    const chunks =
      StringPrototypeSplit(channel[kStringDecoder].write(readData), '\n');
    const numCompleteChunks = chunks.length - 1;
    // Last line does not have trailing linebreak
    const incompleteChunk = chunks[numCompleteChunks];
    if (numCompleteChunks === 0) {
      channel[kJSONBuffer] += incompleteChunk;
    } else {
      chunks[0] = channel[kJSONBuffer] + chunks[0];
      for (let i = 0; i < numCompleteChunks; i++)
        yield JSONParse(chunks[i]);
      channel[kJSONBuffer] = incompleteChunk;
    }
    channel.buffering = channel[kJSONBuffer].length !== 0;
  },
 
  writeChannelMessage(channel, req, message, handle) {
    const string = JSONStringify(message) + '\n';
    return channel.writeUtf8String(req, string, handle);
  },
};
 
module.exports = { advanced, json };