GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 435 463 94.0 %
Date: 2022-03-23 03:15:06 Branches: 160 220 72.7 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "util-inl.h"
11
#include "async_wrap-inl.h"
12
13
#include <memory>
14
#include <string>
15
#include <vector>
16
17
using node::kAllowedInEnvironment;
18
using node::kDisallowedInEnvironment;
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::Boolean;
22
using v8::Context;
23
using v8::Float64Array;
24
using v8::FunctionCallbackInfo;
25
using v8::FunctionTemplate;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Isolate;
29
using v8::Local;
30
using v8::Locker;
31
using v8::Maybe;
32
using v8::MaybeLocal;
33
using v8::Null;
34
using v8::Number;
35
using v8::Object;
36
using v8::ResourceConstraints;
37
using v8::SealHandleScope;
38
using v8::String;
39
using v8::TryCatch;
40
using v8::Value;
41
42
namespace node {
43
namespace worker {
44
45
constexpr double kMB = 1024 * 1024;
46
47
814
Worker::Worker(Environment* env,
48
               Local<Object> wrap,
49
               const std::string& url,
50
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
51
               std::vector<std::string>&& exec_argv,
52
814
               std::shared_ptr<KVStore> env_vars)
53
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
54
      per_isolate_opts_(per_isolate_opts),
55
      exec_argv_(exec_argv),
56
814
      platform_(env->isolate_data()->platform()),
57
814
      thread_id_(AllocateEnvironmentThreadId()),
58
1628
      env_vars_(env_vars) {
59
814
  Debug(this, "Creating new worker instance with thread id %llu",
60
814
        thread_id_.id);
61
62
  // Set up everything that needs to be set up in the parent environment.
63
814
  parent_port_ = MessagePort::New(env, env->context());
64
814
  if (parent_port_ == nullptr) {
65
    // This can happen e.g. because execution is terminating.
66
    return;
67
  }
68
69
814
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
70
814
  MessagePort::Entangle(parent_port_, child_port_data_.get());
71
72
814
  object()->Set(env->context(),
73
                env->message_port_string(),
74
2442
                parent_port_->object()).Check();
75
76
814
  object()->Set(env->context(),
77
                env->thread_id_string(),
78
2442
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
79
      .Check();
80
81
1628
  inspector_parent_handle_ = GetInspectorParentHandle(
82
814
      env, thread_id_, url.c_str());
83
84
1628
  argv_ = std::vector<std::string>{env->argv()[0]};
85
  // Mark this Worker object as weak until we actually start the thread.
86
814
  MakeWeak();
87
88
814
  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
89
}
90
91
2297
bool Worker::is_stopped() const {
92
4594
  Mutex::ScopedLock lock(mutex_);
93
2297
  if (env_ != nullptr)
94
569
    return env_->is_stopping();
95
1728
  return stopped_;
96
}
97
98
585
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
99
585
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
100
101
585
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
102
1
    constraints->set_max_young_generation_size_in_bytes(
103
1
        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
104
  } else {
105
584
    resource_limits_[kMaxYoungGenerationSizeMb] =
106
584
        constraints->max_young_generation_size_in_bytes() / kMB;
107
  }
108
109
585
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
110
3
    constraints->set_max_old_generation_size_in_bytes(
111
3
        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
112
  } else {
113
582
    resource_limits_[kMaxOldGenerationSizeMb] =
114
582
        constraints->max_old_generation_size_in_bytes() / kMB;
115
  }
116
117
585
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
118
1
    constraints->set_code_range_size_in_bytes(
119
1
        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
120
  } else {
121
584
    resource_limits_[kCodeRangeSizeMb] =
122
584
        constraints->code_range_size_in_bytes() / kMB;
123
  }
124
585
}
125
126
// This class contains data that is only relevant to the child thread itself,
127
// and only while it is running.
128
// (Eventually, the Environment instance should probably also be moved here.)
129
class WorkerThreadData {
130
 public:
131
811
  explicit WorkerThreadData(Worker* w)
132
811
    : w_(w) {
133
811
    int ret = uv_loop_init(&loop_);
134
811
    if (ret != 0) {
135
      char err_buf[128];
136
226
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
137
226
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
138
226
      return;
139
    }
140
585
    loop_init_failed_ = false;
141
585
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
142
143
    std::shared_ptr<ArrayBufferAllocator> allocator =
144
585
        ArrayBufferAllocator::Create();
145
585
    Isolate::CreateParams params;
146
585
    SetIsolateCreateParamsForNode(&params);
147
585
    params.array_buffer_allocator_shared = allocator;
148
149
585
    w->UpdateResourceConstraints(&params.constraints);
150
151
585
    Isolate* isolate = Isolate::Allocate();
152
585
    if (isolate == nullptr) {
153
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
154
      return;
155
    }
156
157
585
    w->platform_->RegisterIsolate(isolate, &loop_);
158
585
    Isolate::Initialize(isolate, params);
159
585
    SetIsolateUpForNode(isolate);
160
161
    // Be sure it's called before Environment::InitializeDiagnostics()
162
    // so that this callback stays when the callback of
163
    // --heapsnapshot-near-heap-limit gets is popped.
164
585
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
165
166
    {
167
585
      Locker locker(isolate);
168
1170
      Isolate::Scope isolate_scope(isolate);
169
      // V8 computes its stack limit the first time a `Locker` is used based on
170
      // --stack-size. Reset it to the correct value.
171
585
      isolate->SetStackLimit(w->stack_base_);
172
173
585
      HandleScope handle_scope(isolate);
174
1170
      isolate_data_.reset(CreateIsolateData(isolate,
175
                                            &loop_,
176
585
                                            w_->platform_,
177
                                            allocator.get()));
178
585
      CHECK(isolate_data_);
179
585
      if (w_->per_isolate_opts_)
180
264
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
181
585
      isolate_data_->set_worker_context(w_);
182
585
      isolate_data_->max_young_gen_size =
183
585
          params.constraints.max_young_generation_size_in_bytes();
184
    }
185
186
585
    Mutex::ScopedLock lock(w_->mutex_);
187
585
    w_->isolate_ = isolate;
188
  }
189
190
811
  ~WorkerThreadData() {
191
811
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
192
    Isolate* isolate;
193
    {
194
811
      Mutex::ScopedLock lock(w_->mutex_);
195
811
      isolate = w_->isolate_;
196
811
      w_->isolate_ = nullptr;
197
    }
198
199
811
    if (isolate != nullptr) {
200
585
      CHECK(!loop_init_failed_);
201
585
      bool platform_finished = false;
202
203
585
      isolate_data_.reset();
204
205
585
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
206
585
        *static_cast<bool*>(data) = true;
207
585
      }, &platform_finished);
208
209
      // The order of these calls is important; if the Isolate is first disposed
210
      // and then unregistered, there is a race condition window in which no
211
      // new Isolate at the same address can successfully be registered with
212
      // the platform.
213
      // (Refs: https://github.com/nodejs/node/issues/30846)
214
585
      w_->platform_->UnregisterIsolate(isolate);
215
585
      isolate->Dispose();
216
217
      // Wait until the platform has cleaned up all relevant resources.
218
1170
      while (!platform_finished) {
219
585
        uv_run(&loop_, UV_RUN_ONCE);
220
      }
221
    }
222
811
    if (!loop_init_failed_) {
223
585
      CheckedUvLoopClose(&loop_);
224
    }
225
811
  }
226
227
585
  bool loop_is_usable() const { return !loop_init_failed_; }
228
229
 private:
230
  Worker* const w_;
231
  uv_loop_t loop_;
232
  bool loop_init_failed_ = true;
233
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
234
235
  friend class Worker;
236
};
237
238
3
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
239
                             size_t initial_heap_limit) {
240
3
  Worker* worker = static_cast<Worker*>(data);
241
3
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
242
  // Give the current GC some extra leeway to let it finish rather than
243
  // crash hard. We are not going to perform further allocations anyway.
244
3
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
245
3
  return current_heap_limit + kExtraHeapAllowance;
246
}
247
248
811
void Worker::Run() {
249
811
  std::string name = "WorkerThread ";
250
811
  name += std::to_string(thread_id_.id);
251

978
  TRACE_EVENT_METADATA1(
252
      "__metadata", "thread_name", "name",
253
      TRACE_STR_COPY(name.c_str()));
254
811
  CHECK_NOT_NULL(platform_);
255
256
811
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
257
258
811
  WorkerThreadData data(this);
259
811
  if (isolate_ == nullptr) return;
260
585
  CHECK(data.loop_is_usable());
261
262
585
  Debug(this, "Starting worker with id %llu", thread_id_.id);
263
  {
264
585
    Locker locker(isolate_);
265
585
    Isolate::Scope isolate_scope(isolate_);
266
585
    SealHandleScope outer_seal(isolate_);
267
268
585
    DeleteFnPtr<Environment, FreeEnvironment> env_;
269
585
    auto cleanup_env = OnScopeLeave([&]() {
270
      // TODO(addaleax): This call is harmless but should not be necessary.
271
      // Figure out why V8 is raising a DCHECK() here without it
272
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
273
585
      isolate_->CancelTerminateExecution();
274
275
585
      if (!env_) return;
276
570
      env_->set_can_call_into_js(false);
277
278
      {
279
570
        Mutex::ScopedLock lock(mutex_);
280
570
        stopped_ = true;
281
570
        this->env_ = nullptr;
282
      }
283
284
570
      env_.reset();
285
585
    });
286
287
585
    if (is_stopped()) return;
288
    {
289
577
      HandleScope handle_scope(isolate_);
290
      Local<Context> context;
291
      {
292
        // We create the Context object before we have an Environment* in place
293
        // that we could use for error handling. If creation fails due to
294
        // resource constraints, we need something in place to handle it,
295
        // though.
296
577
        TryCatch try_catch(isolate_);
297
577
        context = NewContext(isolate_);
298
577
        if (context.IsEmpty()) {
299
4
          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
300
4
          return;
301
        }
302
      }
303
304
573
      if (is_stopped()) return;
305
570
      CHECK(!context.IsEmpty());
306
570
      Context::Scope context_scope(context);
307
      {
308
1140
        env_.reset(CreateEnvironment(
309
            data.isolate_data_.get(),
310
            context,
311
570
            std::move(argv_),
312
570
            std::move(exec_argv_),
313
570
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
314
            thread_id_,
315
570
            std::move(inspector_parent_handle_)));
316
570
        if (is_stopped()) return;
317
569
        CHECK_NOT_NULL(env_);
318
569
        env_->set_env_vars(std::move(env_vars_));
319
569
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
320
71
          Exit(exit_code);
321
71
        });
322
      }
323
      {
324
569
        Mutex::ScopedLock lock(mutex_);
325
569
        if (stopped_) return;
326
569
        this->env_ = env_.get();
327
      }
328
569
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
329
569
      if (is_stopped()) return;
330
      {
331
569
        if (!CreateEnvMessagePort(env_.get())) {
332
          return;
333
        }
334
335
569
        Debug(this, "Created message port for worker %llu", thread_id_.id);
336
1138
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
337
          return;
338
339
569
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
340
      }
341
    }
