GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 432 460 93.9 %
Date: 2019-07-28 22:34:34 Branches: 211 300 70.3 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using v8::Array;
14
using v8::ArrayBuffer;
15
using v8::ArrayBufferCreationMode;
16
using v8::Context;
17
using v8::EscapableHandleScope;
18
using v8::Exception;
19
using v8::Function;
20
using v8::FunctionCallbackInfo;
21
using v8::FunctionTemplate;
22
using v8::Global;
23
using v8::HandleScope;
24
using v8::Isolate;
25
using v8::Just;
26
using v8::Local;
27
using v8::Maybe;
28
using v8::MaybeLocal;
29
using v8::Nothing;
30
using v8::Object;
31
using v8::ObjectTemplate;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Value;
35
using v8::ValueDeserializer;
36
using v8::ValueSerializer;
37
using v8::WasmModuleObject;
38
39
namespace node {
40
namespace worker {
41
42
164622
// Constructs a message that takes ownership of an already-serialized payload.
// The buffer is moved into `main_message_buf_`; a message whose buffer has no
// data pointer is reported as a close message by IsCloseMessage().
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
44
45
95748
bool Message::IsCloseMessage() const {
46
95748
  return main_message_buf_.data == nullptr;
47
}
48
49
namespace {
50
51
// This is used to tell V8 how to read transferred host objects, like other
52
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
53
37658
class DeserializerDelegate : public ValueDeserializer::Delegate {
54
 public:
55
37659
  DeserializerDelegate(
56
      Message* m,
57
      Environment* env,
58
      const std::vector<MessagePort*>& message_ports,
59
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
60
      const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
61
      : message_ports_(message_ports),
62
        shared_array_buffers_(shared_array_buffers),
63
37659
        wasm_modules_(wasm_modules) {}
64
65
10195
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
66
    // Currently, only MessagePort hosts objects are supported, so identifying
67
    // by the index in the message's MessagePort array is sufficient.
68
    uint32_t id;
69
10195
    if (!deserializer->ReadUint32(&id))
70
      return MaybeLocal<Object>();
71
10195
    CHECK_LE(id, message_ports_.size());
72
20390
    return message_ports_[id]->object(isolate);
73
  }
74
75
204
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
76
      Isolate* isolate, uint32_t clone_id) override {
77
204
    CHECK_LE(clone_id, shared_array_buffers_.size());
78
408
    return shared_array_buffers_[clone_id];
79
  }
80
81
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
82
      Isolate* isolate, uint32_t transfer_id) override {
83
2
    CHECK_LE(transfer_id, wasm_modules_.size());
84
    return WasmModuleObject::FromTransferrableModule(
85
2
        isolate, wasm_modules_[transfer_id]);
86
  }
87
88
  ValueDeserializer* deserializer = nullptr;
89
90
 private:
91
  const std::vector<MessagePort*>& message_ports_;
92
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
93
  const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
94
};
95
96
}  // anonymous namespace
97
98
37658
// Turns this message back into a JS value inside `context`.
//
// Steps, in order:
//   1. create a MessagePort handle for every transferred MessagePortData,
//   2. materialize every transferred SharedArrayBuffer,
//   3. hand every transferred ArrayBuffer to the deserializer,
//   4. read the header and the value from the main message buffer.
//
// Must not be called on a close message. Returns an empty MaybeLocal if any
// step fails. The transferred resources are consumed by this call.
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  CHECK(!IsCloseMessage());

  Isolate* const isolate = env->isolate();
  EscapableHandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Create all necessary MessagePort handles.
  std::vector<MessagePort*> ports(message_ports_.size());
  for (uint32_t idx = 0; idx < message_ports_.size(); ++idx) {
    ports[idx] = MessagePort::New(env, context, std::move(message_ports_[idx]));
    if (ports[idx] != nullptr)
      continue;

    // Creating a handle failed: close whatever has been created so far.
    for (MessagePort* created : ports) {
      // This will eventually release the MessagePort object itself.
      if (created != nullptr)
        created->Close();
    }
    return MaybeLocal<Value>();
  }
  message_ports_.clear();

  // Attach all transferred SharedArrayBuffers to their new Isolate.
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  for (uint32_t idx = 0; idx < shared_array_buffers_.size(); ++idx) {
    Local<SharedArrayBuffer> sab;
    if (!shared_array_buffers_[idx]->GetSharedArrayBuffer(env, context)
            .ToLocal(&sab)) {
      return MaybeLocal<Value>();
    }
    shared_array_buffers.push_back(sab);
  }
  shared_array_buffers_.clear();

  DeserializerDelegate delegate(
      this, env, ports, shared_array_buffers, wasm_modules_);
  ValueDeserializer deserializer(
      isolate,
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate.
  for (uint32_t idx = 0; idx < array_buffer_contents_.size(); ++idx) {
    if (env->isolate_data()->uses_node_allocator()) {
      // The memory can be adopted as-is: register it with Node's allocator
      // and wrap it in an internalized ArrayBuffer.
      env->isolate_data()->node_allocator()->RegisterPointer(
          array_buffer_contents_[idx].data, array_buffer_contents_[idx].size);

      Local<ArrayBuffer> ab =
          ArrayBuffer::New(isolate,
                           array_buffer_contents_[idx].release(),
                           array_buffer_contents_[idx].size,
                           ArrayBufferCreationMode::kInternalized);
      deserializer.TransferArrayBuffer(idx, ab);
    } else {
      // We don't use Node's allocator on the receiving side, so we have
      // to create the ArrayBuffer from a copy of the memory.
      AllocatedBuffer copy =
          env->AllocateManaged(array_buffer_contents_[idx].size);
      memcpy(copy.data(),
             array_buffer_contents_[idx].data,
             array_buffer_contents_[idx].size);
      deserializer.TransferArrayBuffer(idx, copy.ToArrayBuffer());
    }
  }
  array_buffer_contents_.clear();

  if (deserializer.ReadHeader(context).IsNothing())
    return MaybeLocal<Value>();
  return handle_scope.Escape(
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}
173
174
217
void Message::AddSharedArrayBuffer(
175
    const SharedArrayBufferMetadataReference& reference) {
176
217
  shared_array_buffers_.push_back(reference);
177
217
}
178
179
10218
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
180
10218
  message_ports_.emplace_back(std::move(data));
181
10218
}
182
183
2
// Stores a transferrable WASM module with this message and returns the index
// under which it was stored (the position of the new element in the list).
uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
  wasm_modules_.emplace_back(std::move(mod));
  const auto count_after_insert = wasm_modules_.size();
  return count_after_insert - 1;
}
187
188
namespace {
189
190
3520
// Looks up the `DOMException` constructor on the per-context exports object of
// `context`. Returns an empty MaybeLocal if either lookup fails; aborts (via
// CHECK) if the property exists but is not a function.
MaybeLocal<Function> GetDOMException(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();

  Local<Object> per_context_bindings;
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings))
    return MaybeLocal<Function>();

  Local<Value> ctor_value;
  Local<String> ctor_name = FIXED_ONE_BYTE_STRING(isolate, "DOMException");
  if (!per_context_bindings->Get(context, ctor_name).ToLocal(&ctor_value))
    return MaybeLocal<Function>();

  CHECK(ctor_value->IsFunction());
  return ctor_value.As<Function>();
}
204
205
14
// Schedules a `DOMException` named "DataCloneError" with the given message on
// the isolate of `context`. If the constructor cannot be obtained or the
// instance cannot be created, the function returns without throwing the
// DataCloneError itself.
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
  Isolate* isolate = context->GetIsolate();
  Local<Value> argv[] = {message,
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};

  Local<Function> domexception_ctor;
  if (!GetDOMException(context).ToLocal(&domexception_ctor))
    return;

  Local<Value> exception;
  if (!domexception_ctor->NewInstance(context, arraysize(argv), argv)
           .ToLocal(&exception)) {
    return;
  }

  isolate->ThrowException(exception);
}
218
219
// This tells V8 how to serialize objects that it does not understand
220
// (e.g. C++ objects) into the output buffer, in a way that our own
221
// DeserializerDelegate understands how to unpack.
222
37769
class SerializerDelegate : public ValueSerializer::Delegate {
223
 public:
224
37769
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
225
37769
      : env_(env), context_(context), msg_(m) {}
226
227
4
  void ThrowDataCloneError(Local<String> message) override {
228
4
    ThrowDataCloneException(context_, message);
229
4
  }
230
231
10218
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
232
20436
    if (env_->message_port_constructor_template()->HasInstance(object)) {
233
10217
      return WriteMessagePort(Unwrap<MessagePort>(object));
234
    }
235
236
1
    ThrowDataCloneError(env_->clone_unsupported_type_str());
237
1
    return Nothing<bool>();
238
  }
