GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 458 490 93.5 %
Date: 2022-11-09 04:21:35 Branches: 176 244 72.1 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvvar;
19
using node::kDisallowedInEnvvar;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ResourceConstraints;
38
using v8::SealHandleScope;
39
using v8::String;
40
using v8::TryCatch;
41
using v8::Value;
42
43
namespace node {
44
namespace worker {
45
46
constexpr double kMB = 1024 * 1024;
47
48
969
Worker::Worker(Environment* env,
49
               Local<Object> wrap,
50
               const std::string& url,
51
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
52
               std::vector<std::string>&& exec_argv,
53
               std::shared_ptr<KVStore> env_vars,
54
969
               const SnapshotData* snapshot_data)
55
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
56
      per_isolate_opts_(per_isolate_opts),
57
      exec_argv_(exec_argv),
58
969
      platform_(env->isolate_data()->platform()),
59
969
      thread_id_(AllocateEnvironmentThreadId()),
60
      env_vars_(env_vars),
61
1938
      snapshot_data_(snapshot_data) {
62
969
  Debug(this, "Creating new worker instance with thread id %llu",
63
969
        thread_id_.id);
64
65
  // Set up everything that needs to be set up in the parent environment.
66
969
  MessagePort* parent_port = MessagePort::New(env, env->context());
67
969
  if (parent_port == nullptr) {
68
    // This can happen e.g. because execution is terminating.
69
    return;
70
  }
71
72
969
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
73
969
  MessagePort::Entangle(parent_port, child_port_data_.get());
74
75
969
  object()
76
2907
      ->Set(env->context(), env->message_port_string(), parent_port->object())
77
      .Check();
78
79
969
  object()->Set(env->context(),
80
                env->thread_id_string(),
81
2907
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
82
      .Check();
83
84
1938
  inspector_parent_handle_ = GetInspectorParentHandle(
85
969
      env, thread_id_, url.c_str());
86
87
1938
  argv_ = std::vector<std::string>{env->argv()[0]};
88
  // Mark this Worker object as weak until we actually start the thread.
89
969
  MakeWeak();
90
91
969
  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
92
}
93
94
2926
bool Worker::is_stopped() const {
95
5852
  Mutex::ScopedLock lock(mutex_);
96
2926
  if (env_ != nullptr)
97
726
    return env_->is_stopping();
98
2200
  return stopped_;
99
}
100
101
738
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
102
738
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
103
104
738
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
105
1
    constraints->set_max_young_generation_size_in_bytes(
106
1
        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
107
  } else {
108
737
    resource_limits_[kMaxYoungGenerationSizeMb] =
109
737
        constraints->max_young_generation_size_in_bytes() / kMB;
110
  }
111
112
738
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
113
4
    constraints->set_max_old_generation_size_in_bytes(
114
4
        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
115
  } else {
116
734
    resource_limits_[kMaxOldGenerationSizeMb] =
117
734
        constraints->max_old_generation_size_in_bytes() / kMB;
118
  }
119
120
738
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
121
1
    constraints->set_code_range_size_in_bytes(
122
1
        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
123
  } else {
124
737
    resource_limits_[kCodeRangeSizeMb] =
125
737
        constraints->code_range_size_in_bytes() / kMB;
126
  }
127
738
}
128
129
// This class contains data that is only relevant to the child thread itself,
130
// and only while it is running.
131
// (Eventually, the Environment instance should probably also be moved here.)
132
class WorkerThreadData {
133
 public:
134
966
  explicit WorkerThreadData(Worker* w)
135
966
    : w_(w) {
136
966
    int ret = uv_loop_init(&loop_);
137
966
    if (ret != 0) {
138
      char err_buf[128];
139
228
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
140
      // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
141
228
      w->Exit(ExitCode::kGenericUserError, "ERR_WORKER_INIT_FAILED", err_buf);
142
228
      return;
143
    }
144
738
    loop_init_failed_ = false;
145
738
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
146
147
    std::shared_ptr<ArrayBufferAllocator> allocator =
148
738
        ArrayBufferAllocator::Create();
149
738
    Isolate::CreateParams params;
150
738
    SetIsolateCreateParamsForNode(&params);
151
738
    params.array_buffer_allocator_shared = allocator;
152
153
738
    if (w->snapshot_data() != nullptr) {
154
737
      SnapshotBuilder::InitializeIsolateParams(w->snapshot_data(), &params);
155
    }
156
738
    w->UpdateResourceConstraints(&params.constraints);
157
158
738
    Isolate* isolate = Isolate::Allocate();
159
738
    if (isolate == nullptr) {
160
      // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
161
      w->Exit(ExitCode::kGenericUserError,
162
              "ERR_WORKER_INIT_FAILED",
163
              "Failed to create new Isolate");
164
      return;
165
    }
166
167
738
    w->platform_->RegisterIsolate(isolate, &loop_);
168
738
    Isolate::Initialize(isolate, params);
169
738
    SetIsolateUpForNode(isolate);
170
171
    // Be sure it's called before Environment::InitializeDiagnostics()
172
    // so that this callback stays when the callback of
173
    // --heapsnapshot-near-heap-limit gets is popped.
174
738
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
175
176
    {
177
738
      Locker locker(isolate);
178
1476
      Isolate::Scope isolate_scope(isolate);
179
      // V8 computes its stack limit the first time a `Locker` is used based on
180
      // --stack-size. Reset it to the correct value.
181
738
      isolate->SetStackLimit(w->stack_base_);
182
183
738
      HandleScope handle_scope(isolate);
184
1476
      isolate_data_.reset(CreateIsolateData(isolate,
185
                                            &loop_,
186
738
                                            w_->platform_,
187
                                            allocator.get()));
188
738
      CHECK(isolate_data_);
189
738
      if (w_->per_isolate_opts_)
190
295
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
191
738
      isolate_data_->set_worker_context(w_);
192
738
      isolate_data_->max_young_gen_size =
193
738
          params.constraints.max_young_generation_size_in_bytes();
194
    }
195
196
738
    Mutex::ScopedLock lock(w_->mutex_);
197
738
    w_->isolate_ = isolate;
198
  }
199
200
966
  ~WorkerThreadData() {
201
966
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
202
    Isolate* isolate;
203
    {
204
966
      Mutex::ScopedLock lock(w_->mutex_);
205
966
      isolate = w_->isolate_;
206
966
      w_->isolate_ = nullptr;
207
    }
208
209
966
    if (isolate != nullptr) {
210
738
      CHECK(!loop_init_failed_);
211
738
      bool platform_finished = false;
212
213
738
      isolate_data_.reset();
214
215
738
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
216
738
        *static_cast<bool*>(data) = true;
217
738
      }, &platform_finished);
218
219
      // The order of these calls is important; if the Isolate is first disposed
220
      // and then unregistered, there is a race condition window in which no
221
      // new Isolate at the same address can successfully be registered with
222
      // the platform.
223
      // (Refs: https://github.com/nodejs/node/issues/30846)
224
738
      w_->platform_->UnregisterIsolate(isolate);
225
738
      isolate->Dispose();
226
227
      // Wait until the platform has cleaned up all relevant resources.
228
1476
      while (!platform_finished) {
229
738
        uv_run(&loop_, UV_RUN_ONCE);
230
      }
231
    }
232
966
    if (!loop_init_failed_) {
233
738
      CheckedUvLoopClose(&loop_);
234
    }
235
966
  }
236
237
738
  bool loop_is_usable() const { return !loop_init_failed_; }
238
239
 private:
240
  Worker* const w_;
241
  uv_loop_t loop_;
242
  bool loop_init_failed_ = true;
243
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
244
  const SnapshotData* snapshot_data_ = nullptr;
245
  friend class Worker;
246
};
247
248
4
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
249
                             size_t initial_heap_limit) {
250
4
  Worker* worker = static_cast<Worker*>(data);
251
  // Give the current GC some extra leeway to let it finish rather than
252
  // crash hard. We are not going to perform further allocations anyway.
253
4
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
254
4
  size_t new_limit = current_heap_limit + kExtraHeapAllowance;
255
4
  Environment* env = worker->env();
256
4
  if (env != nullptr) {
257
    DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
258
    Debug(env,
259
          DebugCategory::DIAGNOSTICS,
260
          "Throwing ERR_WORKER_OUT_OF_MEMORY, "
261
          "new_limit=%" PRIu64 "\n",
262
8
          static_cast<uint64_t>(new_limit));
263
  }
264
  // TODO(joyeecheung): maybe this should be kV8FatalError instead?
265
4
  worker->Exit(ExitCode::kGenericUserError,
266
               "ERR_WORKER_OUT_OF_MEMORY",
267
               "JS heap out of memory");
268
4
  return new_limit;
269
}
270
271
966
void Worker::Run() {
272
966
  std::string name = "WorkerThread ";
273
966
  name += std::to_string(thread_id_.id);
274

1150
  TRACE_EVENT_METADATA1(
275
      "__metadata", "thread_name", "name",
276
      TRACE_STR_COPY(name.c_str()));
277
966
  CHECK_NOT_NULL(platform_);
278
279
966
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
280
281
966
  WorkerThreadData data(this);
282
966
  if (isolate_ == nullptr) return;
283
738
  CHECK(data.loop_is_usable());
284
285
738
  Debug(this, "Starting worker with id %llu", thread_id_.id);
286
  {
287
738
    Locker locker(isolate_);
288
738
    Isolate::Scope isolate_scope(isolate_);
289
738
    SealHandleScope outer_seal(isolate_);
290
291
738
    DeleteFnPtr<Environment, FreeEnvironment> env_;
292
738
    auto cleanup_env = OnScopeLeave([&]() {
293
      // TODO(addaleax): This call is harmless but should not be necessary.
294
      // Figure out why V8 is raising a DCHECK() here without it
295
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
296
738
      isolate_->CancelTerminateExecution();
297
298
738
      if (!env_) return;
299
731
      env_->set_can_call_into_js(false);
300
301
      {
302
731
        Mutex::ScopedLock lock(mutex_);
303
731
        stopped_ = true;
304
731
        this->env_ = nullptr;
305
      }
306
307
731
      env_.reset();
308
738
    });
309
310
738
    if (is_stopped()) return;
311
    {
312
731
      HandleScope handle_scope(isolate_);
313
      Local<Context> context;
314
      {
315
        // We create the Context object before we have an Environment* in place
316
        // that we could use for error handling. If creation fails due to
317
        // resource constraints, we need something in place to handle it,
318
        // though.
319
731
        TryCatch try_catch(isolate_);
320
731
        if (snapshot_data_ != nullptr) {
321
730
          context = Context::FromSnapshot(isolate_,
322
730
                                          SnapshotData::kNodeBaseContextIndex)
323
730
                        .ToLocalChecked();
324
1460
          if (!context.IsEmpty() &&
325

2190
              !InitializeContextRuntime(context).IsJust()) {
326
            context = Local<Context>();
327
          }
328
        } else {
329
1
          context = NewContext(isolate_);
330
        }
331
731
        if (context.IsEmpty()) {
332
          // TODO(joyeecheung): maybe this should be kBootstrapFailure instead?
333
          Exit(ExitCode::kGenericUserError,
334
               "ERR_WORKER_INIT_FAILED",
335
               "Failed to create new Context");
336
          return;
337
        }
338
      }
339
340
731
      if (is_stopped()) return;
341
731
      CHECK(!context.IsEmpty());
342
731
      Context::Scope context_scope(context);
343
      {
344
1462
        env_.reset(CreateEnvironment(
345
            data.isolate_data_.get(),
346
            context,
347
731
            std::move(argv_),
348
731
            std::move(exec_argv_),
349
731
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
350
            thread_id_,
351
731
            std::move(inspector_parent_handle_)));
352
731
        if (is_stopped()) return;
353
726
        CHECK_NOT_NULL(env_);
354
726
        env_->set_env_vars(std::move(env_vars_));
355
726
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
356
60
          Exit(static_cast<ExitCode>(exit_code));
357
60
        });
358
      }
359
      {
360
726
        Mutex::ScopedLock lock(mutex_);
361
726
        if (stopped_) return;
362
726
        this->env_ = env_.get();
363
      }
364
726
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
365
726
      if (is_stopped()) return;
366
      {
367
726
        if (!CreateEnvMessagePort(env_.get())) {
368
          return;
369
        }
370
371
726
        Debug(this, "Created message port for worker %llu", thread_id_.id);
372
1452
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
373
          return;
374
375
726
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
376
      }
377
    }
