GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 473 512 92.4 %
Date: 2020-05-27 22:15:15 Branches: 229 304 75.3 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using v8::Array;
14
using v8::ArrayBuffer;
15
using v8::BackingStore;
16
using v8::CompiledWasmModule;
17
using v8::Context;
18
using v8::EscapableHandleScope;
19
using v8::Exception;
20
using v8::Function;
21
using v8::FunctionCallbackInfo;
22
using v8::FunctionTemplate;
23
using v8::Global;
24
using v8::HandleScope;
25
using v8::Isolate;
26
using v8::Just;
27
using v8::Local;
28
using v8::Maybe;
29
using v8::MaybeLocal;
30
using v8::Nothing;
31
using v8::Object;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Symbol;
35
using v8::Value;
36
using v8::ValueDeserializer;
37
using v8::ValueSerializer;
38
using v8::WasmModuleObject;
39
40
namespace node {
41
namespace worker {
42
43
180691
Message::Message(MallocedBuffer<char>&& buffer)
44
180691
    : main_message_buf_(std::move(buffer)) {}
45
46
104477
bool Message::IsCloseMessage() const {
47
104477
  return main_message_buf_.data == nullptr;
48
}
49
50
namespace {
51
52
// This is used to tell V8 how to read transferred host objects, like other
53
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
54
41172
class DeserializerDelegate : public ValueDeserializer::Delegate {
55
 public:
56
41471
  DeserializerDelegate(
57
      Message* m,
58
      Environment* env,
59
      const std::vector<MessagePort*>& message_ports,
60
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
61
      const std::vector<CompiledWasmModule>& wasm_modules)
62
41471
      : message_ports_(message_ports),
63
        shared_array_buffers_(shared_array_buffers),
64
41471
        wasm_modules_(wasm_modules) {}
65
66
10302
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
67
    // Currently, only MessagePort hosts objects are supported, so identifying
68
    // by the index in the message's MessagePort array is sufficient.
69
    uint32_t id;
70
10302
    if (!deserializer->ReadUint32(&id))
71
      return MaybeLocal<Object>();
72
10302
    CHECK_LE(id, message_ports_.size());
73
20604
    return message_ports_[id]->object(isolate);
74
  }
75
76
320
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
77
      Isolate* isolate, uint32_t clone_id) override {
78
320
    CHECK_LE(clone_id, shared_array_buffers_.size());
79
640
    return shared_array_buffers_[clone_id];
80
  }
81
82
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
83
      Isolate* isolate, uint32_t transfer_id) override {
84
2
    CHECK_LE(transfer_id, wasm_modules_.size());
85
    return WasmModuleObject::FromCompiledModule(
86
2
        isolate, wasm_modules_[transfer_id]);
87
  }
88
89
  ValueDeserializer* deserializer = nullptr;
90
91
 private:
92
  const std::vector<MessagePort*>& message_ports_;
93
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
94
  const std::vector<CompiledWasmModule>& wasm_modules_;
95
};
96
97
}  // anonymous namespace
98
99
41467
MaybeLocal<Value> Message::Deserialize(Environment* env,
100
                                       Local<Context> context) {
101
41467
  CHECK(!IsCloseMessage());
102
103
41585
  EscapableHandleScope handle_scope(env->isolate());
104
  Context::Scope context_scope(context);
105
106
  // Create all necessary MessagePort handles.
107
83009
  std::vector<MessagePort*> ports(message_ports_.size());
108
51924
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
109
20604
    ports[i] = MessagePort::New(env,
110
                                context,
111
10302
                                std::move(message_ports_[i]));
112
10302
    if (ports[i] == nullptr) {
113
      for (MessagePort* port : ports) {
114
        // This will eventually release the MessagePort object itself.
115
        if (port != nullptr)
116
          port->Close();
117
      }
118
      return MaybeLocal<Value>();
119
    }
120
  }
121
41606
  message_ports_.clear();
122
123
83177
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
124
  // Attach all transferred SharedArrayBuffers to their new Isolate.
125
41992
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
126
    Local<SharedArrayBuffer> sab =
127
        SharedArrayBuffer::New(env->isolate(),
128
320
                               std::move(shared_array_buffers_[i]));
129
320
    shared_array_buffers.push_back(sab);
130
  }
131
41373
  shared_array_buffers_.clear();
