GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 417 451 92.5 %
Date: 2019-05-05 22:32:45 Branches: 191 278 68.7 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils.h"
5
#include "node_contextify.h"
6
#include "node_buffer.h"
7
#include "node_errors.h"
8
#include "node_process.h"
9
#include "util.h"
10
11
using node::contextify::ContextifyContext;
12
using v8::Array;
13
using v8::ArrayBuffer;
14
using v8::ArrayBufferCreationMode;
15
using v8::Context;
16
using v8::EscapableHandleScope;
17
using v8::Exception;
18
using v8::Function;
19
using v8::FunctionCallbackInfo;
20
using v8::FunctionTemplate;
21
using v8::HandleScope;
22
using v8::Isolate;
23
using v8::Just;
24
using v8::Local;
25
using v8::Maybe;
26
using v8::MaybeLocal;
27
using v8::Nothing;
28
using v8::Object;
29
using v8::ObjectTemplate;
30
using v8::SharedArrayBuffer;
31
using v8::String;
32
using v8::Value;
33
using v8::ValueDeserializer;
34
using v8::ValueSerializer;
35
using v8::WasmModuleObject;
36
37
namespace node {
38
namespace worker {
39
40
27113
Message::Message(MallocedBuffer<char>&& buffer)
41
27113
    : main_message_buf_(std::move(buffer)) {}
42
43
namespace {
44
45
// This is used to tell V8 how to read transferred host objects, like other
46
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
47
9422
class DeserializerDelegate : public ValueDeserializer::Delegate {
48
 public:
49
9423
  DeserializerDelegate(
50
      Message* m,
51
      Environment* env,
52
      const std::vector<MessagePort*>& message_ports,
53
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
54
      const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
55
      : message_ports_(message_ports),
56
        shared_array_buffers_(shared_array_buffers),
57
9423
        wasm_modules_(wasm_modules) {}
58
59
175
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
60
    // Currently, only MessagePort hosts objects are supported, so identifying
61
    // by the index in the message's MessagePort array is sufficient.
62
    uint32_t id;
63
175
    if (!deserializer->ReadUint32(&id))
64
      return MaybeLocal<Object>();
65
176
    CHECK_LE(id, message_ports_.size());
66
352
    return message_ports_[id]->object(isolate);
67
  }
68
69
181
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
70
      Isolate* isolate, uint32_t clone_id) override {
71
181
    CHECK_LE(clone_id, shared_array_buffers_.size());
72
362
    return shared_array_buffers_[clone_id];
73
  }
74
75
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
76
      Isolate* isolate, uint32_t transfer_id) override {
77
2
    CHECK_LE(transfer_id, wasm_modules_.size());
78
    return WasmModuleObject::FromTransferrableModule(
79
2
        isolate, wasm_modules_[transfer_id]);
80
  }
81
82
  ValueDeserializer* deserializer = nullptr;
83
84
 private:
85
  const std::vector<MessagePort*>& message_ports_;
86
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
87
  const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
88
};
89
90
}  // anonymous namespace
91
92
9423
MaybeLocal<Value> Message::Deserialize(Environment* env,
93
                                       Local<Context> context) {
94
9423
  EscapableHandleScope handle_scope(env->isolate());
95
  Context::Scope context_scope(context);
96
97
  // Create all necessary MessagePort handles.
98
18843
  std::vector<MessagePort*> ports(message_ports_.size());
99
9599
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
100
176
    ports[i] = MessagePort::New(env,
101
                                context,
102
176
                                std::move(message_ports_[i]));
103
176
    if (ports[i] == nullptr) {
104
      for (MessagePort* port : ports) {
105
        // This will eventually release the MessagePort object itself.
106
        if (port != nullptr)
107
          port->Close();
108
      }
109
      return MaybeLocal<Value>();
110
    }
111
  }
112
9423
  message_ports_.clear();
113
114
18846
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
115
  // Attach all transferred SharedArrayBuffers to their new Isolate.
