GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 387 435 89.0 %
Date: 2020-05-27 22:15:15 Branches: 182 248 73.4 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "memory_tracker-inl.h"
4
#include "node_errors.h"
5
#include "node_buffer.h"
6
#include "node_options-inl.h"
7
#include "node_perf.h"
8
#include "util-inl.h"
9
#include "async_wrap-inl.h"
10
11
#include <memory>
12
#include <string>
13
#include <vector>
14
15
using node::kAllowedInEnvironment;
16
using node::kDisallowedInEnvironment;
17
using v8::Array;
18
using v8::ArrayBuffer;
19
using v8::Boolean;
20
using v8::Context;
21
using v8::Float64Array;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::HandleScope;
25
using v8::Integer;
26
using v8::Isolate;
27
using v8::Local;
28
using v8::Locker;
29
using v8::MaybeLocal;
30
using v8::Null;
31
using v8::Number;
32
using v8::Object;
33
using v8::ResourceConstraints;
34
using v8::SealHandleScope;
35
using v8::String;
36
using v8::TryCatch;
37
using v8::Value;
38
39
namespace node {
40
namespace worker {
41
42
// Number of bytes in one megabyte (2^20). Declared as a double because it is
// used below to convert resource limit values between megabytes and bytes.
constexpr double kMB = 1024.0 * 1024.0;
43
44
546
// Creates the parent-side representation of a worker.
//
// Stores the per-isolate options, exec argv and environment variables for
// later use, allocates a thread id, and sets up the parent end of the message
// channel. The thread itself is not started by the constructor.
//
// `exec_argv` is an rvalue reference, so its contents are moved into the
// Worker rather than copied; the by-value shared_ptr arguments are moved for
// the same reason.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      per_isolate_opts_(std::move(per_isolate_opts)),
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  // The child's port data is created now and entangled with the parent port.
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  // Expose the parent port and the thread id as properties of the JS wrapper.
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  // The worker's argv consists only of the parent's program name.
  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
87
88
2407
bool Worker::is_stopped() const {
89
4814
  Mutex::ScopedLock lock(mutex_);
90
2406
  if (env_ != nullptr)
91
1491
    return env_->is_stopping();
92
915
  return stopped_;
93
}
94
95
315
// Synchronizes `constraints` with this worker's resource limits.
//
// The stack limit is always set from `stack_base_`. For each of the other
// limits: a positive configured value (in MB) is written into `constraints`
// (in bytes); otherwise the value already present in `constraints` is read
// back into `resource_limits_`, converted to MB.
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  auto& young_gen_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_gen_mb > 0) {
    constraints->set_max_young_generation_size_in_bytes(young_gen_mb * kMB);
  } else {
    young_gen_mb = constraints->max_young_generation_size_in_bytes() / kMB;
  }

