GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 407 456 89.3 %
Date: 2021-02-11 04:11:15 Branches: 192 258 74.4 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "memory_tracker-inl.h"
4
#include "node_errors.h"
5
#include "node_external_reference.h"
6
#include "node_buffer.h"
7
#include "node_options-inl.h"
8
#include "node_perf.h"
9
#include "util-inl.h"
10
#include "async_wrap-inl.h"
11
12
#include <memory>
13
#include <string>
14
#include <vector>
15
16
using node::kAllowedInEnvironment;
17
using node::kDisallowedInEnvironment;
18
using v8::Array;
19
using v8::ArrayBuffer;
20
using v8::Boolean;
21
using v8::Context;
22
using v8::Float64Array;
23
using v8::FunctionCallbackInfo;
24
using v8::FunctionTemplate;
25
using v8::HandleScope;
26
using v8::Integer;
27
using v8::Isolate;
28
using v8::Local;
29
using v8::Locker;
30
using v8::Maybe;
31
using v8::MaybeLocal;
32
using v8::Null;
33
using v8::Number;
34
using v8::Object;
35
using v8::ResourceConstraints;
36
using v8::SealHandleScope;
37
using v8::String;
38
using v8::TryCatch;
39
using v8::Value;
40
41
namespace node {
42
namespace worker {
43
44
// Number of bytes in one mebibyte. Used in this file to convert the
// megabyte-denominated resource limits to and from byte counts.
constexpr double kMB = 1024 * 1024;
45
46
636
// Creates the parent-side representation of a worker.
//
// Stores the per-isolate options, exec argv, platform, a newly allocated
// thread id and the environment variable store, then sets up the parent
// MessagePort and publishes it (and the thread id) on the wrapper object.
//
// `per_isolate_opts`, `exec_argv` and `env_vars` are moved into the matching
// members rather than copied: `exec_argv` is an rvalue reference whose name is
// an lvalue inside the constructor, so without std::move() the whole vector
// would be duplicated, and the shared_ptr parameters would each cost an extra
// reference-count round trip.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      per_isolate_opts_(std::move(per_isolate_opts)),
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  // The child side of the channel only exists as data for now; it is entangled
  // with the parent port here.
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  // The worker's argv initially contains only the parent's argv[0].
  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
89
90
1578
bool Worker::is_stopped() const {
91
3156
  Mutex::ScopedLock lock(mutex_);
92
1578
  if (env_ != nullptr)
93
390
    return env_->is_stopping();
94
1188
  return stopped_;
95
}
96
97
404
// Synchronizes `resource_limits_` with `constraints`.
//
// The stack limit is always taken from `stack_base_`. For each of the three
// size limits: a positive configured value (in MB) is written into
// `constraints` as a byte count; otherwise the value currently held by
// `constraints` is read back and stored (in MB) in `resource_limits_`.
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  auto& young_gen_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_gen_mb > 0) {
    constraints->set_max_young_generation_size_in_bytes(
        static_cast<size_t>(young_gen_mb * kMB));
  } else {
    young_gen_mb = constraints->max_young_generation_size_in_bytes() / kMB;
  }

