GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_platform.cc Lines: 317 328 96.6 %
Date: 2019-02-23 22:23:05 Branches: 76 102 74.5 %

Line Branch Exec Source
1
#include "node_platform.h"
2
#include "node_internals.h"
3
4
#include "env-inl.h"
5
#include "debug_utils.h"
6
#include "util.h"
7
#include <algorithm>
8
9
namespace node {
10
11
using v8::HandleScope;
12
using v8::Isolate;
13
using v8::Local;
14
using v8::Object;
15
using v8::Platform;
16
using v8::Task;
17
using node::tracing::TracingController;
18
19
namespace {
20
21
struct PlatformWorkerData {
22
  TaskQueue<Task>* task_queue;
23
  Mutex* platform_workers_mutex;
24
  ConditionVariable* platform_workers_ready;
25
  int* pending_platform_workers;
26
  int id;
27
};
28
29
16828
static void PlatformWorkerThread(void* data) {
30
  std::unique_ptr<PlatformWorkerData>
31
16828
      worker_data(static_cast<PlatformWorkerData*>(data));
32
33
16925
  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
34

33893
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
35
                        "PlatformWorkerThread");
36
37
  // Notify the main thread that the platform worker is ready.
38
  {
39
16947
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
40
16978
    (*worker_data->pending_platform_workers)--;
41
16978
    worker_data->platform_workers_ready->Signal(lock);
42
  }
43
44
151228
  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
45
134114
    task->Run();
46
134158
    pending_worker_tasks->NotifyOfCompletion();
47
134257
  }
48
16958
}
49
50
}  // namespace
51
52
4238
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
53
 public:
54
4243
  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
55
4243
    : pending_worker_tasks_(tasks) {}
56
57
4243
  std::unique_ptr<uv_thread_t> Start() {
58
12729
    auto start_thread = [](void* data) {
59
4243
      static_cast<DelayedTaskScheduler*>(data)->Run();
60
12724
    };
61
4243
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
62
4243
    uv_sem_init(&ready_, 0);
63
4243
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
64
4243
    uv_sem_wait(&ready_);
65
4243
    uv_sem_destroy(&ready_);
66
4243
    return t;
67
  }
68
69
1
  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
70
1
    tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task),
71
1
                                                       delay_in_seconds)));
72
1
    uv_async_send(&flush_tasks_);
73
1
  }
74
75
4238
  void Stop() {
76
4238
    tasks_.Push(std::unique_ptr<Task>(new StopTask(this)));
77
4238
    uv_async_send(&flush_tasks_);
78
4238
  }
79
80
 private:
81
4243
  void Run() {
82

8486
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
83
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
84
4243
    loop_.data = this;
85
4243
    CHECK_EQ(0, uv_loop_init(&loop_));
86
4243
    flush_tasks_.data = this;
87
4243
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
88
4243
    uv_sem_post(&ready_);
89
90
4243
    uv_run(&loop_, UV_RUN_DEFAULT);
91
4238
    CheckedUvLoopClose(&loop_);
92
4238
  }
93
94
4239
  static void FlushTasks(uv_async_t* flush_tasks) {
95
    DelayedTaskScheduler* scheduler =
96
4239
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
97
8478
    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
98
4239
      task->Run();
99
4239
  }
100
101
8476
  class StopTask : public Task {
102
   public:
103
4238
    explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
104
105
4238
    void Run() override {
106
4238
      std::vector<uv_timer_t*> timers;
107
4238
      for (uv_timer_t* timer : scheduler_->timers_)
108
        timers.push_back(timer);
109
4238
      for (uv_timer_t* timer : timers)
110
        scheduler_->TakeTimerTask(timer);
111
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
112
16952
               [](uv_handle_t* handle) {});
113
4238
    }
114
115
   private:
116
     DelayedTaskScheduler* scheduler_;
117
  };
