GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 395 429 92.1 %
Date: 2019-02-13 22:28:58 Branches: 151 214 70.6 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
#include "async_wrap-inl.h"
3
#include "async_wrap.h"
4
#include "debug_utils.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_process.h"
8
#include "util-inl.h"
9
#include "util.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::SharedArrayBuffer;
29
using v8::String;
30
using v8::Value;
31
using v8::ValueDeserializer;
32
using v8::ValueSerializer;
33
using v8::WasmCompiledModule;
34
35
namespace node {
36
namespace worker {
37
38
21893
Message::Message(MallocedBuffer<char>&& buffer)
39
21893
    : main_message_buf_(std::move(buffer)) {}
40
41
namespace {
42
43
// This is used to tell V8 how to read transferred host objects, like other
44
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
45
7780
class DeserializerDelegate : public ValueDeserializer::Delegate {
46
 public:
47
7780
  DeserializerDelegate(
48
      Message* m,
49
      Environment* env,
50
      const std::vector<MessagePort*>& message_ports,
51
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
52
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
53
      : message_ports_(message_ports),
54
        shared_array_buffers_(shared_array_buffers),
55
7780
        wasm_modules_(wasm_modules) {}
56
57
143
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
58
    // Currently, only MessagePort hosts objects are supported, so identifying
59
    // by the index in the message's MessagePort array is sufficient.
60
    uint32_t id;
61
143
    if (!deserializer->ReadUint32(&id))
62
      return MaybeLocal<Object>();
63
143
    CHECK_LE(id, message_ports_.size());
64
286
    return message_ports_[id]->object(isolate);
65
  };
66
67
5
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
68
      Isolate* isolate, uint32_t clone_id) override {
69
5
    CHECK_LE(clone_id, shared_array_buffers_.size());
70
10
    return shared_array_buffers_[clone_id];
71
  }
72
73
2
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
74
      Isolate* isolate, uint32_t transfer_id) override {
75
2
    CHECK_LE(transfer_id, wasm_modules_.size());
76
    return WasmCompiledModule::FromTransferrableModule(
77
2
        isolate, wasm_modules_[transfer_id]);
78
  }
79
80
  ValueDeserializer* deserializer = nullptr;
81
82
 private:
83
  const std::vector<MessagePort*>& message_ports_;
84
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
85
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
86
};
87
88
}  // anonymous namespace
89
90
7780
MaybeLocal<Value> Message::Deserialize(Environment* env,
91
                                       Local<Context> context) {
92
7780
  EscapableHandleScope handle_scope(env->isolate());
93
  Context::Scope context_scope(context);
94
95
  // Create all necessary MessagePort handles.
96
15560
  std::vector<MessagePort*> ports(message_ports_.size());
97
7923
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
98
143
    ports[i] = MessagePort::New(env,
99
                                context,
100
143
                                std::move(message_ports_[i]));
101
143
    if (ports[i] == nullptr) {
102
      for (MessagePort* port : ports) {
103
        // This will eventually release the MessagePort object itself.
104
        if (port != nullptr)
105
          port->Close();
106
      }
107
      return MaybeLocal<Value>();
108
    }
109
  }
110
7780
  message_ports_.clear();
111
112
15560
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
113
  // Attach all transferred SharedArrayBuffers to their new Isolate.
114
7785
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
115
    Local<SharedArrayBuffer> sab;
116
15
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
117
15
            .ToLocal(&sab))
118
      return MaybeLocal<Value>();
119
5
    shared_array_buffers.push_back(sab);
120
  }
121
7780
  shared_array_buffers_.clear();
122
123
  DeserializerDelegate delegate(
124
15560
      this, env, ports, shared_array_buffers, wasm_modules_);
125
  ValueDeserializer deserializer(
126
      env->isolate(),
127
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
128
      main_message_buf_.size,
129
15559
      &delegate);
130
7780
  delegate.deserializer = &deserializer;
131
132
  // Attach all transferred ArrayBuffers to their new Isolate.
133
7782
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
134
    Local<ArrayBuffer> ab =
135
        ArrayBuffer::New(env->isolate(),
136
2
                         array_buffer_contents_[i].release(),
137
2
                         array_buffer_contents_[i].size,
138
4
                         ArrayBufferCreationMode::kInternalized);
