GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 403 439 91.8 %
Date: 2019-03-02 22:23:06 Branches: 171 246 69.5 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
#include "async_wrap-inl.h"
3
#include "async_wrap.h"
4
#include "debug_utils.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_process.h"
8
#include "util-inl.h"
9
#include "util.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::ObjectTemplate;
29
using v8::SharedArrayBuffer;
30
using v8::String;
31
using v8::Value;
32
using v8::ValueDeserializer;
33
using v8::ValueSerializer;
34
using v8::WasmCompiledModule;
35
36
namespace node {
37
namespace worker {
38
39
22874
Message::Message(MallocedBuffer<char>&& buffer)
40
22874
    : main_message_buf_(std::move(buffer)) {}
41
42
namespace {
43
44
// This is used to tell V8 how to read transferred host objects, like other
45
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
46
8059
class DeserializerDelegate : public ValueDeserializer::Delegate {
47
 public:
48
8060
  DeserializerDelegate(
49
      Message* m,
50
      Environment* env,
51
      const std::vector<MessagePort*>& message_ports,
52
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
53
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
54
      : message_ports_(message_ports),
55
        shared_array_buffers_(shared_array_buffers),
56
8060
        wasm_modules_(wasm_modules) {}
57
58
166
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
59
    // Currently, only MessagePort hosts objects are supported, so identifying
60
    // by the index in the message's MessagePort array is sufficient.
61
    uint32_t id;
62
166
    if (!deserializer->ReadUint32(&id))
63
      return MaybeLocal<Object>();
64
166
    CHECK_LE(id, message_ports_.size());
65
333
    return message_ports_[id]->object(isolate);
66
  };
67
68
7
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
69
      Isolate* isolate, uint32_t clone_id) override {
70
7
    CHECK_LE(clone_id, shared_array_buffers_.size());
71
14
    return shared_array_buffers_[clone_id];
72
  }
73
74
2
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
75
      Isolate* isolate, uint32_t transfer_id) override {
76
2
    CHECK_LE(transfer_id, wasm_modules_.size());
77
    return WasmCompiledModule::FromTransferrableModule(
78
2
        isolate, wasm_modules_[transfer_id]);
79
  }
80
81
  ValueDeserializer* deserializer = nullptr;
82
83
 private:
84
  const std::vector<MessagePort*>& message_ports_;
85
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
86
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
87
};
88
89
}  // anonymous namespace
90
91
8061
MaybeLocal<Value> Message::Deserialize(Environment* env,
92
                                       Local<Context> context) {
93
8061
  EscapableHandleScope handle_scope(env->isolate());
94
  Context::Scope context_scope(context);
95
96
  // Create all necessary MessagePort handles.
97
16114
  std::vector<MessagePort*> ports(message_ports_.size());
98
8227
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
99
167
    ports[i] = MessagePort::New(env,
100
                                context,
101
167
                                std::move(message_ports_[i]));
102
167
    if (ports[i] == nullptr) {
103
      for (MessagePort* port : ports) {
104
        // This will eventually release the MessagePort object itself.
105
        if (port != nullptr)
106
          port->Close();
107
      }
108
      return MaybeLocal<Value>();
109
    }
110
  }
111
8060
  message_ports_.clear();
112
113
16122
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
114
  // Attach all transferred SharedArrayBuffers to their new Isolate.
115
8067
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
116
    Local<SharedArrayBuffer> sab;
117
21
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
118
21
            .ToLocal(&sab))
119
      return MaybeLocal<Value>();
120
7
    shared_array_buffers.push_back(sab);
121
  }
122
8060
  shared_array_buffers_.clear();
123
124
  DeserializerDelegate delegate(
125
16121
      this, env, ports, shared_array_buffers, wasm_modules_);
126
  ValueDeserializer deserializer(
127
      env->isolate(),
128
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
129
      main_message_buf_.size,
130
16121
      &delegate);
131
8061
  delegate.deserializer = &deserializer;
132
133
  // Attach all transferred ArrayBuffers to their new Isolate.
134
8063
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
135
2
    if (!env->isolate_data()->uses_node_allocator()) {
136
      // We don't use Node's allocator on the receiving side, so we have
137
      // to create the ArrayBuffer from a copy of the memory.
138
      AllocatedBuffer buf =
139
          env->AllocateManaged(array_buffer_contents_[i].size);
140
      memcpy(buf.data(),
141
             array_buffer_contents_[i].data,
142
             array_buffer_contents_[i].size);
143
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
144
      continue;
145
    }
