GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 409 443 92.3 %
Date: 2019-02-23 22:23:05 Branches: 169 244 69.3 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
#include "async_wrap-inl.h"
3
#include "async_wrap.h"
4
#include "debug_utils.h"
5
#include "node_buffer.h"
6
#include "node_errors.h"
7
#include "node_process.h"
8
#include "util-inl.h"
9
#include "util.h"
10
11
using v8::Array;
12
using v8::ArrayBuffer;
13
using v8::ArrayBufferCreationMode;
14
using v8::Context;
15
using v8::EscapableHandleScope;
16
using v8::Exception;
17
using v8::Function;
18
using v8::FunctionCallbackInfo;
19
using v8::FunctionTemplate;
20
using v8::HandleScope;
21
using v8::Isolate;
22
using v8::Just;
23
using v8::Local;
24
using v8::Maybe;
25
using v8::MaybeLocal;
26
using v8::Nothing;
27
using v8::Object;
28
using v8::ObjectTemplate;
29
using v8::SharedArrayBuffer;
30
using v8::String;
31
using v8::Value;
32
using v8::ValueDeserializer;
33
using v8::ValueSerializer;
34
using v8::WasmCompiledModule;
35
36
namespace node {
37
namespace worker {
38
39
22518
// Constructs a message around an already-serialized payload. Ownership of
// `buffer` is moved into the message.
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
41
42
namespace {
43
44
// This is used to tell V8 how to read transferred host objects, like other
45
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
46
8002
class DeserializerDelegate : public ValueDeserializer::Delegate {
47
 public:
48
8003
  DeserializerDelegate(
49
      Message* m,
50
      Environment* env,
51
      const std::vector<MessagePort*>& message_ports,
52
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
53
      const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules)
54
      : message_ports_(message_ports),
55
        shared_array_buffers_(shared_array_buffers),
56
8003
        wasm_modules_(wasm_modules) {}
57
58
154
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
59
    // Currently, only MessagePort hosts objects are supported, so identifying
60
    // by the index in the message's MessagePort array is sufficient.
61
    uint32_t id;
62
154
    if (!deserializer->ReadUint32(&id))
63
      return MaybeLocal<Object>();
64
154
    CHECK_LE(id, message_ports_.size());
65
308
    return message_ports_[id]->object(isolate);
66
  };
67
68
7
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
69
      Isolate* isolate, uint32_t clone_id) override {
70
7
    CHECK_LE(clone_id, shared_array_buffers_.size());
71
14
    return shared_array_buffers_[clone_id];
72
  }
73
74
2
  MaybeLocal<WasmCompiledModule> GetWasmModuleFromId(
75
      Isolate* isolate, uint32_t transfer_id) override {
76
2
    CHECK_LE(transfer_id, wasm_modules_.size());
77
    return WasmCompiledModule::FromTransferrableModule(
78
2
        isolate, wasm_modules_[transfer_id]);
79
  }
80
81
  ValueDeserializer* deserializer = nullptr;
82
83
 private:
84
  const std::vector<MessagePort*>& message_ports_;
85
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
86
  const std::vector<WasmCompiledModule::TransferrableModule>& wasm_modules_;
87
};
88
89
}  // anonymous namespace
90
91
8003
// Turns this message back into a JS value inside `context`.
//
// Transferred MessagePorts, SharedArrayBuffers and ArrayBuffers are attached
// to the receiving side first; the corresponding member lists are emptied as
// they are consumed. Returns an empty MaybeLocal on failure.
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  Isolate* const isolate = env->isolate();
  EscapableHandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Create all necessary MessagePort handles.
  std::vector<MessagePort*> new_ports(message_ports_.size());
  for (uint32_t idx = 0; idx < message_ports_.size(); ++idx) {
    new_ports[idx] =
        MessagePort::New(env, context, std::move(message_ports_[idx]));
    if (new_ports[idx] != nullptr)
      continue;

    // Creation failed: every port created so far sits at an index below
    // `idx`. Closing will eventually release the MessagePort object itself.
    for (uint32_t created = 0; created < idx; ++created)
      new_ports[created]->Close();
    return MaybeLocal<Value>();
  }
  message_ports_.clear();

  // Attach all transferred SharedArrayBuffers to their new Isolate.
  std::vector<Local<SharedArrayBuffer>> attached_sabs;
  for (const auto& metadata : shared_array_buffers_) {
    Local<SharedArrayBuffer> sab;
    if (!metadata->GetSharedArrayBuffer(env, context).ToLocal(&sab))
      return MaybeLocal<Value>();
    attached_sabs.push_back(sab);
  }
  shared_array_buffers_.clear();

  DeserializerDelegate delegate(
      this, env, new_ports, attached_sabs, wasm_modules_);
  ValueDeserializer deserializer(
      isolate,
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate. The index in
  // `array_buffer_contents_` doubles as the transfer id.
  for (uint32_t idx = 0; idx < array_buffer_contents_.size(); ++idx) {
    Local<ArrayBuffer> buffer =
        ArrayBuffer::New(isolate,
                         array_buffer_contents_[idx].release(),
                         array_buffer_contents_[idx].size,
                         ArrayBufferCreationMode::kInternalized);
    deserializer.TransferArrayBuffer(idx, buffer);
  }
  array_buffer_contents_.clear();

  if (deserializer.ReadHeader(context).IsNothing())
    return MaybeLocal<Value>();

  Local<Value> result =
      deserializer.ReadValue(context).FromMaybe(Local<Value>());
  return handle_scope.Escape(result);
}
149
150
7
void Message::AddSharedArrayBuffer(
151
    const SharedArrayBufferMetadataReference& reference) {
152
7
  shared_array_buffers_.push_back(reference);
153
7
}
154
155
182
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
156
182
  message_ports_.emplace_back(std::move(data));