139
2
    deserializer.TransferArrayBuffer(i, ab);
140
  }
141
7780
  array_buffer_contents_.clear();
142
143
15560
  if (deserializer.ReadHeader(context).IsNothing())
144
    return MaybeLocal<Value>();
145
  return handle_scope.Escape(
146
15559
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
147
}
148
149
5
void Message::AddSharedArrayBuffer(
150
    SharedArrayBufferMetadataReference reference) {
151
5
  shared_array_buffers_.push_back(reference);
152
5
}
153
154
167
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
155
167
  message_ports_.emplace_back(std::move(data));
156
167
}
157
158
2
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
159
2
  wasm_modules_.emplace_back(std::move(mod));
160
2
  return wasm_modules_.size() - 1;
161
}
162
163
namespace {
164
165
11
void ThrowDataCloneException(Environment* env, Local<String> message) {
166
  Local<Value> argv[] = {
167
    message,
168
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
169
33
  };
170
  Local<Value> exception;
171
11
  Local<Function> domexception_ctor = env->domexception_function();
172
11
  CHECK(!domexception_ctor.IsEmpty());
173
44
  if (!domexception_ctor->NewInstance(env->context(), arraysize(argv), argv)
174
33
          .ToLocal(&exception)) {
175
11
    return;
176
  }
177
11
  env->isolate()->ThrowException(exception);
178
}
179
180
// This tells V8 how to serialize objects that it does not understand
181
// (e.g. C++ objects) into the output buffer, in a way that our own
182
// DeserializerDelegate understands how to unpack.
183
7880
class SerializerDelegate : public ValueSerializer::Delegate {
184
 public:
185
7880
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
186
7880
      : env_(env), context_(context), msg_(m) {}
187
188
1
  void ThrowDataCloneError(Local<String> message) override {
189
1
    ThrowDataCloneException(env_, message);
190
1
  }
191
192
166
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
193
332
    if (env_->message_port_constructor_template()->HasInstance(object)) {
194
166
      return WriteMessagePort(Unwrap<MessagePort>(object));
195
    }
196
197
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
198
    return Nothing<bool>();
199
  }
200
201
5
  Maybe<uint32_t> GetSharedArrayBufferId(
202
      Isolate* isolate,
203
      Local<SharedArrayBuffer> shared_array_buffer) override {
204
    uint32_t i;
205
5
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
206
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
207
        return Just(i);
208
    }
209
210
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
211
        env_,
212
        context_,
213
5
        shared_array_buffer);
214
5
    if (!reference) {
215
      return Nothing<uint32_t>();
216
    }
217
5
    seen_shared_array_buffers_.push_back(shared_array_buffer);
218
5
    msg_->AddSharedArrayBuffer(reference);
219
5
    return Just(i);
220
  }
221
222
2
  Maybe<uint32_t> GetWasmModuleTransferId(
223
      Isolate* isolate, Local<WasmCompiledModule> module) override {
224
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
225
  }
226
227
7869
  void Finish() {
228
    // Only close the MessagePort handles and actually transfer them
229
    // once we know that serialization succeeded.
230
8036
    for (MessagePort* port : ports_) {
231
334
      port->Close();
232
167
      msg_->AddMessagePort(port->Detach());
233
    }
234
7869
  }
235
236
  ValueSerializer* serializer = nullptr;
237
238
 private:
239
166
  Maybe<bool> WriteMessagePort(MessagePort* port) {
240
166
    for (uint32_t i = 0; i < ports_.size(); i++) {
241
166
      if (ports_[i] == port) {
242
166
        serializer->WriteUint32(i);
243
166
        return Just(true);
244
      }
245
    }
246
247
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
248
    return Nothing<bool>();
249
  }
250
251
  Environment* env_;
252
  Local<Context> context_;
253
  Message* msg_;
254
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
255
  std::vector<MessagePort*> ports_;
256
257
  friend class worker::Message;
