All files / lib/internal/streams operators.js

93.37% Statements 155/166
91.48% Branches 43/47
83.33% Functions 5/6
93.37% Lines 155/166

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 16719x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 19x 18x 18x 1x 1x 1x 18x 18x 2x 2x 18x 18x 18x 6x 6x 14x 14x 14x 14x 14x 14x 14x 14x 14x 14x 18x 18x 18x 18x 18x 18x 18x     18x 18x 12x 12x 50x 2x 2x 48x 50x     50x 50x 50x 50x     48x 50x     50x 50x 38x 38x 49x 49x 50x 44x 44x 44x 48x 50x 44x 44x 44x 44x 50x 10x 12x       12x 12x 12x 8x 8x 8x 12x 12x 12x 18x 18x 18x 18x 18x 64x 56x 56x 56x 10x 10x 56x 56x 2x 2x 56x 56x 36x 36x 56x 56x 56x 42x 42x 42x 56x 52x 52x 52x 52x 52x 18x 12x 12x 12x 12x 2x 2x 2x 12x 18x 19x 9x 9x 1x 1x 1x 7x 19x 9x 9x 19x 19x 7x 9x 19x 19x 19x 19x  
'use strict';
 
const { AbortController } = require('internal/abort_controller');
const {
  codes: {
    ERR_INVALID_ARG_TYPE,
  },
  AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
 
const {
  MathFloor,
  Promise,
  PromiseReject,
  PromisePrototypeCatch,
  Symbol,
} = primordials;
 
// Sentinel meaning "no value for this chunk": filter()'s wrapper returns it
// for rejected chunks, and map() drops it instead of yielding it.
const kEmpty = Symbol('kEmpty');
// Sentinel pushed onto map()'s queue once the source stream is exhausted;
// the consuming loop returns when it reads it.
const kEof = Symbol('kEof');
 
/**
 * Maps every chunk of the stream bound as `this` through `fn` and yields the
 * results in source order. Up to `options.concurrency` results may be queued
 * (in flight) at once; the source is not read further until the consumer has
 * taken one off the queue.
 *
 * Because this is an async generator, argument validation errors surface on
 * the first `next()` call rather than at call time.
 *
 * @param {Function} fn Called as `fn(chunk, { signal })`. May return a plain
 *   value or a promise. A result equal to the internal `kEmpty` sentinel is
 *   skipped rather than yielded.
 * @param {object} [options]
 * @param {number} [options.concurrency=1] Floored, then validated as an
 *   integer >= 1.
 * @param {AbortSignal} [options.signal] Aborting it (before or during
 *   iteration) aborts the operator.
 * @yields The (awaited) results of `fn`, one per non-skipped chunk.
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function, or `options` is
 *   neither nullish nor an object.
 * @throws {AbortError} If the operator is aborted while iterating.
 */
async function * map(fn, options) {
  if (typeof fn !== 'function') {
    // Report the offending argument itself, not the stream.
    throw new ERR_INVALID_ARG_TYPE(
      'fn', ['Function', 'AsyncFunction'], fn);
  }

  if (options != null && typeof options !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('options', ['Object'], options);
  }

  let concurrency = 1;
  if (options?.concurrency != null) {
    concurrency = MathFloor(options.concurrency);
  }

  validateInteger(concurrency, 'concurrency', 1);

  // Internal controller: its signal is what `fn` receives, and it is aborted
  // both when the caller's signal aborts and when this generator finishes.
  const ac = new AbortController();
  const stream = this;
  const queue = [];
  const signal = ac.signal;
  const signalOpt = { signal };

  const abort = () => ac.abort();
  // An already-aborted signal never emits 'abort' again, so the listener
  // alone would miss it; propagate that state eagerly.
  if (options?.signal?.aborted) {
    abort();
  }
  options?.signal?.addEventListener('abort', abort);

  // `next` wakes the consumer loop when the queue gains an entry; `resume`
  // wakes the pump once the consumer has freed a queue slot.
  let next;
  let resume;
  let done = false;

  function onDone() {
    done = true;
  }

  // Producer: reads the source, starts `fn` for each chunk and queues the
  // (possibly pending) result.
  async function pump() {
    try {
      for await (let val of stream) {
        if (done) {
          return;
        }

        if (signal.aborted) {
          throw new AbortError();
        }

        try {
          val = fn(val, signalOpt);
        } catch (err) {
          // Turn a synchronous throw into a rejection so it reaches the
          // consumer through the queue, in order.
          val = PromiseReject(err);
        }

        if (val === kEmpty) {
          continue;
        }

        if (typeof val?.catch === 'function') {
          // A rejected result flags completion so no further chunks are
          // mapped after the failure.
          val.catch(onDone);
        }

        queue.push(val);
        if (next) {
          next();
          next = null;
        }

        // Backpressure: wait for the consumer once the concurrency limit
        // is reached.
        if (!done && queue.length && queue.length >= concurrency) {
          await new Promise((resolve) => {
            resume = resolve;
          });
        }
      }
      queue.push(kEof);
    } catch (err) {
      // Deliver source/abort errors to the consumer via the queue.
      const val = PromiseReject(err);
      PromisePrototypeCatch(val, onDone);
      queue.push(val);
    } finally {
      done = true;
      if (next) {
        next();
        next = null;
      }
      options?.signal?.removeEventListener('abort', abort);
    }
  }

  // Intentionally not awaited: pump() handles its own errors and runs
  // concurrently with the consuming loop below.
  pump();

  try {
    while (true) {
      while (queue.length > 0) {
        // Always await the head so results are yielded in source order.
        const val = await queue[0];

        if (val === kEof) {
          return;
        }

        if (signal.aborted) {
          throw new AbortError();
        }

        if (val !== kEmpty) {
          yield val;
        }

        queue.shift();
        if (resume) {
          resume();
          resume = null;
        }
      }

      await new Promise((resolve) => {
        next = resolve;
      });
    }
  } finally {
    // Runs on normal completion, error, or early return by the consumer:
    // signal in-flight `fn` calls to stop and release a waiting pump.
    ac.abort();

    done = true;
    if (resume) {
      resume();
      resume = null;
    }
  }
}
 
/**
 * Yields only the chunks of the stream bound as `this` for which `fn`
 * returns (or resolves to) a truthy value. Implemented on top of
 * `this.map()`: rejected chunks are replaced by the `kEmpty` sentinel.
 *
 * @param {Function} fn Predicate called as `fn(chunk, mapOptions)` with the
 *   second argument it receives from `map()`; may be async.
 * @param {object} [options] Forwarded unchanged to `this.map()`.
 * @yields The chunks accepted by `fn`.
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
async function * filter(fn, options) {
  if (typeof fn !== 'function') {
    // Report the offending argument itself, not the stream.
    throw new ERR_INVALID_ARG_TYPE(
      'fn', ['Function', 'AsyncFunction'], fn);
  }
  // Named distinctly from the outer `options` to avoid shadowing it.
  async function filterFn(value, fnOptions) {
    if (await fn(value, fnOptions)) {
      return value;
    }
    return kEmpty;
  }
  yield* this.map(filterFn, options);
}
// Operators exported by this module.
module.exports = { map, filter };