GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/node_platform.cc Lines: 318 327 97.2 %
Date: 2019-01-07 12:15:22 Branches: 75 98 76.5 %

Line Branch Exec Source
1
#include "node_platform.h"
2
#include "node_internals.h"
3
4
#include "env-inl.h"
5
#include "debug_utils.h"
6
#include "util.h"
7
#include <algorithm>
8
9
namespace node {
10
11
using v8::HandleScope;
12
using v8::Isolate;
13
using v8::Local;
14
using v8::Object;
15
using v8::Platform;
16
using v8::Task;
17
using node::tracing::TracingController;
18
19
namespace {
20
21
struct PlatformWorkerData {
22
  TaskQueue<Task>* task_queue;
23
  Mutex* platform_workers_mutex;
24
  ConditionVariable* platform_workers_ready;
25
  int* pending_platform_workers;
26
  int id;
27
};
28
29
14127
static void PlatformWorkerThread(void* data) {
30
  std::unique_ptr<PlatformWorkerData>
31
14127
      worker_data(static_cast<PlatformWorkerData*>(data));
32
33
14205
  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
34

14214
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
35
                        "PlatformWorkerThread");
36
37
  // Notify the main thread that the platform worker is ready.
38
  {
39
14204
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
40
14238
    (*worker_data->pending_platform_workers)--;
41
14238
    worker_data->platform_workers_ready->Signal(lock);
42
  }
43
44
245108
  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
45
230489
    task->Run();
46
230186
    pending_worker_tasks->NotifyOfCompletion();
47
230871
  }
48
12850
}
49
50
}  // namespace
51
52
3210
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
53
 public:
54
3558
  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
55
3558
    : pending_worker_tasks_(tasks) {}
56
57
3558
  std::unique_ptr<uv_thread_t> Start() {
58
10674
    auto start_thread = [](void* data) {
59
3558
      static_cast<DelayedTaskScheduler*>(data)->Run();
60
10327
    };
61
3558
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
62
3558
    uv_sem_init(&ready_, 0);
63
3558
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
64
3558
    uv_sem_wait(&ready_);
65
3558
    uv_sem_destroy(&ready_);
66
3558
    return t;
67
  }
68
69
1
  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
70
1
    tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task),
71
1
                                                       delay_in_seconds)));
72
1
    uv_async_send(&flush_tasks_);
73
1
  }
74
75
3211
  void Stop() {
76
3211
    tasks_.Push(std::unique_ptr<Task>(new StopTask(this)));
77
3211
    uv_async_send(&flush_tasks_);
78
3211
  }
79
80
 private:
81
3558
  void Run() {
82

3558
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
83
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
84
3558
    loop_.data = this;
85
3558
    CHECK_EQ(0, uv_loop_init(&loop_));
86
3558
    flush_tasks_.data = this;
87
3558
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
88
3558
    uv_sem_post(&ready_);
89
90
3558
    uv_run(&loop_, UV_RUN_DEFAULT);
91
3211
    CheckedUvLoopClose(&loop_);
92
3211
  }
93
94
3212
  static void FlushTasks(uv_async_t* flush_tasks) {
95
    DelayedTaskScheduler* scheduler =
96
3212
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
97
6424
    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
98
3212
      task->Run();
99
3212
  }
100
101
6422
  class StopTask : public Task {
102
   public:
103
3211
    explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
104
105
3211
    void Run() override {
106
3211
      std::vector<uv_timer_t*> timers;
107
3211
      for (uv_timer_t* timer : scheduler_->timers_)
108
        timers.push_back(timer);
109
3211
      for (uv_timer_t* timer : timers)
110
        scheduler_->TakeTimerTask(timer);
111
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
112
12844
               [](uv_handle_t* handle) {});
113
3211
    }
114
115
   private:
116
     DelayedTaskScheduler* scheduler_;
117
  };
118
119
2
  class ScheduleTask : public Task {
120
   public:
121
1
    ScheduleTask(DelayedTaskScheduler* scheduler,
122
                 std::unique_ptr<Task> task,
123
                 double delay_in_seconds)
124
      : scheduler_(scheduler),
125
1
        task_(std::move(task)),
126
2
        delay_in_seconds_(delay_in_seconds) {}
127
128
1
    void Run() override {
129
      uint64_t delay_millis =
130
1
          static_cast<uint64_t>(delay_in_seconds_ + 0.5) * 1000;
131
1
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
132
1
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
133
1
      timer->data = task_.release();
134
1
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
135
1
      scheduler_->timers_.insert(timer.release());
136
1
    }
137
138
   private:
139
    DelayedTaskScheduler* scheduler_;
140
    std::unique_ptr<Task> task_;
141
    double delay_in_seconds_;
142
  };
