GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_platform.cc Lines: 329 343 95.9 %
Date: 2019-05-05 22:32:45 Branches: 77 108 71.3 %

Line Branch Exec Source
1
#include "node_platform.h"
2
#include "node_internals.h"
3
4
#include "env-inl.h"
5
#include "debug_utils.h"
6
#include "util.h"
7
#include <algorithm>
8
#include <cmath>
9
#include <memory>
10
11
namespace node {
12
13
using v8::Isolate;
14
using v8::Local;
15
using v8::Object;
16
using v8::Platform;
17
using v8::Task;
18
using node::tracing::TracingController;
19
20
namespace {
21
22
struct PlatformWorkerData {
23
  TaskQueue<Task>* task_queue;
24
  Mutex* platform_workers_mutex;
25
  ConditionVariable* platform_workers_ready;
26
  int* pending_platform_workers;
27
  int id;
28
};
29
30
17714
static void PlatformWorkerThread(void* data) {
31
  std::unique_ptr<PlatformWorkerData>
32
17714
      worker_data(static_cast<PlatformWorkerData*>(data));
33
34
17826
  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
35

35504
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
36
                        "PlatformWorkerThread");
37
38
  // Notify the main thread that the platform worker is ready.
39
  {
40
17792
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
41
17886
    (*worker_data->pending_platform_workers)--;
42
17886
    worker_data->platform_workers_ready->Signal(lock);
43
  }
44
45
161936
  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
46
143828
    task->Run();
47
143909
    pending_worker_tasks->NotifyOfCompletion();
48
144061
  }
49
17870
}
50
51
}  // namespace
52
53
4466
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
54
 public:
55
4470
  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
56
4470
    : pending_worker_tasks_(tasks) {}
57
58
4470
  std::unique_ptr<uv_thread_t> Start() {
59
13410
    auto start_thread = [](void* data) {
60
4470
      static_cast<DelayedTaskScheduler*>(data)->Run();
61
13406
    };
62
4470
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
63
4470
    uv_sem_init(&ready_, 0);
64
4470
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
65
4470
    uv_sem_wait(&ready_);
66
4470
    uv_sem_destroy(&ready_);
67
4470
    return t;
68
  }
69
70
1
  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
71
1
    tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task),
72
1
                                                       delay_in_seconds)));
73
1
    uv_async_send(&flush_tasks_);
74
1
  }
75
76
4466
  void Stop() {
77
4466
    tasks_.Push(std::unique_ptr<Task>(new StopTask(this)));
78
4466
    uv_async_send(&flush_tasks_);
79
4466
  }
80
81
 private:
82
4470
  void Run() {
83

8940
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
84
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
85
4470
    loop_.data = this;
86
4470
    CHECK_EQ(0, uv_loop_init(&loop_));
87
4470
    flush_tasks_.data = this;
88
4470
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
89
4470
    uv_sem_post(&ready_);
90
91
4470
    uv_run(&loop_, UV_RUN_DEFAULT);
92
4466
    CheckedUvLoopClose(&loop_);
93
4466
  }
94
95
4467
  static void FlushTasks(uv_async_t* flush_tasks) {
96
    DelayedTaskScheduler* scheduler =
97
4467
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
98
8934
    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
99
4467
      task->Run();
100
4467
  }
101
102
8932
  class StopTask : public Task {
103
   public:
104
4466
    explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
105
106
4466
    void Run() override {
107
4466
      std::vector<uv_timer_t*> timers;
108
4466
      for (uv_timer_t* timer : scheduler_->timers_)
109
        timers.push_back(timer);
110
4466
      for (uv_timer_t* timer : timers)
111
        scheduler_->TakeTimerTask(timer);
112
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
113
17864
               [](uv_handle_t* handle) {});
114
4466
    }
115
116
   private:
117
     DelayedTaskScheduler* scheduler_;
118
  };
