GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 477 508 93.9 %
Date: 2019-10-08 22:34:21 Branches: 249 346 72.0 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using v8::Array;
14
using v8::ArrayBuffer;
15
using v8::ArrayBufferCreationMode;
16
using v8::Context;
17
using v8::EscapableHandleScope;
18
using v8::Exception;
19
using v8::Function;
20
using v8::FunctionCallbackInfo;
21
using v8::FunctionTemplate;
22
using v8::Global;
23
using v8::HandleScope;
24
using v8::Isolate;
25
using v8::Just;
26
using v8::Local;
27
using v8::Maybe;
28
using v8::MaybeLocal;
29
using v8::Nothing;
30
using v8::Object;
31
using v8::ObjectTemplate;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Symbol;
35
using v8::Value;
36
using v8::ValueDeserializer;
37
using v8::ValueSerializer;
38
using v8::WasmModuleObject;
39
40
namespace node {
41
namespace worker {
42
43
179280
Message::Message(MallocedBuffer<char>&& buffer)
44
179280
    : main_message_buf_(std::move(buffer)) {}
45
46
105603
bool Message::IsCloseMessage() const {
47
105603
  return main_message_buf_.data == nullptr;
48
}
49
50
namespace {
51
52
// This is used to tell V8 how to read transferred host objects, like other
53
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
54
42561
class DeserializerDelegate : public ValueDeserializer::Delegate {
55
 public:
56
42539
  DeserializerDelegate(
57
      Message* m,
58
      Environment* env,
59
      const std::vector<MessagePort*>& message_ports,
60
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
61
      const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
62
      : message_ports_(message_ports),
63
        shared_array_buffers_(shared_array_buffers),
64
42539
        wasm_modules_(wasm_modules) {}
65
66
10207
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
67
    // Currently, only MessagePort hosts objects are supported, so identifying
68
    // by the index in the message's MessagePort array is sufficient.
69
    uint32_t id;
70
10207
    if (!deserializer->ReadUint32(&id))
71
      return MaybeLocal<Object>();
72
10207
    CHECK_LE(id, message_ports_.size());
73
20414
    return message_ports_[id]->object(isolate);
74
  }
75
76
218
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
77
      Isolate* isolate, uint32_t clone_id) override {
78
218
    CHECK_LE(clone_id, shared_array_buffers_.size());
79
436
    return shared_array_buffers_[clone_id];
80
  }
81
82
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
83
      Isolate* isolate, uint32_t transfer_id) override {
84
2
    CHECK_LE(transfer_id, wasm_modules_.size());
85
    return WasmModuleObject::FromTransferrableModule(
86
2
        isolate, wasm_modules_[transfer_id]);
87
  }
88
89
  ValueDeserializer* deserializer = nullptr;
90
91
 private:
92
  const std::vector<MessagePort*>& message_ports_;
93
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
94
  const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
95
};
96
97
}  // anonymous namespace
98
99
42587
MaybeLocal<Value> Message::Deserialize(Environment* env,
100
                                       Local<Context> context) {
101
42587
  CHECK(!IsCloseMessage());
102
103
42596
  EscapableHandleScope handle_scope(env->isolate());
104
  Context::Scope context_scope(context);
105
106
  // Create all necessary MessagePort handles.
107
85179
  std::vector<MessagePort*> ports(message_ports_.size());
108
52808
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
109
10207
    ports[i] = MessagePort::New(env,
110
                                context,
111
10207
                                std::move(message_ports_[i]));
112
10207
    if (ports[i] == nullptr) {
113
      for (MessagePort* port : ports) {
114
        // This will eventually release the MessagePort object itself.
115
        if (port != nullptr)
116
          port->Close();
117
      }
118
      return MaybeLocal<Value>();
119
    }
120
  }
121
42584
  message_ports_.clear();
122
123
85160
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
124
  // Attach all transferred SharedArrayBuffers to their new Isolate.
125
42820
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
126
    Local<SharedArrayBuffer> sab;
127
654
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
128
654
            .ToLocal(&sab))
129
      return MaybeLocal<Value>();
130
218
    shared_array_buffers.push_back(sab);
131
  }
132
42530
  shared_array_buffers_.clear();
133
134
  DeserializerDelegate delegate(
135
85122
      this, env, ports, shared_array_buffers, wasm_modules_);