378
379
    {
380
726
      Maybe<ExitCode> exit_code = SpinEventLoopInternal(env_.get());
381
726
      Mutex::ScopedLock lock(mutex_);
382

1104
      if (exit_code_ == ExitCode::kNoFailure && exit_code.IsJust()) {
383
359
        exit_code_ = exit_code.FromJust();
384
      }
385
386
726
      Debug(this,
387
            "Exiting thread for worker %llu with exit code %d",
388
726
            thread_id_.id,
389
1452
            static_cast<int>(exit_code_));
390
    }
391
  }
392
393
726
  Debug(this, "Worker %llu thread stops", thread_id_.id);
394
}
395
396
726
bool Worker::CreateEnvMessagePort(Environment* env) {
397
1452
  HandleScope handle_scope(isolate_);
398
1452
  std::unique_ptr<MessagePortData> data;
399
  {
400
1452
    Mutex::ScopedLock lock(mutex_);
401
726
    data = std::move(child_port_data_);
402
  }
403
404
  // Set up the message channel for receiving messages in the child.
405
1452
  MessagePort* child_port = MessagePort::New(env,
406
                                             env->context(),
407
726
                                             std::move(data));
408
  // MessagePort::New() may return nullptr if execution is terminated
409
  // within it.
410
726
  if (child_port != nullptr)
411
726
    env->set_message_port(child_port->object(isolate_));
412
413
726
  return child_port;
414
}
415
416
970
void Worker::JoinThread() {
417
970
  if (!tid_.has_value())
418
4
    return;
419
966
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
420
966
  tid_.reset();
421
422
966
  env()->remove_sub_worker_context(this);
423
424
  {
425
1930
    HandleScope handle_scope(env()->isolate());
426
966
    Context::Scope context_scope(env()->context());
427
428
    // Reset the parent port as we're closing it now anyway.
429
966
    object()->Set(env()->context(),
430
                  env()->message_port_string(),
431
2898
                  Undefined(env()->isolate())).Check();
432
433
    Local<Value> args[] = {
434
966
        Integer::New(env()->isolate(), static_cast<int>(exit_code_)),
435
966
        custom_error_ != nullptr
436
464
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
437
2202
            : Null(env()->isolate()).As<Value>(),
438
1932
        !custom_error_str_.empty()
439
232
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
440
232
                  .As<Value>()
441
2202
            : Null(env()->isolate()).As<Value>(),
442

2898
    };
443
444
966
    MakeCallback(env()->onexit_string(), arraysize(args), args);
445
  }
446
447
  // If we get here, the tid_.has_value() condition at the top of the function
448
  // implies that the thread was running. In that case, its final action will
449
  // be to schedule a callback on the parent thread which will delete this
450
  // object, so there's nothing more to do here.
451
}
452
453
3776
Worker::~Worker() {
454
3776
  Mutex::ScopedLock lock(mutex_);
455
456
1888
  CHECK(stopped_);
457
1888
  CHECK_NULL(env_);
458
1888
  CHECK(!tid_.has_value());
459
460
1888
  Debug(this, "Worker %llu destroyed", thread_id_.id);
461
3776
}
462
463
973
void Worker::New(const FunctionCallbackInfo<Value>& args) {
464
973
  Environment* env = Environment::GetCurrent(args);
465
973
  Isolate* isolate = args.GetIsolate();
466
467
973
  CHECK(args.IsConstructCall());
468
469
973
  if (env->isolate_data()->platform() == nullptr) {
470
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
471
4
    return;
472
  }
473
474
973
  std::string url;
475
973
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
476
973
  std::shared_ptr<KVStore> env_vars = nullptr;
477
478
973
  std::vector<std::string> exec_argv_out;
479
480
  // Argument might be a string or URL
481
1946
  if (!args[0]->IsNullOrUndefined()) {
482
    Utf8Value value(
483
1476
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
484
492
    url.append(value.out(), value.length());
485
  }
486
487
1946
  if (args[1]->IsNull()) {
488
    // Means worker.env = { ...process.env }.
489
945
    env_vars = env->env_vars()->Clone(isolate);
490
28
  } else if (args[1]->IsObject()) {
491
    // User provided env.
492
27
    env_vars = KVStore::CreateMapKVStore();
493
27
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
494
81
                               args[1].As<Object>());
495
  } else {
496
    // Env is shared.
497
1
    env_vars = env->env_vars();
498
  }
499
500

1919
  if (args[1]->IsObject() || args[2]->IsArray()) {
501
299
    per_isolate_opts.reset(new PerIsolateOptions());
502
503
299
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
504
2392
      return env_vars->Get(name).FromMaybe("");
505
    });