239
240
217
  Maybe<uint32_t> GetSharedArrayBufferId(
241
      Isolate* isolate,
242
      Local<SharedArrayBuffer> shared_array_buffer) override {
243
    uint32_t i;
244
226
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
245
18
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
246
          shared_array_buffer) {
247
        return Just(i);
248
      }
249
    }
250
251
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
252
        env_,
253
        context_,
254
217
        shared_array_buffer);
255
217
    if (!reference) {
256
      return Nothing<uint32_t>();
257
    }
258
    seen_shared_array_buffers_.emplace_back(
259
434
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
260
217
    msg_->AddSharedArrayBuffer(reference);
261
217
    return Just(i);
262
  }
263
264
2
  Maybe<uint32_t> GetWasmModuleTransferId(
265
      Isolate* isolate, Local<WasmModuleObject> module) override {
266
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
267
  }
268
269
37755
  void Finish() {
270
    // Only close the MessagePort handles and actually transfer them
271
    // once we know that serialization succeeded.
272
47973
    for (MessagePort* port : ports_) {
273
20436
      port->Close();
274
10218
      msg_->AddMessagePort(port->Detach());
275
    }
276
37755
  }
277
278
  ValueSerializer* serializer = nullptr;
279
280
 private:
281
10217
  Maybe<bool> WriteMessagePort(MessagePort* port) {
282
10217
    for (uint32_t i = 0; i < ports_.size(); i++) {
283
10217
      if (ports_[i] == port) {
284
10217
        serializer->WriteUint32(i);
285
10217
        return Just(true);
286
      }
287
    }
288
289
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
290
    return Nothing<bool>();
291
  }