146
147
2
    env->isolate_data()->node_allocator()->RegisterPointer(
148
2
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);
149
150
    Local<ArrayBuffer> ab =
151
        ArrayBuffer::New(env->isolate(),
152
2
                         array_buffer_contents_[i].release(),
153
2
                         array_buffer_contents_[i].size,
154
4
                         ArrayBufferCreationMode::kInternalized);
155
2
    deserializer.TransferArrayBuffer(i, ab);
156
  }
157
8060
  array_buffer_contents_.clear();
158
159
16121
  if (deserializer.ReadHeader(context).IsNothing())
160
    return MaybeLocal<Value>();
161
  return handle_scope.Escape(
162
16121
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
163
}
164
165
7
void Message::AddSharedArrayBuffer(
166
    const SharedArrayBufferMetadataReference& reference) {
167
7
  shared_array_buffers_.push_back(reference);
168
7
}
169
170
189
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
171
189
  message_ports_.emplace_back(std::move(data));
172
189
}
173
174
2
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
175
2
  wasm_modules_.emplace_back(std::move(mod));
176
2
  return wasm_modules_.size() - 1;
177
}
178
179
namespace {
180
181
11
void ThrowDataCloneException(Environment* env, Local<String> message) {
182
  Local<Value> argv[] = {
183
    message,
184
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
185
33
  };
186
  Local<Value> exception;
187
11
  Local<Function> domexception_ctor = env->domexception_function();
188
11
  CHECK(!domexception_ctor.IsEmpty());
189
44
  if (!domexception_ctor->NewInstance(env->context(), arraysize(argv), argv)
190
33
          .ToLocal(&exception)) {
191
11
    return;
192
  }
193
11
  env->isolate()->ThrowException(exception);
194
}
195
196
// This tells V8 how to serialize objects that it does not understand
197
// (e.g. C++ objects) into the output buffer, in a way that our own
198
// DeserializerDelegate understands how to unpack.
199
8175
class SerializerDelegate : public ValueSerializer::Delegate {
200
 public:
201
8175
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
202
8175
      : env_(env), context_(context), msg_(m) {}
203
204
1
  void ThrowDataCloneError(Local<String> message) override {
205
1
    ThrowDataCloneException(env_, message);
206
1
  }
207
208
188
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
209
376
    if (env_->message_port_constructor_template()->HasInstance(object)) {
210
188
      return WriteMessagePort(Unwrap<MessagePort>(object));
211
    }
212
213
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
214
    return Nothing<bool>();
215
  }
216
217
7
  Maybe<uint32_t> GetSharedArrayBufferId(
218
      Isolate* isolate,
219
      Local<SharedArrayBuffer> shared_array_buffer) override {
220
    uint32_t i;
221
7
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
222
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
223
        return Just(i);
224
    }
225
226
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
227
        env_,
228
        context_,
229
7
        shared_array_buffer);
230
7
    if (!reference) {
231
      return Nothing<uint32_t>();
232
    }
233
7
    seen_shared_array_buffers_.push_back(shared_array_buffer);
234
7
    msg_->AddSharedArrayBuffer(reference);
235
7
    return Just(i);
236
  }
237
238
2
  Maybe<uint32_t> GetWasmModuleTransferId(
239
      Isolate* isolate, Local<WasmCompiledModule> module) override {
240
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
241
  }
242
243
8164
  void Finish() {
244
    // Only close the MessagePort handles and actually transfer them
245
    // once we know that serialization succeeded.
246
8353
    for (MessagePort* port : ports_) {
247
378
      port->Close();
248
189
      msg_->AddMessagePort(port->Detach());
249
    }
250
8164
  }
251
252
  ValueSerializer* serializer = nullptr;
253
254
 private:
255
188
  Maybe<bool> WriteMessagePort(MessagePort* port) {
256
188
    for (uint32_t i = 0; i < ports_.size(); i++) {
257
188
      if (ports_[i] == port) {
258
188
        serializer->WriteUint32(i);
259
188
        return Just(true);
260
      }
261
    }
262
263
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
264
    return Nothing<bool>();
265
  }
