GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 420 456 92.1 %
Date: 2019-02-26 22:23:30 Branches: 175 250 70.0 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
#include "async_wrap-inl.h"
3
#include "async_wrap.h"
4
#include "debug_utils.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_process.h"
8
#include "util-inl.h"
9
#include "util.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::ObjectTemplate;
29
using v8::SharedArrayBuffer;
30
using v8::String;
31
using v8::Value;
32
using v8::ValueDeserializer;
33
using v8::ValueSerializer;
34
using v8::WasmCompiledModule;
35
36
namespace node {
37
namespace worker {
38
39
22561
Message::Message(MallocedBuffer<char>&& buffer)
40
22561
    : main_message_buf_(std::move(buffer)) {}
41
42
namespace {
43
44
// This is used to tell V8 how to read transferred host objects, like other
45
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
46
8042
class DeserializerDelegate : public ValueDeserializer::Delegate {
47
 public:
48
8042
  DeserializerDelegate(
49
      Message* m,
50
      Environment* env,
51
      const std::vector<MessagePort*>& message_ports,
52
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
53
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
54
      : message_ports_(message_ports),
55
        shared_array_buffers_(shared_array_buffers),
56
8042
        wasm_modules_(wasm_modules) {}
57
58
158
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
59
    // Currently, only MessagePort hosts objects are supported, so identifying
60
    // by the index in the message's MessagePort array is sufficient.
61
    uint32_t id;
62
158
    if (!deserializer->ReadUint32(&id))
63
      return MaybeLocal<Object>();
64
158
    CHECK_LE(id, message_ports_.size());
65
316
    return message_ports_[id]->object(isolate);
66
  };
67
68
7
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
69
      Isolate* isolate, uint32_t clone_id) override {
70
7
    CHECK_LE(clone_id, shared_array_buffers_.size());
71
14
    return shared_array_buffers_[clone_id];
72
  }
73
74
2
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
75
      Isolate* isolate, uint32_t transfer_id) override {
76
2
    CHECK_LE(transfer_id, wasm_modules_.size());
77
    return WasmCompiledModule::FromTransferrableModule(
78
2
        isolate, wasm_modules_[transfer_id]);
79
  }
80
81
  ValueDeserializer* deserializer = nullptr;
82
83
 private:
84
  const std::vector<MessagePort*>& message_ports_;
85
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
86
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
87
};
88
89
}  // anonymous namespace
90
91
8043
MaybeLocal<Value> Message::Deserialize(Environment* env,
92
                                       Local<Context> context) {
93
8043
  EscapableHandleScope handle_scope(env->isolate());
94
  Context::Scope context_scope(context);
95
96
  // Create all necessary MessagePort handles.
97
16085
  std::vector<MessagePort*> ports(message_ports_.size());
98
8201
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
99
159
    ports[i] = MessagePort::New(env,
100
                                context,
101
159
                                std::move(message_ports_[i]));
102
159
    if (ports[i] == nullptr) {
103
2
      for (MessagePort* port : ports) {
104
        // This will eventually release the MessagePort object itself.
105
1
        if (port != nullptr)
106
          port->Close();
107
      }
108
1
      return MaybeLocal<Value>();
109
    }
110
  }
111
8042
  message_ports_.clear();
112
113
16084
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
114
  // Attach all transferred SharedArrayBuffers to their new Isolate.
115
8049
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
116
    Local<SharedArrayBuffer> sab;
117
21
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
118
21
            .ToLocal(&sab))
119
      return MaybeLocal<Value>();
120
7
    shared_array_buffers.push_back(sab);
121
  }
122
8042
  shared_array_buffers_.clear();
123
124
  DeserializerDelegate delegate(
125
16084
      this, env, ports, shared_array_buffers, wasm_modules_);
126
  ValueDeserializer deserializer(
127
      env->isolate(),
128
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
129
      main_message_buf_.size,
130
16083
      &delegate);