292
293
  Environment* env_;
294
  Local<Context> context_;
295
  Message* msg_;
296
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
297
  std::vector<MessagePort*> ports_;
298
299
  friend class worker::Message;
300
};
301
302
}  // anonymous namespace
303
304
37769
// Serializes `input` into this (still empty) message.
//
// `transfer_list_v` is only inspected when it is an array. Its entries may be
// ArrayBuffers and MessagePorts; any other entry raises
// ERR_INVALID_TRANSFER_OBJECT. `source_port`, if non-empty, is the port the
// message is posted on and must not appear in the transfer list.
//
// Returns Nothing<bool>() when validation or serialization fails. Transferred
// ArrayBuffers are externalized and detached, and transferred MessagePorts
// are closed and detached, only after the value was written successfully.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               Local<Value> transfer_list_v,
                               Local<Object> source_port) {
  Isolate* const isolate = env->isolate();
  HandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(isolate, &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  if (transfer_list_v->IsArray()) {
    Local<Array> transfer_list = transfer_list_v.As<Array>();
    const uint32_t length = transfer_list->Length();
    for (uint32_t i = 0; i < length; ++i) {
      Local<Value> entry;
      if (!transfer_list->Get(context, i).ToLocal(&entry))
        return Nothing<bool>();

      // Currently, we support ArrayBuffers and MessagePorts.
      if (entry->IsArrayBuffer()) {
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
        // If we cannot render the ArrayBuffer unusable in this Isolate and
        // take ownership of its memory, copying the buffer will have to do.
        const bool can_take_ownership =
            ab->IsDetachable() && !ab->IsExternal() &&
            env->isolate_data()->uses_node_allocator();
        if (!can_take_ownership)
          continue;

        const bool already_listed =
            std::find(array_buffers.begin(), array_buffers.end(), ab) !=
            array_buffers.end();
        if (already_listed) {
          ThrowDataCloneException(
              context,
              FIXED_ONE_BYTE_STRING(
                  isolate,
                  "Transfer list contains duplicate ArrayBuffer"));
          return Nothing<bool>();
        }

        // We simply use the array index in the `array_buffers` list as the
        // ID that we write into the serialized buffer.
        const uint32_t id = array_buffers.size();
        array_buffers.push_back(ab);
        serializer.TransferArrayBuffer(id, ab);
        continue;
      }

      if (env->message_port_constructor_template()->HasInstance(entry)) {
        // Check if the source MessagePort is being transferred.
        if (!source_port.IsEmpty() && entry == source_port) {
          ThrowDataCloneException(
              context,
              FIXED_ONE_BYTE_STRING(isolate,
                                    "Transfer list contains source port"));
          return Nothing<bool>();
        }

        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
        if (port == nullptr || port->IsDetached()) {
          ThrowDataCloneException(
              context,
              FIXED_ONE_BYTE_STRING(
                  isolate,
                  "MessagePort in transfer list is already detached"));
          return Nothing<bool>();
        }

        const bool already_listed =
            std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
            delegate.ports_.end();
        if (already_listed) {
          ThrowDataCloneException(
              context,
              FIXED_ONE_BYTE_STRING(
                  isolate,
                  "Transfer list contains duplicate MessagePort"));
          return Nothing<bool>();
        }

        delegate.ports_.push_back(port);
        continue;
      }

      // Neither an ArrayBuffer nor a MessagePort.
      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
      return Nothing<bool>();
    }
  }

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing())
    return Nothing<bool>();

  for (Local<ArrayBuffer> ab : array_buffers) {
    // If serialization succeeded, we want to take ownership of
    // (a.k.a. externalize) the underlying memory region and render
    // it inaccessible in this Isolate.
    ArrayBuffer::Contents contents = ab->Externalize();
    ab->Detach();

    CHECK(env->isolate_data()->uses_node_allocator());
    env->isolate_data()->node_allocator()->UnregisterPointer(
        contents.Data(), contents.ByteLength());

    array_buffer_contents_.emplace_back(MallocedBuffer<char>{
        static_cast<char*>(contents.Data()), contents.ByteLength()});
  }

  delegate.Finish();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> released = serializer.Release();
  CHECK_NOT_NULL(released.first);
  main_message_buf_ = MallocedBuffer<char>(
      reinterpret_cast<char*>(released.first), released.second);
  return Just(true);
}
417
418
4
// Reports the memory held by this message to `tracker`: the transferred
// ArrayBuffer contents, the shared array buffer references (accounted for by
// element count times element size), and the transferred message ports.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  const size_t shared_refs_size =
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]);

  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
  tracker->TrackFieldWithSize("shared_array_buffers", shared_refs_size);
  tracker->TrackField("message_ports", message_ports_);
}
424
425
31289
// Creates port data that is owned by `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {
}
426
427
93692
// The data must no longer have an owning MessagePort when it is destroyed.
// Destruction also breaks the link to the sibling, if any.
MessagePortData::~MessagePortData() {
  CHECK_NULL(owner_);
  this->Disentangle();
}
431
432
8
// Reports the queue of pending incoming messages to `tracker`. The queue is
// read while holding `mutex_`.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);
  tracker->TrackField("incoming_messages", incoming_messages_);
}
436
437
100243
// Appends `message` to the incoming queue and, if a MessagePort currently
// owns this data, wakes that port up so it processes the queue.
void MessagePortData::AddToIncomingQueue(Message&& message) {
  // This function will be called by other threads.
  Mutex::ScopedLock queue_lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  // Without an owner there is nobody to notify; the message stays queued.
  if (owner_ == nullptr)
    return;

  Debug(owner_, "Adding message to incoming queue");
  owner_->TriggerAsync();
}
447
448
10450
// Links `a` and `b` as siblings of each other. Neither may already have a
// sibling. After the call both share `b`'s sibling mutex.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  CHECK_NULL(a->sibling_);
  CHECK_NULL(b->sibling_);

  b->sibling_ = a;
  a->sibling_ = b;
  a->sibling_mutex_ = b->sibling_mutex_;
}
455
456
52039
void MessagePortData::Disentangle() {
457
  // Grab a copy of the sibling mutex, then replace it so that each sibling
458
  // has its own sibling_mutex_ now.
459
52039
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
460
104101
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
461
52062
  sibling_mutex_ = std::make_shared<Mutex>();
462
463
52062
  MessagePortData* sibling = sibling_;
464
52062
  if (sibling_ != nullptr) {
465
10448
    sibling_->sibling_ = nullptr;
466
10448
    sibling_ = nullptr;
467
  }
468
469
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
470
  // message and trigger the corresponding uv_async_t to let them know that
471
  // this happened.
472
52062
  AddToIncomingQueue(Message());
473
52046
  if (sibling != nullptr) {
474
10448
    sibling->AddToIncomingQueue(Message());
475
52060
  }
476
52060
}
477
478
93117
// If the port still holds its data when it is destroyed, clear the data's
// back-pointer to this port first.
MessagePort::~MessagePort() {
  if (!data_)
    return;
  data_->owner_ = nullptr;
}
482
483
31083
// Creates a MessagePort handle backed by a uv_async_t on the environment's
// event loop and by a fresh MessagePortData owned by this port. After the
// handle is set up, the function stored under the `oninit` symbol on `wrap`
// is called, if there is one.
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(&async_),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  // Called when data has been put into the queue.
  auto on_async = [](uv_async_t* handle) {
    MessagePort* port = ContainerOf(&MessagePort::async_, handle);
    port->OnMessage();
  };
  const int init_result = uv_async_init(env->event_loop(), &async_, on_async);
  CHECK_EQ(init_result, 0);
  async_.data = static_cast<void*>(this);

  Local<Value> init_value;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_value))
    return;

  if (init_value->IsFunction()) {
    Local<Function> init_fn = init_value.As<Function>();
    // The result of the initializer is deliberately ignored.
    USE(init_fn->Call(context, wrap, 0, nullptr));
  }

  Debug(this, "Created message port");
}
512
513
10228
bool MessagePort::IsDetached() const {
514

10228
  return data_ == nullptr || IsHandleClosing();
515
}
516
517
68366
void MessagePort::TriggerAsync() {
518
136734
  if (IsHandleClosing()) return;
519
68360
  CHECK_EQ(uv_async_send(&async_), 0);
520
}
521
522
31274
// Closes the underlying handle, forwarding `close_callback` to
// HandleWrap::Close(). While the port still holds its data, the close is
// performed under the data's mutex.
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock sibling_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
534
535
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
536
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
537
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
538
  // class (i.e. makes it behave like an arrow function).