136
  ValueDeserializer deserializer(
137
      env->isolate(),
138
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
139
      main_message_buf_.size,
140
85163
      &delegate);
141
42601
  delegate.deserializer = &deserializer;
142
143
  // Attach all transferred ArrayBuffers to their new Isolate.
144
42607
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
145
6
    if (!env->isolate_data()->uses_node_allocator()) {
146
      // We don't use Node's allocator on the receiving side, so we have
147
      // to create the ArrayBuffer from a copy of the memory.
148
      AllocatedBuffer buf =
149
          env->AllocateManaged(array_buffer_contents_[i].size);
150
      memcpy(buf.data(),
151
             array_buffer_contents_[i].data,
152
             array_buffer_contents_[i].size);
153
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
154
      continue;
155
    }
156
157
6
    env->isolate_data()->node_allocator()->RegisterPointer(
158
6
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);
159
160
    Local<ArrayBuffer> ab =
161
        ArrayBuffer::New(env->isolate(),
162
6
                         array_buffer_contents_[i].release(),
163
6
                         array_buffer_contents_[i].size,
164
12
                         ArrayBufferCreationMode::kInternalized);
165
6
    deserializer.TransferArrayBuffer(i, ab);
166
  }
167
42602
  array_buffer_contents_.clear();
168
169
85181
  if (deserializer.ReadHeader(context).IsNothing())
170
    return MaybeLocal<Value>();
171
  return handle_scope.Escape(
172
85163
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
173
}
174
175
232
void Message::AddSharedArrayBuffer(
176
    const SharedArrayBufferMetadataReference& reference) {
177
232
  shared_array_buffers_.push_back(reference);
178
232
}
179
180
10231
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
181
10231
  message_ports_.emplace_back(std::move(data));
182
10231
}
183
184
2
uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
185
2
  wasm_modules_.emplace_back(std::move(mod));
186
2
  return wasm_modules_.size() - 1;
187
}
188
189
namespace {
190
191
3630
MaybeLocal<Function> GetDOMException(Local<Context> context) {
192
3630
  Isolate* isolate = context->GetIsolate();
193
  Local<Object> per_context_bindings;
194
  Local<Value> domexception_ctor_val;
195


18150
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
196
      !per_context_bindings->Get(context,
197
10890
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
198
14520
          .ToLocal(&domexception_ctor_val)) {
199
    return MaybeLocal<Function>();
200
  }
201
3630
  CHECK(domexception_ctor_val->IsFunction());
202
3630
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
203
3630
  return domexception_ctor;
204
}
205
206
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
207
14
  Isolate* isolate = context->GetIsolate();
208
  Local<Value> argv[] = {message,
209
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
210
  Local<Value> exception;
211
  Local<Function> domexception_ctor;
212


70
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
213
28
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
214
42
           .ToLocal(&exception)) {
215
14
    return;
216
  }
217
14
  isolate->ThrowException(exception);
218
}
219
220
// This tells V8 how to serialize objects that it does not understand
221
// (e.g. C++ objects) into the output buffer, in a way that our own
222
// DeserializerDelegate understands how to unpack.
223
42735
class SerializerDelegate : public ValueSerializer::Delegate {
224
 public:
225
42735
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
226
42735
      : env_(env), context_(context), msg_(m) {}
227
228
4
  void ThrowDataCloneError(Local<String> message) override {
229
4
    ThrowDataCloneException(context_, message);
230
4
  }
231
232
10231
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
233
20462
    if (env_->message_port_constructor_template()->HasInstance(object)) {
234
10230
      return WriteMessagePort(Unwrap<MessagePort>(object));
235
    }
236
237
1
    ThrowDataCloneError(env_->clone_unsupported_type_str());
238
1
    return Nothing<bool>();
239
  }
240
241
232
  Maybe<uint32_t> GetSharedArrayBufferId(
242
      Isolate* isolate,
243
      Local<SharedArrayBuffer> shared_array_buffer) override {
244
    uint32_t i;
245
242
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
246
20
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
247
          shared_array_buffer) {
248
        return Just(i);
249
      }
250
    }
251
252
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
253
        env_,
254
        context_,
255
232
        shared_array_buffer);
256
232
    if (!reference) {
257
      return Nothing<uint32_t>();
258
    }
259
    seen_shared_array_buffers_.emplace_back(
260
464
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
261
232
    msg_->AddSharedArrayBuffer(reference);
262
232
    return Just(i);
263
  }
