GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_worker.cc Lines: 446 479 93.1 %
Date: 2022-05-16 04:15:04 Branches: 170 236 72.0 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "node_snapshot_builder.h"
11
#include "util-inl.h"
12
#include "async_wrap-inl.h"
13
14
#include <memory>
15
#include <string>
16
#include <vector>
17
18
using node::kAllowedInEnvironment;
19
using node::kDisallowedInEnvironment;
20
using v8::Array;
21
using v8::ArrayBuffer;
22
using v8::Boolean;
23
using v8::Context;
24
using v8::Float64Array;
25
using v8::FunctionCallbackInfo;
26
using v8::FunctionTemplate;
27
using v8::HandleScope;
28
using v8::Integer;
29
using v8::Isolate;
30
using v8::Local;
31
using v8::Locker;
32
using v8::Maybe;
33
using v8::MaybeLocal;
34
using v8::Null;
35
using v8::Number;
36
using v8::Object;
37
using v8::ResourceConstraints;
38
using v8::SealHandleScope;
39
using v8::String;
40
using v8::TryCatch;
41
using v8::Value;
42
43
namespace node {
44
namespace worker {
45
46
constexpr double kMB = 1024 * 1024;
47
48
1031
Worker::Worker(Environment* env,
49
               Local<Object> wrap,
50
               const std::string& url,
51
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
52
               std::vector<std::string>&& exec_argv,
53
               std::shared_ptr<KVStore> env_vars,
54
1031
               const SnapshotData* snapshot_data)
55
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
56
      per_isolate_opts_(per_isolate_opts),
57
      exec_argv_(exec_argv),
58
1031
      platform_(env->isolate_data()->platform()),
59
1031
      thread_id_(AllocateEnvironmentThreadId()),
60
      env_vars_(env_vars),
61
2062
      snapshot_data_(snapshot_data) {
62
1031
  Debug(this, "Creating new worker instance with thread id %llu",
63
1031
        thread_id_.id);
64
65
  // Set up everything that needs to be set up in the parent environment.
66
1031
  parent_port_ = MessagePort::New(env, env->context());
67
1031
  if (parent_port_ == nullptr) {
68
    // This can happen e.g. because execution is terminating.
69
    return;
70
  }
71
72
1031
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
73
1031
  MessagePort::Entangle(parent_port_, child_port_data_.get());
74
75
1031
  object()->Set(env->context(),
76
                env->message_port_string(),
77
3093
                parent_port_->object()).Check();
78
79
1031
  object()->Set(env->context(),
80
                env->thread_id_string(),
81
3093
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
82
      .Check();
83
84
2062
  inspector_parent_handle_ = GetInspectorParentHandle(
85
1031
      env, thread_id_, url.c_str());
86
87
2062
  argv_ = std::vector<std::string>{env->argv()[0]};
88
  // Mark this Worker object as weak until we actually start the thread.
89
1031
  MakeWeak();
90
91
1031
  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
92
}
93
94
3216
bool Worker::is_stopped() const {
95
6432
  Mutex::ScopedLock lock(mutex_);
96
3216
  if (env_ != nullptr)
97
798
    return env_->is_stopping();
98
2418
  return stopped_;
99
}
100
101
810
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
102
810
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
103
104
810
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
105
1
    constraints->set_max_young_generation_size_in_bytes(
106
1
        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
107
  } else {
108
809
    resource_limits_[kMaxYoungGenerationSizeMb] =
109
809
        constraints->max_young_generation_size_in_bytes() / kMB;
110
  }
111
112
810
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
113
3
    constraints->set_max_old_generation_size_in_bytes(
114
3
        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
115
  } else {
116
807
    resource_limits_[kMaxOldGenerationSizeMb] =
117
807
        constraints->max_old_generation_size_in_bytes() / kMB;
118
  }
119
120
810
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
121
1
    constraints->set_code_range_size_in_bytes(
122
1
        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
123
  } else {
124
809
    resource_limits_[kCodeRangeSizeMb] =
125
809
        constraints->code_range_size_in_bytes() / kMB;
126
  }
127
810
}
128
129
// This class contains data that is only relevant to the child thread itself,
130
// and only while it is running.
131
// (Eventually, the Environment instance should probably also be moved here.)
132
class WorkerThreadData {
133
 public:
134
1028
  explicit WorkerThreadData(Worker* w)
135
1028
    : w_(w) {
136
1028
    int ret = uv_loop_init(&loop_);
137
1028
    if (ret != 0) {
138
      char err_buf[128];
139
218
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
140
218
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
141
218
      return;
142
    }
143
810
    loop_init_failed_ = false;
144
810
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
145
146
    std::shared_ptr<ArrayBufferAllocator> allocator =
147
810
        ArrayBufferAllocator::Create();
148
810
    Isolate::CreateParams params;
149
810
    SetIsolateCreateParamsForNode(&params);
150
810
    params.array_buffer_allocator_shared = allocator;
151
152
810
    if (w->snapshot_data() != nullptr) {
153
809
      SnapshotBuilder::InitializeIsolateParams(w->snapshot_data(), &params);
154
    }
155
810
    w->UpdateResourceConstraints(&params.constraints);
156
157
810
    Isolate* isolate = Isolate::Allocate();
158
810
    if (isolate == nullptr) {
159
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
160
      return;
161
    }
162
163
810
    w->platform_->RegisterIsolate(isolate, &loop_);
164
810
    Isolate::Initialize(isolate, params);
165
810
    SetIsolateUpForNode(isolate);
166
167
    // Be sure it's called before Environment::InitializeDiagnostics()
168
    // so that this callback stays when the callback of
169
    // --heapsnapshot-near-heap-limit gets is popped.
170
810
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
171
172
    {
173
810
      Locker locker(isolate);
174
1620
      Isolate::Scope isolate_scope(isolate);
175
      // V8 computes its stack limit the first time a `Locker` is used based on
176
      // --stack-size. Reset it to the correct value.
177
810
      isolate->SetStackLimit(w->stack_base_);
178
179
810
      HandleScope handle_scope(isolate);
180
1620
      isolate_data_.reset(CreateIsolateData(isolate,
181
                                            &loop_,
182
810
                                            w_->platform_,
183
                                            allocator.get()));
184
810
      CHECK(isolate_data_);
185
810
      if (w_->per_isolate_opts_)
186
272
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
187
810
      isolate_data_->set_worker_context(w_);
188
810
      isolate_data_->max_young_gen_size =
189
810
          params.constraints.max_young_generation_size_in_bytes();
190
    }
191
192
810
    Mutex::ScopedLock lock(w_->mutex_);
193
810
    w_->isolate_ = isolate;
194
  }
195
196
1028
  ~WorkerThreadData() {
197
1028
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
198
    Isolate* isolate;
199
    {
200
1028
      Mutex::ScopedLock lock(w_->mutex_);
201
1028
      isolate = w_->isolate_;
202
1028
      w_->isolate_ = nullptr;
203
    }
204
205
1028
    if (isolate != nullptr) {
206
810
      CHECK(!loop_init_failed_);
207
810
      bool platform_finished = false;
208
209
810
      isolate_data_.reset();
210
211
810
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
212
810
        *static_cast<bool*>(data) = true;
213
810
      }, &platform_finished);
214
215
      // The order of these calls is important; if the Isolate is first disposed
216
      // and then unregistered, there is a race condition window in which no
217
      // new Isolate at the same address can successfully be registered with
218
      // the platform.
219
      // (Refs: https://github.com/nodejs/node/issues/30846)
220
810
      w_->platform_->UnregisterIsolate(isolate);
221
810
      isolate->Dispose();
222
223
      // Wait until the platform has cleaned up all relevant resources.
224
1620
      while (!platform_finished) {
225
810
        uv_run(&loop_, UV_RUN_ONCE);
226
      }
227
    }
228
1028
    if (!loop_init_failed_) {
229
810
      CheckedUvLoopClose(&loop_);
230
    }
231
1028
  }
232
233
810
  bool loop_is_usable() const { return !loop_init_failed_; }
234
235
 private:
236
  Worker* const w_;
237
  uv_loop_t loop_;
238
  bool loop_init_failed_ = true;
239
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
240
  const SnapshotData* snapshot_data_ = nullptr;
241
  friend class Worker;
242
};
243
244
3
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
245
                             size_t initial_heap_limit) {
246
3
  Worker* worker = static_cast<Worker*>(data);
247
3
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
248
  // Give the current GC some extra leeway to let it finish rather than
249
  // crash hard. We are not going to perform further allocations anyway.
250
3
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
251
3
  return current_heap_limit + kExtraHeapAllowance;
252
}
253
254
1028
void Worker::Run() {
255
1028
  std::string name = "WorkerThread ";
256
1028
  name += std::to_string(thread_id_.id);
257

1199
  TRACE_EVENT_METADATA1(
258
      "__metadata", "thread_name", "name",
259
      TRACE_STR_COPY(name.c_str()));
260
1028
  CHECK_NOT_NULL(platform_);
261
262
1028
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
263
264
1028
  WorkerThreadData data(this);
265
1028
  if (isolate_ == nullptr) return;
266
810
  CHECK(data.loop_is_usable());
267
268
810
  Debug(this, "Starting worker with id %llu", thread_id_.id);
269
  {
270
810
    Locker locker(isolate_);
271
810
    Isolate::Scope isolate_scope(isolate_);
272
810
    SealHandleScope outer_seal(isolate_);
273
274
810
    DeleteFnPtr<Environment, FreeEnvironment> env_;
275
810
    auto cleanup_env = OnScopeLeave([&]() {
276
      // TODO(addaleax): This call is harmless but should not be necessary.
277
      // Figure out why V8 is raising a DCHECK() here without it
278
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
279
810
      isolate_->CancelTerminateExecution();
280
281
810
      if (!env_) return;
282
804
      env_->set_can_call_into_js(false);
283
284
      {
285
804
        Mutex::ScopedLock lock(mutex_);
286
804
        stopped_ = true;
287
804
        this->env_ = nullptr;
288
      }
289
290
804
      env_.reset();
291
810
    });
292
293
810
    if (is_stopped()) return;
294
    {
295
804
      HandleScope handle_scope(isolate_);
296
      Local<Context> context;
297
      {
298
        // We create the Context object before we have an Environment* in place
299
        // that we could use for error handling. If creation fails due to
300
        // resource constraints, we need something in place to handle it,
301
        // though.
302
804
        TryCatch try_catch(isolate_);
303
804
        if (snapshot_data_ != nullptr) {
304
803
          context = Context::FromSnapshot(
305
803
                        isolate_, SnapshotBuilder::kNodeBaseContextIndex)
306
803
                        .ToLocalChecked();
307
1606
          if (!context.IsEmpty() &&
308

2409
              !InitializeContextRuntime(context).IsJust()) {
309
            context = Local<Context>();
310
          }
311
        } else {
312
1
          context = NewContext(isolate_);
313
        }
314
804
        if (context.IsEmpty()) {
315
          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
316
          return;
317
        }
318
      }
319
320
804
      if (is_stopped()) return;
321
804
      CHECK(!context.IsEmpty());
322
804
      Context::Scope context_scope(context);
323
      {
324
1608
        env_.reset(CreateEnvironment(
325
            data.isolate_data_.get(),
326
            context,
327
804
            std::move(argv_),
328
804
            std::move(exec_argv_),
329
804
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
330
            thread_id_,
331
804
            std::move(inspector_parent_handle_)));
332
804
        if (is_stopped()) return;
333
798
        CHECK_NOT_NULL(env_);
334
798
        env_->set_env_vars(std::move(env_vars_));
335
798
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
336
72
          Exit(exit_code);
337
72
        });
338
      }
339
      {
340
798
        Mutex::ScopedLock lock(mutex_);
341
798
        if (stopped_) return;
342
798
        this->env_ = env_.get();
343
      }
344
798
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
345
798
      if (is_stopped()) return;
346
      {
347
798
        if (!CreateEnvMessagePort(env_.get())) {
348
          return;
349
        }
350
351
798
        Debug(this, "Created message port for worker %llu", thread_id_.id);
352
1596
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
353
          return;
354
355
798
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
356
      }
357
    }