539
2
  Environment* env = Environment::GetCurrent(args);
540
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
541
2
}
542
543
31083
// Creates a new MessagePort in `context`. When `data` is non-null, the new
// port adopts it in place of its freshly created data and is triggered once
// so that messages already queued on `data` get processed.
// Returns nullptr if the JS wrapper object cannot be instantiated.
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> wrapper;
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&wrapper))
    return nullptr;

  MessagePort* port = new MessagePort(env, context, wrapper);
  CHECK_NOT_NULL(port);
  if (!data)
    return port;

  // Drop the data created by the constructor and adopt the passed-in one.
  port->Detach();
  port->data_ = std::move(data);

  // This lock is here to avoid race conditions with the `owner_` read
  // in AddToIncomingQueue(). (This would likely be unproblematic without it,
  // but it's better to be safe than sorry.)
  Mutex::ScopedLock lock(port->data_->mutex_);
  port->data_->owner_ = port;
  // If the existing MessagePortData object had pending messages, this is
  // the easiest way to run that queue.
  port->TriggerAsync();
  return port;
}
572
573
64357
// Takes the next message off the incoming queue and deserializes it in
// `context`.
//
// Returns the `no_message` symbol when the queue is empty, when the port does
// not currently want messages (and the head is not a close message), or when
// the head was a close message (in which case the port is closed as well).
// Returns an empty MaybeLocal when JS cannot be called or deserialization
// fails. With `only_if_receiving` set, regular messages are only delivered
// while the port is in the receiving state.
//
// Requires `data_` to be non-null.
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
                                              bool only_if_receiving) {
  Message received;
  {
    // Get the head of the message queue.
    Mutex::ScopedLock lock(data_->mutex_);

    Debug(this, "MessagePort has message");

    const bool wants_message = receiving_messages_ || !only_if_receiving;
    auto& queue = data_->incoming_messages_;

    // We have nothing to do if there are no pending messages ...
    if (queue.empty())
      return env()->no_message_symbol();
    // ... or if we are not intending to receive messages, and the message we
    // would receive is not the final "close" message.
    if (!wants_message && !queue.front().IsCloseMessage())
      return env()->no_message_symbol();

    received = std::move(queue.front());
    queue.pop_front();
  }

  if (received.IsCloseMessage()) {
    Close();
    return env()->no_message_symbol();
  }

  if (!env()->can_call_into_js())
    return MaybeLocal<Value>();

  return received.Deserialize(env(), context);
}
606
607
26723
void MessagePort::OnMessage() {
608
26723
  Debug(this, "Running MessagePort::OnMessage()");
609
26723
  HandleScope handle_scope(env()->isolate());
610
53446
  Local<Context> context = object(env()->isolate())->CreationContext();
611
612
  // data_ can only ever be modified by the owner thread, so no need to lock.
613
  // However, the message port may be transferred while it is processing
614
  // messages, so we need to check that this handle still owns its `data_` field
615
  // on every iteration.
616

91074
  while (data_) {
617
64351
    HandleScope handle_scope(env()->isolate());
618

37628
    Context::Scope context_scope(context);
619
620
    Local<Value> payload;
621
128694
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
622
128685
    if (payload == env()->no_message_symbol()) break;
623
624
37655
    if (!env()->can_call_into_js()) {
625
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
626
      // In this case there is nothing to do but to drain the current queue.
627
      continue;
628
    }
629
630
    Local<Object> event;
631
75308
    Local<Value> cb_args[1];
632

150613
    if (!env()->message_event_object_template()->NewInstance(context)
633

225912
            .ToLocal(&event) ||
634

263547
        event->Set(context, env()->data_string(), payload).IsNothing() ||
635


263559
        event->Set(context, env()->target_string(), object()).IsNothing() ||
636

188261
        (cb_args[0] = event, false) ||
637
        MakeCallback(env()->onmessage_string(),
638
37651
                     arraysize(cb_args),
639
112959
                     cb_args).IsEmpty()) {
640
      // Re-schedule OnMessage() execution in case of failure.
641
24
      if (data_)
642
24
        TriggerAsync();
643
26744
      return;
644
    }
645
64324
  }
646
}
647
648
31037
void MessagePort::OnClose() {
649
31037
  Debug(this, "MessagePort::OnClose()");
650
31036
  if (data_) {
651
20820
    data_->owner_ = nullptr;
652
20820
    data_->Disentangle();
653
  }
654
31037
  data_.reset();
655
31039
}
656
657
20607
std::unique_ptr<MessagePortData> MessagePort::Detach() {
658
20607
  CHECK(data_);
659
20608
  Mutex::ScopedLock lock(data_->mutex_);
660
20608
  data_->owner_ = nullptr;
661
20608
  return std::move(data_);
662
}
663
664
665
37768
// Serializes `message_v` (with the transfer list `transfer_v`) and queues the
// result on the sibling port.
//
// Serialization always runs, even when this port no longer holds data; in
// that case its result is returned directly. A serialization failure yields
// Nothing<bool>(). When there is no sibling, or when the sibling itself is
// part of the transferred ports, the message is dropped and Just(true) is
// returned (the latter case also emits a process warning).
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     Local<Value> transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> obj = object(isolate);
  Local<Context> context = obj->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.
  Maybe<bool> serialization_maybe =
      msg.Serialize(env, context, message_v, transfer_v, obj);
  if (data_ == nullptr)
    return serialization_maybe;
  if (serialization_maybe.IsNothing())
    return Nothing<bool>();

  Mutex::ScopedLock lock(*data_->sibling_mutex_);

  MessagePortData* const target = data_->sibling_;
  if (target == nullptr)
    return Just(true);

  // Check if the target port is posted to itself.
  for (const auto& port_data : msg.message_ports()) {
    if (target != port_data.get())
      continue;
    ProcessEmitWarning(env, "The target port was posted to itself, and "
                            "the communication channel was lost");
    return Just(true);
  }

  target->AddToIncomingQueue(std::move(msg));
  return Just(true);
}
707
708
37773
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
709
37773
  Environment* env = Environment::GetCurrent(args);