132
133
  DeserializerDelegate delegate(
134
83173
      this, env, ports, shared_array_buffers, wasm_modules_);
135
  ValueDeserializer deserializer(
136
      env->isolate(),
137
41646
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
138
      main_message_buf_.size,
139
124861
      &delegate);
140
41663
  delegate.deserializer = &deserializer;
141
142
  // Attach all transferred ArrayBuffers to their new Isolate.
143
41671
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
144
    Local<ArrayBuffer> ab =
145
8
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
146
8
    deserializer.TransferArrayBuffer(i, ab);
147
  }
148
41672
  array_buffers_.clear();
149
150
83042
  if (deserializer.ReadHeader(context).IsNothing())
151
    return MaybeLocal<Value>();
152
  return handle_scope.Escape(
153
82935
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
154
}
155
156
573
void Message::AddSharedArrayBuffer(
157
    std::shared_ptr<BackingStore> backing_store) {
158
573
  shared_array_buffers_.emplace_back(std::move(backing_store));
159
573
}
160
161
10560
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
162
10560
  message_ports_.emplace_back(std::move(data));
163
10560
}
164
165
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
166
2
  wasm_modules_.emplace_back(std::move(mod));
167
2
  return wasm_modules_.size() - 1;
168
}
169
170
namespace {
171
172
32351
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
173
32351
  Isolate* isolate = context->GetIsolate();
174
  Local<Object> per_context_bindings;
175
  Local<Value> emit_message_val;
176

129404
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
177
97053
      !per_context_bindings->Get(context,
178
97053
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
179
32351
          .ToLocal(&emit_message_val)) {
180
    return MaybeLocal<Function>();
181
  }
182
32351
  CHECK(emit_message_val->IsFunction());
183
32351
  return emit_message_val.As<Function>();
184
}
185
186
3883
MaybeLocal<Function> GetDOMException(Local<Context> context) {
187
3883
  Isolate* isolate = context->GetIsolate();
188
  Local<Object> per_context_bindings;
189
  Local<Value> domexception_ctor_val;
190

15532
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
191
11649
      !per_context_bindings->Get(context,
192
11649
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
193
3883
          .ToLocal(&domexception_ctor_val)) {
194
    return MaybeLocal<Function>();
195
  }
196
3883
  CHECK(domexception_ctor_val->IsFunction());
197
3883
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
198
3883
  return domexception_ctor;
199
}
200
201
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
202
14
  Isolate* isolate = context->GetIsolate();
203
  Local<Value> argv[] = {message,
204
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
205
  Local<Value> exception;
206
  Local<Function> domexception_ctor;
207

56
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
208
56
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
209
14
           .ToLocal(&exception)) {
210
    return;
211
  }
212
14
  isolate->ThrowException(exception);
213
}
214
215
// This tells V8 how to serialize objects that it does not understand
216
// (e.g. C++ objects) into the output buffer, in a way that our own
217
// DeserializerDelegate understands how to unpack.
218
42548
class SerializerDelegate : public ValueSerializer::Delegate {
219
 public:
220
42548
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
221
42548
      : env_(env), context_(context), msg_(m) {}
222
223
4
  void ThrowDataCloneError(Local<String> message) override {
224
4
    ThrowDataCloneException(context_, message);
225
4
  }
226
227
10561
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
228
21122
    if (env_->message_port_constructor_template()->HasInstance(object)) {
229
10560
      return WriteMessagePort(Unwrap<MessagePort>(object));
230
    }
231
232
1
    ThrowDataCloneError(env_->clone_unsupported_type_str());
233
1
    return Nothing<bool>();
234
  }
235
236
573
  Maybe<uint32_t> GetSharedArrayBufferId(
237
      Isolate* isolate,
238
      Local<SharedArrayBuffer> shared_array_buffer) override {
239
    uint32_t i;
240
597
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
241
48
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
242
          shared_array_buffer) {
243
        return Just(i);
244
      }
245
    }
246
247
573
    seen_shared_array_buffers_.emplace_back(
248
1146
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
249
573
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
250
573
    return Just(i);
251
  }
252
253
2
  Maybe<uint32_t> GetWasmModuleTransferId(
254
      Isolate* isolate, Local<WasmModuleObject> module) override {
255
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
256
  }
