GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 448 476 94.1 %
Date: 2022-04-22 04:19:20 Branches: 169 230 73.5 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvironment;
19
using node::kDisallowedInEnvironment;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ResourceConstraints;
38
using v8::SealHandleScope;
39
using v8::String;
40
using v8::TryCatch;
41
using v8::Value;
42
43
namespace node {
44
namespace worker {
45
46
constexpr double kMB = 1024 * 1024;
47
48
815
// Creates the parent-side Worker object.
//
// env              - Environment of the thread that creates the worker.
// wrap             - JS object wrapped by this AsyncWrap.
// url              - Passed to GetInspectorParentHandle().
// per_isolate_opts - Options for the worker; may be empty.
// exec_argv        - Exec arguments; ownership is taken (moved from).
// env_vars         - Environment variable store for the worker.
//
// If the parent MessagePort cannot be created, the constructor returns early
// and the remaining setup (entangling, JS properties, MakeWeak()) is skipped.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      // The parameters are owned by this call; move them instead of copying
      // (a named rvalue reference is an lvalue and would otherwise be copied).
      per_isolate_opts_(std::move(per_isolate_opts)),
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  // Expose the parent port and the thread id on the JS wrapper object.
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  // Only the first entry of the parent's argv is carried over.
  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
91
92
2321
bool Worker::is_stopped() const {
93
4642
  Mutex::ScopedLock lock(mutex_);
94
2321
  if (env_ != nullptr)
95
575
    return env_->is_stopping();
96
1746
  return stopped_;
97
}
98
99
592
// Synchronizes resource_limits_ with the given V8 constraints.
// For each limit: a positive configured value (in MB) is written into
// `constraints` in bytes; otherwise the value currently held by `constraints`
// is read back into resource_limits_ (converted to MB).
// The stack limit is always set from stack_base_.
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  auto& young_gen_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_gen_mb > 0) {
    constraints->set_max_young_generation_size_in_bytes(
        static_cast<size_t>(young_gen_mb * kMB));
  } else {
    young_gen_mb = constraints->max_young_generation_size_in_bytes() / kMB;
  }

