GCC Code Coverage Report
Directory: ./
File: node_worker.cc
Date: 2022-04-29 04:15:07

            Exec    Total   Coverage
Lines:       445      473     94.1 %
Branches:    167      228     73.2 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvironment;
19
using node::kDisallowedInEnvironment;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ResourceConstraints;
38
using v8::SealHandleScope;
39
using v8::String;
40
using v8::TryCatch;
41
using v8::Value;
42
43
namespace node {
44
namespace worker {
45
46
constexpr double kMB = 1024 * 1024;
47
48
822
Worker::Worker(Environment* env,
49
               Local<Object> wrap,
50
               const std::string& url,
51
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
52
               std::vector<std::string>&& exec_argv,
53
822
               std::shared_ptr<KVStore> env_vars)
54
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
55
      per_isolate_opts_(per_isolate_opts),
56
      exec_argv_(exec_argv),
57
822
      platform_(env->isolate_data()->platform()),
58
822
      thread_id_(AllocateEnvironmentThreadId()),
59
1644
      env_vars_(env_vars) {
60
822
  Debug(this, "Creating new worker instance with thread id %llu",
61
822
        thread_id_.id);
62
63
  // Set up everything that needs to be set up in the parent environment.
64
822
  parent_port_ = MessagePort::New(env, env->context());
65
822
  if (parent_port_ == nullptr) {
66
    // This can happen e.g. because execution is terminating.
67
    return;
68
  }
69
70
822
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
71
822
  MessagePort::Entangle(parent_port_, child_port_data_.get());
72
73
822
  object()->Set(env->context(),
74
                env->message_port_string(),
75
2466
                parent_port_->object()).Check();
76
77
822
  object()->Set(env->context(),
78
                env->thread_id_string(),
79
2466
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
80
      .Check();
81
82
1644
  inspector_parent_handle_ = GetInspectorParentHandle(
83
822
      env, thread_id_, url.c_str());
84
85
1644
  argv_ = std::vector<std::string>{env->argv()[0]};
86
  // Mark this Worker object as weak until we actually start the thread.
87
822
  MakeWeak();
88
89
822
  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
90
}
91
92
2350
bool Worker::is_stopped() const {
93
4700
  Mutex::ScopedLock lock(mutex_);
94
2350
  if (env_ != nullptr)
95
583
    return env_->is_stopping();
96
1767
  return stopped_;
97
}
98
99
599
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
100
599
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
101
102
599
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
103
1
    constraints->set_max_young_generation_size_in_bytes(
104
1
        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
105
  } else {
106
598
    resource_limits_[kMaxYoungGenerationSizeMb] =
107
598
        constraints->max_young_generation_size_in_bytes() / kMB;
108
  }
109
110
599
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
111
3
    constraints->set_max_old_generation_size_in_bytes(
112
3
        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
113
  } else {
114
596
    resource_limits_[kMaxOldGenerationSizeMb] =
115
596
        constraints->max_old_generation_size_in_bytes() / kMB;
116
  }
117
118
599
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
119
1
    constraints->set_code_range_size_in_bytes(
120
1
        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
121
  } else {
122
598
    resource_limits_[kCodeRangeSizeMb] =
123
598
        constraints->code_range_size_in_bytes() / kMB;
124
  }
125
599
}
126
127
// This class contains data that is only relevant to the child thread itself,
128
// and only while it is running.
129
// (Eventually, the Environment instance should probably also be moved here.)
130
class WorkerThreadData {
131
 public:
132
819
  explicit WorkerThreadData(Worker* w)
133
819
    : w_(w) {
134
819
    int ret = uv_loop_init(&loop_);
135
819
    if (ret != 0) {
136
      char err_buf[128];
137
220
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
138
220
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
139
220
      return;
140
    }
141
599
    loop_init_failed_ = false;
142
599
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
143
144
    std::shared_ptr<ArrayBufferAllocator> allocator =
145
599
        ArrayBufferAllocator::Create();
146
599
    Isolate::CreateParams params;
147
599
    SetIsolateCreateParamsForNode(&params);
148
599
    params.array_buffer_allocator_shared = allocator;
149
150
599
    bool use_node_snapshot = per_process::cli_options->node_snapshot;
151
    const SnapshotData* snapshot_data =
152
599
        use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData()
153
599
                          : nullptr;
154
599
    if (snapshot_data != nullptr) {
155
598
      SnapshotBuilder::InitializeIsolateParams(snapshot_data, &params);
156
    }
157
599
    w->UpdateResourceConstraints(&params.constraints);
158
159
599
    Isolate* isolate = Isolate::Allocate();
160
599
    if (isolate == nullptr) {
161
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
162
      return;
163
    }
164
165
599
    w->platform_->RegisterIsolate(isolate, &loop_);
166
599
    Isolate::Initialize(isolate, params);
167
599
    SetIsolateUpForNode(isolate);
168
169
    // Be sure it's called before Environment::InitializeDiagnostics()
170
    // so that this callback stays when the callback of
171
    // --heapsnapshot-near-heap-limit is popped.
172
599
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
173
174
    {
175
599
      Locker locker(isolate);
176
1198
      Isolate::Scope isolate_scope(isolate);
177
      // V8 computes its stack limit the first time a `Locker` is used based on
178
      // --stack-size. Reset it to the correct value.
179
599
      isolate->SetStackLimit(w->stack_base_);
180
181
599
      HandleScope handle_scope(isolate);
182
1198
      isolate_data_.reset(CreateIsolateData(isolate,
183
                                            &loop_,
184
599
                                            w_->platform_,
185
                                            allocator.get()));
186
599
      CHECK(isolate_data_);
187
599
      if (w_->per_isolate_opts_)
188
271
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
189
599
      isolate_data_->set_worker_context(w_);
190
599
      isolate_data_->max_young_gen_size =
191
599
          params.constraints.max_young_generation_size_in_bytes();
192
    }
193
194
599
    Mutex::ScopedLock lock(w_->mutex_);
195
599
    w_->isolate_ = isolate;
196
  }
197
198
819
  ~WorkerThreadData() {
199
819
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
200
    Isolate* isolate;
201
    {
202
819
      Mutex::ScopedLock lock(w_->mutex_);
203
819
      isolate = w_->isolate_;
204
819
      w_->isolate_ = nullptr;
205
    }
206
207
819
    if (isolate != nullptr) {
208
599
      CHECK(!loop_init_failed_);
209
599
      bool platform_finished = false;
210
211
599
      isolate_data_.reset();
212
213
599
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
214
599
        *static_cast<bool*>(data) = true;
215
599
      }, &platform_finished);
216
217
      // The order of these calls is important; if the Isolate is first disposed
218
      // and then unregistered, there is a race condition window in which no
219
      // new Isolate at the same address can successfully be registered with
220
      // the platform.
221
      // (Refs: https://github.com/nodejs/node/issues/30846)
222
599
      w_->platform_->UnregisterIsolate(isolate);
223
599
      isolate->Dispose();
224
225
      // Wait until the platform has cleaned up all relevant resources.
226
1198
      while (!platform_finished) {
227
599
        uv_run(&loop_, UV_RUN_ONCE);
228
      }
229
    }
230
819
    if (!loop_init_failed_) {
231
599
      CheckedUvLoopClose(&loop_);
232
    }
233
819
  }
234
235
599
  bool loop_is_usable() const { return !loop_init_failed_; }
236
237
 private:
238
  Worker* const w_;
239
  uv_loop_t loop_;
240
  bool loop_init_failed_ = true;
241
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
242
243
  friend class Worker;
244
};
245
246
3
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
247
                             size_t initial_heap_limit) {
248
3
  Worker* worker = static_cast<Worker*>(data);
249
3
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
250
  // Give the current GC some extra leeway to let it finish rather than
251
  // crash hard. We are not going to perform further allocations anyway.
252
3
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
253
3
  return current_heap_limit + kExtraHeapAllowance;
254
}
255
256
819
void Worker::Run() {
257
819
  std::string name = "WorkerThread ";
258
819
  name += std::to_string(thread_id_.id);
259

988
  TRACE_EVENT_METADATA1(
260
      "__metadata", "thread_name", "name",
261
      TRACE_STR_COPY(name.c_str()));
262
819
  CHECK_NOT_NULL(platform_);
263
264
819
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
265
266
819
  WorkerThreadData data(this);
267
819
  if (isolate_ == nullptr) return;
268
599
  CHECK(data.loop_is_usable());
269
270
599
  Debug(this, "Starting worker with id %llu", thread_id_.id);
271
  {
272
599
    Locker locker(isolate_);
273
599
    Isolate::Scope isolate_scope(isolate_);
274
599
    SealHandleScope outer_seal(isolate_);
275
276
599
    DeleteFnPtr<Environment, FreeEnvironment> env_;
277
599
    auto cleanup_env = OnScopeLeave([&]() {
278
      // TODO(addaleax): This call is harmless but should not be necessary.
279
      // Figure out why V8 is raising a DCHECK() here without it
280
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
281
599
      isolate_->CancelTerminateExecution();
282
283
599
      if (!env_) return;
284
584
      env_->set_can_call_into_js(false);
285
286
      {
287
584
        Mutex::ScopedLock lock(mutex_);
288
584
        stopped_ = true;
289
584
        this->env_ = nullptr;
290
      }
291
292
584
      env_.reset();
293
599
    });
294
295
599
    if (is_stopped()) return;
296
    {
297
588
      HandleScope handle_scope(isolate_);
298
      Local<Context> context;
299
      {
300
        // We create the Context object before we have an Environment* in place
301
        // that we could use for error handling. If creation fails due to
302
        // resource constraints, we need something in place to handle it,
303
        // though.
304
588
        TryCatch try_catch(isolate_);
305
588
        context = NewContext(isolate_);
306
588
        if (context.IsEmpty()) {
307
4
          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
308
4
          return;
309
        }
310
      }
311
312
584
      if (is_stopped()) return;
313
584
      CHECK(!context.IsEmpty());
314
584
      Context::Scope context_scope(context);
315
      {
316
1168
        env_.reset(CreateEnvironment(
317
            data.isolate_data_.get(),
318
            context,
319
584
            std::move(argv_),
320
584
            std::move(exec_argv_),
321
584
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
322
            thread_id_,
323
584
            std::move(inspector_parent_handle_)));
324
584
        if (is_stopped()) return;
325
583
        CHECK_NOT_NULL(env_);
326
583
        env_->set_env_vars(std::move(env_vars_));
327
583
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
328
71
          Exit(exit_code);
329
71
        });
330
      }
331
      {
332
583
        Mutex::ScopedLock lock(mutex_);
333
583
        if (stopped_) return;
334
583
        this->env_ = env_.get();
335
      }
336
583
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
337
583
      if (is_stopped()) return;
338
      {
339
583
        if (!CreateEnvMessagePort(env_.get())) {
340
          return;
341
        }
342
343
583
        Debug(this, "Created message port for worker %llu", thread_id_.id);
344
1166
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
345
          return;
346
347
583
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
348
      }
349
    }
