GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 471 513 91.8 %
Date: 2020-02-19 22:14:06 Branches: 223 302 73.8 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using v8::Array;
14
using v8::ArrayBuffer;
15
using v8::BackingStore;
16
using v8::CompiledWasmModule;
17
using v8::Context;
18
using v8::EscapableHandleScope;
19
using v8::Exception;
20
using v8::Function;
21
using v8::FunctionCallbackInfo;
22
using v8::FunctionTemplate;
23
using v8::Global;
24
using v8::HandleScope;
25
using v8::Isolate;
26
using v8::Just;
27
using v8::Local;
28
using v8::Maybe;
29
using v8::MaybeLocal;
30
using v8::Nothing;
31
using v8::Object;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Symbol;
35
using v8::Value;
36
using v8::ValueDeserializer;
37
using v8::ValueSerializer;
38
using v8::WasmModuleObject;
39
40
namespace node {
41
namespace worker {
42
43
180685
Message::Message(MallocedBuffer<char>&& buffer)
44
180685
    : main_message_buf_(std::move(buffer)) {}
45
46
105809
bool Message::IsCloseMessage() const {
47
105809
  return main_message_buf_.data == nullptr;
48
}
49
50
namespace {
51
52
// This is used to tell V8 how to read transferred host objects, like other
53
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
54
42193
class DeserializerDelegate : public ValueDeserializer::Delegate {
55
 public:
56
42913
  DeserializerDelegate(
57
      Message* m,
58
      Environment* env,
59
      const std::vector<MessagePort*>& message_ports,
60
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
61
      const std::vector<CompiledWasmModule>& wasm_modules)
62
42913
      : message_ports_(message_ports),
63
        shared_array_buffers_(shared_array_buffers),
64
42913
        wasm_modules_(wasm_modules) {}
65
66
10230
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
67
    // Currently, only MessagePort hosts objects are supported, so identifying
68
    // by the index in the message's MessagePort array is sufficient.
69
    uint32_t id;
70
10230
    if (!deserializer->ReadUint32(&id))
71
      return MaybeLocal<Object>();
72
10230
    CHECK_LE(id, message_ports_.size());
73
20460
    return message_ports_[id]->object(isolate);
74
  }
75
76
244
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
77
      Isolate* isolate, uint32_t clone_id) override {
78
244
    CHECK_LE(clone_id, shared_array_buffers_.size());
79
488
    return shared_array_buffers_[clone_id];
80
  }
81
82
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
83
      Isolate* isolate, uint32_t transfer_id) override {
84
2
    CHECK_LE(transfer_id, wasm_modules_.size());
85
    return WasmModuleObject::FromCompiledModule(
86
2
        isolate, wasm_modules_[transfer_id]);
87
  }
88
89
  ValueDeserializer* deserializer = nullptr;
90
91
 private:
92
  const std::vector<MessagePort*>& message_ports_;
93
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
94
  const std::vector<CompiledWasmModule>& wasm_modules_;
95
};
96
97
}  // anonymous namespace
98
99
42393
MaybeLocal<Value> Message::Deserialize(Environment* env,
100
                                       Local<Context> context) {
101
42393
  CHECK(!IsCloseMessage());
102
103
42693
  EscapableHandleScope handle_scope(env->isolate());
104
  Context::Scope context_scope(context);
105
106
  // Create all necessary MessagePort handles.
107
85718
  std::vector<MessagePort*> ports(message_ports_.size());
108
53203
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
109
20460
    ports[i] = MessagePort::New(env,
110
                                context,
111
10230
                                std::move(message_ports_[i]));
112
10230
    if (ports[i] == nullptr) {
113
      for (MessagePort* port : ports) {
114
        // This will eventually release the MessagePort object itself.
115
        if (port != nullptr)
116
          port->Close();
117
      }
118
      return MaybeLocal<Value>();
119
    }
120
  }
121
42904
  message_ports_.clear();
122
123
84870
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
124
  // Attach all transferred SharedArrayBuffers to their new Isolate.
125
43268
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
126
    Local<SharedArrayBuffer> sab =
127
        SharedArrayBuffer::New(env->isolate(),
128
244
                               std::move(shared_array_buffers_[i]));
129
244
    shared_array_buffers.push_back(sab);
130
  }
131
41932
  shared_array_buffers_.clear();
