GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 281 286 98.3 %
Date: 2019-02-26 22:23:30 Branches: 119 162 73.5 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils.h"
3
#include "node_errors.h"
4
#include "node_buffer.h"
5
#include "node_perf.h"
6
#include "util.h"
7
#include "util-inl.h"
8
#include "async_wrap.h"
9
#include "async_wrap-inl.h"
10
11
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
12
#include "inspector/worker_inspector.h"  // ParentInspectorHandle
13
#endif
14
15
#include <string>
16
#include <vector>
17
18
using node::options_parser::kDisallowedInEnvironment;
19
using v8::ArrayBuffer;
20
using v8::Boolean;
21
using v8::Context;
22
using v8::Function;
23
using v8::FunctionCallbackInfo;
24
using v8::FunctionTemplate;
25
using v8::HandleScope;
26
using v8::Integer;
27
using v8::Isolate;
28
using v8::Local;
29
using v8::Locker;
30
using v8::Number;
31
using v8::Object;
32
using v8::SealHandleScope;
33
using v8::String;
34
using v8::Value;
35
36
namespace node {
37
namespace worker {
38
39
namespace {
40
41
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
42
169
void StartWorkerInspector(
43
    Environment* child,
44
    std::unique_ptr<inspector::ParentInspectorHandle> parent_handle,
45
    const std::string& url) {
46
169
  child->inspector_agent()->SetParentHandle(std::move(parent_handle));
47
  child->inspector_agent()->Start(url,
48
338
                                  child->options()->debug_options(),
49
                                  child->inspector_host_port(),
50
507
                                  false);
51
169
}
52
53
169
// Calls WaitForDisconnect() on the inspector agent of `child` and, once that
// returns, stops the agent.
void WaitForWorkerInspectorToStop(Environment* child) {
  const auto& agent = child->inspector_agent();
  agent->WaitForDisconnect();
  agent->Stop();
}
57
#endif
58
59
}  // anonymous namespace
60
61
171
// Builds the parent-side state of a worker.
//
// env:              the Environment that owns this Worker object.
// wrap:             the JS object wrapped by this AsyncWrap.
// url:              stored in `url_` and passed to the inspector agent when
//                   the inspector is compiled in.
// per_isolate_opts: optional per-isolate options; ownership is taken over by
//                   this object (may be empty).
//
// A thread id is allocated, a parent MessagePort is created and entangled
// with freshly allocated child port data, and both the port and the thread id
// are stored as properties on the wrapper object. If the parent port cannot
// be created, construction stops early and none of those properties are set.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      url_(url),
      // The parameter is taken by value and not used again, so move it
      // instead of paying for an extra reference-count round trip.
      per_isolate_opts_(std::move(per_isolate_opts)),
      platform_(env->isolate_data()->platform()),
      profiler_idle_notifier_started_(env->profiler_idle_notifier_started()),
      thread_id_(Environment::AllocateThreadId()) {
  Debug(this, "Creating new worker instance with thread id %llu", thread_id_);

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  // The child half of the channel only exists as data for now; it is turned
  // into a real MessagePort later, inside Run().
  child_port_data_.reset(new MessagePortData(nullptr));
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).FromJust();

  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
      .FromJust();

#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
  inspector_parent_handle_ =
      env->inspector_agent()->GetParentHandle(thread_id_, url);
#endif

