All files / lib/internal/streams compose.js

98.85% Statements 172/174
98% Branches 49/50
100% Functions 6/6
98.85% Lines 172/174

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 17530x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 30x 22x 1x 1x 21x 22x 5x 5x 16x 16x 16x 22x 1x 1x 16x 22x 6x 6x 6x 16x 22x 41x 14x 14x 14x 41x 1x 1x 1x 1x 1x 1x 41x 1x 1x 1x 1x 1x 1x 41x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 5x 14x 3x 9x 4x 4x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 22x 22x 22x 22x 22x 22x 22x 7x 7x 5x 7x 2x 2x 7x 7x 7x 6x 6x 7x 7x 7x 1x 1x 1x 1x 1x 7x 7x 7x 5x 5x 5x 5x 5x 7x 7x 14x 22x 7x 10x 8x 8x 8x 8x 7x 7x 7x 5x 7x 7x 7x 19x 24x 24x 24x 19x 19x 19x 5x 24x     24x 7x 7x 14x 14x 14x 5x 5x 14x 14x 14x 14x 14x 14x 9x 14x 5x 5x 5x 14x 14x 14x 30x  
'use strict';
 
const { pipeline } = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
  isNodeStream,
  isReadable,
  isWritable,
} = require('internal/streams/utils');
const {
  AbortError,
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
  },
} = require('internal/errors');
 
module.exports = function compose(...streams) {
  if (streams.length === 0) {
    throw new ERR_MISSING_ARGS('streams');
  }
 
  if (streams.length === 1) {
    return Duplex.from(streams[0]);
  }
 
  const orgStreams = [...streams];
 
  if (typeof streams[0] === 'function') {
    streams[0] = Duplex.from(streams[0]);
  }
 
  if (typeof streams[streams.length - 1] === 'function') {
    const idx = streams.length - 1;
    streams[idx] = Duplex.from(streams[idx]);
  }
 
  for (let n = 0; n < streams.length; ++n) {
    if (!isNodeStream(streams[n])) {
      // TODO(ronag): Add checks for non streams.
      continue;
    }
    if (n < streams.length - 1 && !isReadable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be readable'
      );
    }
    if (n > 0 && !isWritable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be writable'
      );
    }
  }
 
  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;
 
  function onfinished(err) {
    const cb = onclose;
    onclose = null;
 
    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      d.destroy();
    }
  }
 
  const head = streams[0];
  const tail = pipeline(streams, onfinished);
 
  const writable = !!isWritable(head);
  const readable = !!isReadable(tail);
 
  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplex({
    // TODO (ronag): highWaterMark?
    writableObjectMode: !!head?.writableObjectMode,
    readableObjectMode: !!tail?.writableObjectMode,
    writable,
    readable,
  });
 
  if (writable) {
    d._write = function(chunk, encoding, callback) {
      if (head.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };
 
    d._final = function(callback) {
      head.end();
      onfinish = callback;
    };
 
    head.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });
 
    tail.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }
 
  if (readable) {
    tail.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });
 
    tail.on('end', function() {
      d.push(null);
    });
 
    d._read = function() {
      while (true) {
        const buf = tail.read();
 
        if (buf === null) {
          onreadable = d._read;
          return;
        }
 
        if (!d.push(buf)) {
          return;
        }
      }
    };
  }
 
  d._destroy = function(err, callback) {
    if (!err && onclose !== null) {
      err = new AbortError();
    }
 
    onreadable = null;
    ondrain = null;
    onfinish = null;
 
    if (onclose === null) {
      callback(err);
    } else {
      onclose = callback;
      destroyer(tail, err);
    }
  };
 
  return d;
};