GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 471 502 93.8 %
Date: 2019-09-07 22:28:56 Branches: 244 344 70.9 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using v8::Array;
14
using v8::ArrayBuffer;
15
using v8::ArrayBufferCreationMode;
16
using v8::Context;
17
using v8::EscapableHandleScope;
18
using v8::Exception;
19
using v8::Function;
20
using v8::FunctionCallbackInfo;
21
using v8::FunctionTemplate;
22
using v8::Global;
23
using v8::HandleScope;
24
using v8::Isolate;
25
using v8::Just;
26
using v8::Local;
27
using v8::Maybe;
28
using v8::MaybeLocal;
29
using v8::Nothing;
30
using v8::Object;
31
using v8::ObjectTemplate;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Symbol;
35
using v8::Value;
36
using v8::ValueDeserializer;
37
using v8::ValueSerializer;
38
using v8::WasmModuleObject;
39
40
namespace node {
41
namespace worker {
42
43
166191
Message::Message(MallocedBuffer<char>&& buffer)
44
166191
    : main_message_buf_(std::move(buffer)) {}
45
46
96206
bool Message::IsCloseMessage() const {
47
96206
  return main_message_buf_.data == nullptr;
48
}
49
50
namespace {
51
52
// This is used to tell V8 how to read transferred host objects, like other
53
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
54
37911
class DeserializerDelegate : public ValueDeserializer::Delegate {
55
 public:
56
37908
  DeserializerDelegate(
57
      Message* m,
58
      Environment* env,
59
      const std::vector<MessagePort*>& message_ports,
60
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
61
      const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
62
      : message_ports_(message_ports),
63
        shared_array_buffers_(shared_array_buffers),
64
37908
        wasm_modules_(wasm_modules) {}
65
66
10201
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
67
    // Currently, only MessagePort hosts objects are supported, so identifying
68
    // by the index in the message's MessagePort array is sufficient.
69
    uint32_t id;
70
10201
    if (!deserializer->ReadUint32(&id))
71
      return MaybeLocal<Object>();
72
10201
    CHECK_LE(id, message_ports_.size());
73
20402
    return message_ports_[id]->object(isolate);
74
  }
75
76
211
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
77
      Isolate* isolate, uint32_t clone_id) override {
78
211
    CHECK_LE(clone_id, shared_array_buffers_.size());
79
422
    return shared_array_buffers_[clone_id];
80
  }
81
82
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
83
      Isolate* isolate, uint32_t transfer_id) override {
84
2
    CHECK_LE(transfer_id, wasm_modules_.size());
85
    return WasmModuleObject::FromTransferrableModule(
86
2
        isolate, wasm_modules_[transfer_id]);
87
  }
88
89
  ValueDeserializer* deserializer = nullptr;
90
91
 private:
92
  const std::vector<MessagePort*>& message_ports_;
93
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
94
  const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
95
};
96
97
}  // anonymous namespace
98
99
37886
MaybeLocal<Value> Message::Deserialize(Environment* env,
100
                                       Local<Context> context) {
101
37886
  CHECK(!IsCloseMessage());
102
103
37908
  EscapableHandleScope handle_scope(env->isolate());
104
  Context::Scope context_scope(context);
105
106
  // Create all necessary MessagePort handles.
107
75828
  std::vector<MessagePort*> ports(message_ports_.size());
108
48111
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
109
10201
    ports[i] = MessagePort::New(env,
110
                                context,
111
10201
                                std::move(message_ports_[i]));
112
10201
    if (ports[i] == nullptr) {
113
      for (MessagePort* port : ports) {
114
        // This will eventually release the MessagePort object itself.
115
        if (port != nullptr)
116
          port->Close();
117
      }
118
      return MaybeLocal<Value>();
119
    }
120
  }
121
37890
  message_ports_.clear();
122
123
75839
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
124
  // Attach all transferred SharedArrayBuffers to their new Isolate.
125
38133
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
126
    Local<SharedArrayBuffer> sab;
127
633
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
128
633
            .ToLocal(&sab))
129
      return MaybeLocal<Value>();
130
211
    shared_array_buffers.push_back(sab);
131
  }
132
37880
  shared_array_buffers_.clear();
133
134
  DeserializerDelegate delegate(
135
75838
      this, env, ports, shared_array_buffers, wasm_modules_);