  Debug(this, "Preparation for worker %llu finished", thread_id_);
}
99
100
21616
bool Worker::is_stopped() const {
101
21616
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
102
21616
  return stopped_;
103
}
104
105
// This class contains data that is only relevant to the child thread itself,
106
// and only while it is running.
107
// (Eventually, the Environment instance should probably also be moved here.)
108
class WorkerThreadData {
109
 public:
110
170
  explicit WorkerThreadData(Worker* w)
111
    : w_(w),
112
170
      array_buffer_allocator_(CreateArrayBufferAllocator()) {
113
170
    CHECK_EQ(uv_loop_init(&loop_), 0);
114
115
170
    Isolate* isolate = NewIsolate(array_buffer_allocator_.get(), &loop_);
116
170
    CHECK_NOT_NULL(isolate);
117
118
    {
119
170
      Locker locker(isolate);
120
340
      Isolate::Scope isolate_scope(isolate);
121
170
      isolate->SetStackLimit(w_->stack_base_);
122
123
340
      HandleScope handle_scope(isolate);
124
      isolate_data_.reset(CreateIsolateData(isolate,
125
                                            &loop_,
126
                                            w_->platform_,
127
170
                                            array_buffer_allocator_.get()));
128
170
      CHECK(isolate_data_);
129
170
      if (w_->per_isolate_opts_)
130
174
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
131
    }
132
133
170
    Mutex::ScopedLock lock(w_->mutex_);
134
170
    w_->isolate_ = isolate;
135
170
  }
136
137
340
  ~WorkerThreadData() {
138
170
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_);
139
    Isolate* isolate;
140
    {
141
170
      Mutex::ScopedLock lock(w_->mutex_);
142
170
      isolate = w_->isolate_;
143
170
      w_->isolate_ = nullptr;
144
    }
145
146
170
    w_->platform_->CancelPendingDelayedTasks(isolate);
147
148
170
    isolate_data_.reset();
149
170
    w_->platform_->UnregisterIsolate(isolate);
150
151
170
    isolate->Dispose();
152
153
    // Need to run the loop twice more to close the platform's uv_async_t
154
    // TODO(addaleax): It would be better for the platform itself to provide
155
    // some kind of notification when it has fully cleaned up.
156
170
    uv_run(&loop_, UV_RUN_ONCE);
157
170
    uv_run(&loop_, UV_RUN_ONCE);
158
159
170
    CheckedUvLoopClose(&loop_);
160
170
  }
161
162
 private:
163
  Worker* const w_;
164
  uv_loop_t loop_;
165
  DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
166
    array_buffer_allocator_;
167
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
168
169
  friend class Worker;
170
};
171
172
170
void Worker::Run() {
173
170
  std::string name = "WorkerThread ";
174
170
  name += std::to_string(thread_id_);
175

342
  TRACE_EVENT_METADATA1(
176
      "__metadata", "thread_name", "name",
177
      TRACE_STR_COPY(name.c_str()));
178
170
  CHECK_NOT_NULL(platform_);
179
180
170
  Debug(this, "Creating isolate for worker with id %llu", thread_id_);
181
182
327
  WorkerThreadData data(this);
183
184
170
  Debug(this, "Starting worker with id %llu", thread_id_);
185
  {
186
170
    Locker locker(isolate_);
187
327
    Isolate::Scope isolate_scope(isolate_);
188
327
    SealHandleScope outer_seal(isolate_);
189
170
    bool inspector_started = false;
190
191
327
    DeleteFnPtr<Environment, FreeEnvironment> env_;
192
170
    OnScopeLeave cleanup_env([&]() {
193
340
      if (!env_) return;
194
169
      env_->set_can_call_into_js(false);
195
      Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
196
169
          Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
197
198
      // Grab the parent-to-child channel and render is unusable.
199
      MessagePort* child_port;
200
      {
201
169
        Mutex::ScopedLock lock(mutex_);
202
169
        child_port = child_port_;
203
169
        child_port_ = nullptr;
204
      }
205
206
      {
207
169
        Context::Scope context_scope(env_->context());
208
169
        if (child_port != nullptr)
209
338
          child_port->Close();
210
169
        env_->stop_sub_worker_contexts();
211
169
        env_->RunCleanup();
212
169
        RunAtExit(env_.get());
213
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
214
169
        if (inspector_started)
215
169
          WaitForWorkerInspectorToStop(env_.get());
216
#endif
217
218
        {
219
169
          Mutex::ScopedLock stopped_lock(stopped_mutex_);
220
169
          stopped_ = true;
221
        }
222
223
        // This call needs to be made while the `Environment` is still alive
224
        // because we assume that it is available for async tracking in the
225
        // NodePlatform implementation.
226
169
        platform_->DrainTasks(isolate_);
227
      }
228
496
    });
229
230
    {
231
170
      HandleScope handle_scope(isolate_);
232
170
      Local<Context> context = NewContext(isolate_);
233
170
      if (is_stopped()) return;
234
235
169
      CHECK(!context.IsEmpty());
236
157
      Context::Scope context_scope(context);
237
      {
238
        // TODO(addaleax): Use CreateEnvironment(), or generally another
239
        // public API.
240
        env_.reset(new Environment(data.isolate_data_.get(),
241
                                   context,
242
                                   Environment::kNoFlags,
243
169
                                   thread_id_));
244
169
        CHECK_NOT_NULL(env_);
245
169
        env_->set_abort_on_uncaught_exception(false);
246
169
        env_->set_worker_context(this);
247
248
169
        env_->Start(profiler_idle_notifier_started_);
249
        env_->ProcessCliArgs(std::vector<std::string>{},
250
169
                             std::vector<std::string>{});
251
      }
252
253
169
      Debug(this, "Created Environment for worker with id %llu", thread_id_);
254
255
169
      if (is_stopped()) return;
256
      {
257
169
        HandleScope handle_scope(isolate_);
258
338
        Mutex::ScopedLock lock(mutex_);
259
        // Set up the message channel for receiving messages in the child.
260
        child_port_ = MessagePort::New(env_.get(),
261
                                       env_->context(),
262
169
                                       std::move(child_port_data_));
263
        // MessagePort::New() may return nullptr if execution is terminated
264
        // within it.
265
169
        if (child_port_ != nullptr)
266
169
          env_->set_message_port(child_port_->object(isolate_));
267
268
338
        Debug(this, "Created message port for worker %llu", thread_id_);
269
      }
270
271
169
      if (is_stopped()) return;
272
      {
273
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
274
        StartWorkerInspector(env_.get(),
275
169
                             std::move(inspector_parent_handle_),
276
338
                             url_);
277
#endif
278
169
        inspector_started = true;
279
280
169
        HandleScope handle_scope(isolate_);
281
338
        Environment::AsyncCallbackScope callback_scope(env_.get());
282
169
        env_->async_hooks()->push_async_ids(1, 0);
283
338
        if (!RunBootstrapping(env_.get()).IsEmpty()) {
284
159
          USE(StartExecution(env_.get(), "internal/main/worker_thread"));
285
        }
286
287
169
        env_->async_hooks()->pop_async_id(1);
288
289
338
        Debug(this, "Loaded environment for worker %llu", thread_id_);
290
      }
291
292
169
      if (is_stopped()) return;
293
      {
294
157
        SealHandleScope seal(isolate_);
295
        bool more;
296
        env_->performance_state()->Mark(
297
157
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
298
105
        do {
299
157
          if (is_stopped()) break;
300
157
          uv_run(&data.loop_, UV_RUN_DEFAULT);
301
157
          if (is_stopped()) break;
302
303
105
          platform_->DrainTasks(isolate_);
304
305
105
          more = uv_loop_alive(&data.loop_);
306

105
          if (more && !is_stopped())
307
            continue;
308
309
105
          EmitBeforeExit(env_.get());
310
311
          // Emit `beforeExit` if the loop became alive either after emitting
312
          // event, or after running some callbacks.
313
105
          more = uv_loop_alive(&data.loop_);
314
        } while (more == true);
315
        env_->performance_state()->Mark(
316
157
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
317
157
      }
318
    }
319
320
    {
321
      int exit_code;
322
157
      bool stopped = is_stopped();
323
157
      if (!stopped)
324
105
        exit_code = EmitExit(env_.get());
325
157
      Mutex::ScopedLock lock(mutex_);
326

157
      if (exit_code_ == 0 && !stopped)
327
105
        exit_code_ = exit_code;
328
329
      Debug(this, "Exiting thread for worker %llu with exit code %d",
330
314
            thread_id_, exit_code_);
331
157
    }
332
  }
333
334
314
  Debug(this, "Worker %llu thread stops", thread_id_);
335
}
336
337
364
void Worker::JoinThread() {
338
364
  if (thread_joined_)
339
557
    return;
340
170
  CHECK_EQ(uv_thread_join(&tid_), 0);
341
170
  thread_joined_ = true;
342
343
170
  env()->remove_sub_worker_context(this);
344
345
170
  if (thread_exit_async_) {
346
147
    env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
347
147
      delete async;
348
317
    });
349
350
170
    if (scheduled_on_thread_stopped_)
351
46
      OnThreadStopped();
352
  }
353
}
354
355
170
void Worker::OnThreadStopped() {
356
  {
357
170
    Mutex::ScopedLock lock(mutex_);
358
170
    scheduled_on_thread_stopped_ = false;
359
360
170
    Debug(this, "Worker %llu thread stopped", thread_id_);
361
362
    {
363
170
      Mutex::ScopedLock stopped_lock(stopped_mutex_);
364
170
      CHECK(stopped_);
365
    }
366
367
170
    parent_port_ = nullptr;
368
  }
369
370
170
  JoinThread();
371
372
  {
373
170
    HandleScope handle_scope(env()->isolate());
374
170
    Context::Scope context_scope(env()->context());
375
376
    // Reset the parent port as we're closing it now anyway.
377
170
    object()->Set(env()->context(),
378
                  env()->message_port_string(),
379
1020
                  Undefined(env()->isolate())).FromJust();
380
381
170
    Local<Value> code = Integer::New(env()->isolate(), exit_code_);
382
337
    MakeCallback(env()->onexit_string(), 1, &code);
383
  }
384
385
  // JoinThread() cleared all libuv handles bound to this Worker,
386
  // the C++ object is no longer needed for anything now.
387
167
  MakeWeak();
388
167
}
389
390
444
// Joins the worker thread (a no-op if already joined) while holding `mutex_`
// and verifies that the worker is both stopped and joined before going away.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);
  JoinThread();

  // Destroying a worker that is still running or unjoined is a bug.
  CHECK(stopped_);
  CHECK(thread_joined_);

  // This has most likely already happened within the worker thread -- this
  // is just in case Worker creation failed early.
  // NOTE(review): no statement follows this inherited comment; confirm what
  // it is meant to refer to.

  Debug(this, "Worker %llu destroyed", thread_id_);
}
402
403
174
void Worker::New(const FunctionCallbackInfo<Value>& args) {
404
174
  Environment* env = Environment::GetCurrent(args);
405
406
174
  CHECK(args.IsConstructCall());
407
408
174
  if (env->isolate_data()->platform() == nullptr) {
409
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
410
    return;
411
  }
412
413
174
  std::string url;
414
345
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
415
416
  // Argument might be a string or URL
417


870
  if (args.Length() > 0 && !args[0]->IsNullOrUndefined()) {
418
    Utf8Value value(
419
        args.GetIsolate(),
420
208
        args[0]->ToString(env->context()).FromMaybe(v8::Local<v8::String>()));
421
52
    url.append(value.out(), value.length());
422
423


208
    if (args.Length() > 1 && args[1]->IsArray()) {
424
14
      v8::Local<v8::Array> array = args[1].As<v8::Array>();
425
      // The first argument is reserved for program name, but we don't need it
426
      // in workers.
427
7
      std::vector<std::string> exec_argv = {""};
428
7
      uint32_t length = array->Length();
429
28
      for (uint32_t i = 0; i < length; i++) {
430
        v8::Local<v8::Value> arg;
431
21
        if (!array->Get(env->context(), i).ToLocal(&arg)) {
432
          return;
433
        }
434
        v8::MaybeLocal<v8::String> arg_v8_string =
435
14
            arg->ToString(env->context());
436
7
        if (arg_v8_string.IsEmpty()) {
437
          return;
438
        }
439
        Utf8Value arg_utf8_value(
440
            args.GetIsolate(),
441
7
            arg_v8_string.FromMaybe(v8::Local<v8::String>()));
442
14
        std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
443
7
        exec_argv.push_back(arg_string);
444
7
      }
445
446
11
      std::vector<std::string> invalid_args{};
447
11
      std::vector<std::string> errors{};
448
7
      per_isolate_opts.reset(new PerIsolateOptions());
449
450
      // Using invalid_args as the v8_args argument as it stores unknown
451
      // options for the per isolate parser.
452
      options_parser::PerIsolateOptionsParser::instance.Parse(
453
          &exec_argv,
454
          nullptr,
455
          &invalid_args,
456
          per_isolate_opts.get(),
457
          kDisallowedInEnvironment,
458
7
          &errors);
459
460
      // The first argument is program name.
461
7
      invalid_args.erase(invalid_args.begin());
462

7
      if (errors.size() > 0 || invalid_args.size() > 0) {
463
        v8::Local<v8::Value> value =
464
            ToV8Value(env->context(),
465
3
                      errors.size() > 0 ? errors : invalid_args)
466
6
                .ToLocalChecked();
467
        Local<String> key =
468
3
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
469
12
        args.This()->Set(env->context(), key, value).FromJust();
470
3
        return;
471
4
      }
472
49
    }
473
  }
474
342
  new Worker(env, args.This(), url, per_isolate_opts);
475
}
476
477
170
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
478
  Worker* w;
