GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 725 785 92.4 %
Date: 2021-01-16 04:10:54 Branches: 358 491 72.9 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10657
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
10657
  return Just(BaseObjectList {});
59
}
60
61
10402
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10402
  return Just(true);
64
}
65
66
namespace worker {
67
68
10663
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
10663
  return Just(true);
71
}
72
73
78973
Message::Message(MallocedBuffer<char>&& buffer)
74
78973
    : main_message_buf_(std::move(buffer)) {}
75
76
109400
bool Message::IsCloseMessage() const {
77
109400
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
43965
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
43941
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
43941
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
43941
        wasm_modules_(wasm_modules) {}
95
96
10406
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10406
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10406
    CHECK_LE(id, host_objects_.size());
102
20812
    return host_objects_[id]->object(isolate);
103
  }
104
105
411
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
411
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
822
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
44004
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context) {
130
44004
  CHECK(!IsCloseMessage());
131
132
44032
  EscapableHandleScope handle_scope(env->isolate());
133
  Context::Scope context_scope(context);
134
135
  // Create all necessary objects for transferables, e.g. MessagePort handles.
136
87900
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
137
43684
  auto cleanup = OnScopeLeave([&]() {
138
43684
    for (BaseObjectPtr<BaseObject> object : host_objects) {
139
9
      if (!object) continue;
140
141
      // If the function did not finish successfully, host_objects will contain
142
      // a list of objects that will never be passed to JS. Therefore, we
143
      // destroy them here.
144
      object->Detach();
145
    }
146
131796
  });
147
148
54422
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
149
10412
    TransferData* data = transferables_[i].get();
150
20824
    host_objects[i] = data->Deserialize(
151
20824
        env, context, std::move(transferables_[i]));
152
10418
    if (!host_objects[i]) return {};
153
  }
154
44018
  transferables_.clear();
155
156
87927
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
157
  // Attach all transferred SharedArrayBuffers to their new Isolate.
158
44429
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
159
    Local<SharedArrayBuffer> sab =
160
411
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
161
411
    shared_array_buffers.push_back(sab);
162
  }
163
164
  DeserializerDelegate delegate(
165
87988
      this, env, host_objects, shared_array_buffers, wasm_modules_);
166
  ValueDeserializer deserializer(
167
      env->isolate(),
168
43998
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
169
      main_message_buf_.size,
170
131900
      &delegate);
171
44031
  delegate.deserializer = &deserializer;
172
173
  // Attach all transferred ArrayBuffers to their new Isolate.
174
44040
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
175
    Local<ArrayBuffer> ab =
176
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
177
9
    deserializer.TransferArrayBuffer(i, ab);
178
  }
179
180
88025
  if (deserializer.ReadHeader(context).IsNothing())
181
    return {};
182
  Local<Value> return_value;
183
87858
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
184
    return {};
185
186
54270
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
187
20812
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
188
      return {};
189
  }
190
191
43609
  host_objects.clear();
192
43904
  return handle_scope.Escape(return_value);
193
}
194
195
658
void Message::AddSharedArrayBuffer(
196
    std::shared_ptr<BackingStore> backing_store) {
197
658
  shared_array_buffers_.emplace_back(std::move(backing_store));
198
658
}
199
200
10671
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
201
10671
  transferables_.emplace_back(std::move(data));
202
10671
}
203
204
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
205
2
  wasm_modules_.emplace_back(std::move(mod));
206
2
  return wasm_modules_.size() - 1;
207
}
208
209
namespace {
210
211
32863
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
212
32863
  Isolate* isolate = context->GetIsolate();
213
  Local<Object> per_context_bindings;
214
  Local<Value> emit_message_val;
215

131452
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
216
98589
      !per_context_bindings->Get(context,
217
98589
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
218
32863
          .ToLocal(&emit_message_val)) {
219
    return MaybeLocal<Function>();
220
  }
221
32863
  CHECK(emit_message_val->IsFunction());
222
32863
  return emit_message_val.As<Function>();
223
}
224
225
455
MaybeLocal<Function> GetDOMException(Local<Context> context) {
226
455
  Isolate* isolate = context->GetIsolate();
227
  Local<Object> per_context_bindings;
228
  Local<Value> domexception_ctor_val;
229

1820
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
230
1365
      !per_context_bindings->Get(context,
231
1365
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
232
455
          .ToLocal(&domexception_ctor_val)) {
233
    return MaybeLocal<Function>();
234
  }
235
455
  CHECK(domexception_ctor_val->IsFunction());
236
455
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
237
455
  return domexception_ctor;
238
}
239
240
16
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
241
16
  Isolate* isolate = context->GetIsolate();
242
  Local<Value> argv[] = {message,
243
48
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
244
  Local<Value> exception;
245
  Local<Function> domexception_ctor;
246

64
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
247
64
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
248
16
           .ToLocal(&exception)) {
249
    return;
250
  }
251
16
  isolate->ThrowException(exception);
252
}
253
254
// This tells V8 how to serialize objects that it does not understand
255
// (e.g. C++ objects) into the output buffer, in a way that our own
256
// DeserializerDelegate understands how to unpack.
257
44980
class SerializerDelegate : public ValueSerializer::Delegate {
258
 public:
259
44980
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
260
44980
      : env_(env), context_(context), msg_(m) {}
261
262
6
  void ThrowDataCloneError(Local<String> message) override {
263
6
    ThrowDataCloneException(context_, message);
264
6
  }
265
266
10678
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
267
21356
    if (env_->base_object_ctor_template()->HasInstance(object)) {
268
      return WriteHostObject(
269
10678
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
270
    }
271
272
    ThrowDataCloneError(env_->clone_unsupported_type_str());
273
    return Nothing<bool>();
274
  }
275
276
658
  Maybe<uint32_t> GetSharedArrayBufferId(
277
      Isolate* isolate,
278
      Local<SharedArrayBuffer> shared_array_buffer) override {
279
    uint32_t i;
280
683
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
281
50
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
282
          shared_array_buffer) {
283
        return Just(i);
284
      }
285
    }
