GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 51 448 11.4 %
Date: 2021-02-19 04:08:54 Branches: 3 258 1.2 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "histogram-inl.h"
4
#include "memory_tracker-inl.h"
5
#include "node_errors.h"
6
#include "node_external_reference.h"
7
#include "node_buffer.h"
8
#include "node_options-inl.h"
9
#include "node_perf.h"
10
#include "util-inl.h"
11
#include "async_wrap-inl.h"
12
13
#include <memory>
14
#include <string>
15
#include <vector>
16
17
using node::kAllowedInEnvironment;
18
using node::kDisallowedInEnvironment;
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::Boolean;
22
using v8::Context;
23
using v8::Float64Array;
24
using v8::FunctionCallbackInfo;
25
using v8::FunctionTemplate;
26
using v8::HandleScope;
27
using v8::Integer;
28
using v8::Isolate;
29
using v8::Local;
30
using v8::Locker;
31
using v8::Maybe;
32
using v8::MaybeLocal;
33
using v8::Null;
34
using v8::Number;
35
using v8::Object;
36
using v8::ResourceConstraints;
37
using v8::SealHandleScope;
38
using v8::String;
39
using v8::TryCatch;
40
using v8::Value;
41
42
namespace node {
43
namespace worker {
44
45
constexpr double kMB = 1024 * 1024;
46
47
// Creates the parent-thread half of a worker.
//
// Sets up a MessagePort in the parent environment that is entangled with the
// data of the (future) child port, exposes that port and the thread id on the
// JS wrapper object, and requests an inspector parent handle for `url`.
// The native thread itself is not started here.
//
// If the parent port cannot be created, the constructor returns early and
// none of the later setup steps run.
//
// `per_isolate_opts`, `exec_argv` and `env_vars` are moved into the matching
// members: `exec_argv` is an rvalue reference and the two shared_ptr
// parameters are taken by value, so moving avoids a vector copy and two
// atomic reference-count round trips.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
               std::vector<std::string>&& exec_argv,
               std::shared_ptr<KVStore> env_vars)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      per_isolate_opts_(std::move(per_isolate_opts)),
      exec_argv_(std::move(exec_argv)),
      platform_(env->isolate_data()->platform()),
      thread_id_(AllocateEnvironmentThreadId()),
      env_vars_(std::move(env_vars)) {
  Debug(this, "Creating new worker instance with thread id %llu",
        thread_id_.id);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  // The child port does not exist yet; only its data is created and
  // entangled with the parent port.
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
      .Check();

  inspector_parent_handle_ = GetInspectorParentHandle(
      env, thread_id_, url.c_str());

  // The worker's argv consists only of the parent's argv[0].
  argv_ = std::vector<std::string>{env->argv()[0]};
  // Mark this Worker object as weak until we actually start the thread.
  MakeWeak();

  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
}
90
91
bool Worker::is_stopped() const {
92
  Mutex::ScopedLock lock(mutex_);
93
  if (env_ != nullptr)
94
    return env_->is_stopping();
95
  return stopped_;
96
}
97
98
// Synchronizes `constraints` with this worker's resource limits.
//
// The stack limit is always set from `stack_base_`. For each of the young
// generation, old generation and code range limits: a positive configured
// value (in MB) is written into `constraints` (in bytes); otherwise the
// value currently held by `constraints` is stored back into
// `resource_limits_` (in MB).
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));

  auto& young_gen_mb = resource_limits_[kMaxYoungGenerationSizeMb];
  if (young_gen_mb > 0) {
    const size_t young_gen_bytes = static_cast<size_t>(young_gen_mb * kMB);
    constraints->set_max_young_generation_size_in_bytes(young_gen_bytes);
  } else {
    young_gen_mb = constraints->max_young_generation_size_in_bytes() / kMB;
  }

  auto& old_gen_mb = resource_limits_[kMaxOldGenerationSizeMb];
  if (old_gen_mb > 0) {
    const size_t old_gen_bytes = static_cast<size_t>(old_gen_mb * kMB);
    constraints->set_max_old_generation_size_in_bytes(old_gen_bytes);
  } else {
    old_gen_mb = constraints->max_old_generation_size_in_bytes() / kMB;
  }

  auto& code_range_mb = resource_limits_[kCodeRangeSizeMb];
  if (code_range_mb > 0) {
    const size_t code_range_bytes = static_cast<size_t>(code_range_mb * kMB);
    constraints->set_code_range_size_in_bytes(code_range_bytes);
  } else {
    code_range_mb = constraints->code_range_size_in_bytes() / kMB;
  }
}
125
126
// This class contains data that is only relevant to the child thread itself,
127
// and only while it is running.
128
// (Eventually, the Environment instance should probably also be moved here.)
129
class WorkerThreadData {
130
 public:
131
  // Initializes the libuv loop, allocates, registers and initializes a V8
  // isolate for the worker, creates the IsolateData and finally publishes the
  // isolate on the Worker (under the Worker's mutex).
  // On failure the error is reported through Worker::Exit() and the
  // constructor returns early without setting `w_->isolate_`.
  explicit WorkerThreadData(Worker* w)
132
    : w_(w) {
133
    int ret = uv_loop_init(&loop_);
134
    if (ret != 0) {
135
      char err_buf[128];
136
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
137
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
138
      return;
139
    }
140
    // The loop is initialized from here on, so the destructor must close it.
    loop_init_failed_ = false;
141
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
142
143
    std::shared_ptr<ArrayBufferAllocator> allocator =
144
        ArrayBufferAllocator::Create();
145
    Isolate::CreateParams params;
146
    SetIsolateCreateParamsForNode(&params);
147
    params.array_buffer_allocator_shared = allocator;
148
149
    // Applies the worker's resource limits to the isolate constraints (and
    // writes unset limits back to the worker).
    w->UpdateResourceConstraints(&params.constraints);
150
151
    Isolate* isolate = Isolate::Allocate();
152
    if (isolate == nullptr) {
153
      // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
154
      // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
155
      w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
156
      return;
157
    }
158
159
    // The isolate is registered with the platform before it is initialized.
    w->platform_->RegisterIsolate(isolate, &loop_);
160
    Isolate::Initialize(isolate, params);
161
    SetIsolateUpForNode(isolate);
162
163
    // Be sure it's called before Environment::InitializeDiagnostics()
164
    // so that this callback stays when the callback of
165
    // --heapsnapshot-near-heap-limit is popped.
166
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
167
168
    {
169
      Locker locker(isolate);
170
      Isolate::Scope isolate_scope(isolate);
171
      // V8 computes its stack limit the first time a `Locker` is used based on
172
      // --stack-size. Reset it to the correct value.
173
      isolate->SetStackLimit(w->stack_base_);
174
175
      HandleScope handle_scope(isolate);
176
      isolate_data_.reset(CreateIsolateData(isolate,
177
                                            &loop_,
178
                                            w_->platform_,
179
                                            allocator.get()));
180
      CHECK(isolate_data_);
181
      // The Worker's per-isolate options, if any, are handed over to the
      // IsolateData (moved out of the Worker).
      if (w_->per_isolate_opts_)
182
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
183
      isolate_data_->set_worker_context(w_);
184
      isolate_data_->max_young_gen_size =
185
          params.constraints.max_young_generation_size_in_bytes();
186
    }
187
188
    // Publish the fully set-up isolate; Worker::Run() treats a null
    // `isolate_` as "initialization failed".
    Mutex::ScopedLock lock(w_->mutex_);
189
    w_->isolate_ = isolate;
190
  }
191
192
  // Detaches the isolate from the Worker (under its mutex), frees the
  // IsolateData, unregisters and disposes the isolate, waits for the platform
  // to finish its cleanup and finally closes the loop (if it was initialized).
  ~WorkerThreadData() {
193
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
194
    Isolate* isolate;
195
    {
196
      Mutex::ScopedLock lock(w_->mutex_);
197
      isolate = w_->isolate_;
198
      w_->isolate_ = nullptr;
199
    }
200
201
    if (isolate != nullptr) {
202
      CHECK(!loop_init_failed_);
203
      bool platform_finished = false;
204
205
      isolate_data_.reset();
206
207
      // The callback flips `platform_finished`, which ends the uv_run() loop
      // below.
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
208
        *static_cast<bool*>(data) = true;
209
      }, &platform_finished);
210
211
      // The order of these calls is important; if the Isolate is first disposed
212
      // and then unregistered, there is a race condition window in which no
213
      // new Isolate at the same address can successfully be registered with
214
      // the platform.
215
      // (Refs: https://github.com/nodejs/node/issues/30846)
216
      w_->platform_->UnregisterIsolate(isolate);
217
      isolate->Dispose();
218
219
      // Wait until the platform has cleaned up all relevant resources.
220
      while (!platform_finished) {
221
        uv_run(&loop_, UV_RUN_ONCE);
222
      }
223
    }