132
133
  DeserializerDelegate delegate(
134
85763
      this, env, ports, shared_array_buffers, wasm_modules_);
135
  ValueDeserializer deserializer(
136
      env->isolate(),
137
42708
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
138
      main_message_buf_.size,
139
128417
      &delegate);
140
43045
  delegate.deserializer = &deserializer;
141
142
  // Attach all transferred ArrayBuffers to their new Isolate.
143
43052
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
144
    Local<ArrayBuffer> ab =
145
7
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
146
7
    deserializer.TransferArrayBuffer(i, ab);
147
  }
148
43046
  array_buffers_.clear();
149
150
85436
  if (deserializer.ReadHeader(context).IsNothing())
151
    return MaybeLocal<Value>();
152
  return handle_scope.Escape(
153
85391
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
154
}
155
156
246
void Message::AddSharedArrayBuffer(
157
    std::shared_ptr<BackingStore> backing_store) {
158
246
  shared_array_buffers_.emplace_back(std::move(backing_store));
159
246
}
160
161
10232
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
162
10232
  message_ports_.emplace_back(std::move(data));
163
10232
}
164
165
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
166
2
  wasm_modules_.emplace_back(std::move(mod));
167
2
  return wasm_modules_.size() - 1;
168
}
169
170
namespace {
171
172
31227
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
173
31227
  Isolate* isolate = context->GetIsolate();
174
  Local<Object> per_context_bindings;
175
  Local<Value> emit_message_val;
176

124908
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
177
93681
      !per_context_bindings->Get(context,
178
93681
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
179
31227
          .ToLocal(&emit_message_val)) {
180
    return MaybeLocal<Function>();
181
  }
182
31227
  CHECK(emit_message_val->IsFunction());
183
31227
  return emit_message_val.As<Function>();
184
}
185
186
3685
MaybeLocal<Function> GetDOMException(Local<Context> context) {
187
3685
  Isolate* isolate = context->GetIsolate();
188
  Local<Object> per_context_bindings;
189
  Local<Value> domexception_ctor_val;
190

14740
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
191
11055
      !per_context_bindings->Get(context,
192
11055
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
193
3685
          .ToLocal(&domexception_ctor_val)) {
194
    return MaybeLocal<Function>();
195
  }
196
3685
  CHECK(domexception_ctor_val->IsFunction());
197
3685
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
198
3685
  return domexception_ctor;
199
}
200
201
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
202
14
  Isolate* isolate = context->GetIsolate();
203
  Local<Value> argv[] = {message,
204
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
205
  Local<Value> exception;
206
  Local<Function> domexception_ctor;
207

56
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
208
56
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
209
14
           .ToLocal(&exception)) {
210
    return;
211
  }
212
14
  isolate->ThrowException(exception);
213
}
214
215
// This tells V8 how to serialize objects that it does not understand
216
// (e.g. C++ objects) into the output buffer, in a way that our own
217
// DeserializerDelegate understands how to unpack.
218
43133
class SerializerDelegate : public ValueSerializer::Delegate {
219
 public:
220
43133
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
221
43133
      : env_(env), context_(context), msg_(m) {}
222
223
4
  void ThrowDataCloneError(Local<String> message) override {
224
4
    ThrowDataCloneException(context_, message);
225
4
  }
226
227
10232
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
228
20464
    if (env_->message_port_constructor_template()->HasInstance(object)) {
229
10231
      return WriteMessagePort(Unwrap<MessagePort>(object));
230
    }
231
232
1
    ThrowDataCloneError(env_->clone_unsupported_type_str());
233
1
    return Nothing<bool>();
234
  }
235
236
246
  Maybe<uint32_t> GetSharedArrayBufferId(
237
      Isolate* isolate,
238
      Local<SharedArrayBuffer> shared_array_buffer) override {
239
    uint32_t i;
240
259
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
241
26
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
242
          shared_array_buffer) {
243
        return Just(i);
244
      }
245
    }
246
247
246
    seen_shared_array_buffers_.emplace_back(
248
492
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
249
246
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
250
246
    return Just(i);
251
  }
252
253
2
  Maybe<uint32_t> GetWasmModuleTransferId(
254
      Isolate* isolate, Local<WasmModuleObject> module) override {
255
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
256
  }