286
287
658
    seen_shared_array_buffers_.emplace_back(
288
1316
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
289
658
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
290
658
    return Just(i);
291
  }
292
293
2
  Maybe<uint32_t> GetWasmModuleTransferId(
294
      Isolate* isolate, Local<WasmModuleObject> module) override {
295
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
296
  }
297
298
44961
  Maybe<bool> Finish(Local<Context> context) {
299
55632
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
300
21346
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
301
21346
      std::unique_ptr<TransferData> data;
302
10675
      if (i < first_cloned_object_index_)
303
10661
        data = host_object->TransferForMessaging();
304
10675
      if (!data)
305
17
        data = host_object->CloneForMessaging();
306
10675
      if (!data) return Nothing<bool>();
307
21344
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
308
1
        return Nothing<bool>();
309
10671
      msg_->AddTransferable(std::move(data));
310
    }
311
44957
    return Just(true);
312
  }
313
314
10668
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
315
    // Make sure we have not started serializing the value itself yet.
316
10668
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
317
10668
    host_objects_.emplace_back(std::move(host_object));
318
10668
  }
319
320
  // Some objects in the transfer list may register sub-objects that can be
321
  // transferred. This could e.g. be a public JS wrapper object, such as a
322
  // FileHandle, that is registering its C++ handle for transfer.
323
44970
  inline Maybe<bool> AddNestedHostObjects() {
324
55637
    for (size_t i = 0; i < host_objects_.size(); i++) {
325
21334
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
326
21334
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
327
        return Nothing<bool>();
328

10677
      for (auto nested_transferable : nested_transferables) {
329
20
        if (std::find(host_objects_.begin(),
330
                      host_objects_.end(),
331
20
                      nested_transferable) == host_objects_.end()) {
332
10
          AddHostObject(nested_transferable);
333
        }
334
      }
335
    }
336
44970
    return Just(true);
337
  }
338
339
  ValueSerializer* serializer = nullptr;
340
341
 private:
342
10678
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
343
10678
    BaseObject::TransferMode mode = host_object->GetTransferMode();
344
10678
    if (mode == BaseObject::TransferMode::kUntransferable) {
345
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
346
2
      return Nothing<bool>();
347
    }
348
349
10691
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
350
10673
      if (host_objects_[i] == host_object) {
351
10658
        serializer->WriteUint32(i);
352
10658
        return Just(true);
353
      }
354
    }
355
356
18
    if (mode == BaseObject::TransferMode::kTransferable) {
357
4
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
358
4
      return Nothing<bool>();
359
    }
360
361
14
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
362
14
    uint32_t index = host_objects_.size();
363
14
    if (first_cloned_object_index_ == SIZE_MAX)
364
12
      first_cloned_object_index_ = index;
365
14
    serializer->WriteUint32(index);
366
14
    host_objects_.push_back(host_object);
367
14
    return Just(true);
368
  }
369
370
  Environment* env_;
371
  Local<Context> context_;
372
  Message* msg_;
373
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
374
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
375
  size_t first_cloned_object_index_ = SIZE_MAX;
376
377
  friend class worker::Message;
378
};
379
380
}  // anonymous namespace
381
382
44980
Maybe<bool> Message::Serialize(Environment* env,
383
                               Local<Context> context,
384
                               Local<Value> input,
385
                               const TransferList& transfer_list_v,
386
                               Local<Object> source_port) {
387
89960
  HandleScope handle_scope(env->isolate());
388
  Context::Scope context_scope(context);
389
390
  // Verify that we're not silently overwriting an existing message.
391
44980
  CHECK(main_message_buf_.is_empty());
392
393
89960
  SerializerDelegate delegate(env, context, this);
394
89960
  ValueSerializer serializer(env->isolate(), &delegate);
395
44980
  delegate.serializer = &serializer;
396
397
89960
  std::vector<Local<ArrayBuffer>> array_buffers;
398
55665
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
399
10695
    Local<Value> entry = transfer_list_v[i];
400
10695
    if (entry->IsObject()) {
401
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
402
      // for details.
403
      bool untransferable;
404
32085
      if (!entry.As<Object>()->HasPrivate(
405
              context,
406
21390
              env->untransferable_object_private_symbol())
407
10695
              .To(&untransferable)) {
408
        return Nothing<bool>();
409
      }
410
10695
      if (untransferable) continue;
411
    }
412
413
    // Currently, we support ArrayBuffers and BaseObjects for which
414
    // GetTransferMode() does not return kUntransferable.
415
10690
    if (entry->IsArrayBuffer()) {
416
23
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
417
      // If we cannot render the ArrayBuffer unusable in this Isolate,
418
      // copying the buffer will have to do.
419
      // Note that we can currently transfer ArrayBuffers even if they were
420
      // not allocated by Node’s ArrayBufferAllocator in the first place,
421
      // because we pass the underlying v8::BackingStore around rather than
422
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
423
      // is always going to outlive any Workers it creates, and so will its
424
      // allocator along with it.
425
45
      if (!ab->IsDetachable()) continue;
426
46
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
427
46
          array_buffers.end()) {
428
1
        ThrowDataCloneException(
429
            context,
430
            FIXED_ONE_BYTE_STRING(
431
                env->isolate(),
432
1
                "Transfer list contains duplicate ArrayBuffer"));
433
1
        return Nothing<bool>();
434
      }
435
      // We simply use the array index in the `array_buffers` list as the
436
      // ID that we write into the serialized buffer.
437
22
      uint32_t id = array_buffers.size();
438
22
      array_buffers.push_back(ab);
439
22
      serializer.TransferArrayBuffer(id, ab);
440
22
      continue;
441
21334
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
442
      // Check if the source MessagePort is being transferred.
443

21334
      if (!source_port.IsEmpty() && entry == source_port) {
444
1
        ThrowDataCloneException(
445
            context,
446
            FIXED_ONE_BYTE_STRING(env->isolate(),
447
1
                                  "Transfer list contains source port"));
448
10
        return Nothing<bool>();
449
      }
450
      BaseObjectPtr<BaseObject> host_object {
451
10666
          Unwrap<BaseObject>(entry.As<Object>()) };
452

31995
      if (env->message_port_constructor_template()->HasInstance(entry) &&
453
21311
          (!host_object ||
454
10655
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
455
7
        ThrowDataCloneException(
456
            context,
457
            FIXED_ONE_BYTE_STRING(
458
                env->isolate(),
459
7
                "MessagePort in transfer list is already detached"));
460
7
        return Nothing<bool>();
461
      }
462
21318
      if (std::find(delegate.host_objects_.begin(),
463
                    delegate.host_objects_.end(),
464
21318
                    host_object) != delegate.host_objects_.end()) {
465
1
        ThrowDataCloneException(
466
            context,
467
            String::Concat(env->isolate(),
468
                FIXED_ONE_BYTE_STRING(
469
                  env->isolate(),
470
                  "Transfer list contains duplicate "),
471
2
                entry.As<Object>()->GetConstructorName()));
472
1
        return Nothing<bool>();
473
      }
474

10658
      if (host_object && host_object->GetTransferMode() !=
475
10658
              BaseObject::TransferMode::kUntransferable) {
476
10658
        delegate.AddHostObject(host_object);
477
10658
        continue;
478
      }
479
    }
480
481
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
482
    return Nothing<bool>();
483
  }