  auto& old_gen_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_gen_mb > 0) {
    constraints->set_max_old_generation_size_in_bytes(
        static_cast<size_t>(old_gen_mb * kMB));
  } else {
    old_gen_mb = constraints->max_old_generation_size_in_bytes() / kMB;
  }

  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    constraints->set_code_range_size_in_bytes(
        static_cast<size_t>(code_range_mb * kMB));
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
126
127
// This class contains data that is only relevant to the child thread itself,
128
// and only while it is running.
129
// (Eventually, the Environment instance should probably also be moved here.)
130
class WorkerThreadData {
131
 public:
132
812
  explicit WorkerThreadData(Worker* w)
133
812
    : w_(w) {
134
812
    int ret = uv_loop_init(&loop_);
135
812
    if (ret != 0) {
136
      char err_buf[128];
137
220
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
138
220
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
139
220
      return;
140
    }
141
592
    loop_init_failed_ = false;
142
592
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
143
144
    std::shared_ptr<ArrayBufferAllocator> allocator =
145
592
        ArrayBufferAllocator::Create();
146
592
    Isolate::CreateParams params;
147
592
    SetIsolateCreateParamsForNode(&params);
148
592
    params.array_buffer_allocator_shared = allocator;
149
150
592
    bool use_node_snapshot = true;
151
592
    if (w_->per_isolate_opts_) {
152
265
      use_node_snapshot = w_->per_isolate_opts_->node_snapshot;
153
    } else {
154
      // IsolateData is created after the Isolate is created so we'll
155
      // inherit the option from the parent here.
156
327
      use_node_snapshot = per_process::cli_options->per_isolate->node_snapshot;
157
    }
158
    const SnapshotData* snapshot_data =
159
592
        use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData()
160
592
                          : nullptr;
161
592
    if (snapshot_data != nullptr) {
162
591
      SnapshotBuilder::InitializeIsolateParams(snapshot_data, &params);
163
    }
164
592
    w->UpdateResourceConstraints(&params.constraints);
165
166
592
    Isolate* isolate = Isolate::Allocate();
167
592
    if (isolate == nullptr) {
168
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
169
      return;
170
    }
171
172
592
    w->platform_->RegisterIsolate(isolate, &loop_);
173
592
    Isolate::Initialize(isolate, params);
174
592
    SetIsolateUpForNode(isolate);
175
176
    // Be sure it's called before Environment::InitializeDiagnostics()
177
    // so that this callback stays when the callback of
178
    // --heapsnapshot-near-heap-limit gets is popped.
179
592
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
180
181
    {
182
592
      Locker locker(isolate);
183
1184
      Isolate::Scope isolate_scope(isolate);
184
      // V8 computes its stack limit the first time a `Locker` is used based on
185
      // --stack-size. Reset it to the correct value.
186
592
      isolate->SetStackLimit(w->stack_base_);
187
188
592
      HandleScope handle_scope(isolate);
189
1184
      isolate_data_.reset(CreateIsolateData(isolate,
190
                                            &loop_,
191
592
                                            w_->platform_,
192
                                            allocator.get()));
193
592
      CHECK(isolate_data_);
194
592
      if (w_->per_isolate_opts_)
195
265
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
196
592
      isolate_data_->set_worker_context(w_);
197
592
      isolate_data_->max_young_gen_size =
198
592
          params.constraints.max_young_generation_size_in_bytes();
199
    }
200
201
592
    Mutex::ScopedLock lock(w_->mutex_);
202
592
    w_->isolate_ = isolate;
203
  }
204
205
812
  ~WorkerThreadData() {
206
812
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
207
    Isolate* isolate;
208
    {
209
812
      Mutex::ScopedLock lock(w_->mutex_);
210
812
      isolate = w_->isolate_;
211
812
      w_->isolate_ = nullptr;
212
    }
213
214
812
    if (isolate != nullptr) {
215
592
      CHECK(!loop_init_failed_);
216
592
      bool platform_finished = false;
217
218
592
      isolate_data_.reset();
219
220
592
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
221
592
        *static_cast<bool*>(data) = true;
222
592
      }, &platform_finished);
223
224
      // The order of these calls is important; if the Isolate is first disposed
225
      // and then unregistered, there is a race condition window in which no
226
      // new Isolate at the same address can successfully be registered with
227
      // the platform.
228
      // (Refs: https://github.com/nodejs/node/issues/30846)
229
592
      w_->platform_->UnregisterIsolate(isolate);
230
592
      isolate->Dispose();
231
232
      // Wait until the platform has cleaned up all relevant resources.
233
1184
      while (!platform_finished) {
234
592
        uv_run(&loop_, UV_RUN_ONCE);
235
      }
236
    }
237
812
    if (!loop_init_failed_) {
238
592
      CheckedUvLoopClose(&loop_);
239
    }
240
812
  }
241
242
592
  bool loop_is_usable() const { return !loop_init_failed_; }
243
244
 private:
245
  Worker* const w_;
246
  uv_loop_t loop_;
247
  bool loop_init_failed_ = true;
248
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
249
250
  friend class Worker;