116
9604
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
117
    Local<SharedArrayBuffer> sab;
118
543
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
119
543
            .ToLocal(&sab))
120
      return MaybeLocal<Value>();
121
181
    shared_array_buffers.push_back(sab);
122
  }
123
9423
  shared_array_buffers_.clear();
124
125
  DeserializerDelegate delegate(
126
18843
      this, env, ports, shared_array_buffers, wasm_modules_);
127
  ValueDeserializer deserializer(
128
      env->isolate(),
129
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
130
      main_message_buf_.size,
131
18845
      &delegate);
132
9423
  delegate.deserializer = &deserializer;
133
134
  // Attach all transferred ArrayBuffers to their new Isolate.
135
9425
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
136
2
    if (!env->isolate_data()->uses_node_allocator()) {
137
      // We don't use Node's allocator on the receiving side, so we have
138
      // to create the ArrayBuffer from a copy of the memory.
139
      AllocatedBuffer buf =
140
          env->AllocateManaged(array_buffer_contents_[i].size);
141
      memcpy(buf.data(),
142
             array_buffer_contents_[i].data,
143
             array_buffer_contents_[i].size);
144
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
145
      continue;
146
    }
147
148
2
    env->isolate_data()->node_allocator()->RegisterPointer(
149
2
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);
150
151
    Local<ArrayBuffer> ab =
152
        ArrayBuffer::New(env->isolate(),
153
2
                         array_buffer_contents_[i].release(),
154
2
                         array_buffer_contents_[i].size,
155
4
                         ArrayBufferCreationMode::kInternalized);
156
2
    deserializer.TransferArrayBuffer(i, ab);
157
  }
158
9423
  array_buffer_contents_.clear();
159
160
18846
  if (deserializer.ReadHeader(context).IsNothing())
161
    return MaybeLocal<Value>();
162
  return handle_scope.Escape(
163
18845
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
164
}
165
166
193
void Message::AddSharedArrayBuffer(
167
    const SharedArrayBufferMetadataReference& reference) {
168
193
  shared_array_buffers_.push_back(reference);
169
193
}
170
171
198
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
172
198
  message_ports_.emplace_back(std::move(data));
173
198
}
174
175
2
uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
176
2
  wasm_modules_.emplace_back(std::move(mod));
177
2
  return wasm_modules_.size() - 1;
178
}
179
180
namespace {
181
182
13
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
183
13
  Isolate* isolate = context->GetIsolate();
184
  Local<Value> argv[] = {
185
    message,
186
    FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")
187
39
  };
188
  Local<Value> exception;
189
190
  Local<Object> per_context_bindings;
191
  Local<Value> domexception_ctor_val;
192


65
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
193
      !per_context_bindings->Get(context,
194
39
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
195
52
          .ToLocal(&domexception_ctor_val)) {
196
    return;
197
  }
198
199
13
  CHECK(domexception_ctor_val->IsFunction());
200
13
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
201
52
  if (!domexception_ctor->NewInstance(context, arraysize(argv), argv)
202
39
          .ToLocal(&exception)) {
203
    return;
204
  }
205
13
  isolate->ThrowException(exception);
206
}
207
208
// This tells V8 how to serialize objects that it does not understand
209
// (e.g. C++ objects) into the output buffer, in a way that our own
210
// DeserializerDelegate understands how to unpack.
211
9532
class SerializerDelegate : public ValueSerializer::Delegate {
212
 public:
213
9532
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
214
9532
      : env_(env), context_(context), msg_(m) {}
215
216
2
  void ThrowDataCloneError(Local<String> message) override {
217
2
    ThrowDataCloneException(context_, message);
218
2
  }
219
220
197
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
221
394
    if (env_->message_port_constructor_template()->HasInstance(object)) {
222
197
      return WriteMessagePort(Unwrap<MessagePort>(object));
223
    }
224
225
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
226
    return Nothing<bool>();
227
  }