710
37773
  if (args.Length() == 0) {
711
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
712
                                       "MessagePort.postMessage");
713
  }
714


171566
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
715
    // Browsers ignore null or undefined, and otherwise accept an array or an
716
    // options object.
717
    // TODO(addaleax): Add support for an options object and generic sequence
718
    // support.
719
    // Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
720
    return THROW_ERR_INVALID_ARG_TYPE(env,
721
4
        "Optional transferList argument must be an array");
722
  }
723
724
37769
  MessagePort* port = Unwrap<MessagePort>(args.This());
725
  // Even if the backing MessagePort object has already been deleted, we still
726
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
727
  // transfers.
728
37769
  if (port == nullptr) {
729
1
    Message msg;
730
1
    Local<Object> obj = args.This();
731
1
    Local<Context> context = obj->CreationContext();
732
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
733
1
    return;
734
  }
735
736
37768
  port->PostMessage(env, args[0], args[1]);
737
}
738
739
21159
void MessagePort::Start() {
740
21159
  Debug(this, "Start receiving messages");
741
21159
  receiving_messages_ = true;
742
21159
  Mutex::ScopedLock lock(data_->mutex_);
743
21159
  if (!data_->incoming_messages_.empty())
744
10401
    TriggerAsync();
745
21159
}
746
747
20048
void MessagePort::Stop() {
748
20048
  Debug(this, "Stop receiving messages");
749
20048
  receiving_messages_ = false;
750
20048
}
751
752
21160
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
753
  MessagePort* port;
