GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 31 404 7.7 %
Date: 2019-02-01 22:03:38 Branches: 4 214 1.9 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
#include "async_wrap-inl.h"
3
#include "async_wrap.h"
4
#include "debug_utils.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_process.h"
8
#include "util-inl.h"
9
#include "util.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::SharedArrayBuffer;
29
using v8::String;
30
using v8::Value;
31
using v8::ValueDeserializer;
32
using v8::ValueSerializer;
33
using v8::WasmCompiledModule;
34
35
namespace node {
36
namespace worker {
37
38
// Builds a Message around an existing buffer; the buffer is moved into
// `main_message_buf_`, so the caller's buffer is left in a moved-from state.
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
40
41
namespace {
42
43
// This is used to tell V8 how to read transferred host objects, like other
44
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
45
class DeserializerDelegate : public ValueDeserializer::Delegate {
46
 public:
47
  DeserializerDelegate(
48
      Message* m,
49
      Environment* env,
50
      const std::vector<MessagePort*>& message_ports,
51
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
52
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
53
      : message_ports_(message_ports),
54
        shared_array_buffers_(shared_array_buffers),
55
        wasm_modules_(wasm_modules) {}
56
57
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
58
    // Currently, only MessagePort hosts objects are supported, so identifying
59
    // by the index in the message's MessagePort array is sufficient.
60
    uint32_t id;
61
    if (!deserializer->ReadUint32(&id))
62
      return MaybeLocal<Object>();
63
    CHECK_LE(id, message_ports_.size());
64
    return message_ports_[id]->object(isolate);
65
  };
66
67
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
68
      Isolate* isolate, uint32_t clone_id) override {
69
    CHECK_LE(clone_id, shared_array_buffers_.size());
70
    return shared_array_buffers_[clone_id];
71
  }
72
73
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
74
      Isolate* isolate, uint32_t transfer_id) override {
75
    CHECK_LE(transfer_id, wasm_modules_.size());
76
    return WasmCompiledModule::FromTransferrableModule(
77
        isolate, wasm_modules_[transfer_id]);
78
  }
79
80
  ValueDeserializer* deserializer = nullptr;
81
82
 private:
83
  const std::vector<MessagePort*>& message_ports_;
84
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
85
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
86
};
87
88
}  // anonymous namespace
89
90
// Turns this message back into a JS value inside `context`.
//
// Steps, in order: create a MessagePort handle for every transferred port,
// attach transferred SharedArrayBuffers, hand transferred ArrayBuffer memory
// to new ArrayBuffers, then let V8 read the header and the value.
// Returns an empty MaybeLocal when any step fails.
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  EscapableHandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Create all necessary MessagePort handles.
  std::vector<MessagePort*> ports(message_ports_.size());
  for (uint32_t idx = 0; idx < message_ports_.size(); ++idx) {
    ports[idx] = MessagePort::New(env,
                                  context,
                                  std::move(message_ports_[idx]));
    if (ports[idx] != nullptr)
      continue;

    // Creation failed: close every port created so far and give up.
    for (MessagePort* created : ports) {
      // This will eventually release the MessagePort object itself.
      if (created != nullptr)
        created->Close();
    }
    return MaybeLocal<Value>();
  }
  message_ports_.clear();

  // Attach all transferred SharedArrayBuffers to their new Isolate.
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  for (uint32_t idx = 0; idx < shared_array_buffers_.size(); ++idx) {
    Local<SharedArrayBuffer> attached;
    const bool ok = shared_array_buffers_[idx]
                        ->GetSharedArrayBuffer(env, context)
                        .ToLocal(&attached);
    if (!ok)
      return MaybeLocal<Value>();
    shared_array_buffers.push_back(attached);
  }
  shared_array_buffers_.clear();

  DeserializerDelegate delegate(
      this, env, ports, shared_array_buffers, wasm_modules_);
  ValueDeserializer deserializer(
      env->isolate(),
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  // The delegate reads host object ids through the deserializer.
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate; the index in
  // `array_buffer_contents_` is the transfer id.
  for (uint32_t idx = 0; idx < array_buffer_contents_.size(); ++idx) {
    Local<ArrayBuffer> ab =
        ArrayBuffer::New(env->isolate(),
                         array_buffer_contents_[idx].release(),
                         array_buffer_contents_[idx].size,
                         ArrayBufferCreationMode::kInternalized);
    deserializer.TransferArrayBuffer(idx, ab);
  }
  array_buffer_contents_.clear();

  if (deserializer.ReadHeader(context).IsNothing())
    return MaybeLocal<Value>();

  Local<Value> result =
      deserializer.ReadValue(context).FromMaybe(Local<Value>());
  return handle_scope.Escape(result);
}
148
149
void Message::AddSharedArrayBuffer(
150
    SharedArrayBufferMetadataReference reference) {
151
  shared_array_buffers_.push_back(reference);
152
}
153
154
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
155
  message_ports_.emplace_back(std::move(data));