  auto& old_gen_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_gen_mb > 0) {
    constraints->set_max_old_generation_size_in_bytes(
        static_cast<size_t>(old_gen_mb * kMB));
  } else {
    old_gen_mb = constraints->max_old_generation_size_in_bytes() / kMB;
  }

  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    constraints->set_code_range_size_in_bytes(
        static_cast<size_t>(code_range_mb * kMB));
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
124
125
// This class contains data that is only relevant to the child thread itself,
126
// and only while it is running.
127
// (Eventually, the Environment instance should probably also be moved here.)
128
class WorkerThreadData {
129
 public:
130
633
  explicit WorkerThreadData(Worker* w)
131
633
    : w_(w) {
132
633
    int ret = uv_loop_init(&loop_);
133
633
    if (ret != 0) {
134
      char err_buf[128];
135
228
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
136
228
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
137
228
      return;
138
    }
139
405
    loop_init_failed_ = false;
140
405
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
141
142
    std::shared_ptr<ArrayBufferAllocator> allocator =
143
810
        ArrayBufferAllocator::Create();
144
809
    Isolate::CreateParams params;
145
403
    SetIsolateCreateParamsForNode(&params);
146
404
    params.array_buffer_allocator_shared = allocator;
147
148
404
    w->UpdateResourceConstraints(&params.constraints);
149
150
404
    Isolate* isolate = Isolate::Allocate();
151
405
    if (isolate == nullptr) {
152
      // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
153
      // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
154
      w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
155
      return;
156
    }
157
158
405
    w->platform_->RegisterIsolate(isolate, &loop_);
159
405
    Isolate::Initialize(isolate, params);
160
405
    SetIsolateUpForNode(isolate);
161
162
    // Be sure it's called before Environment::InitializeDiagnostics()
163
    // so that this callback stays when the callback of
164
    // --heapsnapshot-near-heap-limit gets is popped.
165
405
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
166
167
    {
168
405
      Locker locker(isolate);
169
810
      Isolate::Scope isolate_scope(isolate);
170
      // V8 computes its stack limit the first time a `Locker` is used based on
171
      // --stack-size. Reset it to the correct value.
172
405
      isolate->SetStackLimit(w->stack_base_);
173
174
810
      HandleScope handle_scope(isolate);
175
810
      isolate_data_.reset(CreateIsolateData(isolate,
176
                                            &loop_,
177
405
                                            w_->platform_,
178
405
                                            allocator.get()));
179
405
      CHECK(isolate_data_);
180
405
      if (w_->per_isolate_opts_)
181
102
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
182
405
      isolate_data_->set_worker_context(w_);
183
810
      isolate_data_->max_young_gen_size =
184
405
          params.constraints.max_young_generation_size_in_bytes();
185
    }
186
187
810
    Mutex::ScopedLock lock(w_->mutex_);
188
405
    w_->isolate_ = isolate;
189
  }
190
191
1265
  ~WorkerThreadData() {
192
632
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
193
    Isolate* isolate;
194
    {
195
1266
      Mutex::ScopedLock lock(w_->mutex_);
196
633
      isolate = w_->isolate_;
197
633
      w_->isolate_ = nullptr;
198
    }
199
200
633
    if (isolate != nullptr) {
201
405
      CHECK(!loop_init_failed_);
202
405
      bool platform_finished = false;
203
204
405
      isolate_data_.reset();
205
206
2430
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
207
405
        *static_cast<bool*>(data) = true;
208
2025
      }, &platform_finished);
209
210
      // The order of these calls is important; if the Isolate is first disposed
211
      // and then unregistered, there is a race condition window in which no
212
      // new Isolate at the same address can successfully be registered with
213
      // the platform.
214
      // (Refs: https://github.com/nodejs/node/issues/30846)
215
405
      w_->platform_->UnregisterIsolate(isolate);
216
405
      isolate->Dispose();
217
218
      // Wait until the platform has cleaned up all relevant resources.
219
1215
      while (!platform_finished) {
220
405
        uv_run(&loop_, UV_RUN_ONCE);
221
      }
222
    }
223
633
    if (!loop_init_failed_) {
224
405
      CheckedUvLoopClose(&loop_);
225
    }
226
633
  }
227
228
405
  bool loop_is_usable() const { return !loop_init_failed_; }
229
230
 private:
231
  Worker* const w_;
232
  uv_loop_t loop_;
233
  bool loop_init_failed_ = true;
234
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
235
236
  friend class Worker;