350
351
    {
352
583
      Maybe<int> exit_code = SpinEventLoop(env_.get());
353
1166
      Mutex::ScopedLock lock(mutex_);
354

997
      if (exit_code_ == 0 && exit_code.IsJust()) {
355
395
        exit_code_ = exit_code.FromJust();
356
      }
357
358
583
      Debug(this, "Exiting thread for worker %llu with exit code %d",
359
583
            thread_id_.id, exit_code_);
360
    }
361
  }
362
363
583
  Debug(this, "Worker %llu thread stops", thread_id_.id);
364
}
365
366
583
bool Worker::CreateEnvMessagePort(Environment* env) {
367
1166
  HandleScope handle_scope(isolate_);
368
1166
  std::unique_ptr<MessagePortData> data;
369
  {
370
1166
    Mutex::ScopedLock lock(mutex_);
371
583
    data = std::move(child_port_data_);
372
  }
373
374
  // Set up the message channel for receiving messages in the child.
375
1166
  MessagePort* child_port = MessagePort::New(env,
376
                                             env->context(),
377
583
                                             std::move(data));
378
  // MessagePort::New() may return nullptr if execution is terminated
379
  // within it.
380
583
  if (child_port != nullptr)
381
583
    env->set_message_port(child_port->object(isolate_));
382
383
583
  return child_port;
384
}
385
386
823
void Worker::JoinThread() {
387
823
  if (!tid_.has_value())
388
4
    return;
389
819
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
390
819
  tid_.reset();
391
392
819
  env()->remove_sub_worker_context(this);
393
394
  {
395
1636
    HandleScope handle_scope(env()->isolate());
396
819
    Context::Scope context_scope(env()->context());
397
398
    // Reset the parent port as we're closing it now anyway.
399
819
    object()->Set(env()->context(),
400
                  env()->message_port_string(),
401
2457
                  Undefined(env()->isolate())).Check();
402
403
    Local<Value> args[] = {
404
        Integer::New(env()->isolate(), exit_code_),
405
819
        custom_error_ != nullptr
406
454
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
407
1776
            : Null(env()->isolate()).As<Value>(),
408
1638
        !custom_error_str_.empty()
409
227
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
410
227
                  .As<Value>()
411
1776
            : Null(env()->isolate()).As<Value>(),
412

2457
    };
413
414
819
    MakeCallback(env()->onexit_string(), arraysize(args), args);
415
  }
416
417
  // If we get here, the tid_.has_value() condition at the top of the function
418
  // implies that the thread was running. In that case, its final action will
419
  // be to schedule a callback on the parent thread which will delete this
420
  // object, so there's nothing more to do here.
421
}
422
423
3192
Worker::~Worker() {
424
3192
  Mutex::ScopedLock lock(mutex_);
425
426
1596
  CHECK(stopped_);
427
1596
  CHECK_NULL(env_);
428
1596
  CHECK(!tid_.has_value());
429
430
1596
  Debug(this, "Worker %llu destroyed", thread_id_.id);
431
3192
}
432
433
826
void Worker::New(const FunctionCallbackInfo<Value>& args) {
434
826
  Environment* env = Environment::GetCurrent(args);
435
826
  Isolate* isolate = args.GetIsolate();
436
437
826
  CHECK(args.IsConstructCall());
438
439
826
  if (env->isolate_data()->platform() == nullptr) {
440
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
441
4
    return;
442
  }
443
444
826
  std::string url;
445
826
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
446
826
  std::shared_ptr<KVStore> env_vars = nullptr;
447
448
826
  std::vector<std::string> exec_argv_out;
449
450
  // Argument might be a string or URL
451
1652
  if (!args[0]->IsNullOrUndefined()) {
452
    Utf8Value value(
453
1044
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
454
348
    url.append(value.out(), value.length());
455
  }
456
457
1652
  if (args[1]->IsNull()) {
458
    // Means worker.env = { ...process.env }.
459
799
    env_vars = env->env_vars()->Clone(isolate);
460
27
  } else if (args[1]->IsObject()) {
461
    // User provided env.
462
26
    env_vars = KVStore::CreateMapKVStore();
463
26
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
464
78
                               args[1].As<Object>());
465
  } else {
466
    // Env is shared.
467
1
    env_vars = env->env_vars();
468
  }
469
470

1626
  if (args[1]->IsObject() || args[2]->IsArray()) {
471
275
    per_isolate_opts.reset(new PerIsolateOptions());
472
473
275
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
474
2200
      return env_vars->Get(name).FromMaybe("");
475
    });
