GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 389 437 89.0 %
Date: 2020-09-03 22:13:26 Branches: 185 258 71.7 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "memory_tracker-inl.h"
4
#include "node_errors.h"
5
#include "node_buffer.h"
6
#include "node_options-inl.h"
7
#include "node_perf.h"
8
#include "util-inl.h"
9
#include "async_wrap-inl.h"
10
11
#include <memory>
12
#include <string>
13
#include <vector>
14
15
using node::kAllowedInEnvironment;
16
using node::kDisallowedInEnvironment;
17
using v8::Array;
18
using v8::ArrayBuffer;
19
using v8::Boolean;
20
using v8::Context;
21
using v8::Float64Array;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::HandleScope;
25
using v8::Integer;
26
using v8::Isolate;
27
using v8::Local;
28
using v8::Locker;
29
using v8::MaybeLocal;
30
using v8::Null;
31
using v8::Number;
32
using v8::Object;
33
using v8::ResourceConstraints;
34
using v8::SealHandleScope;
35
using v8::String;
36
using v8::TryCatch;
37
using v8::Value;
38
39
namespace node {
40
namespace worker {
41
42
// Number of bytes in one megabyte (1024 * 1024), kept as a double. The
// resource-limit code in this file multiplies and divides by it to convert
// between the "...Mb" limit values and byte counts.
constexpr double kMB = 1024 * 1024;
43
44
627
// Creates the parent-side representation of a worker thread.
//
// `env` is the Environment of the thread creating the worker and `wrap` is
// the JS object backing this AsyncWrap. `url` is only used for the inspector
// parent handle. `per_isolate_opts`, `exec_argv` and `env_vars` are taken
// over by the Worker (moved into its members) and consumed later, when the
// worker thread sets up its own Environment in Run().
//
// If the parent-side MessagePort cannot be created, the constructor returns
// early and leaves the remaining set-up undone.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      // The parameters are sinks: move them instead of copying. A named
      // rvalue-reference parameter is an lvalue, so without std::move() the
      // vector would be deep-copied, and each shared_ptr copy would cost an
      // extra atomic reference-count round trip.
      per_isolate_opts_(std::move(per_isolate_opts)),
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  // The child side of the channel only exists as data for now; it is turned
  // into a real MessagePort on the worker thread (CreateEnvMessagePort()).
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  // Expose the parent port and the thread id on the JS wrapper object.
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  // The worker starts out with only the parent's program name as its argv.
  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
87
88
3274
bool Worker::is_stopped() const {
89
6550
  Mutex::ScopedLock lock(mutex_);
90
3276
  if (env_ != nullptr)
91
2035
    return env_->is_stopping();
92
1241
  return stopped_;
93
}
94
95
423
// Synchronizes the worker's resource limits with `constraints`.
//
// The stack limit is always taken from stack_base_. For each of the three
// heap-related limits: a positive stored value (in MB) is written into
// `constraints` (in bytes); otherwise the value already present in
// `constraints` is stored back, converted to MB, so that resource_limits_
// ends up describing the limits in effect.
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  // Young generation.
  auto& young_generation_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_generation_mb > 0) {
    constraints->set_max_young_generation_size_in_bytes(
        young_generation_mb * kMB);
  } else {
    young_generation_mb =
        constraints->max_young_generation_size_in_bytes() / kMB;
  }