754
21161
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
755
21160
  if (!port->data_) {
756
1
    return;
757
  }
758
21159
  port->Start();
759
}
760
761
20125
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
762
  MessagePort* port;
763
40250
  CHECK(args[0]->IsObject());
764
40327
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
765
20050
  if (!port->data_) {
766
2
    return;
767
  }
768
20048
  port->Stop();
769
}
770
771
406
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
772
  MessagePort* port;
773
1218
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
774
114
  port->OnMessage();
775
}
776
777
6
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
778
12
  CHECK(args[0]->IsObject());
779
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
780
6
  if (port == nullptr) {
781
    // Return 'no messages' for a closed port.
782
    args.GetReturnValue().Set(
783
        Environment::GetCurrent(args)->no_message_symbol());
784
6
    return;
785
  }
786
787
  MaybeLocal<Value> payload =
788
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
789
6
  if (!payload.IsEmpty())
790
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
791
}
792
793
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
794
1
  Environment* env = Environment::GetCurrent(args);
795


6
  if (!args[0]->IsObject() ||
796
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
797
    return THROW_ERR_INVALID_ARG_TYPE(env,
798
        "First argument needs to be a MessagePort instance");
799
  }
800
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
801
1
  CHECK_NOT_NULL(port);
802
803
1
  Local<Value> context_arg = args[1];