266
267
  Environment* env_;
268
  Local<Context> context_;
269
  Message* msg_;
270
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
271
  std::vector<MessagePort*> ports_;
272
273
  friend class worker::Message;
274
};
275
276
}  // anonymous namespace
277
278
8175
Maybe<bool> Message::Serialize(Environment* env,
279
                               Local<Context> context,
280
                               Local<Value> input,
281
                               Local<Value> transfer_list_v,
282
                               Local<Object> source_port) {
283
8175
  HandleScope handle_scope(env->isolate());
284
  Context::Scope context_scope(context);
285
286
  // Verify that we're not silently overwriting an existing message.
287
8175
  CHECK(main_message_buf_.is_empty());
288
289
16350
  SerializerDelegate delegate(env, context, this);
290
16350
  ValueSerializer serializer(env->isolate(), &delegate);
291
8175
  delegate.serializer = &serializer;
292
293
16350
  std::vector<Local<ArrayBuffer>> array_buffers;
294
8175
  if (transfer_list_v->IsArray()) {
295
202
    Local<Array> transfer_list = transfer_list_v.As<Array>();
296
202
    uint32_t length = transfer_list->Length();
297
404
    for (uint32_t i = 0; i < length; ++i) {
298
      Local<Value> entry;
299
424
      if (!transfer_list->Get(context, i).ToLocal(&entry))
300
10
        return Nothing<bool>();
301
      // Currently, we support ArrayBuffers and MessagePorts.
302
212
      if (entry->IsArrayBuffer()) {
303
12
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
304
        // If we cannot render the ArrayBuffer unusable in this Isolate and
305
        // take ownership of its memory, copying the buffer will have to do.
306


36
        if (!ab->IsNeuterable() || ab->IsExternal() ||
307
12
            !env->isolate_data()->uses_node_allocator()) {
308
11
          continue;
309
        }
310
12
        if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
311
            array_buffers.end()) {
312
          ThrowDataCloneException(
313
              env,
314
              FIXED_ONE_BYTE_STRING(
315
                  env->isolate(),
316
1
                  "Transfer list contains duplicate ArrayBuffer"));
317
1
          return Nothing<bool>();
318
        }
319
        // We simply use the array index in the `array_buffers` list as the
320
        // ID that we write into the serialized buffer.
321
11
        uint32_t id = array_buffers.size();
322
11
        array_buffers.push_back(ab);
323
11
        serializer.TransferArrayBuffer(id, ab);
324
11
        continue;
325
400
      } else if (env->message_port_constructor_template()
326
600
                    ->HasInstance(entry)) {
327
        // Check if the source MessagePort is being transferred.
328

400
        if (!source_port.IsEmpty() && entry == source_port) {
329
          ThrowDataCloneException(
330
              env,
331
              FIXED_ONE_BYTE_STRING(env->isolate(),
332
1
                                    "Transfer list contains source port"));
333
10
          return Nothing<bool>();
334
        }
335
199
        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
336

199
        if (port == nullptr || port->IsDetached()) {
337
          ThrowDataCloneException(
338
              env,
339
              FIXED_ONE_BYTE_STRING(
340
                  env->isolate(),
341
7
                  "MessagePort in transfer list is already detached"));
342
7
          return Nothing<bool>();
343
        }
344
192
        if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
345
            delegate.ports_.end()) {
346
          ThrowDataCloneException(
347
              env,
348
              FIXED_ONE_BYTE_STRING(
349
                  env->isolate(),
350
1
                  "Transfer list contains duplicate MessagePort"));
351
1
          return Nothing<bool>();
352
        }
353
191
        delegate.ports_.push_back(port);
354
191
        continue;
355
      }
356
357
      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
358
      return Nothing<bool>();
359
    }
360
  }
361
362
8165
  serializer.WriteHeader();
363
16330
  if (serializer.WriteValue(context, input).IsNothing()) {
364
1
    return Nothing<bool>();
365
  }
