GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 449 480 93.5 %
Date: 2022-09-22 04:22:24 Branches: 175 242 72.3 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvironment;
19
using node::kDisallowedInEnvironment;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ResourceConstraints;
38
using v8::SealHandleScope;
39
using v8::String;
40
using v8::TryCatch;
41
using v8::Value;
42
43
namespace node {
44
namespace worker {
45
46
// Number of bytes in one megabyte (1024 * 1024). Used in this file to convert
// the `*Mb` entries of Worker::resource_limits_ to and from byte counts.
constexpr double kMB = 1024 * 1024;
47
48
961
// Creates the parent-side Worker object.
//
// Everything done here runs in the parent Environment `env`:
//  - a MessagePort is created in the parent and entangled with the port data
//    that will later be handed to the child thread (`child_port_data_`);
//  - the parent port and the numeric thread id are stored on the JS wrapper;
//  - the inspector parent handle and the initial argv are prepared.
//
// `per_isolate_opts`, `exec_argv` and `env_vars` are sink parameters; they are
// moved into the corresponding members rather than copied.
//
// If MessagePort::New() fails (e.g. because execution is terminating) the
// constructor returns early and none of the later setup is performed.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars,
               const SnapshotData* snapshot_data)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      per_isolate_opts_(std::move(per_isolate_opts)),
      // A named rvalue reference is an lvalue; without std::move() this would
      // deep-copy the whole vector.
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)),
      snapshot_data_(snapshot_data) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  MessagePort* parent_port = MessagePort::New(env, env->context());
  if (parent_port == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port, child_port_data_.get());

  object()
      ->Set(env->context(), env->message_port_string(), parent_port->object())
      .Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
