GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/node_worker.cc Lines: 237 247 96.0 %
Date: 2019-01-07 12:15:22 Branches: 64 112 57.1 %

Line Branch Exec Source
1
#include "node_worker.h"
2
#include "debug_utils.h"
3
#include "node_errors.h"
4
#include "node_internals.h"
5
#include "node_buffer.h"
6
#include "node_perf.h"
7
#include "util.h"
8
#include "util-inl.h"
9
#include "async_wrap.h"
10
#include "async_wrap-inl.h"
11
12
#include <string>
13
14
using v8::ArrayBuffer;
15
using v8::Context;
16
using v8::Function;
17
using v8::FunctionCallbackInfo;
18
using v8::FunctionTemplate;
19
using v8::HandleScope;
20
using v8::Integer;
21
using v8::Isolate;
22
using v8::Local;
23
using v8::Locker;
24
using v8::Number;
25
using v8::Object;
26
using v8::SealHandleScope;
27
using v8::String;
28
using v8::Value;
29
30
namespace node {
31
namespace worker {
32
33
namespace {
34
35
// Id that the next constructed Worker will receive; the Worker constructor
// reads and post-increments it. Ids start at 1.
uint64_t next_thread_id = 1;
36
3597
// Serializes access to next_thread_id; held by the Worker constructor while
// it takes an id.
Mutex next_thread_id_mutex;
37
38
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
39
118
// Starts the inspector agent that belongs to the worker's Environment.
//
// `child` is the worker-side Environment and `url` is forwarded to the agent
// unchanged. Debug options and the inspector host/port are taken from `child`
// itself.
// NOTE(review): the trailing `false` presumably tells the agent that this is
// not the main thread -- confirm against inspector::Agent::Start().
void StartWorkerInspector(Environment* child, const std::string& url) {
  auto&& agent = child->inspector_agent();
  agent->Start(url,
               child->options()->debug_options(),
               child->inspector_host_port(),
               false);
}
45
46
119
void AddWorkerInspector(Environment* parent,
47
                        Environment* child,
48
                        int id,
49
                        const std::string& url) {
50
  parent->inspector_agent()->AddWorkerInspector(id, url,
51
119
                                                child->inspector_agent());
52
119
}
53
54
118
// Shuts down the worker's inspector agent: first waits for the agent to
// report a disconnect, then stops it.
void WaitForWorkerInspectorToStop(Environment* child) {
  auto&& agent = child->inspector_agent();
  agent->WaitForDisconnect();
  agent->Stop();
}
58
59
#else
60
// No-op fallbacks, compiled when the enclosing
// `#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR` condition is false. They keep
// the same signatures as the real helpers so that callers need no
// conditional compilation of their own.
61
// Does nothing.
void StartWorkerInspector(Environment* child, const std::string& url) {}
62
// Does nothing.
void AddWorkerInspector(Environment* parent,
63
                        Environment* child,
64
                        int id,
65
                        const std::string& url) {}
66
// Does nothing.
void WaitForWorkerInspectorToStop(Environment* child) {}
67
#endif
68
69
}  // anonymous namespace
70
71
119
// Builds the parent-side state of a worker and the (not yet running) child
// Isolate/Environment.
//
// Parameters:
//   env  - Environment of the thread that creates the worker (the parent).
//   wrap - JS object this C++ object is attached to via AsyncWrap.
//   url  - stored in url_ and forwarded to the inspector helpers.
//
// Steps, in order:
//   1. Initialise the worker's own libuv loop.
//   2. Take a fresh thread id from the global counter and expose it on `wrap`.
//   3. Create the parent-side MessagePort and entangle it with the data that
//      will later back the child-side port.
//   4. Create the child Isolate, IsolateData, Context and Environment, start
//      the Environment and register the worker with the parent's inspector.
//
// The constructor may return early (step 3) when the parent port cannot be
// created; in that case no isolate or child Environment exists.
Worker::Worker(Environment* env, Local<Object> wrap, const std::string& url)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), url_(url) {
  // The destructor unconditionally runs and closes `loop_`, so the loop has to
  // be initialised before any early return below. Previously this happened
  // only after the parent port had been created, leaving `loop_`
  // uninitialised on the early-return path.
  CHECK_EQ(uv_loop_init(&loop_), 0);

  // Generate a new thread id.
  {
    Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
    thread_id_ = next_thread_id++;
  }

  Debug(this, "Creating worker with id %llu", thread_id_);
  wrap->Set(env->context(),
            env->thread_id_string(),
            Number::New(env->isolate(),
                        static_cast<double>(thread_id_))).FromJust();

  // Set up everything that needs to be set up in the parent environment.
  parent_port_ = MessagePort::New(env, env->context());
  if (parent_port_ == nullptr) {
    // This can happen e.g. because execution is terminating.
    return;
  }

  child_port_data_.reset(new MessagePortData(nullptr));
  MessagePort::Entangle(parent_port_, child_port_data_.get());

  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).FromJust();