479
340
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
480
170
  Mutex::ScopedLock lock(w->mutex_);
481
482
170
  w->env()->add_sub_worker_context(w);
483
170
  w->stopped_ = false;
484
170
  w->thread_joined_ = false;
485
486
170
  w->thread_exit_async_.reset(new uv_async_t);
487
170
  w->thread_exit_async_->data = w;
488
588
  CHECK_EQ(uv_async_init(w->env()->event_loop(),
489
                         w->thread_exit_async_.get(),
490
                         [](uv_async_t* handle) {
491
    static_cast<Worker*>(handle->data)->OnThreadStopped();
492
  }), 0);
493
494
  uv_thread_options_t thread_options;
495
170
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
496
170
  thread_options.stack_size = kStackSize;
497

680
  CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
498
    Worker* w = static_cast<Worker*>(arg);
499
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
500
501
    // Leave a few kilobytes just to make sure we're within limits and have
502
    // some space to do work in C++ land.
503
    w->stack_base_ = stack_top - (kStackSize - kStackBufferSize);
504
505
    w->Run();
506
507
    Mutex::ScopedLock lock(w->mutex_);
508
    CHECK(w->thread_exit_async_);
509
    w->scheduled_on_thread_stopped_ = true;
510
    uv_async_send(w->thread_exit_async_.get());