258
};
259
260
}  // anonymous namespace
261
262
7880
Maybe<bool> Message::Serialize(Environment* env,
263
                               Local<Context> context,
264
                               Local<Value> input,
265
                               Local<Value> transfer_list_v,
266
                               Local<Object> source_port) {
267
7880
  HandleScope handle_scope(env->isolate());
268
  Context::Scope context_scope(context);
269
270
  // Verify that we're not silently overwriting an existing message.
271
7880
  CHECK(main_message_buf_.is_empty());
272
273
15760
  SerializerDelegate delegate(env, context, this);
274
15760
  ValueSerializer serializer(env->isolate(), &delegate);
275
7880
  delegate.serializer = &serializer;
276
277
15760
  std::vector<Local<ArrayBuffer>> array_buffers;
278
7880
  if (transfer_list_v->IsArray()) {
279
180
    Local<Array> transfer_list = transfer_list_v.As<Array>();
280
180
    uint32_t length = transfer_list->Length();
281
360
    for (uint32_t i = 0; i < length; ++i) {
282
      Local<Value> entry;
283
380
      if (!transfer_list->Get(context, i).ToLocal(&entry))
284
10
        return Nothing<bool>();
285
      // Currently, we support ArrayBuffers and MessagePorts.
286
190
      if (entry->IsArrayBuffer()) {
287
12
        Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
288
        // If we cannot render the ArrayBuffer unusable in this Isolate and
289
        // take ownership of its memory, copying the buffer will have to do.
290

24
        if (!ab->IsNeuterable() || ab->IsExternal())
291
11
          continue;
292
12
        if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
293
            array_buffers.end()) {
294
          ThrowDataCloneException(
295
              env,
296
              FIXED_ONE_BYTE_STRING(
297
                  env->isolate(),
298
1
                  "Transfer list contains duplicate ArrayBuffer"));
299
1
          return Nothing<bool>();
300
        }
301
        // We simply use the array index in the `array_buffers` list as the
302
        // ID that we write into the serialized buffer.
303
11
        uint32_t id = array_buffers.size();
304
11
        array_buffers.push_back(ab);
305
11
        serializer.TransferArrayBuffer(id, ab);
306
11
        continue;
307
356
      } else if (env->message_port_constructor_template()
308
534
                    ->HasInstance(entry)) {
309
        // Check if the source MessagePort is being transferred.
310

356
        if (!source_port.IsEmpty() && entry == source_port) {
311
          ThrowDataCloneException(
312
              env,
313
              FIXED_ONE_BYTE_STRING(env->isolate(),
314
1
                                    "Transfer list contains source port"));
315
10
          return Nothing<bool>();
316
        }
317
177
        MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
318

177
        if (port == nullptr || port->IsDetached()) {
319
          ThrowDataCloneException(
320
              env,
321
              FIXED_ONE_BYTE_STRING(
322
                  env->isolate(),
323
7
                  "MessagePort in transfer list is already detached"));
324
7
          return Nothing<bool>();
325
        }
326
170
        if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
327
            delegate.ports_.end()) {
328
          ThrowDataCloneException(
329
              env,
330
              FIXED_ONE_BYTE_STRING(
331
                  env->isolate(),
332
1
                  "Transfer list contains duplicate MessagePort"));
333
1
          return Nothing<bool>();
334
        }
335
169
        delegate.ports_.push_back(port);
336
169
        continue;
337
      }
338
339
      THROW_ERR_INVALID_TRANSFER_OBJECT(env);
340
      return Nothing<bool>();
341
    }
342
  }
343
344
7870
  serializer.WriteHeader();
345
15740
  if (serializer.WriteValue(context, input).IsNothing()) {
346
1
    return Nothing<bool>();
347
  }
348
349
7872
  for (Local<ArrayBuffer> ab : array_buffers) {
350
    // If serialization succeeded, we want to take ownership of
351
    // (a.k.a. externalize) the underlying memory region and render
352
    // it inaccessible in this Isolate.
353
3
    ArrayBuffer::Contents contents = ab->Externalize();
354
3
    ab->Neuter();
355
    array_buffer_contents_.push_back(
356
3
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
357
3
                               contents.ByteLength() });
358
  }
359
360
7869
  delegate.Finish();
361
362
  // The serializer gave us a buffer allocated using `malloc()`.
363
7869
  std::pair<uint8_t*, size_t> data = serializer.Release();