131
8042
  delegate.deserializer = &deserializer;
132
133
  // Attach all transferred ArrayBuffers to their new Isolate.
134
8044
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
135
2
    if (!env->isolate_data()->uses_node_allocator()) {
136
      // We don't use Node's allocator on the receiving side, so we have
137
      // to create the ArrayBuffer from a copy of the memory.
138
      AllocatedBuffer buf =
139
          env->AllocateManaged(array_buffer_contents_[i].size);
140
      memcpy(buf.data(),
141
             array_buffer_contents_[i].data,
142
             array_buffer_contents_[i].size);
143
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
144
      continue;
145
    }
146
147
2
    env->isolate_data()->node_allocator()->RegisterPointer(
148
2
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);
149
150
    Local<ArrayBuffer> ab =
151
        ArrayBuffer::New(env->isolate(),
152
2
                         array_buffer_contents_[i].release(),
153
2
                         array_buffer_contents_[i].size,
154
4
                         ArrayBufferCreationMode::kInternalized);
155
2
    deserializer.TransferArrayBuffer(i, ab);
156
  }
157
8042
  array_buffer_contents_.clear();
158
159
16084
  if (deserializer.ReadHeader(context).IsNothing())
160
    return MaybeLocal<Value>();
161
  return handle_scope.Escape(
162
16083
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
163
}
164
165
7
void Message::AddSharedArrayBuffer(
166
    const SharedArrayBufferMetadataReference& reference) {
167
7
  shared_array_buffers_.push_back(reference);
168
7
}
169
170
183
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
171
183
  message_ports_.emplace_back(std::move(data));
172
183
}
173
174
2
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
175
2
  wasm_modules_.emplace_back(std::move(mod));
176
2
  return wasm_modules_.size() - 1;
177
}
178
179
namespace {
180
181
11
void ThrowDataCloneException(Environment* env, Local<String> message) {
182
  Local<Value> argv[] = {
183
    message,
184
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
185
33
  };
186
  Local<Value> exception;
187
11
  Local<Function> domexception_ctor = env->domexception_function();
188
11
  CHECK(!domexception_ctor.IsEmpty());
189
44
  if (!domexception_ctor->NewInstance(env->context(), arraysize(argv), argv)
190
33
          .ToLocal(&exception)) {
191
11
    return;
192
  }
193
11
  env->isolate()->ThrowException(exception);
194
}
195
196
// This tells V8 how to serialize objects that it does not understand
197
// (e.g. C++ objects) into the output buffer, in a way that our own
198
// DeserializerDelegate understands how to unpack.
199
8157
class SerializerDelegate : public ValueSerializer::Delegate {
200
 public:
201
8157
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
202
8157
      : env_(env), context_(context), msg_(m) {}
203
204
1
  void ThrowDataCloneError(Local<String> message) override {
205
1
    ThrowDataCloneException(env_, message);
206
1
  }
207
208
182
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
209
364
    if (env_->message_port_constructor_template()->HasInstance(object)) {
210
182
      return WriteMessagePort(Unwrap<MessagePort>(object));
211
    }
212
213
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
214
    return Nothing<bool>();
215
  }
216
217
7
  Maybe<uint32_t> GetSharedArrayBufferId(
218
      Isolate* isolate,
219
      Local<SharedArrayBuffer> shared_array_buffer) override {
220
    uint32_t i;
221
7
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
222
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
223
        return Just(i);
224
    }
225
226
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
227
        env_,
228
        context_,
229
7
        shared_array_buffer);
230
7
    if (!reference) {
231
      return Nothing<uint32_t>();
232
    }
233
7
    seen_shared_array_buffers_.push_back(shared_array_buffer);
234
7
    msg_->AddSharedArrayBuffer(reference);
235
7
    return Just(i);
236
  }