  array_buffer_allocator_.reset(CreateArrayBufferAllocator());

  isolate_ = NewIsolate(array_buffer_allocator_.get(), &loop_);
  CHECK_NE(isolate_, nullptr);

  {
    // Enter an environment capable of executing code in the child Isolate
    // (and only in it).
    Locker locker(isolate_);
    Isolate::Scope isolate_scope(isolate_);
    HandleScope handle_scope(isolate_);

    isolate_data_.reset(CreateIsolateData(isolate_,
                                          &loop_,
                                          env->isolate_data()->platform(),
                                          array_buffer_allocator_.get()));
    CHECK(isolate_data_);

    Local<Context> context = NewContext(isolate_);
    Context::Scope context_scope(context);

    // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
    env_.reset(new Environment(isolate_data_.get(), context));
    CHECK_NE(env_, nullptr);
    env_->set_abort_on_uncaught_exception(false);
    env_->set_worker_context(this);
    env_->set_thread_id(thread_id_);

    // The child Environment is started with empty argument vectors and
    // inherits the parent's profiler-idle-notifier setting.
    env_->Start(std::vector<std::string>{},
                std::vector<std::string>{},
                env->profiler_idle_notifier_started());
    // Done while on the parent thread
    AddWorkerInspector(env, env_.get(), thread_id_, url_);
  }

  // The new isolate won't be bothered on this thread again.
  isolate_->DiscardThreadSpecificMetadata();

