GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 459 491 93.5 %
Date: 2022-12-31 04:22:30 Branches: 174 242 71.9 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvvar;
19
using node::kDisallowedInEnvvar;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ObjectTemplate;
38
using v8::ResourceConstraints;
39
using v8::SealHandleScope;
40
using v8::String;
41
using v8::TryCatch;
42
using v8::Value;
43
44
namespace node {
45
namespace worker {
46
47
constexpr double kMB = 1024 * 1024;
48
49
983
Worker::Worker(Environment* env,
50
               Local<Object> wrap,
51
               const std::string& url,
52
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53
               std::vector<std::string>&& exec_argv,
54
               std::shared_ptr<KVStore> env_vars,
55
983
               const SnapshotData* snapshot_data)
56
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57
      per_isolate_opts_(per_isolate_opts),
58
      exec_argv_(exec_argv),
59
983
      platform_(env->isolate_data()->platform()),
60
983
      thread_id_(AllocateEnvironmentThreadId()),
61
      env_vars_(env_vars),
62
1966
      snapshot_data_(snapshot_data) {
63
983
  Debug(this, "Creating new worker instance with thread id %llu",
64
983
        thread_id_.id);
65
66
  // Set up everything that needs to be set up in the parent environment.
67
983
  MessagePort* parent_port = MessagePort::New(env, env->context());
68
983
  if (parent_port == nullptr) {
69
    // This can happen e.g. because execution is terminating.
70
    return;
71
  }
72
73
983
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
74
983
  MessagePort::Entangle(parent_port, child_port_data_.get());
75
76
983
  object()
77
2949
      ->Set(env->context(), env->message_port_string(), parent_port->object())
78
      .Check();
79
80
983
  object()->Set(env->context(),
81
                env->thread_id_string(),
82
2949
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
83
      .Check();
84
85
1966
  inspector_parent_handle_ = GetInspectorParentHandle(
86
983
      env, thread_id_, url.c_str());
87
88
1966
  argv_ = std::vector<std::string>{env->argv()[0]};
89
  // Mark this Worker object as weak until we actually start the thread.
90
983
  MakeWeak();
91
92
983
  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
93
}
94
95
3052
bool Worker::is_stopped() const {
96
6104
  Mutex::ScopedLock lock(mutex_);
97
3052
  if (env_ != nullptr)
98
756
    return env_->is_stopping();
99
2296
  return stopped_;
100
}
101
102
768
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
103
768
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
104
105
768
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
106
1
    constraints->set_max_young_generation_size_in_bytes(
107
1
        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
108
  } else {
109
767
    resource_limits_[kMaxYoungGenerationSizeMb] =
110
767
        constraints->max_young_generation_size_in_bytes() / kMB;
111
  }
112
113
768
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
114
4
    constraints->set_max_old_generation_size_in_bytes(
115
4
        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
116
  } else {
117
764
    resource_limits_[kMaxOldGenerationSizeMb] =
118
764
        constraints->max_old_generation_size_in_bytes() / kMB;
119
  }
120
121
768
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
122
1
    constraints->set_code_range_size_in_bytes(
123
1
        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
124
  } else {
125
767
    resource_limits_[kCodeRangeSizeMb] =
126
767
        constraints->code_range_size_in_bytes() / kMB;
127
  }
128
768
}
129
130
// This class contains data that is only relevant to the child thread itself,
131
// and only while it is running.
132
// (Eventually, the Environment instance should probably also be moved here.)
133
class WorkerThreadData {
134
 public:
135
980
  explicit WorkerThreadData(Worker* w)
136
980
    : w_(w) {
137
980
    int ret = uv_loop_init(&loop_);
138
980
    if (ret != 0) {
139
      char err_buf[128];
140
212
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
141
      // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
142
212
      w->Exit(ExitCode::kGenericUserError, "ERR_WORKER_INIT_FAILED", err_buf);
143
212
      return;
144
    }
145
768
    loop_init_failed_ = false;
146
768
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
147
148
    std::shared_ptr<ArrayBufferAllocator> allocator =
149
768
        ArrayBufferAllocator::Create();
150
768
    Isolate::CreateParams params;
151
768
    SetIsolateCreateParamsForNode(&params);
152
768
    w->UpdateResourceConstraints(&params.constraints);
153
768
    params.array_buffer_allocator_shared = allocator;
154
    Isolate* isolate =
155
768
        NewIsolate(&params, &loop_, w->platform_, w->snapshot_data());
156
768
    if (isolate == nullptr) {
157
      // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
158
      w->Exit(ExitCode::kGenericUserError,
159
              "ERR_WORKER_INIT_FAILED",
160
              "Failed to create new Isolate");
161
      return;
162
    }
163
164
768
    SetIsolateUpForNode(isolate);
165
166
    // Be sure it's called before Environment::InitializeDiagnostics()
167
    // so that this callback stays when the callback of
168
    // --heapsnapshot-near-heap-limit gets is popped.
169
768
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
170
171
    {
172
768
      Locker locker(isolate);
173
1536
      Isolate::Scope isolate_scope(isolate);
174
      // V8 computes its stack limit the first time a `Locker` is used based on
175
      // --stack-size. Reset it to the correct value.
176
768
      isolate->SetStackLimit(w->stack_base_);
177
178
768
      HandleScope handle_scope(isolate);
179
1536
      isolate_data_.reset(CreateIsolateData(isolate,
180
                                            &loop_,
181
768
                                            w_->platform_,
182
                                            allocator.get()));
183
768
      CHECK(isolate_data_);
184
768
      if (w_->per_isolate_opts_)
185
303
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
186
768
      isolate_data_->set_worker_context(w_);
187
768
      isolate_data_->max_young_gen_size =
188
768
          params.constraints.max_young_generation_size_in_bytes();
189
    }
190
191
768
    Mutex::ScopedLock lock(w_->mutex_);
192
768
    w_->isolate_ = isolate;
193
  }
194
195
980
  ~WorkerThreadData() {
196
1960
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
197
    Isolate* isolate;
198
    {
199
980
      Mutex::ScopedLock lock(w_->mutex_);
200
980
      isolate = w_->isolate_;
201
980
      w_->isolate_ = nullptr;
202
    }
203
204
980
    if (isolate != nullptr) {
205
768
      CHECK(!loop_init_failed_);
206
768
      bool platform_finished = false;
207
208
768
      isolate_data_.reset();
209
210
768
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
211
768
        *static_cast<bool*>(data) = true;
212
768
      }, &platform_finished);
213
214
      // The order of these calls is important; if the Isolate is first disposed
215
      // and then unregistered, there is a race condition window in which no
216
      // new Isolate at the same address can successfully be registered with
217
      // the platform.
218
      // (Refs: https://github.com/nodejs/node/issues/30846)
219
768
      w_->platform_->UnregisterIsolate(isolate);
220
768
      isolate->Dispose();
221
222
      // Wait until the platform has cleaned up all relevant resources.
223
1536
      while (!platform_finished) {
224
768
        uv_run(&loop_, UV_RUN_ONCE);
225
      }
226
    }
227
980
    if (!loop_init_failed_) {
228
768
      CheckedUvLoopClose(&loop_);
229
    }
230
980
  }
231
232
768
  bool loop_is_usable() const { return !loop_init_failed_; }
233
234
 private:
235
  Worker* const w_;
236
  uv_loop_t loop_;
237
  bool loop_init_failed_ = true;
238
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
239
  const SnapshotData* snapshot_data_ = nullptr;
240
  friend class Worker;
241
};
242
243
4
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
244
                             size_t initial_heap_limit) {
245
4
  Worker* worker = static_cast<Worker*>(data);
246
  // Give the current GC some extra leeway to let it finish rather than
247
  // crash hard. We are not going to perform further allocations anyway.
248
4
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
249
4
  size_t new_limit = current_heap_limit + kExtraHeapAllowance;
250
4
  Environment* env = worker->env();
251
4
  if (env != nullptr) {
252
    DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
253
    Debug(env,
254
          DebugCategory::DIAGNOSTICS,
255
          "Throwing ERR_WORKER_OUT_OF_MEMORY, "
256
          "new_limit=%" PRIu64 "\n",
257
8
          static_cast<uint64_t>(new_limit));
258
  }
259
  // TODO(joyeecheung): maybe this should be kV8FatalError instead?
260
4
  worker->Exit(ExitCode::kGenericUserError,
261
               "ERR_WORKER_OUT_OF_MEMORY",
262
               "JS heap out of memory");
263
4
  return new_limit;
264
}
265
266
980
void Worker::Run() {
267
980
  std::string name = "WorkerThread ";
268
980
  name += std::to_string(thread_id_.id);
269

1168
  TRACE_EVENT_METADATA1(
270
      "__metadata", "thread_name", "name",
271
      TRACE_STR_COPY(name.c_str()));
272
980
  CHECK_NOT_NULL(platform_);
273
274
980
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
275
276
980
  WorkerThreadData data(this);
277
980
  if (isolate_ == nullptr) return;
278
768
  CHECK(data.loop_is_usable());
279
280
768
  Debug(this, "Starting worker with id %llu", thread_id_.id);
281
  {
282
768
    Locker locker(isolate_);
283
768
    Isolate::Scope isolate_scope(isolate_);
284
768
    SealHandleScope outer_seal(isolate_);
285
286
768
    DeleteFnPtr<Environment, FreeEnvironment> env_;
287
768
    auto cleanup_env = OnScopeLeave([&]() {
288
      // TODO(addaleax): This call is harmless but should not be necessary.
289
      // Figure out why V8 is raising a DCHECK() here without it
290
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
291
768
      isolate_->CancelTerminateExecution();
292
293
768
      if (!env_) return;
294
764
      env_->set_can_call_into_js(false);
295
296
      {
297
764
        Mutex::ScopedLock lock(mutex_);
298
764
        stopped_ = true;
299
764
        this->env_ = nullptr;
300
      }
301
302
764
      env_.reset();
303
768
    });
304
305
768
    if (is_stopped()) return;
306
    {
307
764
      HandleScope handle_scope(isolate_);
308
      Local<Context> context;
309
      {
310
        // We create the Context object before we have an Environment* in place
311
        // that we could use for error handling. If creation fails due to
312
        // resource constraints, we need something in place to handle it,
313
        // though.
314
764
        TryCatch try_catch(isolate_);
315
764
        if (snapshot_data_ != nullptr) {
316
763
          context = Context::FromSnapshot(isolate_,
317
763
                                          SnapshotData::kNodeBaseContextIndex)
318
763
                        .ToLocalChecked();
319
1526
          if (!context.IsEmpty() &&
320

2289
              !InitializeContextRuntime(context).IsJust()) {
321
            context = Local<Context>();
322
          }
323
        } else {
324
1
          context = NewContext(isolate_);
325
        }
326
764
        if (context.IsEmpty()) {
327
          // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
328
          Exit(ExitCode::kGenericUserError,
329
               "ERR_WORKER_INIT_FAILED",
330
               "Failed to create new Context");
331
          return;
332
        }
333
      }
334
335
764
      if (is_stopped()) return;
336
764
      CHECK(!context.IsEmpty());
337
764
      Context::Scope context_scope(context);
338
      {
339
1528
        env_.reset(CreateEnvironment(
340
            data.isolate_data_.get(),
341
            context,
342
764
            std::move(argv_),
343
764
            std::move(exec_argv_),
344
764
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
345
            thread_id_,
346
764
            std::move(inspector_parent_handle_)));
347
764
        if (is_stopped()) return;
348
756
        CHECK_NOT_NULL(env_);
349
756
        env_->set_env_vars(std::move(env_vars_));
350
756
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
351
62
          Exit(static_cast<ExitCode>(exit_code));
352
62
        });
353
      }
354
      {
355
756
        Mutex::ScopedLock lock(mutex_);
356
756
        if (stopped_) return;
357
756
        this->env_ = env_.get();
358
      }
359
756
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
360
756
      if (is_stopped()) return;
361
      {
362
756
        if (!CreateEnvMessagePort(env_.get())) {
363
          return;
364
        }
365
366
756
        Debug(this, "Created message port for worker %llu", thread_id_.id);
367
1512
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
368
          return;
369
370
756
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
371
      }
372
    }
