GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_worker.cc Lines: 19 267 7.1 %
Date: 2019-02-01 22:03:38 Branches: 0 148 0.0 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils.h"
3
#include "node_errors.h"
4
#include "node_buffer.h"
5
#include "node_perf.h"
6
#include "util.h"
7
#include "util-inl.h"
8
#include "async_wrap.h"
9
#include "async_wrap-inl.h"
10
11
#include <string>
12
#include <vector>
13
14
using node::options_parser::kDisallowedInEnvironment;
15
using v8::ArrayBuffer;
16
using v8::Boolean;
17
using v8::Context;
18
using v8::Function;
19
using v8::FunctionCallbackInfo;
20
using v8::FunctionTemplate;
21
using v8::HandleScope;
22
using v8::Integer;
23
using v8::Isolate;
24
using v8::Local;
25
using v8::Locker;
26
using v8::Number;
27
using v8::Object;
28
using v8::SealHandleScope;
29
using v8::String;
30
using v8::Value;
31
32
namespace node {
33
namespace worker {
34
35
namespace {
36
37
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
38
void StartWorkerInspector(Environment* child, const std::string& url) {
39
  child->inspector_agent()->Start(url,
40
                                  child->options()->debug_options(),
41
                                  child->inspector_host_port(),
42
                                  false);
43
}
44
45
void AddWorkerInspector(Environment* parent,
46
                        Environment* child,
47
                        int id,
48
                        const std::string& url) {
49
  parent->inspector_agent()->AddWorkerInspector(id, url,
50
                                                child->inspector_agent());
51
}
52
53
void WaitForWorkerInspectorToStop(Environment* child) {
54
  child->inspector_agent()->WaitForDisconnect();
55
  child->inspector_agent()->Stop();
56
}
57
58
#else
59
// No-ops
60
void StartWorkerInspector(Environment* child, const std::string& url) {}
61
void AddWorkerInspector(Environment* parent,
62
                        Environment* child,
63
                        int id,
64
                        const std::string& url) {}
65
void WaitForWorkerInspectorToStop(Environment* child) {}
66
#endif
67
68
}  // anonymous namespace
69
70
// Creates the parent-side Worker object.
//
// All of the work below happens on the calling (parent) thread:
//   1. A MessagePort is created in the parent Environment and entangled with
//      the data for the (not yet created) child port.
//   2. A dedicated ArrayBuffer allocator, libuv loop, Isolate, IsolateData,
//      Context and Environment are created for the worker.
//   3. The worker's thread id is stored on the JS wrapper object.
//
// Parameters:
//   env               - the parent Environment.
//   wrap              - the JS object wrapping this Worker.
//   url               - stored in url_ and passed on to the inspector.
//   per_isolate_opts  - optional per-isolate options for the child; when
//                       nullptr the IsolateData keeps whatever options
//                       CreateIsolateData() gave it.
//
// If the parent MessagePort cannot be created the constructor returns early
// and none of the child-side state is set up.
Worker::Worker(Environment* env,
               Local<Object> wrap,
               const std::string& url,
               std::shared_ptr<PerIsolateOptions> per_isolate_opts)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), url_(url) {
  Debug(this, "Creating new worker instance at %p", static_cast<void*>(this));

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  child_port_data_.reset(new MessagePortData(nullptr));
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).FromJust();

  array_buffer_allocator_.reset(CreateArrayBufferAllocator());

  CHECK_EQ(uv_loop_init(&loop_), 0);
  isolate_ = NewIsolate(array_buffer_allocator_.get(), &loop_);
  CHECK_NE(isolate_, nullptr);