484
89940
  if (delegate.AddNestedHostObjects().IsNothing())
485
    return Nothing<bool>();
486
487
44970
  serializer.WriteHeader();
488
89940
  if (serializer.WriteValue(context, input).IsNothing()) {
489
9
    return Nothing<bool>();
490
  }
491
492
44975
  for (Local<ArrayBuffer> ab : array_buffers) {
493
    // If serialization succeeded, we render it inaccessible in this Isolate.
494
28
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
495
14
    ab->Detach();
496
497
14
    array_buffers_.emplace_back(std::move(backing_store));
498
  }
499
500
89922
  if (delegate.Finish(context).IsNothing())
501
4
    return Nothing<bool>();
502
503
  // The serializer gave us a buffer allocated using `malloc()`.
504
44957
  std::pair<uint8_t*, size_t> data = serializer.Release();
505
44957
  CHECK_NOT_NULL(data.first);
506
  main_message_buf_ =
507
44957
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
508
44957
  return Just(true);
509
}
510
511
void Message::MemoryInfo(MemoryTracker* tracker) const {
512
  tracker->TrackField("array_buffers_", array_buffers_);
513
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
514
  tracker->TrackField("transferables", transferables_);
515
}
516
517
33492
MessagePortData::MessagePortData(MessagePort* owner)
518
33492
    : owner_(owner) {
519
33492
}
520
521
100051
MessagePortData::~MessagePortData() {
522
33351
  CHECK_NULL(owner_);
523
33351
  Disentangle();
524
66767
}
525
526
2
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
527
4
  Mutex::ScopedLock lock(mutex_);
528
2
  tracker->TrackField("incoming_messages", incoming_messages_);
529
2
}
530
531
78878
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
532
  // This function will be called by other threads.
533
157756
  Mutex::ScopedLock lock(mutex_);
534
78884
  incoming_messages_.emplace_back(std::move(message));
535
536
78875
  if (owner_ != nullptr) {
537
54430
    Debug(owner_, "Adding message to incoming queue");
538
54430
    owner_->TriggerAsync();
539
  }
540
78888
}
541
542
11336
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
543
22672
  auto group = std::make_shared<SiblingGroup>();
544
11336
  group->Entangle({a, b});
545
11336
}
546
547
55506
void MessagePortData::Disentangle() {
548
55506
  if (group_) {
549
22666
    group_->Disentangle(this);
550
  }
551
55514
}
552
553
131261
MessagePort::~MessagePort() {
554
32814
  if (data_) Detach();
555
65630
}
556
557
32863
MessagePort::MessagePort(Environment* env,
558
                         Local<Context> context,
559
32863
                         Local<Object> wrap)
560
  : HandleWrap(env,
561
               wrap,
562
32863
               reinterpret_cast<uv_handle_t*>(&async_),
563
               AsyncWrap::PROVIDER_MESSAGEPORT),
564
32863
    data_(new MessagePortData(this)) {
565
95431
  auto onmessage = [](uv_async_t* handle) {
566
    // Called when data has been put into the queue.
567
31284
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
568
31284
    channel->OnMessage();
569
95427
  };
570
571
32863
  CHECK_EQ(uv_async_init(env->event_loop(),
572
                         &async_,
573
                         onmessage), 0);
574
  // Reset later to indicate success of the constructor.
575
32863
  bool succeeded = false;
576
98589
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
577
578
  Local<Value> fn;
579
98589
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
580
    return;
581
582
32863
  if (fn->IsFunction()) {
583
32858
    Local<Function> init = fn.As<Function>();
584
65716
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
585
      return;
586
  }
587
588
  Local<Function> emit_message_fn;
589
65726
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
590
    return;
591
32863
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
592
593
32863
  succeeded = true;
594
32863
  Debug(this, "Created message port");
595
}
596
597
31956
bool MessagePort::IsDetached() const {
598

31956
  return data_ == nullptr || IsHandleClosing();
599
}
600
601
76092
void MessagePort::TriggerAsync() {
602
76092
  if (IsHandleClosing()) return;
603
76054
  CHECK_EQ(uv_async_send(&async_), 0);
604
}
605
606
32819
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
607
65638
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
608
609
32819
  if (data_) {
610
    // Wrap this call with accessing the mutex, so that TriggerAsync()
611
    // can check IsHandleClosing() without race conditions.
612
65630
    Mutex::ScopedLock sibling_lock(data_->mutex_);
613
32816
    HandleWrap::Close(close_callback);
614
  } else {
615
5
    HandleWrap::Close(close_callback);
616
  }