136
  ValueDeserializer deserializer(
137
      env->isolate(),
138
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
139
      main_message_buf_.size,
140
75824
      &delegate);
141
37916
  delegate.deserializer = &deserializer;
142
143
  // Attach all transferred ArrayBuffers to their new Isolate.
144
37922
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
145
6
    if (!env->isolate_data()->uses_node_allocator()) {
146
      // We don't use Node's allocator on the receiving side, so we have
147
      // to create the ArrayBuffer from a copy of the memory.
148
      AllocatedBuffer buf =
149
          env->AllocateManaged(array_buffer_contents_[i].size);
150
      memcpy(buf.data(),
151
             array_buffer_contents_[i].data,
152
             array_buffer_contents_[i].size);
153
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
154
      continue;
155
    }
156
157
6
    env->isolate_data()->node_allocator()->RegisterPointer(
158
6
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);
159
160
    Local<ArrayBuffer> ab =
161
        ArrayBuffer::New(env->isolate(),
162
6
                         array_buffer_contents_[i].release(),
163
6
                         array_buffer_contents_[i].size,
164
12
                         ArrayBufferCreationMode::kInternalized);
165
6
    deserializer.TransferArrayBuffer(i, ab);
166
  }
167
37919
  array_buffer_contents_.clear();
168
169
75822
  if (deserializer.ReadHeader(context).IsNothing())
170
    return MaybeLocal<Value>();
171
  return handle_scope.Escape(
172
75815
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
173
}
174
175
226
void Message::AddSharedArrayBuffer(
176
    const SharedArrayBufferMetadataReference& reference) {
177
226
  shared_array_buffers_.push_back(reference);
178
226
}
179
180
10226
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
181
10226
  message_ports_.emplace_back(std::move(data));
182
10226
}
183
184
2
uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
185
2
  wasm_modules_.emplace_back(std::move(mod));
186
2
  return wasm_modules_.size() - 1;
187
}
188
189
namespace {
190
191
3586
MaybeLocal<Function> GetDOMException(Local<Context> context) {
192
3586
  Isolate* isolate = context->GetIsolate();
193
  Local<Object> per_context_bindings;
194
  Local<Value> domexception_ctor_val;
195


17930
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
196
      !per_context_bindings->Get(context,
197
10758
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
198
14344
          .ToLocal(&domexception_ctor_val)) {
199
    return MaybeLocal<Function>();
200
  }
201
3586
  CHECK(domexception_ctor_val->IsFunction());
202
3586
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
203
3586
  return domexception_ctor;
204
}
205
206
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
207
14
  Isolate* isolate = context->GetIsolate();
208
  Local<Value> argv[] = {message,
209
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
210
  Local<Value> exception;
211
  Local<Function> domexception_ctor;
212


70
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
213
28
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
214
42
           .ToLocal(&exception)) {
215
14
    return;
216
  }
217
14
  isolate->ThrowException(exception);
218
}
219
220
// This tells V8 how to serialize objects that it does not understand
221
// (e.g. C++ objects) into the output buffer, in a way that our own
222
// DeserializerDelegate understands how to unpack.
223
38055
class SerializerDelegate : public ValueSerializer::Delegate {
224
 public:
225
38055
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
226
38055
      : env_(env), context_(context), msg_(m) {}
227
228
4
  void ThrowDataCloneError(Local<String> message) override {
229
4
    ThrowDataCloneException(context_, message);
230
4
  }
231
232
10226
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
233
20452
    if (env_->message_port_constructor_template()->HasInstance(object)) {
234
10225
      return WriteMessagePort(Unwrap<MessagePort>(object));
235
    }
236
237
1
    ThrowDataCloneError(env_->clone_unsupported_type_str());
238
1
    return Nothing<bool>();
239
  }
240
241
226
  Maybe<uint32_t> GetSharedArrayBufferId(
242
      Isolate* isolate,
243
      Local<SharedArrayBuffer> shared_array_buffer) override {
244
    uint32_t i;
245
235
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
246
18
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
247
          shared_array_buffer) {
248
        return Just(i);
249
      }
250
    }
251
252
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
253
        env_,
254
        context_,
255
226
        shared_array_buffer);
256
226
    if (!reference) {
257
      return Nothing<uint32_t>();
258
    }