  {
    // Enter an environment capable of executing code in the child Isolate
    // (and only in it).
    Locker locker(isolate_);
    Isolate::Scope isolate_scope(isolate_);
    HandleScope handle_scope(isolate_);

    isolate_data_.reset(CreateIsolateData(isolate_,
                                          &loop_,
                                          env->isolate_data()->platform(),
                                          array_buffer_allocator_.get()));
    // Validate the IsolateData before it is dereferenced for the first time;
    // checking only after set_options() would be too late to be useful.
    CHECK(isolate_data_);
    if (per_isolate_opts != nullptr) {
      isolate_data_->set_options(per_isolate_opts);
    }

    Local<Context> context = NewContext(isolate_);
    Context::Scope context_scope(context);

    // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
    env_.reset(new Environment(isolate_data_.get(), context));
    CHECK_NE(env_, nullptr);
    env_->set_abort_on_uncaught_exception(false);
    env_->set_worker_context(this);
    thread_id_ = env_->thread_id();

    env_->Start(env->profiler_idle_notifier_started());
    env_->ProcessCliArgs(std::vector<std::string>{},
                         std::vector<std::string>{});
    // Done while on the parent thread
    AddWorkerInspector(env, env_.get(), thread_id_, url_);
  }

  // The new isolate won't be bothered on this thread again.
  isolate_->DiscardThreadSpecificMetadata();

  // Expose the worker's thread id on the JS wrapper.
  wrap->Set(env->context(),
            env->thread_id_string(),
            Number::New(env->isolate(), static_cast<double>(thread_id_)))
      .FromJust();