  auto& old_gen_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_gen_mb > 0) {
    constraints->set_max_old_generation_size_in_bytes(old_gen_mb * kMB);
  } else {
    old_gen_mb = constraints->max_old_generation_size_in_bytes() / kMB;
  }

  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    constraints->set_code_range_size_in_bytes(code_range_mb * kMB);
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
122
123
// This class contains data that is only relevant to the child thread itself,
124
// and only while it is running.
125
// (Eventually, the Environment instance should probably also be moved here.)
126
class WorkerThreadData {
127
 public:
128
544
  explicit WorkerThreadData(Worker* w)
129
544
    : w_(w) {
130
544
    int ret = uv_loop_init(&loop_);
131
544
    if (ret != 0) {
132
      char err_buf[128];
133
229
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
134
229
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
135
229
      return;
136
    }
137
315
    loop_init_failed_ = false;
138
139
    std::shared_ptr<ArrayBufferAllocator> allocator =
140
630
        ArrayBufferAllocator::Create();
141
630
    Isolate::CreateParams params;
142
315
    SetIsolateCreateParamsForNode(&params);
143
315
    params.array_buffer_allocator_shared = allocator;
144
145
315
    w->UpdateResourceConstraints(&params.constraints);
146
147
315
    Isolate* isolate = Isolate::Allocate();
148
315
    if (isolate == nullptr) {
149
      // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
150
      // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
151
      w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
152
      return;
153
    }
154
155
315
    w->platform_->RegisterIsolate(isolate, &loop_);
156
315
    Isolate::Initialize(isolate, params);
157
315
    SetIsolateUpForNode(isolate);
158
159
315
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
160
161
    {
162
315
      Locker locker(isolate);
163
630
      Isolate::Scope isolate_scope(isolate);
164
      // V8 computes its stack limit the first time a `Locker` is used based on
165
      // --stack-size. Reset it to the correct value.
166
315
      isolate->SetStackLimit(w->stack_base_);
167
168
630
      HandleScope handle_scope(isolate);
169
630
      isolate_data_.reset(CreateIsolateData(isolate,
170
                                            &loop_,
171
315
                                            w_->platform_,
172
315
                                            allocator.get()));
173
315
      CHECK(isolate_data_);
174
315
      if (w_->per_isolate_opts_)
175
40
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
176
315
      isolate_data_->set_worker_context(w_);
177
    }
178
179
630
    Mutex::ScopedLock lock(w_->mutex_);
180
315
    w_->isolate_ = isolate;
181
  }
182
183
1087
  ~WorkerThreadData() {
184
543
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
185
    Isolate* isolate;
186
    {
187
1088
      Mutex::ScopedLock lock(w_->mutex_);
188
544
      isolate = w_->isolate_;
189
544
      w_->isolate_ = nullptr;
190
    }
191
192
544
    if (isolate != nullptr) {
193
315
      CHECK(!loop_init_failed_);
194
315
      bool platform_finished = false;
195
196
315
      isolate_data_.reset();
197
198
1890
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
199
315
        *static_cast<bool*>(data) = true;
200
1575
      }, &platform_finished);
201
202
      // The order of these calls is important; if the Isolate is first disposed
203
      // and then unregistered, there is a race condition window in which no
204
      // new Isolate at the same address can successfully be registered with
205
      // the platform.
206
      // (Refs: https://github.com/nodejs/node/issues/30846)
207
315
      w_->platform_->UnregisterIsolate(isolate);
208
315
      isolate->Dispose();
209
210
      // Wait until the platform has cleaned up all relevant resources.
211
945
      while (!platform_finished) {
212
315
        uv_run(&loop_, UV_RUN_ONCE);
213
      }
214
    }
215
544
    if (!loop_init_failed_) {
216
315
      CheckedUvLoopClose(&loop_);
217
    }
218
544
  }
219
220
315
  bool loop_is_usable() const { return !loop_init_failed_; }
221
222
 private:
223
  Worker* const w_;
224
  uv_loop_t loop_;
225
  bool loop_init_failed_ = true;
226
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
227
228
  friend class Worker;
