GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 277 286 96.9 %
Date: 2019-02-13 22:28:58 Branches: 117 162 72.2 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils.h"
3
#include "node_errors.h"
4
#include "node_buffer.h"
5
#include "node_perf.h"
6
#include "util.h"
7
#include "util-inl.h"
8
#include "async_wrap.h"
9
#include "async_wrap-inl.h"
10
11
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
12
#include "inspector/worker_inspector.h"  // ParentInspectorHandle
13
#endif
14
15
#include <string>
16
#include <vector>
17
18
using node::options_parser::kDisallowedInEnvironment;
19
using v8::ArrayBuffer;
20
using v8::Boolean;
21
using v8::Context;
22
using v8::Function;
23
using v8::FunctionCallbackInfo;
24
using v8::FunctionTemplate;
25
using v8::HandleScope;
26
using v8::Integer;
27
using v8::Isolate;
28
using v8::Local;
29
using v8::Locker;
30
using v8::Number;
31
using v8::Object;
32
using v8::SealHandleScope;
33
using v8::String;
34
using v8::Value;
35
36
namespace node {
37
namespace worker {
38
39
namespace {
40
41
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
42
151
void StartWorkerInspector(
43
    Environment* child,
44
    std::unique_ptr<inspector::ParentInspectorHandle> parent_handle,
45
    const std::string& url) {
46
151
  child->inspector_agent()->SetParentHandle(std::move(parent_handle));
47
  child->inspector_agent()->Start(url,
48
302
                                  child->options()->debug_options(),
49
                                  child->inspector_host_port(),
50
453
                                  false);
51
151
}
52
53
151
void WaitForWorkerInspectorToStop(Environment* child) {
54
151
  child->inspector_agent()->WaitForDisconnect();
55
151
  child->inspector_agent()->Stop();
56
151
}
57
#endif
58
59
}  // anonymous namespace
60
61
155
Worker::Worker(Environment* env,
62
               Local<Object> wrap,
63
               const std::string& url,
64
               std::shared_ptr<PerIsolateOptions> per_isolate_opts)
65
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
66
      url_(url),
67
      per_isolate_opts_(per_isolate_opts),
68
155
      platform_(env->isolate_data()->platform()),
69
155
      profiler_idle_notifier_started_(env->profiler_idle_notifier_started()),
70
465
      thread_id_(Environment::AllocateThreadId()) {
71
155
  Debug(this, "Creating new worker instance with thread id %llu", thread_id_);
72
73
  // Set up everything that needs to be set up in the parent environment.
74
155
  parent_port_ = MessagePort::New(env, env->context());
75
155
  if (parent_port_ == nullptr) {
76
    // This can happen e.g. because execution is terminating.
77
155
    return;
78
  }
79
80
155
  child_port_data_.reset(new MessagePortData(nullptr));
81
155
  MessagePort::Entangle(parent_port_, child_port_data_.get());
82
83
155
  object()->Set(env->context(),
84
                env->message_port_string(),
85
930
                parent_port_->object()).FromJust();
86
87
155
  object()->Set(env->context(),
88
                env->thread_id_string(),
89
930
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
90
310
      .FromJust();
91
92
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
93
310
  inspector_parent_handle_ =
94
155
      env->inspector_agent()->GetParentHandle(thread_id_, url);
95
#endif
96
97
155
  Debug(this, "Preparation for worker %llu finished", thread_id_);
98
}
99
100
21101
bool Worker::is_stopped() const {
101
21101
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
102
21103
  return stopped_;
103
}
104
105
// This class contains data that is only relevant to the child thread itself,
106
// and only while it is running.
107
// (Eventually, the Environment instance should probably also be moved here.)
108
class WorkerThreadData {
109
 public:
110
154
  explicit WorkerThreadData(Worker* w)
111
    : w_(w),
112
154
      array_buffer_allocator_(CreateArrayBufferAllocator()) {
113
154
    CHECK_EQ(uv_loop_init(&loop_), 0);
114
115
154
    Isolate* isolate = NewIsolate(array_buffer_allocator_.get(), &loop_);
116
154
    CHECK_NOT_NULL(isolate);
117
118
    {
119
154
      Locker locker(isolate);
120
308
      Isolate::Scope isolate_scope(isolate);
121
154
      isolate->SetStackLimit(w_->stack_base_);
122
123
308
      HandleScope handle_scope(isolate);
124
      isolate_data_.reset(CreateIsolateData(isolate,
125
                                            &loop_,
126
                                            w_->platform_,
127
154
                                            array_buffer_allocator_.get()));
128
154
      CHECK(isolate_data_);
129
154
      if (w_->per_isolate_opts_)
130
157
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
131
    }
132
133
154
    Mutex::ScopedLock lock(w_->mutex_);
134
154
    w_->isolate_ = isolate;
135
154
  }
136
137
308
  ~WorkerThreadData() {
138
154
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_);
139
    Isolate* isolate;
140
    {
141
154
      Mutex::ScopedLock lock(w_->mutex_);
142
154
      isolate = w_->isolate_;
143
154
      w_->isolate_ = nullptr;
144
    }
145
146
154
    w_->platform_->CancelPendingDelayedTasks(isolate);
147
148
154
    isolate_data_.reset();
149
154
    w_->platform_->UnregisterIsolate(isolate);
150
151
154
    isolate->Dispose();
152
153
    // Need to run the loop one more time to close the platform's uv_async_t
154
154
    uv_run(&loop_, UV_RUN_ONCE);
155
156
154
    CheckedUvLoopClose(&loop_);
157
154
  }
158
159
 private:
160
  Worker* const w_;
161
  uv_loop_t loop_;
162
  DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
163
    array_buffer_allocator_;
164
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
165
166
  friend class Worker;
167
};
168
169
154
void Worker::Run() {
170
154
  std::string name = "WorkerThread ";
171
154
  name += std::to_string(thread_id_);
172

155
  TRACE_EVENT_METADATA1(
173
      "__metadata", "thread_name", "name",
174
      TRACE_STR_COPY(name.c_str()));
175
154
  CHECK_NOT_NULL(platform_);
176
177
154
  Debug(this, "Creating isolate for worker with id %llu", thread_id_);
178
179
295
  WorkerThreadData data(this);
180
181
154
  Debug(this, "Starting worker with id %llu", thread_id_);
182
  {
183
154
    Locker locker(isolate_);
184
295
    Isolate::Scope isolate_scope(isolate_);
185
295
    SealHandleScope outer_seal(isolate_);
186
154
    bool inspector_started = false;
187
188
295
    DeleteFnPtr<Environment, FreeEnvironment> env_;
189
154
    OnScopeLeave cleanup_env([&]() {
190
308
      if (!env_) return;
191
151
      env_->set_can_call_into_js(false);
192
      Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
193
151
          Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
194
195
      // Grab the parent-to-child channel and render it unusable.
196
      MessagePort* child_port;
197
      {
198
151
        Mutex::ScopedLock lock(mutex_);
199
151
        child_port = child_port_;
200
151
        child_port_ = nullptr;
201
      }
202
203
      {
204
151
        Context::Scope context_scope(env_->context());
205
151
        if (child_port != nullptr)
206
302
          child_port->Close();
207
151
        env_->stop_sub_worker_contexts();
208
151
        env_->RunCleanup();
209
151
        RunAtExit(env_.get());
210
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
211
151
        if (inspector_started)
212
151
          WaitForWorkerInspectorToStop(env_.get());
213
#endif
214
215
        {
216
151
          Mutex::ScopedLock stopped_lock(stopped_mutex_);
217
151
          stopped_ = true;
218
        }
219
220
151
        env_->RunCleanup();
221
222
        // This call needs to be made while the `Environment` is still alive
223
        // because we assume that it is available for async tracking in the
224
        // NodePlatform implementation.
225
151
        platform_->DrainTasks(isolate_);
226
      }
227
446
    });
228
229
    {
230
154
      HandleScope handle_scope(isolate_);
231
154
      Local<Context> context = NewContext(isolate_);
232
154
      if (is_stopped()) return;
233
234
151
      CHECK(!context.IsEmpty());
235
141
      Context::Scope context_scope(context);
236
      {
237
        // TODO(addaleax): Use CreateEnvironment(), or generally another
238
        // public API.
239
        env_.reset(new Environment(data.isolate_data_.get(),
240
                                   context,
241
                                   Environment::kNoFlags,
242
151
                                   thread_id_));
243
151
        CHECK_NOT_NULL(env_);
244
151
        env_->set_abort_on_uncaught_exception(false);
245
151
        env_->set_worker_context(this);
246
247
151
        env_->Start(profiler_idle_notifier_started_);
248
        env_->ProcessCliArgs(std::vector<std::string>{},
249
151
                             std::vector<std::string>{});
250
      }
251
252
151
      Debug(this, "Created Environment for worker with id %llu", thread_id_);
253
254
151
      if (is_stopped()) return;
255
      {
256
151
        HandleScope handle_scope(isolate_);
257
302
        Mutex::ScopedLock lock(mutex_);
258
        // Set up the message channel for receiving messages in the child.
259
        child_port_ = MessagePort::New(env_.get(),
260
                                       env_->context(),
261
151
                                       std::move(child_port_data_));
262
        // MessagePort::New() may return nullptr if execution is terminated
263
        // within it.
264
151
        if (child_port_ != nullptr)
265
151
          env_->set_message_port(child_port_->object(isolate_));
266
267
302
        Debug(this, "Created message port for worker %llu", thread_id_);
268
      }
269
270
151
      if (is_stopped()) return;
271
      {
272
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
273
        StartWorkerInspector(env_.get(),
274
151
                             std::move(inspector_parent_handle_),
275
302
                             url_);
276
#endif
277
151
        inspector_started = true;
278
279
151
        HandleScope handle_scope(isolate_);
280
302
        Environment::AsyncCallbackScope callback_scope(env_.get());
281
151
        env_->async_hooks()->push_async_ids(1, 0);
282
302
        if (!RunBootstrapping(env_.get()).IsEmpty()) {
283
141
          USE(StartExecution(env_.get(), "internal/main/worker_thread"));
284
        }
285
286
151
        env_->async_hooks()->pop_async_id(1);
287
288
302
        Debug(this, "Loaded environment for worker %llu", thread_id_);
289
      }
290
291
151
      if (is_stopped()) return;
292
      {
293
141
        SealHandleScope seal(isolate_);
294
        bool more;
295
        env_->performance_state()->Mark(
296
141
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
297
98
        do {
298
141
          if (is_stopped()) break;
299
141
          uv_run(&data.loop_, UV_RUN_DEFAULT);
300
141
          if (is_stopped()) break;
301
302
98
          platform_->DrainTasks(isolate_);
303
304
98
          more = uv_loop_alive(&data.loop_);
305

98
          if (more && !is_stopped())
306
            continue;
307
308
98
          EmitBeforeExit(env_.get());
309
310
          // Emit `beforeExit` if the loop became alive either after emitting
311
          // event, or after running some callbacks.
312
98
          more = uv_loop_alive(&data.loop_);
313
        } while (more == true);
314
        env_->performance_state()->Mark(
315
141
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
316
141
      }
317
    }
318
319
    {
320
      int exit_code;
321
141
      bool stopped = is_stopped();
322
141
      if (!stopped)
323
98
        exit_code = EmitExit(env_.get());
324
141
      Mutex::ScopedLock lock(mutex_);
325

141
      if (exit_code_ == 0 && !stopped)
326
98
        exit_code_ = exit_code;
327
328
      Debug(this, "Exiting thread for worker %llu with exit code %d",
329
282
            thread_id_, exit_code_);
330
141
    }
331
  }
332
333
282
  Debug(this, "Worker %llu thread stops", thread_id_);
334
}
335
336
329
void Worker::JoinThread() {
337
329
  if (thread_joined_)
338
503
    return;
339
154
  CHECK_EQ(uv_thread_join(&tid_), 0);
340
154
  thread_joined_ = true;
341
342
154
  env()->remove_sub_worker_context(this);
343
344
154
  if (thread_exit_async_) {
345
131
    env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
346
131
      delete async;
347
285
    });
348
349
154
    if (scheduled_on_thread_stopped_)
350
43
      OnThreadStopped();
351
  }
352
}
353
354
154
void Worker::OnThreadStopped() {
355
  {
356
154
    Mutex::ScopedLock lock(mutex_);
357
154
    scheduled_on_thread_stopped_ = false;
358
359
154
    Debug(this, "Worker %llu thread stopped", thread_id_);
360
361
    {
362
154
      Mutex::ScopedLock stopped_lock(stopped_mutex_);
363
154
      CHECK(stopped_);
364
    }
365
366
154
    parent_port_ = nullptr;
367
  }
368
369
154
  JoinThread();
370
371
  {
372
154
    HandleScope handle_scope(env()->isolate());
373
154
    Context::Scope context_scope(env()->context());
374
375
    // Reset the parent port as we're closing it now anyway.
376
154
    object()->Set(env()->context(),
377
                  env()->message_port_string(),
378
924
                  Undefined(env()->isolate())).FromJust();
379
380
154
    Local<Value> code = Integer::New(env()->isolate(), exit_code_);
381
305
    MakeCallback(env()->onexit_string(), 1, &code);
382
  }
383
384
  // JoinThread() cleared all libuv handles bound to this Worker;
385
  // the C++ object is no longer needed for anything now.
386
151
  MakeWeak();
387
151
}
388
389
396
Worker::~Worker() {
390
132
  Mutex::ScopedLock lock(mutex_);
391
132
  JoinThread();
392
393
132
  CHECK(stopped_);
394
132
  CHECK(thread_joined_);
395
396
  // This has most likely already happened within the worker thread -- this
397
  // is just in case Worker creation failed early.
398
399
264
  Debug(this, "Worker %llu destroyed", thread_id_);
400
264
}
401
402
158
void Worker::New(const FunctionCallbackInfo<Value>& args) {
403
158
  Environment* env = Environment::GetCurrent(args);
404
405
158
  CHECK(args.IsConstructCall());
406
407
158
  if (env->isolate_data()->platform() == nullptr) {
408
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
409
    return;
410
  }
411
412
158
  std::string url;
413
313
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
414
415
  // Argument might be a string or URL
416


790
  if (args.Length() > 0 && !args[0]->IsNullOrUndefined()) {
417
    Utf8Value value(
418
        args.GetIsolate(),
419
184
        args[0]->ToString(env->context()).FromMaybe(v8::Local<v8::String>()));
420
46
    url.append(value.out(), value.length());
421
422


184
    if (args.Length() > 1 && args[1]->IsArray()) {
423
12
      v8::Local<v8::Array> array = args[1].As<v8::Array>();
424
      // The first argument is reserved for program name, but we don't need it
425
      // in workers.
426
6
      std::vector<std::string> exec_argv = {""};
427
6
      uint32_t length = array->Length();
428
24
      for (uint32_t i = 0; i < length; i++) {
429
        v8::Local<v8::Value> arg;
430
18
        if (!array->Get(env->context(), i).ToLocal(&arg)) {
431
          return;
432
        }
433
        v8::MaybeLocal<v8::String> arg_v8_string =
434
12
            arg->ToString(env->context());
435
6
        if (arg_v8_string.IsEmpty()) {
436
          return;
437
        }
438
        Utf8Value arg_utf8_value(
439
            args.GetIsolate(),
440
6
            arg_v8_string.FromMaybe(v8::Local<v8::String>()));
441
12
        std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
442
6
        exec_argv.push_back(arg_string);
443
6
      }
444
445
9
      std::vector<std::string> invalid_args{};
446
9
      std::vector<std::string> errors{};
447
6
      per_isolate_opts.reset(new PerIsolateOptions());
448
449
      // Using invalid_args as the v8_args argument as it stores unknown
450
      // options for the per isolate parser.
451
      options_parser::PerIsolateOptionsParser::instance.Parse(
452
          &exec_argv,
453
          nullptr,
454
          &invalid_args,
455
          per_isolate_opts.get(),
456
          kDisallowedInEnvironment,
457
6
          &errors);
458
459
      // The first argument is program name.
460
6
      invalid_args.erase(invalid_args.begin());
461

6
      if (errors.size() > 0 || invalid_args.size() > 0) {
462
        v8::Local<v8::Value> value =
463
            ToV8Value(env->context(),
464
3
                      errors.size() > 0 ? errors : invalid_args)
465
6
                .ToLocalChecked();
466
        Local<String> key =
467
3
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
468
12
        args.This()->Set(env->context(), key, value).FromJust();
469
3
        return;
470
3
      }
471
43
    }
472
  }
473
310
  new Worker(env, args.This(), url, per_isolate_opts);
474
}
475
476
154
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
477
  Worker* w;
478
308
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
479
154
  Mutex::ScopedLock lock(w->mutex_);
480
481
154
  w->env()->add_sub_worker_context(w);
482
154
  w->stopped_ = false;
483
154
  w->thread_joined_ = false;
484
485
154
  w->thread_exit_async_.reset(new uv_async_t);
486
154
  w->thread_exit_async_->data = w;
487
530
  CHECK_EQ(uv_async_init(w->env()->event_loop(),
488
                         w->thread_exit_async_.get(),
489
                         [](uv_async_t* handle) {
490
    static_cast<Worker*>(handle->data)->OnThreadStopped();
491
  }), 0);
492
493
  uv_thread_options_t thread_options;
494
154
  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
495
154
  thread_options.stack_size = kStackSize;
496

616
  CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
497
    Worker* w = static_cast<Worker*>(arg);
498
    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
499
500
    // Leave a few kilobytes just to make sure we're within limits and have
501
    // some space to do work in C++ land.
502
    w->stack_base_ = stack_top - (kStackSize - kStackBufferSize);
503
504
    w->Run();
505
506
    Mutex::ScopedLock lock(w->mutex_);
507
    CHECK(w->thread_exit_async_);
508
    w->scheduled_on_thread_stopped_ = true;
509
    uv_async_send(w->thread_exit_async_.get());
510
154
  }, static_cast<void*>(w)), 0);