366
367
8167
  for (Local<ArrayBuffer> ab : array_buffers) {
368
    // If serialization succeeded, we want to take ownership of
369
    // (a.k.a. externalize) the underlying memory region and render
370
    // it inaccessible in this Isolate.
371
3
    ArrayBuffer::Contents contents = ab->Externalize();
372
3
    ab->Neuter();
373
374
3
    CHECK(env->isolate_data()->uses_node_allocator());
375
3
    env->isolate_data()->node_allocator()->UnregisterPointer(
376
3
        contents.Data(), contents.ByteLength());
377
378
    array_buffer_contents_.push_back(
379
3
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
380
3
                               contents.ByteLength() });
381
  }
382
383
8164
  delegate.Finish();
384
385
  // The serializer gave us a buffer allocated using `malloc()`.
386
8164
  std::pair<uint8_t*, size_t> data = serializer.Release();
387
16328
  main_message_buf_ =
388
8164
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
389
16339
  return Just(true);
390
}
391
392
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
393
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
394
  tracker->TrackFieldWithSize("shared_array_buffers",
395
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
396
4
  tracker->TrackField("message_ports", message_ports_);
397
4
}
398
399
1105
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
400
401
3187
MessagePortData::~MessagePortData() {
402
1062
  CHECK_NULL(owner_);
403
1062
  Disentangle();
404
2124
}
405
406
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
407
8
  Mutex::ScopedLock lock(mutex_);
408
8
  tracker->TrackField("incoming_messages", incoming_messages_);
409
8
}
410
411
8163
void MessagePortData::AddToIncomingQueue(Message&& message) {
412
  // This function will be called by other threads.
413
8163
  Mutex::ScopedLock lock(mutex_);
414
8163
  incoming_messages_.emplace_back(std::move(message));
415
416
8163
  if (owner_ != nullptr) {
417
7626
    Debug(owner_, "Adding message to incoming queue");
418
7626
    owner_->TriggerAsync();
419
8163
  }
420
8163
}
421
422
6632
bool MessagePortData::IsSiblingClosed() const {
423
6632
  Mutex::ScopedLock lock(*sibling_mutex_);
424
6633
  return sibling_ == nullptr;
425
}
426
427
383
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
428
383
  CHECK_NULL(a->sibling_);
429
383
  CHECK_NULL(b->sibling_);
430
383
  a->sibling_ = b;
431
383
  b->sibling_ = a;
432
383
  a->sibling_mutex_ = b->sibling_mutex_;
433
383
}
434
435
2141
void MessagePortData::PingOwnerAfterDisentanglement() {
436
2141
  Mutex::ScopedLock lock(mutex_);
437
2142
  if (owner_ != nullptr)
438
370
    owner_->TriggerAsync();
439
2142
}
440
441
1759
void MessagePortData::Disentangle() {
442
  // Grab a copy of the sibling mutex, then replace it so that each sibling
443
  // has its own sibling_mutex_ now.
444
1759
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
445
3519
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
446
1760
  sibling_mutex_ = std::make_shared<Mutex>();
447
448
1760
  MessagePortData* sibling = sibling_;
449
1760
  if (sibling_ != nullptr) {
450
383
    sibling_->sibling_ = nullptr;
451
383
    sibling_ = nullptr;
452
  }
453
454
  // We close MessagePorts after disentanglement, so we trigger the
455
  // corresponding uv_async_t to let them know that this happened.
456
1760
  PingOwnerAfterDisentanglement();
457
1759
  if (sibling != nullptr) {
458
383
    sibling->PingOwnerAfterDisentanglement();
459
1760
  }
460
1759
}
461
462
2658
MessagePort::~MessagePort() {
463
886
  if (data_)
464
    data_->owner_ = nullptr;
465
1772
}
466
467
928
MessagePort::MessagePort(Environment* env,
468
                         Local<Context> context,
469
                         Local<Object> wrap)
470
  : HandleWrap(env,
471
               wrap,
472
               reinterpret_cast<uv_handle_t*>(&async_),
473
               AsyncWrap::PROVIDER_MESSAGEPORT),
