GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/node_messaging.cc Lines: 386 421 91.7 %
Date: 2019-01-07 12:15:22 Branches: 147 208 70.7 %

Line Branch Exec Source
1
#include "debug_utils.h"
2
#include "node_messaging.h"
3
#include "node_internals.h"
4
#include "node_buffer.h"
5
#include "node_errors.h"
6
#include "util.h"
7
#include "util-inl.h"
8
#include "async_wrap.h"
9
#include "async_wrap-inl.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::SharedArrayBuffer;
29
using v8::String;
30
using v8::Value;
31
using v8::ValueDeserializer;
32
using v8::ValueSerializer;
33
using v8::WasmCompiledModule;
34
35
namespace node {
36
namespace worker {
37
38
24747
// Builds a Message around an existing payload buffer; ownership of `buffer`
// is moved into the message.
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
40
41
namespace {
42
43
// This is used to tell V8 how to read transferred host objects, like other
44
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
45
8809
class DeserializerDelegate : public ValueDeserializer::Delegate {
46
 public:
47
8733
  DeserializerDelegate(
48
      Message* m,
49
      Environment* env,
50
      const std::vector<MessagePort*>& message_ports,
51
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
52
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
53
      : message_ports_(message_ports),
54
        shared_array_buffers_(shared_array_buffers),
55
8733
        wasm_modules_(wasm_modules) {}
56
57
110
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
58
    // Currently, only MessagePort hosts objects are supported, so identifying
59
    // by the index in the message's MessagePort array is sufficient.
60
    uint32_t id;
61
110
    if (!deserializer->ReadUint32(&id))
62
      return MaybeLocal<Object>();
63
110
    CHECK_LE(id, message_ports_.size());
64
220
    return message_ports_[id]->object(isolate);
65
  };
66
67
5
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
68
      Isolate* isolate, uint32_t clone_id) override {
69
5
    CHECK_LE(clone_id, shared_array_buffers_.size());
70
10
    return shared_array_buffers_[clone_id];
71
  }
72
73
2
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
74
      Isolate* isolate, uint32_t transfer_id) override {
75
2
    CHECK_LE(transfer_id, wasm_modules_.size());
76
    return WasmCompiledModule::FromTransferrableModule(
77
2
        isolate, wasm_modules_[transfer_id]);
78
  }
79
80
  ValueDeserializer* deserializer = nullptr;
81
82
 private:
83
  const std::vector<MessagePort*>& message_ports_;
84
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
85
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
86
};
87
88
}  // anonymous namespace
89
90
8806
// Converts this message back into a JS value inside `context`.
//
// Transferred MessagePorts, SharedArrayBuffers and ArrayBuffers are attached
// to the receiving side first; the corresponding member lists are cleared
// once they have been consumed. Returns an empty MaybeLocal if any step
// fails.
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  Isolate* const isolate = env->isolate();
  EscapableHandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Step 1: create a MessagePort handle for every transferred port.
  std::vector<MessagePort*> ports(message_ports_.size());
  for (uint32_t idx = 0; idx < message_ports_.size(); ++idx) {
    MessagePort* created =
        MessagePort::New(env, context, std::move(message_ports_[idx]));
    ports[idx] = created;
    if (created != nullptr)
      continue;

    // Creation failed: close every port created so far. Closing will
    // eventually release the MessagePort object itself.
    for (MessagePort* opened : ports) {
      if (opened != nullptr)
        opened->Close();
    }
    return MaybeLocal<Value>();
  }
  message_ports_.clear();

  // Step 2: attach all transferred SharedArrayBuffers to the new Isolate.
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  for (uint32_t idx = 0; idx < shared_array_buffers_.size(); ++idx) {
    Local<SharedArrayBuffer> attached;
    const bool ok = shared_array_buffers_[idx]
                        ->GetSharedArrayBuffer(env, context)
                        .ToLocal(&attached);
    if (!ok)
      return MaybeLocal<Value>();
    shared_array_buffers.push_back(attached);
  }
  shared_array_buffers_.clear();

  // Step 3: set up the deserializer and its delegate.
  DeserializerDelegate delegate(
      this, env, ports, shared_array_buffers, wasm_modules_);
  ValueDeserializer deserializer(
      isolate,
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  delegate.deserializer = &deserializer;

  // Step 4: attach all transferred ArrayBuffers to the new Isolate.
  for (uint32_t idx = 0; idx < array_buffer_contents_.size(); ++idx) {
    Local<ArrayBuffer> buffer =
        ArrayBuffer::New(isolate,
                         array_buffer_contents_[idx].release(),
                         array_buffer_contents_[idx].size,
                         ArrayBufferCreationMode::kInternalized);
    deserializer.TransferArrayBuffer(idx, buffer);
  }
  array_buffer_contents_.clear();

  // Step 5: read the actual payload.
  if (deserializer.ReadHeader(context).IsNothing())
    return MaybeLocal<Value>();

  Local<Value> result =
      deserializer.ReadValue(context).FromMaybe(Local<Value>());
  return handle_scope.Escape(result);
}
148
149
5
void Message::AddSharedArrayBuffer(
150
    SharedArrayBufferMetadataReference reference) {
151
5
  shared_array_buffers_.push_back(reference);
152
5
}
153
154
131
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
155
131
  message_ports_.emplace_back(std::move(data));