157
182
}
158
159
2
// Stores `mod` in this message and returns the index at which it was stored.
uint32_t Message::AddWASMModule(WasmCompiledModule::TransferrableModule&& mod) {
  // The new element lands at the current end of the list.
  const uint32_t index = wasm_modules_.size();
  wasm_modules_.emplace_back(std::move(mod));
  return index;
}
163
164
namespace {
165
166
11
// Instantiates the registered DOMException constructor with
// (`message`, "DataCloneError") and throws the result on the isolate.
// If constructing the exception object fails, nothing is thrown here.
void ThrowDataCloneException(Environment* env, Local<String> message) {
  Local<Value> ctor_args[] = {
    message,
    FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
  };

  Local<Function> ctor = env->domexception_function();
  CHECK(!ctor.IsEmpty());

  Local<Value> exception;
  const bool created =
      ctor->NewInstance(env->context(), arraysize(ctor_args), ctor_args)
          .ToLocal(&exception);
  if (created)
    env->isolate()->ThrowException(exception);
}
180
181
// This tells V8 how to serialize objects that it does not understand
182
// (e.g. C++ objects) into the output buffer, in a way that our own
183
// DeserializerDelegate understands how to unpack.
184
8118
class SerializerDelegate : public ValueSerializer::Delegate {
185
 public:
186
8118
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
187
8118
      : env_(env), context_(context), msg_(m) {}
188
189
1
  void ThrowDataCloneError(Local<String> message) override {
190
1
    ThrowDataCloneException(env_, message);
191
1
  }
192
193
181
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
194
362
    if (env_->message_port_constructor_template()->HasInstance(object)) {
195
181
      return WriteMessagePort(Unwrap<MessagePort>(object));
196
    }
197
198
    THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
199
    return Nothing<bool>();
200
  }
201
202
7
  Maybe<uint32_t> GetSharedArrayBufferId(
203
      Isolate* isolate,
204
      Local<SharedArrayBuffer> shared_array_buffer) override {
205
    uint32_t i;
206
7
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
207
      if (seen_shared_array_buffers_[i] == shared_array_buffer)
208
        return Just(i);
209
    }
210
211
    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
212
        env_,
213
        context_,
214
7
        shared_array_buffer);
215
7
    if (!reference) {
216
      return Nothing<uint32_t>();
217
    }
218
7
    seen_shared_array_buffers_.push_back(shared_array_buffer);
219
7
    msg_->AddSharedArrayBuffer(reference);
220
7
    return Just(i);
221
  }
222
223
2
  Maybe<uint32_t> GetWasmModuleTransferId(
224
      Isolate* isolate, Local<WasmCompiledModule> module) override {
225
2
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
226
  }