228
229
193
  Maybe<uint32_t> GetSharedArrayBufferId(
230
      Isolate* isolate,
231
      Local<SharedArrayBuffer> shared_array_buffer) override {
232
    uint32_t i;
233
196
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
234
6
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
235
        return Just(i);
236
    }
237
238
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
239
        env_,
240
        context_,
241
193
        shared_array_buffer);
242
193
    if (!reference) {
243
      return Nothing<uint32_t>();
244
    }
245
193
    seen_shared_array_buffers_.push_back(shared_array_buffer);
246
193
    msg_->AddSharedArrayBuffer(reference);
247
193
    return Just(i);
248
  }
249
250
2
  Maybe<uint32_t> GetWasmModuleTransferId(
251
      Isolate* isolate, Local<WasmModuleObject> module) override {
252
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
253
  }
254
255
9519
  void Finish() {
256
    // Only close the MessagePort handles and actually transfer them
257
    // once we know that serialization succeeded.
258
9717
    for (MessagePort* port : ports_) {
259
396
      port->Close();
260
198
      msg_->AddMessagePort(port->Detach());
261
    }
262
9519
  }
263
264
  ValueSerializer* serializer = nullptr;
265
266
 private:
267
197
  Maybe<bool> WriteMessagePort(MessagePort* port) {
268
197
    for (uint32_t i = 0; i < ports_.size(); i++) {
269
197
      if (ports_[i] == port) {
270
197
        serializer->WriteUint32(i);
271
197
        return Just(true);
272
      }
273
    }
274
275
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
276
    return Nothing<bool>();
277
  }
278
279
  Environment* env_;
280
  Local<Context> context_;
281
  Message* msg_;
282
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
283
  std::vector<MessagePort*> ports_;
284
285
  friend class worker::Message;
286
};
287
288
}  // anonymous namespace
289
290
9532
Maybe<bool> Message::Serialize(Environment* env,
291
                               Local<Context> context,
292
                               Local<Value> input,
293
                               Local<Value> transfer_list_v,
294
                               Local<Object> source_port) {
295
9532
  HandleScope handle_scope(env->isolate());
296
  Context::Scope context_scope(context);
297
298
  // Verify that we're not silently overwriting an existing message.
299
9532
  CHECK(main_message_buf_.is_empty());
300
301
19064
  SerializerDelegate delegate(env, context, this);
302
19064
  ValueSerializer serializer(env->isolate(), &delegate);
303
9532
  delegate.serializer = &serializer;
304
305
19064
  std::vector<Local<ArrayBuffer>> array_buffers;
306
9532
  if (transfer_list_v->IsArray()) {
307
212
    Local<Array> transfer_list = transfer_list_v.As<Array>();
308
212
    uint32_t length = transfer_list->Length();
309
424
    for (uint32_t i = 0; i < length; ++i) {
310
      Local<Value> entry;
311
446
      if (!transfer_list->Get(context, i).ToLocal(&entry))
312
11
        return Nothing<bool>();
313
      // Currently, we support ArrayBuffers and MessagePorts.
314
223
      if (entry->IsArrayBuffer()) {
315
14
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
316
        // If we cannot render the ArrayBuffer unusable in this Isolate and
317
        // take ownership of its memory, copying the buffer will have to do.
318


42
        if (!ab->IsDetachable() || ab->IsExternal() ||
319
14
            !env->isolate_data()->uses_node_allocator()) {
320
12
          continue;
321
        }
322
14
        if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
323
            array_buffers.end()) {
324
          ThrowDataCloneException(
325
              context,
326
              FIXED_ONE_BYTE_STRING(
327
                  env->isolate(),
328
2
                  "Transfer list contains duplicate ArrayBuffer"));
329
2
          return Nothing<bool>();
330
        }
331
        // We simply use the array index in the `array_buffers` list as the
332
        // ID that we write into the serialized buffer.
333
12
        uint32_t id = array_buffers.size();
334
12
        array_buffers.push_back(ab);
335
12
        serializer.TransferArrayBuffer(id, ab);
336
12
        continue;
337
418
      } else if (env->message_port_constructor_template()
338
627
                    ->HasInstance(entry)) {
339
        // Check if the source MessagePort is being transferred.
340

418
        if (!source_port.IsEmpty() && entry == source_port) {
341
          ThrowDataCloneException(
342
              context,
343
              FIXED_ONE_BYTE_STRING(env->isolate(),
344
1
                                    "Transfer list contains source port"));
345
10
          return Nothing<bool>();
346
        }
347
208
        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
348

208
        if (port == nullptr || port->IsDetached()) {
349
          ThrowDataCloneException(
350
              context,
351
              FIXED_ONE_BYTE_STRING(
352
                  env->isolate(),
353
7
                  "MessagePort in transfer list is already detached"));
354
7
          return Nothing<bool>();
355
        }
356
201
        if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
357
            delegate.ports_.end()) {
358
          ThrowDataCloneException(
359
              context,
360
              FIXED_ONE_BYTE_STRING(
361
                  env->isolate(),
362
1
                  "Transfer list contains duplicate MessagePort"));
363
1
          return Nothing<bool>();
364
        }
365
200
        delegate.ports_.push_back(port);
366
200
        continue;
367
      }
368
369
      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
370
      return Nothing<bool>();
371
    }
