GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 370 425 87.1 %
Date: 2020-02-19 22:14:06 Branches: 167 240 69.6 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils-inl.h"
3
#include "memory_tracker-inl.h"
4
#include "node_errors.h"
5
#include "node_buffer.h"
6
#include "node_options-inl.h"
7
#include "node_perf.h"
8
#include "util-inl.h"
9
#include "async_wrap-inl.h"
10
11
#if HAVE_INSPECTOR
12
#include "inspector/worker_inspector.h"  // ParentInspectorHandle
13
#endif
14
15
#include <memory>
16
#include <string>
17
#include <vector>
18
19
using node::kAllowedInEnvironment;
20
using node::kDisallowedInEnvironment;
21
using v8::Array;
22
using v8::ArrayBuffer;
23
using v8::BackingStore;
24
using v8::Boolean;
25
using v8::Context;
26
using v8::Float64Array;
27
using v8::FunctionCallbackInfo;
28
using v8::FunctionTemplate;
29
using v8::HandleScope;
30
using v8::Integer;
31
using v8::Isolate;
32
using v8::Local;
33
using v8::Locker;
34
using v8::MaybeLocal;
35
using v8::Null;
36
using v8::Number;
37
using v8::Object;
38
using v8::ResourceConstraints;
39
using v8::SealHandleScope;
40
using v8::String;
41
using v8::TryCatch;
42
using v8::Value;
43
44
namespace node {
45
namespace worker {
46
47
230
Worker::Worker(Environment* env,
48
               Local<Object> wrap,
49
               const std::string& url,
50
               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
51
               std::vector<std::string>&& exec_argv,
52
230
               std::shared_ptr<KVStore> env_vars)
53
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
54
      per_isolate_opts_(per_isolate_opts),
55
      exec_argv_(exec_argv),
56
230
      platform_(env->isolate_data()->platform()),
57
230
      start_profiler_idle_notifier_(env->profiler_idle_notifier_started()),
58
230
      thread_id_(Environment::AllocateThreadId()),
59
920
      env_vars_(env_vars) {
60
230
  Debug(this, "Creating new worker instance with thread id %llu", thread_id_);
61
62
  // Set up everything that needs to be set up in the parent environment.
63
230
  parent_port_ = MessagePort::New(env, env->context());
64
230
  if (parent_port_ == nullptr) {
65
    // This can happen e.g. because execution is terminating.
66
    return;
67
  }
68
69
230
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
70
230
  MessagePort::Entangle(parent_port_, child_port_data_.get());
71
72
690
  object()->Set(env->context(),
73
                env->message_port_string(),
74
1150
                parent_port_->object()).Check();
75
76
690
  object()->Set(env->context(),
77
                env->thread_id_string(),
78
1150
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
79
      .Check();
80
81
#if HAVE_INSPECTOR
82
  inspector_parent_handle_ =
83
230
      env->inspector_agent()->GetParentHandle(thread_id_, url);
84
#endif
85
86
230
  argv_ = std::vector<std::string>{env->argv()[0]};
87
  // Mark this Worker object as weak until we actually start the thread.
88
230
  MakeWeak();
89
90
230
  Debug(this, "Preparation for worker %llu finished", thread_id_);
91
}
92
93
1829
bool Worker::is_stopped() const {
94
3658
  Mutex::ScopedLock lock(mutex_);
95
1829
  if (env_ != nullptr)
96
1372
    return env_->is_stopping();
97
457
  return stopped_;
98
}
99
100
229
void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
101
229
  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
102
103
229
  constexpr double kMB = 1024 * 1024;
104
105
229
  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
106
1
    constraints->set_max_young_generation_size_in_bytes(
107
2
        resource_limits_[kMaxYoungGenerationSizeMb] * kMB);
108
  } else {
109
228
    resource_limits_[kMaxYoungGenerationSizeMb] =
110
228
        constraints->max_young_generation_size_in_bytes() / kMB;
111
  }
112
113
229
  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
114
1
    constraints->set_max_old_generation_size_in_bytes(
115
2
        resource_limits_[kMaxOldGenerationSizeMb] * kMB);
116
  } else {
117
228
    resource_limits_[kMaxOldGenerationSizeMb] =
118
228
        constraints->max_old_generation_size_in_bytes() / kMB;
119
  }
120
121
229
  if (resource_limits_[kCodeRangeSizeMb] > 0) {
122
1
    constraints->set_code_range_size_in_bytes(
123
2
        resource_limits_[kCodeRangeSizeMb] * kMB);
124
  } else {
125
228
    resource_limits_[kCodeRangeSizeMb] =
126
228
        constraints->code_range_size_in_bytes() / kMB;
127
  }
128
229
}
129
130
// This class contains data that is only relevant to the child thread itself,
131
// and only while it is running.
132
// (Eventually, the Environment instance should probably also be moved here.)
133
class WorkerThreadData {
134
 public:
135
229
  explicit WorkerThreadData(Worker* w)
136
229
    : w_(w) {
137
229
    int ret = uv_loop_init(&loop_);
138
229
    if (ret != 0) {
139
      char err_buf[128];
140
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
141
      w->custom_error_ = "ERR_WORKER_INIT_FAILED";
142
      w->custom_error_str_ = err_buf;
143
      w->loop_init_failed_ = true;
144
      w->stopped_ = true;
145
      return;
146
    }
147
148
    std::shared_ptr<ArrayBufferAllocator> allocator =
149
458
        ArrayBufferAllocator::Create();
150
458
    Isolate::CreateParams params;
151
229
    SetIsolateCreateParamsForNode(&params);
152
229
    params.array_buffer_allocator_shared = allocator;
153
154
229
    w->UpdateResourceConstraints(&params.constraints);
155
156
229
    Isolate* isolate = Isolate::Allocate();
157
229
    if (isolate == nullptr) {
158
      w->custom_error_ = "ERR_WORKER_OUT_OF_MEMORY";
159
      w->custom_error_str_ = "Failed to create new Isolate";
160
      w->stopped_ = true;
161
      return;
162
    }
163
164
229
    w->platform_->RegisterIsolate(isolate, &loop_);
165
229
    Isolate::Initialize(isolate, params);
166
229
    SetIsolateUpForNode(isolate);
167
168
229
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
169
170
    {
171
229
      Locker locker(isolate);
172
458
      Isolate::Scope isolate_scope(isolate);
173
      // V8 computes its stack limit the first time a `Locker` is used based on
174
      // --stack-size. Reset it to the correct value.
175
229
      isolate->SetStackLimit(w->stack_base_);
176
177
458
      HandleScope handle_scope(isolate);
178
458
      isolate_data_.reset(CreateIsolateData(isolate,
179
                                            &loop_,
180
229
                                            w_->platform_,
181
229
                                            allocator.get()));
182
229
      CHECK(isolate_data_);
183
229
      if (w_->per_isolate_opts_)
184
28
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
185
    }
186
187
457
    Mutex::ScopedLock lock(w_->mutex_);
188
228
    w_->isolate_ = isolate;
189
  }
190
191
457
  ~WorkerThreadData() {
192
228
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_);
193
    Isolate* isolate;
194
    {
195
458
      Mutex::ScopedLock lock(w_->mutex_);
196
229
      isolate = w_->isolate_;
197
229
      w_->isolate_ = nullptr;
198
    }
199
200
229
    if (isolate != nullptr) {
201
229
      bool platform_finished = false;
202
203
229
      isolate_data_.reset();
204
205
1374
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
206
229
        *static_cast<bool*>(data) = true;
207
1145
      }, &platform_finished);