156
131
}
157
158
2
// Stores a transferrable WASM module in this message and returns the index
// at which it was stored.
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
  wasm_modules_.emplace_back(std::move(mod));
  const auto count = wasm_modules_.size();
  return count - 1;
}
162
163
namespace {
164
165
9
// Throws an exception built by the registered DOMException constructor,
// called with (`message`, "DataCloneError"). If constructing the instance
// fails, nothing further is thrown here.
void ThrowDataCloneException(Environment* env, Local<String> message) {
  Local<Value> ctor_args[] = {
    message,
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
  };

  Local<Function> domexception_ctor = env->domexception_function();
  CHECK(!domexception_ctor.IsEmpty());

  Local<Value> exception;
  const bool constructed =
      domexception_ctor
          ->NewInstance(env->context(), arraysize(ctor_args), ctor_args)
          .ToLocal(&exception);
  if (constructed) {
    env->isolate()->ThrowException(exception);
  }
}
179
180
// This tells V8 how to serialize objects that it does not understand
181
// (e.g. C++ objects) into the output buffer, in a way that our own
182
// DeserializerDelegate understands how to unpack.
183
8885
class SerializerDelegate : public ValueSerializer::Delegate {
184
 public:
185
8885
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
186
8885
      : env_(env), context_(context), msg_(m) {}
187
188
1
  void ThrowDataCloneError(Local<String> message) override {
189
1
    ThrowDataCloneException(env_, message);
190
1
  }
191
192
130
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
193
260
    if (env_->message_port_constructor_template()->HasInstance(object)) {
194
130
      return WriteMessagePort(Unwrap<MessagePort>(object));
195
    }
196
197
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
198
    return Nothing<bool>();
199
  }
200
201
5
  Maybe<uint32_t> GetSharedArrayBufferId(
202
      Isolate* isolate,
203
      Local<SharedArrayBuffer> shared_array_buffer) override {
204
    uint32_t i;
205
5
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
206
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
207
        return Just(i);
208
    }
209
210
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
211
        env_,
212
        context_,
213
5
        shared_array_buffer);
214
5
    if (!reference) {
215
      return Nothing<uint32_t>();
216
    }
217
5
    seen_shared_array_buffers_.push_back(shared_array_buffer);
218
5
    msg_->AddSharedArrayBuffer(reference);
219
5
    return Just(i);
220
  }
221
222
2
  Maybe<uint32_t> GetWasmModuleTransferId(
223
      Isolate* isolate, Local<WasmCompiledModule> module) override {
224
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
225
  }
226
227
8876
  void Finish() {
228
    // Only close the MessagePort handles and actually transfer them
229
    // once we know that serialization succeeded.
230
9007
    for (MessagePort* port : ports_) {
231
262
      port->Close();
232
131
      msg_->AddMessagePort(port->Detach());
233
    }
234
8877
  }