93
94
2914
bool Worker::is_stopped() const {
95
5828
  Mutex::ScopedLock lock(mutex_);
96
2914
  if (env_ != nullptr)
97
722
    return env_->is_stopping();
98
2192
  return stopped_;
99
}
100
101
734
// Synchronizes `constraints` with `resource_limits_`.
//
// The stack limit is always taken from `stack_base_`. For each of the three
// size limits (young generation, old generation, code range): a positive
// entry in `resource_limits_` (in MB) is written into `constraints` (in
// bytes); otherwise the value currently held by `constraints` is written back
// into `resource_limits_` (converted to MB).
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  auto& young_gen_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_gen_mb > 0) {
    const size_t young_gen_bytes = static_cast<size_t>(young_gen_mb * kMB);
    constraints->set_max_young_generation_size_in_bytes(young_gen_bytes);
  } else {
    young_gen_mb = constraints->max_young_generation_size_in_bytes() / kMB;
  }

  auto& old_gen_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_gen_mb > 0) {
    const size_t old_gen_bytes = static_cast<size_t>(old_gen_mb * kMB);
    constraints->set_max_old_generation_size_in_bytes(old_gen_bytes);
  } else {
    old_gen_mb = constraints->max_old_generation_size_in_bytes() / kMB;
  }

  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    const size_t code_range_bytes = static_cast<size_t>(code_range_mb * kMB);
    constraints->set_code_range_size_in_bytes(code_range_bytes);
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
128
129
// This class contains data that is only relevant to the child thread itself,
130
// and only while it is running.
131
// (Eventually, the Environment instance should probably also be moved here.)
//
// It owns the worker's libuv event loop and its IsolateData, and it creates
// the V8 Isolate in the constructor and disposes it in the destructor. The
// Isolate pointer itself is published through Worker::isolate_ (guarded by
// Worker::mutex_); a null Worker::isolate_ after construction means that
// setup failed and Worker::Exit() has already been called with an error.
132
class WorkerThreadData {
133
 public:
  // Sets up the event loop, the Isolate and the IsolateData for worker `w`.
  // On failure (loop init error or Isolate allocation failure) this reports
  // ERR_WORKER_INIT_FAILED via w->Exit() and returns without publishing an
  // Isolate.
134
958
  explicit WorkerThreadData(Worker* w)
135
958
    : w_(w) {
136
958
    int ret = uv_loop_init(&loop_);
137
958
    if (ret != 0) {
      // loop_init_failed_ keeps its default of `true` on this path, so the
      // destructor will not try to close the loop.
138
      char err_buf[128];
139
224
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
140
224
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
141
224
      return;
142
    }
143
734
    loop_init_failed_ = false;
144
734
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
145
146
    std::shared_ptr<ArrayBufferAllocator> allocator =
147
734
        ArrayBufferAllocator::Create();
148
734
    Isolate::CreateParams params;
149
734
    SetIsolateCreateParamsForNode(&params);
150
734
    params.array_buffer_allocator_shared = allocator;
151
152
734
    if (w->snapshot_data() != nullptr) {
153
733
      SnapshotBuilder::InitializeIsolateParams(w->snapshot_data(), &params);
154
    }
    // Applies the user-provided resource limits to `params.constraints`, or
    // records the defaults back into the worker's resource limits.
155
734
    w->UpdateResourceConstraints(&params.constraints);
156
157
734
    Isolate* isolate = Isolate::Allocate();
158
734
    if (isolate == nullptr) {
159
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
160
      return;
161
    }
162
    // The Isolate is registered with the platform before it is initialized.
163
734
    w->platform_->RegisterIsolate(isolate, &loop_);
164
734
    Isolate::Initialize(isolate, params);
165
734
    SetIsolateUpForNode(isolate);
166
167
    // Be sure it's called before Environment::InitializeDiagnostics()
168
    // so that this callback stays when the callback of
169
    // --heapsnapshot-near-heap-limit is popped.
170
734
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
171
172
    {
173
734
      Locker locker(isolate);
174
1468
      Isolate::Scope isolate_scope(isolate);
175
      // V8 computes its stack limit the first time a `Locker` is used based on
176
      // --stack-size. Reset it to the correct value.
177
734
      isolate->SetStackLimit(w->stack_base_);
178
179
734
      HandleScope handle_scope(isolate);
180
1468
      isolate_data_.reset(CreateIsolateData(isolate,
181
                                            &loop_,
182
734
                                            w_->platform_,
183
                                            allocator.get()));
184
734
      CHECK(isolate_data_);
      // Hands the per-isolate options over to the IsolateData; the worker's
      // own per_isolate_opts_ is moved-from afterwards.
185
734
      if (w_->per_isolate_opts_)
186
289
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
187
734
      isolate_data_->set_worker_context(w_);
188
734
      isolate_data_->max_young_gen_size =
189
734
          params.constraints.max_young_generation_size_in_bytes();
190
    }
191
    // Publish the Isolate to the Worker under its mutex.
192
734
    Mutex::ScopedLock lock(w_->mutex_);
193
734
    w_->isolate_ = isolate;
194
  }
195
  // Tears down everything the constructor set up: clears Worker::isolate_,
  // frees the IsolateData, unregisters and disposes the Isolate, waits for the
  // platform to finish its cleanup, and closes the event loop if it had been
  // initialized successfully.
196
958
  ~WorkerThreadData() {
197
958
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
198
    Isolate* isolate;
199
    {
      // Take the Isolate pointer away from the Worker while holding the lock.
200
958
      Mutex::ScopedLock lock(w_->mutex_);
201
958
      isolate = w_->isolate_;
202
958
      w_->isolate_ = nullptr;
203
    }
204
205
958
    if (isolate != nullptr) {
206
734
      CHECK(!loop_init_failed_);
207
734
      bool platform_finished = false;
208
209
734
      isolate_data_.reset();
210
      // The callback flips `platform_finished`, which ends the wait loop
      // below.
211
734
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
212
734
        *static_cast<bool*>(data) = true;
213
734
      }, &platform_finished);
214
215
      // The order of these calls is important; if the Isolate is first disposed
216
      // and then unregistered, there is a race condition window in which no
217
      // new Isolate at the same address can successfully be registered with
218
      // the platform.
219
      // (Refs: https://github.com/nodejs/node/issues/30846)
220
734
      w_->platform_->UnregisterIsolate(isolate);
221
734
      isolate->Dispose();
222
223
      // Wait until the platform has cleaned up all relevant resources.
224
1468
      while (!platform_finished) {
225
734
        uv_run(&loop_, UV_RUN_ONCE);
226
      }
227
    }
228
958
    if (!loop_init_failed_) {
229
734
      CheckedUvLoopClose(&loop_);
230
    }
231
958
  }
232
  // True when uv_loop_init() succeeded in the constructor.