156
}
157
158
// Stores `mod` in this message and returns its index in `wasm_modules_`,
// which serves as the transfer id.
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
  wasm_modules_.emplace_back(std::move(mod));
  // The module just added is the last element.
  return wasm_modules_.size() - 1;
}
162
163
namespace {
164
165
// Instantiates the registered DOMException constructor with
// (`message`, "DataCloneError") and throws the result on the isolate.
// If constructing the exception fails, nothing further is thrown here.
void ThrowDataCloneException(Environment* env, Local<String> message) {
  Local<Function> domexception_ctor = env->domexception_function();
  CHECK(!domexception_ctor.IsEmpty());

  Local<Value> ctor_args[] = {
    message,
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
  };

  Local<Value> exception;
  const bool constructed =
      domexception_ctor
          ->NewInstance(env->context(), arraysize(ctor_args), ctor_args)
          .ToLocal(&exception);
  if (constructed)
    env->isolate()->ThrowException(exception);
}
179
180
// This tells V8 how to serialize objects that it does not understand
181
// (e.g. C++ objects) into the output buffer, in a way that our own
182
// DeserializerDelegate understands how to unpack.
183
class SerializerDelegate : public ValueSerializer::Delegate {
184
 public:
185
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
186
      : env_(env), context_(context), msg_(m) {}
187
188
  void ThrowDataCloneError(Local<String> message) override {
189
    ThrowDataCloneException(env_, message);
190
  }
191
192
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
193
    if (env_->message_port_constructor_template()->HasInstance(object)) {
194
      return WriteMessagePort(Unwrap<MessagePort>(object));
195
    }
196
197
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
198
    return Nothing<bool>();
199
  }
200
201
  Maybe<uint32_t> GetSharedArrayBufferId(
202
      Isolate* isolate,
203
      Local<SharedArrayBuffer> shared_array_buffer) override {
204
    uint32_t i;
205
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
206
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
207
        return Just(i);
208
    }
209
210
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
211
        env_,
212
        context_,
213
        shared_array_buffer);
214
    if (!reference) {
215
      return Nothing<uint32_t>();
216
    }
217
    seen_shared_array_buffers_.push_back(shared_array_buffer);
218
    msg_->AddSharedArrayBuffer(reference);
219
    return Just(i);
220
  }
221
222
  Maybe<uint32_t> GetWasmModuleTransferId(
223
      Isolate* isolate, Local<WasmCompiledModule> module) override {
224
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
225
  }
226
227
  void Finish() {
228
    // Only close the MessagePort handles and actually transfer them
229
    // once we know that serialization succeeded.
230
    for (MessagePort* port : ports_) {
231
      port->Close();
232
      msg_->AddMessagePort(port->Detach());
233
    }
234
  }
235
236
  ValueSerializer* serializer = nullptr;
237
238
 private:
239
  Maybe<bool> WriteMessagePort(MessagePort* port) {
240
    for (uint32_t i = 0; i < ports_.size(); i++) {
241
      if (ports_[i] == port) {
242
        serializer->WriteUint32(i);
243
        return Just(true);
244
      }
245
    }
246
247
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
248
    return Nothing<bool>();
249
  }
250
251
  Environment* env_;
252
  Local<Context> context_;
253
  Message* msg_;
254
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
255
  std::vector<MessagePort*> ports_;
256
257
  friend class worker::Message;