257
258
43119
  void Finish() {
259
    // Only close the MessagePort handles and actually transfer them
260
    // once we know that serialization succeeded.
261
53351
    for (MessagePort* port : ports_) {
262
20464
      port->Close();
263
10232
      msg_->AddMessagePort(port->Detach());
264
    }
265
43119
  }
266
267
  ValueSerializer* serializer = nullptr;
268
269
 private:
270
10231
  Maybe<bool> WriteMessagePort(MessagePort* port) {
271
10231
    for (uint32_t i = 0; i < ports_.size(); i++) {
272
10231
      if (ports_[i] == port) {
273
10231
        serializer->WriteUint32(i);
274
10231
        return Just(true);
275
      }
276
    }
277
278
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
279
    return Nothing<bool>();
280
  }
281
282
  Environment* env_;
283
  Local<Context> context_;
284
  Message* msg_;
285
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
286
  std::vector<MessagePort*> ports_;
287
288
  friend class worker::Message;
289
};
290
291
}  // anonymous namespace
292
293
43133
Maybe<bool> Message::Serialize(Environment* env,
294
                               Local<Context> context,
295
                               Local<Value> input,
296
                               const TransferList& transfer_list_v,
297
                               Local<Object> source_port) {
298
86266
  HandleScope handle_scope(env->isolate());
299
  Context::Scope context_scope(context);
300
301
  // Verify that we're not silently overwriting an existing message.
302
43134
  CHECK(main_message_buf_.is_empty());
303
304
86267
  SerializerDelegate delegate(env, context, this);
305
86267
  ValueSerializer serializer(env->isolate(), &delegate);
306
43133
  delegate.serializer = &serializer;
307
308
86267
  std::vector<Local<ArrayBuffer>> array_buffers;
309
53390
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
310
10266
    Local<Value> entry = transfer_list_v[i];
311
    // Currently, we support ArrayBuffers and MessagePorts.
312
10266
    if (entry->IsArrayBuffer()) {
313
23
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
314
      // If we cannot render the ArrayBuffer unusable in this Isolate,
315
      // copying the buffer will have to do.
316
      // Note that we can currently transfer ArrayBuffers even if they were
317
      // not allocated by Node’s ArrayBufferAllocator in the first place,
318
      // because we pass the underlying v8::BackingStore around rather than
319
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
320
      // is always going to outlive any Workers it creates, and so will its
321
      // allocator along with it.
322
45
      if (!ab->IsDetachable()) continue;
323
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
324
      // for details.
325
      bool untransferrable;
326
46
      if (!ab->HasPrivate(
327
              context,
328
23
              env->arraybuffer_untransferable_private_symbol())
329
23
              .To(&untransferrable)) {
330
1
        return Nothing<bool>();
331
      }
332
23
      if (untransferrable) continue;
333
42
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
334
42
          array_buffers.end()) {
335
1
        ThrowDataCloneException(
336
            context,
337
            FIXED_ONE_BYTE_STRING(
338
                env->isolate(),
339
1
                "Transfer list contains duplicate ArrayBuffer"));
340
1
        return Nothing<bool>();
341
      }
342
      // We simply use the array index in the `array_buffers` list as the
343
      // ID that we write into the serialized buffer.
344
20
      uint32_t id = array_buffers.size();
345
20
      array_buffers.push_back(ab);
346
20
      serializer.TransferArrayBuffer(id, ab);
347
20
      continue;
348
20486
    } else if (env->message_port_constructor_template()
349
10243
                  ->HasInstance(entry)) {
350
      // Check if the source MessagePort is being transferred.
351

20486
      if (!source_port.IsEmpty() && entry == source_port) {
352
1
        ThrowDataCloneException(
353
            context,
354
            FIXED_ONE_BYTE_STRING(env->isolate(),
355
1
                                  "Transfer list contains source port"));
356
10
        return Nothing<bool>();
357
      }
358
10242
      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
359

10242
      if (port == nullptr || port->IsDetached()) {
360
7
        ThrowDataCloneException(
361
            context,
362
            FIXED_ONE_BYTE_STRING(
363
                env->isolate(),
364
7
                "MessagePort in transfer list is already detached"));
365
7
        return Nothing<bool>();
366
      }
367
20470
      if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
368
20470
          delegate.ports_.end()) {
369
1
        ThrowDataCloneException(
370
            context,
371
            FIXED_ONE_BYTE_STRING(
372
                env->isolate(),
373
1
                "Transfer list contains duplicate MessagePort"));
374
1
        return Nothing<bool>();
375
      }