118
119
2
  class ScheduleTask : public Task {
120
   public:
121
1
    ScheduleTask(DelayedTaskScheduler* scheduler,
122
                 std::unique_ptr<Task> task,
123
                 double delay_in_seconds)
124
      : scheduler_(scheduler),
125
1
        task_(std::move(task)),
126
2
        delay_in_seconds_(delay_in_seconds) {}
127
128
1
    void Run() override {
129
      uint64_t delay_millis =
130
1
          static_cast<uint64_t>(delay_in_seconds_ + 0.5) * 1000;
131
1
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
132
1
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
133
1
      timer->data = task_.release();
134
1
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
135
1
      scheduler_->timers_.insert(timer.release());
136
1
    }
137
138
   private:
139
    DelayedTaskScheduler* scheduler_;
140
    std::unique_ptr<Task> task_;
141
    double delay_in_seconds_;
142
  };
143
144
1
  static void RunTask(uv_timer_t* timer) {
145
    DelayedTaskScheduler* scheduler =
146
1
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
147
1
    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
148
1
  }
149
150
1
  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
151
1
    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
152
1
    uv_timer_stop(timer);
153
3
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
154
1
      delete reinterpret_cast<uv_timer_t*>(handle);
155
4
    });
156
1
    timers_.erase(timer);
157
1
    return task;
158
  }
159
160
  uv_sem_t ready_;
161
  TaskQueue<v8::Task>* pending_worker_tasks_;
162
163
  TaskQueue<v8::Task> tasks_;
164
  uv_loop_t loop_;
165
  uv_async_t flush_tasks_;
166
  std::unordered_set<uv_timer_t*> timers_;
167
};
168
169
4243
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
170
4243
  Mutex platform_workers_mutex;
171
8486
  ConditionVariable platform_workers_ready;
172
173
8486
  Mutex::ScopedLock lock(platform_workers_mutex);
174
4243
  int pending_platform_workers = thread_pool_size;
175
176
  delayed_task_scheduler_.reset(
177
4243
      new DelayedTaskScheduler(&pending_worker_tasks_));
178
4243
  threads_.push_back(delayed_task_scheduler_->Start());
179
180
21221
  for (int i = 0; i < thread_pool_size; i++) {
181
    PlatformWorkerData* worker_data = new PlatformWorkerData{
182
      &pending_worker_tasks_, &platform_workers_mutex,
183
      &platform_workers_ready, &pending_platform_workers, i
184
16978
    };
185
16978
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
186
16978
    if (uv_thread_create(t.get(), PlatformWorkerThread,
187
16978
                         worker_data) != 0) {
188
      break;
189
    }
190
16978
    threads_.push_back(std::move(t));
191
16978
  }
192
193
  // Wait for platform workers to initialize before continuing with the
194
  // bootstrap.
195
22407
  while (pending_platform_workers > 0) {
196
13921
    platform_workers_ready.Wait(lock);
197
4243
  }
198
4243
}
199
200
134262
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
201
134262
  pending_worker_tasks_.Push(std::move(task));
202
134262
}
203
204
1
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
205
                                              double delay_in_seconds) {
206
1
  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
207
1
}
208
209
14015
void WorkerThreadsTaskRunner::BlockingDrain() {
210
14015
  pending_worker_tasks_.BlockingDrain();
211
14015
}
212
213
4238
void WorkerThreadsTaskRunner::Shutdown() {
214
4238
  pending_worker_tasks_.Stop();
215
4238
  delayed_task_scheduler_->Stop();
216
25434
  for (size_t i = 0; i < threads_.size(); i++) {
217
21196
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
218
  }
219
4238
}
220
221
4640
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
222
4640
  return threads_.size();
223
}
224
225
4412
PerIsolatePlatformData::PerIsolatePlatformData(
226
    v8::Isolate* isolate, uv_loop_t* loop)