224
    if (!loop_init_failed_) {
225
      CheckedUvLoopClose(&loop_);
226
    }
227
  }
228
229
  // True once uv_loop_init() has succeeded in the constructor.
  bool loop_is_usable() const { return !loop_init_failed_; }
230
231
 private:
232
  // The Worker this data belongs to.
  Worker* const w_;
233
  // Event loop used by the worker thread.
  uv_loop_t loop_;
234
  // Stays true until uv_loop_init() has succeeded.
  bool loop_init_failed_ = true;
235
  // IsolateData for the worker's isolate; released in the destructor before
  // the isolate is disposed.
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
236
237
  friend class Worker;
238
};
239
240
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241
                             size_t initial_heap_limit) {
242
  Worker* worker = static_cast<Worker*>(data);
243
  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
244
  // Give the current GC some extra leeway to let it finish rather than
245
  // crash hard. We are not going to perform further allocations anyway.
246
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
247
  return current_heap_limit + kExtraHeapAllowance;
248
}
249
250
// Body of the worker thread.
//
// Creates the per-thread data (loop, isolate, IsolateData), then a Context
// and an Environment, loads the environment and spins the event loop until it
// finishes. After almost every step the function checks whether the worker
// has been stopped in the meantime and returns early if so. Cleanup of the
// Environment happens in the `cleanup_env` scope-leave handler; the isolate
// and loop are cleaned up by ~WorkerThreadData().
void Worker::Run() {
251
  std::string name = "WorkerThread ";
252
  name += std::to_string(thread_id_.id);
253
  TRACE_EVENT_METADATA1(
254
      "__metadata", "thread_name", "name",
255
      TRACE_STR_COPY(name.c_str()));
256
  CHECK_NOT_NULL(platform_);
257
258
  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
259
260
  WorkerThreadData data(this);
261
  // WorkerThreadData only sets `isolate_` when its setup fully succeeded.
  if (isolate_ == nullptr) return;
262
  CHECK(data.loop_is_usable());
263
264
  Debug(this, "Starting worker with id %llu", thread_id_.id);
265
  {
266
    Locker locker(isolate_);
267
    Isolate::Scope isolate_scope(isolate_);
268
    SealHandleScope outer_seal(isolate_);
269
270
    // NOTE: this local `env_` shadows the member of the same name; the member
    // is referred to as `this->env_` below.
    DeleteFnPtr<Environment, FreeEnvironment> env_;
271
    auto cleanup_env = OnScopeLeave([&]() {
272
      // TODO(addaleax): This call is harmless but should not be necessary.
273
      // Figure out why V8 is raising a DCHECK() here without it
274
      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
275
      isolate_->CancelTerminateExecution();
276
277
      if (!env_) return;
278
      env_->set_can_call_into_js(false);
279
280
      {
281
        // Mark the worker as stopped and detach the Environment pointer
        // before the Environment itself is freed below.
        Mutex::ScopedLock lock(mutex_);
282
        stopped_ = true;
283
        this->env_ = nullptr;
284
      }
285
286
      env_.reset();
287
    });
288
289
    if (is_stopped()) return;
290
    {
291
      HandleScope handle_scope(isolate_);
292
      Local<Context> context;
293
      {
294
        // We create the Context object before we have an Environment* in place
295
        // that we could use for error handling. If creation fails due to
296
        // resource constraints, we need something in place to handle it,
297
        // though.
298
        TryCatch try_catch(isolate_);
299
        context = NewContext(isolate_);
300
        if (context.IsEmpty()) {
301
          // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
302
          // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
303
          Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
304
          return;
305
        }
306
      }
307
308
      if (is_stopped()) return;
309
      CHECK(!context.IsEmpty());
310
      Context::Scope context_scope(context);
311
      {
312
        // argv, exec_argv and the inspector parent handle are moved into the
        // new Environment, so the corresponding members are moved-from
        // afterwards.
        env_.reset(CreateEnvironment(
313
            data.isolate_data_.get(),
314
            context,
315
            std::move(argv_),
316
            std::move(exec_argv_),
317
            static_cast<EnvironmentFlags::Flags>(environment_flags_),
318
            thread_id_,
319
            std::move(inspector_parent_handle_)));
320
        if (is_stopped()) return;
321
        CHECK_NOT_NULL(env_);
322
        env_->set_env_vars(std::move(env_vars_));
323
        // A process exit request inside the worker only exits this worker.
        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
324
          Exit(exit_code);
325
        });
326
      }
327
      {
328
        // Publish the Environment on the Worker unless it was stopped
        // in the meantime.
        Mutex::ScopedLock lock(mutex_);
329
        if (stopped_) return;
330
        this->env_ = env_.get();
331
      }
332
      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
333
      if (is_stopped()) return;
334
      {
335
        CreateEnvMessagePort(env_.get());
336
        Debug(this, "Created message port for worker %llu", thread_id_.id);
337
        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
338
          return;
339
340
        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
341
      }
342
    }