227
228
8107
  void Finish() {
229
    // Only close the MessagePort handles and actually transfer them
230
    // once we know that serialization succeeded.
231
8289
    for (MessagePort* port : ports_) {
232
364
      port->Close();
233
182
      msg_->AddMessagePort(port->Detach());
234
    }
235
8107
  }
236
237
  ValueSerializer* serializer = nullptr;
238
239
 private:
240
181
  Maybe<bool> WriteMessagePort(MessagePort* port) {
241
181
    for (uint32_t i = 0; i < ports_.size(); i++) {
242
181
      if (ports_[i] == port) {
243
181
        serializer->WriteUint32(i);
244
181
        return Just(true);
245
      }
246
    }
247
248
    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
249
    return Nothing<bool>();
250
  }
251
252
  Environment* env_;
253
  Local<Context> context_;
254
  Message* msg_;
255
  std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
256
  std::vector<MessagePort*> ports_;
257
258
  friend class worker::Message;
259
};
260
261
}  // anonymous namespace
262
263
8118
// Serializes `input` into this (still empty) message.
//
// `transfer_list_v`, when it is an array, may contain ArrayBuffers and
// MessagePorts that are moved rather than copied. `source_port`, if not
// empty, is the port the message is posted on; listing it in the transfer
// list is an error. Returns Nothing<bool>() when an exception was thrown.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               Local<Value> transfer_list_v,
                               Local<Object> source_port) {
  Isolate* const isolate = env->isolate();
  HandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(isolate, &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> transferred_buffers;
  if (transfer_list_v->IsArray()) {
    Local<Array> list = transfer_list_v.As<Array>();
    const uint32_t count = list->Length();
    for (uint32_t idx = 0; idx < count; ++idx) {
      Local<Value> item;
      if (!list->Get(context, idx).ToLocal(&item))
        return Nothing<bool>();

      // Currently, we support ArrayBuffers and MessagePorts.
      if (item->IsArrayBuffer()) {
        Local<ArrayBuffer> buffer = item.As<ArrayBuffer>();
        // If we cannot render the ArrayBuffer unusable in this Isolate and
        // take ownership of its memory, copying the buffer will have to do.
        if (!buffer->IsNeuterable() || buffer->IsExternal())
          continue;

        const bool already_listed =
            std::find(transferred_buffers.begin(),
                      transferred_buffers.end(),
                      buffer) != transferred_buffers.end();
        if (already_listed) {
          ThrowDataCloneException(
              env,
              FIXED_ONE_BYTE_STRING(
                  isolate,
                  "Transfer list contains duplicate ArrayBuffer"));
          return Nothing<bool>();
        }

        // We simply use the array index in the `transferred_buffers` list as
        // the ID that we write into the serialized buffer.
        const uint32_t id = transferred_buffers.size();
        transferred_buffers.push_back(buffer);
        serializer.TransferArrayBuffer(id, buffer);
        continue;
      }

      if (!env->message_port_constructor_template()->HasInstance(item)) {
        THROW_ERR_INVALID_TRANSFER_OBJECT(env);
        return Nothing<bool>();
      }

      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && item == source_port) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(isolate,
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }

      MessagePort* port = Unwrap<MessagePort>(item.As<Object>());
      if (port == nullptr || port->IsDetached()) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }

      const bool port_already_listed =
          std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
          delegate.ports_.end();
      if (port_already_listed) {
        ThrowDataCloneException(
            env,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "Transfer list contains duplicate MessagePort"));
        return Nothing<bool>();
      }
      delegate.ports_.push_back(port);
    }
  }

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  // If serialization succeeded, we want to take ownership of
  // (a.k.a. externalize) the underlying memory region and render
  // it inaccessible in this Isolate.
  for (Local<ArrayBuffer> buffer : transferred_buffers) {
    ArrayBuffer::Contents contents = buffer->Externalize();
    buffer->Neuter();
    array_buffer_contents_.push_back(
        MallocedBuffer<char> { static_cast<char*>(contents.Data()),
                               contents.ByteLength() });
  }

  delegate.Finish();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> released = serializer.Release();
  main_message_buf_ = MallocedBuffer<char>(
      reinterpret_cast<char*>(released.first), released.second);
  return Just(true);
}
369
370
4
// Reports the transferred ArrayBuffer contents, the SharedArrayBuffer
// reference list and the transferred MessagePorts to `tracker`.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);

  // Only the size of the reference list itself is reported here.
  const size_t sab_list_bytes =
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]);
  tracker->TrackFieldWithSize("shared_array_buffers", sab_list_bytes);

  tracker->TrackField("message_ports", message_ports_);
}
376
377
1057
// Creates port data whose owning handle is `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {
}
378
379
3045
// The data must no longer have an owning handle when it is destroyed; any
// remaining link to a sibling is severed here.
MessagePortData::~MessagePortData() {
  CHECK_NULL(owner_);

  Disentangle();
}
383
384
8
// Reports the queue of pending incoming messages to `tracker`. The queue is
// read while holding `mutex_`.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);
  tracker->TrackField("incoming_messages", incoming_messages_);
}
388
389
8106
// Appends `message` to the incoming queue and, if a handle currently owns
// this data, wakes that handle up.
void MessagePortData::AddToIncomingQueue(Message&& message) {
  // This function will be called by other threads.
  Mutex::ScopedLock queue_lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  // Without an owner there is nobody to notify.
  if (owner_ == nullptr)
    return;

  Debug(owner_, "Adding message to incoming queue");
  owner_->TriggerAsync();
}
399
400
6399
bool MessagePortData::IsSiblingClosed() const {
401
6399
  Mutex::ScopedLock lock(*sibling_mutex_);
402
6399
  return sibling_ == nullptr;
403
}
404
405
369
// Links `a` and `b` as each other's sibling. Neither may already have a
// sibling. Afterwards both use `b`'s sibling mutex.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  CHECK_NULL(a->sibling_);
  CHECK_NULL(b->sibling_);

  a->sibling_ = b;
  b->sibling_ = a;

  // Share a single mutex between the two halves.
  a->sibling_mutex_ = b->sibling_mutex_;
}
412
413
2047
void MessagePortData::PingOwnerAfterDisentanglement() {
414
2047
  Mutex::ScopedLock lock(mutex_);
415
2047
  if (owner_ != nullptr)
416
357
    owner_->TriggerAsync();
417
2046
}
418
419
1678
void MessagePortData::Disentangle() {
420
  // Grab a copy of the sibling mutex, then replace it so that each sibling
421
  // has its own sibling_mutex_ now.
422
1678
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
423
3354
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
424
1678
  sibling_mutex_ = std::make_shared<Mutex>();
425
426
1678
  MessagePortData* sibling = sibling_;
427
1678
  if (sibling_ != nullptr) {
428
369
    sibling_->sibling_ = nullptr;
429
369
    sibling_ = nullptr;
430
  }
431
432
  // We close MessagePorts after disentanglement, so we trigger the
433
  // corresponding uv_async_t to let them know that this happened.
434
1678
  PingOwnerAfterDisentanglement();
435
1677
  if (sibling != nullptr) {
436
369
    sibling->PingOwnerAfterDisentanglement();
437
1678
  }
438
1678
}
439
440
2535
// If this handle still holds port data, clear the data's back-pointer to the
// handle before the handle goes away.
MessagePort::~MessagePort() {
  if (!data_)
    return;
  data_->owner_ = nullptr;
}
444
445
887
// Creates the handle behind the JS object `wrap`: allocates and initializes
// the uv_async_t used for wake-ups, creates fresh MessagePortData owned by
// this handle, and calls the object's `oninit` function if it has one.
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(new uv_async_t()),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  // Called when data has been put into the queue.
  const auto on_async = [](uv_async_t* handle) {
    static_cast<MessagePort*>(handle->data)->OnMessage();
  };
  CHECK_EQ(uv_async_init(env->event_loop(), async(), on_async), 0);
  async()->data = static_cast<void*>(this);

  Local<Value> init_v;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_v))
    return;

  if (init_v->IsFunction())
    USE(init_v.As<Function>()->Call(context, wrap, 0, nullptr));

  Debug(this, "Created message port");
}
474
475
// Forwards `message` to the incoming queue of this handle's port data.
// Dereferences `data_` unconditionally.
void MessagePort::AddToIncomingQueue(Message&& message) {
  MessagePortData* const port_data = data_.get();
  port_data->AddToIncomingQueue(std::move(message));
}
478
479
11146
// Returns the underlying libuv handle viewed as a uv_async_t.
uv_async_t* MessagePort::async() {
  uv_handle_t* const raw_handle = GetHandle();
  return reinterpret_cast<uv_async_t*>(raw_handle);
}
482
483
191
bool MessagePort::IsDetached() const {
484

191
  return data_ == nullptr || IsHandleClosing();
485
}
486
487
8533
void MessagePort::TriggerAsync() {
488
17066
  if (IsHandleClosing()) return;
489
8527
  CHECK_EQ(uv_async_send(async()), 0);
490
}
491
492
1060
// Closes the handle. When port data is still attached, the close call is made
// while holding the data's mutex.
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock sibling_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
504
505
887
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
506
887
  Environment* env = Environment::GetCurrent(args);