143
144
1
  static void RunTask(uv_timer_t* timer) {
145
    DelayedTaskScheduler* scheduler =
146
1
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
147
1
    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
148
1
  }
149
150
1
  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
151
1
    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
152
1
    uv_timer_stop(timer);
153
3
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
154
1
      delete reinterpret_cast<uv_timer_t*>(handle);
155
4
    });
156
1
    timers_.erase(timer);
157
1
    return task;
158
  }
159
160
  uv_sem_t ready_;
161
  TaskQueue<v8::Task>* pending_worker_tasks_;
162
163
  TaskQueue<v8::Task> tasks_;
164
  uv_loop_t loop_;
165
  uv_async_t flush_tasks_;
166
  std::unordered_set<uv_timer_t*> timers_;
167
};
168
169
3558
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
170
3558
  Mutex platform_workers_mutex;
171
7116
  ConditionVariable platform_workers_ready;
172
173
7116
  Mutex::ScopedLock lock(platform_workers_mutex);
174
3558
  int pending_platform_workers = thread_pool_size;
175
176
  delayed_task_scheduler_.reset(
177
3558
      new DelayedTaskScheduler(&pending_worker_tasks_));
178
3558
  threads_.push_back(delayed_task_scheduler_->Start());
179
180
17796
  for (int i = 0; i < thread_pool_size; i++) {
181
    PlatformWorkerData* worker_data = new PlatformWorkerData{
182
      &pending_worker_tasks_, &platform_workers_mutex,
183
      &platform_workers_ready, &pending_platform_workers, i
184
14238
    };
185
14238
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
186
14238
    if (uv_thread_create(t.get(), PlatformWorkerThread,
187
14238
                         worker_data) != 0) {
188
      break;
189
    }
190
14238
    threads_.push_back(std::move(t));
191
14238
  }
192
193
  // Wait for platform workers to initialize before continuing with the
194
  // bootstrap.
195
19233
  while (pending_platform_workers > 0) {
196
12117
    platform_workers_ready.Wait(lock);
197
3558
  }
198
3558
}
199
200
230892
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
201
230892
  pending_worker_tasks_.Push(std::move(task));
202
230892
}
203
204
1
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
205
                                              double delay_in_seconds) {
206
1
  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
207
1
}
208
209
7542
void WorkerThreadsTaskRunner::BlockingDrain() {
210
7542
  pending_worker_tasks_.BlockingDrain();
211
7542
}
212
213
3211
void WorkerThreadsTaskRunner::Shutdown() {
214
3211
  pending_worker_tasks_.Stop();
215
3211
  delayed_task_scheduler_->Stop();
216
19272
  for (size_t i = 0; i < threads_.size(); i++) {
217
16061
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
218
  }
219
3211
}
220
221
3901
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
222
3901
  return threads_.size();
223
}
224
225
3700
PerIsolatePlatformData::PerIsolatePlatformData(
226
    v8::Isolate* isolate, uv_loop_t* loop)
227
3700
  : loop_(loop) {
228
3700
  flush_tasks_ = new uv_async_t();
229
3700
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
230
3700
  flush_tasks_->data = static_cast<void*>(this);
231
3700
  uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
232
3700
}
233
234
940
void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
235
940
  auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
236
940
  platform_data->FlushForegroundTasksInternal();
237
940
}
238
239
void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
240
  UNREACHABLE();
241
}
242
243
1938
void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
244
1938
  CHECK_NE(flush_tasks_, nullptr);
245
1938
  foreground_tasks_.Push(std::move(task));
246
1938
  uv_async_send(flush_tasks_);
247
1938
}
248
249
175
void PerIsolatePlatformData::PostDelayedTask(
250
    std::unique_ptr<Task> task, double delay_in_seconds) {
251
175
  CHECK_NE(flush_tasks_, nullptr);
252
175
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
253
175
  delayed->task = std::move(task);
254
175
  delayed->platform_data = shared_from_this();
255
175
  delayed->timeout = delay_in_seconds;
256
175
  foreground_delayed_tasks_.Push(std::move(delayed));
257
175
  uv_async_send(flush_tasks_);
258
175
}
259
260
6378
PerIsolatePlatformData::~PerIsolatePlatformData() {
261
3189
  Shutdown();
262
3189
}
263
264
6542
void PerIsolatePlatformData::Shutdown() {
265
6542
  if (flush_tasks_ == nullptr)
266
9731
    return;
267
268
3353
  while (FlushForegroundTasksInternal()) {}
269
3353
  CancelPendingDelayedTasks();
270
271
  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
272
3645
           [](uv_handle_t* handle) {
273
146
    delete reinterpret_cast<uv_async_t*>(handle);
274
6998
  });
275
3353
  flush_tasks_ = nullptr;