617
32821
}
618
619
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
620
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
621
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
622
  // class (i.e. makes it behave like an arrow function).
623
2
  Environment* env = Environment::GetCurrent(args);
624
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
625
2
}
626
627
32863
MessagePort* MessagePort::New(
628
    Environment* env,
629
    Local<Context> context,
630
    std::unique_ptr<MessagePortData> data,
631
    std::shared_ptr<SiblingGroup> sibling_group) {
632
  Context::Scope context_scope(context);
633
32863
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
634
635
  // Construct a new instance, then assign the listener instance and possibly
636
  // the MessagePortData to it.
637
  Local<Object> instance;
638
98589
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
639
    return nullptr;
640
32863
  MessagePort* port = new MessagePort(env, context, instance);
641
32863
  CHECK_NOT_NULL(port);
642
32863
  if (port->IsHandleClosing()) {
643
    // Construction failed with an exception.
644
    return nullptr;
645
  }
646
647
32863
  if (data) {
648
10781
    CHECK(!sibling_group);
649
10781
    port->Detach();
650
10781
    port->data_ = std::move(data);
651
652
    // This lock is here to avoid race conditions with the `owner_` read
653
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
654
    // but it's better to be safe than sorry.)
655
21562
    Mutex::ScopedLock lock(port->data_->mutex_);
656
10781
    port->data_->owner_ = port;
657
    // If the existing MessagePortData object had pending messages, this is
658
    // the easiest way to run that queue.
659
10781
    port->TriggerAsync();
660
22082
  } else if (sibling_group) {
661
39
    sibling_group->Entangle(port->data_.get());
662
  }
663
32863
  return port;
664
}
665
666
75725
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
667
                                              bool only_if_receiving) {
668
151468
  std::shared_ptr<Message> received;
669
  {
670
    // Get the head of the message queue.
671
130995
    Mutex::ScopedLock lock(data_->mutex_);
672
673
75741
    Debug(this, "MessagePort has message");
674
675

75744
    bool wants_message = receiving_messages_ || !only_if_receiving;
676
    // We have nothing to do if:
677
    // - There are no pending messages
678
    // - We are not intending to receive messages, and the message we would
679
    //   receive is not the final "close" message.
680

151479
    if (data_->incoming_messages_.empty() ||
681
65526
        (!wants_message &&
682
10267
         !data_->incoming_messages_.front()->IsCloseMessage())) {
683
40954
      return env()->no_message_symbol();
684
    }
685
686
55233
    received = data_->incoming_messages_.front();
687
55269
    data_->incoming_messages_.pop_front();
688
  }
689
690
55270
  if (received->IsCloseMessage()) {
691
22480
    Close();
692
22480
    return env()->no_message_symbol();
693
  }
694
695
44032
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
696
697
44036
  return received->Deserialize(env(), context);
698
}
699
700
31778
void MessagePort::OnMessage() {
701
31778
  Debug(this, "Running MessagePort::OnMessage()");
702
63492
  HandleScope handle_scope(env()->isolate());
703
63556
  Local<Context> context = object(env()->isolate())->CreationContext();
704
705
  size_t processing_limit;
706
  {
707
31778
    Mutex::ScopedLock(data_->mutex_);
708
63556
    processing_limit = std::max(data_->incoming_messages_.size(),
709
95334
                                static_cast<size_t>(1000));
710
  }
711
712
  // data_ can only ever be modified by the owner thread, so no need to lock.
713
  // However, the message port may be transferred while it is processing
714
  // messages, so we need to check that this handle still owns its `data_` field
715
  // on every iteration.
716

119724
  while (data_) {
717
75656
    if (processing_limit-- == 0) {
718
      // Prevent event loop starvation by only processing those messages without
719
      // interruption that were already present when the OnMessage() call was
720
      // first triggered, but at least 1000 messages because otherwise the
721
      // overhead of repeatedly triggering the uv_async_t instance becomes
722
      // noticable, at least on Windows.
723
      // (That might require more investigation by somebody more familiar with
724
      // Windows.)
725
1
      TriggerAsync();
726
1
      return;
727
    }
728
729
119628
    HandleScope handle_scope(env()->isolate());
730

43973
    Context::Scope context_scope(context);
731
75700
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
732
733
    Local<Value> payload;
734
    Local<Value> message_error;
735
227100
    Local<Value> argv[2];
736
737
    {
738
      // Catch any exceptions from parsing the message itself (not from
739
      // emitting it) as 'messageeror' events.
740
151036
      TryCatchScope try_catch(env());
741
150929
      if (!ReceiveMessage(context, true).ToLocal(&payload)) {
742

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
743
6
          message_error = try_catch.Exception();
744
6
        goto reschedule;
745
      }
746
    }
747
151067
    if (payload == env()->no_message_symbol()) break;
748
749
44017
    if (!env()->can_call_into_js()) {
750
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
751
      // In this case there is nothing to do but to drain the current queue.
752
      continue;
753
    }
754
755
44011
    argv[0] = payload;
756
87985
    argv[1] = env()->message_string();
757
758
88000
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
759
    reschedule:
760
59
      if (!message_error.IsEmpty()) {
761
6
        argv[0] = message_error;
762
12
        argv[1] = env()->messageerror_string();
763
6
        USE(MakeCallback(emit_message, arraysize(argv), argv));
764
      }
765
766
      // Re-schedule OnMessage() execution in case of failure.
767
59
      if (data_)
768
59
        TriggerAsync();
769
59
      return;
770
    }
771
  }