237
238
2
  Maybe<uint32_t> GetWasmModuleTransferId(
239
      Isolate* isolate, Local<WasmCompiledModule> module) override {
240
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
241
  }
242
243
8146
  void Finish() {
244
    // Only close the MessagePort handles and actually transfer them
245
    // once we know that serialization succeeded.
246
8329
    for (MessagePort* port : ports_) {
247
366
      port->Close();
248
183
      msg_->AddMessagePort(port->Detach());
249
    }
250
8146
  }
251
252
  ValueSerializer* serializer = nullptr;
253
254
 private:
255
182
  Maybe<bool> WriteMessagePort(MessagePort* port) {
256
182
    for (uint32_t i = 0; i < ports_.size(); i++) {
257
182
      if (ports_[i] == port) {
258
182
        serializer->WriteUint32(i);
259
182
        return Just(true);
260
      }
261
    }
262
263
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
264
    return Nothing<bool>();
265
  }
266
267
  Environment* env_;
268
  Local<Context> context_;
269
  Message* msg_;
270
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
271
  std::vector<MessagePort*> ports_;
272
273
  friend class worker::Message;
274
};
275
276
}  // anonymous namespace
277
278
8157
Maybe<bool> Message::Serialize(Environment* env,
279
                               Local<Context> context,
280
                               Local<Value> input,
281
                               Local<Value> transfer_list_v,
282
                               Local<Object> source_port) {
283
8157
  HandleScope handle_scope(env->isolate());
284
  Context::Scope context_scope(context);
285
286
  // Verify that we're not silently overwriting an existing message.
287
8157
  CHECK(main_message_buf_.is_empty());
288
289
16314
  SerializerDelegate delegate(env, context, this);
290
16314
  ValueSerializer serializer(env->isolate(), &delegate);
291
8157
  delegate.serializer = &serializer;
292
293
16314
  std::vector<Local<ArrayBuffer>> array_buffers;
294
8157
  if (transfer_list_v->IsArray()) {
295
196
    Local<Array> transfer_list = transfer_list_v.As<Array>();
296
196
    uint32_t length = transfer_list->Length();
297
392
    for (uint32_t i = 0; i < length; ++i) {
298
      Local<Value> entry;
299
412
      if (!transfer_list->Get(context, i).ToLocal(&entry))
300
10
        return Nothing<bool>();
301
      // Currently, we support ArrayBuffers and MessagePorts.
302
206
      if (entry->IsArrayBuffer()) {
303
12
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
304
        // If we cannot render the ArrayBuffer unusable in this Isolate and
305
        // take ownership of its memory, copying the buffer will have to do.
306


36
        if (!ab->IsNeuterable() || ab->IsExternal() ||
307
12
            !env->isolate_data()->uses_node_allocator()) {
308
11
          continue;
309
        }
310
12
        if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
311
            array_buffers.end()) {
312
          ThrowDataCloneException(
313
              env,
314
              FIXED_ONE_BYTE_STRING(
315
                  env->isolate(),
316
1
                  "Transfer list contains duplicate ArrayBuffer"));
317
1
          return Nothing<bool>();
318
        }
319
        // We simply use the array index in the `array_buffers` list as the
320
        // ID that we write into the serialized buffer.
321
11
        uint32_t id = array_buffers.size();
322
11
        array_buffers.push_back(ab);
323
11
        serializer.TransferArrayBuffer(id, ab);
324
11
        continue;
325
388
      } else if (env->message_port_constructor_template()
326
582
                    ->HasInstance(entry)) {
327
        // Check if the source MessagePort is being transferred.
328

388
        if (!source_port.IsEmpty() && entry == source_port) {
329
          ThrowDataCloneException(
330
              env,
331
              FIXED_ONE_BYTE_STRING(env->isolate(),
332
1
                                    "Transfer list contains source port"));
333
10
          return Nothing<bool>();
334
        }
335
193
        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
336

193
        if (port == nullptr || port->IsDetached()) {
337
          ThrowDataCloneException(
338
              env,
339
              FIXED_ONE_BYTE_STRING(
340
                  env->isolate(),
341
7
                  "MessagePort in transfer list is already detached"));
342
7
          return Nothing<bool>();
343
        }
344
186
        if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
345
            delegate.ports_.end()) {
346
          ThrowDataCloneException(
347
              env,
348
              FIXED_ONE_BYTE_STRING(
349
                  env->isolate(),
350
1
                  "Transfer list contains duplicate MessagePort"));
351
1
          return Nothing<bool>();
352
        }
353
185
        delegate.ports_.push_back(port);
354
185
        continue;
355
      }
356
357
      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
358
      return Nothing<bool>();
359
    }