358
359
    {
360
798
      Maybe<int> exit_code = SpinEventLoop(env_.get());
361
1596
      Mutex::ScopedLock lock(mutex_);
362

1232
      if (exit_code_ == 0 && exit_code.IsJust()) {
363
415
        exit_code_ = exit_code.FromJust();
364
      }
365
366
798
      Debug(this, "Exiting thread for worker %llu with exit code %d",
367
798
            thread_id_.id, exit_code_);
368
    }
369
  }
370
371
798
  Debug(this, "Worker %llu thread stops", thread_id_.id);
372
}
373
374
798
bool Worker::CreateEnvMessagePort(Environment* env) {
375
1596
  HandleScope handle_scope(isolate_);
376
1596
  std::unique_ptr<MessagePortData> data;
377
  {
378
1596
    Mutex::ScopedLock lock(mutex_);
379
798
    data = std::move(child_port_data_);
380
  }
381
382
  // Set up the message channel for receiving messages in the child.
383
1596
  MessagePort* child_port = MessagePort::New(env,
384
                                             env->context(),
385
798
                                             std::move(data));
386
  // MessagePort::New() may return nullptr if execution is terminated
387
  // within it.
388
798
  if (child_port != nullptr)
389
798
    env->set_message_port(child_port->object(isolate_));
390
391
798
  return child_port;
392
}
393
394
1032
void Worker::JoinThread() {
395
1032
  if (!tid_.has_value())
396
4
    return;
397
1028
  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
398
1028
  tid_.reset();
399
400
1028
  env()->remove_sub_worker_context(this);
401
402
  {
403
2054
    HandleScope handle_scope(env()->isolate());
404
1028
    Context::Scope context_scope(env()->context());
405
406
    // Reset the parent port as we're closing it now anyway.
407
1028
    object()->Set(env()->context(),
408
                  env()->message_port_string(),
409
3084
                  Undefined(env()->isolate())).Check();
410
411
    Local<Value> args[] = {
412
        Integer::New(env()->isolate(), exit_code_),
413
1028
        custom_error_ != nullptr
414
442
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
415
2421
            : Null(env()->isolate()).As<Value>(),
416
2056
        !custom_error_str_.empty()
417
221
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
418
221
                  .As<Value>()
419
2421
            : Null(env()->isolate()).As<Value>(),
420

3084
    };
421
422
1028
    MakeCallback(env()->onexit_string(), arraysize(args), args);
423
  }
424
425
  // If we get here, the tid_.has_value() condition at the top of the function
426
  // implies that the thread was running. In that case, its final action will
427
  // be to schedule a callback on the parent thread which will delete this
428
  // object, so there's nothing more to do here.
429
}
430
431
4028
Worker::~Worker() {
432
4028
  Mutex::ScopedLock lock(mutex_);
433
434
2014
  CHECK(stopped_);
435
2014
  CHECK_NULL(env_);
436
2014
  CHECK(!tid_.has_value());
437
438
2014
  Debug(this, "Worker %llu destroyed", thread_id_.id);
439
4028
}
440
441
1035
void Worker::New(const FunctionCallbackInfo<Value>& args) {
442
1035
  Environment* env = Environment::GetCurrent(args);
443
1035
  Isolate* isolate = args.GetIsolate();
444
445
1035
  CHECK(args.IsConstructCall());
446
447
1035
  if (env->isolate_data()->platform() == nullptr) {
448
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
449
4
    return;
450
  }
451
452
1035
  std::string url;
453
1035
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
454
1035
  std::shared_ptr<KVStore> env_vars = nullptr;
455
456
1035
  std::vector<std::string> exec_argv_out;
457
458
  // Argument might be a string or URL
459
2070
  if (!args[0]->IsNullOrUndefined()) {
460
    Utf8Value value(
461
1677
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
462
559
    url.append(value.out(), value.length());
463
  }
464
465
2070
  if (args[1]->IsNull()) {
466
    // Means worker.env = { ...process.env }.
467
1008
    env_vars = env->env_vars()->Clone(isolate);
468
27
  } else if (args[1]->IsObject()) {
469
    // User provided env.
470
26
    env_vars = KVStore::CreateMapKVStore();
471
26
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
472
78
                               args[1].As<Object>());
473
  } else {
474
    // Env is shared.
475
1
    env_vars = env->env_vars();
476
  }
477
478

2044
  if (args[1]->IsObject() || args[2]->IsArray()) {
479
276
    per_isolate_opts.reset(new PerIsolateOptions());
480
481
276
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
482
2208
      return env_vars->Get(name).FromMaybe("");
483
    });
