All files / lib/internal js_stream_socket.js

98.76% Statements 238/241
100% Branches 43/43
88.89% Functions 16/18
98.76% Lines 238/241

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 24210x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 33x 33x 10x 10x 10x 10x 10x 10x 10x 10x   10x 10608x 10x 60x 10x 12x 10x 605x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10x 10573x 10573x 10535x 10535x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 641x 641x 2x 2x 2x 2x 2x 2x 2x 639x 639x 639x 639x 10573x 10573x 10573x 8x 8x 8x 10573x 10573x 10573x 10535x 10535x 10535x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10573x 10x 10x 10x 10x 1x 1x 10x 10x     10x 10x 10608x 10608x 10608x 10x 10x 61x 61x 61x 10x 10x 23x 23x 23x 23x 23x 23x 23x 23x 23x 23x 11x 11x 11x 12x 12x 12x 12x 12x 12x 12x 12x 12x 9x 12x 12x 12x 23x 10x 10x 10x 10544x 10544x 10544x 10x 10x 10x 10544x 10x 10x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 605x 644x 644x 598x 598x 598x 598x 598x 644x 2x 2x 598x 598x 598x 598x 598x 644x 605x 605x 605x 10x 10x 10x 11133x 11133x 11133x 599x 599x 599x 599x 11133x 11x 11x 11x 11x 11133x 10x 10x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10535x 10x 10x 10x  
'use strict';
 
const {
  Symbol,
} = primordials;
 
const { setImmediate } = require('timers');
const assert = require('internal/assert');
const { Socket } = require('net');
const { JSStream } = internalBinding('js_stream');
const uv = internalBinding('uv');
let debug = require('internal/util/debuglog').debuglog(
  'stream_socket',
  (fn) => {
    debug = fn;
  }
);
const { owner_symbol } = require('internal/async_hooks').symbols;
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
 
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
 
function isClosing() { return this[owner_symbol].isClosing(); }
 
function onreadstart() { return this[owner_symbol].readStart(); }
 
function onreadstop() { return this[owner_symbol].readStop(); }
 
function onshutdown(req) { return this[owner_symbol].doShutdown(req); }
 
function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); }
 
/* This class serves as a wrapper for when the C++ side of Node wants access
 * to a standard JS stream. For example, TLS or HTTP do not operate on network
 * resources conceptually, although that is the common case and what we are
 * optimizing for; in theory, they are completely composable and can work with
 * any stream resource they see.
 *
 * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
 * can skip going through the JS layer and let TLS access the raw C++ handle
 * of a net.Socket. The flipside of this is that, to maintain composability,
 * we need a way to create "fake" net.Socket instances that call back into a
 * "real" JavaScript stream. JSStreamSocket is exactly this.
 */
class JSStreamSocket extends Socket {
  constructor(stream) {
    const handle = new JSStream();
    handle.close = (cb) => {
      debug('close');
      this.doClose(cb);
    };
    // Inside of the following functions, `this` refers to the handle
    // and `this[owner_symbol]` refers to this JSStreamSocket instance.
    handle.isClosing = isClosing;
    handle.onreadstart = onreadstart;
    handle.onreadstop = onreadstop;
    handle.onshutdown = onshutdown;
    handle.onwrite = onwrite;
 
    stream.pause();
    stream.on('error', (err) => this.emit('error', err));
    const ondata = (chunk) => {
      if (typeof chunk === 'string' ||
          stream.readableObjectMode === true) {
        // Make sure that no further `data` events will happen.
        stream.pause();
        stream.removeListener('data', ondata);
 
        this.emit('error', new ERR_STREAM_WRAP());
        return;
      }
 
      debug('data', chunk.length);
      if (this._handle)
        this._handle.readBuffer(chunk);
    };
    stream.on('data', ondata);
    stream.once('end', () => {
      debug('end');
      if (this._handle)
        this._handle.emitEOF();
    });
    // Some `Stream` don't pass `hasError` parameters when closed.
    stream.once('close', () => {
      // Errors emitted from `stream` have also been emitted to this instance
      // so that we don't pass errors to `destroy()` again.
      this.destroy();
    });
 
    super({ handle, manualStart: true });
    this.stream = stream;
    this[kCurrentWriteRequest] = null;
    this[kCurrentShutdownRequest] = null;
    this[kPendingShutdownRequest] = null;
    this.readable = stream.readable;
    this.writable = stream.writable;
 
    // Start reading.
    this.read(0);
  }
 