235
236
  ValueSerializer* serializer = nullptr;
237
238
 private:
239
130
  Maybe<bool> WriteMessagePort(MessagePort* port) {
240
130
    for (uint32_t i = 0; i < ports_.size(); i++) {
241
130
      if (ports_[i] == port) {
242
130
        serializer->WriteUint32(i);
243
130
        return Just(true);
244
      }
245
    }
246
247
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
248
    return Nothing<bool>();
249
  }
250
251
  Environment* env_;
252
  Local<Context> context_;
253
  Message* msg_;
254
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
255
  std::vector<MessagePort*> ports_;
256
257
  friend class worker::Message;
258
};
259
260
}  // anonymous namespace
261
262
8885
// Serializes `input` into this (still empty) message.
//
// `transfer_list_v` may be an Array of ArrayBuffers and MessagePorts whose
// ownership is moved into the message instead of being copied.
// `source_port`, if non-empty, is the port the message is posted on; listing
// it in the transfer list is an error.
//
// Returns Just(true) on success. On failure an exception has been scheduled
// (or propagated from V8) and Nothing<bool>() is returned; no ArrayBuffer is
// externalized and no port is detached in that case.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               Local<Value> transfer_list_v,
                               Local<Object> source_port) {
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(env->isolate(), &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  if (transfer_list_v->IsArray()) {
    Local<Array> transfer_list = transfer_list_v.As<Array>();
    uint32_t length = transfer_list->Length();
    for (uint32_t i = 0; i < length; ++i) {
      Local<Value> entry;
      if (!transfer_list->Get(context, i).ToLocal(&entry))
        return Nothing<bool>();
      // Currently, we support ArrayBuffers and MessagePorts.
      if (entry->IsArrayBuffer()) {
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
        // If we cannot render the ArrayBuffer unusable in this Isolate and
        // take ownership of its memory, copying the buffer will have to do.
        if (!ab->IsNeuterable() || ab->IsExternal())
          continue;
        // Reject duplicates: the same buffer would otherwise be registered
        // under two ids and externalized twice further down.
        for (const Local<ArrayBuffer>& seen : array_buffers) {
          if (seen == ab) {
            ThrowDataCloneException(
                env,
                FIXED_ONE_BYTE_STRING(
                    env->isolate(),
                    "Transfer list contains duplicate ArrayBuffer"));
            return Nothing<bool>();
          }
        }
        // We simply use the array index in the `array_buffers` list as the
        // ID that we write into the serialized buffer.
        uint32_t id = array_buffers.size();
        array_buffers.push_back(ab);
        serializer.TransferArrayBuffer(id, ab);
        continue;
      } else if (env->message_port_constructor_template()
                    ->HasInstance(entry)) {
        // Check if the source MessagePort is being transferred.
        if (!source_port.IsEmpty() && entry == source_port) {
          ThrowDataCloneException(
              env,
              FIXED_ONE_BYTE_STRING(env->isolate(),
                                    "Transfer list contains source port"));
          return Nothing<bool>();
        }
        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
        if (port == nullptr || port->IsDetached()) {
          ThrowDataCloneException(
              env,
              FIXED_ONE_BYTE_STRING(
                  env->isolate(),
                  "MessagePort in transfer list is already detached"));
          return Nothing<bool>();
        }
        // Reject duplicates: Finish() detaches every listed port once, and
        // detaching the same port a second time would dereference a null
        // data pointer.
        for (MessagePort* seen : delegate.ports_) {
          if (seen == port) {
            ThrowDataCloneException(
                env,
                FIXED_ONE_BYTE_STRING(
                    env->isolate(),
                    "Transfer list contains duplicate MessagePort"));
            return Nothing<bool>();
          }
        }
        delegate.ports_.push_back(port);
        continue;
      }

      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
      return Nothing<bool>();
    }
  }

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  for (Local<ArrayBuffer> ab : array_buffers) {
    // If serialization succeeded, we want to take ownership of
    // (a.k.a. externalize) the underlying memory region and render
    // it inaccessible in this Isolate.
    ArrayBuffer::Contents contents = ab->Externalize();
    ab->Neuter();
    array_buffer_contents_.push_back(
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
                               contents.ByteLength() });
  }

  // Only now close and detach the transferred MessagePorts.
  delegate.Finish();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> data = serializer.Release();
  main_message_buf_ =
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
  return Just(true);
}
350
351
4
// Reports the memory retained by this message to `tracker`: the transferred
// ArrayBuffer contents, the list of SharedArrayBuffer references (by size
// only) and the transferred MessagePort data.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);

  const auto sab_list_bytes =
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]);
  tracker->TrackFieldWithSize("shared_array_buffers", sab_list_bytes);

  tracker->TrackField("message_ports", message_ports_);
}
357
358
756
// Creates port data owned by `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {
}
359
360
2267
// The data must no longer have an owning MessagePort when it is destroyed;
// any remaining link to a sibling is severed here.
MessagePortData::~MessagePortData() {
  CHECK_EQ(owner_, nullptr);

  Disentangle();
}
364
365
8
// Reports the queued incoming messages to `tracker`. The queue is read
// while holding `mutex_`.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);

  tracker->TrackField("incoming_messages", incoming_messages_);
}
369
370
8876
// Appends `message` to the incoming queue and, if a MessagePort currently
// owns this data, wakes it up. This function will be called by other
// threads, hence the lock.
void MessagePortData::AddToIncomingQueue(Message&& message) {
  Mutex::ScopedLock queue_lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  if (owner_ == nullptr)
    return;

  Debug(owner_, "Adding message to incoming queue");
  owner_->TriggerAsync();
}
380
381
7044
bool MessagePortData::IsSiblingClosed() const {
382
7044
  Mutex::ScopedLock lock(*sibling_mutex_);
383
7044
  return sibling_ == nullptr;
384
}
385
386
264
// Links `a` and `b` as siblings. Neither may already have a sibling.
// Afterwards both share `b`'s sibling mutex.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  // Preconditions: both ends are currently unlinked.
  CHECK_EQ(a->sibling_, nullptr);
  CHECK_EQ(b->sibling_, nullptr);

  // Cross-link the two ends.
  a->sibling_ = b;
  b->sibling_ = a;

  // One mutex guards the sibling pointers of both ends.
  a->sibling_mutex_ = b->sibling_mutex_;
}
393
394
1526
void MessagePortData::PingOwnerAfterDisentanglement() {
395
1526
  Mutex::ScopedLock lock(mutex_);
396
1526
  if (owner_ != nullptr)
397
263
    owner_->TriggerAsync();
398
1526
}
399
400
1262
void MessagePortData::Disentangle() {
401
  // Grab a copy of the sibling mutex, then replace it so that each sibling
402
  // has its own sibling_mutex_ now.
403
1262
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
404
2524
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
405
1262
  sibling_mutex_ = std::make_shared<Mutex>();
406
407
1262
  MessagePortData* sibling = sibling_;
408
1262
  if (sibling_ != nullptr) {
409
264
    sibling_->sibling_ = nullptr;
410
264
    sibling_ = nullptr;
411
  }
412
413
  // We close MessagePorts after disentanglement, so we trigger the
414
  // corresponding uv_async_t to let them know that this happened.
415
1262
  PingOwnerAfterDisentanglement();
416
1262
  if (sibling != nullptr) {
417
264
    sibling->PingOwnerAfterDisentanglement();
418
1262
  }
419
1262
}
420
421
1911
// Clears the back-pointer from any still-attached data, so the data no
// longer refers to this port once it is gone.
MessagePort::~MessagePort() {
  if (!data_)
    return;
  data_->owner_ = nullptr;
}
425
426
637
// Creates a MessagePort backed by a new uv_async_t and fresh
// MessagePortData, then calls the `oninit` hook on `wrap` if it is a
// function.
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(new uv_async_t()),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  // libuv callback: called when data has been put into the queue.
  auto on_async = [](uv_async_t* handle) {
    MessagePort* self = static_cast<MessagePort*>(handle->data);
    self->OnMessage();
  };
  CHECK_EQ(uv_async_init(env->event_loop(), async(), on_async), 0);
  async()->data = static_cast<void*>(this);

  Local<Value> init_hook;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_hook))
    return;

  if (init_hook->IsFunction()) {
    // The result of the hook is intentionally ignored.
    USE(init_hook.As<Function>()->Call(context, wrap, 0, nullptr));
  }

  Debug(this, "Created message port");
}
455
456
// Forwards `message` to the incoming queue of this port's data.
void MessagePort::AddToIncomingQueue(Message&& message) {
  MessagePortData* const target = data_.get();
  target->AddToIncomingQueue(std::move(message));
}
459
460
11182
// Returns the underlying libuv handle, viewed as the uv_async_t it was
// created as in the constructor.
uv_async_t* MessagePort::async() {
  auto* handle = GetHandle();
  return reinterpret_cast<uv_async_t*>(handle);
}
463
464
138
bool MessagePort::IsDetached() const {
465

138
  return data_ == nullptr || IsHandleClosing();
466
}
467
468
9276
void MessagePort::TriggerAsync() {
469
18552
  if (IsHandleClosing()) return;
470
9271
  CHECK_EQ(uv_async_send(async()), 0);
471
}
472
473
769
// Closes the underlying handle, passing `close_callback` through to
// HandleWrap::Close().
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock data_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
485
486
637
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
487
637
  Environment* env = Environment::GetCurrent(args);
