All files / lib/internal js_stream_socket.js

98.76% Statements 239/242
100% Branches 43/43
88.88% Functions 16/18
98.76% Lines 239/242

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 2439x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 243x 243x 9x 9x 9x 9x 9x 9x 9x 9x   9x 11955x 9x 989x 9x 12x 9x 1550x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 10993x 10993x 10535x 10535x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 3059x 3059x 2x 2x 2x 2x 2x 2x 2x 3057x 3057x 3057x 3057x 10993x 10993x 10993x 11x 11x 11x 10993x 10993x 10993x 10535x 10535x 10535x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 10993x 9x 9x 9x 9x 1x 1x 9x 9x     9x 9x 11955x 11955x 11955x 9x 9x 990x 990x 990x 9x 9x 23x 23x 23x 23x 23x 23x 23x 23x 23x 23x 11x 11x 11x 12x 12x 12x 12x 12x 12x 12x 12x 12x 9x 12x 12x 12x 23x 9x 9x 9x 10544x 10544x 10544x 10x 10x 10x 10544x 9x 9x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 1550x 3295x 3295x 1538x 1538x 1538x 1538x 1538x 3295x 2x 2x 1538x 1538x 1538x 1524x 1538x 3295x 1550x 1550x 1550x 9x 9x 9x 12059x 12059x 12059x 1525x 1525x 1525x 1525x 12059x 11x 11x 11x 11x 12059x 9x 9x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 
10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 9x 9x 9x  
'use strict';
 
const {
  Symbol,
} = primordials;
 
const { setImmediate } = require('timers');
const assert = require('internal/assert');
const { Socket } = require('net');
const { JSStream } = internalBinding('js_stream');
const uv = internalBinding('uv');
// Debug logger for the 'stream_socket' category. `debug` is declared with
// `let` because the callback passed to `debuglog()` overwrites it with the
// function it receives.
// NOTE(review): presumably `debuglog()` invokes the callback with the
// resolved logger on first use — confirm against internal/util/debuglog.
let debug = require('internal/util/debuglog').debuglog(
  'stream_socket',
  (fn) => {
    // Swap the initial logger for the one handed to us.
    debug = fn;
  }
);
const { owner_symbol } = require('internal/async_hooks').symbols;
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
 
// Private property keys used on JSStreamSocket instances.
// Write request currently in flight; set by doWrite(), cleared (to null) by
// finishWrite().
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
// Shutdown request currently in flight; set by doShutdown(), cleared (to
// null) by finishShutdown().
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
// Shutdown request that arrived while a write was still in flight; it is
// stored by doShutdown() and replayed by finishWrite().
const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
 
// Handle callback: `this` is the handle, and the owning socket is stored
// under `owner_symbol`. Delegates to the owner's isClosing().
function isClosing() {
  const socket = this[owner_symbol];
  return socket.isClosing();
}
 
// Handle callback: `this` is the handle. Forwards to the owning socket's
// readStart() and returns its result.
function onreadstart() {
  const socket = this[owner_symbol];
  return socket.readStart();
}
 
// Handle callback: `this` is the handle. Forwards to the owning socket's
// readStop() and returns its result.
function onreadstop() {
  const socket = this[owner_symbol];
  return socket.readStop();
}
 
// Handle callback: `this` is the handle. Passes the shutdown request on to
// the owning socket's doShutdown() and returns its result.
function onshutdown(req) {
  const socket = this[owner_symbol];
  return socket.doShutdown(req);
}
 
// Handle callback: `this` is the handle. Passes the write request and its
// buffers on to the owning socket's doWrite() and returns its result.
function onwrite(req, bufs) {
  const socket = this[owner_symbol];
  return socket.doWrite(req, bufs);
}
 
/* This class serves as a wrapper for when the C++ side of Node wants access
 * to a standard JS stream. For example, TLS or HTTP do not operate on network
 * resources conceptually, although that is the common case and what we are
 * optimizing for; in theory, they are completely composable and can work with
 * any stream resource they see.
 *
 * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
 * can skip going through the JS layer and let TLS access the raw C++ handle
 * of a net.Socket. The flipside of this is that, to maintain composability,
 * we need a way to create "fake" net.Socket instances that call back into a
 * "real" JavaScript stream. JSStreamSocket is exactly this.
 */