233
734
  bool loop_is_usable() const { return !loop_init_failed_; }
234
235
 private:
  // The Worker this data belongs to.
236
  Worker* const w_;
  // Event loop of the worker thread.
237
  uv_loop_t loop_;
  // Stays `true` until uv_loop_init() has succeeded.
238
  bool loop_init_failed_ = true;
239
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
  // NOTE(review): not referenced anywhere inside this class; confirm whether
  // any code outside this excerpt still needs it.
240
  const SnapshotData* snapshot_data_ = nullptr;
241
  friend class Worker;
242
};
243
244
4
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
245
                             size_t initial_heap_limit) {
246
4
  Worker* worker = static_cast<Worker*>(data);
247
  // Give the current GC some extra leeway to let it finish rather than
248
  // crash hard. We are not going to perform further allocations anyway.
249
4
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
250
4
  size_t new_limit = current_heap_limit + kExtraHeapAllowance;
251
4
  Environment* env = worker->env();
252
4
  if (env != nullptr) {
253
    DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
254
    Debug(env,
255
          DebugCategory::DIAGNOSTICS,
256
          "Throwing ERR_WORKER_OUT_OF_MEMORY, "
257
          "new_limit=%" PRIu64 "\n",
258
8
          static_cast<uint64_t>(new_limit));
259
  }
260
4
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
261
4
  return new_limit;
262
}
263
264
958
void Worker::Run() {
265
958
  std::string name = "WorkerThread ";
266
958
  name += std::to_string(thread_id_.id);
267

1138
  TRACE_EVENT_METADATA1(
268
      "__metadata", "thread_name", "name",
269
      TRACE_STR_COPY(name.c_str()));
270
958
  CHECK_NOT_NULL(platform_);
271
272
958
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
273
274
958
  WorkerThreadData data(this);
275
958
  if (isolate_ == nullptr) return;
276
734
  CHECK(data.loop_is_usable());
277
278
734
  Debug(this, "Starting worker with id %llu", thread_id_.id);
279
  {
280
734
    Locker locker(isolate_);
281
734
    Isolate::Scope isolate_scope(isolate_);
282
734
    SealHandleScope outer_seal(isolate_);
283
284
734
    DeleteFnPtr<Environment, FreeEnvironment> env_;
285
734
    auto cleanup_env = OnScopeLeave([&]() {
286
      // TODO(addaleax): This call is harmless but should not be necessary.
287
      // Figure out why V8 is raising a DCHECK() here without it
288
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
289
734
      isolate_->CancelTerminateExecution();
290
291
734
      if (!env_) return;
292
729
      env_->set_can_call_into_js(false);
293
294
      {
295
729
        Mutex::ScopedLock lock(mutex_);
296
729
        stopped_ = true;
297
729
        this->env_ = nullptr;
298
      }
299
300
729
      env_.reset();
301
734
    });
302
303
734
    if (is_stopped()) return;
304
    {
305
729
      HandleScope handle_scope(isolate_);
306
      Local<Context> context;
307
      {
308
        // We create the Context object before we have an Environment* in place
309
        // that we could use for error handling. If creation fails due to
310
        // resource constraints, we need something in place to handle it,
311
        // though.
312
729
        TryCatch try_catch(isolate_);
313
729
        if (snapshot_data_ != nullptr) {
314
728
          context = Context::FromSnapshot(isolate_,
315
728
                                          SnapshotData::kNodeBaseContextIndex)
316
728
                        .ToLocalChecked();
317
1456
          if (!context.IsEmpty() &&
318

2184
              !InitializeContextRuntime(context).IsJust()) {
319
            context = Local<Context>();
320
          }
321
        } else {
322
1
          context = NewContext(isolate_);
323
        }
324
729
        if (context.IsEmpty()) {
325
          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
326
          return;
327
        }
328
      }
329
330
729
      if (is_stopped()) return;
331
729
      CHECK(!context.IsEmpty());
332
729
      Context::Scope context_scope(context);
333
      {
334
1458
        env_.reset(CreateEnvironment(
335
            data.isolate_data_.get(),
336
            context,
337
729
            std::move(argv_),
338
729
            std::move(exec_argv_),
339
729
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
340
            thread_id_,
341
729
            std::move(inspector_parent_handle_)));
342
729
        if (is_stopped()) return;
343
722
        CHECK_NOT_NULL(env_);
344
722
        env_->set_env_vars(std::move(env_vars_));
345
722
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
346
60
          Exit(exit_code);
347
60
        });
348
      }
349
      {
350
722
        Mutex::ScopedLock lock(mutex_);
351
722
        if (stopped_) return;
352
722
        this->env_ = env_.get();
353
      }
354
722
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
355
722
      if (is_stopped()) return;
356
      {
357
722
        if (!CreateEnvMessagePort(env_.get())) {
358
          return;
359
        }
360
361
722
        Debug(this, "Created message port for worker %llu", thread_id_.id);
362
1444
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
363
          return;
364
365
722
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
366
      }
367
    }