488
637
  if (!args.IsConstructCall()) {
489
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
490
637
    return;
491
  }
492
493
1274
  Local<Context> context = args.This()->CreationContext();
494
  Context::Scope context_scope(context);
495
496
1274
  new MessagePort(env, context, args.This());
497
}
498
499
637
// Creates a MessagePort through its JS constructor. If `data` is non-null,
// it replaces the data the new port was created with.
// Returns nullptr when the constructor cannot be obtained or instantiation
// fails.
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);

  Local<Function> port_ctor;
  if (!GetMessagePortConstructor(env, context).ToLocal(&port_ctor))
    return nullptr;

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> js_port;
  if (!port_ctor->NewInstance(context).ToLocal(&js_port))
    return nullptr;

  MessagePort* result = Unwrap<MessagePort>(js_port);
  CHECK_NOT_NULL(result);

  if (!data)
    return result;

  // Drop the data the port was constructed with and adopt `data` instead.
  result->Detach();
  result->data_ = std::move(data);
  result->data_->owner_ = result;
  // If the existing MessagePortData object had pending messages, this is
  // the easiest way to run that queue.
  result->TriggerAsync();
  return result;
}
525
526
7057
void MessagePort::OnMessage() {
527
7057
  Debug(this, "Running MessagePort::OnMessage()");
528
7057
  HandleScope handle_scope(env()->isolate());
529
14114
  Local<Context> context = object(env()->isolate())->CreationContext();
530
531
  // data_ can only ever be modified by the owner thread, so no need to lock.
532
  // However, the message port may be transferred while it is processing
533
  // messages, so we need to check that this handle still owns its `data_` field
534
  // on every iteration.
535
22921
  while (data_) {
536
15863
    Message received;
537
    {
538
      // Get the head of the message queue.
539
15864
      Mutex::ScopedLock lock(data_->mutex_);
540
541
15861
      if (stop_event_loop_) {
542
8
        Debug(this, "MessagePort stops loop as requested");
543
8
        CHECK(!data_->receiving_messages_);
544
8
        uv_stop(env()->event_loop());
545
8
        break;
546
      }
547
548
      Debug(this, "MessagePort has message, receiving = %d",
549
31708
            static_cast<int>(data_->receiving_messages_));
550
551
15855
      if (!data_->receiving_messages_)
552
30
        break;
553
15826
      if (data_->incoming_messages_.empty())
554
7006
        break;
555
8818
      received = std::move(data_->incoming_messages_.front());
556
8818
      data_->incoming_messages_.pop_front();
557
    }
558
559
8820
    if (!env()->can_call_into_js()) {
560
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
561
      // In this case there is nothing to do but to drain the current queue.
562
      continue;
563
    }
564
565
    {
566
      // Call the JS .onmessage() callback.
567
8820
      HandleScope handle_scope(env()->isolate());
568
8807
      Context::Scope context_scope(context);
569
      Local<Value> args[] = {
570
17629
        received.Deserialize(env(), context).FromMaybe(Local<Value>())
571
26447
      };
572
573

44096
      if (args[0].IsEmpty() ||
574
35278
          MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
575
        // Re-schedule OnMessage() execution in case of failure.
576
13
        if (data_)
577
13
          TriggerAsync();
578
7070
        return;
579

8807
      }
580
    }
581
8807
  }
582
583

7045
  if (data_ && data_->IsSiblingClosed()) {
584
520
    Close();
585
7044
  }
586
}
587
588
bool MessagePort::IsSiblingClosed() const {
589
  CHECK(data_);
590
  return data_->IsSiblingClosed();
591
}
592
593
637
void MessagePort::OnClose() {
594
637
  Debug(this, "MessagePort::OnClose()");
595
637
  if (data_) {
596
506
    data_->owner_ = nullptr;
597
506
    data_->Disentangle();
598
  }
599
637
  data_.reset();
600
637
  delete async();
601
637
}
602
603
359
std::unique_ptr<MessagePortData> MessagePort::Detach() {
604
359
  Mutex::ScopedLock lock(data_->mutex_);
605
359
  data_->owner_ = nullptr;
606
359
  return std::move(data_);
607
}
608
609
610
8885
// Serializes `message_v` (with the optional transfer list `transfer_v`) and
// queues it on the sibling port's data.
//
// Returns Nothing<bool>() if serialization failed. Returns Just(true) when
// the message was queued, and also when it was dropped because there is no
// sibling or because the sibling itself was part of the transfer list.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     Local<Value> transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> self = object(isolate);
  Local<Context> context = self->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.
  Maybe<bool> serialized =
      msg.Serialize(env, context, message_v, transfer_v, self);
  if (data_ == nullptr)
    return serialized;
  if (serialized.IsNothing())
    return Nothing<bool>();

  Mutex::ScopedLock sibling_lock(*data_->sibling_mutex_);

  MessagePortData* const target = data_->sibling_;
  if (target == nullptr)
    return Just(true);

  // Check if the target port is posted to itself.
  for (const auto& transferred : msg.message_ports()) {
    if (target != transferred.get())
      continue;
    ProcessEmitWarning(env, "The target port was posted to itself, and "
                            "the communication channel was lost");
    return Just(true);
  }

  target->AddToIncomingQueue(std::move(msg));
  return Just(true);
}
652
653
8886
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
654
8886
  Environment* env = Environment::GetCurrent(args);