237
};
238
239
3
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
240
                             size_t initial_heap_limit) {
241
3
  Worker* worker = static_cast<Worker*>(data);
242
3
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
243
  // Give the current GC some extra leeway to let it finish rather than
244
  // crash hard. We are not going to perform further allocations anyway.
245
3
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246
3
  return current_heap_limit + kExtraHeapAllowance;
247
}
248
249
633
void Worker::Run() {
250
1023
  std::string name = "WorkerThread ";
251
633
  name += std::to_string(thread_id_.id);
252
633
  TRACE_EVENT_METADATA1(
253
      "__metadata", "thread_name", "name",
254
      TRACE_STR_COPY(name.c_str()));
255
779
  CHECK_NOT_NULL(platform_);
256
633
257
633
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
258
4
259
1023
  WorkerThreadData data(this);
260
633
  if (isolate_ == nullptr) return;
261
405
  CHECK(data.loop_is_usable());
262
263
405
  Debug(this, "Starting worker with id %llu", thread_id_.id);
264
  {
265
795
    Locker locker(isolate_);
266
794
    Isolate::Scope isolate_scope(isolate_);
267
795
    SealHandleScope outer_seal(isolate_);
268
269
795
    DeleteFnPtr<Environment, FreeEnvironment> env_;
270
405
    auto cleanup_env = OnScopeLeave([&]() {
271
      // TODO(addaleax): This call is harmless but should not be necessary.
272
      // Figure out why V8 is raising a DCHECK() here without it
273
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
274
1575
      isolate_->CancelTerminateExecution();
275
276
795
      if (!env_) return;
277
388
      env_->set_can_call_into_js(false);
278
279
      {
280
780
        Mutex::ScopedLock lock(mutex_);
281
390
        stopped_ = true;
282
390
        this->env_ = nullptr;
283
      }
284
285
390
      env_.reset();
286
794
    });
287
288
405
    if (is_stopped()) return;
289
    {
290
787
      HandleScope handle_scope(isolate_);
291
      Local<Context> context;
292
      {
293
        // We create the Context object before we have an Environment* in place
294
        // that we could use for error handling. If creation fails due to
295
        // resource constraints, we need something in place to handle it,
296
        // though.
297
790
        TryCatch try_catch(isolate_);
298
397
        context = NewContext(isolate_);
299
397
        if (context.IsEmpty()) {
300
          // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
301
          // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
302
4
          Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
303
4
          return;
304
        }
305
      }
306
307
393
      if (is_stopped()) return;
308
390
      CHECK(!context.IsEmpty());
309
390
      Context::Scope context_scope(context);
310
      {
311
1170
        env_.reset(CreateEnvironment(
312
            data.isolate_data_.get(),
313
            context,
314
390
            std::move(argv_),
315
390
            std::move(exec_argv_),
316
390
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
317
            thread_id_,
318
780
            std::move(inspector_parent_handle_)));
319
390
        if (is_stopped()) return;
320
390
        CHECK_NOT_NULL(env_);
321
390
        env_->set_env_vars(std::move(env_vars_));
322
833
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
323
53
          Exit(exit_code);
324
443
        });
325
      }
326
      {
327
780
        Mutex::ScopedLock lock(mutex_);
328
390
        if (stopped_) return;
329
390
        this->env_ = env_.get();
330
      }
331
390
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
332
390
      if (is_stopped()) return;
333
      {
334
390
        CreateEnvMessagePort(env_.get());
335
390
        Debug(this, "Created message port for worker %llu", thread_id_.id);
336
780
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
337
          return;
338
339
390
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
340
      }
341
    }
342
343
    {
344
390
      Maybe<int> exit_code = SpinEventLoop(env_.get());
345
778
      Mutex::ScopedLock lock(mutex_);
346

707
      if (exit_code_ == 0 && exit_code.IsJust()) {
347
299
        exit_code_ = exit_code.FromJust();
348
      }
349
350
390
      Debug(this, "Exiting thread for worker %llu with exit code %d",
351
            thread_id_.id, exit_code_);
352
    }
353
  }
354
355
390
  Debug(this, "Worker %llu thread stops", thread_id_.id);