772
}
773
774
32812
void MessagePort::OnClose() {
775
32812
  Debug(this, "MessagePort::OnClose()");
776
32813
  if (data_) {
777
    // Detach() returns move(data_).
778
22170
    Detach()->Disentangle();
779
  }
780
32813
}
781
782
43549
std::unique_ptr<MessagePortData> MessagePort::Detach() {
783
43549
  CHECK(data_);
784
87146
  Mutex::ScopedLock lock(data_->mutex_);
785
43593
  data_->owner_ = nullptr;
786
87187
  return std::move(data_);
787
}
788
789
21296
BaseObject::TransferMode MessagePort::GetTransferMode() const {
790
21296
  if (IsDetached())
791
1
    return BaseObject::TransferMode::kUntransferable;
792
21295
  return BaseObject::TransferMode::kTransferable;
793
}
794
795
10645
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
796
21290
  Close();
797
10645
  return Detach();
798
}
799
800
10391
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
801
    Environment* env,
802
    Local<Context> context,
803
    std::unique_ptr<TransferData> self) {
804
41564
  return BaseObjectPtr<MessagePort> { MessagePort::New(
805
      env, context,
806
41564
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
807
}
808
809
44979
Maybe<bool> MessagePort::PostMessage(Environment* env,
810
                                     Local<Value> message_v,
811
                                     const TransferList& transfer_v) {
812
44979
  Isolate* isolate = env->isolate();
813
44979
  Local<Object> obj = object(isolate);
814
44979
  Local<Context> context = obj->CreationContext();
815
816
89958
  std::shared_ptr<Message> msg = std::make_shared<Message>();
817
818
  // Per spec, we need to both check if transfer list has the source port, and
819
  // serialize the input message, even if the MessagePort is closed or detached.
820
821
  Maybe<bool> serialization_maybe =
822
44979
      msg->Serialize(env, context, message_v, transfer_v, obj);
823
44979
  if (data_ == nullptr) {
824
2
    return serialization_maybe;
825
  }
826
44977
  if (serialization_maybe.IsNothing()) {
827
20
    return Nothing<bool>();
828
  }
829
830
89914
  std::string error;
831
44957
  Maybe<bool> res = data_->Dispatch(msg, &error);
832
44957
  if (res.IsNothing())
833
    return res;
834
835
44957
  if (!error.empty())
836
2
    ProcessEmitWarning(env, error.c_str());
837
838
44957
  return res;
839
}
840
841
44957
Maybe<bool> MessagePortData::Dispatch(
842
    std::shared_ptr<Message> message,
843
    std::string* error) {
844
44957
  if (!group_) {
845
    if (error != nullptr)
846
      *error = "MessagePortData is not entangled.";
847
    return Nothing<bool>();
848
  }
849
44957
  return group_->Dispatch(this, message, error);
850
}
851
852
10703
static Maybe<bool> ReadIterable(Environment* env,
853
                                Local<Context> context,
854
                                // NOLINTNEXTLINE(runtime/references)
855
                                TransferList& transfer_list,
856
                                Local<Value> object) {
857
10703
  if (!object->IsObject()) return Just(false);
858
859
10700
  if (object->IsArray()) {
860
10680
    Local<Array> arr = object.As<Array>();
861
10680
    size_t length = arr->Length();
862
10680
    transfer_list.AllocateSufficientStorage(length);
863
21373
    for (size_t i = 0; i < length; i++) {
864
32079
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
865
        return Nothing<bool>();
866
    }
867
10680
    return Just(true);
868
  }
869
870
20
  Isolate* isolate = env->isolate();
871
  Local<Value> iterator_method;
872
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
873
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
874
20
  if (!iterator_method->IsFunction()) return Just(false);
875
876
  Local<Value> iterator;
877
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
878
6
      .ToLocal(&iterator)) return Nothing<bool>();
879
6
  if (!iterator->IsObject()) return Just(false);
880
881
  Local<Value> next;
882
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
883
    return Nothing<bool>();
884
6
  if (!next->IsFunction()) return Just(false);
885
886
6
  std::vector<Local<Value>> entries;
887
7
  while (env->can_call_into_js()) {
888
    Local<Value> result;
889
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
890
7
        .ToLocal(&result)) return Nothing<bool>();
891
4
    if (!result->IsObject()) return Just(false);
892
893
    Local<Value> done;
894
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
895
      return Nothing<bool>();
896
4
    if (done->BooleanValue(isolate)) break;
897
898
    Local<Value> val;
899
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
900
      return Nothing<bool>();
901
2
    entries.push_back(val);
902
  }
903
904
2
  transfer_list.AllocateSufficientStorage(entries.size());
905
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
906
2
  return Just(true);
907
}
908
909
44992
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
910
44992
  Environment* env = Environment::GetCurrent(args);
911
44992
  Local<Object> obj = args.This();
912
44992
  Local<Context> context = obj->CreationContext();
913
914
44992
  if (args.Length() == 0) {
915
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
916
13
                                       "MessagePort.postMessage");
917
  }
918
919

156368
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
920
    // Browsers ignore null or undefined, and otherwise accept an array or an
921
    // options object.
922
    return THROW_ERR_INVALID_ARG_TYPE(env,
923
4
        "Optional transferList argument must be an iterable");
924
  }
925
926
89967
  TransferList transfer_list;
927
89976
  if (args[1]->IsObject()) {
928
    bool was_iterable;
929
21384
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
930
8
      return;
931
10692
    if (!was_iterable) {
932
      Local<Value> transfer_option;
933
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
934
21
          .ToLocal(&transfer_option)) return;
935
26
      if (!transfer_option->IsUndefined()) {
936
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
937
12
            .To(&was_iterable)) return;
938
10
        if (!was_iterable) {
939
          return THROW_ERR_INVALID_ARG_TYPE(env,
940
7
              "Optional options.transfer argument must be an iterable");
941
        }
942
      }
943
    }
944
  }
945
946
44980
  MessagePort* port = Unwrap<MessagePort>(args.This());
947
  // Even if the backing MessagePort object has already been deleted, we still
948
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
949
  // transfers.
950
44980
  if (port == nullptr) {
951
2
    Message msg;
952
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
953
1
    return;
954
  }
955
956
44979
  Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