655
8886
  if (args.Length() == 0) {
656
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
657
                                       "MessagePort.postMessage");
658
  }
659
660
8886
  MessagePort* port = Unwrap<MessagePort>(args.This());
661
  // Even if the backing MessagePort object has already been deleted, we still
662
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
663
  // transfers.
664
8886
  if (port == nullptr) {
665
1
    Message msg;
666
1
    Local<Object> obj = args.This();
667
1
    Local<Context> context = obj->CreationContext();
668
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
669
1
    return;
670
  }
671
672
8885
  port->PostMessage(env, args[0], args[1]);
673
}
674
675
587
void MessagePort::Start() {
676
587
  Mutex::ScopedLock lock(data_->mutex_);
677
587
  Debug(this, "Start receiving messages");
678
587
  data_->receiving_messages_ = true;
679
587
  if (!data_->incoming_messages_.empty())
680
135
    TriggerAsync();
681
587
}
682
683
28
void MessagePort::Stop() {
684
28
  Mutex::ScopedLock lock(data_->mutex_);
685
28
  Debug(this, "Stop receiving messages");
686
28
  data_->receiving_messages_ = false;
687
28
}
688
689
27
void MessagePort::StopEventLoop() {
690
27
  Mutex::ScopedLock lock(data_->mutex_);
691
27
  data_->receiving_messages_ = false;
692
27
  stop_event_loop_ = true;
693
694
27
  Debug(this, "Received StopEventLoop request");
695
27
  TriggerAsync();
696
27
}
697
698
587
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
699
587
  Environment* env = Environment::GetCurrent(args);