364
15738
  main_message_buf_ =
365
7869
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
366
15749
  return Just(true);
367
}
368
369
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
370
4
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
371
  tracker->TrackFieldWithSize("shared_array_buffers",
372
4
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
373
4
  tracker->TrackField("message_ports", message_ports_);
374
4
}
375
376
972
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
377
378
2790
MessagePortData::~MessagePortData() {
379
930
  CHECK_NULL(owner_);
380
930
  Disentangle();
381
1860
}
382
383
8
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
384
8
  Mutex::ScopedLock lock(mutex_);
385
8
  tracker->TrackField("incoming_messages", incoming_messages_);
386
8
}
387
388
7868
void MessagePortData::AddToIncomingQueue(Message&& message) {
389
  // This function will be called by other threads.
390
7868
  Mutex::ScopedLock lock(mutex_);
391
7868
  incoming_messages_.emplace_back(std::move(message));
392
393
7868
  if (owner_ != nullptr) {
394
7396
    Debug(owner_, "Adding message to incoming queue");
395
7396
    owner_->TriggerAsync();
396
7868
  }
397
7868
}
398
399
6233
bool MessagePortData::IsSiblingClosed() const {
400
6233
  Mutex::ScopedLock lock(*sibling_mutex_);
401
6233
  return sibling_ == nullptr;
402
}
403
404
339
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
405
339
  CHECK_NULL(a->sibling_);
406
339
  CHECK_NULL(b->sibling_);
407
339
  a->sibling_ = b;
408
339
  b->sibling_ = a;
409
339
  a->sibling_mutex_ = b->sibling_mutex_;
410
339
}
411
412
1877
void MessagePortData::PingOwnerAfterDisentanglement() {
413
1877
  Mutex::ScopedLock lock(mutex_);
414
1877
  if (owner_ != nullptr)
415
329
    owner_->TriggerAsync();
416
1877
}
417
418
1538
void MessagePortData::Disentangle() {
419
  // Grab a copy of the sibling mutex, then replace it so that each sibling
420
  // has its own sibling_mutex_ now.
421
1538
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
422
3076
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
423
1538
  sibling_mutex_ = std::make_shared<Mutex>();
424
425
1538
  MessagePortData* sibling = sibling_;
426
1538
  if (sibling_ != nullptr) {
427
339
    sibling_->sibling_ = nullptr;
428
339
    sibling_ = nullptr;
429
  }
430
431
  // We close MessagePorts after disentanglement, so we trigger the
432
  // corresponding uv_async_t to let them know that this happened.
433
1538
  PingOwnerAfterDisentanglement();
434
1538
  if (sibling != nullptr) {
435
339
    sibling->PingOwnerAfterDisentanglement();
436
1538
  }
437
1538
}
438
439
2325
MessagePort::~MessagePort() {
440
775
  if (data_)
441
    data_->owner_ = nullptr;
442
1550
}
443
444
817
MessagePort::MessagePort(Environment* env,
445
                         Local<Context> context,
446
                         Local<Object> wrap)
447
  : HandleWrap(env,
448
               wrap,
449
               reinterpret_cast<uv_handle_t*>(new uv_async_t()),
450
               AsyncWrap::PROVIDER_MESSAGEPORT),