258
};
259
260
}  // anonymous namespace
261
262
// Serializes `input` into this (still empty) message.
//
// `transfer_list_v`, when it is an array, may contain ArrayBuffers and
// MessagePorts whose ownership moves into the message. `source_port` is the
// port the message is posted on; listing it in the transfer list is an error.
// Returns Nothing<bool>() after throwing when validation or serialization
// fails, and Just(true) on success.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               Local<Value> transfer_list_v,
                               Local<Object> source_port) {
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(env->isolate(), &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  if (transfer_list_v->IsArray()) {
    Local<Array> transfer_list = transfer_list_v.As<Array>();
    const uint32_t length = transfer_list->Length();
    for (uint32_t idx = 0; idx < length; ++idx) {
      Local<Value> entry;
      if (!transfer_list->Get(context, idx).ToLocal(&entry))
        return Nothing<bool>();

      // Currently, we support ArrayBuffers and MessagePorts.
      if (entry->IsArrayBuffer()) {
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
        // If we cannot render the ArrayBuffer unusable in this Isolate and
        // take ownership of its memory, copying the buffer will have to do.
        const bool can_take_ownership =
            ab->IsNeuterable() && !ab->IsExternal();
        if (!can_take_ownership)
          continue;

        const bool already_listed =
            std::find(array_buffers.begin(), array_buffers.end(), ab) !=
            array_buffers.end();
        if (already_listed) {
          ThrowDataCloneException(
              env,
              FIXED_ONE_BYTE_STRING(
                  env->isolate(),
                  "Transfer list contains duplicate ArrayBuffer"));
          return Nothing<bool>();
        }

        // We simply use the array index in the `array_buffers` list as the
        // ID that we write into the serialized buffer.
        uint32_t id = array_buffers.size();
        array_buffers.push_back(ab);
        serializer.TransferArrayBuffer(id, ab);
        continue;
      }

      // Anything that is neither an ArrayBuffer nor a MessagePort is invalid.
      if (!env->message_port_constructor_template()->HasInstance(entry)) {
        THROW_ERR_INVALID_TRANSFER_OBJECT(env);
        return Nothing<bool>();
      }

      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && entry == source_port) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(env->isolate(),
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }

      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
      if (port == nullptr || port->IsDetached()) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }

      const bool port_already_listed =
          std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
          delegate.ports_.end();
      if (port_already_listed) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "Transfer list contains duplicate MessagePort"));
        return Nothing<bool>();
      }
      delegate.ports_.push_back(port);
    }
  }

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  // If serialization succeeded, we want to take ownership of
  // (a.k.a. externalize) the underlying memory region and render
  // it inaccessible in this Isolate.
  for (Local<ArrayBuffer> ab : array_buffers) {
    ArrayBuffer::Contents contents = ab->Externalize();
    ab->Neuter();
    array_buffer_contents_.push_back(
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
                               contents.ByteLength() });
  }

  // Closes and detaches the transferred ports now that nothing can fail.
  delegate.Finish();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> released = serializer.Release();
  main_message_buf_ = MallocedBuffer<char>(
      reinterpret_cast<char*>(released.first), released.second);
  return Just(true);
}
368
369
// Reports the transferred ArrayBuffer contents, the SharedArrayBuffer
// reference list (by element count times element size) and the transferred
// MessagePorts to `tracker`.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);

  const size_t sab_list_bytes =
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]);
  tracker->TrackFieldWithSize("shared_array_buffers", sab_list_bytes);

  tracker->TrackField("message_ports", message_ports_);
}
375
376
// Creates port data owned by `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {}
377
378
// The owning MessagePort must have been cleared before destruction; the
// sibling link, if any, is severed here.
MessagePortData::~MessagePortData() {
  CHECK_EQ(owner_, nullptr);
  Disentangle();
}
382
383
// Reports the pending incoming messages to `tracker`, holding `mutex_` while
// the queue is read.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);
  tracker->TrackField("incoming_messages", incoming_messages_);
}
387
388
// Appends `message` to the incoming queue and, if a MessagePort currently
// owns this data, wakes it up.
void MessagePortData::AddToIncomingQueue(Message&& message) {
  // This function will be called by other threads.
  Mutex::ScopedLock queue_lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  // Without an owner there is nobody to notify.
  if (owner_ == nullptr)
    return;

  Debug(owner_, "Adding message to incoming queue");
  owner_->TriggerAsync();
}
398
399
bool MessagePortData::IsSiblingClosed() const {
400
  Mutex::ScopedLock lock(*sibling_mutex_);
401
  return sibling_ == nullptr;
402
}
403
404
// Links `a` and `b` as siblings. Neither may already have a sibling.
// Afterwards both share `b`'s sibling mutex.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  CHECK_EQ(a->sibling_, nullptr);
  CHECK_EQ(b->sibling_, nullptr);

  b->sibling_ = a;
  a->sibling_ = b;
  a->sibling_mutex_ = b->sibling_mutex_;
}
411
412
void MessagePortData::PingOwnerAfterDisentanglement() {
413
  Mutex::ScopedLock lock(mutex_);
414
  if (owner_ != nullptr)
415
    owner_->TriggerAsync();
416
}
417
418
void MessagePortData::Disentangle() {
419
  // Grab a copy of the sibling mutex, then replace it so that each sibling
420
  // has its own sibling_mutex_ now.
421
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
422
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
423
  sibling_mutex_ = std::make_shared<Mutex>();
424
425
  MessagePortData* sibling = sibling_;
426
  if (sibling_ != nullptr) {
427
    sibling_->sibling_ = nullptr;
428
    sibling_ = nullptr;
429
  }
430
431
  // We close MessagePorts after disentanglement, so we trigger the
432
  // corresponding uv_async_t to let them know that this happened.
433
  PingOwnerAfterDisentanglement();
434
  if (sibling != nullptr) {
435
    sibling->PingOwnerAfterDisentanglement();
436
  }
437
}
438
439
// Clears the back-pointer from the port data (if still attached) so that it
// no longer refers to this handle.
MessagePort::~MessagePort() {
  if (!data_)
    return;
  data_->owner_ = nullptr;
}
443
444
// Creates a MessagePort handle backed by a new uv_async_t and fresh
// MessagePortData, then calls the `oninit` function found on `wrap`, if any.
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(new uv_async_t()),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  // Called when data has been put into the queue.
  auto on_async = [](uv_async_t* handle) {
    static_cast<MessagePort*>(handle->data)->OnMessage();
  };
  CHECK_EQ(uv_async_init(env->event_loop(), async(), on_async), 0);
  async()->data = static_cast<void*>(this);

  Local<Value> init_v;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_v))
    return;

  // The result of the init call is deliberately ignored.
  if (init_v->IsFunction())
    USE(init_v.As<Function>()->Call(context, wrap, 0, nullptr));

  Debug(this, "Created message port");
}
473
474
// Forwards `message` to the incoming queue of this port's data.
void MessagePort::AddToIncomingQueue(Message&& message) {
  MessagePortData* const port_data = data_.get();
  port_data->AddToIncomingQueue(std::move(message));
}
477
478
// Returns the underlying handle viewed as the uv_async_t it was created as.
uv_async_t* MessagePort::async() {
  uv_handle_t* const handle = GetHandle();
  return reinterpret_cast<uv_async_t*>(handle);
}
481
482
bool MessagePort::IsDetached() const {
483
  return data_ == nullptr || IsHandleClosing();
484
}
485
486
void MessagePort::TriggerAsync() {
487
  if (IsHandleClosing()) return;
488
  CHECK_EQ(uv_async_send(async()), 0);
489
}
490
491
// Closes the underlying handle, passing `close_callback` through to
// HandleWrap::Close().
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock sibling_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
503
504
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
505
  Environment* env = Environment::GetCurrent(args);