257
258
42533
  void Finish() {
259
    // Only close the MessagePort handles and actually transfer them
260
    // once we know that serialization succeeded.
261
53093
    for (MessagePort* port : ports_) {
262
21120
      port->Close();
263
10560
      msg_->AddMessagePort(port->Detach());
264
    }
265
42534
  }
266
267
  ValueSerializer* serializer = nullptr;
268
269
 private:
270
10560
  Maybe<bool> WriteMessagePort(MessagePort* port) {
271
10565
    for (uint32_t i = 0; i < ports_.size(); i++) {
272
10564
      if (ports_[i] == port) {
273
10559
        serializer->WriteUint32(i);
274
10559
        return Just(true);
275
      }
276
    }
277
278
1
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
279
1
    return Nothing<bool>();
280
  }
281
282
  Environment* env_;
283
  Local<Context> context_;
284
  Message* msg_;
285
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
286
  std::vector<MessagePort*> ports_;
287
288
  friend class worker::Message;
289
};
290
291
}  // anonymous namespace
292
293
42548
Maybe<bool> Message::Serialize(Environment* env,
294
                               Local<Context> context,
295
                               Local<Value> input,
296
                               const TransferList& transfer_list_v,
297
                               Local<Object> source_port) {
298
85097
  HandleScope handle_scope(env->isolate());
299
  Context::Scope context_scope(context);
300
301
  // Verify that we're not silently overwriting an existing message.
302
42549
  CHECK(main_message_buf_.is_empty());
303
304
85098
  SerializerDelegate delegate(env, context, this);
305
85097
  ValueSerializer serializer(env->isolate(), &delegate);
306
42548
  delegate.serializer = &serializer;
307
308
85097
  std::vector<Local<ArrayBuffer>> array_buffers;
309
53136
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
310
10597
    Local<Value> entry = transfer_list_v[i];
311
    // Currently, we support ArrayBuffers and MessagePorts.
312
10597
    if (entry->IsArrayBuffer()) {
313
25
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
314
      // If we cannot render the ArrayBuffer unusable in this Isolate,
315
      // copying the buffer will have to do.
316
      // Note that we can currently transfer ArrayBuffers even if they were
317
      // not allocated by Node’s ArrayBufferAllocator in the first place,
318
      // because we pass the underlying v8::BackingStore around rather than
319
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
320
      // is always going to outlive any Workers it creates, and so will its
321
      // allocator along with it.
322
49
      if (!ab->IsDetachable()) continue;
323
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
324
      // for details.
325
      bool untransferrable;
326
50
      if (!ab->HasPrivate(
327
              context,
328
25
              env->arraybuffer_untransferable_private_symbol())
329
25
              .To(&untransferrable)) {
330
1
        return Nothing<bool>();
331
      }
332
25
      if (untransferrable) continue;
333
44
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
334
44
          array_buffers.end()) {
335
1
        ThrowDataCloneException(
336
            context,
337
            FIXED_ONE_BYTE_STRING(
338
                env->isolate(),
339
1
                "Transfer list contains duplicate ArrayBuffer"));
340
1
        return Nothing<bool>();
341
      }
342
      // We simply use the array index in the `array_buffers` list as the
343
      // ID that we write into the serialized buffer.
344
21
      uint32_t id = array_buffers.size();
345
21
      array_buffers.push_back(ab);
346
21
      serializer.TransferArrayBuffer(id, ab);
347
21
      continue;
348
21144
    } else if (env->message_port_constructor_template()
349
10572
                  ->HasInstance(entry)) {
350
      // Check if the source MessagePort is being transferred.
351

21144
      if (!source_port.IsEmpty() && entry == source_port) {
352
1
        ThrowDataCloneException(
353
            context,
354
            FIXED_ONE_BYTE_STRING(env->isolate(),
355
1
                                  "Transfer list contains source port"));
356
10
        return Nothing<bool>();
357
      }
358
10571
      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
359

10571
      if (port == nullptr || port->IsDetached()) {
360
7
        ThrowDataCloneException(
361
            context,
362
            FIXED_ONE_BYTE_STRING(
363
                env->isolate(),
364
7
                "MessagePort in transfer list is already detached"));
365
7
        return Nothing<bool>();
366
      }
367
21128
      if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
368
21128
          delegate.ports_.end()) {
369
1
        ThrowDataCloneException(
370
            context,
371
            FIXED_ONE_BYTE_STRING(
372
                env->isolate(),
373
1
                "Transfer list contains duplicate MessagePort"));
374
1
        return Nothing<bool>();
375
      }