342
343
    {
344
569
      Maybe<int> exit_code = SpinEventLoop(env_.get());
345
1138
      Mutex::ScopedLock lock(mutex_);
346

998
      if (exit_code_ == 0 && exit_code.IsJust()) {
347
410
        exit_code_ = exit_code.FromJust();
348
      }
349
350
569
      Debug(this, "Exiting thread for worker %llu with exit code %d",
351
569
            thread_id_.id, exit_code_);
352
    }
353
  }
354
355
569
  Debug(this, "Worker %llu thread stops", thread_id_.id);
356
}
357
358
569
bool Worker::CreateEnvMessagePort(Environment* env) {
359
1138
  HandleScope handle_scope(isolate_);
360
1138
  std::unique_ptr<MessagePortData> data;
361
  {
362
1138
    Mutex::ScopedLock lock(mutex_);
363
569
    data = std::move(child_port_data_);
364
  }
365
366
  // Set up the message channel for receiving messages in the child.
367
1138
  MessagePort* child_port = MessagePort::New(env,
368
                                             env->context(),
369
569
                                             std::move(data));
370
  // MessagePort::New() may return nullptr if execution is terminated
371
  // within it.
372
569
  if (child_port != nullptr)
373
569
    env->set_message_port(child_port->object(isolate_));
374
375
569
  return child_port;
376
}
377
378
816
void Worker::JoinThread() {
379
816
  if (!tid_.has_value())
380
5
    return;
381
811
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
382
811
  tid_.reset();
383
384
811
  env()->remove_sub_worker_context(this);
385
386
  {
387
1620
    HandleScope handle_scope(env()->isolate());
388
811
    Context::Scope context_scope(env()->context());
389
390
    // Reset the parent port as we're closing it now anyway.
391
811
    object()->Set(env()->context(),
392
                  env()->message_port_string(),
393
2433
                  Undefined(env()->isolate())).Check();
394
395
    Local<Value> args[] = {
396
        Integer::New(env()->isolate(), exit_code_),
397
811
        custom_error_ != nullptr
398
466
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
399
1734
            : Null(env()->isolate()).As<Value>(),
400
1622
        !custom_error_str_.empty()
401
233
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
402
233
                  .As<Value>()
403
1734
            : Null(env()->isolate()).As<Value>(),
404

2433
    };
405
406
811
    MakeCallback(env()->onexit_string(), arraysize(args), args);
407
  }
408
409
  // If we get here, the tid_.has_value() condition at the top of the function
410
  // implies that the thread was running. In that case, its final action will
411
  // be to schedule a callback on the parent thread which will delete this
412
  // object, so there's nothing more to do here.
413
}
414
415
3160
Worker::~Worker() {
416
3160
  Mutex::ScopedLock lock(mutex_);
417
418
1580
  CHECK(stopped_);
419
1580
  CHECK_NULL(env_);
420
1580
  CHECK(!tid_.has_value());
421
422
1580
  Debug(this, "Worker %llu destroyed", thread_id_.id);
423
3160
}
424
425
818
void Worker::New(const FunctionCallbackInfo<Value>& args) {
426
818
  Environment* env = Environment::GetCurrent(args);
427
818
  Isolate* isolate = args.GetIsolate();
428
429
818
  CHECK(args.IsConstructCall());
430
431
818
  if (env->isolate_data()->platform() == nullptr) {
432
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
433
4
    return;
434
  }
435
436
818
  std::string url;
437
818
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
438
818
  std::shared_ptr<KVStore> env_vars = nullptr;
439
440
818
  std::vector<std::string> exec_argv_out;
441
442
  // Argument might be a string or URL
443
1636
  if (!args[0]->IsNullOrUndefined()) {
444
    Utf8Value value(
445
1023
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
446
341
    url.append(value.out(), value.length());
447
  }
448
449
1636
  if (args[1]->IsNull()) {
450
    // Means worker.env = { ...process.env }.
451
791
    env_vars = env->env_vars()->Clone(isolate);
452
27
  } else if (args[1]->IsObject()) {
453
    // User provided env.
454
26
    env_vars = KVStore::CreateMapKVStore();
455
26
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
456
78
                               args[1].As<Object>());
457
  } else {
458
    // Env is shared.
459
1
    env_vars = env->env_vars();
460
  }
461
462

1610
  if (args[1]->IsObject() || args[2]->IsArray()) {
463
268
    per_isolate_opts.reset(new PerIsolateOptions());
464
465
268
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
466
2144
      return env_vars->Get(name).FromMaybe("");
467
    });