484
485
#ifndef NODE_WITHOUT_NODE_OPTIONS
486
    MaybeLocal<String> maybe_node_opts =
487
276
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
488
    Local<String> node_opts;
489
276
    if (maybe_node_opts.ToLocal(&node_opts)) {
490
825
      std::string node_options(*String::Utf8Value(isolate, node_opts));
491
275
      std::vector<std::string> errors{};
492
      std::vector<std::string> env_argv =
493
275
          ParseNodeOptionsEnvVar(node_options, &errors);
494
      // [0] is expected to be the program name, add dummy string.
495
275
      env_argv.insert(env_argv.begin(), "");
496
275
      std::vector<std::string> invalid_args{};
497
275
      options_parser::Parse(&env_argv,
498
                            nullptr,
499
                            &invalid_args,
500
                            per_isolate_opts.get(),
501
                            kAllowedInEnvironment,
502
                            &errors);
503

276
      if (!errors.empty() && args[1]->IsObject()) {
504
        // Only fail for explicitly provided env, this protects from failures
505
        // when NODE_OPTIONS from parent's env is used (which is the default).
506
        Local<Value> error;
507
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
508
        Local<String> key =
509
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
510
        // Ignore the return value of Set() because exceptions bubble up to JS
511
        // when we return anyway.
512
1
        USE(args.This()->Set(env->context(), key, error));
513
1
        return;
514
      }
515
    }