  // Old generation.
  auto& old_generation_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_generation_mb > 0) {
    constraints->set_max_old_generation_size_in_bytes(
        old_generation_mb * kMB);
  } else {
    old_generation_mb =
        constraints->max_old_generation_size_in_bytes() / kMB;
  }

  // Code range.
  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    constraints->set_code_range_size_in_bytes(code_range_mb * kMB);
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
122
123
// This class contains data that is only relevant to the child thread itself,
124
// and only while it is running.
125
// (Eventually, the Environment instance should probably also be moved here.)
126
class WorkerThreadData {
127
 public:
128
625
  explicit WorkerThreadData(Worker* w)
129
625
    : w_(w) {
130
625
    int ret = uv_loop_init(&loop_);
131
625
    if (ret != 0) {
132
      char err_buf[128];
133
202
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
134
202
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
135
202
      return;
136
    }
137
423
    loop_init_failed_ = false;
138
423
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
139
140
    std::shared_ptr<ArrayBufferAllocator> allocator =
141
846
        ArrayBufferAllocator::Create();
142
846
    Isolate::CreateParams params;
143
423
    SetIsolateCreateParamsForNode(&params);
144
423
    params.array_buffer_allocator_shared = allocator;
145
146
423
    w->UpdateResourceConstraints(&params.constraints);
147
148
423
    Isolate* isolate = Isolate::Allocate();
149
423
    if (isolate == nullptr) {
150
      // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
151
      // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
152
      w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
153
      return;
154
    }
155
156
423
    w->platform_->RegisterIsolate(isolate, &loop_);
157
423
    Isolate::Initialize(isolate, params);
158
422
    SetIsolateUpForNode(isolate);
159
160
423
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
161
162
    {
163
423
      Locker locker(isolate);
164
846
      Isolate::Scope isolate_scope(isolate);
165
      // V8 computes its stack limit the first time a `Locker` is used based on
166
      // --stack-size. Reset it to the correct value.
167
423
      isolate->SetStackLimit(w->stack_base_);
168
169
846
      HandleScope handle_scope(isolate);
170
844
      isolate_data_.reset(CreateIsolateData(isolate,
171
                                            &loop_,
172
422
                                            w_->platform_,
173
423
                                            allocator.get()));
174
423
      CHECK(isolate_data_);
175
423
      if (w_->per_isolate_opts_)
176
96
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
177
423
      isolate_data_->set_worker_context(w_);
178
    }
179
180
846
    Mutex::ScopedLock lock(w_->mutex_);
181
423
    w_->isolate_ = isolate;
182
  }
183
184
1249
  ~WorkerThreadData() {
185
624
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
186
    Isolate* isolate;
187
    {
188
1250
      Mutex::ScopedLock lock(w_->mutex_);
189
625
      isolate = w_->isolate_;
190
625
      w_->isolate_ = nullptr;
191
    }
192
193
625
    if (isolate != nullptr) {
194
423
      CHECK(!loop_init_failed_);
195
423
      bool platform_finished = false;
196
197
423
      isolate_data_.reset();
198
199
2538
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
200
423
        *static_cast<bool*>(data) = true;
201
2115
      }, &platform_finished);
202
203
      // The order of these calls is important; if the Isolate is first disposed
204
      // and then unregistered, there is a race condition window in which no
205
      // new Isolate at the same address can successfully be registered with
206
      // the platform.
207
      // (Refs: https://github.com/nodejs/node/issues/30846)
208
423
      w_->platform_->UnregisterIsolate(isolate);
209
423
      isolate->Dispose();
210
211
      // Wait until the platform has cleaned up all relevant resources.
212
1269
      while (!platform_finished) {
213
423
        uv_run(&loop_, UV_RUN_ONCE);
214
      }
215
    }
216
625
    if (!loop_init_failed_) {
217
423
      CheckedUvLoopClose(&loop_);
218
    }
219
625
  }
220
221
423
  bool loop_is_usable() const { return !loop_init_failed_; }
222
223
 private:
224
  Worker* const w_;
225
  uv_loop_t loop_;
226
  bool loop_init_failed_ = true;
227
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
228
229
  friend class Worker;