  Debug(this, "Set up worker with id %llu", thread_id_);
}
140
141
22196
bool Worker::is_stopped() const {
142
22196
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
143
22197
  return stopped_;
144
}
145
146
118
void Worker::Run() {
147
118
  std::string name = "WorkerThread ";
148
118
  name += std::to_string(thread_id_);
149

119
  TRACE_EVENT_METADATA1(
150
      "__metadata", "thread_name", "name",
151
      TRACE_STR_COPY(name.c_str()));
152
118
  MultiIsolatePlatform* platform = isolate_data_->platform();
153
118
  CHECK_NE(platform, nullptr);
154
155
118
  Debug(this, "Starting worker with id %llu", thread_id_);
156
  {
157
118
    Locker locker(isolate_);
158
236
    Isolate::Scope isolate_scope(isolate_);
159
236
    SealHandleScope outer_seal(isolate_);
160
118
    bool inspector_started = false;
161
162
    {
163
118
      Context::Scope context_scope(env_->context());
164
236
      HandleScope handle_scope(isolate_);
165
166
      {
167
118
        HandleScope handle_scope(isolate_);
168
236
        Mutex::ScopedLock lock(mutex_);
169
        // Set up the message channel for receiving messages in the child.
170
        child_port_ = MessagePort::New(env_.get(),
171
                                       env_->context(),
172
118
                                       std::move(child_port_data_));
173
        // MessagePort::New() may return nullptr if execution is terminated
174
        // within it.
175
118
        if (child_port_ != nullptr)
176
118
          env_->set_message_port(child_port_->object(isolate_));
177
178
236
        Debug(this, "Created message port for worker %llu", thread_id_);
179
      }
180
181
118
      if (!is_stopped()) {
182
118
        StartWorkerInspector(env_.get(), url_);
183
118
        inspector_started = true;
184
185
118
        HandleScope handle_scope(isolate_);
186
236
        Environment::AsyncCallbackScope callback_scope(env_.get());
187
118
        env_->async_hooks()->push_async_ids(1, 0);
188
        // This loads the Node bootstrapping code.
189
118
        LoadEnvironment(env_.get());
190
118
        env_->async_hooks()->pop_async_id(1);
191
192
236
        Debug(this, "Loaded environment for worker %llu", thread_id_);
193
      }
194
195
      {
196
118
        SealHandleScope seal(isolate_);
197
        bool more;
198
        env_->performance_state()->Mark(
199
118
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
200
91
        do {
201
118
          if (is_stopped()) break;
202
108
          uv_run(&loop_, UV_RUN_DEFAULT);
203
108
          if (is_stopped()) break;
204
205
91
          platform->DrainTasks(isolate_);
206
207
91
          more = uv_loop_alive(&loop_);
208

91
          if (more && !is_stopped())
209
            continue;
210
211
91
          EmitBeforeExit(env_.get());
212
213
          // Emit `beforeExit` if the loop became alive either after emitting
214
          // event, or after running some callbacks.
215
91
          more = uv_loop_alive(&loop_);
216
        } while (more == true);
217
        env_->performance_state()->Mark(
218
118
            node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
219
      }
220
    }
221
222
    {
223
      int exit_code;
224
118
      bool stopped = is_stopped();
225
118
      if (!stopped)
226
91
        exit_code = EmitExit(env_.get());
227
118
      Mutex::ScopedLock lock(mutex_);
228

118
      if (exit_code_ == 0 && !stopped)
229
91
        exit_code_ = exit_code;
230
231
      Debug(this, "Exiting thread for worker %llu with exit code %d",
232
236
            thread_id_, exit_code_);
233
    }
234
235
118
    env_->set_can_call_into_js(false);
236
    Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
237
236
        Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
238
239
    // Grab the parent-to-child channel and render is unusable.
240
    MessagePort* child_port;
241
    {
242
118
      Mutex::ScopedLock lock(mutex_);
243
118
      child_port = child_port_;
244
118
      child_port_ = nullptr;
245
    }
246
247
    {
248
118
      Context::Scope context_scope(env_->context());
249
236
      child_port->Close();
250
118
      env_->stop_sub_worker_contexts();
251
118
      env_->RunCleanup();
252
118
      RunAtExit(env_.get());
253
118
      if (inspector_started)
254
118
        WaitForWorkerInspectorToStop(env_.get());
255
256
      {
257
118
        Mutex::ScopedLock stopped_lock(stopped_mutex_);
258
118
        stopped_ = true;
259
      }
260
261
118
      env_->RunCleanup();
262
263
      // This call needs to be made while the `Environment` is still alive
264
      // because we assume that it is available for async tracking in the
265
      // NodePlatform implementation.
266
118
      platform->DrainTasks(isolate_);
267
    }
268
269
236
    env_.reset();
270
  }
271
272
118
  DisposeIsolate();
273
274
  {
275
118
    Mutex::ScopedLock lock(mutex_);
276
118
    CHECK(thread_exit_async_);
277
118
    scheduled_on_thread_stopped_ = true;
278
118
    uv_async_send(thread_exit_async_.get());
279
  }
280
281
236
  Debug(this, "Worker %llu thread stops", thread_id_);
282
118
}
283
284
237
void Worker::DisposeIsolate() {
285
237
  if (env_) {
286
1
    CHECK_NOT_NULL(isolate_);
287
1
    Locker locker(isolate_);
288
2
    Isolate::Scope isolate_scope(isolate_);
289
2
    env_.reset();
290
  }
291
292
237
  if (isolate_ == nullptr)
293
355
    return;
294
295
119
  Debug(this, "Worker %llu dispose isolate", thread_id_);
296
119
  CHECK(isolate_data_);
297
119
  MultiIsolatePlatform* platform = isolate_data_->platform();
298
119
  platform->CancelPendingDelayedTasks(isolate_);
299
300
119
  isolate_data_.reset();
301
119
  platform->UnregisterIsolate(isolate_);
302
303
119
  isolate_->Dispose();
304
119
  isolate_ = nullptr;
305
}
306
307
254
void Worker::JoinThread() {
308
254
  if (thread_joined_)
309
390
    return;
310
118
  CHECK_EQ(uv_thread_join(&tid_), 0);
311
118
  thread_joined_ = true;
312
313
118
  env()->remove_sub_worker_context(this);
314
315
118
  if (thread_exit_async_) {
316
118
    env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
317
118
      delete async;
318
236
    });
319
320
118
    if (scheduled_on_thread_stopped_)
321
17
      OnThreadStopped();
322
  }
323
}
324
325
118
void Worker::OnThreadStopped() {
326
  {
327
118
    Mutex::ScopedLock lock(mutex_);
328
118
    scheduled_on_thread_stopped_ = false;
329
330
118
    Debug(this, "Worker %llu thread stopped", thread_id_);
331
332
    {
333
118
      Mutex::ScopedLock stopped_lock(stopped_mutex_);
334
118
      CHECK(stopped_);
335
    }
336
337
118
    CHECK_EQ(child_port_, nullptr);
338
118
    parent_port_ = nullptr;
339
  }
340
341
118
  JoinThread();
342
343
  {
344
118
    HandleScope handle_scope(env()->isolate());
345
118
    Context::Scope context_scope(env()->context());
346
347
    // Reset the parent port as we're closing it now anyway.
348
118
    object()->Set(env()->context(),
349
                  env()->message_port_string(),
350
708
                  Undefined(env()->isolate())).FromJust();
351
352
118
    Local<Value> code = Integer::New(env()->isolate(), exit_code_);
353
236
    MakeCallback(env()->onexit_string(), 1, &code);
354
  }
355
356
  // JoinThread() cleared all libuv handles bound to this Worker,
357
  // the C++ object is no longer needed for anything now.
358
118
  MakeWeak();
359
118
}
360
361
357
// Joins the worker thread if necessary, checks that the worker reached its
// final state, disposes of the isolate and closes the worker's event loop.
Worker::~Worker() {
  Mutex::ScopedLock guard(mutex_);
  JoinThread();

  CHECK(stopped_);
  CHECK(thread_joined_);
  CHECK_EQ(child_port_, nullptr);

  // This has most likely already happened within the worker thread -- this
  // is just in case Worker creation failed early.
  DisposeIsolate();

  uv_loop_t* const worker_loop = &loop_;

  // Need to run the loop one more time to close the platform's uv_async_t
  uv_run(worker_loop, UV_RUN_ONCE);

  CheckedUvLoopClose(worker_loop);

  Debug(this, "Worker %llu destroyed", thread_id_);
}
380
381
119
void Worker::New(const FunctionCallbackInfo<Value>& args) {
382
119
  Environment* env = Environment::GetCurrent(args);
383
384
119
  CHECK(args.IsConstructCall());
385
386
119
  if (env->isolate_data()->platform() == nullptr) {
387
    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
388
119
    return;
389
  }
390
391
119
  std::string url;
392
  // Argument might be a string or URL
393


595
  if (args.Length() == 1 && !args[0]->IsNullOrUndefined()) {
394
    Utf8Value value(
395
        args.GetIsolate(),
396
132
        args[0]->ToString(env->context()).FromMaybe(v8::Local<v8::String>()));
397
33
    url.append(value.out(), value.length());
398
  }
399
119
  new Worker(env, args.This(), url);
400
}
401
402
118
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
403
  Worker* w;