506
507
#ifndef NODE_WITHOUT_NODE_OPTIONS
508
    MaybeLocal<String> maybe_node_opts =
509
299
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
510
    Local<String> node_opts;
511
299
    if (maybe_node_opts.ToLocal(&node_opts)) {
512
894
      std::string node_options(*String::Utf8Value(isolate, node_opts));
513
298
      std::vector<std::string> errors{};
514
      std::vector<std::string> env_argv =
515
298
          ParseNodeOptionsEnvVar(node_options, &errors);
516
      // [0] is expected to be the program name, add dummy string.
517
298
      env_argv.insert(env_argv.begin(), "");
518
298
      std::vector<std::string> invalid_args{};
519
298
      options_parser::Parse(&env_argv,
520
                            nullptr,
521
                            &invalid_args,
522
                            per_isolate_opts.get(),
523
                            kAllowedInEnvvar,
524
                            &errors);
525

299
      if (!errors.empty() && args[1]->IsObject()) {
526
        // Only fail for explicitly provided env, this protects from failures
527
        // when NODE_OPTIONS from parent's env is used (which is the default).
528
        Local<Value> error;
529
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
530
        Local<String> key =
531
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
532
        // Ignore the return value of Set() because exceptions bubble up to JS
533
        // when we return anyway.
534
1
        USE(args.This()->Set(env->context(), key, error));
535
1
        return;
536
      }
537
    }