373
374
    {
375
756
      Maybe<ExitCode> exit_code = SpinEventLoopInternal(env_.get());
376
756
      Mutex::ScopedLock lock(mutex_);
377

1187
      if (exit_code_ == ExitCode::kNoFailure && exit_code.IsJust()) {
378
410
        exit_code_ = exit_code.FromJust();
379
      }
380
381
756
      Debug(this,
382
            "Exiting thread for worker %llu with exit code %d",
383
756
            thread_id_.id,
384
1512
            static_cast<int>(exit_code_));
385
    }
386
  }
387
388
756
  Debug(this, "Worker %llu thread stops", thread_id_.id);
389
}
390
391
756
bool Worker::CreateEnvMessagePort(Environment* env) {
392
1512
  HandleScope handle_scope(isolate_);
393
1512
  std::unique_ptr<MessagePortData> data;
394
  {
395
1512
    Mutex::ScopedLock lock(mutex_);
396
756
    data = std::move(child_port_data_);
397
  }
398
399
  // Set up the message channel for receiving messages in the child.
400
1512
  MessagePort* child_port = MessagePort::New(env,
401
                                             env->context(),
402
756
                                             std::move(data));
403
  // MessagePort::New() may return nullptr if execution is terminated
404
  // within it.
405
756
  if (child_port != nullptr)
406
756
    env->set_message_port(child_port->object(isolate_));
407
408
756
  return child_port;
409
}
410
411
984
void Worker::JoinThread() {
412
984
  if (!tid_.has_value())
413
4
    return;
414
980
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
415
980
  tid_.reset();
416
417
980
  env()->remove_sub_worker_context(this);
418
419
  {
420
1958
    HandleScope handle_scope(env()->isolate());
421
980
    Context::Scope context_scope(env()->context());
422
423
    // Reset the parent port as we're closing it now anyway.
424
980
    object()->Set(env()->context(),
425
                  env()->message_port_string(),
426
2940
                  Undefined(env()->isolate())).Check();
427
428
    Local<Value> args[] = {
429
980
        Integer::New(env()->isolate(), static_cast<int>(exit_code_)),
430
980
        custom_error_ != nullptr
431
432
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
432
2292
            : Null(env()->isolate()).As<Value>(),
433
1960
        !custom_error_str_.empty()
434
216
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
435
216
                  .As<Value>()
436
2292
            : Null(env()->isolate()).As<Value>(),
437

2940
    };
438
439
980
    MakeCallback(env()->onexit_string(), arraysize(args), args);
440
  }
441
442
  // If we get here, the tid_.has_value() condition at the top of the function
443
  // implies that the thread was running. In that case, its final action will
444
  // be to schedule a callback on the parent thread which will delete this
445
  // object, so there's nothing more to do here.
446
}
447
448
3832
Worker::~Worker() {
449
3832
  Mutex::ScopedLock lock(mutex_);
450
451
1916
  CHECK(stopped_);
452
1916
  CHECK_NULL(env_);
453
1916
  CHECK(!tid_.has_value());
454
455
1916
  Debug(this, "Worker %llu destroyed", thread_id_.id);
456
3832
}
457
458
987
void Worker::New(const FunctionCallbackInfo<Value>& args) {
459
987
  Environment* env = Environment::GetCurrent(args);
460
987
  Isolate* isolate = args.GetIsolate();
461
462
987
  CHECK(args.IsConstructCall());
463
464
987
  if (env->isolate_data()->platform() == nullptr) {
465
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
466
4
    return;
467
  }
468
469
987
  std::string url;
470
987
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
471
987
  std::shared_ptr<KVStore> env_vars = nullptr;
472
473
987
  std::vector<std::string> exec_argv_out;
474
475
  // Argument might be a string or URL
476
1974
  if (!args[0]->IsNullOrUndefined()) {
477
    Utf8Value value(
478
1515
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
479
505
    url.append(value.out(), value.length());
480
  }
481
482
1974
  if (args[1]->IsNull()) {
483
    // Means worker.env = { ...process.env }.
484
958
    env_vars = env->env_vars()->Clone(isolate);
485
29
  } else if (args[1]->IsObject()) {
486
    // User provided env.
487
28
    env_vars = KVStore::CreateMapKVStore();
488
28
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
489
84
                               args[1].As<Object>());
490
  } else {
491
    // Env is shared.
492
1
    env_vars = env->env_vars();
493
  }
494
495

1946
  if (args[1]->IsObject() || args[2]->IsArray()) {
496
307
    per_isolate_opts.reset(new PerIsolateOptions());
497
498
307
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
499
2456
      return env_vars->Get(name).FromMaybe("");
500
    });