230
};
231
232
1
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
233
                             size_t initial_heap_limit) {
234
1
  Worker* worker = static_cast<Worker*>(data);
235
1
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
236
  // Give the current GC some extra leeway to let it finish rather than
237
  // crash hard. We are not going to perform further allocations anyway.
238
1
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
239
1
  return current_heap_limit + kExtraHeapAllowance;
240
}
241
242
625
void Worker::Run() {
243
1032
  std::string name = "WorkerThread ";
244
625
  name += std::to_string(thread_id_.id);
245

1252
  TRACE_EVENT_METADATA1(
246
      "__metadata", "thread_name", "name",
247
      TRACE_STR_COPY(name.c_str()));
248
625
  CHECK_NOT_NULL(platform_);
249
250
625
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
251
252
1032
  WorkerThreadData data(this);
253
625
  if (isolate_ == nullptr) return;
254
423
  CHECK(data.loop_is_usable());
255
256
423
  Debug(this, "Starting worker with id %llu", thread_id_.id);
257
  {
258
829
    Locker locker(isolate_);
259
830
    Isolate::Scope isolate_scope(isolate_);
260
830
    SealHandleScope outer_seal(isolate_);
261
262
830
    DeleteFnPtr<Environment, FreeEnvironment> env_;
263
421
    auto cleanup_env = OnScopeLeave([&]() {
264
      // TODO(addaleax): This call is harmless but should not be necessary.
265
      // Figure out why V8 is raising a DCHECK() here without it
266
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
267
1644
      isolate_->CancelTerminateExecution();
268
269
830
      if (!env_) return;
270
408
      env_->set_can_call_into_js(false);
271
272
      {
273
815
        Mutex::ScopedLock lock(mutex_);
274
408
        stopped_ = true;
275
408
        this->env_ = nullptr;
276
      }
277
278
408
      env_.reset();
279
829
    });
280
281
423
    if (is_stopped()) return;
282
    {
283
820
      HandleScope handle_scope(isolate_);
284
      Local<Context> context;
285
      {
286
        // We create the Context object before we have an Environment* in place
287
        // that we could use for error handling. If creation fails due to
288
        // resource constraints, we need something in place to handle it,
289
        // though.
290
824
        TryCatch try_catch(isolate_);
291
414
        context = NewContext(isolate_);
292
414
        if (context.IsEmpty()) {
293
          // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
294
          // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
295
4
          Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
296
4
          return;
297
        }
298
      }
299
300
410
      if (is_stopped()) return;
301
408
      CHECK(!context.IsEmpty());
302
406
      Context::Scope context_scope(context);
303
      {
304
1224
        env_.reset(CreateEnvironment(
305
            data.isolate_data_.get(),
306
            context,
307
408
            std::move(argv_),
308
408
            std::move(exec_argv_),
309
408
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
310
            thread_id_,
311
816
            std::move(inspector_parent_handle_)));
312
408
        if (is_stopped()) return;
313
408
        CHECK_NOT_NULL(env_);
314
408
        env_->set_env_vars(std::move(env_vars_));
315
867
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
316
51
          Exit(exit_code);
317
459
        });
318
      }
319
      {
320
816
        Mutex::ScopedLock lock(mutex_);
321
408
        if (stopped_) return;
322
408
        this->env_ = env_.get();
323
      }
324
408
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
325
408
      if (is_stopped()) return;
326
      {
327
408
        CreateEnvMessagePort(env_.get());
328
408
        Debug(this, "Created message port for worker %llu", thread_id_.id);
329
816
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
330
1
          return;
331
332
407
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
333
      }
334
335
407
      if (is_stopped()) return;
336
      {
337
813
        SealHandleScope seal(isolate_);
338
        bool more;
339
407
        env_->performance_state()->Mark(
340
407
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
341
321
        do {
342
407
          if (is_stopped()) break;
343
407
          uv_run(&data.loop_, UV_RUN_DEFAULT);
344
407
          if (is_stopped()) break;
345
346
323
          platform_->DrainTasks(isolate_);
347
348
322
          more = uv_loop_alive(&data.loop_);
349

324
          if (more && !is_stopped()) continue;
350
351
323
          EmitBeforeExit(env_.get());
352
353
          // Emit `beforeExit` if the loop became alive either after emitting
354
          // event, or after running some callbacks.
355
322
          more = uv_loop_alive(&data.loop_);
356

321
        } while (more == true && !is_stopped());
357
405
        env_->performance_state()->Mark(
358
406
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
359
      }
360
    }
361
362
    {
363
      int exit_code;
364
406
      bool stopped = is_stopped();
365
405
      if (!stopped)
366
320
        exit_code = EmitExit(env_.get());
367
811
      Mutex::ScopedLock lock(mutex_);
368

407
      if (exit_code_ == 0 && !stopped)
369
319
        exit_code_ = exit_code;
370
371
407
      Debug(this, "Exiting thread for worker %llu with exit code %d",
372
            thread_id_.id, exit_code_);
373
    }
374
  }
375
376
406
  Debug(this, "Worker %llu thread stops", thread_id_.id);
377
}
378
379
408
// Builds the child-side MessagePort inside the worker's Environment `env`
// from the port data prepared in the constructor, and installs it as the
// Environment's message port. Runs with mutex_ held.
void Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope handle_scope(isolate_);
  Mutex::ScopedLock lock(mutex_);

  // Set up the message channel for receiving messages in the child.
  MessagePort* const port =
      MessagePort::New(env, env->context(), std::move(child_port_data_));

  // MessagePort::New() may return nullptr if execution is terminated
  // within it.
  if (port == nullptr) return;

  env->set_message_port(port->object(isolate_));
}
391
392
629
void Worker::JoinThread() {
393
629
  if (thread_joined_)
394
4
    return;
395
625
  CHECK_EQ(uv_thread_join(&tid_), 0);
396
625
  thread_joined_ = true;
397
398
625
  env()->remove_sub_worker_context(this);
399
400
  {
401
1248
    HandleScope handle_scope(env()->isolate());
402
625
    Context::Scope context_scope(env()->context());
403
404
    // Reset the parent port as we're closing it now anyway.
405
1875
    object()->Set(env()->context(),
406
                  env()->message_port_string(),
407
3125
                  Undefined(env()->isolate())).Check();
408
409
    Local<Value> args[] = {
410
        Integer::New(env()->isolate(), exit_code_),
411
625
        custom_error_ != nullptr
412
832
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
413
1461
            : Null(env()->isolate()).As<Value>(),
414
625
        !custom_error_str_.empty()
415
832
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
416
                  .As<Value>()
417
1461
            : Null(env()->isolate()).As<Value>(),
418

5000
    };
419
420
625
    MakeCallback(env()->onexit_string(), arraysize(args), args);
421
  }
422
423
  // If we get here, the !thread_joined_ condition at the top of the function
424
  // implies that the thread was running. In that case, its final action will
425
  // be to schedule a callback on the parent thread which will delete this
426
  // object, so there's nothing more to do here.
427
}
428
429
1809
// A Worker may only be destroyed once it is fully shut down: it must be
// marked stopped, must no longer reference a child Environment, and its
// thread must have been joined. Each invariant is enforced with a CHECK.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(thread_joined_);

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
438
439
630
void Worker::New(const FunctionCallbackInfo<Value>& args) {
440
630
  Environment* env = Environment::GetCurrent(args);
441
630
  Isolate* isolate = args.GetIsolate();
442
443
630
  CHECK(args.IsConstructCall());
444
445
630
  if (env->isolate_data()->platform() == nullptr) {
446
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
447
    return;
448
  }
449
450
1257
  std::string url;
451
1257
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
452
1257
  std::shared_ptr<KVStore> env_vars = nullptr;
453
454
1257
  std::vector<std::string> exec_argv_out;
455
456
  // Argument might be a string or URL
457
1890
  if (!args[0]->IsNullOrUndefined()) {
458
    Utf8Value value(
459
810
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
460
162
    url.append(value.out(), value.length());
461
  }
462
463
1890
  if (args[1]->IsNull()) {
464
    // Means worker.env = { ...process.env }.
465
605
    env_vars = env->env_vars()->Clone(isolate);
466
50
  } else if (args[1]->IsObject()) {
467
    // User provided env.
468
24
    env_vars = KVStore::CreateMapKVStore();
469
24
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
470
72
                               args[1].As<Object>());
471
  } else {
472
    // Env is shared.
473
1
    env_vars = env->env_vars();
474
  }
475
476

2472
  if (args[1]->IsObject() || args[2]->IsArray()) {
477
99
    per_isolate_opts.reset(new PerIsolateOptions());
478
479
990
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
480
792
      return env_vars->Get(name).FromMaybe("");
481
891
    });