507
887
  if (!args.IsConstructCall()) {
508
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
509
887
    return;
510
  }
511
512
1774
  Local<Context> context = args.This()->CreationContext();
513
  Context::Scope context_scope(context);
514
515
1774
  new MessagePort(env, context, args.This());
516
}
517
518
887
// Creates a MessagePort from C++ by instantiating the JS constructor.
// If `data` is non-null, the new port adopts it in place of the data it was
// constructed with. Returns nullptr if the constructor cannot be obtained or
// instantiation fails.
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);

  Local<Function> port_ctor;
  if (!GetMessagePortConstructor(env, context).ToLocal(&port_ctor))
    return nullptr;

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> js_port;
  if (!port_ctor->NewInstance(context).ToLocal(&js_port))
    return nullptr;

  MessagePort* port = Unwrap<MessagePort>(js_port);
  CHECK_NOT_NULL(port);
  if (!data)
    return port;

  // Drop the data created by the constructor and adopt the one passed in.
  port->Detach();
  port->data_ = std::move(data);

  // This lock is here to avoid race conditions with the `owner_` read
  // in AddToIncomingQueue(). (This would likely be unproblematic without it,
  // but it's better to be safe than sorry.)
  Mutex::ScopedLock lock(port->data_->mutex_);
  port->data_->owner_ = port;
  // If the existing MessagePortData object had pending messages, this is
  // the easiest way to run that queue.
  port->TriggerAsync();
  return port;
}
549
550
6427
void MessagePort::OnMessage() {
551
6427
  Debug(this, "Running MessagePort::OnMessage()");
552
6427
  HandleScope handle_scope(env()->isolate());
553
12854
  Local<Context> context = object(env()->isolate())->CreationContext();
554
555
  // data_ can only ever be modified by the owner thread, so no need to lock.
556
  // However, the message port may be transferred while it is processing
557
  // messages, so we need to check that this handle still owns its `data_` field
558
  // on every iteration.
559
20829
  while (data_) {
560
14402
    Message received;
561
    {
562
      // Get the head of the message queue.
563
14402
      Mutex::ScopedLock lock(data_->mutex_);
564
565
14402
      if (stop_event_loop_) {
566
25
        Debug(this, "MessagePort stops loop as requested");
567
25
        CHECK(!data_->receiving_messages_);
568
25
        uv_stop(env()->event_loop());
569
25
        break;
570
      }
571
572
      Debug(this, "MessagePort has message, receiving = %d",
573
28752
            static_cast<int>(data_->receiving_messages_));
574
575
14375
      if (!data_->receiving_messages_)
576
56
        break;
577
14321
      if (data_->incoming_messages_.empty())
578
6318
        break;
579
8003
      received = std::move(data_->incoming_messages_.front());
580
8003
      data_->incoming_messages_.pop_front();
581
    }
582
583
8003
    if (!env()->can_call_into_js()) {
584
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
585
      // In this case there is nothing to do but to drain the current queue.
586
      continue;
587
    }
588
589
    {
590
      // Call the JS .onmessage() callback.
591
8003
      HandleScope handle_scope(env()->isolate());
592
7975
      Context::Scope context_scope(context);
593
594
      Local<Object> event;
595
      Local<Value> payload;
596
16006
      Local<Value> cb_args[1];
597

48011
      if (!received.Deserialize(env(), context).ToLocal(&payload) ||
598
24004
          !env()->message_event_object_template()->NewInstance(context)
599

48013
              .ToLocal(&event) ||
600

56014
          event->Set(context, env()->data_string(), payload).IsNothing() ||
601


56016
          event->Set(context, env()->target_string(), object()).IsNothing() ||
602

40005
          (cb_args[0] = event, false) ||
603
          MakeCallback(env()->onmessage_string(),
604
8001
                       arraysize(cb_args),
605
24001
                       cb_args).IsEmpty()) {
606
        // Re-schedule OnMessage() execution in case of failure.
607
26
        if (data_)
608
26
          TriggerAsync();
609
6451
        return;
610

7975
      }
611
    }
612
7975
  }
613
614

6399
  if (data_ && data_->IsSiblingClosed()) {
615
696
    Close();
616
6399
  }
617
}
618
619
bool MessagePort::IsSiblingClosed() const {
620
  CHECK(data_);
621
  return data_->IsSiblingClosed();
622
}
623
624
845
void MessagePort::OnClose() {
625
845
  Debug(this, "MessagePort::OnClose()");
626
845
  if (data_) {
627
663
    data_->owner_ = nullptr;
628
663
    data_->Disentangle();
629
  }
630
845
  data_.reset();
631
845
  delete async();
632
845
}
633
634
501
std::unique_ptr<MessagePortData> MessagePort::Detach() {
635
501
  CHECK(data_);
636
501
  Mutex::ScopedLock lock(data_->mutex_);
637
501
  data_->owner_ = nullptr;
638
501
  return std::move(data_);
639
}
640
641
642
8117
// Serializes `message_v` (with transfer list `transfer_v`) and queues the
// result on the sibling port, if there is one.
//
// Returns Nothing<bool>() when serialization failed, Just(true) otherwise,
// including when the message is dropped because there is no sibling or the
// sibling itself was part of the transfer list.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     Local<Value> transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> self = object(isolate);
  Local<Context> context = self->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.
  Maybe<bool> serialized =
      msg.Serialize(env, context, message_v, transfer_v, self);
  if (data_ == nullptr) {
    return serialized;
  }
  if (serialized.IsNothing()) {
    return Nothing<bool>();
  }

  Mutex::ScopedLock lock(*data_->sibling_mutex_);

  MessagePortData* const target = data_->sibling_;
  if (target == nullptr)
    return Just(true);

  // Check if the target port is posted to itself.
  for (const auto& port_data : msg.message_ports()) {
    if (port_data.get() != target)
      continue;

    ProcessEmitWarning(env, "The target port was posted to itself, and "
                            "the communication channel was lost");
    return Just(true);
  }

  target->AddToIncomingQueue(std::move(msg));
  return Just(true);
}
684
685
8118
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
686
8118
  Environment* env = Environment::GetCurrent(args);