368
369
    {
370
722
      Maybe<int> exit_code = SpinEventLoop(env_.get());
371
1444
      Mutex::ScopedLock lock(mutex_);
372

1089
      if (exit_code_ == 0 && exit_code.IsJust()) {
373
348
        exit_code_ = exit_code.FromJust();
374
      }
375
376
722
      Debug(this, "Exiting thread for worker %llu with exit code %d",
377
722
            thread_id_.id, exit_code_);
378
    }
379
  }
380
381
722
  Debug(this, "Worker %llu thread stops", thread_id_.id);
382
}
383
384
722
// Creates the child-side MessagePort for `env` from `child_port_data_` and
// installs it as the Environment's message port. Returns false if the port
// could not be created.
bool Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope handle_scope(isolate_);

  // Take the port data out of the Worker while holding the lock.
  std::unique_ptr<MessagePortData> port_data;
  {
    Mutex::ScopedLock guard(mutex_);
    port_data = std::move(child_port_data_);
  }

  // Set up the message channel for receiving messages in the child.
  MessagePort* port =
      MessagePort::New(env, env->context(), std::move(port_data));

  // MessagePort::New() may return nullptr if execution is terminated
  // within it.
  if (port == nullptr) return false;

  env->set_message_port(port->object(isolate_));
  return true;
}
403
404
962
void Worker::JoinThread() {
405
962
  if (!tid_.has_value())
406
4
    return;
407
958
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
408
958
  tid_.reset();
409
410
958
  env()->remove_sub_worker_context(this);
411
412
  {
413
1914
    HandleScope handle_scope(env()->isolate());
414
958
    Context::Scope context_scope(env()->context());
415
416
    // Reset the parent port as we're closing it now anyway.
417
958
    object()->Set(env()->context(),
418
                  env()->message_port_string(),
419
2874
                  Undefined(env()->isolate())).Check();
420
421
    Local<Value> args[] = {
422
        Integer::New(env()->isolate(), exit_code_),
423
958
        custom_error_ != nullptr
424
456
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
425
2190
            : Null(env()->isolate()).As<Value>(),
426
1916
        !custom_error_str_.empty()
427
228
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
428
228
                  .As<Value>()
429
2190
            : Null(env()->isolate()).As<Value>(),
430

2874
    };
431
432
958
    MakeCallback(env()->onexit_string(), arraysize(args), args);
433
  }
434
435
  // If we get here, the tid_.has_value() condition at the top of the function
436
  // implies that the thread was running. In that case, its final action will
437
  // be to schedule a callback on the parent thread which will delete this
438
  // object, so there's nothing more to do here.
439
}
440
441
3744
// Destroys the Worker. At this point the worker must be stopped, must no
// longer have an Environment attached, and must not have a thread that still
// needs joining; all three are enforced with CHECKs while holding `mutex_`.
Worker::~Worker() {
  Mutex::ScopedLock guard(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(!tid_.has_value());

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
450
451
965
void Worker::New(const FunctionCallbackInfo<Value>& args) {
452
965
  Environment* env = Environment::GetCurrent(args);
453
965
  Isolate* isolate = args.GetIsolate();
454
455
965
  CHECK(args.IsConstructCall());
456
457
965
  if (env->isolate_data()->platform() == nullptr) {
458
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
459
4
    return;
460
  }
461
462
965
  std::string url;
463
965
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
464
965
  std::shared_ptr<KVStore> env_vars = nullptr;
465
466
965
  std::vector<std::string> exec_argv_out;
467
468
  // Argument might be a string or URL
469
1930
  if (!args[0]->IsNullOrUndefined()) {
470
    Utf8Value value(
471
1455
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
472
485
    url.append(value.out(), value.length());
473
  }
474
475
1930
  if (args[1]->IsNull()) {
476
    // Means worker.env = { ...process.env }.
477
937
    env_vars = env->env_vars()->Clone(isolate);
478
28
  } else if (args[1]->IsObject()) {
479
    // User provided env.
480
27
    env_vars = KVStore::CreateMapKVStore();
481
27
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
482
81
                               args[1].As<Object>());
483
  } else {
484
    // Env is shared.
485
1
    env_vars = env->env_vars();
486
  }
487
488

1903
  if (args[1]->IsObject() || args[2]->IsArray()) {
489
293
    per_isolate_opts.reset(new PerIsolateOptions());
490
491
293
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
492
2344
      return env_vars->Get(name).FromMaybe("");
493
    });
494
495
#ifndef NODE_WITHOUT_NODE_OPTIONS
496
    MaybeLocal<String> maybe_node_opts =
497
293
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
498
    Local<String> node_opts;
499
293
    if (maybe_node_opts.ToLocal(&node_opts)) {
500
876
      std::string node_options(*String::Utf8Value(isolate, node_opts));
501
292
      std::vector<std::string> errors{};
502
      std::vector<std::string> env_argv =
503
292
          ParseNodeOptionsEnvVar(node_options, &errors);
504
      // [0] is expected to be the program name, add dummy string.
505
292
      env_argv.insert(env_argv.begin(), "");
506
292
      std::vector<std::string> invalid_args{};
507
292
      options_parser::Parse(&env_argv,
508
                            nullptr,
509
                            &invalid_args,
510
                            per_isolate_opts.get(),
511
                            kAllowedInEnvironment,
512
                            &errors);
513

293
      if (!errors.empty() && args[1]->IsObject()) {
514
        // Only fail for explicitly provided env, this protects from failures
515
        // when NODE_OPTIONS from parent's env is used (which is the default).
516
        Local<Value> error;
517
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
518
        Local<String> key =
519
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
520
        // Ignore the return value of Set() because exceptions bubble up to JS
521
        // when we return anyway.
522
1
        USE(args.This()->Set(env->context(), key, error));
523
1
        return;
524
      }
525
    }
526
#endif  // NODE_WITHOUT_NODE_OPTIONS
527
  }
528
529
964
  if (args[2]->IsArray()) {
530
289
    Local<Array> array = args[2].As<Array>();
531
    // The first argument is reserved for program name, but we don't need it
532
    // in workers.
533
1156
    std::vector<std::string> exec_argv = {""};
534
289
    uint32_t length = array->Length();
535
427
    for (uint32_t i = 0; i < length; i++) {
536
      Local<Value> arg;
537
276
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
538
        return;
539
      }
540
      Local<String> arg_v8;
541
276
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
542
        return;
543
      }
544
276
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
545
276
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
546
138
      exec_argv.push_back(arg_string);
547
    }