259
    seen_shared_array_buffers_.emplace_back(
260
452
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
261
226
    msg_->AddSharedArrayBuffer(reference);
262
226
    return Just(i);
263
  }
264
265
2
  Maybe<uint32_t> GetWasmModuleTransferId(
266
      Isolate* isolate, Local<WasmModuleObject> module) override {
267
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
268
  }
269
270
38041
  void Finish() {
271
    // Only close the MessagePort handles and actually transfer them
272
    // once we know that serialization succeeded.
273
48267
    for (MessagePort* port : ports_) {
274
20452
      port->Close();
275
10226
      msg_->AddMessagePort(port->Detach());
276
    }
277
38041
  }
278
279
  ValueSerializer* serializer = nullptr;
280
281
 private:
282
10225
  Maybe<bool> WriteMessagePort(MessagePort* port) {
283
10225
    for (uint32_t i = 0; i < ports_.size(); i++) {
284
10225
      if (ports_[i] == port) {
285
10225
        serializer->WriteUint32(i);
286
10225
        return Just(true);
287
      }
288
    }
289
290
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
291
    return Nothing<bool>();
292
  }
293
294
  Environment* env_;
295
  Local<Context> context_;
296
  Message* msg_;
297
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
298
  std::vector<MessagePort*> ports_;
299
300
  friend class worker::Message;
301
};
302
303
}  // anonymous namespace
304
305
38055
Maybe<bool> Message::Serialize(Environment* env,
306
                               Local<Context> context,
307
                               Local<Value> input,
308
                               const TransferList& transfer_list_v,
309
                               Local<Object> source_port) {
310
38055
  HandleScope handle_scope(env->isolate());
311
  Context::Scope context_scope(context);
312
313
  // Verify that we're not silently overwriting an existing message.
314
38055
  CHECK(main_message_buf_.is_empty());
315
316
76110
  SerializerDelegate delegate(env, context, this);
317
76110
  ValueSerializer serializer(env->isolate(), &delegate);
318
38055
  delegate.serializer = &serializer;
319
320
76110
  std::vector<Local<ArrayBuffer>> array_buffers;
321
48299
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
322
10254
    Local<Value> entry = transfer_list_v[i];
323
    // Currently, we support ArrayBuffers and MessagePorts.
324
10254
    if (entry->IsArrayBuffer()) {
325
17
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
326
      // If we cannot render the ArrayBuffer unusable in this Isolate and
327
      // take ownership of its memory, copying the buffer will have to do.
328


51
      if (!ab->IsDetachable() || ab->IsExternal() ||
329
17
          !env->isolate_data()->uses_node_allocator()) {
330
16
        continue;
331
      }
332
17
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
333
          array_buffers.end()) {
334
        ThrowDataCloneException(
335
            context,
336
            FIXED_ONE_BYTE_STRING(
337
                env->isolate(),
338
1
                "Transfer list contains duplicate ArrayBuffer"));
339
1
        return Nothing<bool>();
340
      }
341
      // We simply use the array index in the `array_buffers` list as the
342
      // ID that we write into the serialized buffer.
343
16
      uint32_t id = array_buffers.size();
344
16
      array_buffers.push_back(ab);
345
16
      serializer.TransferArrayBuffer(id, ab);
346
16
      continue;
347
20474
    } else if (env->message_port_constructor_template()
348
30711
                  ->HasInstance(entry)) {
349
      // Check if the source MessagePort is being transferred.
350

20474
      if (!source_port.IsEmpty() && entry == source_port) {
351
        ThrowDataCloneException(
352
            context,
353
            FIXED_ONE_BYTE_STRING(env->isolate(),
354
1
                                  "Transfer list contains source port"));
355
10
        return Nothing<bool>();
356
      }
357
10236
      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
358

10236
      if (port == nullptr || port->IsDetached()) {
359
        ThrowDataCloneException(
360
            context,
361
            FIXED_ONE_BYTE_STRING(
362
                env->isolate(),
363
7
                "MessagePort in transfer list is already detached"));
364
7
        return Nothing<bool>();
365
      }
366
10229
      if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
367
          delegate.ports_.end()) {
368
        ThrowDataCloneException(
369
            context,
370
            FIXED_ONE_BYTE_STRING(
371
                env->isolate(),
372
1
                "Transfer list contains duplicate MessagePort"));
373
1
        return Nothing<bool>();
374
      }