119
120
2
  class ScheduleTask : public Task {
121
   public:
122
1
    ScheduleTask(DelayedTaskScheduler* scheduler,
123
                 std::unique_ptr<Task> task,
124
                 double delay_in_seconds)
125
      : scheduler_(scheduler),
126
1
        task_(std::move(task)),
127
2
        delay_in_seconds_(delay_in_seconds) {}
128
129
1
    void Run() override {
130
1
      uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
131
1
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
132
1
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
133
1
      timer->data = task_.release();
134
1
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
135
1
      scheduler_->timers_.insert(timer.release());
136
1
    }
137
138
   private:
139
    DelayedTaskScheduler* scheduler_;
140
    std::unique_ptr<Task> task_;
141
    double delay_in_seconds_;
142
  };
143
144
1
  static void RunTask(uv_timer_t* timer) {
145
    DelayedTaskScheduler* scheduler =
146
1
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
147
1
    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
148
1
  }
149
150
1
  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
151
1
    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
152
1
    uv_timer_stop(timer);
153
3
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
154
1
      delete reinterpret_cast<uv_timer_t*>(handle);
155
4
    });
156
1
    timers_.erase(timer);
157
1
    return task;
158
  }
159
160
  uv_sem_t ready_;
161
  TaskQueue<Task>* pending_worker_tasks_;
162
163
  TaskQueue<Task> tasks_;
164
  uv_loop_t loop_;
165
  uv_async_t flush_tasks_;
166
  std::unordered_set<uv_timer_t*> timers_;
167
};
168
169
4470
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
170
4470
  Mutex platform_workers_mutex;
171
8940
  ConditionVariable platform_workers_ready;
172
173
8940
  Mutex::ScopedLock lock(platform_workers_mutex);
174
4470
  int pending_platform_workers = thread_pool_size;
175
176
8940
  delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
177
4470
      &pending_worker_tasks_);
178
4470
  threads_.push_back(delayed_task_scheduler_->Start());
179
180
22356
  for (int i = 0; i < thread_pool_size; i++) {
181
    PlatformWorkerData* worker_data = new PlatformWorkerData{
182
      &pending_worker_tasks_, &platform_workers_mutex,
183
      &platform_workers_ready, &pending_platform_workers, i
184
17886
    };
185
17886
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
186
17886
    if (uv_thread_create(t.get(), PlatformWorkerThread,
187
17886
                         worker_data) != 0) {
188
      break;
189
    }
190
17886
    threads_.push_back(std::move(t));
191
17886
  }
192
193
  // Wait for platform workers to initialize before continuing with the
194
  // bootstrap.
195
24283
  while (pending_platform_workers > 0) {
196
15343
    platform_workers_ready.Wait(lock);
197
4470
  }
198
4470
}
199
200
144083
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
201
144083
  pending_worker_tasks_.Push(std::move(task));
202
144083
}
203
204
1
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
205
                                              double delay_in_seconds) {
206
1
  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
207
1
}
208
209
10434
void WorkerThreadsTaskRunner::BlockingDrain() {
210
10434
  pending_worker_tasks_.BlockingDrain();
211
10434
}
212
213
4466
void WorkerThreadsTaskRunner::Shutdown() {
214
4466
  pending_worker_tasks_.Stop();
215
4466
  delayed_task_scheduler_->Stop();
216
26802
  for (size_t i = 0; i < threads_.size(); i++) {
217
22336
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
218
  }
219
4466
}
220
221
4939
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
222
4939
  return threads_.size();
223
}
224
225
4655
PerIsolatePlatformData::PerIsolatePlatformData(
226
    Isolate* isolate, uv_loop_t* loop)
227
4655
  : loop_(loop) {
228
4655
  flush_tasks_ = new uv_async_t();
229
4655
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
230
4655
  flush_tasks_->data = static_cast<void*>(this);
231
4655
  uv_unref(reinterpret_cast<uv_handle_t*>(flush_tasks_));
232
4655
}
233
234
5686
void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
235
5686
  auto platform_data = static_cast<PerIsolatePlatformData*>(handle->data);
236
5686
  platform_data->FlushForegroundTasksInternal();
237
5685
}
238
239
void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
240
  UNREACHABLE();
241
}
242
243
6829
void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
244
6829
  CHECK_NOT_NULL(flush_tasks_);