506
  if (!args.IsConstructCall()) {
507
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
508
    return;
509
  }
510
511
  Local<Context> context = args.This()->CreationContext();
512
  Context::Scope context_scope(context);
513
514
  new MessagePort(env, context, args.This());
515
}
516
517
// Creates a MessagePort through its JS constructor. When `data` is non-null,
// the port's freshly created data is replaced by it. Returns nullptr if the
// constructor cannot be obtained or instantiation fails.
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);

  Local<Function> port_ctor;
  if (!GetMessagePortConstructor(env, context).ToLocal(&port_ctor))
    return nullptr;

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> js_port;
  if (!port_ctor->NewInstance(context).ToLocal(&js_port))
    return nullptr;

  MessagePort* port = Unwrap<MessagePort>(js_port);
  CHECK_NOT_NULL(port);
  if (!data)
    return port;

  // Drop the data the constructor created and adopt the provided one.
  port->Detach();
  port->data_ = std::move(data);
  port->data_->owner_ = port;
  // If the existing MessagePortData object had pending messages, this is
  // the easiest way to run that queue.
  port->TriggerAsync();
  return port;
}
543
544
void MessagePort::OnMessage() {
545
  Debug(this, "Running MessagePort::OnMessage()");
546
  HandleScope handle_scope(env()->isolate());
547
  Local<Context> context = object(env()->isolate())->CreationContext();
548
549
  // data_ can only ever be modified by the owner thread, so no need to lock.
550
  // However, the message port may be transferred while it is processing
551
  // messages, so we need to check that this handle still owns its `data_` field
552
  // on every iteration.
553
  while (data_) {
554
    Message received;
555
    {
556
      // Get the head of the message queue.
557
      Mutex::ScopedLock lock(data_->mutex_);
558
559
      if (stop_event_loop_) {
560
        Debug(this, "MessagePort stops loop as requested");
561
        CHECK(!data_->receiving_messages_);
562
        uv_stop(env()->event_loop());
563
        break;
564
      }
565
566
      Debug(this, "MessagePort has message, receiving = %d",
567
            static_cast<int>(data_->receiving_messages_));
568
569
      if (!data_->receiving_messages_)
570
        break;
571
      if (data_->incoming_messages_.empty())
572
        break;
573
      received = std::move(data_->incoming_messages_.front());
574
      data_->incoming_messages_.pop_front();
575
    }
576
577
    if (!env()->can_call_into_js()) {
578
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
579
      // In this case there is nothing to do but to drain the current queue.
580
      continue;
581
    }
582
583
    {
584
      // Call the JS .onmessage() callback.
585
      HandleScope handle_scope(env()->isolate());
586
      Context::Scope context_scope(context);
587
      Local<Value> args[] = {
588
        received.Deserialize(env(), context).FromMaybe(Local<Value>())
589
      };
590
591
      if (args[0].IsEmpty() ||
592
          MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
593
        // Re-schedule OnMessage() execution in case of failure.
594
        if (data_)
595
          TriggerAsync();
596
        return;
597
      }
598
    }
599
  }
600
601
  if (data_ && data_->IsSiblingClosed()) {
602
    Close();
603
  }
604
}
605
606
bool MessagePort::IsSiblingClosed() const {
607
  CHECK(data_);
608
  return data_->IsSiblingClosed();
609
}
610
611
void MessagePort::OnClose() {
612
  Debug(this, "MessagePort::OnClose()");
613
  if (data_) {
614
    data_->owner_ = nullptr;
615
    data_->Disentangle();
616
  }
617
  data_.reset();
618
  delete async();
619
}
620
621
std::unique_ptr<MessagePortData> MessagePort::Detach() {
622
  CHECK(data_);
623
  Mutex::ScopedLock lock(data_->mutex_);
624
  data_->owner_ = nullptr;
625
  return std::move(data_);
626
}
627
628
629
// Serializes `message_v` (with transfer list `transfer_v`) and queues the
// result on the sibling port, if there is one.
//
// Returns Nothing<bool>() when serialization fails. Returns Just(true) when
// the message was queued or silently dropped (no sibling, or the sibling
// itself was part of the transfer list).
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     Local<Value> transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> self = object(isolate);
  Local<Context> context = self->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.

  Maybe<bool> serialized =
      msg.Serialize(env, context, message_v, transfer_v, self);
  if (data_ == nullptr) {
    return serialized;
  }
  if (serialized.IsNothing()) {
    return Nothing<bool>();
  }

  Mutex::ScopedLock sibling_lock(*data_->sibling_mutex_);

  // Check if the target port is posted to itself.
  bool doomed = false;
  if (data_->sibling_ != nullptr) {
    for (const auto& transferred : msg.message_ports()) {
      if (data_->sibling_ != transferred.get())
        continue;
      doomed = true;
      ProcessEmitWarning(env, "The target port was posted to itself, and "
                              "the communication channel was lost");
      break;
    }
  }

  // Deliver only when a sibling exists and the channel was not just lost.
  if (data_->sibling_ != nullptr && !doomed)
    data_->sibling_->AddToIncomingQueue(std::move(msg));
  return Just(true);
}
671
672
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
673
  Environment* env = Environment::GetCurrent(args);