538
#endif  // NODE_WITHOUT_NODE_OPTIONS
539
  }
540
541
972
  if (args[2]->IsArray()) {
542
295
    Local<Array> array = args[2].As<Array>();
543
    // The first argument is reserved for program name, but we don't need it
544
    // in workers.
545
1180
    std::vector<std::string> exec_argv = {""};
546
295
    uint32_t length = array->Length();
547
389
    for (uint32_t i = 0; i < length; i++) {
548
      Local<Value> arg;
549
188
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
550
        return;
551
      }
552
      Local<String> arg_v8;
553
188
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
554
        return;
555
      }
556
188
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
557
188
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
558
94
      exec_argv.push_back(arg_string);
559
    }
560
561
295
    std::vector<std::string> invalid_args{};
562
295
    std::vector<std::string> errors{};
563
    // Using invalid_args as the v8_args argument as it stores unknown
564
    // options for the per isolate parser.
565
295
    options_parser::Parse(&exec_argv,
566
                          &exec_argv_out,
567
                          &invalid_args,
568
                          per_isolate_opts.get(),
569
                          kDisallowedInEnvvar,
570
                          &errors);
571
572
    // The first argument is program name.
573
295
    invalid_args.erase(invalid_args.begin());
574

295
    if (errors.size() > 0 || invalid_args.size() > 0) {
575
      Local<Value> error;
576
3
      if (!ToV8Value(env->context(),
577
3
                     errors.size() > 0 ? errors : invalid_args)
578
3
                         .ToLocal(&error)) {
579
        return;
580
      }
581
      Local<String> key =
582
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
583
      // Ignore the return value of Set() because exceptions bubble up to JS
584
      // when we return anyway.
585
3
      USE(args.This()->Set(env->context(), key, error));
586
3
      return;
587
    }