356
}
357
358
390
// Creates the child-side MessagePort from `child_port_data_` and, if creation
// succeeded, installs it as `env`'s message port. Runs with `mutex_` held.
void Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope handle_scope(isolate_);
  Mutex::ScopedLock lock(mutex_);

  // Set up the message channel for receiving messages in the child.
  MessagePort* child_port = MessagePort::New(env,
                                             env->context(),
                                             std::move(child_port_data_));
  // MessagePort::New() may return nullptr if execution is terminated
  // within it.
  if (child_port == nullptr) return;

  env->set_message_port(child_port->object(isolate_));
}
370
371
636
void Worker::JoinThread() {
372
636
  if (thread_joined_)
373
3
    return;
374
633
  CHECK_EQ(uv_thread_join(&tid_), 0);
375
633
  thread_joined_ = true;
376
377
633
  env()->remove_sub_worker_context(this);
378
379
  {
380
1264
    HandleScope handle_scope(env()->isolate());
381
633
    Context::Scope context_scope(env()->context());
382
383
    // Reset the parent port as we're closing it now anyway.
384
1899
    object()->Set(env()->context(),
385
                  env()->message_port_string(),
386
3165
                  Undefined(env()->isolate())).Check();
387
388
    Local<Value> args[] = {
389
        Integer::New(env()->isolate(), exit_code_),
390
633
        custom_error_ != nullptr
391
867
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
392
1431
            : Null(env()->isolate()).As<Value>(),
393
633
        !custom_error_str_.empty()
394
867
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
395
                  .As<Value>()
396
1431
            : Null(env()->isolate()).As<Value>(),
397

5064
    };
398
399
633
    MakeCallback(env()->onexit_string(), arraysize(args), args);
400
  }
401
402
  // If we get here, the !thread_joined_ condition at the top of the function
403
  // implies that the thread was running. In that case, its final action will
404
  // be to schedule a callback on the parent thread which will delete this
405
  // object, so there's nothing more to do here.
406
}
407
408
1836
// Destroys the worker. With `mutex_` held, asserts that the worker has been
// stopped, no longer has an Environment attached, and that its thread has
// been joined.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  // Invariants that must hold by the time the object is destroyed.
  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(thread_joined_);

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
417
418
640
void Worker::New(const FunctionCallbackInfo<Value>& args) {
419
640
  Environment* env = Environment::GetCurrent(args);
420
640
  Isolate* isolate = args.GetIsolate();
421
422
640
  CHECK(args.IsConstructCall());
423
424
640
  if (env->isolate_data()->platform() == nullptr) {
425
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
426
    return;
427
  }
428
429
1276
  std::string url;
430
1276
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
431
1276
  std::shared_ptr<KVStore> env_vars = nullptr;
432
433
1276
  std::vector<std::string> exec_argv_out;
434
435
  // Argument might be a string or URL
436
1920
  if (!args[0]->IsNullOrUndefined()) {
437
    Utf8Value value(
438
870
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
439
174
    url.append(value.out(), value.length());
440
  }
441
442
1920
  if (args[1]->IsNull()) {
443
    // Means worker.env = { ...process.env }.
444
614
    env_vars = env->env_vars()->Clone(isolate);
445
52
  } else if (args[1]->IsObject()) {
446
    // User provided env.
447
25
    env_vars = KVStore::CreateMapKVStore();
448
25
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
449
75
                               args[1].As<Object>());
450
  } else {
451
    // Env is shared.
452
1
    env_vars = env->env_vars();
453
  }
454
455

2510
  if (args[1]->IsObject() || args[2]->IsArray()) {
456
106
    per_isolate_opts.reset(new PerIsolateOptions());
457
458
1060
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
459
848
      return env_vars->Get(name).FromMaybe("");
460
954
    });
461
462
#ifndef NODE_WITHOUT_NODE_OPTIONS
463
    MaybeLocal<String> maybe_node_opts =
464
106
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
465
    Local<String> node_opts;