360
  }
361
362
8147
  serializer.WriteHeader();
363
16294
  if (serializer.WriteValue(context, input).IsNothing()) {
364
1
    return Nothing<bool>();
365
  }
366
367
8149
  for (Local<ArrayBuffer> ab : array_buffers) {
368
    // If serialization succeeded, we want to take ownership of
369
    // (a.k.a. externalize) the underlying memory region and render
370
    // it inaccessible in this Isolate.
371
3
    ArrayBuffer::Contents contents = ab->Externalize();
372
3
    ab->Neuter();
373
374
3
    CHECK(env->isolate_data()->uses_node_allocator());
375
3
    env->isolate_data()->node_allocator()->UnregisterPointer(
376
3
        contents.Data(), contents.ByteLength());
377
378
    array_buffer_contents_.push_back(
379
3
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
380
3
                               contents.ByteLength() });
381
  }
382
383
8146
  delegate.Finish();
384
385
  // The serializer gave us a buffer allocated using `malloc()`.
386
8146
  std::pair<uint8_t*, size_t> data = serializer.Release();
387
16292
  main_message_buf_ =
388
8146
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
389
16303
  return Just(true);
390
}
391
392
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
393
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
394
  tracker->TrackFieldWithSize("shared_array_buffers",
395
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
396
4
  tracker->TrackField("message_ports", message_ports_);
397
4
}
398
399
1070
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
400
401
3084
MessagePortData::~MessagePortData() {
402
1028
  CHECK_NULL(owner_);
403
1028
  Disentangle();
404
2056
}
405
406
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
407
8
  Mutex::ScopedLock lock(mutex_);
408
8
  tracker->TrackField("incoming_messages", incoming_messages_);
409
8
}
410
411
8145
void MessagePortData::AddToIncomingQueue(Message&& message) {
412
  // This function will be called by other threads.
413
8145
  Mutex::ScopedLock lock(mutex_);
414
8145
  incoming_messages_.emplace_back(std::move(message));
415
416
8145
  if (owner_ != nullptr) {
417
7621
    Debug(owner_, "Adding message to incoming queue");
418
7621
    owner_->TriggerAsync();
419
8145
  }
420
8145
}
421
422
6362
bool MessagePortData::IsSiblingClosed() const {
423
6362
  Mutex::ScopedLock lock(*sibling_mutex_);
424
6362
  return sibling_ == nullptr;
425
}
426
427
371
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
428
371
  CHECK_NULL(a->sibling_);
429
371
  CHECK_NULL(b->sibling_);
430
371
  a->sibling_ = b;
431
371
  b->sibling_ = a;
432
371
  a->sibling_mutex_ = b->sibling_mutex_;