476
477
#ifndef NODE_WITHOUT_NODE_OPTIONS
478
    MaybeLocal<String> maybe_node_opts =
479
275
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
480
    Local<String> node_opts;
481
275
    if (maybe_node_opts.ToLocal(&node_opts)) {
482
822
      std::string node_options(*String::Utf8Value(isolate, node_opts));
483
274
      std::vector<std::string> errors{};
484
      std::vector<std::string> env_argv =
485
274
          ParseNodeOptionsEnvVar(node_options, &errors);
486
      // [0] is expected to be the program name, add dummy string.
487
274
      env_argv.insert(env_argv.begin(), "");
488
274
      std::vector<std::string> invalid_args{};
489
274
      options_parser::Parse(&env_argv,
490
                            nullptr,
491
                            &invalid_args,
492
                            per_isolate_opts.get(),
493
                            kAllowedInEnvironment,
494
                            &errors);
495

275
      if (!errors.empty() && args[1]->IsObject()) {
496
        // Only fail for explicitly provided env; this protects from failures
497
        // when NODE_OPTIONS from parent's env is used (which is the default).
498
        Local<Value> error;
499
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
500
        Local<String> key =
501
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
502
        // Ignore the return value of Set() because exceptions bubble up to JS
503
        // when we return anyway.
504
1
        USE(args.This()->Set(env->context(), key, error));
505
1
        return;
506
      }
507
    }
