All files / lib/internal/cluster round_robin_handle.js

97.64% Statements 124/127
90.32% Branches 28/31
100% Functions 7/7
97.64% Lines 124/127

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 12873x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 73x 31x 31x 31x 31x 31x 31x 31x 31x 31x 31x 31x 28x 28x 28x 28x 28x 28x 31x 31x 31x 31x 27x 27x 27x 27x 31x 31x 73x 73x 70x 70x 70x 70x 66x 64x 64x 64x 64x 66x 2x 2x 66x 66x 70x 70x 70x 70x 31x 31x 31x 31x 4x 31x 73x 73x 73x 67x 67x 67x 67x 63x 63x 63x 63x 67x 25x 67x     25x 25x 25x 25x 25x 73x 73x 73x 25x 25x 25x 25x 25x 25x 25x 25x 73x 73x 73x 116x 1x 1x 115x 115x 115x 116x 90x 90x 90x 25x 25x 25x 25x 25x 25x   25x 25x 25x 25x 73x  
'use strict';
 
const {
  ArrayIsArray,
  ArrayPrototypePush,
  ArrayPrototypeShift,
  Boolean,
  SafeMap,
} = primordials;
 
const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { constants } = internalBinding('tcp_wrap');
 
module.exports = RoundRobinHandle;
 
/**
 * Listening handle shared between cluster workers.
 *
 * Creates a `net` server, starts listening on the requested endpoint and,
 * once the server is listening, takes over its underlying handle so that
 * incoming connections are routed to `distribute()`.
 *
 * @param {string} key  Identifier stored on the instance and sent to workers
 *   with every `newconn` message.
 * @param {string} address  Host name when `port` is used, otherwise the value
 *   passed straight to `listen()` (UNIX socket path).
 * @param {object} options
 * @param {number} [options.port]  Used when `fd` is not `>= 0`.
 * @param {number} [options.fd]  Takes precedence over `port` when `>= 0`.
 * @param {number} [options.flags]  Bit field; only `UV_TCP_IPV6ONLY` is read.
 */
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new SafeMap();   // Every registered worker, keyed by id.
  this.free = new SafeMap();  // Workers waiting for a connection.
  this.handles = [];          // Connections not yet handed to a worker.
  this.handle = null;         // Set once the server is listening.
  this.server = net.createServer(assert.fail);

  const server = this.server;

  if (fd >= 0) {
    server.listen({ fd });
  } else if (port >= 0) {
    // Currently, net module only supports `ipv6Only` option in `flags`.
    const ipv6Only = Boolean(flags & constants.UV_TCP_IPV6ONLY);
    server.listen({ port, host: address, ipv6Only });
  } else {
    server.listen(address);  // UNIX socket path.
  }

  server.once('listening', () => {
    // Detach the bound handle from the server and receive connections on it
    // directly; the server object is dropped afterwards.
    const listeningHandle = server._handle;
    this.handle = listeningHandle;
    listeningHandle.onconnection = (err, handle) => {
      return this.distribute(err, handle);
    };
    server._handle = null;
    this.server = null;
  });
}
 
/**
 * Registers a worker with this handle and reports the listening result.
 *
 * If the server is already listening, `send` is invoked right away;
 * otherwise the report is deferred until the server emits `'listening'`
 * (or `'error'`, in which case `send` receives `err.errno`).
 *
 * @param {object} worker  Worker to register; must not already be registered
 *   under the same `worker.id`.
 * @param {Function} send  Reply callback, called as
 *   `send(errno, reply, handle)`.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const reportListening = () => {
    const listeningHandle = this.handle;

    if (listeningHandle.getsockname) {
      const sockname = {};
      listeningHandle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server !== null) {
    // Still busy binding.
    this.server.once('listening', reportListening);
    this.server.once('error', (err) => {
      send(err.errno, null);
    });
    return;
  }

  // Already listening: reply immediately.
  reportListening();
};
 
/**
 * Unregisters a worker from this handle.
 *
 * When the last registered worker is removed, every connection still waiting
 * in `this.handles` is closed and the listening handle (if any) is closed
 * and cleared.
 *
 * @param {object} worker  Worker to remove; only `worker.id` is read.
 * @returns {boolean} `true` if this call removed the last registered worker,
 *   `false` if the worker was not registered or other workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No workers left: drop the connections that were never handed off.
  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  // `this.handle` is only assigned once the server has emitted 'listening'.
  // If the last worker goes away while the server is still binding there is
  // nothing to close yet, and dereferencing `null` here would throw.
  // NOTE(review): in that case the server keeps binding; whether it should
  // also be torn down depends on the caller - confirm.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }
  return true;
};
 
/**
 * Queues an incoming connection and, if a worker is waiting in `this.free`,
 * takes the first one out of the free set and hands work off to it.
 *
 * @param {number} err  Status reported with the connection; a truthy value
 *   means the accept failed and there is no connection to queue.
 * @param {object} handle  Handle of the accepted connection.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // A failed accept carries no usable handle. Queuing it would put a bogus
  // entry into `this.handles`: `handoff()` treats a shifted `undefined` as
  // "queue empty" (stranding connections queued behind it) and `remove()`
  // calls `.close()` on every queued entry.
  if (err) {
    return;
  }

  ArrayPrototypePush(this.handles, handle);

  // Destructuring the map yields its first [id, worker] entry, or
  // `undefined` when no worker is free.
  const [ workerEntry ] = this.free;

  if (ArrayIsArray(workerEntry)) {
    const [ workerId, worker ] = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};
 
/**
 * Gives the next queued connection to `worker`, or parks the worker in
 * `this.free` when nothing is queued.
 *
 * After the worker replies, the connection handle is closed if the worker
 * accepted it, or passed back to `distribute()` otherwise; in both cases the
 * worker is offered the next connection.
 *
 * @param {object} worker  Worker to serve; ignored if it is no longer
 *   registered in `this.all`.
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const isRegistered = this.all.has(worker.id);
  if (!isRegistered) {
    return;  // Worker is closing (or has closed) the server.
  }

  const connection = ArrayPrototypeShift(this.handles);
  if (connection === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  const onReply = (reply) => {
    if (!reply.accepted) {
      // Worker is shutting down. Send to another.
      this.distribute(0, connection);
    } else {
      connection.close();
    }

    this.handoff(worker);
  };

  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, connection, onReply);
};