376
10563
      delegate.ports_.push_back(port);
377
10563
      continue;
378
    }
379
380
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
381
    return Nothing<bool>();
382
  }
383
384
42538
  serializer.WriteHeader();
385
85076
  if (serializer.WriteValue(context, input).IsNothing()) {
386
5
    return Nothing<bool>();
387
  }
388
389
42546
  for (Local<ArrayBuffer> ab : array_buffers) {
390
    // If serialization succeeded, we render it inaccessible in this Isolate.
391
26
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
392
13
    ab->Detach();
393
394
13
    array_buffers_.emplace_back(std::move(backing_store));
395
  }
396
397
42534
  delegate.Finish();
398
399
  // The serializer gave us a buffer allocated using `malloc()`.
400
42533
  std::pair<uint8_t*, size_t> data = serializer.Release();
401
42533
  CHECK_NOT_NULL(data.first);
402
  main_message_buf_ =
403
42533
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
404
42534
  return Just(true);
405
}
406
407
void Message::MemoryInfo(MemoryTracker* tracker) const {
408
  tracker->TrackField("array_buffers_", array_buffers_);
409
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
410
  tracker->TrackField("message_ports", message_ports_);
411
}
412
413
32897
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
414
415
98559
MessagePortData::~MessagePortData() {
416
32853
  CHECK_NULL(owner_);
417
32853
  Disentangle();
418
65706
}
419
420
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
421
  Mutex::ScopedLock lock(mutex_);
422
  tracker->TrackField("incoming_messages", incoming_messages_);
423
}
424
425
108264
void MessagePortData::AddToIncomingQueue(Message&& message) {
426
  // This function will be called by other threads.
427
216527
  Mutex::ScopedLock lock(mutex_);
428
108267
  incoming_messages_.emplace_back(std::move(message));
429
430
108262
  if (owner_ != nullptr) {
431
52055
    Debug(owner_, "Adding message to incoming queue");
432
52055
    owner_->TriggerAsync();
433
  }
434
108268
}
435
436
11147
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
437
11147
  CHECK_NULL(a->sibling_);
438
11147
  CHECK_NULL(b->sibling_);
439
11147
  a->sibling_ = b;
440
11147
  b->sibling_ = a;
441
11147
  a->sibling_mutex_ = b->sibling_mutex_;
442
11147
}
443
444
54598
void MessagePortData::Disentangle() {
445
  // Grab a copy of the sibling mutex, then replace it so that each sibling
446
  // has its own sibling_mutex_ now.
447
109195
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
448
109195
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
449
54599
  sibling_mutex_ = std::make_shared<Mutex>();
450
451
54599
  MessagePortData* sibling = sibling_;
452
54599
  if (sibling_ != nullptr) {
453
11147
    sibling_->sibling_ = nullptr;
454
11147
    sibling_ = nullptr;
455
  }
456
457
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
458
  // message and trigger the corresponding uv_async_t to let them know that
459
  // this happened.
460
54599
  AddToIncomingQueue(Message());
461
54597
  if (sibling != nullptr) {
462
11147
    sibling->AddToIncomingQueue(Message());
463
  }
464
54599
}
465
466
129228
MessagePort::~MessagePort() {
467
32307
  if (data_) Detach();
468
64614
}
469
470
32351
MessagePort::MessagePort(Environment* env,
471
                         Local<Context> context,
472
32351
                         Local<Object> wrap)
473
  : HandleWrap(env,
474
               wrap,
475
32351
               reinterpret_cast<uv_handle_t*>(&async_),
476
               AsyncWrap::PROVIDER_MESSAGEPORT),