508
#endif  // NODE_WITHOUT_NODE_OPTIONS
509
  }
510
511
825
  if (args[2]->IsArray()) {
512
272
    Local<Array> array = args[2].As<Array>();
513
    // The first argument is reserved for program name, but we don't need it
514
    // in workers.
515
1088
    std::vector<std::string> exec_argv = {""};
516
272
    uint32_t length = array->Length();
517
392
    for (uint32_t i = 0; i < length; i++) {
518
      Local<Value> arg;
519
240
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
520
        return;
521
      }
522
      Local<String> arg_v8;
523
240
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
524
        return;
525
      }
526
240
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
527
240
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
528
120
      exec_argv.push_back(arg_string);
529
    }
530
531
272
    std::vector<std::string> invalid_args{};
532
272
    std::vector<std::string> errors{};
533
    // Using invalid_args as the v8_args argument as it stores unknown
534
    // options for the per isolate parser.
535
272
    options_parser::Parse(
536
        &exec_argv,
537
        &exec_argv_out,
538
        &invalid_args,
539
        per_isolate_opts.get(),
540
        kDisallowedInEnvironment,
541
        &errors);
542
543
    // The first argument is program name.
544
272
    invalid_args.erase(invalid_args.begin());
545

272
    if (errors.size() > 0 || invalid_args.size() > 0) {
546
      Local<Value> error;
547
3
      if (!ToV8Value(env->context(),
548
3
                     errors.size() > 0 ? errors : invalid_args)
549
3
                         .ToLocal(&error)) {
550
        return;
551
      }
552
      Local<String> key =
553
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
554
      // Ignore the return value of Set() because exceptions bubble up to JS
555
      // when we return anyway.
556
3
      USE(args.This()->Set(env->context(), key, error));
557
3
      return;
558
    }