548
549
289
    std::vector<std::string> invalid_args{};
550
289
    std::vector<std::string> errors{};
551
    // Using invalid_args as the v8_args argument as it stores unknown
552
    // options for the per isolate parser.
553
289
    options_parser::Parse(
554
        &exec_argv,
555
        &exec_argv_out,
556
        &invalid_args,
557
        per_isolate_opts.get(),
558
        kDisallowedInEnvironment,
559
        &errors);
560
561
    // The first argument is program name.
562
289
    invalid_args.erase(invalid_args.begin());
563

289
    if (errors.size() > 0 || invalid_args.size() > 0) {
564
      Local<Value> error;
565
3
      if (!ToV8Value(env->context(),
566
3
                     errors.size() > 0 ? errors : invalid_args)
567
3
                         .ToLocal(&error)) {
568
        return;
569
      }
570
      Local<String> key =
571
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
572
      // Ignore the return value of Set() because exceptions bubble up to JS
573
      // when we return anyway.
574
3
      USE(args.This()->Set(env->context(), key, error));
575
3
      return;
576
    }
577
  } else {
578
675
    exec_argv_out = env->exec_argv();
579
  }
580
581
961
  bool use_node_snapshot = per_process::cli_options->node_snapshot;
582
  const SnapshotData* snapshot_data =