376
10234
      delegate.ports_.push_back(port);
377
10234
      continue;
378
    }
379
380
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
381
    return Nothing<bool>();
382
  }
383
384
43124
  serializer.WriteHeader();
385
86246
  if (serializer.WriteValue(context, input).IsNothing()) {
386
4
    return Nothing<bool>();
387
  }
388
389
43131
  for (Local<ArrayBuffer> ab : array_buffers) {
390
    // If serialization succeeded, we render it inaccessible in this Isolate.
391
24
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
392
    // TODO(addaleax): This can/should be dropped once we have V8 8.0.
393
12
    if (!ab->IsExternal())
394
12
      ab->Externalize(backing_store);
395
12
    ab->Detach();
396
397
12
    array_buffers_.emplace_back(std::move(backing_store));
398
  }
399
400
43119
  delegate.Finish();
401
402
  // The serializer gave us a buffer allocated using `malloc()`.
403
43119
  std::pair<uint8_t*, size_t> data = serializer.Release();
404
43120
  CHECK_NOT_NULL(data.first);
405
  main_message_buf_ =
406
43120
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
407
43119
  return Just(true);
408
}
409
410
void Message::MemoryInfo(MemoryTracker* tracker) const {
411
  tracker->TrackField("array_buffers_", array_buffers_);
412
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
413
  tracker->TrackField("message_ports", message_ports_);
414
}
415
416
31457
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
417
418
94294
MessagePortData::~MessagePortData() {
419
31430
  CHECK_NULL(owner_);
420
31430
  Disentangle();
421
62866
}
422
423
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
424
  Mutex::ScopedLock lock(mutex_);
425
  tracker->TrackField("incoming_messages", incoming_messages_);
426
}
427
428
106009
void MessagePortData::AddToIncomingQueue(Message&& message) {
429
  // This function will be called by other threads.
430
212003
  Mutex::ScopedLock lock(mutex_);
431
106015
  incoming_messages_.emplace_back(std::move(message));
432
433
105994
  if (owner_ != nullptr) {
434
52932
    Debug(owner_, "Adding message to incoming queue");
435
52932
    owner_->TriggerAsync();
436
  }
437
106013
}
438
439
10499
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
440
10499
  CHECK_NULL(a->sibling_);
441
10499
  CHECK_NULL(b->sibling_);
442
10499
  a->sibling_ = b;
443
10499
  b->sibling_ = a;
444
10499
  a->sibling_mutex_ = b->sibling_mutex_;
445
10499
}
446
447
52399
void MessagePortData::Disentangle() {
448
  // Grab a copy of the sibling mutex, then replace it so that each sibling
449
  // has its own sibling_mutex_ now.
450
104805
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
451
104802
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
452
52406
  sibling_mutex_ = std::make_shared<Mutex>();
453
454
52407
  MessagePortData* sibling = sibling_;
455
52407
  if (sibling_ != nullptr) {
456
10499
    sibling_->sibling_ = nullptr;
457
10499
    sibling_ = nullptr;
458
  }
459
460
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
461
  // message and trigger the corresponding uv_async_t to let them know that
462
  // this happened.
463
52407
  AddToIncomingQueue(Message());
464
52397
  if (sibling != nullptr) {
465
10499
    sibling->AddToIncomingQueue(Message());
466
  }
467
52402
}
468
469
124814
MessagePort::~MessagePort() {
470
31202
  if (data_)
471
    data_->owner_ = nullptr;
472
62408
}
473
474
31227
MessagePort::MessagePort(Environment* env,
475
                         Local<Context> context,
476
31227
                         Local<Object> wrap)
477
  : HandleWrap(env,
478
               wrap,
479
31227
               reinterpret_cast<uv_handle_t*>(&async_),
480
               AsyncWrap::PROVIDER_MESSAGEPORT),