501
502
#ifndef NODE_WITHOUT_NODE_OPTIONS
503
    MaybeLocal<String> maybe_node_opts =
504
307
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
505
    Local<String> node_opts;
506
307
    if (maybe_node_opts.ToLocal(&node_opts)) {
507
918
      std::string node_options(*String::Utf8Value(isolate, node_opts));
508
306
      std::vector<std::string> errors{};
509
      std::vector<std::string> env_argv =
510
306
          ParseNodeOptionsEnvVar(node_options, &errors);
511
      // [0] is expected to be the program name, add dummy string.
512
306
      env_argv.insert(env_argv.begin(), "");
513
306
      std::vector<std::string> invalid_args{};
514
306
      options_parser::Parse(&env_argv,
515
                            nullptr,
516
                            &invalid_args,
517
                            per_isolate_opts.get(),
518
                            kAllowedInEnvvar,
519
                            &errors);
520

307
      if (!errors.empty() && args[1]->IsObject()) {
521
        // Only fail for explicitly provided env, this protects from failures
522
        // when NODE_OPTIONS from parent's env is used (which is the default).
523
        Local<Value> error;
524
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
525
        Local<String> key =
526
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
527
        // Ignore the return value of Set() because exceptions bubble up to JS
528
        // when we return anyway.
529
1
        USE(args.This()->Set(env->context(), key, error));
530
1
        return;
531
      }
532
    }