511
170
  }, static_cast<void*>(w)), 0);
512
}
513
514
23
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
515
  Worker* w;
516
46
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
517
518
23
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_);
519
23
  w->Exit(1);
520
23
  w->JoinThread();
521
}
522
523
1
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
524
  Worker* w;
525
2
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
526
1
  if (w->thread_exit_async_)
527
1
    uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
528
}
529
530
3
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
531
  Worker* w;
532
6
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
533
3
  if (w->thread_exit_async_)
534
3
    uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
535
}
536
537
66
void Worker::Exit(int code) {
538
66
  Mutex::ScopedLock lock(mutex_);
539
132
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
540
541
66
  Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
542
543
66
  if (!stopped_) {
544
65
    stopped_ = true;
545
65
    exit_code_ = code;
546
65
    if (child_port_ != nullptr)
547
64
      child_port_->StopEventLoop();
548
65
    if (isolate_ != nullptr)
549
64
      isolate_->TerminateExecution();
550
66
  }
551
66
}
552
553
namespace {
554
555
// Return the MessagePort that is global for this Environment and communicates
556
// with the internal [kPort] port of the JS Worker class in the parent thread.
557
318
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
558
318
  Environment* env = Environment::GetCurrent(args);
559
318
  Local<Object> port = env->message_port();
560
318
  if (!port.IsEmpty()) {
561
954
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
562
636
    args.GetReturnValue().Set(port);
563
  }
564
318
}
565
566
4391
void InitWorker(Local<Object> target,
567
                Local<Value> unused,
568
                Local<Context> context,
569
                void* priv) {
570
4391
  Environment* env = Environment::GetCurrent(context);
571
572
  {
573
4391
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
574
575
8782
    w->InstanceTemplate()->SetInternalFieldCount(1);
576
8782
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
577
578
4391
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
579
4391
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
580
4391
    env->SetProtoMethod(w, "ref", Worker::Ref);
581
4391
    env->SetProtoMethod(w, "unref", Worker::Unref);
582
583
    Local<String> workerString =
584
4391
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
585
4391
    w->SetClassName(workerString);
586
    target->Set(env->context(),
587
                workerString,
588
21955
                w->GetFunction(env->context()).ToLocalChecked()).FromJust();
589
  }
590
591
4391
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
592
593
  target
594
      ->Set(env->context(),
595
            env->thread_id_string(),
596
21955
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
597
8782
      .FromJust();
598
599
  target
600
      ->Set(env->context(),
601
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
602
21955
            Boolean::New(env->isolate(), env->is_main_thread()))
603
8782
      .FromJust();
604
605
  target
606
      ->Set(env->context(),
607
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
608
21955
            Boolean::New(env->isolate(), env->owns_process_state()))
609
8782
      .FromJust();
610
4391
}
611
612
}  // anonymous namespace
613
614
}  // namespace worker
615
}  // namespace node
616
617
4282
// Associates the name `worker` with node::worker::InitWorker through the
// NODE_MODULE_CONTEXT_AWARE_INTERNAL macro (defined outside this file).
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)