229
};
230
231
1
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
232
                             size_t initial_heap_limit) {
233
1
  Worker* worker = static_cast<Worker*>(data);
234
1
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
235
  // Give the current GC some extra leeway to let it finish rather than
236
  // crash hard. We are not going to perform further allocations anyway.
237
1
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
238
1
  return current_heap_limit + kExtraHeapAllowance;
239
}
240
241
544
void Worker::Run() {
242
841
  std::string name = "WorkerThread ";
243
543
  name += std::to_string(thread_id_.id);
244

1090
  TRACE_EVENT_METADATA1(
245
      "__metadata", "thread_name", "name",
246
      TRACE_STR_COPY(name.c_str()));
247
544
  CHECK_NOT_NULL(platform_);
248
249
544
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
250
251
841
  WorkerThreadData data(this);
252
544
  if (isolate_ == nullptr) return;
253
315
  CHECK(data.loop_is_usable());
254
255
315
  Debug(this, "Starting worker with id %llu", thread_id_.id);
256
  {
257
612
    Locker locker(isolate_);
258
611
    Isolate::Scope isolate_scope(isolate_);
259
612
    SealHandleScope outer_seal(isolate_);
260
261
611
    DeleteFnPtr<Environment, FreeEnvironment> env_;
262
315
    auto cleanup_env = OnScopeLeave([&]() {
263
      // TODO(addaleax): This call is harmless but should not be necessary.
264
      // Figure out why V8 is raising a DCHECK() here without it
265
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
266
1515
      isolate_->CancelTerminateExecution();
267
268
615
      if (!env_) return;
269
300
      env_->set_can_call_into_js(false);
270
271
      {
272
600
        Mutex::ScopedLock lock(mutex_);
273
300
        stopped_ = true;
274
300
        this->env_ = nullptr;
275
      }
276
277
      // TODO(addaleax): Try moving DisallowJavascriptExecutionScope into
278
      // FreeEnvironment().
279
      Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
280
599
          Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
281
300
      env_.reset();
282
611
    });
283
284
315
    if (is_stopped()) return;
285
    {
286
601
      HandleScope handle_scope(isolate_);
287
      Local<Context> context;
288
      {
289
        // We create the Context object before we have an Environment* in place
290
        // that we could use for error handling. If creation fails due to
291
        // resource constraints, we need something in place to handle it,
292
        // though.
293
604
        TryCatch try_catch(isolate_);
294
304
        context = NewContext(isolate_);
295
304
        if (context.IsEmpty()) {
296
          // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
297
          // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
298
4
          Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
299
4
          return;
300
        }
301
      }
302
303
300
      if (is_stopped()) return;
304
300
      CHECK(!context.IsEmpty());
305
296
      Context::Scope context_scope(context);
306
      {
307
900
        env_.reset(CreateEnvironment(
308
            data.isolate_data_.get(),
309
            context,
310
300
            std::move(argv_),
311
300
            std::move(exec_argv_),
312
            EnvironmentFlags::kNoFlags,
313
            thread_id_,
314
600
            std::move(inspector_parent_handle_)));
315
300
        if (is_stopped()) return;
316
300
        CHECK_NOT_NULL(env_);
317
300
        env_->set_env_vars(std::move(env_vars_));
318
641
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
319
41
          Exit(exit_code);
320
341
        });
321
      }
322
      {
323
600
        Mutex::ScopedLock lock(mutex_);
324
300
        if (stopped_) return;
325
300
        this->env_ = env_.get();
326
      }
327
300
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
328
300
      if (is_stopped()) return;
329
      {
330
300
        CreateEnvMessagePort(env_.get());
331
300
        Debug(this, "Created message port for worker %llu", thread_id_.id);
332
600
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
333
3
          return;
334
335
297
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
336
      }
337
338
297
      if (is_stopped()) return;
339
      {
340
593
        SealHandleScope seal(isolate_);
341
        bool more;
342
297
        env_->performance_state()->Mark(
343
297
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
344
223
        do {
345
298
          if (is_stopped()) break;
346
298
          uv_run(&data.loop_, UV_RUN_DEFAULT);
347
298
          if (is_stopped()) break;
348
349
223
          platform_->DrainTasks(isolate_);
350
351
223
          more = uv_loop_alive(&data.loop_);
352

223
          if (more && !is_stopped()) continue;
353
354
222
          EmitBeforeExit(env_.get());
355
356
          // Emit `beforeExit` if the loop became alive either after emitting
357
          // event, or after running some callbacks.
358
222
          more = uv_loop_alive(&data.loop_);
359

223
        } while (more == true && !is_stopped());
360
297
        env_->performance_state()->Mark(
361
297
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
362
      }
363
    }
364
365
    {
366
      int exit_code;
367
297
      bool stopped = is_stopped();
368
297
      if (!stopped)
369
221
        exit_code = EmitExit(env_.get());
370
594
      Mutex::ScopedLock lock(mutex_);
371

297
      if (exit_code_ == 0 && !stopped)
372
218
        exit_code_ = exit_code;
373
374
297
      Debug(this, "Exiting thread for worker %llu with exit code %d",
375
            thread_id_.id, exit_code_);
376
    }
377
  }
378
379
297
  Debug(this, "Worker %llu thread stops", thread_id_.id);
380
}
381
382
300
// Creates the child-side MessagePort from the port data prepared by the
// constructor and, if that succeeds, stores its JS object on `env`.
void Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope handle_scope(isolate_);
  Mutex::ScopedLock lock(mutex_);
  // Set up the message channel for receiving messages in the child.
  MessagePort* const child_port =
      MessagePort::New(env, env->context(), std::move(child_port_data_));
  // MessagePort::New() may return nullptr if execution is terminated
  // within it.
  if (child_port == nullptr) return;
  env->set_message_port(child_port->object(isolate_));
}
394
395
547
void Worker::JoinThread() {
396
547
  if (thread_joined_)
397
3
    return;
398
544
  CHECK_EQ(uv_thread_join(&tid_), 0);
399
544
  thread_joined_ = true;
400
401
544
  env()->remove_sub_worker_context(this);
402
403
  {
404
1084
    HandleScope handle_scope(env()->isolate());
405
544
    Context::Scope context_scope(env()->context());
406
407
    // Reset the parent port as we're closing it now anyway.
408
1632
    object()->Set(env()->context(),
409
                  env()->message_port_string(),
410
2720
                  Undefined(env()->isolate())).Check();
411
412
    Local<Value> args[] = {
413
        Integer::New(env()->isolate(), exit_code_),
414
544
        custom_error_ != nullptr
415
778
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
416
1164
            : Null(env()->isolate()).As<Value>(),
417
544
        !custom_error_str_.empty()
418
778
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
419
                  .As<Value>()
420
1164
            : Null(env()->isolate()).As<Value>(),
421

4352
    };
422
423
544
    MakeCallback(env()->onexit_string(), arraysize(args), args);
424
  }
425
426
  // If we get here, the !thread_joined_ condition at the top of the function
427
  // implies that the thread was running. In that case, its final action will
428
  // be to schedule a callback on the parent thread which will delete this
429
  // object, so there's nothing more to do here.
430
}
431
432
1560
// Destroys the Worker. By this point the worker must be stopped, must have
// no Environment attached and its thread must have been joined; each of
// these is asserted while holding `mutex_`.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(thread_joined_);

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
441
442
549
void Worker::New(const FunctionCallbackInfo<Value>& args) {
443
549
  Environment* env = Environment::GetCurrent(args);
444
549
  Isolate* isolate = args.GetIsolate();
445
446
549
  CHECK(args.IsConstructCall());
447
448
549
  if (env->isolate_data()->platform() == nullptr) {
449
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
450
    return;
451
  }
452
453
1095
  std::string url;
454
1095
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
455
1095
  std::shared_ptr<KVStore> env_vars = nullptr;
456
457
1095
  std::vector<std::string> exec_argv_out;
458
459
549
  CHECK_EQ(args.Length(), 4);
460
  // Argument might be a string or URL
461
1647
  if (!args[0]->IsNullOrUndefined()) {
462
    Utf8Value value(
463
495
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
464
99
    url.append(value.out(), value.length());
465
  }
466
467
1647
  if (args[1]->IsNull()) {
468
    // Means worker.env = { ...process.env }.
469
524
    env_vars = env->env_vars()->Clone(isolate);
470
50
  } else if (args[1]->IsObject()) {
471
    // User provided env.
472
24
    env_vars = KVStore::CreateMapKVStore();
473
24
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
474
72
                               args[1].As<Object>());
475
  } else {
476
    // Env is shared.
477
1
    env_vars = env->env_vars();
478
  }
479
480

2148
  if (args[1]->IsObject() || args[2]->IsArray()) {
481
43
    per_isolate_opts.reset(new PerIsolateOptions());
482
483
430
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
484
344
      return env_vars->Get(name).FromMaybe("");
485
387
    });