533
#endif  // NODE_WITHOUT_NODE_OPTIONS
534
  }
535
536
986
  if (args[2]->IsArray()) {
537
303
    Local<Array> array = args[2].As<Array>();
538
    // The first argument is reserved for program name, but we don't need it
539
    // in workers.
540
1212
    std::vector<std::string> exec_argv = {""};
541
303
    uint32_t length = array->Length();
542
346
    for (uint32_t i = 0; i < length; i++) {
543
      Local<Value> arg;
544
86
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
545
        return;
546
      }
547
      Local<String> arg_v8;
548
86
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
549
        return;
550
      }
551
86
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
552
86
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
553
43
      exec_argv.push_back(arg_string);
554
    }
555
556
303
    std::vector<std::string> invalid_args{};
557
303
    std::vector<std::string> errors{};
558
    // Using invalid_args as the v8_args argument as it stores unknown
559
    // options for the per isolate parser.
560
303
    options_parser::Parse(&exec_argv,
561
                          &exec_argv_out,
562
                          &invalid_args,
563
                          per_isolate_opts.get(),
564
                          kDisallowedInEnvvar,
565
                          &errors);
566
567
    // The first argument is program name.
568
303
    invalid_args.erase(invalid_args.begin());
569

303
    if (errors.size() > 0 || invalid_args.size() > 0) {
570
      Local<Value> error;
571
3
      if (!ToV8Value(env->context(),
572
3
                     errors.size() > 0 ? errors : invalid_args)
573
3
                         .ToLocal(&error)) {
574
        return;
575
      }
576
      Local<String> key =
577
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
578
      // Ignore the return value of Set() because exceptions bubble up to JS
579
      // when we return anyway.
580
3
      USE(args.This()->Set(env->context(), key, error));
581
3
      return;
582
    }