477
32351
    data_(new MessagePortData(this)) {
478
92910
  auto onmessage = [](uv_async_t* handle) {
479
    // Called when data has been put into the queue.
480
30279
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
481
30280
    channel->OnMessage();
482
92904
  };
483
484
32351
  CHECK_EQ(uv_async_init(env->event_loop(),
485
                         &async_,
486
                         onmessage), 0);
487
  // Reset later to indicate success of the constructor.
488
32351
  bool succeeded = false;
489
97053
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
490
491
  Local<Value> fn;
492
97053
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
493
    return;
494
495
32351
  if (fn->IsFunction()) {
496
32350
    Local<Function> init = fn.As<Function>();
497
64700
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
498
      return;
499
  }
500
501
  Local<Function> emit_message_fn;
502
64702
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
503
    return;
504
32351
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
505
506
32351
  succeeded = true;
507
32351
  Debug(this, "Created message port");
508
}
509
510
10571
bool MessagePort::IsDetached() const {
511

10571
  return data_ == nullptr || IsHandleClosing();
512
}
513
514
73304
void MessagePort::TriggerAsync() {
515
73304
  if (IsHandleClosing()) return;
516
73284
  CHECK_EQ(uv_async_send(&async_), 0);
517
}
518
519
32307
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
520
64615
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
521
522
32308
  if (data_) {
523
    // Wrap this call with accessing the mutex, so that TriggerAsync()
524
    // can check IsHandleClosing() without race conditions.
525
64611
    Mutex::ScopedLock sibling_lock(data_->mutex_);
526
32307
    HandleWrap::Close(close_callback);
527
  } else {
528
1
    HandleWrap::Close(close_callback);
529
  }
530
32307
}
531
532
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
533
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
534
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
535
  // class (i.e. makes it behave like an arrow function).
536
2
  Environment* env = Environment::GetCurrent(args);
537
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
538
2
}
539
540
32351
MessagePort* MessagePort::New(
541
    Environment* env,
542
    Local<Context> context,
543
    std::unique_ptr<MessagePortData> data) {
544
  Context::Scope context_scope(context);
545
32351
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
546
547
  // Construct a new instance, then assign the listener instance and possibly
548
  // the MessagePortData to it.
549
  Local<Object> instance;
550
97053
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
551
    return nullptr;
552
32351
  MessagePort* port = new MessagePort(env, context, instance);
553
32351
  CHECK_NOT_NULL(port);
554
32351
  if (port->IsHandleClosing()) {
555
    // Construction failed with an exception.
556
    return nullptr;
557
  }
558
559
32351
  if (data) {
560
10603
    port->Detach();
561
10603
    port->data_ = std::move(data);
562
563
    // This lock is here to avoid race conditions with the `owner_` read
564
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
565
    // but it's better to be safe than sorry.)
566
21206
    Mutex::ScopedLock lock(port->data_->mutex_);
567
10603
    port->data_->owner_ = port;
568
    // If the existing MessagePortData object had pending messages, this is
569
    // the easiest way to run that queue.
570
10603
    port->TriggerAsync();
571
  }
572
32351
  return port;
573
}
574
575
72387
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
576
                                              bool only_if_receiving) {
577
144787
  Message received;
578
  {
579
    // Get the head of the message queue.
580
125149
    Mutex::ScopedLock lock(data_->mutex_);
581
582
72421
    Debug(this, "MessagePort has message");
583
584

72415
    bool wants_message = receiving_messages_ || !only_if_receiving;
585
    // We have nothing to do if:
586
    // - There are no pending messages
587
    // - We are not intending to receive messages, and the message we would
588
    //   receive is not the final "close" message.
589

144838
    if (data_->incoming_messages_.empty() ||
590
63006
        (!wants_message &&
591
10260
         !data_->incoming_messages_.front().IsCloseMessage())) {
592
39345
      return env()->no_message_symbol();
593
    }
594
595
52731
    received = std::move(data_->incoming_messages_.front());
596
52706
    data_->incoming_messages_.pop_front();
597
  }
598
599
52746
  if (received.IsCloseMessage()) {
600
22138
    Close();
601
22138
    return env()->no_message_symbol();
602
  }
603
604
41590
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
605
606
41667
  return received.Deserialize(env(), context);
607
}
608
609
30787
void MessagePort::OnMessage() {
610
30787
  Debug(this, "Running MessagePort::OnMessage()");
611
61527
  HandleScope handle_scope(env()->isolate());
612
61576
  Local<Context> context = object(env()->isolate())->CreationContext();
613
614
  size_t processing_limit;
615
  {
616
30788
    Mutex::ScopedLock(data_->mutex_);
617
61575
    processing_limit = std::max(data_->incoming_messages_.size(),
618
92362
                                static_cast<size_t>(1000));
619
  }
620
621
  // data_ can only ever be modified by the owner thread, so no need to lock.
622
  // However, the message port may be transferred while it is processing
623
  // messages, so we need to check that this handle still owns its `data_` field
624
  // on every iteration.
625

114049
  while (data_) {
626
72400
    if (processing_limit-- == 0) {
627
      // Prevent event loop starvation by only processing those messages without
628
      // interruption that were already present when the OnMessage() call was
629
      // first triggered, but at least 1000 messages because otherwise the
630
      // overhead of repeatedly triggering the uv_async_t instance becomes
631
      // noticable, at least on Windows.
632
      // (That might require more investigation by somebody more familiar with
633
      // Windows.)
634
1
      TriggerAsync();
635
1
      return;
636
    }
637
638
114030
    HandleScope handle_scope(env()->isolate());
639

41631
    Context::Scope context_scope(context);
640
641
    Local<Value> payload;
642
144674
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
643
144694
    if (payload == env()->no_message_symbol()) break;
644
645
41665
    if (!env()->can_call_into_js()) {
646
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
647
      // In this case there is nothing to do but to drain the current queue.
648
      continue;
649
    }
650
651
41492
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
652
83311
    if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
653
      // Re-schedule OnMessage() execution in case of failure.
654
43
      if (data_)
655
43
        TriggerAsync();
656
43
      return;
657
    }
658
  }