343
344
    {
345
      Maybe<int> exit_code = SpinEventLoop(env_.get());
346
      Mutex::ScopedLock lock(mutex_);
347
      // An exit code that is already non-zero takes precedence over the
      // event loop's result.
      if (exit_code_ == 0 && exit_code.IsJust()) {
348
        exit_code_ = exit_code.FromJust();
349
      }
350
351
      Debug(this, "Exiting thread for worker %llu with exit code %d",
352
            thread_id_.id, exit_code_);
353
    }
354
  }
355
356
  Debug(this, "Worker %llu thread stops", thread_id_.id);
357
}
358
359
// Creates the child-side MessagePort inside `env` from the previously
// prepared `child_port_data_` (which is moved from) and, if creation
// succeeded, stores the port object on `env`. Runs with `mutex_` held.
void Worker::CreateEnvMessagePort(Environment* env) {
  HandleScope scope(isolate_);
  Mutex::ScopedLock guard(mutex_);

  // Set up the message channel for receiving messages in the child.
  MessagePort* port =
      MessagePort::New(env, env->context(), std::move(child_port_data_));

  // MessagePort::New() may return nullptr if execution is terminated
  // within it; in that case there is nothing to install.
  if (port == nullptr) return;

  env->set_message_port(port->object(isolate_));
}
371
372
void Worker::JoinThread() {
373
  if (thread_joined_)
374
    return;
375
  CHECK_EQ(uv_thread_join(&tid_), 0);
376
  thread_joined_ = true;
377
378
  env()->remove_sub_worker_context(this);
379
380
  {
381
    HandleScope handle_scope(env()->isolate());
382
    Context::Scope context_scope(env()->context());
383
384
    // Reset the parent port as we're closing it now anyway.
385
    object()->Set(env()->context(),
386
                  env()->message_port_string(),
387
                  Undefined(env()->isolate())).Check();
388
389
    Local<Value> args[] = {
390
        Integer::New(env()->isolate(), exit_code_),
391
        custom_error_ != nullptr
392
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
393
            : Null(env()->isolate()).As<Value>(),
394
        !custom_error_str_.empty()
395
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
396
                  .As<Value>()
397
            : Null(env()->isolate()).As<Value>(),
398
    };
399
400
    MakeCallback(env()->onexit_string(), arraysize(args), args);
401
  }
402
403
  // If we get here, the !thread_joined_ condition at the top of the function
404
  // implies that the thread was running. In that case, its final action will
405
  // be to schedule a callback on the parent thread which will delete this
406
  // object, so there's nothing more to do here.
407
}
408
409
// Destroys the worker object. By this point the worker must be stopped, its
// child Environment must be detached and its thread must have been joined;
// each of these invariants is enforced with a CHECK while `mutex_` is held.
Worker::~Worker() {
  Mutex::ScopedLock guard(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(thread_joined_);

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
418
419
// JS constructor callback for the native Worker object.
//
// Arguments as read by this function:
//   args[0]  script URL/filename; converted with ToString() unless it is
//            null or undefined.
//   args[1]  environment variables: null clones the parent's env, an object
//            is used as a user-provided env, anything else shares the
//            parent's env store.
//   args[2]  execArgv array (optional); if absent the parent's exec_argv is
//            used.
//   args[3]  Float64Array holding kTotalResourceLimitCount resource limits.
//   args[4]  boolean; when true (or when the parent tracks unmanaged fds)
//            the kTrackUnmanagedFds environment flag is set.
//
// Option parsing errors are reported by setting `invalidNodeOptions` or
// `invalidExecArgv` on `this` and returning without creating a Worker.
void Worker::New(const FunctionCallbackInfo<Value>& args) {
420
  Environment* env = Environment::GetCurrent(args);
421
  Isolate* isolate = args.GetIsolate();
422
423
  CHECK(args.IsConstructCall());
424
425
  if (env->isolate_data()->platform() == nullptr) {
426
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
427
    return;
428
  }
429
430
  std::string url;
431
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
432
  std::shared_ptr<KVStore> env_vars = nullptr;
433
434
  std::vector<std::string> exec_argv_out;
435
436
  // Argument might be a string or URL
437
  if (!args[0]->IsNullOrUndefined()) {
438
    Utf8Value value(
439
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
440
    url.append(value.out(), value.length());
441
  }
442
443
  if (args[1]->IsNull()) {
444
    // Means worker.env = { ...process.env }.
445
    env_vars = env->env_vars()->Clone(isolate);
446
  } else if (args[1]->IsObject()) {
447
    // User provided env.
448
    env_vars = KVStore::CreateMapKVStore();
449
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
450
                               args[1].As<Object>());
451
  } else {
452
    // Env is shared.
453
    env_vars = env->env_vars();
454
  }
455
456
  // Dedicated per-isolate options are only created when the worker gets its
  // own env object or its own execArgv; otherwise `per_isolate_opts` stays
  // null.
  if (args[1]->IsObject() || args[2]->IsArray()) {
457
    per_isolate_opts.reset(new PerIsolateOptions());
458
459
    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
460
      return env_vars->Get(name).FromMaybe("");
461
    });
462
463
#ifndef NODE_WITHOUT_NODE_OPTIONS
464
    MaybeLocal<String> maybe_node_opts =
465
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
466
    Local<String> node_opts;
467
    if (maybe_node_opts.ToLocal(&node_opts)) {
468
      std::string node_options(*String::Utf8Value(isolate, node_opts));
469
      std::vector<std::string> errors{};
470
      std::vector<std::string> env_argv =
471
          ParseNodeOptionsEnvVar(node_options, &errors);
472
      // [0] is expected to be the program name, add dummy string.
473
      env_argv.insert(env_argv.begin(), "");
474
      std::vector<std::string> invalid_args{};
475
      options_parser::Parse(&env_argv,
476
                            nullptr,
477
                            &invalid_args,
478
                            per_isolate_opts.get(),
479
                            kAllowedInEnvironment,
480
                            &errors);
481
      if (!errors.empty() && args[1]->IsObject()) {
482
        // Only fail for explicitly provided env, this protects from failures
483
        // when NODE_OPTIONS from parent's env is used (which is the default).
484
        Local<Value> error;
485
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
486
        Local<String> key =
487
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
488
        // Ignore the return value of Set() because exceptions bubble up to JS
489
        // when we return anyway.
490
        USE(args.This()->Set(env->context(), key, error));
491
        return;
492
      }
493
    }
494
#endif
495
  }
496
497
  if (args[2]->IsArray()) {
498
    Local<Array> array = args[2].As<Array>();
499
    // The first argument is reserved for program name, but we don't need it
500
    // in workers.
501
    std::vector<std::string> exec_argv = {""};
502
    uint32_t length = array->Length();
503
    // Convert every execArgv entry to a UTF-8 std::string; bail out if an
    // element cannot be read or stringified.
    for (uint32_t i = 0; i < length; i++) {
504
      Local<Value> arg;
505
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
506
        return;
507
      }
508
      Local<String> arg_v8;
509
      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
510
        return;
511
      }
512
      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
513
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
514
      exec_argv.push_back(arg_string);
515
    }