583
  } else {
584
683
    exec_argv_out = env->exec_argv();
585
  }
586
587
983
  bool use_node_snapshot = per_process::cli_options->node_snapshot;
588
  const SnapshotData* snapshot_data =
589
983
      use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
590
591
  Worker* worker = new Worker(env,
592
1966
                              args.This(),
593
                              url,
594
                              per_isolate_opts,
595
983
                              std::move(exec_argv_out),
596
                              env_vars,
597
983
                              snapshot_data);
598
599
983
  CHECK(args[3]->IsFloat64Array());
600
1966
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
601
983
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
602
983
  limit_info->CopyContents(worker->resource_limits_,
603
                           sizeof(worker->resource_limits_));
604
605
983
  CHECK(args[4]->IsBoolean());
606

983
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
607
982
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
608
983
  if (env->hide_console_windows())
609
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
610
983
  if (env->no_native_addons())
611
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
612
983
  if (env->no_global_search_paths())
613
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
614
983
  if (env->no_browser_globals())
615
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
616
}
617
618
980
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
619
  Worker* w;
620
980
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
621
1960
  Mutex::ScopedLock lock(w->mutex_);
622
623
980
  w->stopped_ = false;
624
625
980
  if (w->resource_limits_[kStackSizeMb] > 0) {
626
6
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
627
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
628
      w->stack_size_ = kStackBufferSize;
629
    } else {
630
12
      w->stack_size_ =
631
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
632
    }