659
}
660
661
32306
void MessagePort::OnClose() {
662
32306
  Debug(this, "MessagePort::OnClose()");
663
32305
  if (data_) {
664
    // Detach() returns move(data_).
665
21745
    Detach()->Disentangle();
666
  }
667
32307
}
668
669
42906
std::unique_ptr<MessagePortData> MessagePort::Detach() {
670
42906
  CHECK(data_);
671
85818
  Mutex::ScopedLock lock(data_->mutex_);
672
42910
  data_->owner_ = nullptr;
673
85819
  return std::move(data_);
674
}
675
676
677
42546
Maybe<bool> MessagePort::PostMessage(Environment* env,
678
                                     Local<Value> message_v,
679
                                     const TransferList& transfer_v) {
680
42546
  Isolate* isolate = env->isolate();
681
42547
  Local<Object> obj = object(isolate);
682
42548
  Local<Context> context = obj->CreationContext();
683
684
85095
  Message msg;
685
686
  // Per spec, we need to both check if transfer list has the source port, and
687
  // serialize the input message, even if the MessagePort is closed or detached.
688
689
  Maybe<bool> serialization_maybe =
690
42548
      msg.Serialize(env, context, message_v, transfer_v, obj);
691
42547
  if (data_ == nullptr) {
692
2
    return serialization_maybe;
693
  }
694
42546
  if (serialization_maybe.IsNothing()) {
695
12
    return Nothing<bool>();
696
  }
697
698
85068
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
699
42534
  bool doomed = false;
700
701
  // Check if the target port is posted to itself.
702
42534
  if (data_->sibling_ != nullptr) {
703
53081
    for (const auto& port_data : msg.message_ports()) {
704
10560
      if (data_->sibling_ == port_data.get()) {
705
1
        doomed = true;
706
        ProcessEmitWarning(env, "The target port was posted to itself, and "
707
1
                                "the communication channel was lost");
708
1
        break;
709
      }
710
    }
711
  }
712
713

42535
  if (data_->sibling_ == nullptr || doomed)
714
12
    return Just(true);
715
716
42522
  data_->sibling_->AddToIncomingQueue(std::move(msg));
717
42522
  return Just(true);
718
}
719
720
10607
static Maybe<bool> ReadIterable(Environment* env,
721
                                Local<Context> context,
722
                                // NOLINTNEXTLINE(runtime/references)
723
                                TransferList& transfer_list,
724
                                Local<Value> object) {
725
10607
  if (!object->IsObject()) return Just(false);
726
727
10604
  if (object->IsArray()) {
728
10584
    Local<Array> arr = object.As<Array>();
729
10584
    size_t length = arr->Length();
730
10584
    transfer_list.AllocateSufficientStorage(length);
731
21179
    for (size_t i = 0; i < length; i++) {
732
31785
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
733
        return Nothing<bool>();
734
    }
735
10584
    return Just(true);
736
  }
737
738
20
  Isolate* isolate = env->isolate();
739
  Local<Value> iterator_method;
740
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
741
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
742
20
  if (!iterator_method->IsFunction()) return Just(false);
743
744
  Local<Value> iterator;
745
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
746
6
      .ToLocal(&iterator)) return Nothing<bool>();