583
961
      use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
584
585
  Worker* worker = new Worker(env,
586
1922
                              args.This(),
587
                              url,
588
                              per_isolate_opts,
589
961
                              std::move(exec_argv_out),
590
                              env_vars,
591
961
                              snapshot_data);
592
593
961
  CHECK(args[3]->IsFloat64Array());
594
1922
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
595
961
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
596
961
  limit_info->CopyContents(worker->resource_limits_,
597
                           sizeof(worker->resource_limits_));
598
599
961
  CHECK(args[4]->IsBoolean());
600

961
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
601
960
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
602
961
  if (env->hide_console_windows())
603
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
604
961
  if (env->no_native_addons())
605
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
606
961
  if (env->no_global_search_paths())
607
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
608
961
  if (env->no_browser_globals())
609
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
610
}
611
612
958
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
613
  Worker* w;
614
958
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
615
1916
  Mutex::ScopedLock lock(w->mutex_);
616
617
958
  w->stopped_ = false;
618
619
958
  if (w->resource_limits_[kStackSizeMb] > 0) {
620
6
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
621
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
622
      w->stack_size_ = kStackBufferSize;
623
    } else {
624
12
      w->stack_size_ =
625
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
626
    }
627
  } else {
628
952
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
629
  }
630
631
  uv_thread_options_t thread_options;
632
958
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
633
958
  thread_options.stack_size = w->stack_size_;
634
635
958
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
636
958
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
637
    // XXX: This could become a std::unique_ptr, but that makes at least
638
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
639
    // gcc 7+ handles this well.
640
958
    Worker* w = static_cast<Worker*>(arg);
641
958
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
642
643
    // Leave a few kilobytes just to make sure we're within limits and have
644
    // some space to do work in C++ land.
645
958
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
646
647
958
    w->Run();
648
649
958
    Mutex::ScopedLock lock(w->mutex_);
650
958
    w->env()->SetImmediateThreadsafe(
651
935
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
652
935
          if (w->has_ref_)
653
933
            env->add_refs(-1);
654
935
          w->JoinThread();
655
          // implicitly delete w
656
933
        });
657
958
  }, static_cast<void*>(w));
658
659
958
  if (ret == 0) {
660
    // The object now owns the created thread and should not be garbage
661
    // collected until that finishes.
662
958
    w->ClearWeak();
663
664
958
    if (w->has_ref_)
665
958
      w->env()->add_refs(1);
666
667
958
    w->env()->add_sub_worker_context(w);
668
  } else {
669
    w->stopped_ = true;
670
    w->tid_.reset();
671
672
    char err_buf[128];
673
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
674
    {
675
      Isolate* isolate = w->env()->isolate();
676
      HandleScope handle_scope(isolate);
677
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
678
    }
679
  }
680
}
681
682
382
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
683
  Worker* w;
684
382
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
685
686
382
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
687
382
  w->Exit(1);
688
}
689
690
386
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
691
  Worker* w;
692
386
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
693

386
  if (!w->has_ref_ && w->tid_.has_value()) {
694
5
    w->has_ref_ = true;
695
5
    w->env()->add_refs(1);
696
  }
697
}
698
699
9
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
700
  Worker* w;
701
9
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
702
16
  args.GetReturnValue().Set(w->has_ref_);
703
}
704
705
7
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
706
  Worker* w;
707
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
708

7
  if (w->has_ref_ && w->tid_.has_value()) {
709
7
    w->has_ref_ = false;
710
7
    w->env()->add_refs(-1);
711
  }
712
}
713
714
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
715
  Worker* w;
716
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
717
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
718
}
719
720
730
// Copies `resource_limits_` into a freshly allocated ArrayBuffer in `isolate`
// and returns a Float64Array view of kTotalResourceLimitCount elements over
// it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  constexpr size_t kLimitsByteLength = sizeof(resource_limits_);

  Local<ArrayBuffer> buffer = ArrayBuffer::New(isolate, kLimitsByteLength);
  memcpy(buffer->Data(), resource_limits_, kLimitsByteLength);

  return Float64Array::New(buffer, 0, kTotalResourceLimitCount);
}
726
727
697
void Worker::Exit(int code, const char* error_code, const char* error_message) {
728
1394
  Mutex::ScopedLock lock(mutex_);
729
697
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
730
697
        thread_id_.id, code, error_code, error_message);