559
  } else {
560
553
    exec_argv_out = env->exec_argv();
561
  }
562
563
  Worker* worker = new Worker(env,
564
1644
                              args.This(),
565
                              url,
566
                              per_isolate_opts,
567
822
                              std::move(exec_argv_out),
568
822
                              env_vars);
569
570
822
  CHECK(args[3]->IsFloat64Array());
571
1644
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
572
822
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
573
822
  limit_info->CopyContents(worker->resource_limits_,
574
                           sizeof(worker->resource_limits_));
575
576
822
  CHECK(args[4]->IsBoolean());
577

822
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
578
821
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
579
822
  if (env->hide_console_windows())
580
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
581
822
  if (env->no_native_addons())
582
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
583
822
  if (env->no_global_search_paths())
584
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
585
822
  if (env->no_browser_globals())
586
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
587
}
588
589
819
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
590
  Worker* w;
591
819
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
592
1638
  Mutex::ScopedLock lock(w->mutex_);
593
594
819
  w->stopped_ = false;
595
596
819
  if (w->resource_limits_[kStackSizeMb] > 0) {
597
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
598
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
599
3
      w->stack_size_ = kStackBufferSize;
600
    } else {
601
12
      w->stack_size_ =
602
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
603
    }
604
  } else {
605
810
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
606
  }