486
487
#ifndef NODE_WITHOUT_NODE_OPTIONS
488
    MaybeLocal<String> maybe_node_opts =
489
43
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
490
43
    if (!maybe_node_opts.IsEmpty()) {
491
      std::string node_options(
492
126
          *String::Utf8Value(isolate, maybe_node_opts.ToLocalChecked()));
493
84
      std::vector<std::string> errors{};
494
      std::vector<std::string> env_argv =
495
84
          ParseNodeOptionsEnvVar(node_options, &errors);
496
      // [0] is expected to be the program name, add dummy string.
497
42
      env_argv.insert(env_argv.begin(), "");
498
84
      std::vector<std::string> invalid_args{};
499
42
      options_parser::Parse(&env_argv,
500
                            nullptr,
501
                            &invalid_args,
502
                            per_isolate_opts.get(),
503
                            kAllowedInEnvironment,
504
42
                            &errors);
505

42
      if (errors.size() > 0 && args[1]->IsObject()) {
506
        // Only fail for explicitly provided env, this protects from failures
507
        // when NODE_OPTIONS from parent's env is used (which is the default).
508
        Local<Value> error;
509
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
510
        Local<String> key =
511
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
512
        // Ignore the return value of Set() because exceptions bubble up to JS
513
        // when we return anyway.
514
        USE(args.This()->Set(env->context(), key, error));
515
        return;
516
      }
517
    }