516
#endif  // NODE_WITHOUT_NODE_OPTIONS
517
  }
518
519
1034
  if (args[2]->IsArray()) {
520
273
    Local<Array> array = args[2].As<Array>();
521
    // The first argument is reserved for program name, but we don't need it
522
    // in workers.
523
1092
    std::vector<std::string> exec_argv = {""};
524
273
    uint32_t length = array->Length();
525
393
    for (uint32_t i = 0; i < length; i++) {
526
      Local<Value> arg;
527
240
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
528
        return;
529
      }
530
      Local<String> arg_v8;
531
240
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
532
        return;
533
      }
534
240
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
535
240
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
536
120
      exec_argv.push_back(arg_string);
537
    }
538
539
273
    std::vector<std::string> invalid_args{};
540
273
    std::vector<std::string> errors{};
541
    // Using invalid_args as the v8_args argument as it stores unknown
542
    // options for the per isolate parser.
543
273
    options_parser::Parse(
544
        &exec_argv,
545
        &exec_argv_out,
546
        &invalid_args,
547
        per_isolate_opts.get(),
548
        kDisallowedInEnvironment,
549
        &errors);
550
551
    // The first argument is program name.
552
273
    invalid_args.erase(invalid_args.begin());
553

273
    if (errors.size() > 0 || invalid_args.size() > 0) {
554
      Local<Value> error;
555
3
      if (!ToV8Value(env->context(),
556
3
                     errors.size() > 0 ? errors : invalid_args)
557
3
                         .ToLocal(&error)) {
558
        return;
559
      }
560
      Local<String> key =
561
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
562
      // Ignore the return value of Set() because exceptions bubble up to JS
563
      // when we return anyway.
564
3
      USE(args.This()->Set(env->context(), key, error));
565
3
      return;
566
    }