276
}
277
278
3685
void PerIsolatePlatformData::ref() {
279
3685
  ref_count_++;
280
3685
}
281
282
6691
int PerIsolatePlatformData::unref() {
283
6691
  return --ref_count_;
284
}
285
286
3558
NodePlatform::NodePlatform(int thread_pool_size,
287
3558
                           TracingController* tracing_controller) {
288
3558
  if (tracing_controller) {
289
3558
    tracing_controller_ = tracing_controller;
290
  } else {
291
    tracing_controller_ = new TracingController();
292
  }
293
7116
  worker_thread_task_runner_ =
294
3558
      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
295
3558
}
296
297
7385
void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
298
7385
  Mutex::ScopedLock lock(per_isolate_mutex_);
299
14770
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
300
7385
  if (existing) {
301
3685
    CHECK_EQ(loop, existing->event_loop());
302
3685
    existing->ref();
303
  } else {
304
7400
    per_isolate_[isolate] =
305
3700
        std::make_shared<PerIsolatePlatformData>(isolate, loop);
306
7385
  }
307
7385
}
308
309
6691
void NodePlatform::UnregisterIsolate(Isolate* isolate) {
310
6691
  Mutex::ScopedLock lock(per_isolate_mutex_);
311
13382
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
312
6691
  CHECK(existing);
313
6691
  if (existing->unref() == 0) {
314
3353
    existing->Shutdown();
315
3353
    per_isolate_.erase(isolate);
316
6691
  }
317
6691
}
318
319
3211
void NodePlatform::Shutdown() {
320
3211
  worker_thread_task_runner_->Shutdown();
321
322
  {
323
3211
    Mutex::ScopedLock lock(per_isolate_mutex_);
324
3211
    per_isolate_.clear();
325
  }
326
3211
}
327
328
3901
int NodePlatform::NumberOfWorkerThreads() {
329
3901
  return worker_thread_task_runner_->NumberOfWorkerThreads();
330
}
331
332
1933
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
333
1933
  Isolate* isolate = Isolate::GetCurrent();
334
1933
  HandleScope scope(isolate);
335
1933
  Environment* env = Environment::GetCurrent(isolate);
336
  InternalCallbackScope cb_scope(env, Local<Object>(), { 0, 0 },
337
5799
                                 InternalCallbackScope::kAllowEmptyResource);
338
3866
  task->Run();
339
1933
}
340
341
1
void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
342
  auto it = std::find_if(scheduled_delayed_tasks_.begin(),
343
                         scheduled_delayed_tasks_.end(),
344
1
                         [task](const DelayedTaskPointer& delayed) -> bool {
345
1
          return delayed.get() == task;
346
1
      });
347
1
  CHECK_NE(it, scheduled_delayed_tasks_.end());
348
1
  scheduled_delayed_tasks_.erase(it);
349
1
}
350
351
1
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
352
1
  DelayedTask* delayed = static_cast<DelayedTask*>(handle->data);
353
1
  RunForegroundTask(std::move(delayed->task));
354
1
  delayed->platform_data->DeleteFromScheduledTasks(delayed);
355
1
}
356
357
6678
void PerIsolatePlatformData::CancelPendingDelayedTasks() {
358
6678
  scheduled_delayed_tasks_.clear();
359
6678
}
360
361
6673
void NodePlatform::DrainTasks(Isolate* isolate) {
362
6673
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
363
364
7542
  do {
365
    // Worker tasks aren't associated with an Isolate.
366
7542
    worker_thread_task_runner_->BlockingDrain();
367
14215
  } while (per_isolate->FlushForegroundTasksInternal());
368
6673
}
369
370
339566
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
371
339566
  bool did_work = false;
372
373
  while (std::unique_ptr<DelayedTask> delayed =
374
339740
      foreground_delayed_tasks_.Pop()) {
375
174
    did_work = true;
376
    uint64_t delay_millis =
377
174
        static_cast<uint64_t>(delayed->timeout + 0.5) * 1000;
378
174
    delayed->timer.data = static_cast<void*>(delayed.get());
379
174
    uv_timer_init(loop_, &delayed->timer);
380
    // Timers may not guarantee queue ordering of events with the same delay if
381
    // the delay is non-zero. This should not be a problem in practice.
382
174
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
383
174
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
384
385
174
    scheduled_delayed_tasks_.emplace_back(delayed.release(),
386
167
                                          [](DelayedTask* delayed) {
387
      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
388
173
               [](uv_handle_t* handle) {
389
3
        delete static_cast<DelayedTask*>(handle->data);
390
340
      });
391
341
    });
392
174
  }
393
  // Move all foreground tasks into a separate queue and flush that queue.
394
  // This way tasks that are posted while flushing the queue will be run on the
395
  // next call of FlushForegroundTasksInternal.
396
339566
  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