633
  } else {
634
974
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
635
  }
636
637
  uv_thread_options_t thread_options;
638
980
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
639
980
  thread_options.stack_size = w->stack_size_;
640
641
980
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
642
980
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
643
    // XXX: This could become a std::unique_ptr, but that makes at least
644
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
645
    // gcc 7+ handles this well.
646
980
    Worker* w = static_cast<Worker*>(arg);
647
980
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
648
649
    // Leave a few kilobytes just to make sure we're within limits and have
650
    // some space to do work in C++ land.
651
980
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
652
653
980
    w->Run();
654
655
980
    Mutex::ScopedLock lock(w->mutex_);
656
980
    w->env()->SetImmediateThreadsafe(
657
957
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
658
957
          if (w->has_ref_)
659
955
            env->add_refs(-1);
660
957
          w->JoinThread();
661
          // implicitly delete w
662
955
        });
663
980
  }, static_cast<void*>(w));
664
665
980
  if (ret == 0) {
666
    // The object now owns the created thread and should not be garbage
667
    // collected until that finishes.
668
980
    w->ClearWeak();
669
670
980
    if (w->has_ref_)
671
980
      w->env()->add_refs(1);
672
673
980
    w->env()->add_sub_worker_context(w);
674
  } else {
675
    w->stopped_ = true;
676
    w->tid_.reset();
677
678
    char err_buf[128];
679
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
680
    {
681
      Isolate* isolate = w->env()->isolate();
682
      HandleScope handle_scope(isolate);
683
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
684
    }
685
  }
686
}
687
688
396
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
689
  Worker* w;
690
396
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
691
692
396
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
693
396
  w->Exit(ExitCode::kGenericUserError);
694
}
695
696
400
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
697
  Worker* w;
698
400
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
699

400
  if (!w->has_ref_ && w->tid_.has_value()) {
700
5
    w->has_ref_ = true;
701
5
    w->env()->add_refs(1);
702
  }
703
}
704
705
9
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
706
  Worker* w;
707
9
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
708
16
  args.GetReturnValue().Set(w->has_ref_);
709
}
710
711
7
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
712
  Worker* w;
713
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
714

7
  if (w->has_ref_ && w->tid_.has_value()) {
715
7
    w->has_ref_ = false;
716
7
    w->env()->add_refs(-1);
717
  }
718
}
719
720
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
721
  Worker* w;
722
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
723
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
724
}
725
726
765
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
727
765
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
728
729
765
  memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