372
  }
373
374
9521
  serializer.WriteHeader();
375
19042
  if (serializer.WriteValue(context, input).IsNothing()) {
376
2
    return Nothing<bool>();
377
  }
378
379
9522
  for (Local<ArrayBuffer> ab : array_buffers) {
380
    // If serialization succeeded, we want to take ownership of
381
    // (a.k.a. externalize) the underlying memory region and render
382
    // it inaccessible in this Isolate.
383
3
    ArrayBuffer::Contents contents = ab->Externalize();
384
3
    ab->Detach();
385
386
3
    CHECK(env->isolate_data()->uses_node_allocator());
387
3
    env->isolate_data()->node_allocator()->UnregisterPointer(
388
3
        contents.Data(), contents.ByteLength());
389
390
    array_buffer_contents_.emplace_back(MallocedBuffer<char>{
391
3
        static_cast<char*>(contents.Data()), contents.ByteLength()});
392
  }
393
394
9519
  delegate.Finish();
395
396
  // The serializer gave us a buffer allocated using `malloc()`.
397
9519
  std::pair<uint8_t*, size_t> data = serializer.Release();
398
19038
  main_message_buf_ =
399
9519
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
400
19051
  return Just(true);
401
}
402
403
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
404
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
405
  tracker->TrackFieldWithSize("shared_array_buffers",
406
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
407
4
  tracker->TrackField("message_ports", message_ports_);
408
4
}
409
410
1161
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
411
412
3351
MessagePortData::~MessagePortData() {
413
1117
  CHECK_NULL(owner_);
414
1117
  Disentangle();
415
2234
}
416
417
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
418
8
  Mutex::ScopedLock lock(mutex_);
419
8
  tracker->TrackField("incoming_messages", incoming_messages_);
420
8
}
421
422
9518
void MessagePortData::AddToIncomingQueue(Message&& message) {
423
  // This function will be called by other threads.
424
9518
  Mutex::ScopedLock lock(mutex_);
425
9518
  incoming_messages_.emplace_back(std::move(message));
426
427
9518
  if (owner_ != nullptr) {
428
8937
    Debug(owner_, "Adding message to incoming queue");
429
8937
    owner_->TriggerAsync();
430
9518
  }
431
9518
}
432
433
8159
bool MessagePortData::IsSiblingClosed() const {
434
8159
  Mutex::ScopedLock lock(*sibling_mutex_);
435
8162
  return sibling_ == nullptr;
436
}
437
438
404
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
439
404
  CHECK_NULL(a->sibling_);
440
404
  CHECK_NULL(b->sibling_);
441
404
  a->sibling_ = b;
442
404
  b->sibling_ = a;
