All files / lib/internal/streams from.js

100% Statements 113/113
97.91% Branches 47/48
100% Functions 6/6
100% Lines 113/113

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 11421x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 389x 389x 389x 4x 4x 4x 4x 4x 4x 4x 4x 4x 385x 385x 389x 165x 165x 389x 219x 219x 220x 1x 1x 384x 384x 384x 384x 384x 384x 384x 384x 384x 384x 384x 384x 384x 1781x 1776x 1776x 1776x 384x 384x 384x 321x 321x 321x 321x 321x 384x 384x 384x 321x 321x 321x 63x 3x 3x 3x 3x 63x 314x 101x 100x 100x 321x 384x 384x 1776x 1796x 1796x 1796x 1796x 1796x 1796x 223x 1795x 1552x 1552x 1552x 1552x 1552x 1x 1x 1552x 20x 1550x 1530x 1530x 1552x 1796x 23x 23x 1776x 1776x 1776x 384x 389x 21x 21x  
'use strict';
 
const {
  PromisePrototypeThen,
  SymbolAsyncIterator,
  SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');
 
const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
 
function from(Readable, iterable, opts) {
  let iterator;
  if (typeof iterable === 'string' || iterable instanceof Buffer) {
    return new Readable({
      objectMode: true,
      ...opts,
      read() {
        this.push(iterable);
        this.push(null);
      }
    });
  }
 
  let isAsync;
  if (iterable && iterable[SymbolAsyncIterator]) {
    isAsync = true;
    iterator = iterable[SymbolAsyncIterator]();
  } else if (iterable && iterable[SymbolIterator]) {
    isAsync = false;
    iterator = iterable[SymbolIterator]();
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
  }
 
  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts,
  });
 
  // Flag to protect against _read
  // being called before last iteration completion.
  let reading = false;
 
  readable._read = function() {
    if (!reading) {
      reading = true;
      next();
    }
  };
 
  readable._destroy = function(error, cb) {
    PromisePrototypeThen(
      close(error),
      () => process.nextTick(cb, error), // nextTick is here in case cb throws
      (e) => process.nextTick(cb, e || error),
    );
  };
 
  async function close(error) {
    const hadError = (error !== undefined) && (error !== null);
    const hasThrow = typeof iterator.throw === 'function';
    if (hadError && hasThrow) {
      const { value, done } = await iterator.throw(error);
      await value;
      if (done) {
        return;
      }
    }
    if (typeof iterator.return === 'function') {
      const { value } = await iterator.return();
      await value;
    }
  }
 
  async function next() {
    for (;;) {
      try {
        const { value, done } = isAsync ?
          await iterator.next() :
          iterator.next();
 
        if (done) {
          readable.push(null);
        } else {
          const res = (value &&
            typeof value.then === 'function') ?
            await value :
            value;
          if (res === null) {
            reading = false;
            throw new ERR_STREAM_NULL_VALUES();
          } else if (readable.push(res)) {
            continue;
          } else {
            reading = false;
          }
        }
      } catch (err) {
        readable.destroy(err);
      }
      break;
    }
  }
  return readable;
}
 
module.exports = from;