468
469
#ifndef NODE_WITHOUT_NODE_OPTIONS
470
    MaybeLocal<String> maybe_node_opts =
471
268
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
472
    Local<String> node_opts;
473
268
    if (maybe_node_opts.ToLocal(&node_opts)) {
474
801
      std::string node_options(*String::Utf8Value(isolate, node_opts));
475
267
      std::vector<std::string> errors{};
476
      std::vector<std::string> env_argv =
477
267
          ParseNodeOptionsEnvVar(node_options, &errors);
478
      // [0] is expected to be the program name, add dummy string.
479
267
      env_argv.insert(env_argv.begin(), "");
480
267
      std::vector<std::string> invalid_args{};
481
267
      options_parser::Parse(&env_argv,
482
                            nullptr,
483
                            &invalid_args,
484
                            per_isolate_opts.get(),
485
                            kAllowedInEnvironment,
486
                            &errors);
487

268
      if (!errors.empty() && args[1]->IsObject()) {
488
        // Only fail for explicitly provided env, this protects from failures
489
        // when NODE_OPTIONS from parent's env is used (which is the default).
490
        Local<Value> error;
491
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
492
        Local<String> key =
493
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
494
        // Ignore the return value of Set() because exceptions bubble up to JS
495
        // when we return anyway.
496
1
        USE(args.This()->Set(env->context(), key, error));
497
1
        return;
498
      }
499
    }