433
371
}
434
435
2072
void MessagePortData::PingOwnerAfterDisentanglement() {
436
2072
  Mutex::ScopedLock lock(mutex_);
437
2073
  if (owner_ != nullptr)
438
367
    owner_->TriggerAsync();
439
2072
}
440
441
1701
void MessagePortData::Disentangle() {
442
  // Grab a copy of the sibling mutex, then replace it so that each sibling
443
  // has its own sibling_mutex_ now.
444
1701
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
445
3403
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
446
1702
  sibling_mutex_ = std::make_shared<Mutex>();
447
448
1701
  MessagePortData* sibling = sibling_;
449
1701
  if (sibling_ != nullptr) {
450
371
    sibling_->sibling_ = nullptr;
451
371
    sibling_ = nullptr;
452
  }
453
454
  // We close MessagePorts after disentanglement, so we trigger the
455
  // corresponding uv_async_t to let them know that this happened.
456
1701
  PingOwnerAfterDisentanglement();
457
1701
  if (sibling != nullptr) {
458
371
    sibling->PingOwnerAfterDisentanglement();
459
1702
  }
460
1702
}
461
462
2571
MessagePort::~MessagePort() {
463
857
  if (data_)
464
    data_->owner_ = nullptr;
465
1714
}
466
467
899
MessagePort::MessagePort(Environment* env,
468
                         Local<Context> context,
469
                         Local<Object> wrap)
470
  : HandleWrap(env,
471
               wrap,
472
               reinterpret_cast<uv_handle_t*>(new uv_async_t()),
473
               AsyncWrap::PROVIDER_MESSAGEPORT),
474
899
    data_(new MessagePortData(this)) {
475
13507
  auto onmessage = [](uv_async_t* handle) {
476
    // Called when data has been put into the queue.
477
6304
    MessagePort* channel = static_cast<MessagePort*>(handle->data);
478
6304
    channel->OnMessage();
479
13505
  };
480
899
  CHECK_EQ(uv_async_init(env->event_loop(),
481
                         async(),
482
                         onmessage), 0);
483
899
  async()->data = static_cast<void*>(this);
484
485
  Local<Value> fn;
486
2697
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
487
899
    return;
488
489
899
  if (fn->IsFunction()) {
490
730
    Local<Function> init = fn.As<Function>();
491
730
    USE(init->Call(context, wrap, 0, nullptr));
492
  }
493
494
899
  Debug(this, "Created message port");
495
}
496
497
void MessagePort::AddToIncomingQueue(Message&& message) {
498
  data_->AddToIncomingQueue(std::move(message));
499
}
500
501
11246
uv_async_t* MessagePort::async() {
502
11246
  return reinterpret_cast<uv_async_t*>(GetHandle());
503
}
504
505
192
bool MessagePort::IsDetached() const {
506

192
  return data_ == nullptr || IsHandleClosing();
507
}
508
509
8597
void MessagePort::TriggerAsync() {
510
17194
  if (IsHandleClosing()) return;
511
8591
  CHECK_EQ(uv_async_send(async()), 0);
512
}
513
514
1080
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
515
2160
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
516
517
1080
  if (data_) {
518
    // Wrap this call with accessing the mutex, so that TriggerAsync()
519
    // can check IsHandleClosing() without race conditions.
520
1080
    Mutex::ScopedLock sibling_lock(data_->mutex_);
521
1080
    HandleWrap::Close(close_callback);
522
  } else {
523
    HandleWrap::Close(close_callback);
524
  }
525
1080
}
526
527
899
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
528
899
  Environment* env = Environment::GetCurrent(args);
529
899
  if (!args.IsConstructCall()) {
530
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
531
899
    return;
532
  }
533
534
1798
  Local<Context> context = args.This()->CreationContext();
535
  Context::Scope context_scope(context);
536
537
1798
  new MessagePort(env, context, args.This());