245
6829
  foreground_tasks_.Push(std::move(task));
246
6829
  uv_async_send(flush_tasks_);
247
6829
}
248
249
4491
void PerIsolatePlatformData::PostDelayedTask(
250
    std::unique_ptr<Task> task, double delay_in_seconds) {
251
4491
  CHECK_NOT_NULL(flush_tasks_);
252
4491
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
253
4491
  delayed->task = std::move(task);
254
4491
  delayed->platform_data = shared_from_this();
255
4491
  delayed->timeout = delay_in_seconds;
256
4491
  foreground_delayed_tasks_.Push(std::move(delayed));
257
4491
  uv_async_send(flush_tasks_);
258
4491
}
259
260
446
void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
261
446
  PostTask(std::move(task));
262
446
}
263
264
184
void PerIsolatePlatformData::PostNonNestableDelayedTask(
265
    std::unique_ptr<Task> task,
266
    double delay_in_seconds) {
267
184
  PostDelayedTask(std::move(task), delay_in_seconds);
268
184
}
269
270
768
PerIsolatePlatformData::~PerIsolatePlatformData() {
271
384
  Shutdown();
272
384
}
273
274
185
void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
275
                                                 void* data) {
276
185
  shutdown_callbacks_.emplace_back(ShutdownCallback { callback, data });
277
185
}
278
279
4658
void PerIsolatePlatformData::Shutdown() {
280
4658
  if (flush_tasks_ == nullptr)
281
5042
    return;
282
283
4274
  CHECK_NULL(foreground_delayed_tasks_.Pop());
284
4274
  CHECK_NULL(foreground_tasks_.Pop());
285
4274
  CancelPendingDelayedTasks();
286
287
4274
  ShutdownCbList* copy = new ShutdownCbList(std::move(shutdown_callbacks_));
288
4274
  flush_tasks_->data = copy;
289
  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
290
4644
           [](uv_handle_t* handle) {
291
    std::unique_ptr<ShutdownCbList> callbacks(
292
185
        static_cast<ShutdownCbList*>(handle->data));
293
370
    for (const auto& callback : *callbacks)
294
185
      callback.cb(callback.data);
295
185
    delete reinterpret_cast<uv_async_t*>(handle);
296
8918
  });
297
4274
  flush_tasks_ = nullptr;
298
}
299
300
4470
NodePlatform::NodePlatform(int thread_pool_size,
301
4470
                           TracingController* tracing_controller) {
302
4470
  if (tracing_controller) {
303
4470
    tracing_controller_ = tracing_controller;
304
  } else {
305
    tracing_controller_ = new TracingController();
306
  }
307
8940
  worker_thread_task_runner_ =
308
4470
      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
309
4470
}
310
311
4655
void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
312
4655
  Mutex::ScopedLock lock(per_isolate_mutex_);
313
9310
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
314
4655
  CHECK(!existing);
315
9310
  per_isolate_[isolate] =
316
9310
      std::make_shared<PerIsolatePlatformData>(isolate, loop);
317
4655
}
318
319
4274
void NodePlatform::UnregisterIsolate(Isolate* isolate) {
320
4274
  Mutex::ScopedLock lock(per_isolate_mutex_);
321
8548
  std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
322
4274
  CHECK(existing);
323
4274
  existing->Shutdown();
324
8548
  per_isolate_.erase(isolate);
325
4274
}
326
327
185
void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
328
                                              void (*cb)(void*), void* data) {
329
185
  Mutex::ScopedLock lock(per_isolate_mutex_);
330
185
  auto it = per_isolate_.find(isolate);
331
185
  if (it == per_isolate_.end()) {
332
    CHECK(it->second);
333
    cb(data);
334
185
    return;
335
  }
336
185
  it->second->AddShutdownCallback(cb, data);
337
}
338
339
4466
void NodePlatform::Shutdown() {
340
4466
  worker_thread_task_runner_->Shutdown();
341
342
  {
343
4466
    Mutex::ScopedLock lock(per_isolate_mutex_);
344
4466
    per_isolate_.clear();
345
  }
346
4466
}
347
348
4939
int NodePlatform::NumberOfWorkerThreads() {
349
4939
  return worker_thread_task_runner_->NumberOfWorkerThreads();
350
}
351
352
6775
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
353
6775
  Isolate* isolate = Isolate::GetCurrent();