264
265
2
  Maybe<uint32_t> GetWasmModuleTransferId(
266
      Isolate* isolate, Local<WasmModuleObject> module) override {
267
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
268
  }
269
270
42721
  void Finish() {
271
    // Only close the MessagePort handles and actually transfer them
272
    // once we know that serialization succeeded.
273
52952
    for (MessagePort* port : ports_) {
274
20462
      port->Close();
275
10231
      msg_->AddMessagePort(port->Detach());
276
    }
277
42721
  }
278
279
  ValueSerializer* serializer = nullptr;
280
281
 private:
282
10230
  Maybe<bool> WriteMessagePort(MessagePort* port) {
283
10230
    for (uint32_t i = 0; i < ports_.size(); i++) {
284
10230
      if (ports_[i] == port) {
285
10230
        serializer->WriteUint32(i);
286
10230
        return Just(true);
287
      }
288
    }
289
290
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
291
    return Nothing<bool>();
292
  }
293
294
  Environment* env_;
295
  Local<Context> context_;
296
  Message* msg_;
297
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
298
  std::vector<MessagePort*> ports_;
299
300
  friend class worker::Message;
301
};
302
303
}  // anonymous namespace
304
305
42735
Maybe<bool> Message::Serialize(Environment* env,
306
                               Local<Context> context,
307
                               Local<Value> input,
308
                               const TransferList& transfer_list_v,
309
                               Local<Object> source_port) {
310
42735
  HandleScope handle_scope(env->isolate());
311
  Context::Scope context_scope(context);
312
313
  // Verify that we're not silently overwriting an existing message.
314
42735
  CHECK(main_message_buf_.is_empty());
315
316
85470
  SerializerDelegate delegate(env, context, this);
317
85470
  ValueSerializer serializer(env->isolate(), &delegate);
318
42735
  delegate.serializer = &serializer;
319
320
85470
  std::vector<Local<ArrayBuffer>> array_buffers;
321
52987
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
322
10262
    Local<Value> entry = transfer_list_v[i];
323
    // Currently, we support ArrayBuffers and MessagePorts.
324
10262
    if (entry->IsArrayBuffer()) {
325
20
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
326
      // If we cannot render the ArrayBuffer unusable in this Isolate and
327
      // take ownership of its memory, copying the buffer will have to do.
328


60
      if (!ab->IsDetachable() || ab->IsExternal() ||
329
20
          !env->isolate_data()->uses_node_allocator()) {
330
19
        continue;
331
      }
332
20
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
333
          array_buffers.end()) {
334
        ThrowDataCloneException(
335
            context,
336
            FIXED_ONE_BYTE_STRING(
337
                env->isolate(),
338
1
                "Transfer list contains duplicate ArrayBuffer"));
339
1
        return Nothing<bool>();
340
      }
341
      // We simply use the array index in the `array_buffers` list as the
342
      // ID that we write into the serialized buffer.
343
19
      uint32_t id = array_buffers.size();
344
19
      array_buffers.push_back(ab);
345
19
      serializer.TransferArrayBuffer(id, ab);
346
19
      continue;
347
20484
    } else if (env->message_port_constructor_template()
348
30726
                  ->HasInstance(entry)) {
349
      // Check if the source MessagePort is being transferred.
350

20484
      if (!source_port.IsEmpty() && entry == source_port) {
351
        ThrowDataCloneException(
352
            context,
353
            FIXED_ONE_BYTE_STRING(env->isolate(),
354
1
                                  "Transfer list contains source port"));
355
10
        return Nothing<bool>();
356
      }
357
10241
      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
358

10241
      if (port == nullptr || port->IsDetached()) {
359
        ThrowDataCloneException(
360
            context,
361
            FIXED_ONE_BYTE_STRING(
362
                env->isolate(),
363
7
                "MessagePort in transfer list is already detached"));
364
7
        return Nothing<bool>();
365
      }
366
10234
      if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
367
          delegate.ports_.end()) {
368
        ThrowDataCloneException(
369
            context,
370
            FIXED_ONE_BYTE_STRING(
371
                env->isolate(),
372
1
                "Transfer list contains duplicate MessagePort"));
373
1
        return Nothing<bool>();
374
      }
375
10233
      delegate.ports_.push_back(port);
376
10233
      continue;
377
    }
378
379
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
380
    return Nothing<bool>();
381
  }
382
383
42725
  serializer.WriteHeader();