731
732
697
  if (error_code != nullptr) {
733
228
    custom_error_ = error_code;
734
228
    custom_error_str_ = error_message;
735
  }
736
737
697
  if (env_ != nullptr) {
738
375
    exit_code_ = code;
739
375
    Stop(env_);
740
  } else {
741
322
    stopped_ = true;
742
  }
743
697
}
744
745
// Always returns true: a Worker object that is still alive at exit does not
// point to a leak.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  // A Worker object lives exactly as long as its child thread does, whether
  // or not anything in the parent thread still references it.
  return true;
}
750
751
class WorkerHeapSnapshotTaker : public AsyncWrap {
752
 public:
753
1
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
754
1
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
755
756
  SET_NO_MEMORY_INFO()
757
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
758
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
759
};
760
761
1
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
762
  Worker* w;
763
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
764
765
1
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
766
767
1
  Environment* env = w->env();
768
1
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
769
  Local<Object> wrap;
770
1
  if (!env->worker_heap_snapshot_taker_template()
771
2
      ->NewInstance(env->context()).ToLocal(&wrap)) {
772
    return;
773
  }
774
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
775
1
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
776
777
  // Interrupt the worker thread and take a snapshot, then schedule a call
778
  // on the parent thread that turns that snapshot into a readable stream.
779
1
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
780
    heap::HeapSnapshotPointer snapshot {
781
1
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
782
1
    CHECK(snapshot);
783
2
    env->SetImmediateThreadsafe(
784
1
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
785
2
          HandleScope handle_scope(env->isolate());
786
1
          Context::Scope context_scope(env->context());
787
788
2
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
789
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
790
2
              env, std::move(snapshot));
791
1
          Local<Value> args[] = { stream->object() };
792
1
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
793
1
        }, CallbackFlags::kUnrefed);
794
1
  });
795
2
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
796
}
797
798
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
799
  Worker* w;
800
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
801
802
7
  Mutex::ScopedLock lock(w->mutex_);
803
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
804
  // before locking the mutex is a race condition. So manually do the same
805
  // check.
806

7
  if (w->stopped_ || w->env_ == nullptr)
807
    return args.GetReturnValue().Set(-1);
808
809
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
810
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
811
}
812
813
1
// Binding callback that reports the worker's LOOP_START performance
// milestone. Sets the return value to -1 when the worker is stopped or has no
// Environment; otherwise to the recorded milestone divided by 1e6 (presumably
// nanoseconds converted to milliseconds; confirm against node_perf).
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
814
  Worker* w;
815
1
  // Resolve the native Worker behind the JS receiver.
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
816
817
1
  // Hold the worker mutex while stopped_ and env_ are inspected and used.
  Mutex::ScopedLock lock(w->mutex_);
818
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
819
  // before locking the mutex is a race condition. So manually do the same
820
  // check.
821

1
  if (w->stopped_ || w->env_ == nullptr)
822
    return args.GetReturnValue().Set(-1);
823
824
1
  double loop_start_time = w->env_->performance_state()->milestones[
825
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
826
1
  // A negative milestone is treated as a fatal invariant violation here.
  // NOTE(review): confirm the milestone can never still be unset (negative)
  // while the worker is running and this is called.
  CHECK_GE(loop_start_time, 0);
827
2
  args.GetReturnValue().Set(loop_start_time / 1e6);
828
}
829
830
namespace {
831
832
// Return the MessagePort that is global for this Environment and communicates
833
// with the internal [kPort] port of the JS Worker class in the parent thread.
834
1445
// Binding callback: sets the return value to env->message_port() when that
// handle is non-empty, and leaves the return value unset otherwise.
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
835
1445
  Environment* env = Environment::GetCurrent(args);
836
1445
  Local<Object> port = env->message_port();
837

2890
  // Only the main thread is allowed to have an empty port handle.
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
838
1445
  if (!port.IsEmpty()) {
839
4335
    // The port must have been created in the isolate this call runs in.
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
840
             args.GetIsolate());
841
2890
    args.GetReturnValue().Set(port);
842
  }