607
608
  uv_thread_options_t thread_options;
609
819
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
610
819
  thread_options.stack_size = w->stack_size_;
611
612
819
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
613
819
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
614
    // XXX: This could become a std::unique_ptr, but that makes at least
615
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
616
    // gcc 7+ handles this well.
617
819
    Worker* w = static_cast<Worker*>(arg);
618
819
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
619
620
    // Leave a few kilobytes just to make sure we're within limits and have
621
    // some space to do work in C++ land.
622
819
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
623
624
819
    w->Run();
625
626
819
    Mutex::ScopedLock lock(w->mutex_);
627
819
    w->env()->SetImmediateThreadsafe(
628
797
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
629
797
          if (w->has_ref_)
630
795
            env->add_refs(-1);
631
797
          w->JoinThread();
632
          // implicitly delete w
633
795
        });
634
819
  }, static_cast<void*>(w));
635
636
819
  if (ret == 0) {
637
    // The object now owns the created thread and should not be garbage
638
    // collected until that finishes.
639
819
    w->ClearWeak();
640
641
819
    if (w->has_ref_)
642
819
      w->env()->add_refs(1);
643
644
819
    w->env()->add_sub_worker_context(w);
645
  } else {
646
    w->stopped_ = true;
647
    w->tid_.reset();
648
649
    char err_buf[128];
650
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
651
    {
652
      Isolate* isolate = w->env()->isolate();
653
      HandleScope handle_scope(isolate);
654
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
655
    }
656
  }
657
}
658
659
243
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
660
  Worker* w;
661
243
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
662
663
243
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
664
243
  w->Exit(1);
665
}
666
667
246
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
668
  Worker* w;
669
246
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
670

246
  if (!w->has_ref_ && w->tid_.has_value()) {
671
4
    w->has_ref_ = true;
672
4
    w->env()->add_refs(1);
673
  }
674
}
675
676
5
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
677
  Worker* w;
678
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
679
8
  args.GetReturnValue().Set(w->has_ref_);
680
}
681
682
6
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
683
  Worker* w;
684
6
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
685

6
  if (w->has_ref_ && w->tid_.has_value()) {
686
6
    w->has_ref_ = false;
687
6
    w->env()->add_refs(-1);
688
  }
689
}
690
691
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
692
  Worker* w;
693
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
694
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
695
}
696
697
585
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
698
585
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
699
700
585
  memcpy(ab->GetBackingStore()->Data(),
701
585
         resource_limits_,
702
         sizeof(resource_limits_));
703
585
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
704
}
705
706
567
void Worker::Exit(int code, const char* error_code, const char* error_message) {
707
1134
  Mutex::ScopedLock lock(mutex_);
708
567
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
709
567
        thread_id_.id, code, error_code, error_message);
710
711
567
  if (error_code != nullptr) {
712
227
    custom_error_ = error_code;
713
227
    custom_error_str_ = error_message;
714
  }
715
716
567
  if (env_ != nullptr) {
717
192
    exit_code_ = code;
718
192
    Stop(env_);
719
  } else {
720
375
    stopped_ = true;
721
  }