687
8118
  if (args.Length() == 0) {
688
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
689
                                       "MessagePort.postMessage");
690
  }
691
692
8118
  MessagePort* port = Unwrap<MessagePort>(args.This());
693
  // Even if the backing MessagePort object has already been deleted, we still
694
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
695
  // transfers.
696
8118
  if (port == nullptr) {
697
1
    Message msg;
698
1
    Local<Object> obj = args.This();
699
1
    Local<Context> context = obj->CreationContext();
700
1
    USE(msg.Serialize(env, context, args[0], args[1], obj));
701
1
    return;
702
  }
703
704
8117
  port->PostMessage(env, args[0], args[1]);
705
}
706
707
796
void MessagePort::Start() {
708
796
  Mutex::ScopedLock lock(data_->mutex_);
709
796
  Debug(this, "Start receiving messages");
710
796
  data_->receiving_messages_ = true;
711
796
  if (!data_->incoming_messages_.empty())
712
181
    TriggerAsync();
713
796
}
714
715
30
void MessagePort::Stop() {
716
30
  Mutex::ScopedLock lock(data_->mutex_);
717
30
  Debug(this, "Stop receiving messages");
718
30
  data_->receiving_messages_ = false;
719
30
}
720
721
60
void MessagePort::StopEventLoop() {
722
60
  Mutex::ScopedLock lock(data_->mutex_);
723
60
  data_->receiving_messages_ = false;
724
60
  stop_event_loop_ = true;
725
726
60
  Debug(this, "Received StopEventLoop request");
727
60
  TriggerAsync();
728
60
}
729
730
796
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
731
796
  Environment* env = Environment::GetCurrent(args);