466
106
    if (maybe_node_opts.ToLocal(&node_opts)) {
467
314
      std::string node_options(*String::Utf8Value(isolate, node_opts));
468
209
      std::vector<std::string> errors{};
469
      std::vector<std::string> env_argv =
470
209
          ParseNodeOptionsEnvVar(node_options, &errors);
471
      // [0] is expected to be the program name, add dummy string.
472
105
      env_argv.insert(env_argv.begin(), "");
473
209
      std::vector<std::string> invalid_args{};
474
105
      options_parser::Parse(&env_argv,
475
                            nullptr,
476
                            &invalid_args,
477
                            per_isolate_opts.get(),
478
                            kAllowedInEnvironment,
479
105
                            &errors);
480

107
      if (!errors.empty() && args[1]->IsObject()) {
481
        // Only fail for explicitly provided env, this protects from failures
482
        // when NODE_OPTIONS from parent's env is used (which is the default).
483
        Local<Value> error;
484
2
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
485
        Local<String> key =
486
1
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
487
        // Ignore the return value of Set() because exceptions bubble up to JS
488
        // when we return anyway.
489
3
        USE(args.This()->Set(env->context(), key, error));
490
1
        return;
491
      }
492
    }
493
#endif
494
  }
495
496
1278
  if (args[2]->IsArray()) {
497
206
    Local<Array> array = args[2].As<Array>();
498
    // The first argument is reserved for program name, but we don't need it
499
    // in workers.
500
203
    std::vector<std::string> exec_argv = {""};
501
103
    uint32_t length = array->Length();
502
154
    for (uint32_t i = 0; i < length; i++) {
503
      Local<Value> arg;
504
153
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
505
        return;
506
      }
507
      Local<String> arg_v8;
508
153
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
509
        return;
510
      }
511
102
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
512
102
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
513
51
      exec_argv.push_back(arg_string);
514
    }
515
516
203
    std::vector<std::string> invalid_args{};
517
203
    std::vector<std::string> errors{};
518
    // Using invalid_args as the v8_args argument as it stores unknown
519
    // options for the per isolate parser.
520
103
    options_parser::Parse(
521
        &exec_argv,
522
        &exec_argv_out,
523
        &invalid_args,
524
        per_isolate_opts.get(),
525
        kDisallowedInEnvironment,
526
103
        &errors);
527
528
    // The first argument is program name.
529
103
    invalid_args.erase(invalid_args.begin());
530

103
    if (errors.size() > 0 || invalid_args.size() > 0) {
531
      Local<Value> error;
532
6
      if (!ToV8Value(env->context(),
533
3
                     errors.size() > 0 ? errors : invalid_args)
534
3
                         .ToLocal(&error)) {
535
        return;
536
      }
537
      Local<String> key =
538
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
539
      // Ignore the return value of Set() because exceptions bubble up to JS
540
      // when we return anyway.
541
9
      USE(args.This()->Set(env->context(), key, error));
542
3
      return;
543
    }
544
  } else {
545
536
    exec_argv_out = env->exec_argv();
546
  }
547
548
  Worker* worker = new Worker(env,
549
1272
                              args.This(),
550
                              url,
551
                              per_isolate_opts,
552
                              std::move(exec_argv_out),
553
636
                              env_vars);
554
555
1272
  CHECK(args[3]->IsFloat64Array());
556
1272
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
557
636
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
558
1272
  limit_info->CopyContents(worker->resource_limits_,
559
636
                           sizeof(worker->resource_limits_));
560
561
1272
  CHECK(args[4]->IsBoolean());
562

1272
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
563
635
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
564
}
565
566
633
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
567
  Worker* w;
568
633
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
569
1266
  Mutex::ScopedLock lock(w->mutex_);
570
571
633
  w->stopped_ = false;
572
573
633
  if (w->resource_limits_[kStackSizeMb] > 0) {
574
9
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
575
3
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
576
3
      w->stack_size_ = kStackBufferSize;
577
    } else {
578
12
      w->stack_size_ =
579
6
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
580
    }
581
  } else {
582
624
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
583
  }
584
585
  uv_thread_options_t thread_options;
586
633
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
587
633
  thread_options.stack_size = w->stack_size_;
588
3165
  int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
589
    // XXX: This could become a std::unique_ptr, but that makes at least
590
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
591
    // gcc 7+ handles this well.
592
633
    Worker* w = static_cast<Worker*>(arg);
593
633
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
594
595
    // Leave a few kilobytes just to make sure we're within limits and have
596
    // some space to do work in C++ land.
597
633
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
598
599
633
    w->Run();
600
601
1266
    Mutex::ScopedLock lock(w->mutex_);