722
567
}
723
724
void Worker::MemoryInfo(MemoryTracker* tracker) const {
725
  tracker->TrackField("parent_port", parent_port_);
726
}
727
728
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
729
  // Worker objects always stay alive as long as the child thread, regardless
730
  // of whether they are being referenced in the parent thread.
731
  return true;
732
}
733
734
class WorkerHeapSnapshotTaker : public AsyncWrap {
735
 public:
736
1
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
737
1
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
738
739
  SET_NO_MEMORY_INFO()
740
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
741
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
742
};
743
744
1
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
745
  Worker* w;
746
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
747
748
1
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
749
750
1
  Environment* env = w->env();
751
1
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
752
  Local<Object> wrap;
753
1
  if (!env->worker_heap_snapshot_taker_template()
754
2
      ->NewInstance(env->context()).ToLocal(&wrap)) {
755
    return;
756
  }
757
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
758
1
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
759
760
  // Interrupt the worker thread and take a snapshot, then schedule a call
761
  // on the parent thread that turns that snapshot into a readable stream.
762
1
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
763
    heap::HeapSnapshotPointer snapshot {
764
1
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
765
1
    CHECK(snapshot);
766
2
    env->SetImmediateThreadsafe(
767
1
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
768
2
          HandleScope handle_scope(env->isolate());
769
1
          Context::Scope context_scope(env->context());
770
771
2
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
772
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
773
2
              env, std::move(snapshot));
774
1
          Local<Value> args[] = { stream->object() };
775
1
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
776
1
        }, CallbackFlags::kUnrefed);
777
1
  });
778
2
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
779
}
780
781
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
782
  Worker* w;
783
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
784
785
7
  Mutex::ScopedLock lock(w->mutex_);
786
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
787
  // before locking the mutex is a race condition. So manually do the same
788
  // check.
789

7
  if (w->stopped_ || w->env_ == nullptr)
790
    return args.GetReturnValue().Set(-1);
791
792
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
793
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
794
}
795
796
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
797
  Worker* w;
798
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
799
800
1
  Mutex::ScopedLock lock(w->mutex_);
801
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
802
  // before locking the mutex is a race condition. So manually do the same
803
  // check.
804

1
  if (w->stopped_ || w->env_ == nullptr)
805
    return args.GetReturnValue().Set(-1);
806
807
1
  double loop_start_time = w->env_->performance_state()->milestones[
808
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
809
1
  CHECK_GE(loop_start_time, 0);
810
1
  args.GetReturnValue().Set(
811
1
      (loop_start_time - node::performance::timeOrigin) / 1e6);
812
}
813
814
namespace {
815
816
// Return the MessagePort that is global for this Environment and communicates
817
// with the internal [kPort] port of the JS Worker class in the parent thread.
818
1167
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
819
1167
  Environment* env = Environment::GetCurrent(args);
820
1167
  Local<Object> port = env->message_port();
821

2334
  // An Environment that is not the main thread must have a message port; only
  // the main thread is allowed to reach this point with an empty handle.
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
822
1167
  // With an empty port nothing is set, so the caller sees whatever the
  // default return value is (presumably `undefined` -- confirm).
  if (!port.IsEmpty()) {
823
3501
    // Sanity check: the port object's creation context must belong to the
    // isolate this call is running in.
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
824
             args.GetIsolate());
825
2334
    args.GetReturnValue().Set(port);
826
  }