481
31227
    data_(new MessagePortData(this)) {
482
94527
  auto onmessage = [](uv_async_t* handle) {
483
    // Called when data has been put into the queue.
484
31650
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
485
31650
    channel->OnMessage();
486
94525
  };
487
488
31227
  CHECK_EQ(uv_async_init(env->event_loop(),
489
                         &async_,
490
                         onmessage), 0);
491
  // Reset later to indicate success of the constructor.
492
31227
  bool succeeded = false;
493
93681
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
494
495
  Local<Value> fn;
496
93681
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
497
    return;
498
499
31227
  if (fn->IsFunction()) {
500
31226
    Local<Function> init = fn.As<Function>();
501
62452
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
502
      return;
503
  }
504
505
  Local<Function> emit_message_fn;
506
62454
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
507
    return;
508
31227
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
509
510
31227
  succeeded = true;
511
31227
  Debug(this, "Created message port");
512
}
513
514
10242
bool MessagePort::IsDetached() const {
515

10242
  return data_ == nullptr || IsHandleClosing();
516
}
517
518
73917
void MessagePort::TriggerAsync() {
519
73917
  if (IsHandleClosing()) return;
520
73902
  CHECK_EQ(uv_async_send(&async_), 0);
521
}
522
523
31227
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
524
62455
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
525
526
31228
  if (data_) {
527
    // Wrap this call with accessing the mutex, so that TriggerAsync()
528
    // can check IsHandleClosing() without race conditions.
529
62453
    Mutex::ScopedLock sibling_lock(data_->mutex_);
530
31227
    HandleWrap::Close(close_callback);
531
  } else {
532
1
    HandleWrap::Close(close_callback);
533
  }
534
31228
}
535
536
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
537
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
538
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
539
  // class (i.e. makes it behave like an arrow function).
540
2
  Environment* env = Environment::GetCurrent(args);
541
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
542
2
}
543
544
31227
MessagePort* MessagePort::New(
545
    Environment* env,
546
    Local<Context> context,
547
    std::unique_ptr<MessagePortData> data) {
548
  Context::Scope context_scope(context);
549
31227
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
550
551
  // Construct a new instance, then assign the listener instance and possibly
552
  // the MessagePortData to it.
553
  Local<Object> instance;
554
93681
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
555
    return nullptr;
556
31227
  MessagePort* port = new MessagePort(env, context, instance);
557
31227
  CHECK_NOT_NULL(port);
558
31227
  if (port->IsHandleClosing()) {
559
    // Construction failed with an exception.
560
    return nullptr;
561
  }
562
563
31227
  if (data) {
564
10459
    port->Detach();
565
10459
    port->data_ = std::move(data);
566
567
    // This lock is here to avoid race conditions with the `owner_` read
568
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
569
    // but it's better to be safe than sorry.)
570
20918
    Mutex::ScopedLock lock(port->data_->mutex_);
571
10459
    port->data_->owner_ = port;
572
    // If the existing MessagePortData object had pending messages, this is
573
    // the easiest way to run that queue.
574
10459
    port->TriggerAsync();
575
  }
576
31227
  return port;
577
}
578
579
74659
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
580
                                              bool only_if_receiving) {
581
149276
  Message received;
582
  {
583
    // Get the head of the message queue.
584
128138
    Mutex::ScopedLock lock(data_->mutex_);
585
586
74695
    Debug(this, "MessagePort has message");
587
588

74695
    bool wants_message = receiving_messages_ || !only_if_receiving;
589
    // We have nothing to do if:
590
    // - There are no pending messages
591
    // - We are not intending to receive messages, and the message we would
592
    //   receive is not the final "close" message.
593

149399
    if (data_->incoming_messages_.empty() ||
594
63541
        (!wants_message &&
595
10017
         !data_->incoming_messages_.front().IsCloseMessage())) {
596
42348
      return env()->no_message_symbol();
597
    }
598
599
53516
    received = std::move(data_->incoming_messages_.front());
600
53459
    data_->incoming_messages_.pop_front();
601
  }
602
603
53495
  if (received.IsCloseMessage()) {
604
20948
    Close();
605
20948
    return env()->no_message_symbol();
606
  }
607
608
42466
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
609
610
42940
  return received.Deserialize(env(), context);
611
}
612
613
31680
void MessagePort::OnMessage() {
614
31680
  Debug(this, "Running MessagePort::OnMessage()");
615
63325
  HandleScope handle_scope(env()->isolate());
616
63360
  Local<Context> context = object(env()->isolate())->CreationContext();
617
618
  size_t processing_limit;
619
  {
620
31680
    Mutex::ScopedLock(data_->mutex_);
621
63360
    processing_limit = std::max(data_->incoming_messages_.size(),
622
95040
                                static_cast<size_t>(1000));
623
  }
624
625
  // data_ can only ever be modified by the owner thread, so no need to lock.
626
  // However, the message port may be transferred while it is processing
627
  // messages, so we need to check that this handle still owns its `data_` field
628
  // on every iteration.
629

117700
  while (data_) {
630
74649
    if (processing_limit-- == 0) {
631
      // Prevent event loop starvation by only processing those messages without
632
      // interruption that were already present when the OnMessage() call was
633
      // first triggered, but at least 1000 messages because otherwise the
634
      // overhead of repeatedly triggering the uv_async_t instance becomes
635
      // noticable, at least on Windows.
636
      // (That might require more investigation by somebody more familiar with
637
      // Windows.)
638
1
      TriggerAsync();
639
1
      return;
640
    }
641
642
117658
    HandleScope handle_scope(env()->isolate());
643

43010
    Context::Scope context_scope(context);
644
645
    Local<Value> payload;
646
149213
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
647
149234
    if (payload == env()->no_message_symbol()) break;
648
649
43032
    if (!env()->can_call_into_js()) {
650
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
651
      // In this case there is nothing to do but to drain the current queue.
652
      continue;
653
    }
654
655
42860
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
656
86053
    if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
657
      // Re-schedule OnMessage() execution in case of failure.
658
32
      if (data_)
659
32
        TriggerAsync();
660
32
      return;
661
    }
662
  }