732
  MessagePort* port;
733
796
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
734
796
  if (!port->data_) {
735
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
736
    return;
737
  }
738
796
  port->Start();
739
}
740
741
110
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
742
110
  Environment* env = Environment::GetCurrent(args);
743
  MessagePort* port;
744
220
  CHECK(args[0]->IsObject());
745
300
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
746
30
  if (!port->data_) {
747
    THROW_ERR_CLOSED_MESSAGE_PORT(env);
748
    return;
749
  }
750
30
  port->Stop();
751
}
752
753
334
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
754
  MessagePort* port;
755
668
  CHECK(args[0]->IsObject());
756
1002
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
757
88
  port->OnMessage();
758
}
759
760
199
// Entangles two port handles by linking `a` with the data held by `b`.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const b_data = b->data_.get();
  Entangle(a, b_data);
}
763
764
369
// Entangles the data held by handle `a` with the port data `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const a_data = a->data_.get();
  MessagePortData::Entangle(a_data, b);
}
767
768
9688
// Returns the MessagePort JS constructor for `context`. The first call also
// creates the constructor template and the MessageEvent object template and
// stores both on `env`.
MaybeLocal<Function> GetMessagePortConstructor(
    Environment* env, Local<Context> context) {
  // Factor generating the MessagePort JS constructor into its own piece
  // of code, because it is needed early on in the child environment setup.
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (cached.IsEmpty()) {
    Isolate* isolate = env->isolate();

    // MessagePort constructor template.
    Local<FunctionTemplate> port_templ =
        env->NewFunctionTemplate(MessagePort::New);
    port_templ->SetClassName(env->message_port_constructor_string());
    port_templ->InstanceTemplate()->SetInternalFieldCount(1);
    port_templ->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(port_templ, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(port_templ, "start", MessagePort::Start);

    env->set_message_port_constructor_template(port_templ);

    // Template for the event objects handed to `onmessage`.
    Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
    event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
    Local<ObjectTemplate> event_templ = event_ctor->InstanceTemplate();
    event_templ->Set(env->data_string(), Null(isolate));
    event_templ->Set(env->target_string(), Null(isolate));
    env->set_message_event_object_template(event_templ);

    // The template is stored on `env` now, so look it up again.
    return GetMessagePortConstructor(env, context);
  }

  return cached->GetFunction(context);
}
799
800
namespace {
801
802
199
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
803
199
  Environment* env = Environment::GetCurrent(args);
804
199
  if (!args.IsConstructCall()) {
805
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
806
199
    return;
807
  }
808
809
398
  Local<Context> context = args.This()->CreationContext();
810
  Context::Scope context_scope(context);
811
812
199
  MessagePort* port1 = MessagePort::New(env, context);
813
199
  MessagePort* port2 = MessagePort::New(env, context);
814
199
  MessagePort::Entangle(port1, port2);
815
816
1194
  args.This()->Set(env->context(), env->port1_string(), port1->object())
817
398
      .FromJust();
818
1194
  args.This()->Set(env->context(), env->port2_string(), port2->object())
819
398
      .FromJust();
820
}
821
822
4392
static void RegisterDOMException(const FunctionCallbackInfo<Value>& args) {
823
4392
  Environment* env = Environment::GetCurrent(args);
824
4392
  CHECK_EQ(args.Length(), 1);
825
8784
  CHECK(args[0]->IsFunction());
826
8784
  env->set_domexception_function(args[0].As<Function>());
827
4392
}
828
829
4397
static void InitMessaging(Local<Object> target,
830
                          Local<Value> unused,
831
                          Local<Context> context,
832
                          void* priv) {
833
4397
  Environment* env = Environment::GetCurrent(context);
834
835
  {
836
    Local<String> message_channel_string =
837
4397
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
838
4397
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
839
4397
    templ->SetClassName(message_channel_string);
840
    target->Set(env->context(),
841
                message_channel_string,
842
17588
                templ->GetFunction(context).ToLocalChecked()).FromJust();
843
  }
844
845
  target->Set(context,
846
              env->message_port_constructor_string(),
847
17588
              GetMessagePortConstructor(env, context).ToLocalChecked())
848
8794
                  .FromJust();
849
850
4397
  env->SetMethod(target, "registerDOMException", RegisterDOMException);
851
852
  // These are not methods on the MessagePort prototype, because
853
  // the browser equivalents do not provide them.
854
4397
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
855
4397
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
856
4397
}
857
858
}  // anonymous namespace
859
860
}  // namespace worker
861
}  // namespace node
862
863
4288
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)