516
517
    std::vector<std::string> invalid_args{};
518
    std::vector<std::string> errors{};
519
    // Using invalid_args as the v8_args argument as it stores unknown
520
    // options for the per isolate parser.
521
    options_parser::Parse(
522
        &exec_argv,
523
        &exec_argv_out,
524
        &invalid_args,
525
        per_isolate_opts.get(),
526
        kDisallowedInEnvironment,
527
        &errors);
528
529
    // The first argument is program name.
    // NOTE(review): this assumes Parse() always leaves at least one entry in
    // invalid_args; erase(begin()) on an empty vector would be undefined
    // behaviour - confirm against options_parser::Parse().
530
    invalid_args.erase(invalid_args.begin());
531
    if (errors.size() > 0 || invalid_args.size() > 0) {
532
      Local<Value> error;
533
      if (!ToV8Value(env->context(),
534
                     errors.size() > 0 ? errors : invalid_args)
535
                         .ToLocal(&error)) {
536
        return;
537
      }
538
      Local<String> key =
539
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
540
      // Ignore the return value of Set() because exceptions bubble up to JS
541
      // when we return anyway.
542
      USE(args.This()->Set(env->context(), key, error));
543
      return;
544
    }
545
  } else {
546
    exec_argv_out = env->exec_argv();
547
  }