674
  if (args.Length() == 0) {
675
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
676
                                       "MessagePort.postMessage");
677
  }
678
679
  MessagePort* port = Unwrap<MessagePort>(args.This());
680
  // Even if the backing MessagePort object has already been deleted, we still
681
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
682
  // transfers.
683
  if (port == nullptr) {
684
    Message msg;
685
    Local<Object> obj = args.This();
686
    Local<Context> context = obj->CreationContext();
687
    USE(msg.Serialize(env, context, args[0], args[1], obj));
688
    return;
689
  }
690
691
  port->PostMessage(env, args[0], args[1]);
692
}
693
694
void MessagePort::Start() {
695
  Mutex::ScopedLock lock(data_->mutex_);
696
  Debug(this, "Start receiving messages");
697
  data_->receiving_messages_ = true;
698
  if (!data_->incoming_messages_.empty())
699
    TriggerAsync();
700
}
701
702
void MessagePort::Stop() {
703
  Mutex::ScopedLock lock(data_->mutex_);
704
  Debug(this, "Stop receiving messages");
705
  data_->receiving_messages_ = false;
706
}
707
708
void MessagePort::StopEventLoop() {
709
  Mutex::ScopedLock lock(data_->mutex_);
710
  data_->receiving_messages_ = false;
711
  stop_event_loop_ = true;
712
713
  Debug(this, "Received StopEventLoop request");
714
  TriggerAsync();
715
}
716
717
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
718
  Environment* env = Environment::GetCurrent(args);