404
236
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
405
118
  Mutex::ScopedLock lock(w->mutex_);
406
407
118
  w->env()->add_sub_worker_context(w);
408
118
  w->stopped_ = false;
409
118
  w->thread_joined_ = false;
410
411
118
  w->thread_exit_async_.reset(new uv_async_t);
412
118
  w->thread_exit_async_->data = w;
413
438
  CHECK_EQ(uv_async_init(w->env()->event_loop(),
414
                         w->thread_exit_async_.get(),
415
                         [](uv_async_t* handle) {
416
    static_cast<Worker*>(handle->data)->OnThreadStopped();
417
  }), 0);
418
419
472
  CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
420
    static_cast<Worker*>(arg)->Run();
421
118
  }, static_cast<void*>(w)), 0);
422
}
423
424
17
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
425
  Worker* w;
426
34
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
427
428
17
  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_);
429
17
  w->Exit(1);
430
17
  w->JoinThread();
431
}
432
433
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
434
  Worker* w;
435
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
436
  if (w->thread_exit_async_)
437
    uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
438
}
439
440
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
441
  Worker* w;
442
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
443
  if (w->thread_exit_async_)
444
    uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
445
}
446
447
27
void Worker::Exit(int code) {
448
27
  Mutex::ScopedLock lock(mutex_);
449
54
  Mutex::ScopedLock stopped_lock(stopped_mutex_);
450
451
27
  Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
452
453
27
  if (!stopped_) {
454
27
    CHECK_NE(env_, nullptr);
455
27
    stopped_ = true;
456
27
    exit_code_ = code;
457
27
    if (child_port_ != nullptr)
458
27
      child_port_->StopEventLoop();
459
27
    isolate_->TerminateExecution();
460
27
  }
461
27
}
462
463
namespace {
464
465
// Return the MessagePort that is global for this Environment and communicates
466
// with the internal [kPort] port of the JS Worker class in the parent thread.
467
216
void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
468
216
  Environment* env = Environment::GetCurrent(args);
469
216
  Local<Object> port = env->message_port();
470
216
  if (!port.IsEmpty()) {
471
648
    CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
472
432
    args.GetReturnValue().Set(port);
473
  }
474
216
}
475
476
403
void InitWorker(Local<Object> target,
477
                Local<Value> unused,
478
                Local<Context> context,
479
                void* priv) {
480
403
  Environment* env = Environment::GetCurrent(context);
481
482
  {
483
403
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
484
485
806
    w->InstanceTemplate()->SetInternalFieldCount(1);
486
806
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
487
488
403
    env->SetProtoMethod(w, "startThread", Worker::StartThread);
489
403
    env->SetProtoMethod(w, "stopThread", Worker::StopThread);
490
403
    env->SetProtoMethod(w, "ref", Worker::Ref);
491
403
    env->SetProtoMethod(w, "unref", Worker::Unref);
492
493
    Local<String> workerString =
494
403
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
495
403
    w->SetClassName(workerString);
496
    target->Set(env->context(),
497
                workerString,
498
2015
                w->GetFunction(env->context()).ToLocalChecked()).FromJust();
499
  }
500
501
403
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
502
503
403
  auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
504
  target->Set(env->context(),
505
              thread_id_string,
506
              Number::New(env->isolate(),
507
1612
                          static_cast<double>(env->thread_id()))).FromJust();
508
403
}
509
510
}  // anonymous namespace
511
512
}  // namespace worker
513
}  // namespace node
514
515

14387
NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)