567
  } else {
568
761
    exec_argv_out = env->exec_argv();
569
  }
570
571
1031
  bool use_node_snapshot = per_process::cli_options->node_snapshot;
572
  const SnapshotData* snapshot_data =
573
1031
      use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
574
575
  Worker* worker = new Worker(env,
576
2062
                              args.This(),
577
                              url,
578
                              per_isolate_opts,
579
1031
                              std::move(exec_argv_out),
580
                              env_vars,
581
1031
                              snapshot_data);
582
583
1031
  CHECK(args[3]->IsFloat64Array());
584
2062
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
585
1031
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
586
1031
  limit_info->CopyContents(worker->resource_limits_,
587
                           sizeof(worker->resource_limits_));
588
589
1031
  CHECK(args[4]->IsBoolean());
590

1031
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
591
1030
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
592
1031
  if (env->hide_console_windows())
593
    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
594
1031
  if (env->no_native_addons())
595
4
    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
596
1031
  if (env->no_global_search_paths())
597
    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
598
1031
  if (env->no_browser_globals())
599
    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
600
}
601
602
1028
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
603
  Worker* w;
604
1028
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
605
2056
  Mutex::ScopedLock lock(w->mutex_);
606
607
1028
  w->stopped_ = false;
608
609
1028
  if (w->resource_limits_[kStackSizeMb] > 0) {
610
6
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
611
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
612
      w->stack_size_ = kStackBufferSize;
613
    } else {
614
12
      w->stack_size_ =
615
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
616
    }
617
  } else {
618
1022
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
619
  }
620
621
  uv_thread_options_t thread_options;
622
1028
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
623
1028
  thread_options.stack_size = w->stack_size_;
624
625
1028
  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
626
1028
  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
627
    // XXX: This could become a std::unique_ptr, but that makes at least