588
  } else {
589
677
    exec_argv_out = env->exec_argv();
590
  }
591
592
969
  bool use_node_snapshot = per_process::cli_options->node_snapshot;
593
  const SnapshotData* snapshot_data =
594
969
      use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
595
596
  Worker* worker = new Worker(env,
597
1938
                              args.This(),
598
                              url,
599
                              per_isolate_opts,
600
969
                              std::move(exec_argv_out),
601
                              env_vars,
602
969
                              snapshot_data);
603
604
969
  CHECK(args[3]->IsFloat64Array());
605
1938
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
606
969
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
607
969
  limit_info->CopyContents(worker->resource_limits_,
608
                           sizeof(worker->resource_limits_));
609
610
969
  CHECK(args[4]->IsBoolean());
611

969
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
612
968
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
613
969
  if (env->hide_console_windows())
614
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
615
969
  if (env->no_native_addons())
616
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
617
969
  if (env->no_global_search_paths())
618
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
619
969
  if (env->no_browser_globals())
620
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
621
}
622
623
966
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
624
  Worker* w;
625
966
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
626
1932
  Mutex::ScopedLock lock(w->mutex_);
627
628
966
  w->stopped_ = false;
629
630
966
  if (w->resource_limits_[kStackSizeMb] > 0) {
631
6
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
632
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
633
      w->stack_size_ = kStackBufferSize;
634
    } else {
635
12
      w->stack_size_ =
636
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
637
    }