500
#endif  // NODE_WITHOUT_NODE_OPTIONS
501
  }
502
503
817
  if (args[2]->IsArray()) {
504
265
    Local<Array> array = args[2].As<Array>();
505
    // The first argument is reserved for program name, but we don't need it
506
    // in workers.
507
1060
    std::vector<std::string> exec_argv = {""};
508
265
    uint32_t length = array->Length();
509
443
    for (uint32_t i = 0; i < length; i++) {
510
      Local<Value> arg;
511
356
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
512
        return;
513
      }
514
      Local<String> arg_v8;
515
356
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
516
        return;
517
      }
518
356
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
519
356
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
520
178
      exec_argv.push_back(arg_string);
521
    }
522
523
265
    std::vector<std::string> invalid_args{};
524
265
    std::vector<std::string> errors{};
525
    // Using invalid_args as the v8_args argument as it stores unknown
526
    // options for the per isolate parser.
527
265
    options_parser::Parse(
528
        &exec_argv,
529
        &exec_argv_out,
530
        &invalid_args,
531
        per_isolate_opts.get(),
532
        kDisallowedInEnvironment,
533
        &errors);
534
535
    // The first argument is program name.
536
265
    invalid_args.erase(invalid_args.begin());
537

265
    if (errors.size() > 0 || invalid_args.size() > 0) {
538
      Local<Value> error;
539
3
      if (!ToV8Value(env->context(),
540
3
                     errors.size() > 0 ? errors : invalid_args)
541
3
                         .ToLocal(&error)) {
542
        return;
543
      }
544
      Local<String> key =
545
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
546
      // Ignore the return value of Set() because exceptions bubble up to JS
547
      // when we return anyway.
548
3
      USE(args.This()->Set(env->context(), key, error));
549
3
      return;
550
    }