548
549
  // The result of `new` is not stored anywhere else in this function.
  // NOTE(review): ownership presumably passes to the JS wrapper object
  // (`args.This()`) through the base class - confirm.
  Worker* worker = new Worker(env,
550
                              args.This(),
551
                              url,
552
                              per_isolate_opts,
553
                              std::move(exec_argv_out),
554
                              env_vars);
555
556
  CHECK(args[3]->IsFloat64Array());
557
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
558
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
559
  limit_info->CopyContents(worker->resource_limits_,
560
                           sizeof(worker->resource_limits_));
561
562
  CHECK(args[4]->IsBoolean());
563
  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
564
    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
565
}
566
567
// JS method: starts the native thread for this worker.
//
// Determines the thread's stack size from the kStackSizeMb resource limit
// (never below kStackBufferSize), creates the thread with that stack size
// and, on success, makes the Worker object strong and registers it with the
// parent environment. On failure the worker is marked as stopped and
// ERR_WORKER_INIT_FAILED is thrown. The whole function runs with the
// worker's mutex held.
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
568
  Worker* w;
569
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
570
  Mutex::ScopedLock lock(w->mutex_);
571
572
  w->stopped_ = false;
573
574
  if (w->resource_limits_[kStackSizeMb] > 0) {
575
    // A requested size smaller than kStackBufferSize is raised to
    // kStackBufferSize, and the reported limit is updated to match.
    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
576
      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
577
      w->stack_size_ = kStackBufferSize;
578
    } else {
579
      w->stack_size_ =
580
          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
581
    }