638
  } else {
639
960
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
640
  }
641
642
  uv_thread_options_t thread_options;
643
966
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
644
966
  thread_options.stack_size = w->stack_size_;
645
646
966
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
647
966
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
648
    // XXX: This could become a std::unique_ptr, but that makes at least
649
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
650
    // gcc 7+ handles this well.
651
966
    Worker* w = static_cast<Worker*>(arg);
652
966
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
653
654
    // Leave a few kilobytes just to make sure we're within limits and have
655
    // some space to do work in C++ land.
656
966
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
657
658
966
    w->Run();
659
660
966
    Mutex::ScopedLock lock(w->mutex_);
661
966
    w->env()->SetImmediateThreadsafe(
662
943
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
663
943
          if (w->has_ref_)
664
941
            env->add_refs(-1);
665
943
          w->JoinThread();
666
          // implicitly delete w
667
941
        });
668
966
  }, static_cast<void*>(w));
669
670
966
  if (ret == 0) {
671
    // The object now owns the created thread and should not be garbage
672
    // collected until that finishes.
673
966
    w->ClearWeak();
674
675
966
    if (w->has_ref_)
676
966
      w->env()->add_refs(1);
677
678
966
    w->env()->add_sub_worker_context(w);
679
  } else {
680
    w->stopped_ = true;
681
    w->tid_.reset();
682
683
    char err_buf[128];
684
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
685
    {
686
      Isolate* isolate = w->env()->isolate();
687
      HandleScope handle_scope(isolate);
688
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
689
    }
690
  }
691
}
692
693
389
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
694
  Worker* w;
695
389
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
696
697
389
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
698
389
  w->Exit(ExitCode::kGenericUserError);
699
}
700
701
393
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
702
  Worker* w;
703
393
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
704

393
  if (!w->has_ref_ && w->tid_.has_value()) {
705
5
    w->has_ref_ = true;
706
5
    w->env()->add_refs(1);
707
  }
708
}
709
710
9
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
711
  Worker* w;
712
9
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
713
16
  args.GetReturnValue().Set(w->has_ref_);