375
10228
      delegate.ports_.push_back(port);
376
10228
      continue;
377
    }
378
379
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
380
    return Nothing<bool>();
381
  }
382
383
38045
  serializer.WriteHeader();
384
76090
  if (serializer.WriteValue(context, input).IsNothing()) {
385
4
    return Nothing<bool>();
386
  }
387
388
38049
  for (Local<ArrayBuffer> ab : array_buffers) {
389
    // If serialization succeeded, we want to take ownership of
390
    // (a.k.a. externalize) the underlying memory region and render
391
    // it inaccessible in this Isolate.
392
8
    ArrayBuffer::Contents contents = ab->Externalize();
393
8
    ab->Detach();
394
395
8
    CHECK(env->isolate_data()->uses_node_allocator());
396
8
    env->isolate_data()->node_allocator()->UnregisterPointer(
397
8
        contents.Data(), contents.ByteLength());
398
399
    array_buffer_contents_.emplace_back(MallocedBuffer<char>{
400
8
        static_cast<char*>(contents.Data()), contents.ByteLength()});
401
  }
402
403
38041
  delegate.Finish();
404
405
  // The serializer gave us a buffer allocated using `malloc()`.
406
38041
  std::pair<uint8_t*, size_t> data = serializer.Release();
407
38041
  CHECK_NOT_NULL(data.first);
408
76082
  main_message_buf_ =
409
38041
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
410
76096
  return Just(true);
411
}
412
413
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
414
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
415
  tracker->TrackFieldWithSize("shared_array_buffers",
416
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
417
4
  tracker->TrackField("message_ports", message_ports_);
418
4
}
419
420
31341
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
421
422
93866
MessagePortData::~MessagePortData() {
423
31287
  CHECK_NULL(owner_);
424
31287
  Disentangle();
425
62580
}
426
427
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
428
8
  Mutex::ScopedLock lock(mutex_);
429
8
  tracker->TrackField("incoming_messages", incoming_messages_);
430
8
}
431
432
100642
void MessagePortData::AddToIncomingQueue(Message&& message) {
433
  // This function will be called by other threads.
434
100642
  Mutex::ScopedLock lock(mutex_);
435
100653
  incoming_messages_.emplace_back(std::move(message));
436
437
100650
  if (owner_ != nullptr) {
438
47832
    Debug(owner_, "Adding message to incoming queue");
439
47832
    owner_->TriggerAsync();
440
100650
  }
441
100654
}
442
443
10469
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
444
10469
  CHECK_NULL(a->sibling_);
445
10469
  CHECK_NULL(b->sibling_);
446
10469
  a->sibling_ = b;
447
10469
  b->sibling_ = a;
448
10469
  a->sibling_mutex_ = b->sibling_mutex_;
449
10469
}
450
451
52143
void MessagePortData::Disentangle() {
452
  // Grab a copy of the sibling mutex, then replace it so that each sibling
453
  // has its own sibling_mutex_ now.
454
52143
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
455
104289
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
456
52148
  sibling_mutex_ = std::make_shared<Mutex>();
457
458
52148
  MessagePortData* sibling = sibling_;
459
52148
  if (sibling_ != nullptr) {
460
10465
    sibling_->sibling_ = nullptr;
461
10465
    sibling_ = nullptr;
462
  }
463
464
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
465
  // message and trigger the corresponding uv_async_t to let them know that
466
  // this happened.
467
52148
  AddToIncomingQueue(Message());
468
52148
  if (sibling != nullptr) {
469
10465
    sibling->AddToIncomingQueue(Message());
470
52149
  }
471
52147
}
472
473
93249
MessagePort::~MessagePort() {
474
31083
  if (data_)
475
    data_->owner_ = nullptr;
476
62166
}
477
478
31127
MessagePort::MessagePort(Environment* env,
479
                         Local<Context> context,
480
                         Local<Object> wrap)
481
  : HandleWrap(env,
482
               wrap,
483
               reinterpret_cast<uv_handle_t*>(&async_),
484
               AsyncWrap::PROVIDER_MESSAGEPORT),