354
6775
  DebugSealHandleScope scope(isolate);
355
6775
  Environment* env = Environment::GetCurrent(isolate);
356
6775
  if (env != nullptr) {
357
    InternalCallbackScope cb_scope(env, Local<Object>(), { 0, 0 },
358
13550
                                   InternalCallbackScope::kAllowEmptyResource);
359
6775
    task->Run();
360
  } else {
361
    task->Run();
362
  }
363
6774
}
364
365
208
void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
366
  auto it = std::find_if(scheduled_delayed_tasks_.begin(),
367
                         scheduled_delayed_tasks_.end(),
368
318
                         [task](const DelayedTaskPointer& delayed) -> bool {
369
318
          return delayed.get() == task;
370
208
      });
371
208
  CHECK_NE(it, scheduled_delayed_tasks_.end());
372
208
  scheduled_delayed_tasks_.erase(it);
373
208
}
374
375
208
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
376
208
  DelayedTask* delayed = static_cast<DelayedTask*>(handle->data);
377
208
  RunForegroundTask(std::move(delayed->task));
378
208
  delayed->platform_data->DeleteFromScheduledTasks(delayed);
379
208
}
380
381
8548
void PerIsolatePlatformData::CancelPendingDelayedTasks() {
382
8548
  scheduled_delayed_tasks_.clear();
383
8548
}
384
385
8498
void NodePlatform::DrainTasks(Isolate* isolate) {
386
8498
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
387
388
10434
  do {
389
    // Worker tasks aren't associated with an Isolate.
390
10434
    worker_thread_task_runner_->BlockingDrain();
391
18932
  } while (per_isolate->FlushForegroundTasksInternal());
392
8498
}
393
394
162088
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
395
162088
  bool did_work = false;
396
397
  while (std::unique_ptr<DelayedTask> delayed =
398
166398
      foreground_delayed_tasks_.Pop()) {
399
4310
    did_work = true;
400
4310
    uint64_t delay_millis = llround(delayed->timeout * 1000);
401
402
4310
    delayed->timer.data = static_cast<void*>(delayed.get());
403
4310
    uv_timer_init(loop_, &delayed->timer);
404
    // Timers may not guarantee queue ordering of events with the same delay if
405
    // the delay is non-zero. This should not be a problem in practice.
406
4310
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
407
4310
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
408
409
4310
    scheduled_delayed_tasks_.emplace_back(delayed.release(),
410
4178
                                          [](DelayedTask* delayed) {
411
      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
412
4748
               [](uv_handle_t* handle) {
413
285
        delete static_cast<DelayedTask*>(handle->data);
414
8926
      });
415
8488
    });
416
4310
  }
417
  // Move all foreground tasks into a separate queue and flush that queue.
418
  // This way tasks that are posted while flushing the queue will be run on the
419
  // next call of FlushForegroundTasksInternal.
420
162088
  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
421
330742
  while (!tasks.empty()) {
422
6567
    std::unique_ptr<Task> task = std::move(tasks.front());
423
6567
    tasks.pop();
424
6567
    did_work = true;
425
6567
    RunForegroundTask(std::move(task));
426
6566
  }
427
166397
  return did_work;
428
}
429
430
144083
void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
431
144083
  worker_thread_task_runner_->PostTask(std::move(task));
432
144083
}
433
434
1
void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
435
                                             double delay_in_seconds) {
436
1
  worker_thread_task_runner_->PostDelayedTask(std::move(task),
437
1
                                              delay_in_seconds);
438
1
}
439
440
441
std::shared_ptr<PerIsolatePlatformData>
442
179716
NodePlatform::ForIsolate(Isolate* isolate) {
443
179716
  Mutex::ScopedLock lock(per_isolate_mutex_);
444
179717
  std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
445
179717
  CHECK(data);
446
179717
  return data;
447
}
448
449
void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
450
  ForIsolate(isolate)->PostTask(std::unique_ptr<Task>(task));