714
}
715
716
7
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
717
  Worker* w;
718
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
719

7
  if (w->has_ref_ && w->tid_.has_value()) {
720
7
    w->has_ref_ = false;
721
7
    w->env()->add_refs(-1);
722
  }
723
}
724
725
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
726
  Worker* w;
727
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
728
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
729
}
730
731
732
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
732
732
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
733
734
732
  memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
735
732
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
736
}
737
738
708
void Worker::Exit(ExitCode code,
739
                  const char* error_code,
740
                  const char* error_message) {
741
1416
  Mutex::ScopedLock lock(mutex_);
742
708
  Debug(this,
743
        "Worker %llu called Exit(%d, %s, %s)",
744
708
        thread_id_.id,
745
708
        static_cast<int>(code),
746
        error_code,
747
        error_message);
748
749
708
  if (error_code != nullptr) {
750
232
    custom_error_ = error_code;
751
232
    custom_error_str_ = error_message;
752
  }
753
754
708
  if (env_ != nullptr) {
755
368
    exit_code_ = code;
756
368
    Stop(env_);
757
  } else {
758
340
    stopped_ = true;
759
  }
760
708
}
761
762
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
763
  // Worker objects always stay alive as long as the child thread, regardless
764
  // of whether they are being referenced in the parent thread.
765
  return true;
766
}
767
768
class WorkerHeapSnapshotTaker : public AsyncWrap {
769
 public:
770
3
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
771
3
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
772
773
  SET_NO_MEMORY_INFO()
774
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
775
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
776
};
777
778
3
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
779
  Worker* w;
780
3
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
781
3
  CHECK_EQ(args.Length(), 1);
782
3
  auto options = heap::GetHeapSnapshotOptions(args[0]);
783
784
3
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
785
786
3
  Environment* env = w->env();
787
3
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
788
  Local<Object> wrap;
789
3
  if (!env->worker_heap_snapshot_taker_template()
790
6
      ->NewInstance(env->context()).ToLocal(&wrap)) {
791
    return;
792
  }
793
794
  // The created WorkerHeapSnapshotTaker is an object owned by main
795
  // thread's Isolate, it can not be accessed by worker thread
796
  std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
797
      std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
798
6
          MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
799
800
  // Interrupt the worker thread and take a snapshot, then schedule a call
801
  // on the parent thread that turns that snapshot into a readable stream.
802
3
  bool scheduled = w->RequestInterrupt([taker = std::move(taker), env, options](
803
6
                                           Environment* worker_env) mutable {
804
    heap::HeapSnapshotPointer snapshot{
805
3
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot(options)};
806
3
    CHECK(snapshot);
807
808
    // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
809
    // object.
810
811
6
    env->SetImmediateThreadsafe(
812
3
        [taker = std::move(taker),
813
3
         snapshot = std::move(snapshot)](Environment* env) mutable {
814
6
          HandleScope handle_scope(env->isolate());
815
3
          Context::Scope context_scope(env->context());
816
817
6
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
818
          BaseObjectPtr<AsyncWrap> stream =
819
6
              heap::CreateHeapSnapshotStream(env, std::move(snapshot));
820
3
          Local<Value> args[] = {stream->object()};
821
3
          taker->get()->MakeCallback(
822
3
              env->ondone_string(), arraysize(args), args);
823
          // implicitly delete `taker`
824
3
        },
825
        CallbackFlags::kUnrefed);
826
827
    // Now, the lambda is delivered to the main thread, as a result, the
828
    // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
829
3
  });
830
831
3
  if (scheduled) {
832
6
    args.GetReturnValue().Set(wrap);
833
  } else {
834
    args.GetReturnValue().Set(Local<Object>());
835
  }
836
}
837
838
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
839
  Worker* w;
840
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
841
842
7
  Mutex::ScopedLock lock(w->mutex_);
843
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
844
  // before locking the mutex is a race condition. So manually do the same
845
  // check.
846

7
  if (w->stopped_ || w->env_ == nullptr)