957
44979
  if (res.IsJust())
958
134871
    args.GetReturnValue().Set(res.FromJust());
959
}
960
961
23222
void MessagePort::Start() {
962
23222
  Debug(this, "Start receiving messages");
963
23222
  receiving_messages_ = true;
964
46444
  Mutex::ScopedLock lock(data_->mutex_);
965
23222
  if (!data_->incoming_messages_.empty())
966
10822
    TriggerAsync();
967
23222
}
968
969
20325
void MessagePort::Stop() {
970
20325
  Debug(this, "Stop receiving messages");
971
20325
  receiving_messages_ = false;
972
20325
}
973
974
23223
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
975
  MessagePort* port;
976
23224
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
977
23223
  if (!port->data_) {
978
1
    return;
979
  }
980
23222
  port->Start();
981
}
982
983
20516
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
984
  MessagePort* port;
985
41032
  CHECK(args[0]->IsObject());
986
41223
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
987
20327
  if (!port->data_) {
988
2
    return;
989
  }
990
20325
  port->Stop();
991
}
992
993
6
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
994
6
  Environment* env = Environment::GetCurrent(args);
995
18
  args.GetReturnValue().Set(
996
18
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
997
6
}
998
999
1198
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1000
  MessagePort* port;
1001
2396
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1002
494
  port->OnMessage();
1003
}
1004
1005
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1006
11
  Environment* env = Environment::GetCurrent(args);
1007

41
  if (!args[0]->IsObject() ||
1008
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1009
    return THROW_ERR_INVALID_ARG_TYPE(env,
1010
10
        "The \"port\" argument must be a MessagePort instance");
1011
  }
1012
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1013
6
  if (port == nullptr) {
1014
    // Return 'no messages' for a closed port.
1015
    args.GetReturnValue().Set(
1016
        Environment::GetCurrent(args)->no_message_symbol());
1017
    return;
1018
  }
1019
1020
  MaybeLocal<Value> payload =
1021
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
1022
6
  if (!payload.IsEmpty())
1023
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
1024
}
1025
1026
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1027
5
  Environment* env = Environment::GetCurrent(args);
1028

20
  if (!args[0]->IsObject() ||
1029
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1030
    return THROW_ERR_INVALID_ARG_TYPE(env,
1031
        "The \"port\" argument must be a MessagePort instance");
1032
  }
1033
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1034
5
  CHECK_NOT_NULL(port);
1035
1036
5
  Local<Value> context_arg = args[1];
1037
  ContextifyContext* context_wrapper;
1038

15
  if (!context_arg->IsObject() ||
1039
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1040
10
          env, context_arg.As<Object>())) == nullptr) {
1041
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1042
  }
1043
1044
10
  std::unique_ptr<MessagePortData> data;
1045
5
  if (!port->IsDetached())
1046
5
    data = port->Detach();
1047
1048
5
  Context::Scope context_scope(context_wrapper->context());
1049
  MessagePort* target =
1050
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1051
5
  if (target != nullptr)
1052
15
    args.GetReturnValue().Set(target->object());
1053
}
1054
1055
10707
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1056
10707
  MessagePortData::Entangle(a->data_.get(), b->data_.get());
1057
10707
}
1058
1059
629
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1060
629
  MessagePortData::Entangle(a->data_.get(), b);
1061
629
}
1062
1063
2
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1064
2
  tracker->TrackField("data", data_);
1065
2
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1066
2
}
1067
1068
33747
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1069
  // Factor generating the MessagePort JS constructor into its own piece
1070
  // of code, because it is needed early on in the child environment setup.
1071
33747
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1072
33747
  if (!templ.IsEmpty())
1073
33308
    return templ;
1074
1075
  {
1076
439
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1077
878
    m->SetClassName(env->message_port_constructor_string());
1078
1317
    m->InstanceTemplate()->SetInternalFieldCount(
1079
439
        MessagePort::kInternalFieldCount);
1080
878
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1081
1082
439
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1083
439
    env->SetProtoMethod(m, "start", MessagePort::Start);
1084
1085
439
    env->set_message_port_constructor_template(m);
1086
  }
1087
1088
439
  return GetMessagePortConstructorTemplate(env);
1089
}
1090
1091
3251
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1092
3251
    : BaseObject(env, obj) {
1093
3251
  MakeWeak();
1094
3251
}
1095
1096
3251
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1097
3251
  CHECK(args.IsConstructCall());
1098
6502
  new JSTransferable(Environment::GetCurrent(args), args.This());
1099
3251
}
1100
1101
23
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1102
  // Implement `kClone in this ? kCloneable : kTransferable`.
1103
46
  HandleScope handle_scope(env()->isolate());
1104
46
  errors::TryCatchScope ignore_exceptions(env());
1105
1106
  bool has_clone;
1107
92
  if (!object()->Has(env()->context(),
1108
115
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1109
    return TransferMode::kUntransferable;
1110
  }
1111
1112
23
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1113
}
1114
1115
10
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1116
10
  return TransferOrClone(TransferMode::kTransferable);
1117
}
1118
1119
5
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1120
5
  return TransferOrClone(TransferMode::kCloneable);
1121
}
1122
1123
15
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1124
    TransferMode mode) const {
1125
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1126
  // which should return an object with `data` and `deserializeInfo` properties;
1127
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1128
  // on the `TransferData` instance as a string.
1129
30
  HandleScope handle_scope(env()->isolate());
1130
15
  Local<Context> context = env()->isolate()->GetCurrentContext();
1131
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1132
15
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1133
1134
  Local<Value> method;
1135
45
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1136
    return {};
1137
  }