482
483
#ifndef NODE_WITHOUT_NODE_OPTIONS
484
    MaybeLocal<String> maybe_node_opts =
485
99
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
486
    Local<String> node_opts;
487
99
    if (maybe_node_opts.ToLocal(&node_opts)) {
488
294
      std::string node_options(*String::Utf8Value(isolate, node_opts));
489
196
      std::vector<std::string> errors{};
490
      std::vector<std::string> env_argv =
491
196
          ParseNodeOptionsEnvVar(node_options, &errors);
492
      // [0] is expected to be the program name, add dummy string.
493
98
      env_argv.insert(env_argv.begin(), "");
494
196
      std::vector<std::string> invalid_args{};
495
98
      options_parser::Parse(&env_argv,
496
                            nullptr,
497
                            &invalid_args,
498
                            per_isolate_opts.get(),
499
                            kAllowedInEnvironment,
500
98
                            &errors);
501

98
      if (!errors.empty() && args[1]->IsObject()) {
502
        // Only fail for explicitly provided env, this protects from failures
503
        // when NODE_OPTIONS from parent's env is used (which is the default).
504
        Local<Value> error;
505
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
506
        Local<String> key =
507
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
508
        // Ignore the return value of Set() because exceptions bubble up to JS
509
        // when we return anyway.
510
        USE(args.This()->Set(env->context(), key, error));
511
        return;
512
      }
513
    }