  // Allow legacy requires in the test suite to keep working:
  //   const { StreamWrap } = require('internal/js_stream_socket')
  static get StreamWrap() {
    return JSStreamSocket;
  }
 
  isClosing() {
    return !this.readable || !this.writable;
  }
 
  readStart() {
    this.stream.resume();
    return 0;
  }
 
  readStop() {
    this.stream.pause();
    return 0;
  }
 
  doShutdown(req) {
    // TODO(addaleax): It might be nice if we could get into a state where
    // DoShutdown() is not called on streams while a write is still pending.
    //
    // Currently, the only part of the code base where that happens is the
    // TLS implementation, which calls both DoWrite() and DoShutdown() on the
    // underlying network stream inside of its own DoShutdown() method.
    // Working around that on the native side is not quite trivial (yet?),
    // so for now that is supported here.
 
    if (this[kCurrentWriteRequest] !== null) {
      this[kPendingShutdownRequest] = req;
      return 0;
    }
    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);
    this[kCurrentShutdownRequest] = req;
 
    const handle = this._handle;
 
    setImmediate(() => {
      // Ensure that write is dispatched asynchronously.
      this.stream.end(() => {
        this.finishShutdown(handle, 0);
      });
    });
    return 0;
  }
 
  // handle === this._handle except when called from doClose().
  finishShutdown(handle, errCode) {
    // The shutdown request might already have been cancelled.
    if (this[kCurrentShutdownRequest] === null)
      return;
    const req = this[kCurrentShutdownRequest];
    this[kCurrentShutdownRequest] = null;
    handle.finishShutdown(req, errCode);
  }
 
  doWrite(req, bufs) {
    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);
 
    const handle = this._handle;
    const self = this;
 
    let pending = bufs.length;
 
    this.stream.cork();
    // Use `var` over `let` for performance optimization.
    for (var i = 0; i < bufs.length; ++i)
      this.stream.write(bufs[i], done);
    this.stream.uncork();
 
    // Only set the request here, because the `write()` calls could throw.
    this[kCurrentWriteRequest] = req;
 
    function done(err) {
      if (!err && --pending !== 0)
        return;
 
      // Ensure that this is called once in case of error
      pending = 0;
 
      let errCode = 0;
      if (err) {
        errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
      }
 
      // Ensure that write was dispatched
      setImmediate(() => {
        self.finishWrite(handle, errCode);
      });
    }
 
    return 0;
  }
 
  // handle === this._handle except when called from doClose().
  finishWrite(handle, errCode) {
    // The write request might already have been cancelled.
    if (this[kCurrentWriteRequest] === null)
      return;
    const req = this[kCurrentWriteRequest];
    this[kCurrentWriteRequest] = null;
 
    handle.finishWrite(req, errCode);
    if (this[kPendingShutdownRequest]) {
      const req = this[kPendingShutdownRequest];
      this[kPendingShutdownRequest] = null;
      this.doShutdown(req);
    }
  }
 
  doClose(cb) {
    const handle = this._handle;
 
    // When sockets of the "net" module destroyed, they will call
    // `this._handle.close()` which will also emit EOF if not emitted before.
    // This feature makes sockets on the other side emit "end" and "close"
    // even though we haven't called `end()`. As `stream` are likely to be
    // instances of `net.Socket`, calling `stream.destroy()` manually will
    // avoid issues that don't properly close wrapped connections.
    this.stream.destroy();
 
    setImmediate(() => {
      // Should be already set by net.js
      assert(this._handle === null);
 
      this.finishWrite(handle, uv.UV_ECANCELED);
      this.finishShutdown(handle, uv.UV_ECANCELED);
 
      cb();
    });
  }
}
 
module.exports = JSStreamSocket;