719
  MessagePort* port;
720
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
721
  if (!port->data_) {
722
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
723
    return;
724
  }
725
  port->Start();
726
}
727
728
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
729
  Environment* env = Environment::GetCurrent(args);
730
  MessagePort* port;
731
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
732
  if (!port->data_) {
733
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
734
    return;
735
  }
736
  port->Stop();
737
}
738
739
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
740
  MessagePort* port;
741
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
742
  port->OnMessage();
743
}
744
745
// Entangles port `a` with the data currently held by port `b`.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const b_data = b->data_.get();
  Entangle(a, b_data);
}
748
749
// Entangles the data held by port `a` with the port data `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const a_data = a->data_.get();
  MessagePortData::Entangle(a_data, b);
}
752
753
328
// Returns the MessagePort JS constructor for `context`. On first use the
// function template is created and stored on `env`; later calls reuse it.
MaybeLocal<Function> GetMessagePortConstructor(
    Environment* env, Local<Context> context) {
  // Factor generating the MessagePort JS constructor into its own piece
  // of code, because it is needed early on in the child environment setup.
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (!cached.IsEmpty())
    return cached->GetFunction(context);

  Local<FunctionTemplate> port_templ =
      env->NewFunctionTemplate(MessagePort::New);
  port_templ->SetClassName(env->message_port_constructor_string());
  port_templ->InstanceTemplate()->SetInternalFieldCount(1);
  port_templ->Inherit(HandleWrap::GetConstructorTemplate(env));

  env->SetProtoMethod(port_templ, "postMessage", MessagePort::PostMessage);
  env->SetProtoMethod(port_templ, "start", MessagePort::Start);
  env->SetProtoMethod(port_templ, "stop", MessagePort::Stop);
  env->SetProtoMethod(port_templ, "drain", MessagePort::Drain);

  env->set_message_port_constructor_template(port_templ);

  // The template is stored now, so this call takes the early-return path.
  return GetMessagePortConstructor(env, context);
}
777
778
namespace {
779
780
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
781
  Environment* env = Environment::GetCurrent(args);
782
  if (!args.IsConstructCall()) {
783
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
784
    return;
785
  }
786
787
  Local<Context> context = args.This()->CreationContext();
788
  Context::Scope context_scope(context);
789
790
  MessagePort* port1 = MessagePort::New(env, context);
791
  MessagePort* port2 = MessagePort::New(env, context);
792
  MessagePort::Entangle(port1, port2);
793
794
  args.This()->Set(env->context(), env->port1_string(), port1->object())
795
      .FromJust();
796
  args.This()->Set(env->context(), env->port2_string(), port2->object())
797
      .FromJust();
798
}
799
800
164
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
801
164
  Environment* env = Environment::GetCurrent(args);
802
164
  CHECK_EQ(args.Length(), 1);
803
328
  CHECK(args[0]->IsFunction());
804
328
  env->set_domexception_function(args[0].As<Function>());
805
164
}
806
807
164
static void InitMessaging(Local<Object> target,
808
                          Local<Value> unused,
809
                          Local<Context> context,
810
                          void* priv) {
811
164
  Environment* env = Environment::GetCurrent(context);
812
813
  {
814
    Local<String> message_channel_string =
815
164
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
816
164
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
817
164
    templ->SetClassName(message_channel_string);
818
    target->Set(env->context(),
819
                message_channel_string,
820
656
                templ->GetFunction(context).ToLocalChecked()).FromJust();
821
  }
822
823
  target->Set(context,
824
              env->message_port_constructor_string(),
825
656
              GetMessagePortConstructor(env, context).ToLocalChecked())
826
328
                  .FromJust();
827
828
164
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
829
164
}
830
831
}  // anonymous namespace
832
833
}  // namespace worker
834
}  // namespace node
835
836
164
// Associates node::worker::InitMessaging with the name `messaging`.
// NOTE(review): presumably this registers an internal, context-aware
// binding — confirm against the macro's definition.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)