All files / lib/internal/cluster round_robin_handle.js

97.14% Statements 136/140
94.28% Branches 33/35
100% Functions 7/7
97.14% Lines 136/140

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 14183x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 83x 41x 41x 41x 41x 41x 41x 41x 41x 41x 41x 41x 36x 36x 36x 36x 36x 36x 36x 41x 5x 5x 5x 5x 5x 41x 41x 37x 37x 37x 37x 41x 41x 83x 83x 81x 81x 81x 81x 77x 72x 72x 72x 72x 77x 5x 5x 77x 77x 81x 81x 81x 81x 41x 41x 41x 41x 4x 41x 83x 83x 83x 78x 78x 78x 78x 74x 74x 74x 74x 78x 35x 78x         35x 35x 35x 35x 83x 83x 83x 51x 51x 1x 1x 50x 50x 50x 50x 50x 29x 29x 29x 29x 83x 83x 83x 155x 2x 2x 153x 153x 153x 153x 104x 104x 104x 49x 49x 49x 49x 49x 49x 49x 49x 14x 49x 49x 49x 49x 83x  
'use strict';
 
const {
  ArrayIsArray,
  Boolean,
  ObjectCreate,
  SafeMap,
} = primordials;
 
const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { append, init, isEmpty, peek, remove } = require('internal/linkedlist');
const { constants } = internalBinding('tcp_wrap');
 
module.exports = RoundRobinHandle;
 
/**
 * Owns a single listening server and the bookkeeping needed to hand its
 * incoming connections to registered workers.
 *
 * The listen target is chosen from the options in this order:
 *   1. `fd >= 0`   -> listen on the existing file descriptor,
 *   2. `port >= 0` -> listen on `address`:`port`,
 *   3. otherwise   -> treat `address` as a UNIX socket path.
 *
 * @param {*} key Stored as `this.key`.
 * @param {string} address Host for the port case, path for the UNIX socket
 *   case; unused in the fd case.
 * @param {object} options
 * @param {number} [options.port]
 * @param {number} [options.fd]
 * @param {number} [options.flags] Bit field; only `UV_TCP_IPV6ONLY` is read.
 * @param {number} [options.backlog]
 * @param {boolean} [options.readableAll] Forwarded for UNIX socket paths only.
 * @param {boolean} [options.writableAll] Forwarded for UNIX socket paths only.
 */
function RoundRobinHandle(key, address, { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = init(ObjectCreate(null));
  this.handle = null;
  this.server = net.createServer(assert.fail);

  let listenOptions;
  if (fd >= 0) {
    listenOptions = { fd, backlog };
  } else if (port >= 0) {
    listenOptions = {
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    };
  } else {
    // UNIX socket path.
    listenOptions = {
      path: address,
      backlog,
      readableAll,
      writableAll,
    };
  }
  this.server.listen(listenOptions);

  this.server.once('listening', () => {
    // Take the listening handle away from the server and route its
    // connections through distribute(); the server object is then dropped.
    const boundServer = this.server;
    const listenHandle = boundServer._handle;

    this.handle = listenHandle;
    listenHandle.onconnection = (err, handle) => this.distribute(err, handle);
    boundServer._handle = null;
    this.server = null;
  });
}
 
/**
 * Registers `worker` and reports the outcome of the bind through `send`.
 *
 * `send` is invoked as:
 *   - `send(null, { sockname }, null)` when the listening handle exposes
 *     `getsockname`,
 *   - `send(null, null, null)` when it does not (UNIX socket),
 *   - `send(err.errno, null)` if the server emits 'error' while still binding.
 *
 * After a successful report the worker is offered any queued connection.
 *
 * @param {{ id: * }} worker Must not already be registered (asserted).
 * @param {Function} send Reply callback, see above.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const onBound = () => {
    const listenHandle = this.handle;
    let reply = null;  // Stays null for UNIX sockets.

    if (listenHandle.getsockname) {
      const sockname = {};
      listenHandle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      reply = { sockname };
    }

    send(null, reply, null);
    this.handoff(worker);  // In case there are connections pending.
  };

  const pendingServer = this.server;
  if (pendingServer !== null) {
    // Still busy binding: answer once the bind succeeds or fails.
    pendingServer.once('listening', onBound);
    pendingServer.once('error', (err) => {
      send(err.errno, null);
    });
    return;
  }

  onBound();
};
 
/**
 * Unregisters `worker`. When that leaves no registered workers, every
 * connection still waiting in the queue is closed and listening stops.
 *
 * @param {{ id: * }} worker Only `worker.id` is read.
 * @returns {boolean} `true` when this call removed the last worker and shut
 *   the handle down; `false` if the worker was not registered or other
 *   workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No worker is left to take the queued connections; close them.
  while (!isEmpty(this.handles)) {
    const handle = peek(this.handles);
    handle.close();
    remove(handle);
  }

  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  } else if (this.server !== null) {
    // Still busy binding: `this.handle` is only assigned when 'listening'
    // fires, so dereferencing it here would throw. Close the pending server
    // instead so the socket is not left open with no worker to serve it.
    // `this.server` is intentionally kept: the 'listening' callback reads it
    // should the event still be delivered.
    this.server.close();
  }

  return true;
};
 
/**
 * Queues a newly accepted connection and, if a worker is waiting in the free
 * pool, takes the first one out of the pool and lets it pick up work.
 *
 * @param {*} err Truthy when `accept` failed; the call is then ignored.
 * @param {object} handle The accepted connection handle.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // If `accept` fails just skip it (handle is undefined)
  if (err) return;

  append(this.handles, handle);

  // eslint-disable-next-line node-core/no-array-destructuring
  const [ firstFree ] = this.free; // this.free is a SafeMap
  if (!ArrayIsArray(firstFree)) {
    return;  // No idle worker; the connection waits in the queue.
  }

  // Map entries are [key, value] pairs: [worker id, worker].
  const idleId = firstFree[0];
  const idleWorker = firstFree[1];
  this.free.delete(idleId);
  this.handoff(idleWorker);
};
 
/**
 * Offers the next queued connection to `worker`. If nothing is queued the
 * worker is parked in the free pool instead. Workers that are no longer
 * registered are ignored.
 *
 * The connection is sent with a `{ act: 'newconn', key }` message. When the
 * worker replies with `accepted`, the local copy of the handle is closed;
 * otherwise the connection is put back through distribute(). In both cases
 * the worker is then offered the next connection.
 *
 * @param {{ id: *, process: * }} worker
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const stillRegistered = this.all.has(worker.id);
  if (!stillRegistered) {
    return;  // Worker is closing (or has closed) the server.
  }

  const pending = peek(this.handles);
  if (pending === null) {
    // Nothing to hand over: add to ready queue again.
    this.free.set(worker.id, worker);
    return;
  }

  remove(pending);

  const onReply = (reply) => {
    if (!reply.accepted) {
      // Worker is shutting down. Send to another.
      this.distribute(0, pending);
    } else {
      pending.close();
    }

    this.handoff(worker);
  };

  const newConnMessage = { act: 'newconn', key: this.key };
  sendHelper(worker.process, newConnMessage, pending, onReply);
};