451
}
452
453
void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
454
                                                 Task* task,
455
                                                 double delay_in_seconds) {
456
  ForIsolate(isolate)->PostDelayedTask(
457
    std::unique_ptr<Task>(task), delay_in_seconds);
458
}
459
460
145968
bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
461
145968
  return ForIsolate(isolate)->FlushForegroundTasksInternal();
462
}
463
464
4274
void NodePlatform::CancelPendingDelayedTasks(Isolate* isolate) {
465
4274
  ForIsolate(isolate)->CancelPendingDelayedTasks();
466
4274
}
467
468
44670
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
469
470
std::shared_ptr<v8::TaskRunner>
471
20977
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
472
20977
  return ForIsolate(isolate);
473
}
474
475
1672393
double NodePlatform::MonotonicallyIncreasingTime() {
476
  // Convert nanos to seconds.
477
1672393
  return uv_hrtime() / 1e9;
478
}
479
480
84001446
double NodePlatform::CurrentClockTimeMillis() {
481
84001446
  return SystemClockTimeMillis();
482
}
483
484
207325
TracingController* NodePlatform::GetTracingController() {
485
207325
  CHECK_NOT_NULL(tracing_controller_);
486
207325
  return tracing_controller_;
487
}
488
489
template <class T>
490
18250
TaskQueue<T>::TaskQueue()
491
    : lock_(), tasks_available_(), tasks_drained_(),
492
18250
      outstanding_tasks_(0), stopped_(false), task_queue_() { }
493
494
template <class T>
495
159871
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
496
159871
  Mutex::ScopedLock scoped_lock(lock_);
497
159871
  outstanding_tasks_++;
498
159871
  task_queue_.push(std::move(task));
499
159871
  tasks_available_.Signal(scoped_lock);
500
159871
}
501
502
template <class T>
503
183880
std::unique_ptr<T> TaskQueue<T>::Pop() {
504
183880
  Mutex::ScopedLock scoped_lock(lock_);
505

183880
  if (task_queue_.empty()) {
506
175102
    return std::unique_ptr<T>(nullptr);
507
  }
508
17554
  std::unique_ptr<T> result = std::move(task_queue_.front());
509
8777
  task_queue_.pop();
510
192656
  return result;
511
}
512
513
template <class T>
514
161921
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
515
161921
  Mutex::ScopedLock scoped_lock(lock_);
516

466492
  while (task_queue_.empty() && !stopped_) {
517
142570
    tasks_available_.Wait(scoped_lock);
518
  }
519
161953
  if (stopped_) {
520
17870
    return std::unique_ptr<T>(nullptr);
521
  }
522
288166
  std::unique_ptr<T> result = std::move(task_queue_.front());
523
144083
  task_queue_.pop();
524
306036
  return result;
525
}
526
527
template <class T>
528
143907
void TaskQueue<T>::NotifyOfCompletion() {
529
143907
  Mutex::ScopedLock scoped_lock(lock_);
530
144083
  if (--outstanding_tasks_ == 0) {
531
91577
    tasks_drained_.Broadcast(scoped_lock);
532
144083
  }
533
144054
}
534
535
template <class T>
536
10434
void TaskQueue<T>::BlockingDrain() {
537
10434
  Mutex::ScopedLock scoped_lock(lock_);
538
21047
  while (outstanding_tasks_ > 0) {
539
179
    tasks_drained_.Wait(scoped_lock);
540
10434
  }
541
10434
}
542
543
template <class T>
544
4466
void TaskQueue<T>::Stop() {
545
4466
  Mutex::ScopedLock scoped_lock(lock_);
546
4466
  stopped_ = true;
547
4466
  tasks_available_.Broadcast(scoped_lock);
548
4466
}
549
550
template <class T>
551
162088
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
552
162088
  Mutex::ScopedLock scoped_lock(lock_);
553
162088
  std::queue<std::unique_ptr<T>> result;
554
162088
  result.swap(task_queue_);
555
162088
  return result;
556
}
557
558
}  // namespace node