451
817
    data_(new MessagePortData(this)) {
452
13159
  auto onmessage = [](uv_async_t* handle) {
453
    // Called when data has been put into the queue.
454
6171
    MessagePort* channel = static_cast<MessagePort*>(handle->data);
455
6171
    channel->OnMessage();
456
13157
  };
457
817
  CHECK_EQ(uv_async_init(env->event_loop(),
458
                         async(),
459
                         onmessage), 0);
460
817
  async()->data = static_cast<void*>(this);
461
462
  Local<Value> fn;
463
2451
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
464
817
    return;
465
466
817
  if (fn->IsFunction()) {
467
666
    Local<Function> init = fn.As<Function>();
468
666
    USE(init->Call(context, wrap, 0, nullptr));
469
  }
470
471
817
  Debug(this, "Created message port");
472
}
473
474
void MessagePort::AddToIncomingQueue(Message&& message) {
475
  data_->AddToIncomingQueue(std::move(message));
476
}
477
478
10668
uv_async_t* MessagePort::async() {
479
10668
  return reinterpret_cast<uv_async_t*>(GetHandle());
480
}
481
482
176
bool MessagePort::IsDetached() const {
483

176
  return data_ == nullptr || IsHandleClosing();
484
}
485
486
8265
void MessagePort::TriggerAsync() {
487
16530
  if (IsHandleClosing()) return;
488
8259
  CHECK_EQ(uv_async_send(async()), 0);
489
}
490
491
979
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
492
1958
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
493
494
979
  if (data_) {
495
    // Wrap this call with accessing the mutex, so that TriggerAsync()
496
    // can check IsHandleClosing() without race conditions.
497
979
    Mutex::ScopedLock sibling_lock(data_->mutex_);
498
979
    HandleWrap::Close(close_callback);
499
  } else {
500
    HandleWrap::Close(close_callback);
501
  }
502
979
}
503
504
817
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
505
817
  Environment* env = Environment::GetCurrent(args);
506
817
  if (!args.IsConstructCall()) {
507
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
508
817
    return;
509
  }
510
511
1634
  Local<Context> context = args.This()->CreationContext();
512
  Context::Scope context_scope(context);
513
514
1634
  new MessagePort(env, context, args.This());