  Debug(this,
        "Set up worker at %p with id %llu",
        static_cast<void*>(this),
        thread_id_);
}
143
144
bool Worker::is_stopped() const {
145
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
146
  return stopped_;
147
}
148
149
void Worker::Run() {
150
  std::string name = "WorkerThread ";
151
  name += std::to_string(thread_id_);
152
  TRACE_EVENT_METADATA1(
153
      "__metadata", "thread_name", "name",
154
      TRACE_STR_COPY(name.c_str()));
155
  MultiIsolatePlatform* platform = isolate_data_->platform();
156
  CHECK_NE(platform, nullptr);
157
158
  Debug(this, "Starting worker with id %llu", thread_id_);
159
  {
160
    Locker locker(isolate_);
161
    Isolate::Scope isolate_scope(isolate_);
162
    SealHandleScope outer_seal(isolate_);
163
    bool inspector_started = false;
164
165
    {
166
      Context::Scope context_scope(env_->context());
167
      HandleScope handle_scope(isolate_);
168
169
      {
170
        HandleScope handle_scope(isolate_);
171
        Mutex::ScopedLock lock(mutex_);
172
        // Set up the message channel for receiving messages in the child.
173
        child_port_ = MessagePort::New(env_.get(),
174
                                       env_->context(),
175
                                       std::move(child_port_data_));
176
        // MessagePort::New() may return nullptr if execution is terminated
177
        // within it.
178
        if (child_port_ != nullptr)
179
          env_->set_message_port(child_port_->object(isolate_));
180
181
        Debug(this, "Created message port for worker %llu", thread_id_);
182
      }
183
184
      if (!is_stopped()) {
185
        StartWorkerInspector(env_.get(), url_);
186
        inspector_started = true;
187
188
        HandleScope handle_scope(isolate_);
189
        Environment::AsyncCallbackScope callback_scope(env_.get());
190
        env_->async_hooks()->push_async_ids(1, 0);
191
        if (!RunBootstrapping(env_.get()).IsEmpty()) {
192
          USE(StartExecution(env_.get(), "internal/main/worker_thread"));
193
        }
194
195
        env_->async_hooks()->pop_async_id(1);
196
197
        Debug(this, "Loaded environment for worker %llu", thread_id_);
198
      }
199
200
      {
201
        SealHandleScope seal(isolate_);
202
        bool more;
203
        env_->performance_state()->Mark(
204
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
205
        do {
206
          if (is_stopped()) break;
207
          uv_run(&loop_, UV_RUN_DEFAULT);
208
          if (is_stopped()) break;
209
210
          platform->DrainTasks(isolate_);
211
212
          more = uv_loop_alive(&loop_);
213
          if (more && !is_stopped())
214
            continue;
215
216
          EmitBeforeExit(env_.get());
217
218
          // Emit `beforeExit` if the loop became alive either after emitting
219
          // event, or after running some callbacks.
220
          more = uv_loop_alive(&loop_);
221
        } while (more == true);
222
        env_->performance_state()->Mark(
223
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
224
      }
225
    }
226
227
    {
228
      int exit_code;
229
      bool stopped = is_stopped();
230
      if (!stopped)
231
        exit_code = EmitExit(env_.get());
232
      Mutex::ScopedLock lock(mutex_);
233
      if (exit_code_ == 0 && !stopped)
234
        exit_code_ = exit_code;
235
236
      Debug(this, "Exiting thread for worker %llu with exit code %d",
237
            thread_id_, exit_code_);
238
    }
239
240
    env_->set_can_call_into_js(false);
241
    Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
242
        Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
243
244
    // Grab the parent-to-child channel and render is unusable.
245
    MessagePort* child_port;
246
    {
247
      Mutex::ScopedLock lock(mutex_);
248
      child_port = child_port_;
249
      child_port_ = nullptr;
250
    }
251
252
    {
253
      Context::Scope context_scope(env_->context());
254
      child_port->Close();
255
      env_->stop_sub_worker_contexts();
256
      env_->RunCleanup();
257
      RunAtExit(env_.get());
258
      if (inspector_started)
259
        WaitForWorkerInspectorToStop(env_.get());
260
261
      {
262
        Mutex::ScopedLock stopped_lock(stopped_mutex_);
263
        stopped_ = true;
264
      }
265
266
      env_->RunCleanup();
267
268
      // This call needs to be made while the `Environment` is still alive
269
      // because we assume that it is available for async tracking in the
270
      // NodePlatform implementation.
271
      platform->DrainTasks(isolate_);
272
    }
273
274
    env_.reset();
275
  }
276
277
  DisposeIsolate();
278
279
  {
280
    Mutex::ScopedLock lock(mutex_);
281
    CHECK(thread_exit_async_);
282
    scheduled_on_thread_stopped_ = true;
283
    uv_async_send(thread_exit_async_.get());
284
  }
285
286
  Debug(this, "Worker %llu thread stops", thread_id_);
287
}
288
289
// Tears down the worker's Environment (if it still exists), IsolateData and
// Isolate. Does nothing beyond the Environment reset when isolate_ is
// already nullptr, so calling it again after a completed disposal is a no-op.
void Worker::DisposeIsolate() {
290
  // The Environment is destroyed with the isolate locked and entered.
  if (env_) {
291
    CHECK_NOT_NULL(isolate_);
292
    Locker locker(isolate_);
293
    Isolate::Scope isolate_scope(isolate_);
294
    env_.reset();
295
  }
296
297
  // Nothing left to dispose of.
  if (isolate_ == nullptr)
298
    return;
299
300
  Debug(this, "Worker %llu dispose isolate", thread_id_);
301
  CHECK(isolate_data_);
302
  // Fetch the platform pointer first: isolate_data_ is reset below, but the
  // platform is still needed afterwards to unregister the isolate.
  MultiIsolatePlatform* platform = isolate_data_->platform();
303
  platform->CancelPendingDelayedTasks(isolate_);
304
305
  isolate_data_.reset();
306
  platform->UnregisterIsolate(isolate_);
307
308
  isolate_->Dispose();
309
  // Marks the isolate as disposed for any later call.
  isolate_ = nullptr;
310
}
311
312
void Worker::JoinThread() {
313
  if (thread_joined_)
314
    return;
315
  CHECK_EQ(uv_thread_join(&tid_), 0);
316
  thread_joined_ = true;
317
318
  env()->remove_sub_worker_context(this);
319
320
  if (thread_exit_async_) {
321
    env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
322
      delete async;
323
    });
324
325
    if (scheduled_on_thread_stopped_)
326
      OnThreadStopped();
327
  }
328
}
329
330
void Worker::OnThreadStopped() {
331
  {
332
    Mutex::ScopedLock lock(mutex_);
333
    scheduled_on_thread_stopped_ = false;
334
335
    Debug(this, "Worker %llu thread stopped", thread_id_);
336
337
    {
338
      Mutex::ScopedLock stopped_lock(stopped_mutex_);
339
      CHECK(stopped_);
340
    }
341
342
    CHECK_EQ(child_port_, nullptr);
343
    parent_port_ = nullptr;
344
  }
345
346
  JoinThread();
347
348
  {
349
    HandleScope handle_scope(env()->isolate());
350
    Context::Scope context_scope(env()->context());
351
352
    // Reset the parent port as we're closing it now anyway.
353
    object()->Set(env()->context(),
354
                  env()->message_port_string(),
355
                  Undefined(env()->isolate())).FromJust();
356
357
    Local<Value> code = Integer::New(env()->isolate(), exit_code_);
358
    MakeCallback(env()->onexit_string(), 1, &code);
359
  }
360
361
  // JoinThread() cleared all libuv handles bound to this Worker,
362
  // the C++ object is no longer needed for anything now.
363
  MakeWeak();
364
}
365
366
// Destroys the Worker: joins the thread if that has not happened yet,
// verifies the worker is fully stopped, disposes of the isolate if it is
// still around, and closes the worker's libuv loop.
Worker::~Worker() {
367
  // NOTE(review): mutex_ is held across JoinThread(), which can call
  // OnThreadStopped(), which locks mutex_ again. This looks safe only if the
  // thread has already been joined by the time the destructor runs (or if
  // Mutex is recursive) -- confirm.
  Mutex::ScopedLock lock(mutex_);
368
  JoinThread();
369
370
  // Sanity checks: the worker must be stopped, joined and without a child
  // port at this point.
  CHECK(stopped_);
371
  CHECK(thread_joined_);
372
  CHECK_EQ(child_port_, nullptr);
373
374
  // This has most likely already happened within the worker thread -- this
375
  // is just in case Worker creation failed early.
376
  DisposeIsolate();
377
378
  // Need to run the loop one more time to close the platform's uv_async_t
379
  uv_run(&loop_, UV_RUN_ONCE);
380
381
  CheckedUvLoopClose(&loop_);
382
383
  Debug(this, "Worker %llu destroyed", thread_id_);
384
}
385
386
void Worker::New(const FunctionCallbackInfo<Value>& args) {
387
  Environment* env = Environment::GetCurrent(args);
388
389
  CHECK(args.IsConstructCall());
390
391
  if (env->isolate_data()->platform() == nullptr) {
392
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
393
    return;
394
  }
395
396
  std::string url;
397
  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
398
399
  // Argument might be a string or URL
400
  if (args.Length() > 0 && !args[0]->IsNullOrUndefined()) {
401
    Utf8Value value(
402
        args.GetIsolate(),
403
        args[0]->ToString(env->context()).FromMaybe(v8::Local<v8::String>()));
404
    url.append(value.out(), value.length());
405
406
    if (args.Length() > 1 && args[1]->IsArray()) {
407
      v8::Local<v8::Array> array = args[1].As<v8::Array>();
408
      // The first argument is reserved for program name, but we don't need it
409
      // in workers.
410
      std::vector<std::string> exec_argv = {""};
411
      uint32_t length = array->Length();
412
      for (uint32_t i = 0; i < length; i++) {
413
        v8::Local<v8::Value> arg;
414
        if (!array->Get(env->context(), i).ToLocal(&arg)) {
415
          return;
416
        }
417
        v8::MaybeLocal<v8::String> arg_v8_string =
418
            arg->ToString(env->context());
419
        if (arg_v8_string.IsEmpty()) {
420
          return;
421
        }
422
        Utf8Value arg_utf8_value(
423
            args.GetIsolate(),
424
            arg_v8_string.FromMaybe(v8::Local<v8::String>()));
425
        std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
426
        exec_argv.push_back(arg_string);
427
      }
428
429
      std::vector<std::string> invalid_args{};
430
      std::vector<std::string> errors{};
431
      per_isolate_opts.reset(new PerIsolateOptions());
432
433
      // Using invalid_args as the v8_args argument as it stores unknown
434
      // options for the per isolate parser.
435
      options_parser::PerIsolateOptionsParser::instance.Parse(
436
          &exec_argv,
437
          nullptr,
438
          &invalid_args,
439
          per_isolate_opts.get(),
440
          kDisallowedInEnvironment,
441
          &errors);
442
443
      // The first argument is program name.
444
      invalid_args.erase(invalid_args.begin());
445
      if (errors.size() > 0 || invalid_args.size() > 0) {
446
        v8::Local<v8::Value> value =
447
            ToV8Value(env->context(),
448
                      errors.size() > 0 ? errors : invalid_args)
449
                .ToLocalChecked();
450
        Local<String> key =
451
            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
452
        args.This()->Set(env->context(), key, value).FromJust();
453
        return;
454
      }
455
    }
456
  }
457
  new Worker(env, args.This(), url, per_isolate_opts);
458
}
459
460
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
461
  Worker* w;