551
  } else {
552
552
    exec_argv_out = env->exec_argv();
553
  }
554
555
  Worker* worker = new Worker(env,
556
1628
                              args.This(),
557
                              url,
558
                              per_isolate_opts,
559
814
                              std::move(exec_argv_out),
560
814
                              env_vars);
561
562
814
  CHECK(args[3]->IsFloat64Array());
563
1628
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
564
814
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
565
814
  limit_info->CopyContents(worker->resource_limits_,
566
                           sizeof(worker->resource_limits_));
567
568
814
  CHECK(args[4]->IsBoolean());
569

814
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
570
813
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
571
814
  if (env->hide_console_windows())
572
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
573
814
  if (env->no_native_addons())
574
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
575
814
  if (env->no_global_search_paths())
576
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
577
814
  if (env->no_browser_globals())
578
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
579
}
580
581
811
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
582
  Worker* w;
583
811
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
584
1622
  Mutex::ScopedLock lock(w->mutex_);
585
586
811
  w->stopped_ = false;
587
588
811
  if (w->resource_limits_[kStackSizeMb] > 0) {
589
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
590
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
591
3
      w->stack_size_ = kStackBufferSize;
592
    } else {
593
12
      w->stack_size_ =
594
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
595
    }
596
  } else {
597
802
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
598
  }
599
600
  uv_thread_options_t thread_options;
601
811
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
602
811
  thread_options.stack_size = w->stack_size_;
603
604
811
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
605
811
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
606
    // XXX: This could become a std::unique_ptr, but that makes at least
607
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
608
    // gcc 7+ handles this well.
609
811
    Worker* w = static_cast<Worker*>(arg);
