All files / lib/internal/streams compose.js

98.98% Statements 194/196
98.11% Branches 52/53
100% Functions 7/7
98.98% Lines 194/196

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 19722x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 14x 14x 14x 14x 14x 7x 7x 7x 7x 14x 14x 7x 7x 7x 7x 7x 14x 22x 22x 22x 22x 1x 1x 21x 22x 5x 5x 16x 16x 16x 22x 1x 1x 16x 22x 6x 6x 6x 16x 22x 41x 14x 14x 14x 41x 1x 1x 1x 1x 1x 1x 41x 1x 1x 1x 1x 1x 1x 41x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 5x 14x 3x 9x 4x 4x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 22x 22x 22x 22x 22x 22x 22x 7x 7x 5x 7x 2x 2x 7x 7x 7x 6x 6x 7x 7x 7x 1x 1x 1x 1x 1x 7x 7x 7x 5x 5x 5x 5x 5x 7x 7x 14x 22x 7x 10x 8x 8x 8x 8x 7x 7x 7x 5x 7x 7x 7x 19x 24x 24x 24x 19x 19x 19x 5x 24x     24x 7x 7x 14x 14x 14x 5x 5x 14x 14x 14x 14x 14x 14x 9x 14x 5x 5x 5x 14x 14x 14x 22x  
'use strict';
 
const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
  isNodeStream,
  isReadable,
  isWritable,
} = require('internal/streams/utils');
const {
  AbortError,
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
  },
} = require('internal/errors');
 
// Duplex variant required on Node.js versions before 17: when one side is
// explicitly disabled through the options, that side's internal state is
// flagged as already completed.
class ComposeDuplex extends Duplex {
  /**
   * @param {object} [options] Options forwarded unchanged to `Duplex`. Only
   *   `readable === false` and `writable === false` receive extra handling.
   */
  constructor(options) {
    super(options);

    // https://github.com/nodejs/node/pull/34385

    // Readable side switched off: mark it as ended with 'end' already emitted.
    if (options?.readable === false) {
      const readableState = this._readableState;
      readableState.readable = false;
      readableState.ended = true;
      readableState.endEmitted = true;
    }

    // Writable side switched off: mark it as ended and finished.
    if (options?.writable === false) {
      const writableState = this._writableState;
      writableState.writable = false;
      writableState.ending = true;
      writableState.ended = true;
      writableState.finished = true;
    }
  }
}
 
module.exports = function compose(...streams) {
  if (streams.length === 0) {
    throw new ERR_MISSING_ARGS('streams');
  }
 
  if (streams.length === 1) {
    return Duplex.from(streams[0]);
  }
 
  const orgStreams = [...streams];
 
  if (typeof streams[0] === 'function') {
    streams[0] = Duplex.from(streams[0]);
  }
 
  if (typeof streams[streams.length - 1] === 'function') {
    const idx = streams.length - 1;
    streams[idx] = Duplex.from(streams[idx]);
  }
 
  for (let n = 0; n < streams.length; ++n) {
    if (!isNodeStream(streams[n])) {
      // TODO(ronag): Add checks for non streams.
      continue;
    }
    if (n < streams.length - 1 && !isReadable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be readable'
      );
    }
    if (n > 0 && !isWritable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be writable'
      );
    }
  }
 
  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;
 
  function onfinished(err) {
    const cb = onclose;
    onclose = null;
 
    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      d.destroy();
    }
  }
 
  const head = streams[0];
  const tail = pipeline(streams, onfinished);
 
  const writable = !!isWritable(head);
  const readable = !!isReadable(tail);
 
  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new ComposeDuplex({
    // TODO (ronag): highWaterMark?
    writableObjectMode: !!head?.writableObjectMode,
    readableObjectMode: !!tail?.writableObjectMode,
    writable,
    readable,
  });
 
  if (writable) {
    d._write = function(chunk, encoding, callback) {
      if (head.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };
 
    d._final = function(callback) {
      head.end();
      onfinish = callback;
    };
 
    head.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });
 
    tail.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }
 
  if (readable) {
    tail.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });
 
    tail.on('end', function() {
      d.push(null);
    });
 
    d._read = function() {
      while (true) {
        const buf = tail.read();
 
        if (buf === null) {
          onreadable = d._read;
          return;
        }
 
        if (!d.push(buf)) {
          return;
        }
      }
    };
  }
 
  d._destroy = function(err, callback) {
    if (!err && onclose !== null) {
      err = new AbortError();
    }
 
    onreadable = null;
    ondrain = null;
    onfinish = null;
 
    if (onclose === null) {
      callback(err);
    } else {
      onclose = callback;
      destroyer(tail, err);
    }
  };
 
  return d;
};