251
};
252
253
3
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
254
                             size_t initial_heap_limit) {
255
3
  Worker* worker = static_cast<Worker*>(data);
256
3
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257
  // Give the current GC some extra leeway to let it finish rather than
258
  // crash hard. We are not going to perform further allocations anyway.
259
3
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
260
3
  return current_heap_limit + kExtraHeapAllowance;
261
}
262
263
812
void Worker::Run() {
264
812
  std::string name = "WorkerThread ";
265
812
  name += std::to_string(thread_id_.id);
266

980
  TRACE_EVENT_METADATA1(
267
      "__metadata", "thread_name", "name",
268
      TRACE_STR_COPY(name.c_str()));
269
812
  CHECK_NOT_NULL(platform_);
270
271
812
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
272
273
812
  WorkerThreadData data(this);
274
812
  if (isolate_ == nullptr) return;
275
592
  CHECK(data.loop_is_usable());
276
277
592
  Debug(this, "Starting worker with id %llu", thread_id_.id);
278
  {
279
592
    Locker locker(isolate_);
280
592
    Isolate::Scope isolate_scope(isolate_);
281
592
    SealHandleScope outer_seal(isolate_);
282
283
592
    DeleteFnPtr<Environment, FreeEnvironment> env_;
284
592
    auto cleanup_env = OnScopeLeave([&]() {
285
      // TODO(addaleax): This call is harmless but should not be necessary.
286
      // Figure out why V8 is raising a DCHECK() here without it
287
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
288
592
      isolate_->CancelTerminateExecution();
289
290
592
      if (!env_) return;
291
577
      env_->set_can_call_into_js(false);
292
293
      {
294
577
        Mutex::ScopedLock lock(mutex_);
295
577
        stopped_ = true;
296
577
        this->env_ = nullptr;
297
      }
298
299
577
      env_.reset();
300
592
    });
301
302
592
    if (is_stopped()) return;
303
    {
304
581
      HandleScope handle_scope(isolate_);
305
      Local<Context> context;
306
      {
307
        // We create the Context object before we have an Environment* in place
308
        // that we could use for error handling. If creation fails due to
309
        // resource constraints, we need something in place to handle it,
310
        // though.
311
581
        TryCatch try_catch(isolate_);
312
581
        context = NewContext(isolate_);
313
581
        if (context.IsEmpty()) {
314
4
          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
315
4
          return;
316
        }
317
      }
318
319
577
      if (is_stopped()) return;
320
577
      CHECK(!context.IsEmpty());
321
577
      Context::Scope context_scope(context);
322
      {
323
1154
        env_.reset(CreateEnvironment(
324
            data.isolate_data_.get(),
325
            context,
326
577
            std::move(argv_),
327
577
            std::move(exec_argv_),
328
577
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
329
            thread_id_,
330
577
            std::move(inspector_parent_handle_)));
331
577
        if (is_stopped()) return;
332
575
        CHECK_NOT_NULL(env_);
333
575
        env_->set_env_vars(std::move(env_vars_));
334
575
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
335
71
          Exit(exit_code);
336
71
        });
337
      }
338
      {
339
575
        Mutex::ScopedLock lock(mutex_);
340
575
        if (stopped_) return;
341
575
        this->env_ = env_.get();
342
      }
343
575
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
344
575
      if (is_stopped()) return;
345
      {
346
575
        if (!CreateEnvMessagePort(env_.get())) {
347
          return;
348
        }
349
350
575
        Debug(this, "Created message port for worker %llu", thread_id_.id);
351
1150
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
352
          return;
353
354
575
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
355
      }
356
    }
357
358
    {
359
575
      Maybe<int> exit_code = SpinEventLoop(env_.get());
360
1150
      Mutex::ScopedLock lock(mutex_);
361

1004
      if (exit_code_ == 0 && exit_code.IsJust()) {
362
410
        exit_code_ = exit_code.FromJust();
363
      }
364
365
575
      Debug(this, "Exiting thread for worker %llu with exit code %d",
366
575
            thread_id_.id, exit_code_);
367
    }
368
  }
369
370
575
  Debug(this, "Worker %llu thread stops", thread_id_.id);