518
#endif
519
  }
520
521
1098
  if (args[2]->IsArray()) {
522
82
    Local<Array> array = args[2].As<Array>();
523
    // The first argument is reserved for program name, but we don't need it
524
    // in workers.
525
79
    std::vector<std::string> exec_argv = {""};
526
41
    uint32_t length = array->Length();
527
75
    for (uint32_t i = 0; i < length; i++) {
528
      Local<Value> arg;
529
102
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
530
        return;
531
      }
532
      MaybeLocal<String> arg_v8_string =
533
68
          arg->ToString(env->context());
534
34
      if (arg_v8_string.IsEmpty()) {
535
        return;
536
      }
537
      Utf8Value arg_utf8_value(
538
          args.GetIsolate(),
539
68
          arg_v8_string.FromMaybe(Local<String>()));
540
68
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
541
34
      exec_argv.push_back(arg_string);
542
    }
543
544
79
    std::vector<std::string> invalid_args{};
545
79
    std::vector<std::string> errors{};
546
    // Using invalid_args as the v8_args argument as it stores unknown
547
    // options for the per isolate parser.
548
41
    options_parser::Parse(
549
        &exec_argv,
550
        &exec_argv_out,
551
        &invalid_args,
552
        per_isolate_opts.get(),
553
        kDisallowedInEnvironment,
554
41
        &errors);
555
556
    // The first argument is program name.
557
41
    invalid_args.erase(invalid_args.begin());
558

41
    if (errors.size() > 0 || invalid_args.size() > 0) {
559
      Local<Value> error;
560
6
      if (!ToV8Value(env->context(),
561
3
                     errors.size() > 0 ? errors : invalid_args)
562
3
                         .ToLocal(&error)) {
563
        return;
564
      }
565
      Local<String> key =
566
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
567
      // Ignore the return value of Set() because exceptions bubble up to JS
568
      // when we return anyway.
569
9
      USE(args.This()->Set(env->context(), key, error));
570
3
      return;
571
    }
572
  } else {
573
508
    exec_argv_out = env->exec_argv();
574
  }
575
576
  Worker* worker = new Worker(env,
577
1092
                              args.This(),
578
                              url,
579
                              per_isolate_opts,
580
                              std::move(exec_argv_out),
581
546
                              env_vars);
582
583
1092
  CHECK(args[3]->IsFloat64Array());