827
1167
}
828
829
640
// Populates `target` with the worker binding for the Environment associated
// with `context`:
//   - the `Worker` constructor and its prototype methods,
//   - the instance template used for WorkerHeapSnapshotTaker objects (stored
//     on the Environment, not on `target`),
//   - the `getEnvMessagePort` function,
//   - the thread id, `isMainThread`, `ownsProcessState` and, for non-main
//     threads only, `resourceLimits` properties,
//   - the resource-limit index constants.
// `unused` and `priv` are not read by this function.
void InitWorker(Local<Object> target,
830
                Local<Value> unused,
831
                Local<Context> context,
832
                void* priv) {
833
640
  Environment* env = Environment::GetCurrent(context);
834
835
  {
836
640
    // Constructor template for the JS-visible `Worker` class, backed by
    // Worker::New and inheriting from the AsyncWrap constructor template.
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
837
838
1280
    w->InstanceTemplate()->SetInternalFieldCount(
839
        Worker::kInternalFieldCount);
840
640
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
841
842
640
    // Prototype methods. Every callback installed here is also listed in
    // RegisterExternalReferences() below; keep the two lists in sync.
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
843
640
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
844
640
    env->SetProtoMethod(w, "hasRef", Worker::HasRef);
845
640
    env->SetProtoMethod(w, "ref", Worker::Ref);
846
640
    env->SetProtoMethod(w, "unref", Worker::Unref);
847
640
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
848
640
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
849
640
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
850
640
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
851
852
640
    env->SetConstructorFunction(target, "Worker", w);
853
  }
854
855
  {
856
640
    // Template for WorkerHeapSnapshotTaker objects. It is created without a
    // callback and is not attached to `target`; only its instance template is
    // saved on the Environment for later use.
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
857
858
1280
    wst->InstanceTemplate()->SetInternalFieldCount(
859
        WorkerHeapSnapshotTaker::kInternalFieldCount);
860
640
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
861
862
    Local<String> wst_string =
863
640
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
864
640
    wst->SetClassName(wst_string);
865
640
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
866
  }
867
868
640
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
869
870
  // Thread id of this Environment, stored as a JS Number (hence the cast to
  // double).
  target
871
640
      ->Set(env->context(),
872
            env->thread_id_string(),
873
2560
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
874
      .Check();
875
876
  target
877
640
      ->Set(env->context(),
878
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
879
2560
            Boolean::New(env->isolate(), env->is_main_thread()))
880
      .Check();
881
882
  target
883
640
      ->Set(env->context(),
884
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
885
1920
            Boolean::New(env->isolate(), env->owns_process_state()))
886
      .Check();
887
888
640
  // `resourceLimits` is only defined for non-main-thread Environments, where
  // it is read from env->worker_context().
  if (!env->is_main_thread()) {
889
    target
890
584
        ->Set(env->context(),
891
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
892
2336
              env->worker_context()->GetResourceLimits(env->isolate()))
893
        .Check();
894
  }
895
896
1920
  // Export the resource-limit constants on `target`.
  // NOTE(review): these look like indices into the resource limits array --
  // confirm against the declarations in node_worker.h.
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
897
1920
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
898
1920
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
899
1920
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
900
1280
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
901
640
}
902
903
5015
// Registers with `registry` every native callback that InitWorker() installs:
// GetEnvMessagePort, the Worker constructor callback and all Worker prototype
// methods. A callback added to InitWorker() should be added here as well.
// NOTE(review): presumably needed so these function addresses can be resolved
// when a startup snapshot is used -- confirm against
// node_external_reference.h.
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
904
5015
  registry->Register(GetEnvMessagePort);
905
5015
  registry->Register(Worker::New);
906
5015
  registry->Register(Worker::StartThread);
907
5015
  registry->Register(Worker::StopThread);
908
5015
  registry->Register(Worker::HasRef);
909
5015
  registry->Register(Worker::Ref);
910
5015
  registry->Register(Worker::Unref);
911
5015
  registry->Register(Worker::GetResourceLimits);
912
5015
  registry->Register(Worker::TakeHeapSnapshot);
913
5015
  registry->Register(Worker::LoopIdleTime);
914
5015
  registry->Register(Worker::LoopStartTime);
915
5015
}
916
917
}  // anonymous namespace
918
}  // namespace worker
919
}  // namespace node
920
921
5083
// Associates the module name `worker` with its initializer (InitWorker) and
// with its external-reference registration function. Both macros are defined
// outside this file; their exact expansion is not visible here.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
922
5015
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)