610
811
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
611
612
    // Leave a few kilobytes just to make sure we're within limits and have
613
    // some space to do work in C++ land.
614
811
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
615
616
811
    w->Run();
617
618
811
    Mutex::ScopedLock lock(w->mutex_);
619
811
    w->env()->SetImmediateThreadsafe(
620
789
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
621
789
          if (w->has_ref_)
622
787
            env->add_refs(-1);
623
789
          w->JoinThread();
624
          // implicitly delete w
625
787
        });
626
811
  }, static_cast<void*>(w));
627
628
811
  if (ret == 0) {
629
    // The object now owns the created thread and should not be garbage
630
    // collected until that finishes.
631
811
    w->ClearWeak();
632
633
811
    if (w->has_ref_)
634
811
      w->env()->add_refs(1);
635
636
811
    w->env()->add_sub_worker_context(w);
637
  } else {
638
    w->stopped_ = true;
639
    w->tid_.reset();
640
641
    char err_buf[128];
642
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
643
    {
644
      Isolate* isolate = w->env()->isolate();
645
      HandleScope handle_scope(isolate);
646
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
647
    }
648
  }
649
}
650
651
236
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
652
  Worker* w;
653
236
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
654
655
236
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
656
236
  w->Exit(1);
657
}
658
659
238
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
660
  Worker* w;
661
238
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
662

238
  if (!w->has_ref_ && w->tid_.has_value()) {
663
3
    w->has_ref_ = true;
664
3
    w->env()->add_refs(1);
665
  }
666
}
667
668
5
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
669
  Worker* w;
670
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
671

5
  if (w->has_ref_ && w->tid_.has_value()) {
672
5
    w->has_ref_ = false;
673
5
    w->env()->add_refs(-1);
674
  }
675
}
676
677
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
678
  Worker* w;
679
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
680
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
681
}
682
683
571
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
684
571
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
685
686
571
  memcpy(ab->GetBackingStore()->Data(),
687
571
         resource_limits_,
688
         sizeof(resource_limits_));
689
571
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
690
}
691
692
567
void Worker::Exit(int code, const char* error_code, const char* error_message) {
693
1134
  Mutex::ScopedLock lock(mutex_);
694
567
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
695
567
        thread_id_.id, code, error_code, error_message);
696
697
567
  if (error_code != nullptr) {
698
233
    custom_error_ = error_code;
699
233
    custom_error_str_ = error_message;
700
  }
701
702
567
  if (env_ != nullptr) {
703
161
    exit_code_ = code;
704
161
    Stop(env_);
705
  } else {
706
406
    stopped_ = true;
707
  }
708
567
}
709
710
void Worker::MemoryInfo(MemoryTracker* tracker) const {
711
  tracker->TrackField("parent_port", parent_port_);
712
}
713
714
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
715
  // Worker objects always stay alive as long as the child thread, regardless
716
  // of whether they are being referenced in the parent thread.
717
  return true;
718
}
719
720
class WorkerHeapSnapshotTaker : public AsyncWrap {
721
 public:
722
1
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
723
1
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
724
725
  SET_NO_MEMORY_INFO()
726
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
727
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
728
};
729
730
1
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
731
  Worker* w;
732
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
733
734
1
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
735
736
1
  Environment* env = w->env();
737
1
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
738
  Local<Object> wrap;
739
1
  if (!env->worker_heap_snapshot_taker_template()
740
2
      ->NewInstance(env->context()).ToLocal(&wrap)) {
741
    return;
742
  }
743
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
744
1
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
745
746
  // Interrupt the worker thread and take a snapshot, then schedule a call
747
  // on the parent thread that turns that snapshot into a readable stream.
748
1
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
749
    heap::HeapSnapshotPointer snapshot {
750
1
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
751
1
    CHECK(snapshot);
752
2
    env->SetImmediateThreadsafe(
753
1
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
754
2
          HandleScope handle_scope(env->isolate());
755
1
          Context::Scope context_scope(env->context());
756
757
2
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
758
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
759
2
              env, std::move(snapshot));
760
1
          Local<Value> args[] = { stream->object() };
761
1
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
762
1
        }, CallbackFlags::kUnrefed);
763
1
  });