208
209
      // The order of these calls is important; if the Isolate is first disposed
210
      // and then unregistered, there is a race condition window in which no
211
      // new Isolate at the same address can successfully be registered with
212
      // the platform.
213
      // (Refs: https://github.com/nodejs/node/issues/30846)
214
229
      w_->platform_->UnregisterIsolate(isolate);
215
229
      isolate->Dispose();
216
217
      // Wait until the platform has cleaned up all relevant resources.
218
687
      while (!platform_finished) {
219
229
        CHECK(!w_->loop_init_failed_);
220
229
        uv_run(&loop_, UV_RUN_ONCE);
221
      }
222
    }
223
229
    if (!w_->loop_init_failed_) {
224
229
      CheckedUvLoopClose(&loop_);
225
    }
226
229
  }
227
228
 private:
229
  Worker* const w_;
230
  uv_loop_t loop_;
231
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
232
233
  friend class Worker;
234
};
235
236
1
size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
237
                             size_t initial_heap_limit) {
238
1
  Worker* worker = static_cast<Worker*>(data);
239
1
  worker->custom_error_ = "ERR_WORKER_OUT_OF_MEMORY";
240
1
  worker->custom_error_str_ = "JS heap out of memory";
241
1
  worker->Exit(1);
242
  // Give the current GC some extra leeway to let it finish rather than
243
  // crash hard. We are not going to perform further allocations anyway.
244
1
  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
245
1
  return current_heap_limit + kExtraHeapAllowance;
246
}
247
248
229
void Worker::Run() {
249
457
  std::string name = "WorkerThread ";
250
229
  name += std::to_string(thread_id_);
251

460
  TRACE_EVENT_METADATA1(
252
      "__metadata", "thread_name", "name",
253
      TRACE_STR_COPY(name.c_str()));
254
229
  CHECK_NOT_NULL(platform_);
255
256
229
  Debug(this, "Creating isolate for worker with id %llu", thread_id_);
257
258
457
  WorkerThreadData data(this);
259
229
  if (isolate_ == nullptr) return;
260
229
  CHECK(!data.w_->loop_init_failed_);
261
262
229
  Debug(this, "Starting worker with id %llu", thread_id_);
263
  {
264
456
    Locker locker(isolate_);
265
456
    Isolate::Scope isolate_scope(isolate_);
266
457
    SealHandleScope outer_seal(isolate_);
267
268
456
    DeleteFnPtr<Environment, FreeEnvironment> env_;
269
228
    auto cleanup_env = OnScopeLeave([&]() {
270
1367
      if (!env_) return;
271
228
      env_->set_can_call_into_js(false);
272
1823
      Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
273
456
          Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
274
275
      {
276
228
        Context::Scope context_scope(env_->context());
277
        {
278
455
          Mutex::ScopedLock lock(mutex_);
279
228
          stopped_ = true;
280
228
          this->env_ = nullptr;
281
        }
282
228
        env_->set_stopping(true);
283
228
        env_->stop_sub_worker_contexts();
284
228
        env_->RunCleanup();
285
227
        RunAtExit(env_.get());
286
287
        // This call needs to be made while the `Environment` is still alive
288
        // because we assume that it is available for async tracking in the
289
        // NodePlatform implementation.
290
912
        platform_->DrainTasks(isolate_);
291
      }
292
455
    });
293
294
229
    if (is_stopped()) return;
295
    {
296
456
      HandleScope handle_scope(isolate_);
297
      Local<Context> context;
298
      {
299
        // We create the Context object before we have an Environment* in place
300
        // that we could use for error handling. If creation fails due to
301
        // resource constraints, we need something in place to handle it,
302
        // though.
303
456
        TryCatch try_catch(isolate_);
304
228
        context = NewContext(isolate_);
305
228
        if (context.IsEmpty()) {
306
          custom_error_ = "ERR_WORKER_OUT_OF_MEMORY";
307
          custom_error_str_ = "Failed to create new Context";
308
          return;
309
        }
310
      }
311
312
228
      if (is_stopped()) return;
313
228
      CHECK(!context.IsEmpty());
314
228
      Context::Scope context_scope(context);
315
      {
316
        // TODO(addaleax): Use CreateEnvironment(), or generally another
317
        // public API.
318
912
        env_.reset(new Environment(data.isolate_data_.get(),
319
                                   context,
320
228
                                   std::move(argv_),
321
228
                                   std::move(exec_argv_),
322
                                   Environment::kNoFlags,
323
228
                                   thread_id_));
324
228
        CHECK_NOT_NULL(env_);
325
228
        env_->set_env_vars(std::move(env_vars_));
326
228
        env_->set_abort_on_uncaught_exception(false);
327
228
        env_->set_worker_context(this);
328
329
228
        env_->InitializeLibuv(start_profiler_idle_notifier_);
330
      }
331
      {
332
456
        Mutex::ScopedLock lock(mutex_);
333
228
        if (stopped_) return;
334
228
        this->env_ = env_.get();
335
      }
336
228
      Debug(this, "Created Environment for worker with id %llu", thread_id_);
337
228
      if (is_stopped()) return;
338
      {
339
228
        env_->InitializeDiagnostics();
340
#if HAVE_INSPECTOR
341
228
        env_->InitializeInspector(std::move(inspector_parent_handle_));
342
#endif
343
456
        HandleScope handle_scope(isolate_);
344
        InternalCallbackScope callback_scope(
345
            env_.get(),
346
            Object::New(isolate_),
347
            { 1, 0 },
348
456
            InternalCallbackScope::kSkipAsyncHooks);
349
350
456
        if (!env_->RunBootstrapping().IsEmpty()) {
351
228
          CreateEnvMessagePort(env_.get());
352
228
          if (is_stopped()) return;
353
228
          Debug(this, "Created message port for worker %llu", thread_id_);
354
228
          USE(StartExecution(env_.get(), "internal/main/worker_thread"));
355
        }
356
357
228
        Debug(this, "Loaded environment for worker %llu", thread_id_);
358
      }
359
360
228
      if (is_stopped()) return;
361
      {
362
456
        SealHandleScope seal(isolate_);
363
        bool more;
364
228
        env_->performance_state()->Mark(
365
228
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
366
171
        do {
367
229
          if (is_stopped()) break;
368
229
          uv_run(&data.loop_, UV_RUN_DEFAULT);
369
229
          if (is_stopped()) break;
370
371
171
          platform_->DrainTasks(isolate_);
372
373
170
          more = uv_loop_alive(&data.loop_);
374

171
          if (more && !is_stopped()) continue;
375
376
170
          EmitBeforeExit(env_.get());
377
378
          // Emit `beforeExit` if the loop became alive either after emitting
379
          // event, or after running some callbacks.
380
170
          more = uv_loop_alive(&data.loop_);
381

171
        } while (more == true && !is_stopped());
382
228
        env_->performance_state()->Mark(
383
228
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
384
      }
385
    }
386
387
    {
388
      int exit_code;
389
228
      bool stopped = is_stopped();
390
228
      if (!stopped)
391
170
        exit_code = EmitExit(env_.get());
392
455
      Mutex::ScopedLock lock(mutex_);
393

228
      if (exit_code_ == 0 && !stopped)
394
166
        exit_code_ = exit_code;
395
396
228
      Debug(this, "Exiting thread for worker %llu with exit code %d",
397
            thread_id_, exit_code_);
398
    }
399
  }
400
401
228
  Debug(this, "Worker %llu thread stops", thread_id_);
402
}
403
404
228
void Worker::CreateEnvMessagePort(Environment* env) {
405
456
  HandleScope handle_scope(isolate_);
406
456
  Mutex::ScopedLock lock(mutex_);
407
  // Set up the message channel for receiving messages in the child.
408
456
  MessagePort* child_port = MessagePort::New(env,
409
                                             env->context(),
410
456
                                             std::move(child_port_data_));
411
  // MessagePort::New() may return nullptr if execution is terminated
412
  // within it.
413
228
  if (child_port != nullptr)
414
228
    env->set_message_port(child_port->object(isolate_));
415
228
}
416
417
229
void Worker::JoinThread() {
418
229
  if (thread_joined_)
419
    return;
420
229
  CHECK_EQ(uv_thread_join(&tid_), 0);
421
229
  thread_joined_ = true;
422
423
229
  env()->remove_sub_worker_context(this);
424
425
  {
426
454
    HandleScope handle_scope(env()->isolate());
427
229
    Context::Scope context_scope(env()->context());
428
429
    // Reset the parent port as we're closing it now anyway.
430
687
    object()->Set(env()->context(),
431
                  env()->message_port_string(),
432
1145
                  Undefined(env()->isolate())).Check();
433
434
    Local<Value> args[] = {
435
        Integer::New(env()->isolate(), exit_code_),
436
229
        custom_error_ != nullptr
437
230
            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
438
685
            : Null(env()->isolate()).As<Value>(),
439
229
        !custom_error_str_.empty()
440
230
            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
441
                  .As<Value>()
442
685
            : Null(env()->isolate()).As<Value>(),
443

1832
    };
444
445
229
    MakeCallback(env()->onexit_string(), arraysize(args), args);
446
  }
447
448
  // We cleared all libuv handles bound to this Worker above,
449
  // the C++ object is no longer needed for anything now.
450
225
  MakeWeak();
451
}
452
453
645
Worker::~Worker() {
454
430
  Mutex::ScopedLock lock(mutex_);
455
456
215
  CHECK(stopped_);
457
215
  CHECK_NULL(env_);
458
215
  CHECK(thread_joined_);
459
460
215
  Debug(this, "Worker %llu destroyed", thread_id_);
461
430
}
462
463
233
void Worker::New(const FunctionCallbackInfo<Value>& args) {
464
233
  Environment* env = Environment::GetCurrent(args);
465
233
  Isolate* isolate = args.GetIsolate();
466
467
233
  CHECK(args.IsConstructCall());
468
469
233
  if (env->isolate_data()->platform() == nullptr) {
470
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
471
    return;
472
  }
473
474
463
  std::string url;
475
463
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
476
463
  std::shared_ptr<KVStore> env_vars = nullptr;
477
478
463
  std::vector<std::string> exec_argv_out;
479
480
233
  CHECK_EQ(args.Length(), 4);
481
  // Argument might be a string or URL
482
699
  if (!args[0]->IsNullOrUndefined()) {
483
    Utf8Value value(
484
465
        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
485
93
    url.append(value.out(), value.length());
486
  }
487
488
699
  if (args[1]->IsNull()) {
489
    // Means worker.env = { ...process.env }.
490
210
    env_vars = env->env_vars()->Clone(isolate);
491
46
  } else if (args[1]->IsObject()) {
492
    // User provided env.
493
22
    env_vars = KVStore::CreateMapKVStore();
494
22
    env_vars->AssignFromObject(isolate->GetCurrentContext(),
495
66
                               args[1].As<Object>());
496
  } else {
497
    // Env is shared.
498
1
    env_vars = env->env_vars();
499
  }
500
501

888
  if (args[1]->IsObject() || args[2]->IsArray()) {
502
31
    per_isolate_opts.reset(new PerIsolateOptions());
503
504
93
    HandleEnvOptions(
505
405
        per_isolate_opts->per_env, [isolate, &env_vars](const char* name) {
506
          MaybeLocal<String> value =
507
372
              env_vars->Get(isolate, OneByteString(isolate, name));
508
124
          return value.IsEmpty() ? std::string{}
509
128
                                 : std::string(*String::Utf8Value(
510

252
                                       isolate, value.ToLocalChecked()));
511
31
        });
512
513
#ifndef NODE_WITHOUT_NODE_OPTIONS
514
    MaybeLocal<String> maybe_node_opts =
515
31
        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
516
31
    if (!maybe_node_opts.IsEmpty()) {
517
      std::string node_options(
518
90
          *String::Utf8Value(isolate, maybe_node_opts.ToLocalChecked()));
519
60
      std::vector<std::string> errors{};
520
      std::vector<std::string> env_argv =
521
60
          ParseNodeOptionsEnvVar(node_options, &errors);
522
      // [0] is expected to be the program name, add dummy string.
523
30
      env_argv.insert(env_argv.begin(), "");
524
60
      std::vector<std::string> invalid_args{};
525
30
      options_parser::Parse(&env_argv,
526
                            nullptr,
527
                            &invalid_args,
528
                            per_isolate_opts.get(),
529
                            kAllowedInEnvironment,
530
30
                            &errors);
531

30
      if (errors.size() > 0 && args[1]->IsObject()) {
532
        // Only fail for explicitly provided env, this protects from failures
533
        // when NODE_OPTIONS from parent's env is used (which is the default).
534
        Local<Value> error;
535
        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
536
        Local<String> key =
537
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
538
        // Ignore the return value of Set() because exceptions bubble up to JS
539
        // when we return anyway.
540
        USE(args.This()->Set(env->context(), key, error));
541
        return;
542
      }
543
    }
544
#endif
545
  }
546
547
466
  if (args[2]->IsArray()) {
548
58
    Local<Array> array = args[2].As<Array>();
549
    // The first argument is reserved for program name, but we don't need it
550
    // in workers.
551
55
    std::vector<std::string> exec_argv = {""};
552
29
    uint32_t length = array->Length();
553
43
    for (uint32_t i = 0; i < length; i++) {
554
      Local<Value> arg;
555
42
      if (!array->Get(env->context(), i).ToLocal(&arg)) {
556
        return;
557
      }
558
      MaybeLocal<String> arg_v8_string =
559
28
          arg->ToString(env->context());
560
14
      if (arg_v8_string.IsEmpty()) {
561
        return;
562
      }
563
      Utf8Value arg_utf8_value(
564
          args.GetIsolate(),
565
28
          arg_v8_string.FromMaybe(Local<String>()));
566
28
      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
567
14
      exec_argv.push_back(arg_string);
568
    }
569
570
55
    std::vector<std::string> invalid_args{};
571
55
    std::vector<std::string> errors{};
572
    // Using invalid_args as the v8_args argument as it stores unknown
573
    // options for the per isolate parser.
574
29
    options_parser::Parse(
575
        &exec_argv,
576
        &exec_argv_out,
577
        &invalid_args,
578
        per_isolate_opts.get(),
579
        kDisallowedInEnvironment,
580
29
        &errors);
581
582
    // The first argument is program name.
583
29
    invalid_args.erase(invalid_args.begin());
584

29
    if (errors.size() > 0 || invalid_args.size() > 0) {
585
      Local<Value> error;
586
6
      if (!ToV8Value(env->context(),
587
3
                     errors.size() > 0 ? errors : invalid_args)
588
3
                         .ToLocal(&error)) {
589
        return;
590
      }
591
      Local<String> key =
592
3
          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
593
      // Ignore the return value of Set() because exceptions bubble up to JS
594
      // when we return anyway.
595
9
      USE(args.This()->Set(env->context(), key, error));
596
3
      return;
597
    }
598
  } else {
599
204
    exec_argv_out = env->exec_argv();
600
  }
601
602
  Worker* worker = new Worker(env,
603
460
                              args.This(),
604
                              url,
605
                              per_isolate_opts,
606
                              std::move(exec_argv_out),
607
230
                              env_vars);
608
609
460
  CHECK(args[3]->IsFloat64Array());
610
460
  Local<Float64Array> limit_info = args[3].As<Float64Array>();
611
230
  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
612
460
  limit_info->CopyContents(worker->resource_limits_,
613
230
                           sizeof(worker->resource_limits_));
614
}
615
616
229
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
617
  Worker* w;
618
229
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
619
458
  Mutex::ScopedLock lock(w->mutex_);
620
621
  // The object now owns the created thread and should not be garbage collected
622
  // until that finishes.
623
229
  w->ClearWeak();
624
625
229
  w->env()->add_sub_worker_context(w);
626
229
  w->stopped_ = false;
627
229
  w->thread_joined_ = false;
628
629
229
  if (w->has_ref_)
630
229
    w->env()->add_refs(1);
631
632
  uv_thread_options_t thread_options;
633
229
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
634
229
  thread_options.stack_size = kStackSize;
635

1805
  CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
636
    // XXX: This could become a std::unique_ptr, but that makes at least
637
    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
638
    // gcc 7+ handles this well.
639
    Worker* w = static_cast<Worker*>(arg);
640
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
641
642
    // Leave a few kilobytes just to make sure we're within limits and have
643
    // some space to do work in C++ land.
644
    w->stack_base_ = stack_top - (kStackSize - kStackBufferSize);
645
646
    w->Run();
647
648
    Mutex::ScopedLock lock(w->mutex_);
649
    w->env()->SetImmediateThreadsafe(
650
        [w = std::unique_ptr<Worker>(w)](Environment* env) {
651
          if (w->has_ref_)
652
            env->add_refs(-1);
653
          w->JoinThread();
654
          // implicitly delete w
655
        });
656
  }, static_cast<void*>(w)), 0);