538
}
539
540
899
MessagePort* MessagePort::New(
541
    Environment* env,
542
    Local<Context> context,
543
    std::unique_ptr<MessagePortData> data) {
544
  Context::Scope context_scope(context);
545
  Local<Function> ctor;
546
1798
  if (!GetMessagePortConstructor(env, context).ToLocal(&ctor))
547
    return nullptr;
548
549
  // Construct a new instance, then assign the listener instance and possibly
550
  // the MessagePortData to it.
551
  Local<Object> instance;
552
1798
  if (!ctor->NewInstance(context).ToLocal(&instance))
553
1
    return nullptr;
554
898
  MessagePort* port = Unwrap<MessagePort>(instance);
555
898
  CHECK_NOT_NULL(port);
556
898
  if (data) {
557
327
    port->Detach();
558
327
    port->data_ = std::move(data);
559
560
    // This lock is here to avoid race conditions with the `owner_` read
561
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
562
    // but it's better to be safe than sorry.)
563
327
    Mutex::ScopedLock lock(port->data_->mutex_);
564
327
    port->data_->owner_ = port;
565
    // If the existing MessagePortData object had pending messages, this is
566
    // the easiest way to run that queue.
567
327
    port->TriggerAsync();
568
  }
569
898
  return port;
570
}
571
572
6396
void MessagePort::OnMessage() {
573
6396
  Debug(this, "Running MessagePort::OnMessage()");
574
6396
  HandleScope handle_scope(env()->isolate());
575
12792
  Local<Context> context = object(env()->isolate())->CreationContext();
576
577
  // data_ can only ever be modified by the owner thread, so no need to lock.
578
  // However, the message port may be transferred while it is processing
579
  // messages, so we need to check that this handle still owns its `data_` field
580
  // on every iteration.
581
20800
  while (data_) {
582
14405
    Message received;
583
    {
584
      // Get the head of the message queue.
585
14405
      Mutex::ScopedLock lock(data_->mutex_);
586
587
14405
      if (stop_event_loop_) {
588
25
        Debug(this, "MessagePort stops loop as requested");
589
25
        CHECK(!data_->receiving_messages_);
590
25
        uv_stop(env()->event_loop());
591
25
        break;
592
      }
593
594
      Debug(this, "MessagePort has message, receiving = %d",
595
28759
            static_cast<int>(data_->receiving_messages_));
596
597
14379
      if (!data_->receiving_messages_)
598
55
        break;
599
14324
      if (data_->incoming_messages_.empty())
600
6280
        break;
601
8043
      received = std::move(data_->incoming_messages_.front());
602
8043
      data_->incoming_messages_.pop_front();
603
    }
604
605
8043
    if (!env()->can_call_into_js()) {
606
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
607
      // In this case there is nothing to do but to drain the current queue.
608
      continue;
609
    }
610
611
    {
612
      // Call the JS .onmessage() callback.
613
8043
      HandleScope handle_scope(env()->isolate());
614
8009
      Context::Scope context_scope(context);
615
616
      Local<Object> event;
617
      Local<Value> payload;
618
16086
      Local<Value> cb_args[1];
619

48242
      if (!received.Deserialize(env(), context).ToLocal(&payload) ||
620
24120
          !env()->message_event_object_template()->NewInstance(context)
621

48246
              .ToLocal(&event) ||
622

56288
          event->Set(context, env()->data_string(), payload).IsNothing() ||
623


56288
          event->Set(context, env()->target_string(), object()).IsNothing() ||
624

40201
          (cb_args[0] = event, false) ||
625
          MakeCallback(env()->onmessage_string(),
626
8041
                       arraysize(cb_args),
627
24118
                       cb_args).IsEmpty()) {
628
        // Re-schedule OnMessage() execution in case of failure.
629
32
        if (data_)
630
32
          TriggerAsync();
631
6428
        return;
632

8009
      }
633
    }
634
8008
  }
635
636

6362
  if (data_ && data_->IsSiblingClosed()) {
637
716
    Close();
638
6362
  }
639
}
640
641
bool MessagePort::IsSiblingClosed() const {
642
  CHECK(data_);
643
  return data_->IsSiblingClosed();
644
}
645
646
857
void MessagePort::OnClose() {
647
857
  Debug(this, "MessagePort::OnClose()");
648
857
  if (data_) {
649
674
    data_->owner_ = nullptr;
650
674
    data_->Disentangle();
651
  }
652
857
  data_.reset();
653
857
  delete async();
654
857
}
655
656
510
std::unique_ptr<MessagePortData> MessagePort::Detach() {
657
510
  CHECK(data_);
658
510
  Mutex::ScopedLock lock(data_->mutex_);
659
510
  data_->owner_ = nullptr;
660
510
  return std::move(data_);
661
}
662
663
664
8156
Maybe<bool> MessagePort::PostMessage(Environment* env,
665
                                     Local<Value> message_v,
666
                                     Local<Value> transfer_v) {
667
8156
  Isolate* isolate = env->isolate();
668
8156
  Local<Object> obj = object(isolate);
669
8156
  Local<Context> context = obj->CreationContext();
670
671
8156
  Message msg;
672
673
  // Per spec, we need to both check if transfer list has the source port, and
674
  // serialize the input message, even if the MessagePort is closed or detached.
675
676
  Maybe<bool> serialization_maybe =
677
8156
      msg.Serialize(env, context, message_v, transfer_v, obj);
678
8156
  if (data_ == nullptr) {
679
2
    return serialization_maybe;
680
  }
681
8154
  if (serialization_maybe.IsNothing()) {
682
8
    return Nothing<bool>();
683
  }
684
685
16292
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
686
8146
  bool doomed = false;
687
688
  // Check if the target port is posted to itself.
689
8146
  if (data_->sibling_ != nullptr) {
690
8328
    for (const auto& port_data : msg.message_ports()) {
691
183
      if (data_->sibling_ == port_data.get()) {
692
1
        doomed = true;
693
        ProcessEmitWarning(env, "The target port was posted to itself, and "
694
1
                                "the communication channel was lost");
695
1
        break;
696
      }
697
    }
698
  }
699
700

8146
  if (data_->sibling_ == nullptr || doomed)
701
1
    return Just(true);
702
703
8145
  data_->sibling_->AddToIncomingQueue(std::move(msg));
704
16301
  return Just(true);
705
}
706
707
8157
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
708
8157
  Environment* env = Environment::GetCurrent(args);