443
404
  a->sibling_mutex_ = b->sibling_mutex_;
444
404
}
445
446
2253
void MessagePortData::PingOwnerAfterDisentanglement() {
447
2253
  Mutex::ScopedLock lock(mutex_);
448
2253
  if (owner_ != nullptr)
449
399
    owner_->TriggerAsync();
450
2253
}
451
452
1850
void MessagePortData::Disentangle() {
453
  // Grab a copy of the sibling mutex, then replace it so that each sibling
454
  // has its own sibling_mutex_ now.
455
1850
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
456
3702
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
457
1851
  sibling_mutex_ = std::make_shared<Mutex>();
458
459
1851
  MessagePortData* sibling = sibling_;
460
1851
  if (sibling_ != nullptr) {
461
402
    sibling_->sibling_ = nullptr;
462
402
    sibling_ = nullptr;
463
  }
464
465
  // We close MessagePorts after disentanglement, so we trigger the
466
  // corresponding uv_async_t to let them know that this happened.
467
1851
  PingOwnerAfterDisentanglement();
468
1851
  if (sibling != nullptr) {
469
402
    sibling->PingOwnerAfterDisentanglement();
470
1851
  }
471
1851
}
472
473
2799
MessagePort::~MessagePort() {
474
933
  if (data_)
475
    data_->owner_ = nullptr;
476
1866
}
477
478
975
MessagePort::MessagePort(Environment* env,
479
                         Local<Context> context,
480
                         Local<Object> wrap)
481
  : HandleWrap(env,
482
               wrap,
483
               reinterpret_cast<uv_handle_t*>(&async_),
484
               AsyncWrap::PROVIDER_MESSAGEPORT),
485
975
    data_(new MessagePortData(this)) {
486
17175
  auto onmessage = [](uv_async_t* handle) {
487
    // Called when data has been put into the queue.
488
8100
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
489
8100
    channel->OnMessage();
490
17173
  };
491
975
  CHECK_EQ(uv_async_init(env->event_loop(),
492
                         &async_,
493
                         onmessage), 0);
494
975
  async_.data = static_cast<void*>(this);
495
496
  Local<Value> fn;
497
2925
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
498
975
    return;
499
500
975
  if (fn->IsFunction()) {
501
973
    Local<Function> init = fn.As<Function>();
502
973
    USE(init->Call(context, wrap, 0, nullptr));
503
  }
504
505
975
  Debug(this, "Created message port");
506
}
507
508
208
bool MessagePort::IsDetached() const {
509

208
  return data_ == nullptr || IsHandleClosing();
510
}
511
512
10093
void MessagePort::TriggerAsync() {
513
20186
  if (IsHandleClosing()) return;
514
10087
  CHECK_EQ(uv_async_send(&async_), 0);
515
}
516
517
1156
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
518
2312
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
519
520
1156
  if (data_) {
521
    // Wrap this call with accessing the mutex, so that TriggerAsync()
522
    // can check IsHandleClosing() without race conditions.
523
1155
    Mutex::ScopedLock sibling_lock(data_->mutex_);
524
1155
    HandleWrap::Close(close_callback);
525
  } else {
526
1
    HandleWrap::Close(close_callback);
527
  }
528
1156
}
529
530
975
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
531
975
  Environment* env = Environment::GetCurrent(args);
532
975
  if (!args.IsConstructCall()) {
533
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
534
975
    return;
535
  }
536
537
1950
  Local<Context> context = args.This()->CreationContext();
538
  Context::Scope context_scope(context);
539
540
1950
  new MessagePort(env, context, args.This());