474
928
    data_(new MessagePortData(this)) {
475
14090
  auto onmessage = [](uv_async_t* handle) {
476
    // Called when data has been put into the queue.
477
6581
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
478
6581
    channel->OnMessage();
479
14088
  };
480
928
  CHECK_EQ(uv_async_init(env->event_loop(),
481
                         &async_,
482
                         onmessage), 0);
483
928
  async_.data = static_cast<void*>(this);
484
485
  Local<Value> fn;
486
2784
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
487
928
    return;
488
489
928
  if (fn->IsFunction()) {
490
756
    Local<Function> init = fn.As<Function>();
491
756
    USE(init->Call(context, wrap, 0, nullptr));
492
  }
493
494
928
  Debug(this, "Created message port");
495
}
496
497
198
bool MessagePort::IsDetached() const {
498

198
  return data_ == nullptr || IsHandleClosing();
499
}
500
501
8562
void MessagePort::TriggerAsync() {
502
17125
  if (IsHandleClosing()) return;
503
8557
  CHECK_EQ(uv_async_send(&async_), 0);
504
}
505
506
1108
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
507
2216
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
508
509
1108
  if (data_) {
510
    // Wrap this call with accessing the mutex, so that TriggerAsync()
511
    // can check IsHandleClosing() without race conditions.
512
1108
    Mutex::ScopedLock sibling_lock(data_->mutex_);
513
1108
    HandleWrap::Close(close_callback);
514
  } else {
515
    HandleWrap::Close(close_callback);
516
  }
517
1108
}
518
519
928
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
520
928
  Environment* env = Environment::GetCurrent(args);
521
928
  if (!args.IsConstructCall()) {
522
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
523
928
    return;
524
  }
525
526
1856
  Local<Context> context = args.This()->CreationContext();
527
  Context::Scope context_scope(context);
528
529
1856
  new MessagePort(env, context, args.This());