602
1266
    w->env()->SetImmediateThreadsafe(
603
2486
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
604
611
          if (w->has_ref_)
605
610
            env->add_refs(-1);
606
611
          w->JoinThread();
607
          // implicitly delete w
608
1242
        });
609
2532
  }, static_cast<void*>(w));
610
611
633
  if (ret == 0) {
612
    // The object now owns the created thread and should not be garbage
613
    // collected until that finishes.
614
633
    w->ClearWeak();
615
633
    w->thread_joined_ = false;
616
617
633
    if (w->has_ref_)
618
633
      w->env()->add_refs(1);
619
620
633
    w->env()->add_sub_worker_context(w);
621
  } else {
622
    w->stopped_ = true;
623
624
    char err_buf[128];
625
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
626
    {
627
      Isolate* isolate = w->env()->isolate();
628
      HandleScope handle_scope(isolate);
629
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
630
    }
631
  }
632
}
633
634
32
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
635
  Worker* w;
636
32
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
637
638
32
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
639
32
  w->Exit(1);
640
}
641
642
34
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
643
  Worker* w;
644
34
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
645

34
  if (!w->has_ref_ && !w->thread_joined_) {
646
3
    w->has_ref_ = true;
647
3
    w->env()->add_refs(1);
648
  }
649
}
650
651
5
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
652
  Worker* w;
653
5
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
654

5
  if (w->has_ref_ && !w->thread_joined_) {
655
4
    w->has_ref_ = false;
656
4
    w->env()->add_refs(-1);
657
  }
658
}
659
660
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
661
  Worker* w;
662
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
663
3
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
664
}
665
666
391
// Copies `resource_limits_` into a new ArrayBuffer created in `isolate` and
// returns a Float64Array of kTotalResourceLimitCount elements over it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  constexpr size_t kByteLength = sizeof(resource_limits_);
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, kByteLength);

  memcpy(ab->GetBackingStore()->Data(), resource_limits_, kByteLength);
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
}
674
675
345
void Worker::Exit(int code, const char* error_code, const char* error_message) {
676
690
  Mutex::ScopedLock lock(mutex_);
677
345
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
678
        thread_id_.id, code, error_code, error_message);
679
680
345
  if (error_code != nullptr) {
681
235
    custom_error_ = error_code;
682
235
    custom_error_str_ = error_message;
683
  }
684
685
345
  if (env_ != nullptr) {
686
91
    exit_code_ = code;
687
91
    Stop(env_);
688
  } else {
689
254
    stopped_ = true;
690
  }
691
345
}
692
693
// Reports the parent-side message port to the memory tracker.
void Worker::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("parent_port", parent_port_);
}
696
697
// Always returns true: Worker objects stay alive as long as the child thread
// does, regardless of whether they are being referenced in the parent thread.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  return true;
}
702
703
class WorkerHeapSnapshotTaker : public AsyncWrap {
704
 public:
705
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
706
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
707
708
  SET_NO_MEMORY_INFO()
709
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
710
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
711
};
712
713
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
714
  Worker* w;
715
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
716
717
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
718
719
  Environment* env = w->env();
720
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
721
  Local<Object> wrap;
722
  if (!env->worker_heap_snapshot_taker_template()
723
      ->NewInstance(env->context()).ToLocal(&wrap)) {
724
    return;
725
  }
726
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
727
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
728
729
  // Interrupt the worker thread and take a snapshot, then schedule a call
730
  // on the parent thread that turns that snapshot into a readable stream.
731
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
732
    heap::HeapSnapshotPointer snapshot {
733
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
734
    CHECK(snapshot);
735
    env->SetImmediateThreadsafe(
736
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
737
          HandleScope handle_scope(env->isolate());
738
          Context::Scope context_scope(env->context());
739
740
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
741
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
742
              env, std::move(snapshot));
743
          Local<Value> args[] = { stream->object() };
744
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
745
        }, CallbackFlags::kUnrefed);
746
  });
747
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
748
}
749
750
7
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
751
  Worker* w;