397
681062
  while (!tasks.empty()) {
398
1932
    std::unique_ptr<Task> task = std::move(tasks.front());
399
1932
    tasks.pop();
400
1932
    did_work = true;
401
1932
    RunForegroundTask(std::move(task));
402
1932
  }
403
339740
  return did_work;
404
}
405
406
230892
void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
407
230892
  worker_thread_task_runner_->PostTask(std::move(task));
408
230892
}
409
410
1
void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
411
                                             double delay_in_seconds) {
412
1
  worker_thread_task_runner_->PostDelayedTask(std::move(task),
413
1
                                              delay_in_seconds);
414
1
}
415
416
417
std::shared_ptr<PerIsolatePlatformData>
418
347083
NodePlatform::ForIsolate(Isolate* isolate) {
419
347083
  Mutex::ScopedLock lock(per_isolate_mutex_);
420
347083
  std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
421
347083
  CHECK(data);
422
347083
  return data;
423
}
424
425
3
void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
426
3
  ForIsolate(isolate)->PostTask(std::unique_ptr<Task>(task));
427
3
}
428
429
void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
430
                                                 Task* task,
431
                                                 double delay_in_seconds) {
432
  ForIsolate(isolate)->PostDelayedTask(
433
    std::unique_ptr<Task>(task), delay_in_seconds);
434
}
435
436
327731
bool NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
437
327731
  return ForIsolate(isolate)->FlushForegroundTasksInternal();
438
}
439
440
3325
void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
441
3325
  ForIsolate(isolate)->CancelPendingDelayedTasks();
442
3325
}
443
444
54158
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
445
446
std::shared_ptr<v8::TaskRunner>
447
9351
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
448
9351
  return ForIsolate(isolate);
449
}
450
451
1933076
double NodePlatform::MonotonicallyIncreasingTime() {
452
  // Convert nanos to seconds.
453
1933076
  return uv_hrtime() / 1e9;
454
}
455
456
31409788
double NodePlatform::CurrentClockTimeMillis() {
457
31409788
  return SystemClockTimeMillis();
458
}
459
460
189056
TracingController* NodePlatform::GetTracingController() {
461
189056
  return tracing_controller_;
462
}
463
464
template <class T>
465
14516
TaskQueue<T>::TaskQueue()
466
    : lock_(), tasks_available_(), tasks_drained_(),
467
14516
      outstanding_tasks_(0), stopped_(false), task_queue_() { }
468
469
template <class T>
470
236218
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
471
236218
  Mutex::ScopedLock scoped_lock(lock_);
472
236218
  outstanding_tasks_++;
473
236218
  task_queue_.push(std::move(task));
474
236218
  tasks_available_.Signal(scoped_lock);
475
236218
}
476
477
template <class T>
478
346164
std::unique_ptr<T> TaskQueue<T>::Pop() {
479
346164
  Mutex::ScopedLock scoped_lock(lock_);
480

346164
  if (task_queue_.empty()) {
481
342778
    return std::unique_ptr<T>(nullptr);
482
  }
483
6772
  std::unique_ptr<T> result = std::move(task_queue_.front());
484
3386
  task_queue_.pop();
485
349550
  return result;
486
}
487
488
template <class T>
489
245083
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
490
245083
  Mutex::ScopedLock scoped_lock(lock_);
491

707781
  while (task_queue_.empty() && !stopped_) {
492
218908
    tasks_available_.Wait(scoped_lock);
493
  }
494
243743
  if (stopped_) {
495
12850
    return std::unique_ptr<T>(nullptr);
496
  }
497
461786
  std::unique_ptr<T> result = std::move(task_queue_.front());
498
230893
  task_queue_.pop();
499
474636
  return result;
500
}
501
502
template <class T>
503
230123
void TaskQueue<T>::NotifyOfCompletion() {
504
230123
  Mutex::ScopedLock scoped_lock(lock_);
505
230892
  if (--outstanding_tasks_ == 0) {
506
144538
    tasks_drained_.Broadcast(scoped_lock);
507
230892
  }
508
230816
}
509
510
template <class T>
511
7542
void TaskQueue<T>::BlockingDrain() {
512
7542
  Mutex::ScopedLock scoped_lock(lock_);
513
15415
  while (outstanding_tasks_ > 0) {
514
331
    tasks_drained_.Wait(scoped_lock);
515
7542
  }
516
7542
}
517
518
template <class T>
519
3211
void TaskQueue<T>::Stop() {
520
3211
  Mutex::ScopedLock scoped_lock(lock_);
521
3211
  stopped_ = true;
522
3211
  tasks_available_.Broadcast(scoped_lock);
523
3211
}
524
525
template <class T>
526
339566
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
527
339566
  Mutex::ScopedLock scoped_lock(lock_);
528
339566
  std::queue<std::unique_ptr<T>> result;
529
339565
  result.swap(task_queue_);
530
339566
  return result;
531
}
532
533
}  // namespace node