747
6
  if (!iterator->IsObject()) return Just(false);
748
749
  Local<Value> next;
750
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
751
    return Nothing<bool>();
752
6
  if (!next->IsFunction()) return Just(false);
753
754
6
  std::vector<Local<Value>> entries;
755
7
  while (env->can_call_into_js()) {
756
    Local<Value> result;
757
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
758
7
        .ToLocal(&result)) return Nothing<bool>();
759
4
    if (!result->IsObject()) return Just(false);
760
761
    Local<Value> done;
762
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
763
      return Nothing<bool>();
764
4
    if (done->BooleanValue(isolate)) break;
765
766
    Local<Value> val;
767
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
768
      return Nothing<bool>();
769
2
    entries.push_back(val);
770
  }
771
772
2
  transfer_list.AllocateSufficientStorage(entries.size());
773
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
774
2
  return Just(true);
775
}
776
777
42561
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
778
42561
  Environment* env = Environment::GetCurrent(args);
779
42561
  Local<Object> obj = args.This();
780
42561
  Local<Context> context = obj->CreationContext();
781
782
42560
  if (args.Length() == 0) {
783
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
784
13
                                       "MessagePort.postMessage");
785
  }
786
787

148880
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
788
    // Browsers ignore null or undefined, and otherwise accept an array or an
789
    // options object.
790
    return THROW_ERR_INVALID_ARG_TYPE(env,
791
4
        "Optional transferList argument must be an iterable");
792
  }
793
794
85103
  TransferList transfer_list;
795
85112
  if (args[1]->IsObject()) {
796
    bool was_iterable;
797
21192
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
798
8
      return;
799
10596
    if (!was_iterable) {
800
      Local<Value> transfer_option;
801
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
802
21
          .ToLocal(&transfer_option)) return;
803
26
      if (!transfer_option->IsUndefined()) {
804
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
805
12
            .To(&was_iterable)) return;
806
10
        if (!was_iterable) {
807
          return THROW_ERR_INVALID_ARG_TYPE(env,
808
7
              "Optional options.transfer argument must be an iterable");
809
        }
810
      }
811
    }
812
  }
813
814
42548
  MessagePort* port = Unwrap<MessagePort>(args.This());
815
  // Even if the backing MessagePort object has already been deleted, we still
816
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
817
  // transfers.
818
42548
  if (port == nullptr) {
819
2
    Message msg;
820
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
821
1
    return;
822
  }
823
824
42547
  port->PostMessage(env, args[0], transfer_list);
825
}
826
827
22671
void MessagePort::Start() {
828
22671
  Debug(this, "Start receiving messages");
829
22671
  receiving_messages_ = true;
830
45342
  Mutex::ScopedLock lock(data_->mutex_);
831
22671
  if (!data_->incoming_messages_.empty())
832
10602
    TriggerAsync();
833
22671
}
834
835
20268
void MessagePort::Stop() {
836
20268
  Debug(this, "Stop receiving messages");
837
20268
  receiving_messages_ = false;
838
20268
}
839
840
22672
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
841
  MessagePort* port;
842
22673
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
843
22672
  if (!port->data_) {
844
1
    return;
845
  }
846
22671
  port->Start();
847
}
848
849
20401
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
850
  MessagePort* port;
851
40802
  CHECK(args[0]->IsObject());
852
40935
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
853
20270
  if (!port->data_) {
854
2
    return;
855
  }
856
20268
  port->Stop();
857
}
858
859
1038
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
860
  MessagePort* port;
861
2076
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
862
508
  port->OnMessage();
863
}
864
865
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
866
11
  Environment* env = Environment::GetCurrent(args);
867

41
  if (!args[0]->IsObject() ||
868
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
869
    return THROW_ERR_INVALID_ARG_TYPE(env,
870
10
        "The \"port\" argument must be a MessagePort instance");
871
  }
872
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
873
6
  if (port == nullptr) {
874
    // Return 'no messages' for a closed port.
875
    args.GetReturnValue().Set(
876
        Environment::GetCurrent(args)->no_message_symbol());
877
    return;
878
  }
879
880
  MaybeLocal<Value> payload =
881
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
882
6
  if (!payload.IsEmpty())
