All files / lib/internal/cluster round_robin_handle.js

97.05% Statements 132/136
93.54% Branches 29/31
100% Functions 7/7
97.05% Lines 132/136

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 13781x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 81x 39x 39x 39x 39x 39x 39x 39x 39x 39x 39x 39x 35x 35x 35x 35x 35x 35x 35x 39x 4x 4x 4x 4x 4x 39x 39x 35x 35x 35x 35x 39x 39x 81x 81x 78x 78x 78x 78x 74x 71x 71x 71x 71x 74x 3x 3x 74x 74x 78x 78x 78x 78x 39x 39x 39x 39x 4x 39x 81x 81x 81x 75x 75x 75x 75x 71x 71x 71x 71x 75x 33x 75x         33x 33x 33x 33x 81x 81x 81x 27x 27x 27x 27x 27x 26x 26x 26x 26x 81x 81x 81x 126x 2x 2x 124x 124x 124x 124x 98x 98x 98x 26x 26x 26x 26x 26x 26x 26x 26x 1x 26x 26x 26x 26x 81x  
'use strict';
 
const {
  ArrayIsArray,
  Boolean,
  ObjectCreate,
  SafeMap,
} = primordials;
 
const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { append, init, isEmpty, peek, remove } = require('internal/linkedlist');
const { constants } = internalBinding('tcp_wrap');
 
module.exports = RoundRobinHandle;
 
/**
 * Listening handle shared by a set of workers. A single server is bound here
 * and every accepted connection is queued, then passed on to a worker that is
 * waiting for one (see `distribute()` and `handoff()`).
 * @param {*} key Identifier copied into every `newconn` message.
 * @param {*} address Host to bind when `port` is used, otherwise the path
 *   handed to `listen()`.
 * @param {object} options
 * @param {number} [options.port] Used when `fd` is not `>= 0` and `port >= 0`.
 * @param {number} [options.fd] Takes precedence over `port` when `fd >= 0`.
 * @param {number} [options.flags] Bit field; only `UV_TCP_IPV6ONLY` is read.
 * @param {number} [options.backlog] Forwarded to `listen()` unchanged.
 * @param {boolean} [options.readableAll] Forwarded in the path case only.
 * @param {boolean} [options.writableAll] Forwarded in the path case only.
 */
function RoundRobinHandle(key, address, { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();  // Every registered worker, keyed by worker id.
  this.free = new SafeMap();  // Workers waiting for a connection.
  this.handles = init(ObjectCreate(null));  // Accepted, not yet delivered.
  this.handle = null;  // Set once the server below is listening.
  // The server's own connection listener is `assert.fail`: connections are
  // picked up from the raw handle in the 'listening' callback instead.
  this.server = net.createServer(assert.fail);

  let listenOptions;
  if (fd >= 0) {
    listenOptions = { fd, backlog };
  } else if (port >= 0) {
    listenOptions = {
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    };
  } else {
    // UNIX socket path.
    listenOptions = {
      path: address,
      backlog,
      readableAll,
      writableAll,
    };
  }
  this.server.listen(listenOptions);

  this.server.once('listening', () => {
    // Take the underlying handle away from the server and route every
    // incoming connection through distribute().
    const { server } = this;
    const listeningHandle = server._handle;
    this.handle = listeningHandle;
    listeningHandle.onconnection = (err, handle) => this.distribute(err, handle);
    server._handle = null;
    this.server = null;
  });
}
 
/**
 * Registers a worker and reports the outcome of the bind through `send`.
 * Asserts that the worker id has not been registered before.
 * @param {object} worker Worker to register; `worker.id` is used as the key.
 * @param {Function} send Called as `send(null, reply, null)` once the handle
 *   is available, or as `send(errno, null)` if the server emits 'error'.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  const alreadyRegistered = this.all.has(worker.id);
  assert(alreadyRegistered === false);
  this.all.set(worker.id, worker);

  const announce = () => {
    if (!this.handle.getsockname) {
      send(null, null, null);  // UNIX socket.
    } else {
      const sockname = {};
      this.handle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname }, null);
    }

    // In case there are connections pending.
    this.handoff(worker);
  };

  const { server } = this;

  // The server reference is cleared once it is listening, so the handle is
  // ready and the worker can be answered right away.
  if (server === null) {
    return announce();
  }

  // Still busy binding.
  server.once('listening', announce);
  server.once('error', (err) => {
    send(err.errno, null);
  });
};
 
/**
 * Unregisters a worker. When the last worker is removed, every queued
 * connection handle and the listening handle are closed.
 * @param {object} worker Worker to remove; looked up by `worker.id`.
 * @returns {boolean} `true` only when the removed worker was the last one
 *   registered; `false` if the worker was unknown or others remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No worker is left to accept the connections that are still queued.
  while (!isEmpty(this.handles)) {
    const handle = peek(this.handles);
    handle.close();
    remove(handle);
  }

  // `this.handle` is only assigned from the server's 'listening' callback, so
  // it is still null when the last worker goes away while the server is
  // binding. Calling close() unconditionally threw a TypeError in that case.
  // NOTE(review): the still-binding server is not torn down here - confirm
  // whether callers need it closed as well.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }
  return true;
};
 
/**
 * Connection callback of the listening handle: queues an accepted connection
 * and, if a worker is waiting in `this.free`, starts handing it over.
 * @param {number} err Non-zero when accepting the connection failed.
 * @param {object} handle The accepted connection handle; not usable when
 *   `err` is set.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // A failed accept provides no handle. Appending it to the queue would
  // throw, so skip the connection. handoff() re-queues with `err === 0`.
  if (err) {
    return;
  }

  append(this.handles, handle);
  // eslint-disable-next-line node-core/no-array-destructuring
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  // The first entry of the map, if any, is the worker that gets the next
  // connection. It is removed from `free` until handoff() puts it back.
  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};
 
/**
 * Gives the oldest queued connection to `worker`, or parks the worker in
 * `this.free` when nothing is queued. Once the worker replies, the next
 * handoff for the same worker is attempted.
 * @param {object} worker Target worker; `worker.id` and `worker.process` are
 *   read.
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const isRegistered = this.all.has(worker.id);

  // Worker is closing (or has closed) the server.
  if (!isRegistered)
    return;

  const pending = peek(this.handles);

  // Nothing queued: add the worker to the ready queue again.
  if (pending === null) {
    this.free.set(worker.id, worker);
    return;
  }

  remove(pending);

  const onReply = (reply) => {
    if (!reply.accepted) {
      // Worker is shutting down. Send to another.
      this.distribute(0, pending);
    } else {
      // The worker took the connection, so this side's copy can be closed.
      pending.close();
    }

    this.handoff(worker);
  };

  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, pending, onReply);
};