384
85450
  if (serializer.WriteValue(context, input).IsNothing()) {
385
4
    return Nothing<bool>();
386
  }
387
388
42732
  for (Local<ArrayBuffer> ab : array_buffers) {
389
    // If serialization succeeded, we want to take ownership of
390
    // (a.k.a. externalize) the underlying memory region and render
391
    // it inaccessible in this Isolate.
392
11
    ArrayBuffer::Contents contents = ab->Externalize();
393
11
    ab->Detach();
394
395
11
    CHECK(env->isolate_data()->uses_node_allocator());
396
11
    env->isolate_data()->node_allocator()->UnregisterPointer(
397
11
        contents.Data(), contents.ByteLength());
398
399
    array_buffer_contents_.emplace_back(MallocedBuffer<char>{
400
11
        static_cast<char*>(contents.Data()), contents.ByteLength()});
401
  }
402
403
42721
  delegate.Finish();
404
405
  // The serializer gave us a buffer allocated using `malloc()`.
406
42721
  std::pair<uint8_t*, size_t> data = serializer.Release();
407
42721
  CHECK_NOT_NULL(data.first);
408
85442
  main_message_buf_ =
409
42721
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
410
85456
  return Just(true);
411
}
412
413
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
414
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
415
  tracker->TrackFieldWithSize("shared_array_buffers",
416
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
417
4
  tracker->TrackField("message_ports", message_ports_);
418
4
}
419
420
31382
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
421
422
93984
MessagePortData::~MessagePortData() {
423
31326
  CHECK_NULL(owner_);
424
31326
  Disentangle();
425
62656
}
426
427
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
428
8
  Mutex::ScopedLock lock(mutex_);
429
8
  tracker->TrackField("incoming_messages", incoming_messages_);
430
8
}
431
432
105413
void MessagePortData::AddToIncomingQueue(Message&& message) {
433
  // This function will be called by other threads.
434
105413
  Mutex::ScopedLock lock(mutex_);
435
105417
  incoming_messages_.emplace_back(std::move(message));
436
437
105414
  if (owner_ != nullptr) {
438
52513
    Debug(owner_, "Adding message to incoming queue");
439
52513
    owner_->TriggerAsync();
440
105414
  }
441
105417
}
442
443
10484
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
444
10484
  CHECK_NULL(a->sibling_);
445
10484
  CHECK_NULL(b->sibling_);
446
10484
  a->sibling_ = b;
447
10484
  b->sibling_ = a;
448
10484
  a->sibling_mutex_ = b->sibling_mutex_;
449
10484
}
450
451
52209
void MessagePortData::Disentangle() {
452
  // Grab a copy of the sibling mutex, then replace it so that each sibling
453
  // has its own sibling_mutex_ now.
454
52209
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
455
104418
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
456
52217
  sibling_mutex_ = std::make_shared<Mutex>();
457
458
52216
  MessagePortData* sibling = sibling_;
459
52216
  if (sibling_ != nullptr) {
460
10480
    sibling_->sibling_ = nullptr;
461
10480
    sibling_ = nullptr;
462
  }
463
464
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
465
  // message and trigger the corresponding uv_async_t to let them know that
466
  // this happened.
467
52216
  AddToIncomingQueue(Message());
468
52206
  if (sibling != nullptr) {
469
10480
    sibling->AddToIncomingQueue(Message());
470
52214
  }
471
52215
}
472
473
93351
MessagePort::~MessagePort() {
474
31117
  if (data_)
475
    data_->owner_ = nullptr;
476
62234
}
477
478
31163
MessagePort::MessagePort(Environment* env,
479
                         Local<Context> context,
480
                         Local<Object> wrap)
481
  : HandleWrap(env,
482
               wrap,
483
               reinterpret_cast<uv_handle_t*>(&async_),
484
               AsyncWrap::PROVIDER_MESSAGEPORT),