541
}
542
543
973
MessagePort* MessagePort::New(
544
    Environment* env,
545
    Local<Context> context,
546
    std::unique_ptr<MessagePortData> data) {
547
  Context::Scope context_scope(context);
548
  Local<Function> ctor;
549
1946
  if (!GetMessagePortConstructor(env, context).ToLocal(&ctor))
550
    return nullptr;
551
552
  // Construct a new instance, then assign the listener instance and possibly
553
  // the MessagePortData to it.
554
  Local<Object> instance;
555
1946
  if (!ctor->NewInstance(context).ToLocal(&instance))
556
    return nullptr;
557
973
  MessagePort* port = Unwrap<MessagePort>(instance);
558
973
  CHECK_NOT_NULL(port);
559
973
  if (data) {
560
351
    port->Detach();
561
351
    port->data_ = std::move(data);
562
563
    // This lock is here to avoid race conditions with the `owner_` read
564
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
565
    // but it's better to be safe than sorry.)
566
351
    Mutex::ScopedLock lock(port->data_->mutex_);
567
351
    port->data_->owner_ = port;
568
    // If the existing MessagePortData object had pending messages, this is
569
    // the easiest way to run that queue.
570
351
    port->TriggerAsync();
571
  }
572
973
  return port;
573
}
574
575
8190
void MessagePort::OnMessage() {
576
8190
  Debug(this, "Running MessagePort::OnMessage()");
577
8190
  HandleScope handle_scope(env()->isolate());
578
16380
  Local<Context> context = object(env()->isolate())->CreationContext();
579
580
  // data_ can only ever be modified by the owner thread, so no need to lock.
581
  // However, the message port may be transferred while it is processing
582
  // messages, so we need to check that this handle still owns its `data_` field
583
  // on every iteration.
584
25788
  while (data_) {
585
17591
    Message received;
586
    {
587
      // Get the head of the message queue.
588
17594
      Mutex::ScopedLock lock(data_->mutex_);
589
590
      Debug(this, "MessagePort has message, receiving = %d",
591
35192
            static_cast<int>(data_->receiving_messages_));
592
593
17596
      if (!data_->receiving_messages_)
594
62
        break;
595
17534
      if (data_->incoming_messages_.empty())
596
8098
        break;
597
9436
      received = std::move(data_->incoming_messages_.front());
598
9436
      data_->incoming_messages_.pop_front();
599
    }
600
601
9436
    if (!env()->can_call_into_js()) {
602
13
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
603
      // In this case there is nothing to do but to drain the current queue.
604
13
      continue;
605
    }
606
607
    {
608
      // Call the JS .onmessage() callback.
609
9423
      HandleScope handle_scope(env()->isolate());
610
9395
      Context::Scope context_scope(context);
611
612
      Local<Object> event;
613
      Local<Value> payload;
614
18846
      Local<Value> cb_args[1];
615

56524
      if (!received.Deserialize(env(), context).ToLocal(&payload) ||
616
28262
          !env()->message_event_object_template()->NewInstance(context)
617

56520
              .ToLocal(&event) ||
618

65935
          event->Set(context, env()->data_string(), payload).IsNothing() ||
619


65938
          event->Set(context, env()->target_string(), object()).IsNothing() ||
620

47107
          (cb_args[0] = event, false) ||
621
          MakeCallback(env()->onmessage_string(),
622
9420
                       arraysize(cb_args),
623
28262
                       cb_args).IsEmpty()) {
624
        // Re-schedule OnMessage() execution in case of failure.
625
26
        if (data_)
626
26
          TriggerAsync();
627
8214
        return;
628

9395
      }
629
    }
630
9395
  }
631
632

8165
  if (data_ && data_->IsSiblingClosed()) {
633
736
    Close();
634
8162
  }
635
}
636
637
bool MessagePort::IsSiblingClosed() const {
638
  CHECK(data_);
639
  return data_->IsSiblingClosed();
640
}
641
642
933
void MessagePort::OnClose() {
643
933
  Debug(this, "MessagePort::OnClose()");
644
932
  if (data_) {
645
734
    data_->owner_ = nullptr;
646
734
    data_->Disentangle();
647
  }
648
933
  data_.reset();
649
933
}
650
651
550
std::unique_ptr<MessagePortData> MessagePort::Detach() {
652
550
  CHECK(data_);
653
550
  Mutex::ScopedLock lock(data_->mutex_);
654
550
  data_->owner_ = nullptr;
655
550
  return std::move(data_);
656
}
657
658
659
9531
Maybe<bool> MessagePort::PostMessage(Environment* env,
660
                                     Local<Value> message_v,
661
                                     Local<Value> transfer_v) {
662
9531
  Isolate* isolate = env->isolate();
663
9531
  Local<Object> obj = object(isolate);
664
9531
  Local<Context> context = obj->CreationContext();
665
666
9531
  Message msg;
667
668
  // Per spec, we need to both check if transfer list has the source port, and
669
  // serialize the input message, even if the MessagePort is closed or detached.
670
671
  Maybe<bool> serialization_maybe =
672
9531
      msg.Serialize(env, context, message_v, transfer_v, obj);
673
9531
  if (data_ == nullptr) {
674
2
    return serialization_maybe;
675
  }
676
9529
  if (serialization_maybe.IsNothing()) {
677
10
    return Nothing<bool>();
678
  }
679
680
19038
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
681
9519
  bool doomed = false;
682
683
  // Check if the target port is posted to itself.
684
9519
  if (data_->sibling_ != nullptr) {
685
9716
    for (const auto& port_data : msg.message_ports()) {
686
198
      if (data_->sibling_ == port_data.get()) {
687
1
        doomed = true;
688
        ProcessEmitWarning(env, "The target port was posted to itself, and "
689
1
                                "the communication channel was lost");
690
1
        break;
691
      }
692
    }
693
  }
694
695

9519
  if (data_->sibling_ == nullptr || doomed)
696
1
    return Just(true);
697
698
9518
  data_->sibling_->AddToIncomingQueue(std::move(msg));
699
19049
  return Just(true);
700
}
701
702
9532
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
703
9532
  Environment* env = Environment::GetCurrent(args);