514
#endif
515
  }
516
517
1260
  if (args[2]->IsArray()) {
518
194
    Local<Array> array = args[2].As<Array>();
519
    // The first argument is reserved for program name, but we don't need it
520
    // in workers.
521
191
    std::vector<std::string> exec_argv = {""};
522
97
    uint32_t length = array->Length();
523
146
    for (uint32_t i = 0; i < length; i++) {
524
      Local<Value> arg;
525
147
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
526
        return;
527
      }
528
      Local<String> arg_v8;
529
147
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
530
        return;
531
      }
532
98
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
533
98
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
534
49
      exec_argv.push_back(arg_string);
535
    }
536
537
191
    std::vector<std::string> invalid_args{};
538
191
    std::vector<std::string> errors{};
539
    // Using invalid_args as the v8_args argument as it stores unknown
540
    // options for the per isolate parser.
541
97
    options_parser::Parse(
542
        &exec_argv,
543
        &exec_argv_out,
544
        &invalid_args,
545
        per_isolate_opts.get(),
546
        kDisallowedInEnvironment,
547
97
        &errors);
548
549
    // The first argument is program name.
550
97
    invalid_args.erase(invalid_args.begin());
551

97
    if (errors.size() > 0 || invalid_args.size() > 0) {
552
      Local<Value> error;
553
6
      if (!ToV8Value(env->context(),
554
3
                     errors.size() > 0 ? errors : invalid_args)
555
3
                         .ToLocal(&error)) {
556
        return;
557
      }
558
      Local<String> key =
559
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
560
      // Ignore the return value of Set() because exceptions bubble up to JS
561
      // when we return anyway.
562
9
      USE(args.This()->Set(env->context(), key, error));
563
3
      return;
564
    }
565
  } else {
566
533
    exec_argv_out = env->exec_argv();
567
  }
568
569
  Worker* worker = new Worker(env,
570
1254
                              args.This(),
571
                              url,
572
                              per_isolate_opts,
573
                              std::move(exec_argv_out),
574
627
                              env_vars);
575
576
1254
  CHECK(args[3]->IsFloat64Array());
577
1254
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
578
627
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
579
1254
  limit_info->CopyContents(worker->resource_limits_,
580
627
                           sizeof(worker->resource_limits_));