752
7
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
753
754
14
  Mutex::ScopedLock lock(w->mutex_);
755
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
756
  // before locking the mutex is a race condition. So manually do the same
757
  // check.
758

7
  if (w->stopped_ || w->env_ == nullptr)
759
    return args.GetReturnValue().Set(-1);
760
761
7
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
762
21
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
763
}
764
765
1
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
766
  Worker* w;
767
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
768
769
2
  Mutex::ScopedLock lock(w->mutex_);
770
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
771
  // before locking the mutex is a race condition. So manually do the same
772
  // check.
773

1
  if (w->stopped_ || w->env_ == nullptr)
774
    return args.GetReturnValue().Set(-1);
775
776
1
  double loop_start_time = w->env_->performance_state()->milestones[
777
1
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
778
1
  CHECK_GE(loop_start_time, 0);
779
3
  args.GetReturnValue().Set(
780
1
      (loop_start_time - node::performance::timeOrigin) / 1e6);
781
}
782
783
namespace {
784
785
// Return the MessagePort that is global for this Environment and communicates
786
// with the internal [kPort] port of the JS Worker class in the parent thread.
787
781
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
788
781
  Environment* env = Environment::GetCurrent(args);
789
781
  Local<Object> port = env->message_port();
790

1562
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
791
781
  if (!port.IsEmpty()) {
792
2343
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
793
1562
    args.GetReturnValue().Set(port);
794
  }
795
781
}
796
797
444
void InitWorker(Local<Object> target,
798
                Local<Value> unused,
799
                Local<Context> context,
800
                void* priv) {
801
444
  Environment* env = Environment::GetCurrent(context);
802
803
  {
804
444
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
805
806
1332
    w->InstanceTemplate()->SetInternalFieldCount(
807
444
        Worker::kInternalFieldCount);
808
888
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
809
810
444
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
811
444
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
812
444
    env->SetProtoMethod(w, "ref", Worker::Ref);
813
444
    env->SetProtoMethod(w, "unref", Worker::Unref);
814
444
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
815
444
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
816
444
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
817
444
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
818
819
444
    env->SetConstructorFunction(target, "Worker", w);
820
  }
821
822
  {
823
444
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
824
825
1332
    wst->InstanceTemplate()->SetInternalFieldCount(
826
444
        WorkerHeapSnapshotTaker::kInternalFieldCount);
827
888
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
828
829
    Local<String> wst_string =
830
444
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
831
444
    wst->SetClassName(wst_string);
832
444
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
833
  }
834
835
444
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
836
837
  target
838
888
      ->Set(env->context(),
839
            env->thread_id_string(),
840
2220
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
841
      .Check();
842
843
  target
844
888
      ->Set(env->context(),
845
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
846
2220
            Boolean::New(env->isolate(), env->is_main_thread()))
847
      .Check();
848
849
  target
850
888
      ->Set(env->context(),
851
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
852
2220
            Boolean::New(env->isolate(), env->owns_process_state()))
853
      .Check();
854
855
444
  if (!env->is_main_thread()) {
856
    target
857
780
        ->Set(env->context(),
858
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
859
1950
              env->worker_context()->GetResourceLimits(env->isolate()))
860
        .Check();
861
  }
862
863
888
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
864
444
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
865
1332
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
866
2220
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
867
3552
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
868
3996
}
869
3108
870
5953
// Registers every native callback that this file exposes to JS with
// `registry`.
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
  // Free function installed directly on the binding object.
  registry->Register(GetEnvMessagePort);

  // Worker constructor.
  registry->Register(Worker::New);

  // Worker prototype methods.
  registry->Register(Worker::StartThread);
  registry->Register(Worker::StopThread);
  registry->Register(Worker::Ref);
  registry->Register(Worker::Unref);
  registry->Register(Worker::GetResourceLimits);
  registry->Register(Worker::TakeHeapSnapshot);
  registry->Register(Worker::LoopIdleTime);
  registry->Register(Worker::LoopStartTime);
}
882
883
}  // anonymous namespace
884
}  // namespace worker
885
}  // namespace node
886
887
4691
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
888

18718
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)