1138
15
  if (method->IsFunction()) {
1139
    Local<Value> result_v;
1140
48
    if (!method.As<Function>()->Call(
1141
48
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1142
15
      return {};
1143
    }
1144
1145
9
    if (result_v->IsObject()) {
1146
9
      Local<Object> result = result_v.As<Object>();
1147
      Local<Value> data;
1148
      Local<Value> deserialize_info;
1149

45
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1150
36
          !result->Get(context, env()->deserialize_info_string())
1151
9
              .ToLocal(&deserialize_info)) {
1152
        return {};
1153
      }
1154
18
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1155
9
      if (*deserialize_info_str == nullptr) return {};
1156
18
      return std::make_unique<Data>(
1157
45
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1158
    }
1159
  }
1160
1161
3
  if (mode == TransferMode::kTransferable)
1162
    return TransferOrClone(TransferMode::kCloneable);
1163
  else
1164
3
    return {};
1165
}
1166
1167
Maybe<BaseObjectList>
1168
10
JSTransferable::NestedTransferables() const {
1169
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1170
20
  HandleScope handle_scope(env()->isolate());
1171
10
  Local<Context> context = env()->isolate()->GetCurrentContext();
1172
10
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1173
1174
  Local<Value> method;
1175
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1176
    return Nothing<BaseObjectList>();
1177
  }
1178
10
  if (!method->IsFunction()) return Just(BaseObjectList {});
1179
1180
  Local<Value> list_v;
1181
40
  if (!method.As<Function>()->Call(
1182
40
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1183
    return Nothing<BaseObjectList>();
1184
  }
1185
10
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1186
10
  Local<Array> list = list_v.As<Array>();
1187
1188
20
  BaseObjectList ret;
1189
40
  for (size_t i = 0; i < list->Length(); i++) {
1190
    Local<Value> value;
1191
30
    if (!list->Get(context, i).ToLocal(&value))
1192
      return Nothing<BaseObjectList>();
1193
20
    if (env()->base_object_ctor_template()->HasInstance(value))
1194
10
      ret.emplace_back(Unwrap<BaseObject>(value));
1195
  }
1196
10
  return Just(ret);
1197
}
1198
1199
4
Maybe<bool> JSTransferable::FinalizeTransferRead(
1200
    Local<Context> context, ValueDeserializer* deserializer) {
1201
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1202
  // of `this[kTransfer]()` or `this[kClone]()`.
1203
8
  HandleScope handle_scope(env()->isolate());
1204
  Local<Value> data;
1205
8
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1206
1207
4
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1208
  Local<Value> method;
1209
12
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1210
    return Nothing<bool>();
1211
  }
1212
4
  if (!method->IsFunction()) return Just(true);
1213
1214
16
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1215
    return Nothing<bool>();
1216
  }
1217
4
  return Just(true);
1218
}
1219
1220
9
JSTransferable::Data::Data(std::string&& deserialize_info,
1221
9
                           v8::Global<v8::Value>&& data)
1222
9
    : deserialize_info_(std::move(deserialize_info)),
1223
27
      data_(std::move(data)) {}
1224
1225
7
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1226
    Environment* env,
1227
    Local<Context> context,
1228
    std::unique_ptr<TransferData> self) {
1229
  // Create the JS wrapper object that will later be filled with data passed to
1230
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1231
  // we need to create an object with the right prototype and internal fields,
1232
  // but the actual JS data stored in the serialized data can only be read at
1233
  // the end of the stream, after the main message has been read.
1234
1235
14
  if (context != env->context()) {
1236
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1237
1
    return {};
1238
  }
1239
12
  HandleScope handle_scope(env->isolate());
1240
  Local<Value> info;
1241
12
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1242
1243
  Local<Value> ret;
1244
12
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1245
36
  if (!env->messaging_deserialize_create_object()->Call(
1246

34
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1247
14
      !env->base_object_ctor_template()->HasInstance(ret)) {
1248
2
    return {};
1249
  }
1250
1251
4
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1252
}
1253
1254
9
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1255
    Local<Context> context, ValueSerializer* serializer) {
1256
18
  HandleScope handle_scope(context->GetIsolate());
1257
9
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1258
9
  data_.Reset();
1259
18
  return ret;
1260
}
1261
1262
39
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1263
78
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1264
39
  std::shared_ptr<SiblingGroup> group;
1265
39
  auto i = groups_.find(name);
1266

39
  if (i == groups_.end() || i->second.expired()) {
1267
19
    group = std::make_shared<SiblingGroup>(name);
1268
19
    groups_[name] = group;
1269
  } else {
1270
20
    group = i->second.lock();
1271
  }
1272
78
  return group;
1273
}
1274
1275
19
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1276
38
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1277
19
  auto i = groups_.find(name);
1278

19
  if (i != groups_.end() && i->second.expired())
1279
19
    groups_.erase(name);
1280
19
}
1281
1282
19
SiblingGroup::SiblingGroup(const std::string& name)
1283
19
    : name_(name) { }
1284
1285
22624
SiblingGroup::~SiblingGroup() {
1286
  // If this is a named group, check to see if we can remove the group
1287
11312
  if (!name_.empty())
1288
19
    CheckSiblingGroup(name_);
1289
11312
}
1290
1291
44957
Maybe<bool> SiblingGroup::Dispatch(
1292
    MessagePortData* source,
1293
    std::shared_ptr<Message> message,
1294
    std::string* error) {
1295
1296
89914
  Mutex::ScopedLock lock(group_mutex_);
1297
1298
  // The source MessagePortData is not part of this group.
1299
44957
  if (ports_.find(source) == ports_.end()) {
1300
    if (error != nullptr)
1301
      *error = "Source MessagePort is not entangled with this group.";
1302
    return Nothing<bool>();
1303
  }
1304
1305
  // There are no destination ports.
1306
44957
  if (size() <= 1)
1307
81
    return Just(false);
1308
1309
  // Transferables cannot be used when there is more
1310
  // than a single destination.
1311

44876
  if (size() > 2 && message->has_transferables()) {
1312
    if (error != nullptr)
1313
      *error = "Transferables cannot be used with multiple destinations.";
1314
    return Nothing<bool>();
1315
  }
1316
1317
134636
  for (MessagePortData* port : ports_) {
1318
89762
    if (port == source)
1319
44875
      continue;
1320
    // This loop should only be entered if there's only a single destination
1321
55555
    for (const auto& transferable : message->transferables()) {
1322
10670
      if (port == transferable.get()) {
1323
2
        if (error != nullptr) {
1324
          *error = "The target port was posted to itself, and the "
1325
2
                   "communication channel was lost";
1326
        }
1327
2
        return Just(true);
1328
      }
1329
    }
1330
44885
    port->AddToIncomingQueue(message);
1331
  }
1332
1333
44874
  return Just(true);
1334
}
1335
1336
39
void SiblingGroup::Entangle(MessagePortData* port) {
1337
39
  Entangle({ port });
1338
39
}
1339
1340
11375
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1341
22750
  Mutex::ScopedLock lock(group_mutex_);