485
31127
    data_(new MessagePortData(this)) {
486
86227
  auto onmessage = [](uv_async_t* handle) {
487
    // Called when data has been put into the queue.
488
27550
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
489
27550
    channel->OnMessage();
490
86224
  };
491
31127
  CHECK_EQ(uv_async_init(env->event_loop(),
492
                         &async_,
493
                         onmessage), 0);
494
31127
  async_.data = static_cast<void*>(this);
495
496
  Local<Value> fn;
497
93379
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
498
31127
    return;
499
500
31126
  if (fn->IsFunction()) {
501
31126
    Local<Function> init = fn.As<Function>();
502
31126
    USE(init->Call(context, wrap, 0, nullptr));
503
  }
504
505
31127
  Debug(this, "Created message port");
506
}
507
508
10236
bool MessagePort::IsDetached() const {
509

10236
  return data_ == nullptr || IsHandleClosing();
510
}
511
512
68694
void MessagePort::TriggerAsync() {
513
137388
  if (IsHandleClosing()) return;
514
68684
  CHECK_EQ(uv_async_send(&async_), 0);
515
}
516
517
31324
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
518
62648
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
519
520
31324
  if (data_) {
521
    // Wrap this call with accessing the mutex, so that TriggerAsync()
522
    // can check IsHandleClosing() without race conditions.
523
31323
    Mutex::ScopedLock sibling_lock(data_->mutex_);
524
31323
    HandleWrap::Close(close_callback);
525
  } else {
526
1
    HandleWrap::Close(close_callback);
527
  }
528
31324
}
529
530
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
531
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
532
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
533
  // class (i.e. makes it behave like an arrow function).
534
2
  Environment* env = Environment::GetCurrent(args);