530
}
531
532
928
MessagePort* MessagePort::New(
533
    Environment* env,
534
    Local<Context> context,
535
    std::unique_ptr<MessagePortData> data) {
536
  Context::Scope context_scope(context);
537
  Local<Function> ctor;
538
1856
  if (!GetMessagePortConstructor(env, context).ToLocal(&ctor))
539
    return nullptr;
540
541
  // Construct a new instance, then assign the listener instance and possibly
542
  // the MessagePortData to it.
543
  Local<Object> instance;
544
1856
  if (!ctor->NewInstance(context).ToLocal(&instance))
545
    return nullptr;
546
928
  MessagePort* port = Unwrap<MessagePort>(instance);
547
927
  CHECK_NOT_NULL(port);
548
927
  if (data) {
549
339
    port->Detach();
550
338
    port->data_ = std::move(data);
551
552
    // This lock is here to avoid race conditions with the `owner_` read
553
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
554
    // but it's better to be safe than sorry.)
555
339
    Mutex::ScopedLock lock(port->data_->mutex_);
556
339
    port->data_->owner_ = port;
557
    // If the existing MessagePortData object had pending messages, this is
558
    // the easiest way to run that queue.
559
338
    port->TriggerAsync();
560
  }
561
928
  return port;
562
}
563
564
6669
void MessagePort::OnMessage() {
565
6669
  Debug(this, "Running MessagePort::OnMessage()");
566
6669
  HandleScope handle_scope(env()->isolate());
567
13338
  Local<Context> context = object(env()->isolate())->CreationContext();
568
569
  // data_ can only ever be modified by the owner thread, so no need to lock.
570
  // However, the message port may be transferred while it is processing
571
  // messages, so we need to check that this handle still owns its `data_` field
572
  // on every iteration.
573
21371
  while (data_) {
574
14699
    Message received;
575
    {
576
      // Get the head of the message queue.
577
14700
      Mutex::ScopedLock lock(data_->mutex_);
578
579
      Debug(this, "MessagePort has message, receiving = %d",
580
29404
            static_cast<int>(data_->receiving_messages_));
581
582
14702
      if (!data_->receiving_messages_)
583
59
        break;
584
14643
      if (data_->incoming_messages_.empty())
585
6574
        break;
586
8069
      received = std::move(data_->incoming_messages_.front());
587
8069
      data_->incoming_messages_.pop_front();
588
    }
589
590
8068
    if (!env()->can_call_into_js()) {
591
8
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
592
      // In this case there is nothing to do but to drain the current queue.
593
8
      continue;
594
    }
595
596
    {
597
      // Call the JS .onmessage() callback.
598
8061
      HandleScope handle_scope(env()->isolate());
599
8025
      Context::Scope context_scope(context);
600
601
      Local<Object> event;
602
      Local<Value> payload;
603
16122
      Local<Value> cb_args[1];
604

48354
      if (!received.Deserialize(env(), context).ToLocal(&payload) ||
605
24171
          !env()->message_event_object_template()->NewInstance(context)
606

48349
              .ToLocal(&event) ||
607

56413
          event->Set(context, env()->data_string(), payload).IsNothing() ||
608


56412
          event->Set(context, env()->target_string(), object()).IsNothing() ||
609

40289
          (cb_args[0] = event, false) ||
610
          MakeCallback(env()->onmessage_string(),
611
8061
                       arraysize(cb_args),
612
24179
                       cb_args).IsEmpty()) {
613
        // Re-schedule OnMessage() execution in case of failure.
614
34
        if (data_)
615
34
          TriggerAsync();
616
6701
        return;
617

8025
      }
618
    }
619
8025
  }
620
621

6636
  if (data_ && data_->IsSiblingClosed()) {
622
720
    Close();
623
6633
  }
624
}
625
626
bool MessagePort::IsSiblingClosed() const {
627
  CHECK(data_);
628
  return data_->IsSiblingClosed();
629
}
630
631
885
void MessagePort::OnClose() {
632
885
  Debug(this, "MessagePort::OnClose()");
633
885
  if (data_) {
634
697
    data_->owner_ = nullptr;
635
697
    data_->Disentangle();
636
  }
637
886
  data_.reset();
638
886
}
639
640
527
std::unique_ptr<MessagePortData> MessagePort::Detach() {
641
527
  CHECK(data_);
642
528
  Mutex::ScopedLock lock(data_->mutex_);
643
528
  data_->owner_ = nullptr;
644
528
  return std::move(data_);
645
}
646
647
648
8174
Maybe<bool> MessagePort::PostMessage(Environment* env,
649
                                     Local<Value> message_v,
650
                                     Local<Value> transfer_v) {
651
8174
  Isolate* isolate = env->isolate();
652
8174
  Local<Object> obj = object(isolate);
653
8174
  Local<Context> context = obj->CreationContext();
654
655
8174
  Message msg;
656
657
  // Per spec, we need to both check if transfer list has the source port, and
658
  // serialize the input message, even if the MessagePort is closed or detached.
659
660
  Maybe<bool> serialization_maybe =
661
8174
      msg.Serialize(env, context, message_v, transfer_v, obj);
662
8174
  if (data_ == nullptr) {
663
2
    return serialization_maybe;
664
  }
665
8172
  if (serialization_maybe.IsNothing()) {
666
8
    return Nothing<bool>();
667
  }
668
669
16328
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
670
8164
  bool doomed = false;
671
672
  // Check if the target port is posted to itself.
673
8164
  if (data_->sibling_ != nullptr) {
674
8352
    for (const auto& port_data : msg.message_ports()) {
675
189
      if (data_->sibling_ == port_data.get()) {
676
1
        doomed = true;
677
        ProcessEmitWarning(env, "The target port was posted to itself, and "
678
1
                                "the communication channel was lost");
679
1
        break;
680
      }
681
    }
682
  }
683
684

8164
  if (data_->sibling_ == nullptr || doomed)
685
1
    return Just(true);
686
687
8163
  data_->sibling_->AddToIncomingQueue(std::move(msg));
688
16337
  return Just(true);
689
}
690
691
8175
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
692
8175
  Environment* env = Environment::GetCurrent(args);
693
8175
  if (args.Length() == 0) {
694
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
695
                                       "MessagePort.postMessage");
696
  }
697
698
8175
  MessagePort* port = Unwrap<MessagePort>(args.This());
699
  // Even if the backing MessagePort object has already been deleted, we still
700
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
701
  // transfers.
702
8175
  if (port == nullptr) {
703
1
    Message msg;
704
1
    Local<Object> obj = args.This();
705
1
    Local<Context> context = obj->CreationContext();
706
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
707
1
    return;
708
  }
709
710
8174
  port->PostMessage(env, args[0], args[1]);
711
}
712
713
831
void MessagePort::Start() {
714
831
  Mutex::ScopedLock lock(data_->mutex_);
715
831
  Debug(this, "Start receiving messages");
716
831
  data_->receiving_messages_ = true;
717
831
  if (!data_->incoming_messages_.empty())
718
194
    TriggerAsync();
719
831
}
720
721
30
void MessagePort::Stop() {
722
30
  Mutex::ScopedLock lock(data_->mutex_);
723
30
  Debug(this, "Stop receiving messages");
724
30
  data_->receiving_messages_ = false;
725
30
}
726
727
831
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
728
831
  Environment* env = Environment::GetCurrent(args);