764
2
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
765
}
766
767
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
768
  Worker* w;
769
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
770
771
7
  Mutex::ScopedLock lock(w->mutex_);
772
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
773
  // before locking the mutex is a race condition. So manually do the same
774
  // check.
775

7
  if (w->stopped_ || w->env_ == nullptr)
776
    return args.GetReturnValue().Set(-1);
777
778
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
779
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
780
}
781
782
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
783
  Worker* w;
784
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
785
786
1
  Mutex::ScopedLock lock(w->mutex_);
787
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
788
  // before locking the mutex is a race condition. So manually do the same
789
  // check.
790

1
  if (w->stopped_ || w->env_ == nullptr)
791
    return args.GetReturnValue().Set(-1);
792
793
1
  double loop_start_time = w->env_->performance_state()->milestones[
794
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
795
1
  CHECK_GE(loop_start_time, 0);
796
1
  args.GetReturnValue().Set(
797
1
      (loop_start_time - node::performance::timeOrigin) / 1e6);
798
}
799
800
namespace {
801
802
// Return the MessagePort that is global for this Environment and communicates
803
// with the internal [kPort] port of the JS Worker class in the parent thread.
804
1139
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
805
1139
  Environment* env = Environment::GetCurrent(args);
806
1139
  Local<Object> port = env->message_port();
807

2278
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
808
1139
  if (!port.IsEmpty()) {
809
3417
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
810
             args.GetIsolate());
811
2278
    args.GetReturnValue().Set(port);
812
  }
813
1139
}
814
815
625
void InitWorker(Local<Object> target,
816
                Local<Value> unused,
817
                Local<Context> context,
818
                void* priv) {
819
625
  Environment* env = Environment::GetCurrent(context);
820
821
  {
822
625
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
823
824
1250
    w->InstanceTemplate()->SetInternalFieldCount(
825
        Worker::kInternalFieldCount);
826
625
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
827
828
625
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
829
625
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
830
625
    env->SetProtoMethod(w, "ref", Worker::Ref);
831
625
    env->SetProtoMethod(w, "unref", Worker::Unref);
832
625
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
833
625
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
834
625
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
835
625
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
836
837
625
    env->SetConstructorFunction(target, "Worker", w);
838
  }
839
840
  {
841
625
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
842
843
1250
    wst->InstanceTemplate()->SetInternalFieldCount(
844
        WorkerHeapSnapshotTaker::kInternalFieldCount);
845
625
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
846
847
    Local<String> wst_string =
848
625
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
849
625
    wst->SetClassName(wst_string);
850
625
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
851
  }
852
853
625
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
854
855
  target
856
625
      ->Set(env->context(),
857
            env->thread_id_string(),
858
2500
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
859
      .Check();
860
861
  target
862
625
      ->Set(env->context(),
863
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
864
2500
            Boolean::New(env->isolate(), env->is_main_thread()))
865
      .Check();
866
867
  target
868
625
      ->Set(env->context(),
869
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
870
1875
            Boolean::New(env->isolate(), env->owns_process_state()))
871
      .Check();
872
873
625
  if (!env->is_main_thread()) {
874
    target
875
570
        ->Set(env->context(),
876
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
877
2280
              env->worker_context()->GetResourceLimits(env->isolate()))
878
        .Check();
879
  }
880
881
1875
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
882
1875
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
883
1875
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
884
1875
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
885
1250
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
886
625
}
887
888
4968
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
889
4968
  registry->Register(GetEnvMessagePort);
890
4968
  registry->Register(Worker::New);
891
4968
  registry->Register(Worker::StartThread);
892
4968
  registry->Register(Worker::StopThread);
893
4968
  registry->Register(Worker::Ref);
894
4968
  registry->Register(Worker::Unref);
895
4968
  registry->Register(Worker::GetResourceLimits);
896
4968
  registry->Register(Worker::TakeHeapSnapshot);
897
4968
  registry->Register(Worker::LoopIdleTime);
898
4968
  registry->Register(Worker::LoopStartTime);
899
4968
}
900
901
}  // anonymous namespace
902
}  // namespace worker
903
}  // namespace node
904
905
5029
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
906
4968
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)