535
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
536
2
}
537
538
31127
MessagePort* MessagePort::New(
539
    Environment* env,
540
    Local<Context> context,
541
    std::unique_ptr<MessagePortData> data) {
542
  Context::Scope context_scope(context);
543
31127
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
544
545
  // Construct a new instance, then assign the listener instance and possibly
546
  // the MessagePortData to it.
547
  Local<Object> instance;
548
93381
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
549
    return nullptr;
550
31127
  MessagePort* port = new MessagePort(env, context, instance);
551
31127
  CHECK_NOT_NULL(port);
552
31127
  if (data) {
553
10403
    port->Detach();
554
10403
    port->data_ = std::move(data);
555
556
    // This lock is here to avoid race conditions with the `owner_` read
557
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
558
    // but it's better to be safe than sorry.)
559
10403
    Mutex::ScopedLock lock(port->data_->mutex_);
560
10403
    port->data_->owner_ = port;
561
    // If the existing MessagePortData object had pending messages, this is
562
    // the easiest way to run that queue.
563
10403
    port->TriggerAsync();
564
  }
565
31127
  return port;
566
}
567
568
65550
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
569
                                              bool only_if_receiving) {
570
65550
  Message received;
571
  {
572
    // Get the head of the message queue.
573
65506
    Mutex::ScopedLock lock(data_->mutex_);
574
575
65553
    Debug(this, "MessagePort has message");
576
577

65555
    bool wants_message = receiving_messages_ || !only_if_receiving;
578
    // We have nothing to do if:
579
    // - There are no pending messages
580
    // - We are not intending to receive messages, and the message we would
581
    //   receive is not the final "close" message.
582

131111
    if (data_->incoming_messages_.empty() ||
583
58389
        (!wants_message &&
584
10023
         !data_->incoming_messages_.front().IsCloseMessage())) {
585
34377
      return env()->no_message_symbol();
586
    }
587
588
48367
    received = std::move(data_->incoming_messages_.front());
589
48297
    data_->incoming_messages_.pop_front();
590
  }
591
592
48368
  if (received.IsCloseMessage()) {
593
20884
    Close();
594
20884
    return env()->no_message_symbol();
595
  }
596
597
37918
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
598
599
37919
  return received.Deserialize(env(), context);
600
}
601
602
27664
void MessagePort::OnMessage() {
603
27664
  Debug(this, "Running MessagePort::OnMessage()");
604
27664
  HandleScope handle_scope(env()->isolate());
605
55328
  Local<Context> context = object(env()->isolate())->CreationContext();
606
607
  // data_ can only ever be modified by the owner thread, so no need to lock.
608
  // However, the message port may be transferred while it is processing
609
  // messages, so we need to check that this handle still owns its `data_` field
610
  // on every iteration.
611

93219
  while (data_) {
612
65544
    HandleScope handle_scope(env()->isolate());
613

37891
    Context::Scope context_scope(context);
614
615
    Local<Value> payload;
616
131090
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
617
131093
    if (payload == env()->no_message_symbol()) break;
618
619
37922
    if (!env()->can_call_into_js()) {
620
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
621
      // In this case there is nothing to do but to drain the current queue.
622
      continue;
623
    }
624
625
    Local<Object> event;
626
75744
    Local<Value> cb_args[1];
627

151607
    if (!env()->message_event_object_template()->NewInstance(context)
628

227222
            .ToLocal(&event) ||
629

265251
        event->Set(context, env()->data_string(), payload).IsNothing() ||
630


265107
        event->Set(context, env()->target_string(), object()).IsNothing() ||
631

189166
        (cb_args[0] = event, false) ||
632
        MakeCallback(env()->onmessage_string(),
633
37919
                     arraysize(cb_args),
634
113715
                     cb_args).IsEmpty()) {
635
      // Re-schedule OnMessage() execution in case of failure.
636
28
      if (data_)
637
28
        TriggerAsync();
638
27689
      return;
639
    }
640
65524
  }
641
}
642
643
31083
void MessagePort::OnClose() {
644
31083
  Debug(this, "MessagePort::OnClose()");
645
31083
  if (data_) {
646
20856
    data_->owner_ = nullptr;
647
20856
    data_->Disentangle();
648
  }
649
31081
  data_.reset();
650
31083
}
651
652
20630
std::unique_ptr<MessagePortData> MessagePort::Detach() {
653
20630
  CHECK(data_);
654
20630
  Mutex::ScopedLock lock(data_->mutex_);
655
20630
  data_->owner_ = nullptr;
656
20630
  return std::move(data_);
657
}
658
659
660
38054
Maybe<bool> MessagePort::PostMessage(Environment* env,
661
                                     Local<Value> message_v,
662
                                     const TransferList& transfer_v) {
663
38054
  Isolate* isolate = env->isolate();
664
38054
  Local<Object> obj = object(isolate);
665
38054
  Local<Context> context = obj->CreationContext();
666
667
38054
  Message msg;
668
669
  // Per spec, we need to both check if transfer list has the source port, and
670
  // serialize the input message, even if the MessagePort is closed or detached.
671
672
  Maybe<bool> serialization_maybe =
673
38054
      msg.Serialize(env, context, message_v, transfer_v, obj);
674
38054
  if (data_ == nullptr) {
675
2
    return serialization_maybe;
676
  }
677
38052
  if (serialization_maybe.IsNothing()) {
678
11
    return Nothing<bool>();
679
  }
680
681
76082
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
682
38041
  bool doomed = false;
683
684
  // Check if the target port is posted to itself.
685
38041
  if (data_->sibling_ != nullptr) {
686
48266
    for (const auto& port_data : msg.message_ports()) {
687
10226
      if (data_->sibling_ == port_data.get()) {
688
1
        doomed = true;
689
        ProcessEmitWarning(env, "The target port was posted to itself, and "
690
1
                                "the communication channel was lost");
691
1
        break;
692
      }
693
    }
694
  }
695
696

38041
  if (data_->sibling_ == nullptr || doomed)
697
1
    return Just(true);
698
699
38040
  data_->sibling_->AddToIncomingQueue(std::move(msg));
700
76094
  return Just(true);
701
}
702
703
10267
static Maybe<bool> ReadIterable(Environment* env,
704
                                Local<Context> context,
705
                                // NOLINTNEXTLINE(runtime/references)
706
                                TransferList& transfer_list,
707
                                Local<Value> object) {
708
10267
  if (!object->IsObject()) return Just(false);
709
710
10264
  if (object->IsArray()) {
711
10244
    Local<Array> arr = object.As<Array>();
712
10244
    size_t length = arr->Length();
713
10244
    transfer_list.AllocateSufficientStorage(length);
714
20496
    for (size_t i = 0; i < length; i++) {
715
30756
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
716
        return Nothing<bool>();
717
    }
718
10244
    return Just(true);
719
  }
720
721
20
  Isolate* isolate = env->isolate();
722
  Local<Value> iterator_method;
723
100
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
724
60
      .ToLocal(&iterator_method)) return Nothing<bool>();