804
  ContextifyContext* context_wrapper;
805

4
  if (!context_arg->IsObject() ||
806
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
807
3
          env, context_arg.As<Object>())) == nullptr) {
808
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
809
  }
810
811
1
  std::unique_ptr<MessagePortData> data;
812
1
  if (!port->IsDetached())
813
1
    data = port->Detach();
814
815
1
  Context::Scope context_scope(context_wrapper->context());
816
  MessagePort* target =
817
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
818
1
  if (target != nullptr)
819
4
    args.GetReturnValue().Set(target->object());
820
}
821
822
10244
// Entangles port `a` with port `b` by delegating to the
// (MessagePort*, MessagePortData*) overload with `b`'s data object.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const peer_data = b->data_.get();
  Entangle(a, peer_data);
}
825
826
10450
// Entangles the data object owned by port `a` with the data object `b` via
// MessagePortData::Entangle().
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const own_data = a->data_.get();
  MessagePortData::Entangle(own_data, b);
}
829
830
38095
// Returns the FunctionTemplate for the MessagePort JS constructor, creating
// it and storing it on `env` the first time it is requested.
//
// This lives in its own function because the template is needed early on in
// the child environment setup. Creating it also creates the `MessageEvent`
// object template (with `data` and `target` defaulting to null) and stores
// that on `env` as well.
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (cached.IsEmpty()) {
    Isolate* isolate = env->isolate();

    // MessagePort constructor, inheriting from HandleWrap.
    Local<FunctionTemplate> port_ctor =
        env->NewFunctionTemplate(MessagePort::New);
    port_ctor->SetClassName(env->message_port_constructor_string());
    port_ctor->InstanceTemplate()->SetInternalFieldCount(1);
    port_ctor->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(port_ctor, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(port_ctor, "start", MessagePort::Start);

    env->set_message_port_constructor_template(port_ctor);

    // Object template used for MessageEvent instances.
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
    Local<ObjectTemplate> event_instance = event_ctor->InstanceTemplate();
    event_instance->Set(env->data_string(), Null(isolate));
    event_instance->Set(env->target_string(), Null(isolate));
    env->set_message_event_object_template(event_instance);

    // Hand back the value that was just stored on the environment.
    cached = env->message_port_constructor_template();
  }

  return cached;
}
860
861
namespace {
862
863
10245
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
864
10245
  Environment* env = Environment::GetCurrent(args);
865
10245
  if (!args.IsConstructCall()) {
866
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
867
10246
    return;
868
  }
869
870
20488
  Local<Context> context = args.This()->CreationContext();
871
  Context::Scope context_scope(context);
872
873
10244
  MessagePort* port1 = MessagePort::New(env, context);
874
10244
  MessagePort* port2 = MessagePort::New(env, context);
875
10244
  MessagePort::Entangle(port1, port2);
876
877
51220
  args.This()->Set(context, env->port1_string(), port1->object())
878
20488
      .Check();
879
51220
  args.This()->Set(context, env->port2_string(), port2->object())
880
20488
      .Check();
881
}
882
883
3506
static void InitMessaging(Local<Object> target,
884
                          Local<Value> unused,
885
                          Local<Context> context,
886
                          void* priv) {
887
3506
  Environment* env = Environment::GetCurrent(context);
888
889
  {
890
    Local<String> message_channel_string =
891
3506
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
892
3506
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
893
3506
    templ->SetClassName(message_channel_string);
894
    target->Set(context,
895
                message_channel_string,
896
10518
                templ->GetFunction(context).ToLocalChecked()).Check();
897
  }
898
899
  target->Set(context,
900
              env->message_port_constructor_string(),
901
              GetMessagePortConstructorTemplate(env)
902
17530
                  ->GetFunction(context).ToLocalChecked()).Check();
903
904
  // These are not methods on the MessagePort prototype, because
905
  // the browser equivalents do not provide them.
906
3506
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
907
3506
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
908
3506
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
909
  env->SetMethod(target, "moveMessagePortToContext",
910
3506
                 MessagePort::MoveToContext);
911
912
  {
913
7012
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
914
    target
915
        ->Set(context,
916
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
917
10518
              domexception)
918
7012
        .Check();
919
  }
920
3506
}
921
922
}  // anonymous namespace
923
924
}  // namespace worker
925
}  // namespace node
926
927
4827
// Associates the module name `messaging` with node::worker::InitMessaging.
// NOTE(review): presumably this registers InitMessaging as the initializer of
// the internal `messaging` binding; confirm against the macro definition.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)