582
  } else {
583
    // No explicit limit: report the current stack size back as the limit.
    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
584
  }
585
586
  uv_thread_options_t thread_options;
587
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
588
  thread_options.stack_size = w->stack_size_;
589
  int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
590
    // XXX: This could become a std::unique_ptr, but that makes at least
591
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
592
    // gcc 7+ handles this well.
593
    Worker* w = static_cast<Worker*>(arg);
594
    // The address of a local in the thread's entry function is used as the
    // reference point for the top of this thread's stack.
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
595
596
    // Leave a few kilobytes just to make sure we're within limits and have
597
    // some space to do work in C++ land.
598
    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
599
600
    w->Run();
601
602
    // After Run() returns, schedule the final cleanup on the Worker's own
    // (parent) environment: the callback takes ownership of the Worker,
    // drops the event loop ref if one is held, joins this thread and then
    // deletes the Worker when the closure is destroyed.
    Mutex::ScopedLock lock(w->mutex_);
603
    w->env()->SetImmediateThreadsafe(
604
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
605
          if (w->has_ref_)
606
            env->add_refs(-1);
607
          w->JoinThread();
608
          // implicitly delete w
609
        });
610
  }, static_cast<void*>(w));
611
612
  if (ret == 0) {
613
    // The object now owns the created thread and should not be garbage
614
    // collected until that finishes.
615
    w->ClearWeak();
616
    w->thread_joined_ = false;
617
618
    if (w->has_ref_)
619
      w->env()->add_refs(1);
620
621
    w->env()->add_sub_worker_context(w);
622
  } else {
623
    // Thread creation failed: mark the worker as stopped again and report
    // the libuv error name to JS.
    w->stopped_ = true;
624
625
    char err_buf[128];
626
    uv_err_name_r(ret, err_buf, sizeof(err_buf));
627
    {
628
      Isolate* isolate = w->env()->isolate();
629
      HandleScope handle_scope(isolate);
630
      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
631
    }
632
  }
633
}
634
635
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
636
  Worker* w;
637
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
638
639
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
640
  w->Exit(1);
641
}
642
643
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
644
  Worker* w;
645
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
646
  if (!w->has_ref_ && !w->thread_joined_) {
647
    w->has_ref_ = true;
648
    w->env()->add_refs(1);
649
  }
650
}
651
652
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
653
  Worker* w;
654
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
655
  if (w->has_ref_ && !w->thread_joined_) {
656
    w->has_ref_ = false;
657
    w->env()->add_refs(-1);
658
  }