725
20
  if (!iterator_method->IsFunction()) return Just(false);
726
727
  Local<Value> iterator;
728
24
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
729
18
      .ToLocal(&iterator)) return Nothing<bool>();
730
6
  if (!iterator->IsObject()) return Just(false);
731
732
  Local<Value> next;
733
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
734
    return Nothing<bool>();
735
6
  if (!next->IsFunction()) return Just(false);
736
737
3
  std::vector<Local<Value>> entries;
738
3
  while (env->can_call_into_js()) {
739
    Local<Value> result;
740
20
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
741
17
        .ToLocal(&result)) return Nothing<bool>();
742
4
    if (!result->IsObject()) return Just(false);
743
744
    Local<Value> done;
745
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
746
      return Nothing<bool>();
747
4
    if (done->BooleanValue(isolate)) break;
748
749
    Local<Value> val;
750
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
751
      return Nothing<bool>();
752
2
    entries.push_back(val);
753
  }
754
755
2
  transfer_list.AllocateSufficientStorage(entries.size());
756
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
757
2
  return Just(true);
758
}
759
760
38067
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
761
38067
  Environment* env = Environment::GetCurrent(args);
762
38067
  Local<Object> obj = args.This();
763
38067
  Local<Context> context = obj->CreationContext();
764
765
38067
  if (args.Length() == 0) {
766
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
767
13
                                       "MessagePort.postMessage");
768
  }
769
770


172788
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
771
    // Browsers ignore null or undefined, and otherwise accept an array or an
772
    // options object.
773
    return THROW_ERR_INVALID_ARG_TYPE(env,
774
4
        "Optional transferList argument must be an iterable");
775
  }
776
777
38063
  TransferList transfer_list;
778
76126
  if (args[1]->IsObject()) {
779
    bool was_iterable;
780
20512
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
781
8
      return;
782
10256
    if (!was_iterable) {
783
      Local<Value> transfer_option;
784
78
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
785
47
          .ToLocal(&transfer_option)) return;
786
26
      if (!transfer_option->IsUndefined()) {
787
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
788
34
            .To(&was_iterable)) return;
789
10
        if (!was_iterable) {
790
          return THROW_ERR_INVALID_ARG_TYPE(env,
791
7
              "Optional options.transfer argument must be an iterable");
792
        }
793
      }
794
    }
795
  }
796
797
38055
  MessagePort* port = Unwrap<MessagePort>(args.This());
798
  // Even if the backing MessagePort object has already been deleted, we still
799
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
800
  // transfers.
801
38055
  if (port == nullptr) {
802
1
    Message msg;
803
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
804
1
    return;
805
  }
806
807
38054
  port->PostMessage(env, args[0], transfer_list);
808
}
809
810
21207
void MessagePort::Start() {
811
21207
  Debug(this, "Start receiving messages");
812
21207
  receiving_messages_ = true;
813
21207
  Mutex::ScopedLock lock(data_->mutex_);
814
21207
  if (!data_->incoming_messages_.empty())
815
10431
    TriggerAsync();
816
21207
}
817
818
20043
void MessagePort::Stop() {
819
20043
  Debug(this, "Stop receiving messages");
820
20043
  receiving_messages_ = false;
821
20043
}
822
823
21208
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
824
  MessagePort* port;
825
21209
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
826
21208
  if (!port->data_) {
827
1
    return;
828
  }
829
21207
  port->Start();
830
}
831
832
20133
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
833
  MessagePort* port;
834
40266
  CHECK(args[0]->IsObject());
835
40356
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
836
20045
  if (!port->data_) {
837
2
    return;
838
  }
839
20043
  port->Stop();
840
}
841
842
422
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
843
  MessagePort* port;
844
1266
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
845
114
  port->OnMessage();
846
}
847
848
6
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
849
12
  CHECK(args[0]->IsObject());
850
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
851
6
  if (port == nullptr) {
852
    // Return 'no messages' for a closed port.
853
    args.GetReturnValue().Set(
854
        Environment::GetCurrent(args)->no_message_symbol());
855
6
    return;
856
  }
857
858
  MaybeLocal<Value> payload =