227
4412
  : loop_(loop) {
228
4412
  flush_tasks_ = new uv_async_t();
229
4412
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
230
4412
  flush_tasks_->data = static_cast<void*>(this);
231
4412
  uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
232
4412
}
233
234
5331
void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
235
5331
  auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
236
5331
  platform_data->FlushForegroundTasksInternal();
237
5330
}
238
239
void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
240
  UNREACHABLE();
241
}
242
243
10773
void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
244
10773
  CHECK_NOT_NULL(flush_tasks_);
245
10773
  foreground_tasks_.Push(std::move(task));
246
10773
  uv_async_send(flush_tasks_);
247
10773
}
248
249
137
void PerIsolatePlatformData::PostDelayedTask(
250
    std::unique_ptr<Task> task, double delay_in_seconds) {
251
137
  CHECK_NOT_NULL(flush_tasks_);
252
137
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
253
137
  delayed->task = std::move(task);
254
137
  delayed->platform_data = shared_from_this();
255
137
  delayed->timeout = delay_in_seconds;
256
137
  foreground_delayed_tasks_.Push(std::move(delayed));
257
137
  uv_async_send(flush_tasks_);
258
137
}
259
260
7824
PerIsolatePlatformData::~PerIsolatePlatformData() {
261
3912
  Shutdown();
262
3912
}
263
264
7952
void PerIsolatePlatformData::Shutdown() {
265
7952
  if (flush_tasks_ == nullptr)
266
11864
    return;
267
268
4040
  CHECK_NULL(foreground_delayed_tasks_.Pop());
269
4040
  CHECK_NULL(foreground_tasks_.Pop());
270
4040
  CancelPendingDelayedTasks();
271
272
  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
273
4378
           [](uv_handle_t* handle) {
274
169
    delete reinterpret_cast<uv_async_t*>(handle);
275
8418
  });
276
4040
  flush_tasks_ = nullptr;
277
}
278
279
4409
void PerIsolatePlatformData::ref() {
280
4409
  ref_count_++;
281
4409
}
282
283
8080
int PerIsolatePlatformData::unref() {
284
8080
  return --ref_count_;
285
}
286
287
4243
NodePlatform::NodePlatform(int thread_pool_size,
288
4243
                           TracingController* tracing_controller) {
289
4243
  if (tracing_controller) {
290
4243
    tracing_controller_ = tracing_controller;
291
  } else {
292
    tracing_controller_ = new TracingController();
293
  }
294
8486
  worker_thread_task_runner_ =
295
4243
      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
296
4243
}
297
298
8821
void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
299
8821
  Mutex::ScopedLock lock(per_isolate_mutex_);
300
17642
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
301
8821
  if (existing) {
302
4409
    CHECK_EQ(loop, existing->event_loop());
303
4409
    existing->ref();
304
  } else {
305
8824
    per_isolate_[isolate] =
306
4412
        std::make_shared<PerIsolatePlatformData>(isolate, loop);
307
8821
  }
308
8821
}
309
310
8080
void NodePlatform::UnregisterIsolate(Isolate* isolate) {
311
8080
  Mutex::ScopedLock lock(per_isolate_mutex_);
312
16160
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
313
8080
  CHECK(existing);
314
8080
  if (existing->unref() == 0) {
315
4040
    existing->Shutdown();
316
4040
    per_isolate_.erase(isolate);
317
8080
  }
318
8080
}
319
320
4238
void NodePlatform::Shutdown() {
321
4238
  worker_thread_task_runner_->Shutdown();
322
323
  {
324
4238
    Mutex::ScopedLock lock(per_isolate_mutex_);
325
4238
    per_isolate_.clear();
326
  }
327
4238
}
328
329
4640
int NodePlatform::NumberOfWorkerThreads() {
330
4640
  return worker_thread_task_runner_->NumberOfWorkerThreads();
331
}
332
333
10508
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
334
10508
  Isolate* isolate = Isolate::GetCurrent();