657
}
658
659
20
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
660
  Worker* w;
661
20
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
662
663
20
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_);
664
20
  w->Exit(1);
665
}
666
667
22
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
668
  Worker* w;
669
22
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
670
22
  if (!w->has_ref_) {
671
3
    w->has_ref_ = true;
672
3
    w->env()->add_refs(1);
673
  }
674
}
675
676
4
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
677
  Worker* w;
678
4
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
679
4
  if (w->has_ref_) {
680
4
    w->has_ref_ = false;
681
4
    w->env()->add_refs(-1);
682
  }
683
}
684
685
1
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
686
  Worker* w;
687
1
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
688
3
  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
689
}
690
691
229
Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
692
229
  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
693
694
687
  memcpy(ab->GetBackingStore()->Data(),
695
         resource_limits_,
696
229
         sizeof(resource_limits_));
697
229
  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
698
}
699
700
63
void Worker::Exit(int code) {
701
126
  Mutex::ScopedLock lock(mutex_);
702
63
  Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
703
63
  if (env_ != nullptr) {
704
61
    exit_code_ = code;
705
61
    Stop(env_);
706
  } else {
707
2
    stopped_ = true;
708
  }
709
63
}
710
711
void Worker::MemoryInfo(MemoryTracker* tracker) const {
712
  tracker->TrackField("parent_port", parent_port_);
713
}
714
715
class WorkerHeapSnapshotTaker : public AsyncWrap {
716
 public:
717
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
718
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
719
720
  SET_NO_MEMORY_INFO()
721
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
722
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
723
};
724
725
void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
726
  Worker* w;