371
}
372
373
575
// Creates the child-side MessagePort from child_port_data_ and installs it as
// the message port of `env`. Returns false if the port could not be created.
bool Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope handle_scope(isolate_);

  // Take ownership of the port data while holding the lock.
  std::unique_ptr<MessagePortData> port_data;
  {
    Mutex::ScopedLock lock(mutex_);
    port_data = std::move(child_port_data_);
  }

  // Set up the message channel for receiving messages in the child.
  MessagePort* child_port =
      MessagePort::New(env, env->context(), std::move(port_data));
  // MessagePort::New() may return nullptr if execution is terminated
  // within it.
  if (child_port == nullptr)
    return false;

  env->set_message_port(child_port->object(isolate_));
  return true;
}
392
393
816
void Worker::JoinThread() {
394
816
  if (!tid_.has_value())
395
4
    return;
396
812
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
397
812
  tid_.reset();
398
399
812
  env()->remove_sub_worker_context(this);
400
401
  {
402
1622
    HandleScope handle_scope(env()->isolate());
403
812
    Context::Scope context_scope(env()->context());
404
405
    // Reset the parent port as we're closing it now anyway.
406
812
    object()->Set(env()->context(),
407
                  env()->message_port_string(),
408
2436
                  Undefined(env()->isolate())).Check();
409
410
    Local<Value> args[] = {
411
        Integer::New(env()->isolate(), exit_code_),
412
812
        custom_error_ != nullptr
413
454
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
414
1755
            : Null(env()->isolate()).As<Value>(),
415
1624
        !custom_error_str_.empty()
416
227
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
417
227
                  .As<Value>()
418
1755
            : Null(env()->isolate()).As<Value>(),
419

2436
    };
420
421
812
    MakeCallback(env()->onexit_string(), arraysize(args), args);
422
  }
423
424
  // If we get here, the tid_.has_value() condition at the top of the function
425
  // implies that the thread was running. In that case, its final action will
426
  // be to schedule a callback on the parent thread which will delete this
427
  // object, so there's nothing more to do here.
428
}
429
430
3164
// Destroys the Worker. Asserts, while holding mutex_, that the worker has
// been stopped, that no Environment is attached any more and that no thread
// remains to be joined.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(!tid_.has_value());

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
439
440
819
void Worker::New(const FunctionCallbackInfo<Value>& args) {
441
819
  Environment* env = Environment::GetCurrent(args);
442
819
  Isolate* isolate = args.GetIsolate();
443
444
819
  CHECK(args.IsConstructCall());
445
446
819
  if (env->isolate_data()->platform() == nullptr) {
447
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
448
4
    return;
449
  }
450
451
819
  std::string url;
452
819
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
453
819
  std::shared_ptr<KVStore> env_vars = nullptr;
454
455
819
  std::vector<std::string> exec_argv_out;
456
457
  // Argument might be a string or URL
458
1638
  if (!args[0]->IsNullOrUndefined()) {
459
    Utf8Value value(
460
1023
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
461
341
    url.append(value.out(), value.length());
462
  }
463
464
1638
  if (args[1]->IsNull()) {
465
    // Means worker.env = { ...process.env }.
466
792
    env_vars = env->env_vars()->Clone(isolate);
467
27
  } else if (args[1]->IsObject()) {
468
    // User provided env.
469
26
    env_vars = KVStore::CreateMapKVStore();
470
26
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
471
78
                               args[1].As<Object>());
472
  } else {
473
    // Env is shared.
474
1
    env_vars = env->env_vars();
475
  }
476
477

1612
  if (args[1]->IsObject() || args[2]->IsArray()) {
478
269
    per_isolate_opts.reset(new PerIsolateOptions());
479
480
269
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
481
2152
      return env_vars->Get(name).FromMaybe("");
482
    });
483
484
#ifndef NODE_WITHOUT_NODE_OPTIONS
485
    MaybeLocal<String> maybe_node_opts =
486
269
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
487
    Local<String> node_opts;