628
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
629
    // gcc 7+ handles this well.
630
1028
    Worker* w = static_cast<Worker*>(arg);
631
1028
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
632
633
    // Leave a few kilobytes just to make sure we're within limits and have
634
    // some space to do work in C++ land.
635
1028
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
636
637
1028
    w->Run();
638
639
1028
    Mutex::ScopedLock lock(w->mutex_);
640
1028
    w->env()->SetImmediateThreadsafe(
641
1006
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
642
1006
          if (w->has_ref_)
643
1004
            env->add_refs(-1);
644
1006
          w->JoinThread();
645
          // implicitly delete w
646
1004
        });
647
1028
  }, static_cast<void*>(w));
648
649
1028
  if (ret == 0) {
650
    // The object now owns the created thread and should not be garbage
651
    // collected until that finishes.
652
1028
    w->ClearWeak();
653
654
1028
    if (w->has_ref_)
655
1028
      w->env()->add_refs(1);
656
657
1028
    w->env()->add_sub_worker_context(w);
658
  } else {
659
    w->stopped_ = true;
660
    w->tid_.reset();
661
662
    char err_buf[128];
663
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
664
    {
665
      Isolate* isolate = w->env()->isolate();
666
      HandleScope handle_scope(isolate);
667
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
668
    }
669
  }
670
}
671
672
454
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
673
  Worker* w;
674
454
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
675
676
454
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
677
454
  w->Exit(1);
678
}
679
680
458
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
681
  Worker* w;
682
458
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
683

458
  if (!w->has_ref_ && w->tid_.has_value()) {
684
5
    w->has_ref_ = true;
685
5
    w->env()->add_refs(1);
686
  }
687
}
688
689
9
void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
690
  Worker* w;
691
9
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
692
16
  args.GetReturnValue().Set(w->has_ref_);
693
}
694
695
7
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
696
  Worker* w;
697
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
698

7
  if (w->has_ref_ && w->tid_.has_value()) {
699
7
    w->has_ref_ = false;
700
7
    w->env()->add_refs(-1);
701
  }
702
}
703
704
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
705
  Worker* w;
706
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
707
2
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
708
}
709
710
805
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
711
805
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
712
713
805
  memcpy(ab->GetBackingStore()->Data(),
714
805
         resource_limits_,
715
         sizeof(resource_limits_));
716
805
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
717
}
718
719
773
void Worker::Exit(int code, const char* error_code, const char* error_message) {
720
1546
  Mutex::ScopedLock lock(mutex_);
721
773
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
722
773
        thread_id_.id, code, error_code, error_message);
723
724
773
  if (error_code != nullptr) {
725
221
    custom_error_ = error_code;
726
221
    custom_error_str_ = error_message;
727
  }
728
729
773
  if (env_ != nullptr) {
730
384
    exit_code_ = code;
731
384
    Stop(env_);
732
  } else {
733
389
    stopped_ = true;
734
  }
735
773
}
736
737
void Worker::MemoryInfo(MemoryTracker* tracker) const {
738
  tracker->TrackField("parent_port", parent_port_);
739
}
740
741
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
742
  // Worker objects always stay alive as long as the child thread, regardless
743
  // of whether they are being referenced in the parent thread.
744
  return true;
745
}
746
747
class WorkerHeapSnapshotTaker : public AsyncWrap {
748
 public:
749
1
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
750
1
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
751
752
  SET_NO_MEMORY_INFO()
753
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
754
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
755
};
756
757
1
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
758
  Worker* w;
759
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
760
761
1
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
762
763
1
  Environment* env = w->env();
764
1
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
765
  Local<Object> wrap;
766
1
  if (!env->worker_heap_snapshot_taker_template()
767
2
      ->NewInstance(env->context()).ToLocal(&wrap)) {
768
    return;
769
  }
770
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
771
1
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
772
773
  // Interrupt the worker thread and take a snapshot, then schedule a call
774
  // on the parent thread that turns that snapshot into a readable stream.
775
1
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
776
    heap::HeapSnapshotPointer snapshot {
777
1
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
778
1
    CHECK(snapshot);
779
2
    env->SetImmediateThreadsafe(
780
1
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
781
2
          HandleScope handle_scope(env->isolate());
782
1
          Context::Scope context_scope(env->context());
783
784
2
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
785
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
786
2
              env, std::move(snapshot));
787
1
          Local<Value> args[] = { stream->object() };
788
1
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
789
1
        }, CallbackFlags::kUnrefed);