485
31163
    data_(new MessagePortData(this)) {
486
93621
  auto onmessage = [](uv_async_t* handle) {
487
    // Called when data has been put into the queue.
488
31229
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
489
31229
    channel->OnMessage();
490
93616
  };
491
31163
  CHECK_EQ(uv_async_init(env->event_loop(),
492
                         &async_,
493
                         onmessage), 0);
494
31163
  async_.data = static_cast<void*>(this);
495
496
  Local<Value> fn;
497
93489
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
498
31163
    return;
499
500
31163
  if (fn->IsFunction()) {
501
31162
    Local<Function> init = fn.As<Function>();
502
31162
    USE(init->Call(context, wrap, 0, nullptr));
503
  }
504
505
31163
  Debug(this, "Created message port");
506
}
507
508
10241
bool MessagePort::IsDetached() const {
509

10241
  return data_ == nullptr || IsHandleClosing();
510
}
511
512
73403
void MessagePort::TriggerAsync() {
513
146806
  if (IsHandleClosing()) return;
514
73390
  CHECK_EQ(uv_async_send(&async_), 0);
515
}
516
517
31366
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
518
62732
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
519
520
31366
  if (data_) {
521
    // Wrap this call with accessing the mutex, so that TriggerAsync()
522
    // can check IsHandleClosing() without race conditions.
523
31365
    Mutex::ScopedLock sibling_lock(data_->mutex_);
524
31365
    HandleWrap::Close(close_callback);
525
  } else {
526
1
    HandleWrap::Close(close_callback);
527
  }
528
31366
}
529
530
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
531
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
532
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
533
  // class (i.e. makes it behave like an arrow function).
534
2
  Environment* env = Environment::GetCurrent(args);