488
269
    if (maybe_node_opts.ToLocal(&node_opts)) {
489
804
      std::string node_options(*String::Utf8Value(isolate, node_opts));
490
268
      std::vector<std::string> errors{};
491
      std::vector<std::string> env_argv =
492
268
          ParseNodeOptionsEnvVar(node_options, &errors);
493
      // [0] is expected to be the program name, add dummy string.
494
268
      env_argv.insert(env_argv.begin(), "");
495
268
      std::vector<std::string> invalid_args{};
496
268
      options_parser::Parse(&env_argv,
497
                            nullptr,
498
                            &invalid_args,
499
                            per_isolate_opts.get(),
500
                            kAllowedInEnvironment,
501
                            &errors);
502

269
      if (!errors.empty() && args[1]->IsObject()) {
503
        // Only fail for explicitly provided env, this protects from failures
504
        // when NODE_OPTIONS from parent's env is used (which is the default).
505
        Local<Value> error;
506
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
507
        Local<String> key =
508
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
509
        // Ignore the return value of Set() because exceptions bubble up to JS
510
        // when we return anyway.
511
1
        USE(args.This()->Set(env->context(), key, error));
512
1
        return;
513
      }
514
    }
515
#endif  // NODE_WITHOUT_NODE_OPTIONS
516
  }
517
518
818
  if (args[2]->IsArray()) {
519
266
    Local<Array> array = args[2].As<Array>();
520
    // The first argument is reserved for program name, but we don't need it
521
    // in workers.
522
1064
    std::vector<std::string> exec_argv = {""};
523
266
    uint32_t length = array->Length();
524
387
    for (uint32_t i = 0; i < length; i++) {
525
      Local<Value> arg;
526
242
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
527
        return;
528
      }
529
      Local<String> arg_v8;
530
242
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
531
        return;
532
      }
533
242
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
534
242
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
535
121
      exec_argv.push_back(arg_string);
536
    }
537
538
266
    std::vector<std::string> invalid_args{};
539
266
    std::vector<std::string> errors{};
540
    // Using invalid_args as the v8_args argument as it stores unknown
541
    // options for the per isolate parser.
542
266
    options_parser::Parse(
543
        &exec_argv,
544
        &exec_argv_out,
545
        &invalid_args,
546
        per_isolate_opts.get(),
547
        kDisallowedInEnvironment,
548
        &errors);
549
550
    // The first argument is program name.
551
266
    invalid_args.erase(invalid_args.begin());
552

266
    if (errors.size() > 0 || invalid_args.size() > 0) {
553
      Local<Value> error;
554
3
      if (!ToV8Value(env->context(),
555
3
                     errors.size() > 0 ? errors : invalid_args)
556
3
                         .ToLocal(&error)) {
557
        return;
558
      }
559
      Local<String> key =
560
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
561
      // Ignore the return value of Set() because exceptions bubble up to JS
562
      // when we return anyway.
563
3
      USE(args.This()->Set(env->context(), key, error));
564
3
      return;
565
    }
566
  } else {
567
552
    exec_argv_out = env->exec_argv();
568
  }
569
570
  Worker* worker = new Worker(env,
571
1630
                              args.This(),
572
                              url,
573
                              per_isolate_opts,
574
815
                              std::move(exec_argv_out),
575
815
                              env_vars);
576
577
815
  CHECK(args[3]->IsFloat64Array());
578
1630
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
579
815
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
580
815
  limit_info->CopyContents(worker->resource_limits_,
581
                           sizeof(worker->resource_limits_));
582
583
815
  CHECK(args[4]->IsBoolean());
584

815
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
585
814
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
586
815
  if (env->hide_console_windows())
587
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
588
815
  if (env->no_native_addons())
589
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
590
815
  if (env->no_global_search_paths())
591
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
592
815
  if (env->no_browser_globals())
593
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
594
}
595
596
812
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
597
  Worker* w;
598
812
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
599
1624
  Mutex::ScopedLock lock(w->mutex_);
600
601
812
  w->stopped_ = false;