335
10512
  HandleScope scope(isolate);
336
10515
  Environment* env = Environment::GetCurrent(isolate);
337
  InternalCallbackScope cb_scope(env, Local<Object>(), { 0, 0 },
338
31548
                                 InternalCallbackScope::kAllowEmptyResource);
339
21029
  task->Run();
340
10513
}
341
342
3
void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
343
  auto it = std::find_if(scheduled_delayed_tasks_.begin(),
344
                         scheduled_delayed_tasks_.end(),
345
3
                         [task](const DelayedTaskPointer& delayed) -> bool {
346
3
          return delayed.get() == task;
347
3
      });
348
3
  CHECK_NE(it, scheduled_delayed_tasks_.end());
349
3
  scheduled_delayed_tasks_.erase(it);
350
3
}
351
352
3
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
353
3
  DelayedTask* delayed = static_cast<DelayedTask*>(handle->data);
354
3
  RunForegroundTask(std::move(delayed->task));
355
3
  delayed->platform_data->DeleteFromScheduledTasks(delayed);
356
3
}
357
358
8079
void PerIsolatePlatformData::CancelPendingDelayedTasks() {
359
8079
  scheduled_delayed_tasks_.clear();
360
8079
}
361
362
8062
void NodePlatform::DrainTasks(Isolate* isolate) {
363
8062
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
364
365
14015
  do {
366
    // Worker tasks aren't associated with an Isolate.
367
14015
    worker_thread_task_runner_->BlockingDrain();
368
22075
  } while (per_isolate->FlushForegroundTasksInternal());
369
8060
}
370
371
173424
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
372
173424
  bool did_work = false;
373
374
  while (std::unique_ptr<DelayedTask> delayed =
375
173560
      foreground_delayed_tasks_.Pop()) {
376
136
    did_work = true;
377
    uint64_t delay_millis =
378
136
        static_cast<uint64_t>(delayed->timeout + 0.5) * 1000;
379
136
    delayed->timer.data = static_cast<void*>(delayed.get());
380
136
    uv_timer_init(loop_, &delayed->timer);
381
    // Timers may not guarantee queue ordering of events with the same delay if
382
    // the delay is non-zero. This should not be a problem in practice.
383
136
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
384
136
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
385
386
136
    scheduled_delayed_tasks_.emplace_back(delayed.release(),
387
132
                                          [](DelayedTask* delayed) {
388
      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
389
140
               [](uv_handle_t* handle) {
390
4
        delete static_cast<DelayedTask*>(handle->data);
391
272
      });
392
268
    });
393
136
  }
394
  // Move all foreground tasks into a separate queue and flush that queue.
395
  // This way tasks that are posted while flushing the queue will be run on the
396
  // next call of FlushForegroundTasksInternal.
397
173424
  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
398
357348
  while (!tasks.empty()) {
399
10507
    std::unique_ptr<Task> task = std::move(tasks.front());
400
10513
    tasks.pop();
401
10512
    did_work = true;
402
10512
    RunForegroundTask(std::move(task));
403
10510
  }
404
173557
  return did_work;
405
}
406
407
134262
void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
408
134262
  worker_thread_task_runner_->PostTask(std::move(task));
409
134262
}
410
411
1
void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
412
                                             double delay_in_seconds) {
413
1
  worker_thread_task_runner_->PostDelayedTask(std::move(task),
414
1
                                              delay_in_seconds);
415
1
}
416
417
418
std::shared_ptr<PerIsolatePlatformData>
419
185804
NodePlatform::ForIsolate(Isolate* isolate) {
420
185804
  Mutex::ScopedLock lock(per_isolate_mutex_);
421
185804
  std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
422
185804
  CHECK(data);
423
185804
  return data;
424
}
425
426
void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
427
  ForIsolate(isolate)->PostTask(std::unique_ptr<Task>(task));
428
}
429
430
void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
431
                                                 Task* task,