515
}
516
517
817
MessagePort* MessagePort::New(
518
    Environment* env,
519
    Local<Context> context,
520
    std::unique_ptr<MessagePortData> data) {
521
  Context::Scope context_scope(context);
522
  Local<Function> ctor;
523
1634
  if (!GetMessagePortConstructor(env, context).ToLocal(&ctor))
524
    return nullptr;
525
526
  // Construct a new instance, then assign the listener instance and possibly
527
  // the MessagePortData to it.
528
  Local<Object> instance;
529
1634
  if (!ctor->NewInstance(context).ToLocal(&instance))
530
    return nullptr;
531
817
  MessagePort* port = Unwrap<MessagePort>(instance);
532
817
  CHECK_NOT_NULL(port);
533
817
  if (data) {
534
294
    port->Detach();
535
294
    port->data_ = std::move(data);
536
537
    // This lock is here to avoid race conditions with the `owner_` read
538
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
539
    // but it's better to be safe than sorry.)
540
294
    Mutex::ScopedLock lock(port->data_->mutex_);
541
294
    port->data_->owner_ = port;
542
    // If the existing MessagePortData object had pending messages, this is
543
    // the easiest way to run that queue.
544
294
    port->TriggerAsync();
545
  }
546
817
  return port;
547
}
548
549
6259
void MessagePort::OnMessage() {
550
6259
  Debug(this, "Running MessagePort::OnMessage()");
551
6259
  HandleScope handle_scope(env()->isolate());
552
12518
  Local<Context> context = object(env()->isolate())->CreationContext();
553
554
  // data_ can only ever be modified by the owner thread, so no need to lock.
555
  // However, the message port may be transferred while it is processing
556
  // messages, so we need to check that this handle still owns its `data_` field
557
  // on every iteration.
558
20272
  while (data_) {
559
14013
    Message received;
560
    {
561
      // Get the head of the message queue.
562
14013
      Mutex::ScopedLock lock(data_->mutex_);
563
564
14013
      if (stop_event_loop_) {
565
25
        Debug(this, "MessagePort stops loop as requested");
566
25
        CHECK(!data_->receiving_messages_);
567
25
        uv_stop(env()->event_loop());
568
25
        break;
569
      }
570
571
      Debug(this, "MessagePort has message, receiving = %d",
572
27976
            static_cast<int>(data_->receiving_messages_));
573
574
13988
      if (!data_->receiving_messages_)
575
53
        break;
576
13935
      if (data_->incoming_messages_.empty())
577
6155
        break;
578
7780
      received = std::move(data_->incoming_messages_.front());
579
7780
      data_->incoming_messages_.pop_front();
580
    }
581
582
7780
    if (!env()->can_call_into_js()) {
583
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
584
      // In this case there is nothing to do but to drain the current queue.
585
      continue;
586
    }
587
588
    {
589
      // Call the JS .onmessage() callback.
590
7780
      HandleScope handle_scope(env()->isolate());
591
7754
      Context::Scope context_scope(context);
592
      Local<Value> args[] = {
593
15560
        received.Deserialize(env(), context).FromMaybe(Local<Value>())
594
23340
      };
595
596

38894
      if (args[0].IsEmpty() ||
597
31114
          MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
598
        // Re-schedule OnMessage() execution in case of failure.
599
24
        if (data_)
600
24
          TriggerAsync();
601
6281
        return;
602

7754
      }
603
    }
604
7754
  }
605
606

6233
  if (data_ && data_->IsSiblingClosed()) {
607
646
    Close();
608
6233
  }
609
}
610
611
bool MessagePort::IsSiblingClosed() const {
612
  CHECK(data_);
613
  return data_->IsSiblingClosed();
614
}
615
616
775
void MessagePort::OnClose() {
617
775
  Debug(this, "MessagePort::OnClose()");
618
775
  if (data_) {
619
608
    data_->owner_ = nullptr;
620
608
    data_->Disentangle();
621
  }
622
775
  data_.reset();
623
775
  delete async();
624
775
}
625
626
461
std::unique_ptr<MessagePortData> MessagePort::Detach() {
627
461
  CHECK(data_);
628
461
  Mutex::ScopedLock lock(data_->mutex_);
629
461
  data_->owner_ = nullptr;
630
461
  return std::move(data_);
631
}
632
633
634
7879
Maybe<bool> MessagePort::PostMessage(Environment* env,
635
                                     Local<Value> message_v,
636
                                     Local<Value> transfer_v) {
637
7879
  Isolate* isolate = env->isolate();
638
7879
  Local<Object> obj = object(isolate);
639
7879
  Local<Context> context = obj->CreationContext();
640
641
7879
  Message msg;
642
643
  // Per spec, we need to both check if transfer list has the source port, and
644
  // serialize the input message, even if the MessagePort is closed or detached.
645
646
  Maybe<bool> serialization_maybe =
647
7879
      msg.Serialize(env, context, message_v, transfer_v, obj);
648
7879
  if (data_ == nullptr) {
649
2
    return serialization_maybe;
650
  }
651
7877
  if (serialization_maybe.IsNothing()) {
652
8
    return Nothing<bool>();
653
  }
654
655
15738
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
656
7869
  bool doomed = false;
657
658
  // Check if the target port is posted to itself.
659
7869
  if (data_->sibling_ != nullptr) {
660
8035
    for (const auto& port_data : msg.message_ports()) {
661
167
      if (data_->sibling_ == port_data.get()) {
662
1
        doomed = true;
663
        ProcessEmitWarning(env, "The target port was posted to itself, and "
664
1
                                "the communication channel was lost");
665
1
        break;
666
      }
667
    }
668
  }
669
670

7869
  if (data_->sibling_ == nullptr || doomed)
671
1
    return Just(true);
672
673
7868
  data_->sibling_->AddToIncomingQueue(std::move(msg));
674
15747
  return Just(true);
675
}
676
677
7880
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
678
7880
  Environment* env = Environment::GetCurrent(args);
679
7880
  if (args.Length() == 0) {
680
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
681
                                       "MessagePort.postMessage");
682
  }
683
684
7880
  MessagePort* port = Unwrap<MessagePort>(args.This());
685
  // Even if the backing MessagePort object has already been deleted, we still
686
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
687
  // transfers.
688
7880
  if (port == nullptr) {
689
1
    Message msg;
690
1
    Local<Object> obj = args.This();
691
1
    Local<Context> context = obj->CreationContext();
692
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
693
1
    return;
694
  }
695
696
7879
  port->PostMessage(env, args[0], args[1]);
