All files / lib/internal/streams from.js

100% Statements 113/113
97.87% Branches 46/47
100% Functions 6/6
100% Lines 113/113

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 11419x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 39x 39x 39x 2x 2x 2x 2x 2x 2x 2x 2x 2x 37x 37x 39x 21x 21x 39x 15x 15x 16x 1x 1x 36x 36x 36x 36x 36x 36x 36x 36x 36x 36x 36x 36x 36x 68x 63x 63x 63x 36x 36x 36x 30x 30x 30x 30x 30x 36x 36x 36x 30x 30x 30x 10x 3x 3x 3x 3x 10x 23x 16x 15x 15x 30x 36x 36x 63x 83x 83x 83x 83x 83x 83x 18x 82x 62x 62x 62x 62x 62x 1x 1x 62x 20x 60x 40x 40x 62x 83x 5x 5x 63x 63x 63x 36x 39x 19x 19x  
'use strict';
 
const {
  PromisePrototypeThen,
  SymbolAsyncIterator,
  SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');
 
const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
 
/**
 * Builds a readable stream whose chunks come from `iterable`.
 *
 * A string or Buffer is pushed as one chunk followed by `null`. Any other
 * input must expose an async-iterator method (looked up with
 * `SymbolAsyncIterator`, which wins when both are present) or a sync-iterator
 * method (looked up with `SymbolIterator`). The iterator is obtained right
 * away, before the stream is constructed, and is advanced from `_read()`.
 *
 * A `null` value produced by the iterator destroys the stream with
 * `ERR_STREAM_NULL_VALUES`; any error thrown while pulling destroys the
 * stream with that error.
 *
 * @param {Function} Readable Constructor invoked with `new` and an options
 *   object to create the stream.
 * @param {string|Buffer|Iterable|AsyncIterable} iterable Source of chunks.
 * @param {object} [opts] Options spread over the defaults
 *   (`objectMode: true`, plus `highWaterMark: 1` on the iterator path).
 * @returns {object} The instance created by `Readable`.
 * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is not a string, not a Buffer,
 *   and has no iterator method.
 */
function from(Readable, iterable, opts) {
  const isSingleChunk =
    typeof iterable === 'string' || iterable instanceof Buffer;
  if (isSingleChunk) {
    // Strings and Buffers are emitted whole rather than iterated.
    const singleChunkOptions = {
      objectMode: true,
      ...opts,
      read() {
        this.push(iterable);
        this.push(null);
      },
    };
    return new Readable(singleChunkOptions);
  }

  // Prefer the async protocol; the sync one is only probed when the async
  // method is missing.
  const isAsync = !!(iterable && iterable[SymbolAsyncIterator]);
  const isSync = !isAsync && !!(iterable && iterable[SymbolIterator]);
  if (!isAsync && !isSync) {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
  }
  const iterator = isAsync ?
    iterable[SymbolAsyncIterator]() :
    iterable[SymbolIterator]();

  const streamOptions = {
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts,
  };
  const readable = new Readable(streamOptions);

  // Guards against _read being called before the last iteration completed.
  // Set when _read() starts the pull loop; cleared when the loop stops on
  // backpressure or on a null value.
  let reading = false;

  /**
   * Finishes the iterator: forwards `error` through `iterator.throw()` when
   * there is an error and the method exists, then calls `iterator.return()`
   * (if present) unless `throw()` already reported `done`.
   */
  async function close(error) {
    const canThrow = typeof iterator.throw === 'function';
    const errored = !(error === undefined || error === null);
    if (errored && canThrow) {
      const { value, done } = await iterator.throw(error);
      await value;
      if (done) {
        return;
      }
    }
    if (typeof iterator.return !== 'function') {
      return;
    }
    const { value } = await iterator.return();
    await value;
  }

  /**
   * Pulls from the iterator and pushes into the stream until the iterator is
   * done, `push()` reports backpressure, or something throws.
   */
  async function next() {
    let keepPulling;
    do {
      keepPulling = false;
      try {
        // Only async iterators are awaited, so a sync iterator yielding
        // plain values is drained without yielding to the event loop.
        const { value, done } = isAsync ?
          await iterator.next() :
          iterator.next();

        if (done) {
          readable.push(null);
        } else {
          let chunk = value;
          if (chunk && typeof chunk.then === 'function') {
            chunk = await chunk;
          }
          if (chunk === null) {
            reading = false;
            throw new ERR_STREAM_NULL_VALUES();
          }
          keepPulling = readable.push(chunk);
          if (!keepPulling) {
            // Backpressure: allow the next _read() to resume pulling.
            reading = false;
          }
        }
      } catch (err) {
        readable.destroy(err);
      }
    } while (keepPulling);
  }

  readable._read = function() {
    if (reading) {
      return;
    }
    reading = true;
    next();
  };

  readable._destroy = function(error, cb) {
    // nextTick is here in case cb throws
    const onClosed = () => process.nextTick(cb, error);
    const onCloseFailed = (closeError) =>
      process.nextTick(cb, closeError || error);
    PromisePrototypeThen(close(error), onClosed, onCloseFailed);
  };

  return readable;
}
 
// Expose `from` as the sole export of this module.
module.exports = from;