All files / lib/internal/streams from.js

100% Statements 113/113
97.87% Branches 46/47
100% Functions 6/6
100% Lines 113/113

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 11422x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 22x 108x 108x 108x 4x 4x 4x 4x 4x 4x 4x 4x 4x 104x 104x 108x 60x 60x 108x 43x 43x 44x 1x 1x 108x 108x 108x 108x 108x 108x 108x 108x 108x 108x 108x 108x 108x 245x 240x 240x 240x 108x 108x 108x 84x 84x 84x 84x 84x 108x 108x 108x 84x 84x 84x 24x 3x 3x 3x 3x 24x 77x 39x 38x 38x 84x 108x 108x 240x 260x 260x 260x 260x 260x 260x 55x 259x 188x 188x 188x 188x 188x 1x 1x 188x 20x 186x 166x 166x 188x 260x 19x 19x 240x 240x 240x 108x 108x 22x 22x  
'use strict';
 
const {
  PromisePrototypeThen,
  SymbolAsyncIterator,
  SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');
 
const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
 
/**
 * Builds a readable stream from a string, a Buffer, or a sync/async iterable.
 *
 * A string or Buffer is emitted as one single chunk followed by end-of-stream.
 * Any other value must expose `Symbol.asyncIterator` or `Symbol.iterator`;
 * the async protocol is preferred when both are present.
 *
 * @param {Function} Readable Constructor used to create the stream.
 * @param {string | Buffer | Iterable | AsyncIterable} iterable Data source.
 * @param {object} [opts] Extra constructor options; they override the
 *   defaults (`objectMode: true`, and `highWaterMark: 1` for iterables).
 * @returns {object} The instance created with `new Readable(...)`.
 * @throws {ERR_INVALID_ARG_TYPE} When `iterable` is neither a string, a
 *   Buffer, nor an object implementing one of the iterator protocols.
 */
function from(Readable, iterable, opts) {
  const isSingleChunk =
    typeof iterable === 'string' || iterable instanceof Buffer;
  if (isSingleChunk) {
    // `read` is listed after the spread so a user-supplied `read` cannot
    // replace it.
    const singleChunkOptions = {
      objectMode: true,
      ...opts,
      read() {
        this.push(iterable);
        this.push(null);
      }
    };
    return new Readable(singleChunkOptions);
  }

  // The async protocol wins; the sync one is only probed when it is absent.
  const isAsync = !!(iterable && iterable[SymbolAsyncIterator]);
  if (!isAsync && !(iterable && iterable[SymbolIterator])) {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
  }
  const iterator = isAsync ?
    iterable[SymbolAsyncIterator]() :
    iterable[SymbolIterator]();

  const streamOptions = {
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts,
  };
  const readable = new Readable(streamOptions);

  // True while a pull loop is in flight, so that a `_read` call arriving
  // before the previous iteration has completed does not start a second one.
  let reading = false;

  readable._read = function() {
    if (reading) {
      return;
    }
    reading = true;
    next();
  };

  readable._destroy = function(error, cb) {
    // nextTick is here in case cb throws
    const onClosed = () => process.nextTick(cb, error);
    const onCloseFailed = (closeError) =>
      process.nextTick(cb, closeError || error);
    PromisePrototypeThen(close(error), onClosed, onCloseFailed);
  };

  // Shuts the iterator down: forwards a destroy error through `throw()` when
  // the iterator has one, then calls `return()` unless `throw()` already
  // reported completion.
  async function close(error) {
    const canThrow = typeof iterator.throw === 'function';
    const errored = !(error === undefined || error === null);
    if (errored && canThrow) {
      const { value, done } = await iterator.throw(error);
      await value;
      if (done) {
        return;
      }
    }
    if (typeof iterator.return !== 'function') {
      return;
    }
    const { value } = await iterator.return();
    await value;
  }

  // Pulls from the iterator and pushes into the stream until the source is
  // exhausted, `push()` returns false, or an error destroys the stream.
  async function next() {
    for (;;) {
      try {
        // Only the async protocol is awaited here; a sync iterator's result
        // is used as-is.
        const { value, done } = isAsync ?
          await iterator.next() :
          iterator.next();

        if (done) {
          readable.push(null);
          return;
        }

        // Thenable values are awaited before being pushed.
        const isThenable = value && typeof value.then === 'function';
        const chunk = isThenable ? await value : value;

        if (chunk === null) {
          reading = false;
          throw new ERR_STREAM_NULL_VALUES();
        }
        if (!readable.push(chunk)) {
          // push() returned false: stop until `_read` is called again.
          reading = false;
          return;
        }
      } catch (err) {
        readable.destroy(err);
        return;
      }
    }
  }

  return readable;
}
 
// The module's sole export is the `from` factory itself; it takes the
// Readable constructor as its first argument.
module.exports = from;