432
                                                 double delay_in_seconds) {
433
  ForIsolate(isolate)->PostDelayedTask(
434
    std::unique_ptr<Task>(task), delay_in_seconds);
435
}
436
437
154078
bool NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
438
154078
  return ForIsolate(isolate)->FlushForegroundTasksInternal();
439
}
440
441
4039
void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
442
4039
  ForIsolate(isolate)->CancelPendingDelayedTasks();
443
4039
}
444
445
31039
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
446
447
std::shared_ptr<v8::TaskRunner>
448
19625
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
449
19625
  return ForIsolate(isolate);
450
}
451
452
1213961
double NodePlatform::MonotonicallyIncreasingTime() {
453
  // Convert nanos to seconds.
454
1213961
  return uv_hrtime() / 1e9;
455
}
456
457
106315727
double NodePlatform::CurrentClockTimeMillis() {
458
106315727
  return SystemClockTimeMillis();
459
}
460
461
215056
TracingController* NodePlatform::GetTracingController() {
462
215056
  CHECK_NOT_NULL(tracing_controller_);
463
215056
  return tracing_controller_;
464
}
465
466
template <class T>
467
17310
TaskQueue<T>::TaskQueue()
468
    : lock_(), tasks_available_(), tasks_drained_(),
469
17310
      outstanding_tasks_(0), stopped_(false), task_queue_() { }
470
471
template <class T>
472
149412
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
473
149412
  Mutex::ScopedLock scoped_lock(lock_);
474
149412
  outstanding_tasks_++;
475
149412
  task_queue_.push(std::move(task));
476
149412
  tasks_available_.Signal(scoped_lock);
477
149412
}
478
479
template <class T>
480
190118
std::unique_ptr<T> TaskQueue<T>::Pop() {
481
190118
  Mutex::ScopedLock scoped_lock(lock_);
482

190118
  if (task_queue_.empty()) {
483
185743
    return std::unique_ptr<T>(nullptr);
484
  }
485
8750
  std::unique_ptr<T> result = std::move(task_queue_.front());
486
4375
  task_queue_.pop();
487
194493
  return result;
488
}
489
490
template <class T>
491
151209
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
492
151209
  Mutex::ScopedLock scoped_lock(lock_);
493

435879
  while (task_queue_.empty() && !stopped_) {
494
133417
    tasks_available_.Wait(scoped_lock);
495
  }
496
151221
  if (stopped_) {
497
16958
    return std::unique_ptr<T>(nullptr);
498
  }
499
268526
  std::unique_ptr<T> result = std::move(task_queue_.front());
500
134263
  task_queue_.pop();
501
285484
  return result;
502
}
503
504
template <class T>
505
134152
void TaskQueue<T>::NotifyOfCompletion() {
506
134152
  Mutex::ScopedLock scoped_lock(lock_);
507
134263
  if (--outstanding_tasks_ == 0) {
508
87975
    tasks_drained_.Broadcast(scoped_lock);
509
134263
  }
510
134238
}
511
512
template <class T>
513
14015
void TaskQueue<T>::BlockingDrain() {
514
14015
  Mutex::ScopedLock scoped_lock(lock_);
515
28143
  while (outstanding_tasks_ > 0) {
516
113
    tasks_drained_.Wait(scoped_lock);
517
14015
  }
518
14015
}
519
520
template <class T>
521
4238
void TaskQueue<T>::Stop() {
522
4238
  Mutex::ScopedLock scoped_lock(lock_);
523
4238
  stopped_ = true;
524
4238
  tasks_available_.Broadcast(scoped_lock);
525
4238
}
526
527
template <class T>
528
173424
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
529
173424
  Mutex::ScopedLock scoped_lock(lock_);
530
173424
  std::queue<std::unique_ptr<T>> result;
531
173418
  result.swap(task_queue_);
532
173423
  return result;
533
}
534
535
}  // namespace node