1342
34086
  for (MessagePortData* data : ports) {
1343
22711
    ports_.insert(data);
1344
22711
    CHECK(!data->group_);
1345
22711
    data->group_ = shared_from_this();
1346
  }
1347
11375
}
1348
1349
22666
void SiblingGroup::Disentangle(MessagePortData* data) {
1350
45334
  auto self = shared_from_this();  // Keep alive until end of function.
1351
45335
  Mutex::ScopedLock lock(group_mutex_);
1352
22668
  ports_.erase(data);
1353
22665
  data->group_.reset();
1354
1355
22664
  data->AddToIncomingQueue(std::make_shared<Message>());
1356
  // If this is an anonymous group and there's another port, close it.
1357

22667
  if (size() == 1 && name_.empty())
1358
11332
    (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1359
22668
}
1360
1361
4665
SiblingGroup::Map SiblingGroup::groups_;
1362
4665
Mutex SiblingGroup::groups_mutex_;
1363
1364
namespace {
1365
1366
439
static void SetDeserializerCreateObjectFunction(
1367
    const FunctionCallbackInfo<Value>& args) {
1368
439
  Environment* env = Environment::GetCurrent(args);
1369
878
  CHECK(args[0]->IsFunction());
1370
878
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1371
439
}
1372
1373
10708
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1374
10708
  Environment* env = Environment::GetCurrent(args);
1375
10708
  if (!args.IsConstructCall()) {
1376
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1377
1
    return;
1378
  }
1379
1380
21414
  Local<Context> context = args.This()->CreationContext();
1381
10707
  Context::Scope context_scope(context);
1382
1383
10707
  MessagePort* port1 = MessagePort::New(env, context);
1384
10707
  if (port1 == nullptr) return;
1385
10707
  MessagePort* port2 = MessagePort::New(env, context);
1386
10707
  if (port2 == nullptr) {
1387
    port1->Close();
1388
    return;
1389
  }
1390
1391
10707
  MessagePort::Entangle(port1, port2);
1392
1393
53535
  args.This()->Set(context, env->port1_string(), port1->object())
1394
      .Check();
1395
53535
  args.This()->Set(context, env->port2_string(), port2->object())
1396
      .Check();
1397
}
1398
1399
39
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1400
117
  CHECK(args[0]->IsString());
1401
39
  Environment* env = Environment::GetCurrent(args);
1402
39
  Context::Scope context_scope(env->context());
1403
78
  Utf8Value name(env->isolate(), args[0]);
1404
  MessagePort* port =
1405
39
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1406
39
  if (port != nullptr) {
1407
117
    args.GetReturnValue().Set(port->object());
1408
  }
1409
39
}
1410
1411
439
static void InitMessaging(Local<Object> target,
1412
                          Local<Value> unused,
1413
                          Local<Context> context,
1414
                          void* priv) {
1415
439
  Environment* env = Environment::GetCurrent(context);
1416
1417
  {
1418
878
    env->SetConstructorFunction(
1419
        target,
1420
        "MessageChannel",
1421
439
        env->NewFunctionTemplate(MessageChannel));
1422
  }
1423
1424
  {
1425
439
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1426
878
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1427
1317
    t->InstanceTemplate()->SetInternalFieldCount(
1428
439
        JSTransferable::kInternalFieldCount);
1429
439
    env->SetConstructorFunction(target, "JSTransferable", t);
1430
  }
1431
1432
439
  env->SetConstructorFunction(
1433
      target,
1434
      env->message_port_constructor_string(),
1435
439
      GetMessagePortConstructorTemplate(env));
1436
1437
  // These are not methods on the MessagePort prototype, because
1438
  // the browser equivalents do not provide them.
1439
439
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1440
439
  env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1441
439
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1442
439
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1443
  env->SetMethod(target, "moveMessagePortToContext",
1444
439
                 MessagePort::MoveToContext);
1445
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1446
439
                 SetDeserializerCreateObjectFunction);
1447
439
  env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1448
1449
  {
1450
878
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1451
    target
1452
878
        ->Set(context,
1453
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1454
1317
              domexception)
1455
        .Check();
1456
  }
1457
439
}
1458
1459
4591
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1460
4591
  registry->Register(MessageChannel);
1461
4591
  registry->Register(BroadcastChannel);
1462
4591
  registry->Register(JSTransferable::New);
1463
4591
  registry->Register(MessagePort::New);
1464
4591
  registry->Register(MessagePort::PostMessage);
1465
4591
  registry->Register(MessagePort::Start);
1466
4591
  registry->Register(MessagePort::Stop);
1467
4591
  registry->Register(MessagePort::CheckType);
1468
4591
  registry->Register(MessagePort::Drain);
1469
4591
  registry->Register(MessagePort::ReceiveMessage);
1470
4591
  registry->Register(MessagePort::MoveToContext);
1471
4591
  registry->Register(SetDeserializerCreateObjectFunction);
1472
4591
}
1473
1474
}  // anonymous namespace
1475
1476
}  // namespace worker
1477
}  // namespace node
1478
1479
4657
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1480

18586
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1481
                               node::worker::RegisterExternalReferences)