462
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
463
  Mutex::ScopedLock lock(w->mutex_);
464
465
  w->env()->add_sub_worker_context(w);
466
  w->stopped_ = false;
467
  w->thread_joined_ = false;
468
469
  w->thread_exit_async_.reset(new uv_async_t);
470
  w->thread_exit_async_->data = w;
471
  CHECK_EQ(uv_async_init(w->env()->event_loop(),
472
                         w->thread_exit_async_.get(),
473
                         [](uv_async_t* handle) {
474
    static_cast<Worker*>(handle->data)->OnThreadStopped();
475
  }), 0);
476
477
  CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
478
    static_cast<Worker*>(arg)->Run();
479
  }, static_cast<void*>(w)), 0);
480
}
481
482
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
483
  Worker* w;
484
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
485
486
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_);
487
  w->Exit(1);
488
  w->JoinThread();
489
}
490
491
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
492
  Worker* w;
493
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
494
  if (w->thread_exit_async_)
495
    uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
496
}
497
498
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
499
  Worker* w;
500
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
501
  if (w->thread_exit_async_)
502
    uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
503
}
504
505
void Worker::Exit(int code) {
506
  Mutex::ScopedLock lock(mutex_);
507
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
508
509
  Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
510
511
  if (!stopped_) {
512
    CHECK_NE(env_, nullptr);
513
    stopped_ = true;
514
    exit_code_ = code;
515
    if (child_port_ != nullptr)
516
      child_port_->StopEventLoop();
517
    isolate_->TerminateExecution();
518
  }
519
}
520
521
namespace {
522
523
// Return the MessagePort that is global for this Environment and communicates
524
// with the internal [kPort] port of the JS Worker class in the parent thread.
525
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
526
  Environment* env = Environment::GetCurrent(args);
527
  Local<Object> port = env->message_port();
528
  if (!port.IsEmpty()) {
529
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
530
    args.GetReturnValue().Set(port);
531
  }
532
}
533
534
163
// Module initializer for the `worker` binding. Populates `target` with:
//   - `Worker`: constructor backed by Worker::New, with the prototype
//     methods startThread/stopThread/ref/unref,
//   - `getEnvMessagePort`: function backed by GetEnvMessagePort,
//   - the current Environment's thread id (under env->thread_id_string()),
//   - `isMainThread`: boolean taken from env->is_main_thread().
// `unused` and `priv` are not read by this function.
void InitWorker(Local<Object> target,
535
                Local<Value> unused,
536
                Local<Context> context,
537
                void* priv) {
538
163
  Environment* env = Environment::GetCurrent(context);
539
540
  {
541
163
    // Build the `Worker` constructor template.
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
542
543
326
    w->InstanceTemplate()->SetInternalFieldCount(1);
544
326
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
545
546
163
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
547
163
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
548
163
    env->SetProtoMethod(w, "ref", Worker::Ref);
549
163
    env->SetProtoMethod(w, "unref", Worker::Unref);
550
551
    // The same string serves as the class name and as the export key.
    Local<String> workerString =
552
163
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
553
163
    w->SetClassName(workerString);
554
    target->Set(env->context(),
555
                workerString,
556
815
                w->GetFunction(env->context()).ToLocalChecked()).FromJust();
557
  }
558
559
163
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
560
561
  // Export the thread id of the Environment loading this binding.
  target
562
      ->Set(env->context(),
563
            env->thread_id_string(),
564
815
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
565
326
      .FromJust();
566
567
  // Export whether this Environment is the main thread.
  target
568
      ->Set(env->context(),
569
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
570
815
            Boolean::New(env->isolate(), env->is_main_thread()))
571
326
      .FromJust();
572
163
}
573
574
}  // anonymous namespace
575
576
}  // namespace worker
577
}  // namespace node
578
579
164
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)