584
1092
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
585
546
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
586
1092
  limit_info->CopyContents(worker->resource_limits_,
587
546
                           sizeof(worker->resource_limits_));
588
}
589
590
544
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
591
  Worker* w;
592
544
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
593
1088
  Mutex::ScopedLock lock(w->mutex_);
594
595
544
  w->stopped_ = false;
596
597
544
  if (w->resource_limits_[kStackSizeMb] > 0) {
598
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
599
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
600
3
      w->stack_size_ = kStackBufferSize;
601
    } else {
602
6
      w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
603
    }
604
  } else {
605
535
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
606
  }
607
608
  uv_thread_options_t thread_options;
609
544
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
610
544
  thread_options.stack_size = w->stack_size_;
611
2720
  int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
612
    // XXX: This could become a std::unique_ptr, but that makes at least
613
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
614
    // gcc 7+ handles this well.
615
544
    Worker* w = static_cast<Worker*>(arg);
616
544
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
617
618
    // Leave a few kilobytes just to make sure we're within limits and have
619
    // some space to do work in C++ land.
620
544
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
621
622
544
    w->Run();
623
624
1088
    Mutex::ScopedLock lock(w->mutex_);
625
1088
    w->env()->SetImmediateThreadsafe(
626
2128
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
627
522
          if (w->has_ref_)
628
521
            env->add_refs(-1);
629
522
          w->JoinThread();
630
          // implicitly delete w
631
1062
        });
632
2176
  }, static_cast<void*>(w));
633
634
544
  if (ret == 0) {
635
    // The object now owns the created thread and should not be garbage
636
    // collected until that finishes.
637
544
    w->ClearWeak();
638
544
    w->thread_joined_ = false;
639
640
544
    if (w->has_ref_)
641
544
      w->env()->add_refs(1);
642
643
544
    w->env()->add_sub_worker_context(w);
644
  } else {
645
    w->stopped_ = true;
646
647
    char err_buf[128];
648
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
649
    {
650
      Isolate* isolate = w->env()->isolate();
651
      HandleScope handle_scope(isolate);
652
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
653
    }
654
  }
655
}
656
657
32
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
658
  Worker* w;
659
32
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
660
661
32
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
662
32
  w->Exit(1);
663
}
664
665
34
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
666
  Worker* w;
667
34
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
668

34
  if (!w->has_ref_ && !w->thread_joined_) {
669
3
    w->has_ref_ = true;
670
3
    w->env()->add_refs(1);
671
  }
672
}
673
674
5
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
675
  Worker* w;
676
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
677

5
  if (w->has_ref_ && !w->thread_joined_) {
678
4
    w->has_ref_ = false;
679
4
    w->env()->add_refs(-1);
680
  }
681
}
682
683
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
684
  Worker* w;
685
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
686
3
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
687
}
688
689
301
// Copies `resource_limits_` into a freshly allocated ArrayBuffer and returns
// a Float64Array view of kTotalResourceLimitCount elements over it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  constexpr size_t kByteLength = sizeof(resource_limits_);
  Local<ArrayBuffer> buffer = ArrayBuffer::New(isolate, kByteLength);

  void* const destination = buffer->GetBackingStore()->Data();
  memcpy(destination, resource_limits_, kByteLength);
  return Float64Array::New(buffer, 0, kTotalResourceLimitCount);
}
697
698
332
void Worker::Exit(int code, const char* error_code, const char* error_message) {
699
664
  Mutex::ScopedLock lock(mutex_);
700
332
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
701
        thread_id_.id, code, error_code, error_message);
702
703
332
  if (error_code != nullptr) {
704
234
    custom_error_ = error_code;
705
234
    custom_error_str_ = error_message;
706
  }
707
708
332
  if (env_ != nullptr) {
709
82
    exit_code_ = code;
710
82
    Stop(env_);
711
  } else {
712
250
    stopped_ = true;
713
  }