511
}
512
513
21
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
514
  Worker* w;
515
42
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
516
517
21
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_);
518
21
  w->Exit(1);
519
21
  w->JoinThread();
520
}
521
522
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
523
  Worker* w;
524
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
525
  if (w->thread_exit_async_)
526
    uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
527
}
528
529
1
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
530
  Worker* w;
531
2
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
532
1
  if (w->thread_exit_async_)
533
1
    uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
534
}
535
536
57
void Worker::Exit(int code) {
537
57
  Mutex::ScopedLock lock(mutex_);
538
114
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
539
540
57
  Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
541
542
57
  if (!stopped_) {
543
56
    stopped_ = true;
544
56
    exit_code_ = code;
545
56
    if (child_port_ != nullptr)
546
53
      child_port_->StopEventLoop();
547
56
    if (isolate_ != nullptr)
548
54
      isolate_->TerminateExecution();
549
57
  }
550
57
}
551
552
namespace {
553
554
// Return the MessagePort that is global for this Environment and communicates
555
// with the internal [kPort] port of the JS Worker class in the parent thread.
556
287
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
557
287
  Environment* env = Environment::GetCurrent(args);
558
287
  Local<Object> port = env->message_port();
559
287
  if (!port.IsEmpty()) {
560
861
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
561
574
    args.GetReturnValue().Set(port);
562
  }
563
287
}
564
565
4407
void InitWorker(Local<Object> target,
566
                Local<Value> unused,
567
                Local<Context> context,
568
                void* priv) {
569
4407
  Environment* env = Environment::GetCurrent(context);
570
571
  {
572
4407
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
573
574
8814
    w->InstanceTemplate()->SetInternalFieldCount(1);
575
8814
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
576
577
4407
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
578
4407
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
579
4407
    env->SetProtoMethod(w, "ref", Worker::Ref);
580
4407
    env->SetProtoMethod(w, "unref", Worker::Unref);
581
582
    Local<String> workerString =
583
4407
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
584
4407
    w->SetClassName(workerString);
585
    target->Set(env->context(),
586
                workerString,
587
22035
                w->GetFunction(env->context()).ToLocalChecked()).FromJust();
588
  }
589
590
4407
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
591
592
  target
593
      ->Set(env->context(),
594
            env->thread_id_string(),
595
22035
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
596
8814
      .FromJust();
597
598
  target
599
      ->Set(env->context(),
600
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
601
22035
            Boolean::New(env->isolate(), env->is_main_thread()))
602
8814
      .FromJust();
603
604
  target
605
      ->Set(env->context(),
606
            FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
607
22035
            Boolean::New(env->isolate(), env->owns_process_state()))
608
8814
      .FromJust();
609
4407
}
610
611
}  // anonymous namespace
612
613
}  // namespace worker
614
}  // namespace node
615
616
4314
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)