class JSStreamSocket extends Socket {
  /**
   * Wraps `stream` in a socket backed by a fresh `JSStream` handle.
   * @param {object} stream The JS stream to wrap. This constructor calls its
   *   `pause()`, `on()`, `once()` and `removeListener()` methods and reads
   *   its `readable`, `writable` and `readableObjectMode` properties.
   */
  constructor(stream) {
    const handle = new JSStream();
    handle.close = (cb) => {
      debug('close');
      this.doClose(cb);
    };
    // Inside of the following functions, `this` refers to the handle
    // and `this[owner_symbol]` refers to this JSStreamSocket instance.
    handle.isClosing = isClosing;
    handle.onreadstart = onreadstart;
    handle.onreadstop = onreadstop;
    handle.onshutdown = onshutdown;
    handle.onwrite = onwrite;

    // Keep the wrapped stream paused until readStart() resumes it.
    stream.pause();
    // Errors of the wrapped stream are re-emitted on this socket.
    stream.on('error', (err) => this.emit('error', err));
    const ondata = (chunk) => {
      // Only non-string chunks from a non-object-mode stream are passed on
      // to the handle; anything else is reported as ERR_STREAM_WRAP.
      if (typeof chunk === 'string' ||
          stream.readableObjectMode === true) {
        // Make sure that no further `data` events will happen.
        stream.pause();
        stream.removeListener('data', ondata);

        this.emit('error', new ERR_STREAM_WRAP());
        return;
      }

      debug('data', chunk.length);
      // The handle may already have been detached from this socket.
      if (this._handle)
        this._handle.readBuffer(chunk);
    };
    stream.on('data', ondata);
    stream.once('end', () => {
      debug('end');
      if (this._handle)
        this._handle.emitEOF();
    });
    // Some `Stream` don't pass `hasError` parameters when closed.
    stream.once('close', () => {
      // Errors emitted from `stream` have also been emitted to this instance
      // so that we don't pass errors to `destroy()` again.
      this.destroy();
    });

    super({ handle, manualStart: true });
    this.stream = stream;
    this[kCurrentWriteRequest] = null;
    this[kCurrentShutdownRequest] = null;
    this[kPendingShutdownRequest] = null;
    // Mirror the readable/writable state of the wrapped stream.
    this.readable = stream.readable;
    this.writable = stream.writable;

    // Start reading.
    this.read(0);
  }

  // Allow legacy requires in the test suite to keep working:
  //   const { StreamWrap } = require('internal/js_stream_socket')
  static get StreamWrap() {
    return JSStreamSocket;
  }

  /**
   * @returns {boolean} `true` when this socket is no longer readable or no
   *   longer writable.
   */
  isClosing() {
    return !this.readable || !this.writable;
  }

  /**
   * Resumes the wrapped stream so that `data` events flow again.
   * @returns {number} Always 0.
   */
  readStart() {
    this.stream.resume();
    return 0;
  }

  /**
   * Pauses the wrapped stream.
   * @returns {number} Always 0.
   */
  readStop() {
    this.stream.pause();
    return 0;
  }

  /**
   * Starts shutting down the writable side by ending the wrapped stream.
   * If a write request is still in flight, the shutdown request is parked
   * and replayed by finishWrite() once that write has completed.
   * @param {object} req The shutdown request to complete later through
   *   `handle.finishShutdown()`.
   * @returns {number} Always 0.
   */
  doShutdown(req) {
    // TODO(addaleax): It might be nice if we could get into a state where
    // DoShutdown() is not called on streams while a write is still pending.
    //
    // Currently, the only part of the code base where that happens is the
    // TLS implementation, which calls both DoWrite() and DoShutdown() on the
    // underlying network stream inside of its own DoShutdown() method.
    // Working around that on the native side is not quite trivial (yet?),
    // so for now that is supported here.

    if (this[kCurrentWriteRequest] !== null) {
      this[kPendingShutdownRequest] = req;
      return 0;
    }
    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);
    this[kCurrentShutdownRequest] = req;

    // Capture the handle now; `this._handle` may differ by the time the
    // callbacks below run (see doClose()).
    const handle = this._handle;