790
1
  });
791
2
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
792
}
793
794
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
795
  Worker* w;
796
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
797
798
7
  Mutex::ScopedLock lock(w->mutex_);
799
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
800
  // before locking the mutex is a race condition. So manually do the same
801
  // check.
802

7
  if (w->stopped_ || w->env_ == nullptr)
803
    return args.GetReturnValue().Set(-1);
804
805
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
806
14
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
807
}
808
809
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
810
  Worker* w;
811
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
812
813
1
  Mutex::ScopedLock lock(w->mutex_);
814
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
815
  // before locking the mutex is a race condition. So manually do the same
816
  // check.
817

1
  if (w->stopped_ || w->env_ == nullptr)
818
    return args.GetReturnValue().Set(-1);
819
820
1
  double loop_start_time = w->env_->performance_state()->milestones[
821
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
822
1
  CHECK_GE(loop_start_time, 0);
823
1
  args.GetReturnValue().Set(
824
1
      (loop_start_time - node::performance::timeOrigin) / 1e6);
825
}
826
827
namespace {
828
829
// Return the MessagePort that is global for this Environment and communicates
830
// with the internal [kPort] port of the JS Worker class in the parent thread.
831
1597
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
832
1597
  Environment* env = Environment::GetCurrent(args);
833
1597
  Local<Object> port = env->message_port();
834

3194
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
835
1597
  if (!port.IsEmpty()) {
836
4791
    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
837
             args.GetIsolate());
838
3194
    args.GetReturnValue().Set(port);
839
  }
840
1597
}
841
842
860
void InitWorker(Local<Object> target,
843
                Local<Value> unused,
844
                Local<Context> context,
845
                void* priv) {
846
860
  Environment* env = Environment::GetCurrent(context);
847
848
  {
849
860
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
850
851
1720
    w->InstanceTemplate()->SetInternalFieldCount(
852
        Worker::kInternalFieldCount);
853
860
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
854
855
860
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
856
860
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
857
860
    env->SetProtoMethod(w, "hasRef", Worker::HasRef);
858
860
    env->SetProtoMethod(w, "ref", Worker::Ref);
859
860
    env->SetProtoMethod(w, "unref", Worker::Unref);
860
860
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
861
860
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
862
860
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
863
860
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
864
865
860
    env->SetConstructorFunction(target, "Worker", w);
866
  }
867
868
  {
869
860
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
870
871
1720
    wst->InstanceTemplate()->SetInternalFieldCount(
872
        WorkerHeapSnapshotTaker::kInternalFieldCount);
873
860
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
874
875
    Local<String> wst_string =
876
860
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
877
860
    wst->SetClassName(wst_string);
878
860
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
879
  }
880
881
860
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
882
883
  target
884
860
      ->Set(env->context(),
885
            env->thread_id_string(),
886
3440
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
887
      .Check();
888
889
  target
890
860
      ->Set(env->context(),
891
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
892
3440
            Boolean::New(env->isolate(), env->is_main_thread()))
893
      .Check();
894
895
  target
896
860
      ->Set(env->context(),
897
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
898
2580
            Boolean::New(env->isolate(), env->owns_process_state()))
899
      .Check();
900
901
860
  if (!env->is_main_thread()) {
902
    target
903
804
        ->Set(env->context(),
904
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
905
3216
              env->worker_context()->GetResourceLimits(env->isolate()))
906
        .Check();
907
  }
908
909
2580
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
910
2580
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
911
2580
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
912
2580
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
913
1720
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
914
860
}
915
916
5180
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
917
5180
  registry->Register(GetEnvMessagePort);
918
5180
  registry->Register(Worker::New);
919
5180
  registry->Register(Worker::StartThread);
920
5180
  registry->Register(Worker::StopThread);
921
5180
  registry->Register(Worker::HasRef);
922
5180
  registry->Register(Worker::Ref);
923
5180
  registry->Register(Worker::Unref);
924
5180
  registry->Register(Worker::GetResourceLimits);
925
5180
  registry->Register(Worker::TakeHeapSnapshot);
926
5180
  registry->Register(Worker::LoopIdleTime);
927
5180
  registry->Register(Worker::LoopStartTime);
928
5180
}
929
930
}  // anonymous namespace
931
}  // namespace worker
932
}  // namespace node
933
934
5248
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
935
5180
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)