709
8157
  if (args.Length() == 0) {
710
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
711
                                       "MessagePort.postMessage");
712
  }
713
714
8157
  MessagePort* port = Unwrap<MessagePort>(args.This());
715
  // Even if the backing MessagePort object has already been deleted, we still
716
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
717
  // transfers.
718
8157
  if (port == nullptr) {
719
1
    Message msg;
720
1
    Local<Object> obj = args.This();
721
1
    Local<Context> context = obj->CreationContext();
722
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
723
1
    return;
724
  }
725
726
8156
  port->PostMessage(env, args[0], args[1]);
727
}
728
729
805
void MessagePort::Start() {
730
805
  Mutex::ScopedLock lock(data_->mutex_);
731
805
  Debug(this, "Start receiving messages");
732
805
  data_->receiving_messages_ = true;
733
805
  if (!data_->incoming_messages_.empty())
734
186
    TriggerAsync();
735
805
}
736
737
32
void MessagePort::Stop() {
738
32
  Mutex::ScopedLock lock(data_->mutex_);
739
32
  Debug(this, "Stop receiving messages");
740
32
  data_->receiving_messages_ = false;
741
32
}
742
743
64
void MessagePort::StopEventLoop() {
744
64
  Mutex::ScopedLock lock(data_->mutex_);
745
64
  data_->receiving_messages_ = false;
746
64
  stop_event_loop_ = true;
747
748
64
  Debug(this, "Received StopEventLoop request");
749
64
  TriggerAsync();
750
64
}
751
752
805
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
753
805
  Environment* env = Environment::GetCurrent(args);
754
  MessagePort* port;
755
805
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
756
805
  if (!port->data_) {
757
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
758
    return;
759
  }
760
805
  port->Start();