704
9532
  if (args.Length() == 0) {
705
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
706
                                       "MessagePort.postMessage");
707
  }
708
709
9532
  MessagePort* port = Unwrap<MessagePort>(args.This());
710
  // Even if the backing MessagePort object has already been deleted, we still
711
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
712
  // transfers.
713
9532
  if (port == nullptr) {
714
1
    Message msg;
715
1
    Local<Object> obj = args.This();
716
1
    Local<Context> context = obj->CreationContext();
717
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
718
1
    return;
719
  }
720
721
9531
  port->PostMessage(env, args[0], args[1]);
722
}
723
724
1056
void MessagePort::Start() {
725
1056
  Mutex::ScopedLock lock(data_->mutex_);
726
1056
  Debug(this, "Start receiving messages");
727
1056
  data_->receiving_messages_ = true;
728
1056
  if (!data_->incoming_messages_.empty())
729
380
    TriggerAsync();
730
1056
}
731
732
35
void MessagePort::Stop() {
733
35
  Mutex::ScopedLock lock(data_->mutex_);
734
35
  Debug(this, "Stop receiving messages");
735
35
  data_->receiving_messages_ = false;
736
35
}
737
738
1057
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
739
  MessagePort* port;
740
1058
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
741
1057
  if (!port->data_) {
742
1
    return;
743
  }
744
1056
  port->Start();
745
}
746
747
121
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
748
  MessagePort* port;
749
242
  CHECK(args[0]->IsObject());
750
328
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
751
37
  if (!port->data_) {
752
2
    return;
753
  }
754
35
  port->Stop();
755
}
756
757
366
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
758
  MessagePort* port;
759
732
  CHECK(args[0]->IsObject());
760
1098
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
761
90
  port->OnMessage();
762
}
763
764
1
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
765
1
  Environment* env = Environment::GetCurrent(args);
766


6
  if (!args[0]->IsObject() ||
767
3
      !env->message_port_constructor_template()->HasInstance(args[0])) {
768
    return THROW_ERR_INVALID_ARG_TYPE(env,
769
        "First argument needs to be a MessagePort instance");
770
  }