847
    return args.GetReturnValue().Set(-1);
848
849
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
850
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
851
}
852
853
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
854
  Worker* w;
855
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
856
857
1
  Mutex::ScopedLock lock(w->mutex_);
858
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
859
  // before locking the mutex is a race condition. So manually do the same
860
  // check.
861

1
  if (w->stopped_ || w->env_ == nullptr)
862
    return args.GetReturnValue().Set(-1);
863
864
1
  double loop_start_time = w->env_->performance_state()->milestones[
865
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
866
1
  CHECK_GE(loop_start_time, 0);
867
2
  args.GetReturnValue().Set(loop_start_time / 1e6);
868
}
869
870
namespace {
871
872
// Return the MessagePort that is global for this Environment and communicates
873
// with the internal [kPort] port of the JS Worker class in the parent thread.
874
1454
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
875
1454
  Environment* env = Environment::GetCurrent(args);
876
1454
  Local<Object> port = env->message_port();
877

2908
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
878
1454
  if (!port.IsEmpty()) {
879
4362
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
880
             args.GetIsolate());
881
2908
    args.GetReturnValue().Set(port);
882
  }
883
1454
}
884
885
792
void InitWorker(Local<Object> target,
886
                Local<Value> unused,
887
                Local<Context> context,
888
                void* priv) {
889
792
  Environment* env = Environment::GetCurrent(context);
890
792
  Isolate* isolate = env->isolate();
891
892
  {
893
792
    Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
894
895
1584
    w->InstanceTemplate()->SetInternalFieldCount(
896
        Worker::kInternalFieldCount);
897
792
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
898
899
792
    SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
900
792
    SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
901
792
    SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
902
792
    SetProtoMethod(isolate, w, "ref", Worker::Ref);
903
792
    SetProtoMethod(isolate, w, "unref", Worker::Unref);
904
792
    SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
905
792
    SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
906
792
    SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
907
792
    SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
908
909
792
    SetConstructorFunction(context, target, "Worker", w);
910
  }
911
912
  {
913
792
    Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
914
915
1584
    wst->InstanceTemplate()->SetInternalFieldCount(
916
        WorkerHeapSnapshotTaker::kInternalFieldCount);
917
792
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
918
919
    Local<String> wst_string =
920
792
        FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
921
792
    wst->SetClassName(wst_string);
922
792
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
923
  }
924
925
792
  SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
926
927
  target
928
792
      ->Set(env->context(),
929
            env->thread_id_string(),
930
3168
            Number::New(isolate, static_cast<double>(env->thread_id())))
931
      .Check();
932
933
  target
934
792
      ->Set(env->context(),
935
            FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
936
3168
            Boolean::New(isolate, env->is_main_thread()))
937
      .Check();
938
939
  target
940
792
      ->Set(env->context(),
941
            FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
942
2376
            Boolean::New(isolate, env->owns_process_state()))
943
      .Check();
944
945
792
  if (!env->is_main_thread()) {
946
    target
947
731
        ->Set(env->context(),
948
              FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
949
2924
              env->worker_context()->GetResourceLimits(isolate))
950
        .Check();
951
  }
952
953
2376
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
954
2376
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
955
2376
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
956
2376
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
957
1584
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
958
792
}
959
960
5601
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
961
5601
  registry->Register(GetEnvMessagePort);
962
5601
  registry->Register(Worker::New);
963
5601
  registry->Register(Worker::StartThread);
964
5601
  registry->Register(Worker::StopThread);
965
5601
  registry->Register(Worker::HasRef);
966
5601
  registry->Register(Worker::Ref);
967
5601
  registry->Register(Worker::Unref);
968
5601
  registry->Register(Worker::GetResourceLimits);
969
5601
  registry->Register(Worker::TakeHeapSnapshot);
970
5601
  registry->Register(Worker::LoopIdleTime);
971
5601
  registry->Register(Worker::LoopStartTime);
972
5601
}
973
974
}  // anonymous namespace
975
}  // namespace worker
976
}  // namespace node
977
978
5672
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
979
5601
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)