697
}
698
699
737
void MessagePort::Start() {
700
737
  Mutex::ScopedLock lock(data_->mutex_);
701
737
  Debug(this, "Start receiving messages");
702
737
  data_->receiving_messages_ = true;
703
737
  if (!data_->incoming_messages_.empty())
704
169
    TriggerAsync();
705
737
}
706
707
29
void MessagePort::Stop() {
708
29
  Mutex::ScopedLock lock(data_->mutex_);
709
29
  Debug(this, "Stop receiving messages");
710
29
  data_->receiving_messages_ = false;
711
29
}
712
713
53
void MessagePort::StopEventLoop() {
714
53
  Mutex::ScopedLock lock(data_->mutex_);
715
53
  data_->receiving_messages_ = false;
716
53
  stop_event_loop_ = true;
717
718
53
  Debug(this, "Received StopEventLoop request");
719
53
  TriggerAsync();
720
53
}
721
722
737
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
723
737
  Environment* env = Environment::GetCurrent(args);
724
  MessagePort* port;
725
737
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
726
737
  if (!port->data_) {
727
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
728
    return;
729
  }
730
737
  port->Start();
731
}
732
733
107
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
734
107
  Environment* env = Environment::GetCurrent(args);
735
  MessagePort* port;
736
185
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
737
29
  if (!port->data_) {
738
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
739
    return;
740
  }
741
29
  port->Stop();
742
}
743
744
306
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
745
  MessagePort* port;
746
612
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
747
88
  port->OnMessage();
748
}
749
750
184
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
751
184
  Entangle(a, b->data_.get());
752
184
}
753
754
339
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
755
339
  MessagePortData::Entangle(a->data_.get(), b);
756
339
}
757
758
9644
MaybeLocal<Function> GetMessagePortConstructor(
759
    Environment* env, Local<Context> context) {
760
  // Factor generating the MessagePort JS constructor into its own piece
761
  // of code, because it is needed early on in the child environment setup.
762
9644
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
763
9644
  if (!templ.IsEmpty())
764
5228
    return templ->GetFunction(context);
765
766
  {
767
4416
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
768
8832
    m->SetClassName(env->message_port_constructor_string());
769
8832
    m->InstanceTemplate()->SetInternalFieldCount(1);
770
8832
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
771
772
4416
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
773
4416
    env->SetProtoMethod(m, "start", MessagePort::Start);
774
4416
    env->SetProtoMethod(m, "stop", MessagePort::Stop);
775
4416
    env->SetProtoMethod(m, "drain", MessagePort::Drain);
776
777
4416
    env->set_message_port_constructor_template(m);
778
  }
779
780
4416
  return GetMessagePortConstructor(env, context);
781
}
782
783
namespace {
784
785
184
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
786
184
  Environment* env = Environment::GetCurrent(args);
787
184
  if (!args.IsConstructCall()) {
788
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
789
184
    return;
790
  }
791
792
368
  Local<Context> context = args.This()->CreationContext();
793
  Context::Scope context_scope(context);
794
795
184
  MessagePort* port1 = MessagePort::New(env, context);
796
184
  MessagePort* port2 = MessagePort::New(env, context);
797
184
  MessagePort::Entangle(port1, port2);
798
799
1104
  args.This()->Set(env->context(), env->port1_string(), port1->object())
800
368
      .FromJust();
801
1104
  args.This()->Set(env->context(), env->port2_string(), port2->object())
802
368
      .FromJust();
803
}
804
805
4406
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
806
4406
  Environment* env = Environment::GetCurrent(args);
807
4406
  CHECK_EQ(args.Length(), 1);
808
8812
  CHECK(args[0]->IsFunction());
809
8812
  env->set_domexception_function(args[0].As<Function>());
810
4406
}
811
812
4411
static void InitMessaging(Local<Object> target,
813
                          Local<Value> unused,
814
                          Local<Context> context,
815
                          void* priv) {
816
4411
  Environment* env = Environment::GetCurrent(context);
817
818
  {
819
    Local<String> message_channel_string =
820
4411
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
821
4411
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
822
4411
    templ->SetClassName(message_channel_string);
823
    target->Set(env->context(),
824
                message_channel_string,
825
17644
                templ->GetFunction(context).ToLocalChecked()).FromJust();
826
  }
827
828
  target->Set(context,
829
              env->message_port_constructor_string(),
830
17644
              GetMessagePortConstructor(env, context).ToLocalChecked())
831
8822
                  .FromJust();
832
833
4411
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
834
4411
}
835
836
}  // anonymous namespace
837
838
}  // namespace worker
839
}  // namespace node
840
841
4314
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)