730
765
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
731
}
732
733
701
void Worker::Exit(ExitCode code,
734
                  const char* error_code,
735
                  const char* error_message) {
736
1402
  Mutex::ScopedLock lock(mutex_);
737
701
  Debug(this,
738
        "Worker %llu called Exit(%d, %s, %s)",
739
701
        thread_id_.id,
740
701
        static_cast<int>(code),
741
        error_code,
742
        error_message);
743
744
701
  if (error_code != nullptr) {
745
216
    custom_error_ = error_code;
746
216
    custom_error_str_ = error_message;
747
  }
748
749
701
  if (env_ != nullptr) {
750
350
    exit_code_ = code;
751
350
    Stop(env_);
752
  } else {
753
351
    stopped_ = true;
754
  }
755
701
}
756
757
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
758
  // Worker objects always stay alive as long as the child thread, regardless
759
  // of whether they are being referenced in the parent thread.
760
  return true;
761
}
762
763
class WorkerHeapSnapshotTaker : public AsyncWrap {
764
 public:
765
3
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
766
3
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
767
768
  SET_NO_MEMORY_INFO()
769
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
770
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
771
};
772
773
3
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
774
  Worker* w;
775
3
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
776
3
  CHECK_EQ(args.Length(), 1);
777
3
  auto options = heap::GetHeapSnapshotOptions(args[0]);
778
779
3
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
780
781
3
  Environment* env = w->env();
782
3
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
783
  Local<Object> wrap;
784
3
  if (!env->worker_heap_snapshot_taker_template()
785
6
      ->NewInstance(env->context()).ToLocal(&wrap)) {
786
    return;
787
  }
788
789
  // The created WorkerHeapSnapshotTaker is an object owned by main
790
  // thread's Isolate, it can not be accessed by worker thread
791
  std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
792
      std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
793
6
          MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
794
795
  // Interrupt the worker thread and take a snapshot, then schedule a call
796
  // on the parent thread that turns that snapshot into a readable stream.
797
3
  bool scheduled = w->RequestInterrupt([taker = std::move(taker), env, options](
798
6
                                           Environment* worker_env) mutable {
799
    heap::HeapSnapshotPointer snapshot{
800
3
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot(options)};
801
3
    CHECK(snapshot);
802
803
    // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
804
    // object.
805
806
6
    env->SetImmediateThreadsafe(
807
3
        [taker = std::move(taker),
808
3
         snapshot = std::move(snapshot)](Environment* env) mutable {
809
6
          HandleScope handle_scope(env->isolate());
810
3
          Context::Scope context_scope(env->context());
811
812
6
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
813
          BaseObjectPtr<AsyncWrap> stream =
814
6
              heap::CreateHeapSnapshotStream(env, std::move(snapshot));
815
3
          Local<Value> args[] = {stream->object()};
816
3
          taker->get()->MakeCallback(
817
3
              env->ondone_string(), arraysize(args), args);
818
          // implicitly delete `taker`
819
3
        },
820
        CallbackFlags::kUnrefed);
821
822
    // Now, the lambda is delivered to the main thread, as a result, the
823
    // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
824
3
  });
825
826
3
  if (scheduled) {
827
6
    args.GetReturnValue().Set(wrap);
828
  } else {
829
    args.GetReturnValue().Set(Local<Object>());
830
  }
831
}
832
833
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
834
  Worker* w;
835
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
836
837
7
  Mutex::ScopedLock lock(w->mutex_);
838
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
839
  // before locking the mutex is a race condition. So manually do the same
840
  // check.
841

7
  if (w->stopped_ || w->env_ == nullptr)
842
    return args.GetReturnValue().Set(-1);
843
844
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
845
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
846
}
847
848
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
849
  Worker* w;
850
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
851
852
1
  Mutex::ScopedLock lock(w->mutex_);
853
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
854
  // before locking the mutex is a race condition. So manually do the same
855
  // check.
856

1
  if (w->stopped_ || w->env_ == nullptr)
857
    return args.GetReturnValue().Set(-1);
858
859
1
  double loop_start_time = w->env_->performance_state()->milestones[
860
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
861
1
  CHECK_GE(loop_start_time, 0);
862
2
  args.GetReturnValue().Set(loop_start_time / 1e6);
863
}
864
865
namespace {
866
867
// Return the MessagePort that is global for this Environment and communicates
868
// with the internal [kPort] port of the JS Worker class in the parent thread.
869
1514
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
870
1514
  Environment* env = Environment::GetCurrent(args);
871
1514
  Local<Object> port = env->message_port();
872

3028
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
873
1514
  if (!port.IsEmpty()) {
874
4542
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
875
             args.GetIsolate());
876
3028
    args.GetReturnValue().Set(port);
877
  }