535
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
536
2
}
537
538
31163
MessagePort* MessagePort::New(
539
    Environment* env,
540
    Local<Context> context,
541
    std::unique_ptr<MessagePortData> data) {
542
  Context::Scope context_scope(context);
543
31163
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
544
545
  // Construct a new instance, then assign the listener instance and possibly
546
  // the MessagePortData to it.
547
  Local<Object> instance;
548
93489
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
549
    return nullptr;
550
31163
  MessagePort* port = new MessagePort(env, context, instance);
551
31163
  CHECK_NOT_NULL(port);
552
31163
  if (data) {
553
10414
    port->Detach();
554
10414
    port->data_ = std::move(data);
555
556
    // This lock is here to avoid race conditions with the `owner_` read
557
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
558
    // but it's better to be safe than sorry.)
559
10414
    Mutex::ScopedLock lock(port->data_->mutex_);
560
10414
    port->data_->owner_ = port;
561
    // If the existing MessagePortData object had pending messages, this is
562
    // the easiest way to run that queue.
563
10414
    port->TriggerAsync();
564
  }
565
31163
  return port;
566
}
567
568
73843
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
569
                                              bool only_if_receiving) {
570
73843
  Message received;
571
  {
572
    // Get the head of the message queue.
573
73845
    Mutex::ScopedLock lock(data_->mutex_);
574
575
73889
    Debug(this, "MessagePort has message");
576
577

73890
    bool wants_message = receiving_messages_ || !only_if_receiving;
578
    // We have nothing to do if:
579
    // - There are no pending messages
580
    // - We are not intending to receive messages, and the message we would
581
    //   receive is not the final "close" message.
582

147784
    if (data_->incoming_messages_.empty() ||
583
63095
        (!wants_message &&
584
10026
         !data_->incoming_messages_.front().IsCloseMessage())) {
585
41651
      return env()->no_message_symbol();
586
    }
587
588
53060
    received = std::move(data_->incoming_messages_.front());
589
53002
    data_->incoming_messages_.pop_front();
590
  }
591
592
53061
  if (received.IsCloseMessage()) {
593
20912
    Close();
594
20912
    return env()->no_message_symbol();
595
  }
596
597
42594
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
598
599
42599
  return received.Deserialize(env(), context);
600
}
601
602
31316
void MessagePort::OnMessage() {
603
31316
  Debug(this, "Running MessagePort::OnMessage()");
604
31316
  HandleScope handle_scope(env()->isolate());
605
62631
  Local<Context> context = object(env()->isolate())->CreationContext();
606
607
  size_t processing_limit;
608
  {
609
31316
    Mutex::ScopedLock(data_->mutex_);
610
31316
    processing_limit = std::max(data_->incoming_messages_.size(),
611
62632
                                static_cast<size_t>(1000));
612
  }
613
614
  // data_ can only ever be modified by the owner thread, so no need to lock.
615
  // However, the message port may be transferred while it is processing
616
  // messages, so we need to check that this handle still owns its `data_` field
617
  // on every iteration.
618

105203
  while (data_) {
619
73858
    if (processing_limit-- == 0) {
620
      // Prevent event loop starvation by only processing those messages without
621
      // interruption that were already present when the OnMessage() call was
622
      // first triggered, but at least 1000 messages because otherwise the
623
      // overhead of repeatedly triggering the uv_async_t instance becomes
624
      // noticable, at least on Windows.
625
      // (That might require more investigation by somebody more familiar with
626
      // Windows.)
627
2
      TriggerAsync();
628
1
      return;
629
    }
630
631
73856
    HandleScope handle_scope(env()->isolate());
632

42571
    Context::Scope context_scope(context);
633
634
    Local<Value> payload;
635
147718
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
636
147747
    if (payload == env()->no_message_symbol()) break;
637
638
42596
    if (!env()->can_call_into_js()) {
639
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
640
      // In this case there is nothing to do but to drain the current queue.
641
      continue;
642
    }
643
644
    Local<Object> event;
645
85131
    Local<Value> cb_args[1];
646

170286
    if (!env()->message_event_object_template()->NewInstance(context)
647

254935
            .ToLocal(&event) ||
648

297970
        event->Set(context, env()->data_string(), payload).IsNothing() ||
649


297836
        event->Set(context, env()->target_string(), object()).IsNothing() ||
650

212304
        (cb_args[0] = event, false) ||
651
        MakeCallback(env()->onmessage_string(),
652
42589
                     arraysize(cb_args),
653
127705
                     cb_args).IsEmpty()) {
654
      // Re-schedule OnMessage() execution in case of failure.
655
29
      if (data_)
656
29
        TriggerAsync();
657
29
      return;
658
    }
659
73854
  }
660
}
661
662
31116
void MessagePort::OnClose() {
663
31116
  Debug(this, "MessagePort::OnClose()");
664
31117
  if (data_) {
665
20885
    data_->owner_ = nullptr;
666
20885
    data_->Disentangle();
667
  }
668
31117
  data_.reset();
669
31117
}
670
671
20646
std::unique_ptr<MessagePortData> MessagePort::Detach() {
672
20646
  CHECK(data_);
673
20646
  Mutex::ScopedLock lock(data_->mutex_);
674
20646
  data_->owner_ = nullptr;
675
20646
  return std::move(data_);
676
}
677
678
679
42734
Maybe<bool> MessagePort::PostMessage(Environment* env,
680
                                     Local<Value> message_v,
681
                                     const TransferList& transfer_v) {
682
42734
  Isolate* isolate = env->isolate();
683
42734
  Local<Object> obj = object(isolate);
684
42734
  Local<Context> context = obj->CreationContext();
685
686
42734
  Message msg;
687
688
  // Per spec, we need to both check if transfer list has the source port, and
689
  // serialize the input message, even if the MessagePort is closed or detached.
690
691
  Maybe<bool> serialization_maybe =
692
42734
      msg.Serialize(env, context, message_v, transfer_v, obj);
693
42734
  if (data_ == nullptr) {
694
2
    return serialization_maybe;
695
  }
696
42732
  if (serialization_maybe.IsNothing()) {
697
11
    return Nothing<bool>();
698
  }
699
700
85442
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
701
42721
  bool doomed = false;
702
703
  // Check if the target port is posted to itself.
704
42721
  if (data_->sibling_ != nullptr) {
705
52951
    for (const auto& port_data : msg.message_ports()) {
706
10231
      if (data_->sibling_ == port_data.get()) {
707
1
        doomed = true;
708
        ProcessEmitWarning(env, "The target port was posted to itself, and "
709
1
                                "the communication channel was lost");
710
1
        break;
711
      }
712
    }
713
  }
714
715

42721
  if (data_->sibling_ == nullptr || doomed)
716
1
    return Just(true);
717
718
42720
  data_->sibling_->AddToIncomingQueue(std::move(msg));
719
85454
  return Just(true);
720
}
721
722
10275
static Maybe<bool> ReadIterable(Environment* env,
723
                                Local<Context> context,
724
                                // NOLINTNEXTLINE(runtime/references)
725
                                TransferList& transfer_list,
726
                                Local<Value> object) {
727
10275
  if (!object->IsObject()) return Just(false);
728
729
10272
  if (object->IsArray()) {
730
10252
    Local<Array> arr = object.As<Array>();
731
10252
    size_t length = arr->Length();
732
10252
    transfer_list.AllocateSufficientStorage(length);
733
20512
    for (size_t i = 0; i < length; i++) {
734
30780
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
735
        return Nothing<bool>();
736
    }
737
10252
    return Just(true);
738
  }
739
740
20
  Isolate* isolate = env->isolate();
741
  Local<Value> iterator_method;
742
100
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
743
60
      .ToLocal(&iterator_method)) return Nothing<bool>();