714
332
}
715
716
// Reports the memory retained by this Worker to `tracker`; only the parent
// message port is tracked.
void Worker::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("parent_port", parent_port_);
}
719
720
class WorkerHeapSnapshotTaker : public AsyncWrap {
721
 public:
722
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
723
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
724
725
  SET_NO_MEMORY_INFO()
726
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
727
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
728
};
729
730
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
731
  Worker* w;
732
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
733
734
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
735
736
  Environment* env = w->env();
737
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
738
  Local<Object> wrap;
739
  if (!env->worker_heap_snapshot_taker_template()
740
      ->NewInstance(env->context()).ToLocal(&wrap)) {
741
    return;
742
  }
743
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
744
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
745
746
  // Interrupt the worker thread and take a snapshot, then schedule a call
747
  // on the parent thread that turns that snapshot into a readable stream.
748
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
749
    heap::HeapSnapshotPointer snapshot {
750
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
751
    CHECK(snapshot);
752
    env->SetImmediateThreadsafe(
753
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
754
          HandleScope handle_scope(env->isolate());
755
          Context::Scope context_scope(env->context());
756
757
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
758
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
759
              env, std::move(snapshot));
760
          Local<Value> args[] = { stream->object() };
761
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
762
        }, CallbackFlags::kUnrefed);
763
  });
764
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
765
}
766
767
namespace {
768
769
// Return the MessagePort that is global for this Environment and communicates
770
// with the internal [kPort] port of the JS Worker class in the parent thread.
771
595
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
772
595
  Environment* env = Environment::GetCurrent(args);
773
595
  Local<Object> port = env->message_port();
774

1190
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
775
595
  if (!port.IsEmpty()) {
776
1785
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
777
1190
    args.GetReturnValue().Set(port);
778
  }
779
595
}
780
781
3869
void InitWorker(Local<Object> target,
782
                Local<Value> unused,
783
                Local<Context> context,
784
                void* priv) {
785
3869
  Environment* env = Environment::GetCurrent(context);
786
787
  {
788
3869
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
789
790
11607
    w->InstanceTemplate()->SetInternalFieldCount(
791
3869
        Worker::kInternalFieldCount);
792
7738
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
793
794
3869
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
795
3869
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
796
3869
    env->SetProtoMethod(w, "ref", Worker::Ref);
797
3869
    env->SetProtoMethod(w, "unref", Worker::Unref);
798
3869
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
799
3869
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
800
801
    Local<String> workerString =
802
3869
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
803
3869
    w->SetClassName(workerString);
804
7738
    target->Set(env->context(),
805
                workerString,
806
19345
                w->GetFunction(env->context()).ToLocalChecked()).Check();
807
  }
808
809
  {
810
3869
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
811
812
11607
    wst->InstanceTemplate()->SetInternalFieldCount(
813
3869
        WorkerHeapSnapshotTaker::kInternalFieldCount);
814
7738
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
815
816
    Local<String> wst_string =
817
3869
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
818
3869
    wst->SetClassName(wst_string);
819
3869
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
820
  }
821
822
3869
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
823
824
  target
825
7738
      ->Set(env->context(),
826
            env->thread_id_string(),
827
19345
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
828
      .Check();
829
830
  target
831
7738
      ->Set(env->context(),
832
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
833
19345
            Boolean::New(env->isolate(), env->is_main_thread()))
834
      .Check();
835
836
  target
837
7738
      ->Set(env->context(),
838
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
839
19345
            Boolean::New(env->isolate(), env->owns_process_state()))
840
      .Check();
841
842
3869
  if (!env->is_main_thread()) {
843
    target
844
600
        ->Set(env->context(),
845
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
846
1500
              env->worker_context()->GetResourceLimits(env->isolate()))
847
        .Check();
848
  }
849
850
15476
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
851
15476
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
852
15476
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
853
15476
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
854
15476
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
855
3869
}
856
857
}  // anonymous namespace
858
859
}  // namespace worker
860
}  // namespace node
861
862
4325
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)