663
}
664
665
31204
void MessagePort::OnClose() {
666
31204
  Debug(this, "MessagePort::OnClose()");
667
31204
  if (data_) {
668
20971
    data_->owner_ = nullptr;
669
20972
    data_->Disentangle();
670
  }
671
31205
  data_.reset();
672
31205
}
673
674
20692
std::unique_ptr<MessagePortData> MessagePort::Detach() {
675
20692
  CHECK(data_);
676
41384
  Mutex::ScopedLock lock(data_->mutex_);
677
20692
  data_->owner_ = nullptr;
678
41384
  return std::move(data_);
679
}
680
681
682
43133
Maybe<bool> MessagePort::PostMessage(Environment* env,
683
                                     Local<Value> message_v,
684
                                     const TransferList& transfer_v) {
685
43133
  Isolate* isolate = env->isolate();
686
43133
  Local<Object> obj = object(isolate);
687
43133
  Local<Context> context = obj->CreationContext();
688
689
86266
  Message msg;
690
691
  // Per spec, we need to both check if transfer list has the source port, and
692
  // serialize the input message, even if the MessagePort is closed or detached.
693
694
  Maybe<bool> serialization_maybe =
695
43133
      msg.Serialize(env, context, message_v, transfer_v, obj);
696
43132
  if (data_ == nullptr) {
697
2
    return serialization_maybe;
698
  }
699
43130
  if (serialization_maybe.IsNothing()) {
700
11
    return Nothing<bool>();
701
  }
702
703
86239
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
704
43120
  bool doomed = false;
705
706
  // Check if the target port is posted to itself.
707
43120
  if (data_->sibling_ != nullptr) {
708
53340
    for (const auto& port_data : msg.message_ports()) {
709
10232
      if (data_->sibling_ == port_data.get()) {
710
1
        doomed = true;
711
        ProcessEmitWarning(env, "The target port was posted to itself, and "
712
1
                                "the communication channel was lost");
713
1
        break;
714
      }
715
    }
716
  }
717
718

43120
  if (data_->sibling_ == nullptr || doomed)
719
11
    return Just(true);
720
721
43109
  data_->sibling_->AddToIncomingQueue(std::move(msg));
722
43109
  return Just(true);
723
}
724
725
10280
static Maybe<bool> ReadIterable(Environment* env,
726
                                Local<Context> context,
727
                                // NOLINTNEXTLINE(runtime/references)
728
                                TransferList& transfer_list,
729
                                Local<Value> object) {
730
10280
  if (!object->IsObject()) return Just(false);
731
732
10277
  if (object->IsArray()) {
733
10257
    Local<Array> arr = object.As<Array>();
734
10257
    size_t length = arr->Length();
735
10257
    transfer_list.AllocateSufficientStorage(length);
736
20521
    for (size_t i = 0; i < length; i++) {
737
30792
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
738
        return Nothing<bool>();
739
    }
740
10257
    return Just(true);
741
  }
742
743
20
  Isolate* isolate = env->isolate();
744
  Local<Value> iterator_method;
745
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
746
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
747
20
  if (!iterator_method->IsFunction()) return Just(false);
748
749
  Local<Value> iterator;
750
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
751
6
      .ToLocal(&iterator)) return Nothing<bool>();