744
20
  if (!iterator_method->IsFunction()) return Just(false);
745
746
  Local<Value> iterator;
747
24
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
748
18
      .ToLocal(&iterator)) return Nothing<bool>();
749
6
  if (!iterator->IsObject()) return Just(false);
750
751
  Local<Value> next;
752
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
753
    return Nothing<bool>();
754
6
  if (!next->IsFunction()) return Just(false);
755
756
3
  std::vector<Local<Value>> entries;
757
3
  while (env->can_call_into_js()) {
758
    Local<Value> result;
759
20
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
760
17
        .ToLocal(&result)) return Nothing<bool>();
761
4
    if (!result->IsObject()) return Just(false);
762
763
    Local<Value> done;
764
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
765
      return Nothing<bool>();
766
4
    if (done->BooleanValue(isolate)) break;
767
768
    Local<Value> val;
769
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
770
      return Nothing<bool>();
771
2
    entries.push_back(val);
772
  }
773
774
2
  transfer_list.AllocateSufficientStorage(entries.size());
775
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
776
2
  return Just(true);
777
}
778
779
42747
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
780
42747
  Environment* env = Environment::GetCurrent(args);
781
42747
  Local<Object> obj = args.This();
782
42747
  Local<Context> context = obj->CreationContext();
783
784
42747
  if (args.Length() == 0) {
785
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
786
13
                                       "MessagePort.postMessage");
787
  }
788
789


191524
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
790
    // Browsers ignore null or undefined, and otherwise accept an array or an
791
    // options object.
792
    return THROW_ERR_INVALID_ARG_TYPE(env,
793
4
        "Optional transferList argument must be an iterable");
794
  }
795
796
42743
  TransferList transfer_list;
797
85486
  if (args[1]->IsObject()) {
798
    bool was_iterable;
799
20528
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
800
8
      return;
801
10264
    if (!was_iterable) {
802
      Local<Value> transfer_option;
803
78
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
804
47
          .ToLocal(&transfer_option)) return;
805
26
      if (!transfer_option->IsUndefined()) {
806
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
807
34
            .To(&was_iterable)) return;
808
10
        if (!was_iterable) {
809
          return THROW_ERR_INVALID_ARG_TYPE(env,
810
7
              "Optional options.transfer argument must be an iterable");
811
        }
812
      }
813
    }
814
  }
815
816
42735
  MessagePort* port = Unwrap<MessagePort>(args.This());
817
  // Even if the backing MessagePort object has already been deleted, we still
818
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
819
  // transfers.
820
42735
  if (port == nullptr) {
821
1
    Message msg;
822
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
823
1
    return;
824
  }
825
826
42734
  port->PostMessage(env, args[0], transfer_list);
827
}
828
829
21244
void MessagePort::Start() {
830
21244
  Debug(this, "Start receiving messages");
831
21244
  receiving_messages_ = true;
832
21244
  Mutex::ScopedLock lock(data_->mutex_);
833
21244
  if (!data_->incoming_messages_.empty())
834
10446
    TriggerAsync();
835
21244
}
836
837
20041
void MessagePort::Stop() {
838
20041
  Debug(this, "Stop receiving messages");
839
20041
  receiving_messages_ = false;
840
20041
}
841
842
21245
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
843
  MessagePort* port;
844
21246
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
845
21245
  if (!port->data_) {
846
1
    return;
847
  }
848
21244
  port->Start();
849
}
850
851
20137
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
852
  MessagePort* port;
853
40274
  CHECK(args[0]->IsObject());
854
40370
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
855
20043
  if (!port->data_) {
856
2
    return;
857
  }
858
20041
  port->Stop();
859
}
860
861
432
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
862
  MessagePort* port;
863
1296
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
864
87
  port->OnMessage();
865
}
866
867
6
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
868
12
  CHECK(args[0]->IsObject());
869
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
870
6
  if (port == nullptr) {
871
    // Return 'no messages' for a closed port.
872
    args.GetReturnValue().Set(
873
        Environment::GetCurrent(args)->no_message_symbol());
874
6
    return;
875
  }