581
582
1254
  CHECK(args[4]->IsBoolean());
583

1254
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
584
626
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
585
}
586
587
625
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
588
  Worker* w;
589
625
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
590
1250
  Mutex::ScopedLock lock(w->mutex_);
591
592
625
  w->stopped_ = false;
593
594
625
  if (w->resource_limits_[kStackSizeMb] > 0) {
595
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
596
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
597
3
      w->stack_size_ = kStackBufferSize;
598
    } else {
599
6
      w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
600
    }
601
  } else {
602
616
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
603
  }
604
605
  uv_thread_options_t thread_options;
606
625
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
607
625
  thread_options.stack_size = w->stack_size_;
608
3124
  int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
609
    // XXX: This could become a std::unique_ptr, but that makes at least
610
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
611
    // gcc 7+ handles this well.
612
624
    Worker* w = static_cast<Worker*>(arg);
613
624
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
614
615
    // Leave a few kilobytes just to make sure we're within limits and have
616
    // some space to do work in C++ land.
617
624
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
618
619
624
    w->Run();
620
621
1250
    Mutex::ScopedLock lock(w->mutex_);
622
1250
    w->env()->SetImmediateThreadsafe(
623
2454
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
624
603
          if (w->has_ref_)
625
602
            env->add_refs(-1);
626
603
          w->JoinThread();
627
          // implicitly delete w
628
1226
        });
629
2500
  }, static_cast<void*>(w));
630
631
625
  if (ret == 0) {
632
    // The object now owns the created thread and should not be garbage
633
    // collected until that finishes.
634
625
    w->ClearWeak();
635
625
    w->thread_joined_ = false;
636
637
625
    if (w->has_ref_)
638
625
      w->env()->add_refs(1);
639
640
625
    w->env()->add_sub_worker_context(w);
641
  } else {
642
    w->stopped_ = true;
643
644
    char err_buf[128];
645
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
646
    {
647
      Isolate* isolate = w->env()->isolate();
648
      HandleScope handle_scope(isolate);
649
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
650
    }
651
  }
652
}
653
654
32
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
655
  Worker* w;
656
32
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
657
658
32
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
659
32
  w->Exit(1);
660
}
661
662
34
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
663
  Worker* w;
664
34
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
665

34
  if (!w->has_ref_ && !w->thread_joined_) {
666
3
    w->has_ref_ = true;
667
3
    w->env()->add_refs(1);
668
  }
669
}
670
671
5
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
672
  Worker* w;
673
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
674

5
  if (w->has_ref_ && !w->thread_joined_) {
675
4
    w->has_ref_ = false;
676
4
    w->env()->add_refs(-1);
677
  }
678
}
679
680
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
681
  Worker* w;
682
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
683
3
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
684
}
685
686
409
// Copies resource_limits_ into a new ArrayBuffer owned by `isolate` and
// returns a Float64Array of kTotalResourceLimitCount elements viewing it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  constexpr size_t kByteLength = sizeof(resource_limits_);
  Local<ArrayBuffer> buffer = ArrayBuffer::New(isolate, kByteLength);

  void* const destination = buffer->GetBackingStore()->Data();
  memcpy(destination, resource_limits_, kByteLength);

  return Float64Array::New(buffer, 0, kTotalResourceLimitCount);
}
694
695
316
void Worker::Exit(int code, const char* error_code, const char* error_message) {
696
632
  Mutex::ScopedLock lock(mutex_);
697
316
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
698
        thread_id_.id, code, error_code, error_message);
699
700
316
  if (error_code != nullptr) {
701
207
    custom_error_ = error_code;
702
207
    custom_error_str_ = error_message;
703
  }
704
705
316
  if (env_ != nullptr) {
706
89
    exit_code_ = code;
707
89
    Stop(env_);
708
  } else {
709
227
    stopped_ = true;
710
  }