    setImmediate(() => {
      // Ensure that write is dispatched asynchronously.
      this.stream.end(() => {
        this.finishShutdown(handle, 0);
      });
    });
    return 0;
  }

  /**
   * Completes the in-flight shutdown request, if there still is one.
   * `handle === this._handle` except when called from doClose().
   * @param {object} handle The handle whose `finishShutdown()` is invoked.
   * @param {number} errCode Status code reported for the request.
   */
  finishShutdown(handle, errCode) {
    // The shutdown request might already have been cancelled.
    if (this[kCurrentShutdownRequest] === null)
      return;
    const req = this[kCurrentShutdownRequest];
    this[kCurrentShutdownRequest] = null;
    handle.finishShutdown(req, errCode);
  }

  /**
   * Writes all of `bufs` to the wrapped stream inside one cork()/uncork()
   * pair and completes `req` exactly once: after every buffer has been
   * written, or as soon as the first write callback reports an error.
   * @param {object} req The write request to complete later through
   *   `handle.finishWrite()`.
   * @param {Array} bufs The chunks to pass to `stream.write()`.
   * @returns {number} Always 0.
   */
  doWrite(req, bufs) {
    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);

    const handle = this._handle;
    const self = this;

    let pending = bufs.length;
    // Latched once completion has been scheduled. Without it, every further
    // write callback carrying an error would schedule finishWrite() again,
    // and such a stale call could complete a *later* write request that was
    // started in the meantime.
    let finished = false;

    this.stream.cork();
    // Use `var` over `let` for performance optimization.
    // eslint-disable-next-line no-var
    for (var i = 0; i < bufs.length; ++i)
      this.stream.write(bufs[i], done);
    this.stream.uncork();

    // Only set the request here, because the `write()` calls could throw.
    this[kCurrentWriteRequest] = req;

    function done(err) {
      // Completion is reported only once per doWrite() call.
      if (finished)
        return;

      // On success, wait until every buffer has been acknowledged.
      if (!err && --pending !== 0)
        return;

      finished = true;

      let errCode = 0;
      if (err) {
        // Map the error's `code` to the matching uv constant, falling back
        // to UV_EPIPE when there is none.
        errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
      }

      // Ensure that write was dispatched
      setImmediate(() => {
        self.finishWrite(handle, errCode);
      });
    }

    return 0;
  }

  /**
   * Completes the in-flight write request, if there still is one, and then
   * replays a shutdown request that was parked while the write was pending.
   * `handle === this._handle` except when called from doClose().
   * @param {object} handle The handle whose `finishWrite()` is invoked.
   * @param {number} errCode Status code reported for the request.
   */
  finishWrite(handle, errCode) {
    // The write request might already have been cancelled.
    if (this[kCurrentWriteRequest] === null)
      return;
    const req = this[kCurrentWriteRequest];
    this[kCurrentWriteRequest] = null;

    handle.finishWrite(req, errCode);
    if (this[kPendingShutdownRequest]) {
      const shutdownReq = this[kPendingShutdownRequest];
      this[kPendingShutdownRequest] = null;
      this.doShutdown(shutdownReq);
    }
  }

  /**
   * Closes the socket: destroys the wrapped stream and, on the next
   * immediate, cancels any in-flight write/shutdown request with
   * `UV_ECANCELED` before invoking `cb`.
   * @param {Function} cb Called with no arguments once cancellation is done.
   */
  doClose(cb) {
    // Capture the handle; `this._handle` is expected to be null by the time
    // the immediate below runs.
    const handle = this._handle;

    // When sockets of the "net" module destroyed, they will call
    // `this._handle.close()` which will also emit EOF if not emitted before.
    // This feature makes sockets on the other side emit "end" and "close"
    // even though we haven't called `end()`. As `stream` are likely to be
    // instances of `net.Socket`, calling `stream.destroy()` manually will
    // avoid issues that don't properly close wrapped connections.
    this.stream.destroy();

    setImmediate(() => {
      // Should be already set by net.js
      assert(this._handle === null);

      this.finishWrite(handle, uv.UV_ECANCELED);
      this.finishShutdown(handle, uv.UV_ECANCELED);

      cb();
    });
  }
}
 
module.exports = JSStreamSocket;