876
877
  MaybeLocal<Value> payload =
878
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
879
6
  if (!payload.IsEmpty())
880
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
881
}
882
883
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
884
1
  Environment* env = Environment::GetCurrent(args);
885


6
  if (!args[0]->IsObject() ||
886
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
887
    return THROW_ERR_INVALID_ARG_TYPE(env,
888
        "First argument needs to be a MessagePort instance");
889
  }
890
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
891
1
  CHECK_NOT_NULL(port);
892
893
1
  Local<Value> context_arg = args[1];
894
  ContextifyContext* context_wrapper;
895

4
  if (!context_arg->IsObject() ||
896
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
897
3
          env, context_arg.As<Object>())) == nullptr) {
898
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
899
  }
900
901
1
  std::unique_ptr<MessagePortData> data;
902
1
  if (!port->IsDetached())
903
1
    data = port->Detach();
904
905
1
  Context::Scope context_scope(context_wrapper->context());
906
  MessagePort* target =
907
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
908
1
  if (target != nullptr)
909
4
    args.GetReturnValue().Set(target->object());
910
}
911
912
10265
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
913
10265
  Entangle(a, b->data_.get());
914
10265
}
915
916
10484
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
917
10484
  MessagePortData::Entangle(a->data_.get(), b);
918
10484
}
919
920
38395
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
921
  // Factor generating the MessagePort JS constructor into its own piece
922
  // of code, because it is needed early on in the child environment setup.
923
38395
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
924
38395
  if (!templ.IsEmpty())
925
34779
    return templ;
926
927
3616
  Isolate* isolate = env->isolate();
928
929
  {
930
3616
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
931
7232
    m->SetClassName(env->message_port_constructor_string());
932
7232
    m->InstanceTemplate()->SetInternalFieldCount(1);
933
7232
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
934
935
3616
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
936
3616
    env->SetProtoMethod(m, "start", MessagePort::Start);
937
938
3616
    env->set_message_port_constructor_template(m);
939
940
3616
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
941
7232
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
942
3616
    Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
943
7232
    e->Set(env->data_string(), Null(isolate));
944
7232
    e->Set(env->target_string(), Null(isolate));
945
3616
    env->set_message_event_object_template(e);
946
  }
947
948
3616
  return GetMessagePortConstructorTemplate(env);
949
}
950
951
namespace {
952
953
10266
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
954
10266
  Environment* env = Environment::GetCurrent(args);
955
10266
  if (!args.IsConstructCall()) {
956
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
957
10267
    return;
958
  }
959
960
20530
  Local<Context> context = args.This()->CreationContext();
961
  Context::Scope context_scope(context);
962
963
10265
  MessagePort* port1 = MessagePort::New(env, context);
964
10265
  MessagePort* port2 = MessagePort::New(env, context);
965
10265
  MessagePort::Entangle(port1, port2);
966
967
51325
  args.This()->Set(context, env->port1_string(), port1->object())
968
20530
      .Check();
969
51325
  args.This()->Set(context, env->port2_string(), port2->object())
970
20530
      .Check();
971
}
972
973
3616
static void InitMessaging(Local<Object> target,
974
                          Local<Value> unused,
975
                          Local<Context> context,
976
                          void* priv) {
977
3616
  Environment* env = Environment::GetCurrent(context);
978
979
  {
980
    Local<String> message_channel_string =
981
3616
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
982
3616
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
983
3616
    templ->SetClassName(message_channel_string);
984
    target->Set(context,
985
                message_channel_string,
986
10848
                templ->GetFunction(context).ToLocalChecked()).Check();
987
  }
988
989
  target->Set(context,
990
              env->message_port_constructor_string(),
991
              GetMessagePortConstructorTemplate(env)
992
18080
                  ->GetFunction(context).ToLocalChecked()).Check();
993
994
  // These are not methods on the MessagePort prototype, because
995
  // the browser equivalents do not provide them.
996
3616
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
997
3616
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
998
3616
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
999
  env->SetMethod(target, "moveMessagePortToContext",
1000
3616
                 MessagePort::MoveToContext);
1001
1002
  {
1003
7232
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1004
    target
1005
        ->Set(context,
1006
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1007
10848
              domexception)
1008
7232
        .Check();
1009
  }
1010
3616
}
1011
1012
}  // anonymous namespace
1013
1014
}  // namespace worker
1015
}  // namespace node
1016
1017
4981
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)