602
603
812
  if (w->resource_limits_[kStackSizeMb] > 0) {
604
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
605
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
606
3
      w->stack_size_ = kStackBufferSize;
607
    } else {
608
12
      w->stack_size_ =
609
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
610
    }
611
  } else {
612
803
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
613
  }
614
615
  uv_thread_options_t thread_options;
616
812
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
617
812
  thread_options.stack_size = w->stack_size_;
618
619
812
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
620
812
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
621
    // XXX: This could become a std::unique_ptr, but that makes at least
622
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
623
    // gcc 7+ handles this well.
624
812
    Worker* w = static_cast<Worker*>(arg);
625
812
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
626
627
    // Leave a few kilobytes just to make sure we're within limits and have
628
    // some space to do work in C++ land.
629
812
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
630
631
812
    w->Run();
632
633
812
    Mutex::ScopedLock lock(w->mutex_);
634
812
    w->env()->SetImmediateThreadsafe(
635
790
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
636
790
          if (w->has_ref_)
637
788
            env->add_refs(-1);
638
790
          w->JoinThread();
639
          // implicitly delete w
640
788
        });
641
812
  }, static_cast<void*>(w));
642
643
812
  if (ret == 0) {
644
    // The object now owns the created thread and should not be garbage
645
    // collected until that finishes.
646
812
    w->ClearWeak();
647
648
812
    if (w->has_ref_)
649
812
      w->env()->add_refs(1);
650
651
812
    w->env()->add_sub_worker_context(w);
652
  } else {
653
    w->stopped_ = true;
654
    w->tid_.reset();
655
656
    char err_buf[128];
657
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
658
    {
659
      Isolate* isolate = w->env()->isolate();
660
      HandleScope handle_scope(isolate);
661
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
662
    }
663
  }
664
}
665
666
236
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
667
  Worker* w;
668
236
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
669
670
236
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
671
236
  w->Exit(1);
672
}
673
674
239
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
675
  Worker* w;
676
239
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
677

239
  if (!w->has_ref_ && w->tid_.has_value()) {
678
4
    w->has_ref_ = true;
679
4
    w->env()->add_refs(1);
680
  }
681
}
682
683
5
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
684
  Worker* w;
685
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
686
8
  args.GetReturnValue().Set(w->has_ref_);
687
}
688
689
6
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
690
  Worker* w;
691
6
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
692

6
  if (w->has_ref_ && w->tid_.has_value()) {
693
6
    w->has_ref_ = false;
694
6
    w->env()->add_refs(-1);
695
  }
696
}
697
698
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
699
  Worker* w;
700
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
701
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
702
}
703
704
578
// Copies resource_limits_ into a newly created ArrayBuffer on `isolate` and
// returns a Float64Array view of kTotalResourceLimitCount elements over it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  const size_t byte_length = sizeof(resource_limits_);
  Local<ArrayBuffer> buffer = ArrayBuffer::New(isolate, byte_length);

  void* destination = buffer->GetBackingStore()->Data();
  memcpy(destination, resource_limits_, byte_length);

  return Float64Array::New(buffer, 0, kTotalResourceLimitCount);
}
712
713
560
void Worker::Exit(int code, const char* error_code, const char* error_message) {
714
1120
  Mutex::ScopedLock lock(mutex_);
715
560
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
716
560
        thread_id_.id, code, error_code, error_message);
717
718
560
  if (error_code != nullptr) {
719
227
    custom_error_ = error_code;
720
227
    custom_error_str_ = error_message;
721
  }
722
723
560
  if (env_ != nullptr) {
724
165
    exit_code_ = code;
725
165
    Stop(env_);
726
  } else {
727
395
    stopped_ = true;
728
  }
729
560
}
730
731
// Reports the parent MessagePort to the memory tracker.
void Worker::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("parent_port", parent_port_);
}
734
735
// Always true: a Worker object stays alive for as long as its child thread
// does, whether or not the parent thread still references it.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  return true;
}
740
741
class WorkerHeapSnapshotTaker : public AsyncWrap {
742
 public:
743
1
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
744
1
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
745
746
  SET_NO_MEMORY_INFO()
747
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
748
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
749
};
750
751
1
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
752
  Worker* w;