843
1445
}
844
845
789
// Populates `target` with the worker binding for the Environment that owns
// `context`:
//   - the `Worker` constructor and its prototype methods,
//   - the `getEnvMessagePort` function,
//   - the thread id, `isMainThread` and `ownsProcessState` properties,
//   - `resourceLimits` (non-main threads only),
//   - the resource-limit index constants.
// It also stores the WorkerHeapSnapshotTaker instance template on the
// Environment. `unused` and `priv` are not referenced in this function.
void InitWorker(Local<Object> target,
846
                Local<Value> unused,
847
                Local<Context> context,
848
                void* priv) {
849
789
  Environment* env = Environment::GetCurrent(context);
850
789
  Isolate* isolate = env->isolate();
851
852
  // Worker class: Worker::New is the constructor callback, and the template
  // inherits from the AsyncWrap constructor template.
  {
853
789
    Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
854
855
1578
    w->InstanceTemplate()->SetInternalFieldCount(
856
        Worker::kInternalFieldCount);
857
789
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
858
859
789
    SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
860
789
    SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
861
789
    SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
862
789
    SetProtoMethod(isolate, w, "ref", Worker::Ref);
863
789
    SetProtoMethod(isolate, w, "unref", Worker::Unref);
864
789
    SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
865
789
    SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
866
789
    SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
867
789
    SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
868
869
789
    SetConstructorFunction(context, target, "Worker", w);
870
  }
871
872
  // WorkerHeapSnapshotTaker: created with a null callback and not attached to
  // `target`; only its instance template is kept, on the Environment.
  {
873
789
    Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
874
875
1578
    wst->InstanceTemplate()->SetInternalFieldCount(
876
        WorkerHeapSnapshotTaker::kInternalFieldCount);
877
789
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
878
879
    Local<String> wst_string =
880
789
        FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
881
789
    wst->SetClassName(wst_string);
882
789
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
883
  }
884
885
789
  SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
886
887
  // Thread id of this Environment, stored as a JS Number (double).
  target
888
789
      ->Set(env->context(),
889
            env->thread_id_string(),
890
3156
            Number::New(isolate, static_cast<double>(env->thread_id())))
891
      .Check();
892
893
  target
894
789
      ->Set(env->context(),
895
            FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
896
3156
            Boolean::New(isolate, env->is_main_thread()))
897
      .Check();
898
899
  target
900
789
      ->Set(env->context(),
901
            FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
902
2367
            Boolean::New(isolate, env->owns_process_state()))
903
      .Check();
904
905
789
  // resourceLimits is read from env->worker_context(), which is only
  // consulted for non-main threads.
  if (!env->is_main_thread()) {
906
    target
907
729
        ->Set(env->context(),
908
              FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
909
2916
              env->worker_context()->GetResourceLimits(isolate))
910
        .Check();
911
  }
912
913
2367
  // Presumably the indices into the resource-limits array, plus its length.
  // TODO confirm against the declarations in node_worker.h.
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
914
2367
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
915
2367
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
916
2367
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
917
1578
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
918
789
}
919
920
5527
// Registers every native callback that InitWorker() installs
// (GetEnvMessagePort, Worker::New and the Worker prototype methods) with
// `registry`. Keep this list in sync with InitWorker().
// NOTE(review): presumably needed so that snapshot (de)serialization can
// resolve these function addresses -- confirm in node_external_reference.h.
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
921
5527
  registry->Register(GetEnvMessagePort);
922
5527
  registry->Register(Worker::New);
923
5527
  registry->Register(Worker::StartThread);
924
5527
  registry->Register(Worker::StopThread);
925
5527
  registry->Register(Worker::HasRef);
926
5527
  registry->Register(Worker::Ref);
927
5527
  registry->Register(Worker::Unref);
928
5527
  registry->Register(Worker::GetResourceLimits);
929
5527
  registry->Register(Worker::TakeHeapSnapshot);
930
5527
  registry->Register(Worker::LoopIdleTime);
931
5527
  registry->Register(Worker::LoopStartTime);
932
5527
}
933
934
}  // anonymous namespace
935
}  // namespace worker
936
}  // namespace node
937
938
5597
// Module registration for the `worker` binding: names InitWorker as its
// initializer and RegisterExternalReferences as its external-reference hook.
// NOTE(review): the exact effect depends on the macro definitions, which are
// not visible in this file.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
939
5527
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)