729
  MessagePort* port;
730
831
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
731
831
  if (!port->data_) {
732
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
733
    return;
734
  }
735
831
  port->Start();
736
}
737
738
111
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
739
111
  Environment* env = Environment::GetCurrent(args);
740
  MessagePort* port;
741
222
  CHECK(args[0]->IsObject());
742
303
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
743
30
  if (!port->data_) {
744
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
745
    return;
746
  }
747
30
  port->Stop();
748
}
749
750
348
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
751
  MessagePort* port;
752
696
  CHECK(args[0]->IsObject());
753
1044
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
754
88
  port->OnMessage();
755
}
756
757
206
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
758
206
  Entangle(a, b->data_.get());
759
206
}
760
761
383
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
762
383
  MessagePortData::Entangle(a->data_.get(), b);
763
383
}
764
765
9751
MaybeLocal<Function> GetMessagePortConstructor(
766
    Environment* env, Local<Context> context) {
767
  // Factor generating the MessagePort JS constructor into its own piece
768
  // of code, because it is needed early on in the child environment setup.
769
9751
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
770
9751
  if (!templ.IsEmpty())
771
5336
    return templ->GetFunction(context);
772
773
4415
  Isolate* isolate = env->isolate();
774
775
  {
776
4415
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
777
8830
    m->SetClassName(env->message_port_constructor_string());
778
8830
    m->InstanceTemplate()->SetInternalFieldCount(1);
779
8830
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
780
781
4415
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
782
4415
    env->SetProtoMethod(m, "start", MessagePort::Start);
783
784
4415
    env->set_message_port_constructor_template(m);
785
786
4415
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
787
8830
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
788
4415
    Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
789
8830
    e->Set(env->data_string(), Null(isolate));
790
8830
    e->Set(env->target_string(), Null(isolate));
791
4415
    env->set_message_event_object_template(e);
792
  }
793
794
4415
  return GetMessagePortConstructor(env, context);
795
}
796
797
namespace {
798
799
206
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
800
206
  Environment* env = Environment::GetCurrent(args);
801
206
  if (!args.IsConstructCall()) {
802
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
803
206
    return;
804
  }
805
806
412
  Local<Context> context = args.This()->CreationContext();
807
  Context::Scope context_scope(context);
808
809
206
  MessagePort* port1 = MessagePort::New(env, context);
810
206
  MessagePort* port2 = MessagePort::New(env, context);
811
206
  MessagePort::Entangle(port1, port2);
812
813
1236
  args.This()->Set(env->context(), env->port1_string(), port1->object())
814
412
      .FromJust();
815
1236
  args.This()->Set(env->context(), env->port2_string(), port2->object())
816
412
      .FromJust();
817
}
818
819
4408
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
820
4408
  Environment* env = Environment::GetCurrent(args);
821
4408
  CHECK_EQ(args.Length(), 1);
822
8816
  CHECK(args[0]->IsFunction());
823
8816
  env->set_domexception_function(args[0].As<Function>());
824
4408
}
825
826
4408
static void InitMessaging(Local<Object> target,
827
                          Local<Value> unused,
828
                          Local<Context> context,
829
                          void* priv) {
830
4408
  Environment* env = Environment::GetCurrent(context);
831
832
  {
833
    Local<String> message_channel_string =
834
4408
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
835
4408
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
836
4408
    templ->SetClassName(message_channel_string);
837
    target->Set(env->context(),
838
                message_channel_string,
839
17632
                templ->GetFunction(context).ToLocalChecked()).FromJust();
840
  }
841
842
  target->Set(context,
843
              env->message_port_constructor_string(),
844
17632
              GetMessagePortConstructor(env, context).ToLocalChecked())
845
8816
                  .FromJust();
846
847
4408
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
848
849
  // These are not methods on the MessagePort prototype, because
850
  // the browser equivalents do not provide them.
851
4408
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
852
4408
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
853
4408
}
854
855
}  // anonymous namespace
856
857
}  // namespace worker
858
}  // namespace node
859
860
4292
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)