883
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
884
}
885
886
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
887
1
  Environment* env = Environment::GetCurrent(args);
888

4
  if (!args[0]->IsObject() ||
889
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
890
    return THROW_ERR_INVALID_ARG_TYPE(env,
891
        "The \"port\" argument must be a MessagePort instance");
892
  }
893
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
894
1
  CHECK_NOT_NULL(port);
895
896
1
  Local<Value> context_arg = args[1];
897
  ContextifyContext* context_wrapper;
898

3
  if (!context_arg->IsObject() ||
899
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
900
2
          env, context_arg.As<Object>())) == nullptr) {
901
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
902
  }
903
904
2
  std::unique_ptr<MessagePortData> data;
905
1
  if (!port->IsDetached())
906
1
    data = port->Detach();
907
908
1
  Context::Scope context_scope(context_wrapper->context());
909
  MessagePort* target =
910
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
911
1
  if (target != nullptr)
912
3
    args.GetReturnValue().Set(target->object());
913
}
914
915
10601
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
916
10601
  Entangle(a, b->data_.get());
917
10601
}
918
919
11147
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
920
11147
  MessagePortData::Entangle(a->data_.get(), b);
921
11147
}
922
923
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
924
  tracker->TrackField("data", data_);
925
  tracker->TrackField("emit_message_fn", emit_message_fn_);
926
}
927
928
40089
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
929
  // Factor generating the MessagePort JS constructor into its own piece
930
  // of code, because it is needed early on in the child environment setup.
931
40089
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
932
40089
  if (!templ.IsEmpty())
933
36220
    return templ;
934
935
  {
936
3869
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
937
7738
    m->SetClassName(env->message_port_constructor_string());
938
11607
    m->InstanceTemplate()->SetInternalFieldCount(
939
3869
        MessagePort::kInternalFieldCount);
940
7738
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
941
942
3869
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
943
3869
    env->SetProtoMethod(m, "start", MessagePort::Start);
944
945
3869
    env->set_message_port_constructor_template(m);
946
  }
947
948
3869
  return GetMessagePortConstructorTemplate(env);
949
}
950
951
namespace {
952
953
10602
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
954
10602
  Environment* env = Environment::GetCurrent(args);
955
10602
  if (!args.IsConstructCall()) {
956
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
957
1
    return;
958
  }
959
960
21202
  Local<Context> context = args.This()->CreationContext();
961
10601
  Context::Scope context_scope(context);
962
963
10601
  MessagePort* port1 = MessagePort::New(env, context);
964
10601
  if (port1 == nullptr) return;
965
10601
  MessagePort* port2 = MessagePort::New(env, context);
966
10601
  if (port2 == nullptr) {
967
    port1->Close();
968
    return;
969
  }
970
971
10601
  MessagePort::Entangle(port1, port2);
972
973
53005
  args.This()->Set(context, env->port1_string(), port1->object())
974
      .Check();
975
53005
  args.This()->Set(context, env->port2_string(), port2->object())
976
      .Check();
977
}
978
979
3869
static void InitMessaging(Local<Object> target,
980
                          Local<Value> unused,
981
                          Local<Context> context,
982
                          void* priv) {
983
3869
  Environment* env = Environment::GetCurrent(context);
984
985
  {
986
    Local<String> message_channel_string =
987
3869
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
988
3869
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
989
3869
    templ->SetClassName(message_channel_string);
990
7738
    target->Set(context,
991
                message_channel_string,
992
11607
                templ->GetFunction(context).ToLocalChecked()).Check();
993
  }
994
995
7738
  target->Set(context,
996
              env->message_port_constructor_string(),
997
7738
              GetMessagePortConstructorTemplate(env)
998
15476
                  ->GetFunction(context).ToLocalChecked()).Check();
999
1000
  // These are not methods on the MessagePort prototype, because
1001
  // the browser equivalents do not provide them.
1002
3869
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1003
3869
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1004
3869
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1005
  env->SetMethod(target, "moveMessagePortToContext",
1006
3869
                 MessagePort::MoveToContext);
1007
1008
  {
1009
7738
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1010
    target
1011
7738
        ->Set(context,
1012
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1013
11607
              domexception)
1014
        .Check();
1015
  }
1016
3869
}
1017
1018
}  // anonymous namespace
1019
1020
}  // namespace worker
1021
}  // namespace node
1022
1023
4325
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)