878
1514
}
879
880
831
void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
881
                                      Local<FunctionTemplate> target) {
882
831
  Isolate* isolate = isolate_data->isolate();
883
831
  Local<ObjectTemplate> proto = target->PrototypeTemplate();
884
885
  {
886
831
    Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
887
888
1662
    w->InstanceTemplate()->SetInternalFieldCount(
889
        Worker::kInternalFieldCount);
890
831
    w->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
891
892
831
    SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
893
831
    SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
894
831
    SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
895
831
    SetProtoMethod(isolate, w, "ref", Worker::Ref);
896
831
    SetProtoMethod(isolate, w, "unref", Worker::Unref);
897
831
    SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
898
831
    SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
899
831
    SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
900
831
    SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
901
902
831
    SetConstructorFunction(isolate, proto, "Worker", w);
903
  }
904
905
  {
906
831
    Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
907
908
1662
    wst->InstanceTemplate()->SetInternalFieldCount(
909
        WorkerHeapSnapshotTaker::kInternalFieldCount);
910
831
    wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
911
912
    Local<String> wst_string =
913
831
        FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
914
831
    wst->SetClassName(wst_string);
915
831
    isolate_data->set_worker_heap_snapshot_taker_template(
916
        wst->InstanceTemplate());
917
  }
918
919
831
  SetMethod(isolate, proto, "getEnvMessagePort", GetEnvMessagePort);
920
831
}
921
922
827
void CreateWorkerPerContextProperties(Local<Object> target,
923
                                      Local<Value> unused,
924
                                      Local<Context> context,
925
                                      void* priv) {
926
827
  Environment* env = Environment::GetCurrent(context);
927
827
  Isolate* isolate = env->isolate();
928
929
  target
930
827
      ->Set(env->context(),
931
            env->thread_id_string(),
932
3308
            Number::New(isolate, static_cast<double>(env->thread_id())))
933
      .Check();
934
935
  target
936
827
      ->Set(env->context(),
937
            FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
938
3308
            Boolean::New(isolate, env->is_main_thread()))
939
      .Check();
940
941
  target
942
827
      ->Set(env->context(),
943
            FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
944
2481
            Boolean::New(isolate, env->owns_process_state()))
945
      .Check();
946
947
827
  if (!env->is_main_thread()) {
948
    target
949
764
        ->Set(env->context(),
950
              FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
951
3056
              env->worker_context()->GetResourceLimits(isolate))
952
        .Check();
953
  }
954
955
2481
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
956
2481
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
957
2481
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
958
2481
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
959
1654
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
960
827
}
961
962
5718
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
963
5718
  registry->Register(GetEnvMessagePort);
964
5718
  registry->Register(Worker::New);
965
5718
  registry->Register(Worker::StartThread);
966
5718
  registry->Register(Worker::StopThread);
967
5718
  registry->Register(Worker::HasRef);
968
5718
  registry->Register(Worker::Ref);
969
5718
  registry->Register(Worker::Unref);
970
5718
  registry->Register(Worker::GetResourceLimits);
971
5718
  registry->Register(Worker::TakeHeapSnapshot);
972
5718
  registry->Register(Worker::LoopIdleTime);
973
5718
  registry->Register(Worker::LoopStartTime);
974
5718
}
975
976
}  // anonymous namespace
977
}  // namespace worker
978
}  // namespace node
979
980
5789
NODE_BINDING_CONTEXT_AWARE_INTERNAL(
981
    worker, node::worker::CreateWorkerPerContextProperties)
982
831
NODE_BINDING_PER_ISOLATE_INIT(worker,
983
                              node::worker::CreateWorkerPerIsolateProperties)
984
5718
NODE_BINDING_EXTERNAL_REFERENCE(worker,
985
                                node::worker::RegisterExternalReferences)