Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | 17x 17x 17x 17x 17x 17x 17x 17x 17x 17x 17x 17x 17x 17x 329x 329x 329x 4x 4x 4x 4x 4x 4x 4x 4x 4x 325x 325x 329x 178x 178x 329x 146x 146x 147x 1x 1x 329x 329x 329x 329x 329x 329x 329x 329x 329x 329x 329x 329x 329x 1610x 1605x 1605x 1605x 329x 329x 329x 290x 290x 290x 290x 290x 329x 329x 329x 290x 290x 290x 64x 3x 3x 3x 3x 64x 283x 114x 113x 113x 290x 329x 329x 1605x 1625x 1625x 1625x 1625x 1625x 1625x 203x 1624x 1390x 1390x 1390x 1390x 1390x 1x 1x 1390x 20x 1388x 1368x 1368x 1390x 1625x 34x 34x 1605x 1605x 1605x 329x 329x 17x 17x | 'use strict';
const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');
const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_NULL_VALUES
} = require('internal/errors').codes;
/**
 * Builds a readable stream that emits the items produced by `iterable`.
 *
 * A string or Buffer is emitted whole as one chunk followed by end-of-stream.
 * Any other input must expose `Symbol.asyncIterator` or `Symbol.iterator`;
 * its iterator is then advanced on demand from `_read`.
 *
 * @param {Function} Readable Constructor used to create the stream.
 * @param {string|Buffer|Iterable|AsyncIterable} iterable Source of chunks.
 * @param {object} [opts] Options spread into the constructor options after
 *   the defaults chosen here, so they override those defaults.
 * @returns {object} The newly constructed `Readable` instance.
 * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is not a string, a Buffer,
 *   or an object implementing one of the iterator protocols.
 */
function from(Readable, iterable, opts) {
  // Strings and Buffers are handed over in one piece rather than iterated.
  if (typeof iterable === 'string' || iterable instanceof Buffer) {
    const singleChunkOptions = {
      objectMode: true,
      ...opts,
      read() {
        this.push(iterable);
        this.push(null);
      },
    };
    return new Readable(singleChunkOptions);
  }

  // The async protocol wins when the source implements both.
  const isAsync = Boolean(iterable && iterable[SymbolAsyncIterator]);
  if (!isAsync && !(iterable && iterable[SymbolIterator])) {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
  }
  const iterator = isAsync ?
    iterable[SymbolAsyncIterator]() :
    iterable[SymbolIterator]();

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts,
  });

  // Set while a pump() run is in progress so that a re-entrant _read
  // cannot start a second, overlapping iteration.
  let reading = false;

  readable._read = function() {
    if (reading) {
      return;
    }
    reading = true;
    pump();
  };

  readable._destroy = function(error, cb) {
    // The callback is deferred with nextTick in case cb throws.
    const onClosed = () => process.nextTick(cb, error);
    const onCloseFailed = (e) => process.nextTick(cb, e || error);
    PromisePrototypeThen(shutdown(error), onClosed, onCloseFailed);
  };

  // Gives the iterator a chance to clean up: forwards `error` through
  // throw() when there is one and throw() exists, then calls return()
  // unless throw() already reported completion.
  async function shutdown(error) {
    const failed = (error !== undefined) && (error !== null);
    const canThrow = typeof iterator.throw === 'function';
    if (failed && canThrow) {
      const { value: thrown, done: finished } = await iterator.throw(error);
      await thrown;
      if (finished) {
        return;
      }
    }
    if (typeof iterator.return === 'function') {
      const { value: returned } = await iterator.return();
      await returned;
    }
  }

  // Pulls items from the iterator and pushes them into the stream until
  // the iterator is exhausted, push() signals backpressure, or an error
  // occurs (in which case the stream is destroyed with that error).
  async function pump() {
    let keepGoing = true;
    while (keepGoing) {
      keepGoing = false;
      try {
        let step = iterator.next();
        if (isAsync) {
          step = await step;
        }
        const { value: item, done: exhausted } = step;
        if (exhausted) {
          readable.push(null);
        } else {
          // Thenables produced by the iterator are resolved before pushing.
          let chunk = item;
          if (item && typeof item.then === 'function') {
            chunk = await item;
          }
          if (chunk === null) {
            reading = false;
            throw new ERR_STREAM_NULL_VALUES();
          }
          keepGoing = readable.push(chunk);
          if (!keepGoing) {
            // Backpressure: stop here and let the next _read resume.
            reading = false;
          }
        }
      } catch (err) {
        readable.destroy(err);
      }
    }
  }

  return readable;
}
// The factory is the module's sole export; callers pass the Readable
// constructor to use as its first argument.
module.exports = from;
|