727
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
728
729
  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_);
730
731
  Environment* env = w->env();
732
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
733
  Local<Object> wrap;
734
  if (!env->worker_heap_snapshot_taker_template()
735
      ->NewInstance(env->context()).ToLocal(&wrap)) {
736
    return;
737
  }
738
  BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
739
      MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
740
741
  // Interrupt the worker thread and take a snapshot, then schedule a call
742
  // on the parent thread that turns that snapshot into a readable stream.
743
  bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
744
    heap::HeapSnapshotPointer snapshot {
745
        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
746
    CHECK(snapshot);
747
    env->SetImmediateThreadsafe(
748
        [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
749
          HandleScope handle_scope(env->isolate());
750
          Context::Scope context_scope(env->context());
751
752
          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
753
          BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
754
              env, std::move(snapshot));
755
          Local<Value> args[] = { stream->object() };
756
          taker->MakeCallback(env->ondone_string(), arraysize(args), args);
757
        });
758
  });
759
  args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
760
}
761
762
namespace {
763
764
// Return the MessagePort that is global for this Environment and communicates
765
// with the internal [kPort] port of the JS Worker class in the parent thread.
766
457
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
767
457
  Environment* env = Environment::GetCurrent(args);
768
457
  Local<Object> port = env->message_port();
769
457
  if (!port.IsEmpty()) {
770
1371
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
771
914
    args.GetReturnValue().Set(port);
772
  }
773
457
}
774
775
3671
void InitWorker(Local<Object> target,
776
                Local<Value> unused,
777
                Local<Context> context,
778
                void* priv) {
779
3671
  Environment* env = Environment::GetCurrent(context);
780
781
  {
782
3671
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
783
784
7342
    w->InstanceTemplate()->SetInternalFieldCount(1);
785
7342
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
786
787
3671
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
788
3671
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
789
3671
    env->SetProtoMethod(w, "ref", Worker::Ref);
790
3671
    env->SetProtoMethod(w, "unref", Worker::Unref);
791
3671
    env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
792
3671
    env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
793
794
    Local<String> workerString =
795
3671
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
796
3671
    w->SetClassName(workerString);
797
7342
    target->Set(env->context(),
798
                workerString,
799
18355
                w->GetFunction(env->context()).ToLocalChecked()).Check();
800
  }
801
802
  {
803
3671
    Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
804
805
7342
    wst->InstanceTemplate()->SetInternalFieldCount(1);
806
7342
    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
807
808
    Local<String> wst_string =
809
3671
        FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
810
3671
    wst->SetClassName(wst_string);
811
3671
    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
812
  }
813
814
3671
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
815
816
  target
817
7342
      ->Set(env->context(),
818
            env->thread_id_string(),
819
18355
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
820
      .Check();
821
822
  target
823
7342
      ->Set(env->context(),
824
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
825
18355
            Boolean::New(env->isolate(), env->is_main_thread()))
826
      .Check();
827
828
  target
829
7342
      ->Set(env->context(),
830
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
831
18355
            Boolean::New(env->isolate(), env->owns_process_state()))
832
      .Check();
833
834
3671
  if (!env->is_main_thread()) {
835
    target
836
456
        ->Set(env->context(),
837
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
838
1140
              env->worker_context()->GetResourceLimits(env->isolate()))
839
        .Check();
840
  }
841
842
14684
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
843
14684
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
844
14684
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
845
14684
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
846
3671
}
847
848
}  // anonymous namespace
849
850
}  // namespace worker
851
}  // namespace node
852
853
4185
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)