752
6
  if (!iterator->IsObject()) return Just(false);
753
754
  Local<Value> next;
755
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
756
    return Nothing<bool>();
757
6
  if (!next->IsFunction()) return Just(false);
758
759
6
  std::vector<Local<Value>> entries;
760
7
  while (env->can_call_into_js()) {
761
    Local<Value> result;
762
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
763
7
        .ToLocal(&result)) return Nothing<bool>();
764
4
    if (!result->IsObject()) return Just(false);
765
766
    Local<Value> done;
767
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
768
      return Nothing<bool>();
769
4
    if (done->BooleanValue(isolate)) break;
770
771
    Local<Value> val;
772
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
773
      return Nothing<bool>();
774
2
    entries.push_back(val);
775
  }
776
777
2
  transfer_list.AllocateSufficientStorage(entries.size());
778
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
779
2
  return Just(true);
780
}
781
782
43146
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
783
43146
  Environment* env = Environment::GetCurrent(args);
784
43146
  Local<Object> obj = args.This();
785
43146
  Local<Context> context = obj->CreationContext();
786
787
43146
  if (args.Length() == 0) {
788
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
789
13
                                       "MessagePort.postMessage");
790
  }
791
792

149984
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
793
    // Browsers ignore null or undefined, and otherwise accept an array or an
794
    // options object.
795
    return THROW_ERR_INVALID_ARG_TYPE(env,
796
4
        "Optional transferList argument must be an iterable");
797
  }
798
799
86275
  TransferList transfer_list;
800
86284
  if (args[1]->IsObject()) {
801
    bool was_iterable;
802
20538
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
803
8
      return;
804
10269
    if (!was_iterable) {
805
      Local<Value> transfer_option;
806
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
807
21
          .ToLocal(&transfer_option)) return;
808
26
      if (!transfer_option->IsUndefined()) {
809
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
810
12
            .To(&was_iterable)) return;
811
10
        if (!was_iterable) {
812
          return THROW_ERR_INVALID_ARG_TYPE(env,
813
7
              "Optional options.transfer argument must be an iterable");
814
        }
815
      }
816
    }
817
  }
818
819
43134
  MessagePort* port = Unwrap<MessagePort>(args.This());
820
  // Even if the backing MessagePort object has already been deleted, we still
821
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
822
  // transfers.
823
43134
  if (port == nullptr) {
824
2
    Message msg;
825
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
826
1
    return;
827
  }
828
829
43133
  port->PostMessage(env, args[0], transfer_list);
830
}
831
832
21323
void MessagePort::Start() {
833
21323
  Debug(this, "Start receiving messages");
834
21323
  receiving_messages_ = true;
835
42646
  Mutex::ScopedLock lock(data_->mutex_);
836
21323
  if (!data_->incoming_messages_.empty())
837
10494
    TriggerAsync();
838
21323
}
839
840
20037
void MessagePort::Stop() {
841
20037
  Debug(this, "Stop receiving messages");
842
20037
  receiving_messages_ = false;
843
20037
}
844
845
21324
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
846
  MessagePort* port;
847
21325
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
848
21324
  if (!port->data_) {
849
1
    return;
850
  }
851
21323
  port->Start();
852
}
853
854
20139
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
855
  MessagePort* port;
856
40278
  CHECK(args[0]->IsObject());
857
40380
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
858
20039
  if (!port->data_) {
859
2
    return;
860
  }
861
20037
  port->Stop();
862
}
863
864
456
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
865
  MessagePort* port;
866
912
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
867
30
  port->OnMessage();
868
}
869
870
6
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
871
12
  CHECK(args[0]->IsObject());
872
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
873
6
  if (port == nullptr) {
874
    // Return 'no messages' for a closed port.
875
    args.GetReturnValue().Set(
876
        Environment::GetCurrent(args)->no_message_symbol());
877
    return;
878
  }
879
880
  MaybeLocal<Value> payload =
881
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
882
6
  if (!payload.IsEmpty())
