All files / lib/internal/streams compose.js

98.85% Statements 172/174
97.61% Branches 41/42
100% Functions 6/6
98.85% Lines 172/174

Press n or j to go to the next uncovered block, b, p or k for the previous block.

x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 1x 1x 22x 22x 5x 5x 22x 22x 22x 22x 1x 1x 16x 22x 6x 6x 6x 22x 22x 41x 14x 14x 14x 41x 1x 1x 1x 1x 1x 1x 41x 1x 1x 1x 1x 1x 1x 41x 22x 22x 22x 22x 22x 22x 22x 22x 14x 14x 14x 14x 5x 14x 3x 9x 4x 4x 14x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 7x 7x 5x 7x 2x 2x 7x 7x 7x 6x 6x 7x 7x 7x 1x 1x 1x 1x 1x 7x 7x 7x 5x 5x 5x 5x 5x 7x 7x 22x 22x 7x 10x 8x 8x 8x 8x 7x 7x 7x 5x 7x 7x 7x 19x 24x 24x 24x 19x 19x 19x 24x 24x     24x 7x 7x 22x 22x 14x 5x 5x 14x 14x 14x 14x 14x 14x 9x 14x 5x 5x 5x 22x 22x 22x 22x  
'use strict';
 
const { pipeline } = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
  isNodeStream,
  isReadable,
  isWritable,
} = require('internal/streams/utils');
const {
  AbortError,
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
  },
} = require('internal/errors');
 
module.exports = function compose(...streams) {
  if (streams.length === 0) {
    throw new ERR_MISSING_ARGS('streams');
  }
 
  if (streams.length === 1) {
    return Duplex.from(streams[0]);
  }
 
  const orgStreams = [...streams];
 
  if (typeof streams[0] === 'function') {
    streams[0] = Duplex.from(streams[0]);
  }
 
  if (typeof streams[streams.length - 1] === 'function') {
    const idx = streams.length - 1;
    streams[idx] = Duplex.from(streams[idx]);
  }
 
  for (let n = 0; n < streams.length; ++n) {
    if (!isNodeStream(streams[n])) {
      // TODO(ronag): Add checks for non streams.
      continue;
    }
    if (n < streams.length - 1 && !isReadable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be readable'
      );
    }
    if (n > 0 && !isWritable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be writable'
      );
    }
  }
 
  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;
 
  function onfinished(err) {
    const cb = onclose;
    onclose = null;
 
    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      d.destroy();
    }
  }
 
  const head = streams[0];
  const tail = pipeline(streams, onfinished);
 
  const writable = !!isWritable(head);
  const readable = !!isReadable(tail);
 
  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplex({
    // TODO (ronag): highWaterMark?
    writableObjectMode: !!head?.writableObjectMode,
    readableObjectMode: !!tail?.writableObjectMode,
    writable,
    readable,
  });
 
  if (writable) {
    d._write = function(chunk, encoding, callback) {
      if (head.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };
 
    d._final = function(callback) {
      head.end();
      onfinish = callback;
    };
 
    head.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });
 
    tail.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }
 
  if (readable) {
    tail.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });
 
    tail.on('end', function() {
      d.push(null);
    });
 
    d._read = function() {
      while (true) {
        const buf = tail.read();
 
        if (buf === null) {
          onreadable = d._read;
          return;
        }
 
        if (!d.push(buf)) {
          return;
        }
      }
    };
  }
 
  d._destroy = function(err, callback) {
    if (!err && onclose !== null) {
      err = new AbortError();
    }
 
    onreadable = null;
    ondrain = null;
    onfinish = null;
 
    if (onclose === null) {
      callback(err);
    } else {
      onclose = callback;
      destroyer(tail, err);
    }
  };
 
  return d;
};