659
}
660
661
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
662
  Worker* w;
663
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
664
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
665
}
666
667
// Copies `resource_limits_` into a freshly allocated ArrayBuffer in `isolate`
// and returns a Float64Array of kTotalResourceLimitCount elements over it.
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
  const size_t byte_length = sizeof(resource_limits_);
  Local<ArrayBuffer> buffer = ArrayBuffer::New(isolate, byte_length);

  void* destination = buffer->GetBackingStore()->Data();
  memcpy(destination, resource_limits_, byte_length);

  return Float64Array::New(buffer, 0, kTotalResourceLimitCount);
}
675
676
void Worker::Exit(int code, const char* error_code, const char* error_message) {
677
  Mutex::ScopedLock lock(mutex_);
678
  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
679
        thread_id_.id, code, error_code, error_message);
680
681
  if (error_code != nullptr) {
682
    custom_error_ = error_code;
683
    custom_error_str_ = error_message;
684
  }
685
686
  if (env_ != nullptr) {
687
    exit_code_ = code;
688
    Stop(env_);
689
  } else {
690
    stopped_ = true;
691
  }
692
}
693
694
// Reports the parent-side message port to the memory tracker under the name
// "parent_port".
void Worker::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("parent_port", parent_port_);
}
697
698
// Always returns true: a Worker object stays alive for as long as its child
// thread does, whether or not the parent thread still references it, so a
// Worker that is still alive at exit does not indicate a leak.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  return true;
}
703
704
// AsyncWrap used by Worker::TakeHeapSnapshot() as the async resource for a
// heap snapshot request; its `ondone` callback is invoked with the resulting
// snapshot stream. It carries no state of its own.
class WorkerHeapSnapshotTaker : public AsyncWrap {
705
 public:
706
  // Wraps `obj` as an AsyncWrap with provider type PROVIDER_WORKERHEAPSNAPSHOT.
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
707
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
708
709
  // Memory-tracking boilerplate.
  SET_NO_MEMORY_INFO()
710
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
711
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
712
};
713
714
// JS method: takes a heap snapshot of the worker's isolate.
//
// Creates a WorkerHeapSnapshotTaker in the parent environment, interrupts the
// worker thread to take the snapshot there, and then schedules a callback on
// the parent thread that wraps the snapshot in a stream and passes it to the
// taker's `ondone` callback. Returns the taker object to JS if the interrupt
// could be scheduled, and an empty handle otherwise.
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
715
  Worker* w;
716
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
717
718
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
719
720
  // `env` is the Worker object's own environment, i.e. the parent side.
  Environment* env = w->env();
721
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
722
  Local<Object> wrap;
723
  if (!env->worker_heap_snapshot_taker_template()
724
      ->NewInstance(env->context()).ToLocal(&wrap)) {
725
    return;
726
  }
727
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
728
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
729
730
  // Interrupt the worker thread and take a snapshot, then schedule a call
731
  // on the parent thread that turns that snapshot into a readable stream.
732
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
733
    // Runs with the worker's Environment (`worker_env`); the captured `env`
    // is the parent environment.
    heap::HeapSnapshotPointer snapshot {
734
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
735
    CHECK(snapshot);
736
    env->SetImmediateThreadsafe(
737
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
738
          HandleScope handle_scope(env->isolate());
739
          Context::Scope context_scope(env->context());
740
741
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
742
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
743
              env, std::move(snapshot));
744
          Local<Value> args[] = { stream->object() };
745
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
746
        }, CallbackFlags::kUnrefed);
747
  });
748
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
749
}
750
751
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
752
  Worker* w;
753
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
754
755
  Mutex::ScopedLock lock(w->mutex_);
756
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
757
  // before locking the mutex is a race condition. So manually do the same
758
  // check.
759
  if (w->stopped_ || w->env_ == nullptr)
760
    return args.GetReturnValue().Set(-1);
761
762
  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
763
  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
764
}
765
766
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
767
  Worker* w;
768
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
769
770
  Mutex::ScopedLock lock(w->mutex_);
771
  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
772
  // before locking the mutex is a race condition. So manually do the same
773
  // check.
774
  if (w->stopped_ || w->env_ == nullptr)
775
    return args.GetReturnValue().Set(-1);
