Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 74x 37x 37x 37x 37x 37x 37x 37x 37x 37x 37x 37x 34x 34x 34x 34x 34x 34x 34x 37x 37x 37x 37x 33x 33x 33x 33x 37x 37x 74x 74x 76x 76x 76x 76x 72x 70x 70x 70x 70x 72x 2x 2x 72x 72x 76x 76x 76x 76x 37x 37x 37x 37x 4x 37x 74x 74x 74x 73x 73x 73x 73x 69x 69x 69x 69x 73x 31x 73x 31x 31x 31x 31x 74x 74x 74x 25x 25x 25x 25x 25x 25x 25x 25x 25x 74x 74x 74x 122x 1x 1x 121x 121x 121x 122x 96x 96x 96x 25x 25x 25x 25x 25x 25x 25x 25x 25x 25x 25x 25x 74x | 'use strict'; const { ArrayIsArray, Boolean, ObjectCreate, SafeMap, } = primordials; const assert = require('internal/assert'); const net = require('net'); const { sendHelper } = require('internal/cluster/utils'); const { append, init, isEmpty, peek, remove } = require('internal/linkedlist'); const { constants } = internalBinding('tcp_wrap'); module.exports = RoundRobinHandle; function RoundRobinHandle(key, address, { port, fd, flags, backlog }) { this.key = key; this.all = new SafeMap(); this.free = new SafeMap(); this.handles = init(ObjectCreate(null)); this.handle = null; this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd, backlog }); else if (port >= 0) { this.server.listen({ port, host: address, // Currently, net module only supports `ipv6Only` option in `flags`. ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), backlog, }); } else this.server.listen(address, backlog); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } RoundRobinHandle.prototype.add = function(worker, send) { assert(this.all.has(worker.id) === false); this.all.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); // In case there are connections pending. }; if (this.server === null) return done(); // Still busy binding. this.server.once('listening', done); this.server.once('error', (err) => { send(err.errno, null); }); }; RoundRobinHandle.prototype.remove = function(worker) { const existed = this.all.delete(worker.id); if (!existed) return false; this.free.delete(worker.id); if (this.all.size !== 0) return false; while (!isEmpty(this.handles)) { const handle = peek(this.handles); handle.close(); remove(handle); } this.handle.close(); this.handle = null; return true; }; RoundRobinHandle.prototype.distribute = function(err, handle) { append(this.handles, handle); // eslint-disable-next-line node-core/no-array-destructuring const [ workerEntry ] = this.free; // this.free is a SafeMap if (ArrayIsArray(workerEntry)) { const { 0: workerId, 1: worker } = workerEntry; this.free.delete(workerId); this.handoff(worker); } }; RoundRobinHandle.prototype.handoff = function(worker) { if (!this.all.has(worker.id)) { return; // Worker is closing (or has closed) the server. } const handle = peek(this.handles); if (handle === null) { this.free.set(worker.id, worker); // Add to ready queue again. return; } remove(handle); const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. this.handoff(worker); }); }; |