711
316
}
712
713
// Memory-tracking hook: reports the parent-side message port as a field of
// this Worker.
void Worker::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("parent_port", parent_port_);
}
716
717
class WorkerHeapSnapshotTaker : public AsyncWrap {
718
 public:
719
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
720
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
721
722
  SET_NO_MEMORY_INFO()
723
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
724
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
725
};
726
727
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
728
  Worker* w;
729
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
730
731
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
732
733
  Environment* env = w->env();
734
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
735
  Local<Object> wrap;
736
  if (!env->worker_heap_snapshot_taker_template()
737
      ->NewInstance(env->context()).ToLocal(&wrap)) {
738
    return;
739
  }
740
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
741
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
742
743
  // Interrupt the worker thread and take a snapshot, then schedule a call
744
  // on the parent thread that turns that snapshot into a readable stream.
745
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
746
    heap::HeapSnapshotPointer snapshot {
747
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
748
    CHECK(snapshot);
749
    env->SetImmediateThreadsafe(
750
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
751
          HandleScope handle_scope(env->isolate());
752
          Context::Scope context_scope(env->context());
753
754
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
755
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
756
              env, std::move(snapshot));
757
          Local<Value> args[] = { stream->object() };
758
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
759
        }, CallbackFlags::kUnrefed);
760
  });
761
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
762
}
763
764
namespace {
765
766
// Return the MessagePort that is global for this Environment and communicates
767
// with the internal [kPort] port of the JS Worker class in the parent thread.
768
815
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
769
815
  Environment* env = Environment::GetCurrent(args);
770
815
  Local<Object> port = env->message_port();
771

1630
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
772
815
  if (!port.IsEmpty()) {
773
2445
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
774
1630
    args.GetReturnValue().Set(port);
775
  }
776
815
}
777
778
4124
void InitWorker(Local<Object> target,
779
                Local<Value> unused,
780
                Local<Context> context,
781
                void* priv) {
782
4124
  Environment* env = Environment::GetCurrent(context);
783
784
  {
785
4124
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
786
787
12372
    w->InstanceTemplate()->SetInternalFieldCount(
788
4124
        Worker::kInternalFieldCount);
789
8248
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
790
791
4124
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
792
4124
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
793
4124
    env->SetProtoMethod(w, "ref", Worker::Ref);
794
4124
    env->SetProtoMethod(w, "unref", Worker::Unref);
795
4124
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
796
4124
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
797
798
    Local<String> workerString =
799
4124
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
800
4124
    w->SetClassName(workerString);
801
8248
    target->Set(env->context(),
802
                workerString,
803
20620
                w->GetFunction(env->context()).ToLocalChecked()).Check();
804
  }
805
806
  {
807
4124
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
808
809
12372
    wst->InstanceTemplate()->SetInternalFieldCount(
810
4124
        WorkerHeapSnapshotTaker::kInternalFieldCount);
811
8248
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
812
813
    Local<String> wst_string =
814
4124
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
815
4124
    wst->SetClassName(wst_string);
816
4124
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
817
  }
818
819
4124
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
820
821
  target
822
8248
      ->Set(env->context(),
823
            env->thread_id_string(),
824
20620
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
825
      .Check();
826
827
  target
828
8248
      ->Set(env->context(),
829
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
830
20620
            Boolean::New(env->isolate(), env->is_main_thread()))
831
      .Check();
832
833
  target
834
8248
      ->Set(env->context(),
835
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
836
20620
            Boolean::New(env->isolate(), env->owns_process_state()))
837
      .Check();
838
839
4124
  if (!env->is_main_thread()) {
840
    target
841
816
        ->Set(env->context(),
842
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
843
2040
              env->worker_context()->GetResourceLimits(env->isolate()))
844
        .Check();
845
  }
846
847
16496
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
848
16496
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
849
16496
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
850
16496
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
851
16496
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
852
4124
}
853
854
}  // anonymous namespace
855
856
}  // namespace worker
857
}  // namespace node
858
859

17887
// NOTE(review): presumably registers node::worker::InitWorker as the
// initializer of the internal `worker` binding — confirm against the macro's
// definition.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)