761
}
762
763
111
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
764
111
  Environment* env = Environment::GetCurrent(args);
765
  MessagePort* port;
766
222
  CHECK(args[0]->IsObject());
767
301
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
768
32
  if (!port->data_) {
769
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
770
    return;
771
  }
772
32
  port->Stop();
773
}
774
775
336
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
776
  MessagePort* port;
777
672
  CHECK(args[0]->IsObject());
778
1008
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
779
92
  port->OnMessage();
780
}
781
782
200
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
783
200
  Entangle(a, b->data_.get());
784
200
}
785
786
371
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
787
371
  MessagePortData::Entangle(a->data_.get(), b);
788
371
}
789
790
9696
MaybeLocal<Function> GetMessagePortConstructor(
791
    Environment* env, Local<Context> context) {
792
  // Factor generating the MessagePort JS constructor into its own piece
793
  // of code, because it is needed early on in the child environment setup.
794
9696
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
795
9696
  if (!templ.IsEmpty())
796
5294
    return templ->GetFunction(context);
797
798
4402
  Isolate* isolate = env->isolate();
799
800
  {
801
4402
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
802
8804
    m->SetClassName(env->message_port_constructor_string());
803
8804
    m->InstanceTemplate()->SetInternalFieldCount(1);
804
8804
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
805
806
4402
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
807
4402
    env->SetProtoMethod(m, "start", MessagePort::Start);
808
809
4402
    env->set_message_port_constructor_template(m);
810
811
4402
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
812
8804
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
813
4402
    Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
814
8804
    e->Set(env->data_string(), Null(isolate));
815
8804
    e->Set(env->target_string(), Null(isolate));
816
4402
    env->set_message_event_object_template(e);
817
  }
818
819
4402
  return GetMessagePortConstructor(env, context);
820
}
821
822
namespace {
823
824
200
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
825
200
  Environment* env = Environment::GetCurrent(args);
826
200
  if (!args.IsConstructCall()) {
827
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
828
200
    return;
829
  }
830
831
400
  Local<Context> context = args.This()->CreationContext();
832
  Context::Scope context_scope(context);
833
834
200
  MessagePort* port1 = MessagePort::New(env, context);
835
200
  MessagePort* port2 = MessagePort::New(env, context);
836
200
  MessagePort::Entangle(port1, port2);
837
838
1200
  args.This()->Set(env->context(), env->port1_string(), port1->object())
839
400
      .FromJust();
840
1200
  args.This()->Set(env->context(), env->port2_string(), port2->object())
841
400
      .FromJust();
842
}
843
844
4392
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
845
4392
  Environment* env = Environment::GetCurrent(args);
846
4392
  CHECK_EQ(args.Length(), 1);
847
8784
  CHECK(args[0]->IsFunction());
848
8784
  env->set_domexception_function(args[0].As<Function>());
849
4392
}
850
851
4395
static void InitMessaging(Local<Object> target,
852
                          Local<Value> unused,
853
                          Local<Context> context,
854
                          void* priv) {
855
4395
  Environment* env = Environment::GetCurrent(context);
856
857
  {
858
    Local<String> message_channel_string =
859
4395
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
860
4395
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
861
4395
    templ->SetClassName(message_channel_string);
862
    target->Set(env->context(),
863
                message_channel_string,
864
17580
                templ->GetFunction(context).ToLocalChecked()).FromJust();
865
  }
866
867
  target->Set(context,
868
              env->message_port_constructor_string(),
869
17580
              GetMessagePortConstructor(env, context).ToLocalChecked())
870
8790
                  .FromJust();
871
872
4395
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
873
874
  // These are not methods on the MessagePort prototype, because
875
  // the browser equivalents do not provide them.
876
4395
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
877
4395
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
878
4395
}
879
880
}  // anonymous namespace
881
882
}  // namespace worker
883
}  // namespace node
884
885
4282
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)