753
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
754
755
1
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
756
757
1
  Environment* env = w->env();
758
1
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
759
  Local<Object> wrap;
760
1
  if (!env->worker_heap_snapshot_taker_template()
761
2
      ->NewInstance(env->context()).ToLocal(&wrap)) {
762
    return;
763
  }
764
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
765
1
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
766
767
  // Interrupt the worker thread and take a snapshot, then schedule a call
768
  // on the parent thread that turns that snapshot into a readable stream.
769
1
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
770
    heap::HeapSnapshotPointer snapshot {
771
1
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
772
1
    CHECK(snapshot);
773
2
    env->SetImmediateThreadsafe(
774
1
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
775
2
          HandleScope handle_scope(env->isolate());
776
1
          Context::Scope context_scope(env->context());
777
778
2
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
779
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
780
2
              env, std::move(snapshot));
781
1
          Local<Value> args[] = { stream->object() };
782
1
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
783
1
        }, CallbackFlags::kUnrefed);
784
1
  });
785
2
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
786
}
787
788
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
789
  Worker* w;
790
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
791
792
7
  Mutex::ScopedLock lock(w->mutex_);
793
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
794
  // before locking the mutex is a race condition. So manually do the same
795
  // check.
796

7
  if (w->stopped_ || w->env_ == nullptr)
797
    return args.GetReturnValue().Set(-1);
798
799
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
800
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
801
}
802
803
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
804
  Worker* w;
805
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
806
807
1
  Mutex::ScopedLock lock(w->mutex_);
808
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
809
  // before locking the mutex is a race condition. So manually do the same
810
  // check.
811

1
  if (w->stopped_ || w->env_ == nullptr)
812
    return args.GetReturnValue().Set(-1);
813
814
1
  double loop_start_time = w->env_->performance_state()->milestones[
815
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
816
1
  CHECK_GE(loop_start_time, 0);
817
1
  args.GetReturnValue().Set(
818
1
      (loop_start_time - node::performance::timeOrigin) / 1e6);
819
}
820
821
namespace {
822
823
// Return the MessagePort that is global for this Environment and communicates
824
// with the internal [kPort] port of the JS Worker class in the parent thread.
825
1151
// JS binding: sets the return value to env->message_port() of the calling
// Environment when that handle is non-empty. When it is empty the return
// value is left untouched, which the CHECK_IMPLIES below only permits on the
// main thread.
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
826
1151
  Environment* env = Environment::GetCurrent(args);
827
1151
  Local<Object> port = env->message_port();
828

2302
  // Invariant: any Environment that is not the main thread must have a port.
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
829
1151
  if (!port.IsEmpty()) {
830
3453
    // The port object must have been created in the same isolate that is
    // making this call.
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
831
             args.GetIsolate());
832
2302
    args.GetReturnValue().Set(port);
833
  }