771
2
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
772
1
  CHECK_NOT_NULL(port);
773
774
1
  Local<Value> context_arg = args[1];
775
  ContextifyContext* context_wrapper;
776

4
  if (!context_arg->IsObject() ||
777
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
778
3
          env, context_arg.As<Object>())) == nullptr) {
779
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
780
  }
781
782
1
  std::unique_ptr<MessagePortData> data;
783
1
  if (!port->IsDetached())
784
1
    data = port->Detach();
785
786
1
  Context::Scope context_scope(context_wrapper->context());
787
  MessagePort* target =
788
1
      MessagePort::New(env, context_wrapper->context(), std::move(data));
789
1
  if (target != nullptr)
790
4
    args.GetReturnValue().Set(target->object());
791
}
792
793
218
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
794
218
  Entangle(a, b->data_.get());
795
218
}
796
797
404
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
798
404
  MessagePortData::Entangle(a->data_.get(), b);
799
404
}
800
801
7753
MaybeLocal<Function> GetMessagePortConstructor(
802
    Environment* env, Local<Context> context) {
803
  // Factor generating the MessagePort JS constructor into its own piece
804
  // of code, because it is needed early on in the child environment setup.
805
7753
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
806
7753
  if (!templ.IsEmpty())
807
4363
    return templ->GetFunction(context);
808
809
3390
  Isolate* isolate = env->isolate();
810
811
  {
812
3390
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
813
6780
    m->SetClassName(env->message_port_constructor_string());
814
6780
    m->InstanceTemplate()->SetInternalFieldCount(1);
815
6780
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
816
817
3390
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
818
3390
    env->SetProtoMethod(m, "start", MessagePort::Start);
819
820
3390
    env->set_message_port_constructor_template(m);
821
822
3390
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
823
6780
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
824
3390
    Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
825
6780
    e->Set(env->data_string(), Null(isolate));
826
6780
    e->Set(env->target_string(), Null(isolate));
827
3390
    env->set_message_event_object_template(e);
828
  }
829
830
3390
  return GetMessagePortConstructor(env, context);
831
}
832
833
namespace {
834
835
218
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
836
218
  Environment* env = Environment::GetCurrent(args);
837
218
  if (!args.IsConstructCall()) {
838
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
839
218
    return;
840
  }
841
842
436
  Local<Context> context = args.This()->CreationContext();
843
  Context::Scope context_scope(context);
844
845
218
  MessagePort* port1 = MessagePort::New(env, context);
846
218
  MessagePort* port2 = MessagePort::New(env, context);
847
218
  MessagePort::Entangle(port1, port2);
848
849
1090
  args.This()->Set(context, env->port1_string(), port1->object())
850
436
      .Check();
851
1090
  args.This()->Set(context, env->port2_string(), port2->object())
852
436
      .Check();
853
}
854
855
3390
static void InitMessaging(Local<Object> target,
856
                          Local<Value> unused,
857
                          Local<Context> context,
858
                          void* priv) {
859
3390
  Environment* env = Environment::GetCurrent(context);
860
861
  {
862
    Local<String> message_channel_string =
863
3390
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
864
3390
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
865
3390
    templ->SetClassName(message_channel_string);
866
    target->Set(context,
867
                message_channel_string,
868
10170
                templ->GetFunction(context).ToLocalChecked()).Check();
869
  }
870
871
  target->Set(context,
872
              env->message_port_constructor_string(),
873
13560
              GetMessagePortConstructor(env, context).ToLocalChecked())
874
6780
                  .Check();
875
876
  // These are not methods on the MessagePort prototype, because
877
  // the browser equivalents do not provide them.
878
3390
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
879
3390
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
880
  env->SetMethod(target, "moveMessagePortToContext",
881
3390
                 MessagePort::MoveToContext);
882
3390
}
883
884
}  // anonymous namespace
885
886
}  // namespace worker
887
}  // namespace node
888
889
4524
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)