883
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
884
}
885
886
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
887
1
  Environment* env = Environment::GetCurrent(args);
888

4
  if (!args[0]->IsObject() ||
889
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
890
    return THROW_ERR_INVALID_ARG_TYPE(env,
891
        "First argument needs to be a MessagePort instance");
892
  }
893
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
894
1
  CHECK_NOT_NULL(port);
895
896
1
  Local<Value> context_arg = args[1];
897
  ContextifyContext* context_wrapper;
898

3
  if (!context_arg->IsObject() ||
899
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
900
2
          env, context_arg.As<Object>())) == nullptr) {
901
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
902
  }
903
904
2
  std::unique_ptr<MessagePortData> data;
905
1
  if (!port->IsDetached())
906
1
    data = port->Detach();
907
908
1
  Context::Scope context_scope(context_wrapper->context());
909
  MessagePort* target =
910
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
911
1
  if (target != nullptr)
912
3
    args.GetReturnValue().Set(target->object());
913
}
914
915
10269
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
916
10269
  Entangle(a, b->data_.get());
917
10269
}
918
919
10499
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
920
10499
  MessagePortData::Entangle(a->data_.get(), b);
921
10499
}
922
923
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
924
  tracker->TrackField("data", data_);
925
  tracker->TrackField("emit_message_fn", emit_message_fn_);
926
}
927
928
38569
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
929
  // Factor generating the MessagePort JS constructor into its own piece
930
  // of code, because it is needed early on in the child environment setup.
931
38569
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
932
38569
  if (!templ.IsEmpty())
933
34898
    return templ;
934
935
  {
936
3671
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
937
7342
    m->SetClassName(env->message_port_constructor_string());
938
7342
    m->InstanceTemplate()->SetInternalFieldCount(1);
939
7342
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
940
941
3671
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
942
3671
    env->SetProtoMethod(m, "start", MessagePort::Start);
943
944
3671
    env->set_message_port_constructor_template(m);
945
  }
946
947
3671
  return GetMessagePortConstructorTemplate(env);
948
}
949
950
namespace {
951
952
10270
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
953
10270
  Environment* env = Environment::GetCurrent(args);
954
10270
  if (!args.IsConstructCall()) {
955
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
956
1
    return;
957
  }
958
959
20538
  Local<Context> context = args.This()->CreationContext();
960
10269
  Context::Scope context_scope(context);
961
962
10269
  MessagePort* port1 = MessagePort::New(env, context);
963
10269
  if (port1 == nullptr) return;
964
10269
  MessagePort* port2 = MessagePort::New(env, context);
965
10269
  if (port2 == nullptr) {
966
    port1->Close();
967
    return;
968
  }
969
970
10269
  MessagePort::Entangle(port1, port2);
971
972
51345
  args.This()->Set(context, env->port1_string(), port1->object())
973
      .Check();
974
51345
  args.This()->Set(context, env->port2_string(), port2->object())
975
      .Check();
976
}
977
978
3671
static void InitMessaging(Local<Object> target,
979
                          Local<Value> unused,
980
                          Local<Context> context,
981
                          void* priv) {
982
3671
  Environment* env = Environment::GetCurrent(context);
983
984
  {
985
    Local<String> message_channel_string =
986
3671
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
987
3671
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
988
3671
    templ->SetClassName(message_channel_string);
989
7342
    target->Set(context,
990
                message_channel_string,
991
11013
                templ->GetFunction(context).ToLocalChecked()).Check();
992
  }
993
994
7342
  target->Set(context,
995
              env->message_port_constructor_string(),
996
7342
              GetMessagePortConstructorTemplate(env)
997
14684
                  ->GetFunction(context).ToLocalChecked()).Check();
998
999
  // These are not methods on the MessagePort prototype, because
1000
  // the browser equivalents do not provide them.
1001
3671
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1002
3671
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1003
3671
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1004
  env->SetMethod(target, "moveMessagePortToContext",
1005
3671
                 MessagePort::MoveToContext);
1006
1007
  {
1008
7342
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1009
    target
1010
7342
        ->Set(context,
1011
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1012
11013
              domexception)
1013
        .Check();
1014
  }
1015
3671
}
1016
1017
}  // anonymous namespace
1018
1019
}  // namespace worker
1020
}  // namespace node
1021
1022
4185
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)