700
  MessagePort* port;
701
587
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
702
587
  if (!port->data_) {
703
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
704
    return;
705
  }
706
587
  port->Start();
707
}
708
709
102
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
710
102
  Environment* env = Environment::GetCurrent(args);
711
  MessagePort* port;
712
176
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
713
28
  if (!port->data_) {
714
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
715
    return;
716
  }
717
28
  port->Stop();
718
}
719
720
236
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
721
  MessagePort* port;
722
472
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
723
44
  port->OnMessage();
724
}
725
726
145
// Entangles two ports by linking `a` with the data behind `b`.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const b_data = b->data_.get();
  Entangle(a, b_data);
}
729
730
264
// Entangles the data behind port `a` with the port data `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const a_data = a->data_.get();
  MessagePortData::Entangle(a_data, b);
}
733
734
1453
// Returns the MessagePort JS constructor for `context`, creating and
// caching its FunctionTemplate on `env` on first use.
//
// This lives in its own function because it is needed early on in the
// child environment setup.
MaybeLocal<Function> GetMessagePortConstructor(
    Environment* env, Local<Context> context) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();

  if (cached.IsEmpty()) {
    // First call: build the template and register the prototype methods.
    Local<FunctionTemplate> port_templ =
        env->NewFunctionTemplate(MessagePort::New);
    port_templ->SetClassName(env->message_port_constructor_string());
    port_templ->InstanceTemplate()->SetInternalFieldCount(1);
    port_templ->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(port_templ, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(port_templ, "start", MessagePort::Start);
    env->SetProtoMethod(port_templ, "stop", MessagePort::Stop);
    env->SetProtoMethod(port_templ, "drain", MessagePort::Drain);

    env->set_message_port_constructor_template(port_templ);

    // Read the template back from the environment, as a repeated lookup
    // would.
    cached = env->message_port_constructor_template();
  }

  return cached->GetFunction(context);
}
758
759
namespace {
760
761
145
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
762
145
  Environment* env = Environment::GetCurrent(args);
763
145
  if (!args.IsConstructCall()) {
764
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
765
145
    return;
766
  }
767
768
290
  Local<Context> context = args.This()->CreationContext();
769
  Context::Scope context_scope(context);
770
771
145
  MessagePort* port1 = MessagePort::New(env, context);
772
145
  MessagePort* port2 = MessagePort::New(env, context);
773
145
  MessagePort::Entangle(port1, port2);
774
775
870
  args.This()->Set(env->context(), env->port1_string(), port1->object())
776
290
      .FromJust();
777
870
  args.This()->Set(env->context(), env->port2_string(), port2->object())
778
290
      .FromJust();
779
}
780
781
156
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
782
156
  Environment* env = Environment::GetCurrent(args);
783
156
  CHECK_EQ(args.Length(), 1);
784
312
  CHECK(args[0]->IsFunction());
785
312
  env->set_domexception_function(args[0].As<Function>());
786
156
}
787
788
403
static void InitMessaging(Local<Object> target,
789
                          Local<Value> unused,
790
                          Local<Context> context,
791
                          void* priv) {
792
403
  Environment* env = Environment::GetCurrent(context);
793
794
  {
795
    Local<String> message_channel_string =
796
403
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
797
403
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
798
403
    templ->SetClassName(message_channel_string);
799
    target->Set(env->context(),
800
                message_channel_string,
801
1612
                templ->GetFunction(context).ToLocalChecked()).FromJust();
802
  }
803
804
  target->Set(context,
805
              env->message_port_constructor_string(),
806
1612
              GetMessagePortConstructor(env, context).ToLocalChecked())
807
806
                  .FromJust();
808
809
403
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
810
403
}
811
812
}  // anonymous namespace
813
814
}  // namespace worker
815
}  // namespace node
816
817
3596
// Registers the internal `messaging` module, with
// node::worker::InitMessaging as its initializer.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)