834
1151
}
835
836
632
// Binding initializer for the `worker` module (see the
// NODE_MODULE_CONTEXT_AWARE_INTERNAL registration at the end of the file).
// Populates `target` with:
//   - the `Worker` constructor and its prototype methods,
//   - the `getEnvMessagePort` function,
//   - the thread id property plus `isMainThread` and `ownsProcessState`,
//   - `resourceLimits` (non-main threads only),
//   - the resource-limit index constants.
// It also stores the WorkerHeapSnapshotTaker instance template on the
// Environment. `unused` and `priv` are not referenced in the body.
void InitWorker(Local<Object> target,
837
                Local<Value> unused,
838
                Local<Context> context,
839
                void* priv) {
840
632
  Environment* env = Environment::GetCurrent(context);
841
842
  // Worker constructor: built from Worker::New, inherits from the AsyncWrap
  // constructor template, and is exported as target.Worker.
  {
843
632
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
844
845
1264
    w->InstanceTemplate()->SetInternalFieldCount(
846
        Worker::kInternalFieldCount);
847
632
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
848
849
632
    // Every callback installed here is also listed in
    // RegisterExternalReferences(); keep the two lists in sync.
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
850
632
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
851
632
    env->SetProtoMethod(w, "hasRef", Worker::HasRef);
852
632
    env->SetProtoMethod(w, "ref", Worker::Ref);
853
632
    env->SetProtoMethod(w, "unref", Worker::Unref);
854
632
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
855
632
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
856
632
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
857
632
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
858
859
632
    env->SetConstructorFunction(target, "Worker", w);
860
  }
861
862
  // WorkerHeapSnapshotTaker template: created without a callback and not
  // exported on `target`; only its instance template is stored on the
  // Environment.
  {
863
632
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
864
865
1264
    wst->InstanceTemplate()->SetInternalFieldCount(
866
        WorkerHeapSnapshotTaker::kInternalFieldCount);
867
632
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
868
869
    Local<String> wst_string =
870
632
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
871
632
    wst->SetClassName(wst_string);
872
632
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
873
  }
874
875
632
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
876
877
  // Thread id of this Environment, exposed as a JS Number (hence the cast to
  // double). The property key is env->thread_id_string().
  target
878
632
      ->Set(env->context(),
879
            env->thread_id_string(),
880
2528
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
881
      .Check();
882
883
  target
884
632
      ->Set(env->context(),
885
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
886
2528
            Boolean::New(env->isolate(), env->is_main_thread()))
887
      .Check();
888
889
  target
890
632
      ->Set(env->context(),
891
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
892
1896
            Boolean::New(env->isolate(), env->owns_process_state()))
893
      .Check();
894
895
632
  // `resourceLimits` is only defined off the main thread, where it is read
  // from env->worker_context().
  if (!env->is_main_thread()) {
896
    target
897
577
        ->Set(env->context(),
898
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
899
2308
              env->worker_context()->GetResourceLimits(env->isolate()))
900
        .Check();
901
  }
902
903
1896
  // Export the resource-limit constants on `target` under their C++ names.
  // NOTE(review): presumably these are indices into the resource limits array,
  // with kTotalResourceLimitCount as its length -- confirm in node_worker.h.
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
904
1896
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
905
1896
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
906
1896
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
907
1264
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
908
632
}
909
910
5013
// Registers with `registry` every native callback that InitWorker() installs:
// GetEnvMessagePort, the Worker constructor callback (Worker::New) and the
// nine Worker prototype methods. Any callback added to InitWorker() should be
// added here as well.
// NOTE(review): presumably needed so the snapshot machinery can resolve these
// function addresses -- confirm against node_external_reference.h.
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
911
5013
  registry->Register(GetEnvMessagePort);
912
5013
  registry->Register(Worker::New);
913
5013
  registry->Register(Worker::StartThread);
914
5013
  registry->Register(Worker::StopThread);
915
5013
  registry->Register(Worker::HasRef);
916
5013
  registry->Register(Worker::Ref);
917
5013
  registry->Register(Worker::Unref);
918
5013
  registry->Register(Worker::GetResourceLimits);
919
5013
  registry->Register(Worker::TakeHeapSnapshot);
920
5013
  registry->Register(Worker::LoopIdleTime);
921
5013
  registry->Register(Worker::LoopStartTime);
922
5013
}
923
924
}  // anonymous namespace
925
}  // namespace worker
926
}  // namespace node
927
928
5080
// Module registration for `worker`: the first macro names InitWorker as the
// binding initializer, the second names RegisterExternalReferences as the
// external-reference hook. Both are invoked at file scope, outside the
// node::worker namespace, hence the fully qualified function names.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
929
5013
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)