776
777
  double loop_start_time = w->env_->performance_state()->milestones[
778
      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
779
  CHECK_GE(loop_start_time, 0);
780
  args.GetReturnValue().Set(
781
      (loop_start_time - node::performance::timeOrigin) / 1e6);
782
}
783
784
namespace {
785
786
// Return the MessagePort that is global for this Environment and communicates
787
// with the internal [kPort] port of the JS Worker class in the parent thread.
788
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
789
  Environment* env = Environment::GetCurrent(args);
790
  Local<Object> port = env->message_port();
791
  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
792
  if (!port.IsEmpty()) {
793
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
794
    args.GetReturnValue().Set(port);
795
  }
796
}
797
798
52
// Populates `target` with the worker binding:
//   - the `Worker` constructor with its prototype methods,
//   - the instance template for WorkerHeapSnapshotTaker (stored on the
//     Environment, not on `target`),
//   - `getEnvMessagePort()`,
//   - the properties `threadId`-style id (under env->thread_id_string()),
//     `isMainThread`, `ownsProcessState` and, outside the main thread,
//     `resourceLimits`,
//   - the resource limit index constants.
// `unused` and `priv` are not used.
void InitWorker(Local<Object> target,
799
                Local<Value> unused,
800
                Local<Context> context,
801
                void* priv) {
802
52
  Environment* env = Environment::GetCurrent(context);
803
804
  {
805
52
    // `Worker` constructor template, inheriting from AsyncWrap.
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
806
807
156
    w->InstanceTemplate()->SetInternalFieldCount(
808
52
        Worker::kInternalFieldCount);
809
104
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
810
811
52
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
812
52
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
813
52
    env->SetProtoMethod(w, "ref", Worker::Ref);
814
52
    env->SetProtoMethod(w, "unref", Worker::Unref);
815
52
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
816
52
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
817
52
    env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
818
52
    env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
819
820
52
    env->SetConstructorFunction(target, "Worker", w);
821
  }
822
823
  {
824
52
    // Template for WorkerHeapSnapshotTaker objects; only its instance
    // template is kept (on the Environment) for later NewInstance() calls.
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
825
826
156
    wst->InstanceTemplate()->SetInternalFieldCount(
827
52
        WorkerHeapSnapshotTaker::kInternalFieldCount);
828
104
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
829
830
    Local<String> wst_string =
831
52
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
832
52
    wst->SetClassName(wst_string);
833
52
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
834
  }
835
836
52
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
837
838
  // Thread id of the current Environment.
  target
839
104
      ->Set(env->context(),
840
            env->thread_id_string(),
841
260
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
842
      .Check();
843
844
  target
845
104
      ->Set(env->context(),
846
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
847
260
            Boolean::New(env->isolate(), env->is_main_thread()))
848
      .Check();
849
850
  target
851
104
      ->Set(env->context(),
852
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
853
260
            Boolean::New(env->isolate(), env->owns_process_state()))
854
      .Check();
855
856
52
  // Only non-main-thread environments expose `resourceLimits`, taken from
  // their worker context.
  if (!env->is_main_thread()) {
857
    target
858
        ->Set(env->context(),
859
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
860
              env->worker_context()->GetResourceLimits(env->isolate()))
861
        .Check();
862
  }
863
864
104
  // Indices into the resource limits array, plus its total length.
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
865
52
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
866
156
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
867
260
  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
868
416
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
869
468
}
870
364
871
263
// Registers every native callback that InitWorker() exposes to JS
// (GetEnvMessagePort, the Worker constructor and its prototype methods) with
// the external reference registry.
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
872
211
  registry->Register(GetEnvMessagePort);
873
107
  registry->Register(Worker::New);
874
107
  registry->Register(Worker::StartThread);
875
107
  registry->Register(Worker::StopThread);
876
107
  registry->Register(Worker::Ref);
877
107
  registry->Register(Worker::Unref);
878
107
  registry->Register(Worker::GetResourceLimits);
879
107
  registry->Register(Worker::TakeHeapSnapshot);
880
107
  registry->Register(Worker::LoopIdleTime);
881
107
  registry->Register(Worker::LoopStartTime);
882
107
}
883
884
}  // anonymous namespace
885
}  // namespace worker
886
}  // namespace node
887
888
115
// Module registration for the `worker` binding: InitWorker is passed as the
// initializer and RegisterExternalReferences as the external reference hook.
// NOTE(review): the exact registration semantics live in the macro
// definitions, which are not visible here.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
889

473
NODE_MODULE_EXTERNAL_REFERENCE(worker, node::worker::RegisterExternalReferences)