859
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
860
6
  if (!payload.IsEmpty())
861
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
862
}
863
864
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
865
1
  Environment* env = Environment::GetCurrent(args);
866


6
  if (!args[0]->IsObject() ||
867
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
868
    return THROW_ERR_INVALID_ARG_TYPE(env,
869
        "First argument needs to be a MessagePort instance");
870
  }
871
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
872
1
  CHECK_NOT_NULL(port);
873
874
1
  Local<Value> context_arg = args[1];
875
  ContextifyContext* context_wrapper;
876

4
  if (!context_arg->IsObject() ||
877
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
878
3
          env, context_arg.As<Object>())) == nullptr) {
879
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
880
  }
881
882
1
  std::unique_ptr<MessagePortData> data;
883
1
  if (!port->IsDetached())
884
1
    data = port->Detach();
885
886
1
  Context::Scope context_scope(context_wrapper->context());
887
  MessagePort* target =
888
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
889
1
  if (target != nullptr)
890
4
    args.GetReturnValue().Set(target->object());
891
}
892
893
10255
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
894
10255
  Entangle(a, b->data_.get());
895
10255
}
896
897
10469
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
898
10469
  MessagePortData::Entangle(a->data_.get(), b);
899
10469
}
900
901
38271
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
902
  // Factor generating the MessagePort JS constructor into its own piece
903
  // of code, because it is needed early on in the child environment setup.
904
38271
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
905
38271
  if (!templ.IsEmpty())
906
34699
    return templ;
907
908
3572
  Isolate* isolate = env->isolate();
909
910
  {
911
3572
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
912
7144
    m->SetClassName(env->message_port_constructor_string());
913
7144
    m->InstanceTemplate()->SetInternalFieldCount(1);
914
7144
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
915
916
3572
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
917
3572
    env->SetProtoMethod(m, "start", MessagePort::Start);
918
919
3572
    env->set_message_port_constructor_template(m);
920
921
3572
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
922
7144
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
923
3572
    Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
924
7144
    e->Set(env->data_string(), Null(isolate));
925
7144
    e->Set(env->target_string(), Null(isolate));
926
3572
    env->set_message_event_object_template(e);
927
  }
928
929
3572
  return GetMessagePortConstructorTemplate(env);
930
}
931
932
namespace {
933
934
10256
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
935
10256
  Environment* env = Environment::GetCurrent(args);
936
10256
  if (!args.IsConstructCall()) {
937
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
938
10257
    return;
939
  }
940
941
20510
  Local<Context> context = args.This()->CreationContext();
942
  Context::Scope context_scope(context);
943
944
10255
  MessagePort* port1 = MessagePort::New(env, context);
945
10255
  MessagePort* port2 = MessagePort::New(env, context);
946
10255
  MessagePort::Entangle(port1, port2);
947
948
51275
  args.This()->Set(context, env->port1_string(), port1->object())
949
20510
      .Check();
950
51275
  args.This()->Set(context, env->port2_string(), port2->object())
951
20510
      .Check();
952
}
953
954
3572
static void InitMessaging(Local<Object> target,
955
                          Local<Value> unused,
956
                          Local<Context> context,
957
                          void* priv) {
958
3572
  Environment* env = Environment::GetCurrent(context);
959
960
  {
961
    Local<String> message_channel_string =
962
3572
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
963
3572
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
964
3572
    templ->SetClassName(message_channel_string);
965
    target->Set(context,
966
                message_channel_string,
967
10716
                templ->GetFunction(context).ToLocalChecked()).Check();
968
  }
969
970
  target->Set(context,
971
              env->message_port_constructor_string(),
972
              GetMessagePortConstructorTemplate(env)
973
17860
                  ->GetFunction(context).ToLocalChecked()).Check();
974
975
  // These are not methods on the MessagePort prototype, because
976
  // the browser equivalents do not provide them.
977
3572
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
978
3572
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
979
3572
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
980
  env->SetMethod(target, "moveMessagePortToContext",
981
3572
                 MessagePort::MoveToContext);
982
983
  {
984
7144
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
985
    target
986
        ->Set(context,
987
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
988
10716
              domexception)
989
7144
        .Check();
990
  }
991
3572
}
992
993
}  // anonymous namespace
994
995
}  // namespace worker
996
}  // namespace node
997
998
4947
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)