GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 744 804 92.5 %
Date: 2021-06-07 04:11:51 Branches: 376 511 73.6 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process-inl.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10685
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
10685
  return Just(BaseObjectList {});
59
}
60
61
10429
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10429
  return Just(true);
64
}
65
66
namespace worker {
67
68
10696
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
10696
  return Just(true);
71
}
72
73
83443
Message::Message(MallocedBuffer<char>&& buffer)
74
83443
    : main_message_buf_(std::move(buffer)) {}
75
76
117841
bool Message::IsCloseMessage() const {
77
117841
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
48258
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
48224
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
48224
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
48224
        wasm_modules_(wasm_modules) {}
95
96
10439
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10439
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10439
    CHECK_LE(id, host_objects_.size());
102
20878
    return host_objects_[id]->object(isolate);
103
  }
104
105
431
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
431
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
862
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
48271
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context,
130
                                       Local<Value>* port_list) {
131
  Context::Scope context_scope(context);
132
133
48262
  CHECK(!IsCloseMessage());
134

48258
  if (port_list != nullptr && !transferables_.empty()) {
135
    // Need to create this outside of the EscapableHandleScope, but inside
136
    // the Context::Scope.
137
20862
    *port_list = Array::New(env->isolate());
138
  }
139
140
48229
  EscapableHandleScope handle_scope(env->isolate());
141
142
  // Create all necessary objects for transferables, e.g. MessagePort handles.
143
96497
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
144
48268
  auto cleanup = OnScopeLeave([&]() {
145
48268
    for (BaseObjectPtr<BaseObject> object : host_objects) {
146
9
      if (!object) continue;
147
148
      // If the function did not finish successfully, host_objects will contain
149
      // a list of objects that will never be passed to JS. Therefore, we
150
      // destroy them here.
151
      object->Detach();
152
    }
153
144762
  });
154
155
58668
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
156
20884
    HandleScope handle_scope(env->isolate());
157
10445
    TransferData* data = transferables_[i].get();
158
20890
    host_objects[i] = data->Deserialize(
159
20890
        env, context, std::move(transferables_[i]));
160
10451
    if (!host_objects[i]) return {};
161
10439
    if (port_list != nullptr) {
162
      // If we gather a list of all message ports, and this transferred object
163
      // is a message port, add it to that list. This is a bit of an odd case
164
      // of special handling for MessagePorts (as opposed to applying to all
165
      // transferables), but it's required for spec compliance.
166
      DCHECK((*port_list)->IsArray());
167
10439
      Local<Array> port_list_array = port_list->As<Array>();
168
10439
      Local<Object> obj = host_objects[i]->object();
169
20878
      if (env->message_port_constructor_template()->HasInstance(obj)) {
170
31239
        if (port_list_array->Set(context,
171
                                 port_list_array->Length(),
172
20826
                                 obj).IsNothing()) {
173
          return {};
174
        }
175
      }
176
    }
177
  }
178
48225
  transferables_.clear();
179
180
96496
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
181
  // Attach all transferred SharedArrayBuffers to their new Isolate.
182
48657
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
183
    Local<SharedArrayBuffer> sab =
184
431
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
185
431
    shared_array_buffers.push_back(sab);
186
  }
187
188
  DeserializerDelegate delegate(
189
96497
      this, env, host_objects, shared_array_buffers, wasm_modules_);
190
  ValueDeserializer deserializer(
191
      env->isolate(),
192
48223
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
193
      main_message_buf_.size,
194
144707
      &delegate);
195
48272
  delegate.deserializer = &deserializer;
196
197
  // Attach all transferred ArrayBuffers to their new Isolate.
198
48281
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
199
    Local<ArrayBuffer> ab =
200
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
201
9
    deserializer.TransferArrayBuffer(i, ab);
202
  }
203
204
96548
  if (deserializer.ReadHeader(context).IsNothing())
205
    return {};
206
  Local<Value> return_value;
207
96543
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
208
    return {};
209
210
58706
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
211
20878
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
212
      return {};
213
  }
214
215
48247
  host_objects.clear();
216
48261
  return handle_scope.Escape(return_value);
217
}
218
219
684
void Message::AddSharedArrayBuffer(
220
    std::shared_ptr<BackingStore> backing_store) {
221
684
  shared_array_buffers_.emplace_back(std::move(backing_store));
222
684
}
223
224
10710
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225
10710
  transferables_.emplace_back(std::move(data));
226
10710
}
227
228
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
229
2
  wasm_modules_.emplace_back(std::move(mod));
230
2
  return wasm_modules_.size() - 1;
231
}
232
233
namespace {
234
235
33001
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
236
33001
  Isolate* isolate = context->GetIsolate();
237
  Local<Object> per_context_bindings;
238
  Local<Value> emit_message_val;
239

132004
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
240
99003
      !per_context_bindings->Get(context,
241
99003
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
242
33001
          .ToLocal(&emit_message_val)) {
243
    return MaybeLocal<Function>();
244
  }
245
33001
  CHECK(emit_message_val->IsFunction());
246
33001
  return emit_message_val.As<Function>();
247
}
248
249
475
MaybeLocal<Function> GetDOMException(Local<Context> context) {
250
475
  Isolate* isolate = context->GetIsolate();
251
  Local<Object> per_context_bindings;
252
  Local<Value> domexception_ctor_val;
253

1900
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
254
1425
      !per_context_bindings->Get(context,
255
1425
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
256
475
          .ToLocal(&domexception_ctor_val)) {
257
    return MaybeLocal<Function>();
258
  }
259
475
  CHECK(domexception_ctor_val->IsFunction());
260
475
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
261
475
  return domexception_ctor;
262
}
263
264
16
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
265
16
  Isolate* isolate = context->GetIsolate();
266
  Local<Value> argv[] = {message,
267
48
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
268
  Local<Value> exception;
269
  Local<Function> domexception_ctor;
270

64
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
271
64
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
272
16
           .ToLocal(&exception)) {
273
    return;
274
  }
275
16
  isolate->ThrowException(exception);
276
}
277
278
// This tells V8 how to serialize objects that it does not understand
279
// (e.g. C++ objects) into the output buffer, in a way that our own
280
// DeserializerDelegate understands how to unpack.
281
49245
class SerializerDelegate : public ValueSerializer::Delegate {
282
 public:
283
49236
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284
49236
      : env_(env), context_(context), msg_(m) {}
285
286
6
  void ThrowDataCloneError(Local<String> message) override {
287
6
    ThrowDataCloneException(context_, message);
288
6
  }
289
290
10717
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291
21434
    if (env_->base_object_ctor_template()->HasInstance(object)) {
292
      return WriteHostObject(
293
10717
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294
    }
295
296
    ThrowDataCloneError(env_->clone_unsupported_type_str());
297
    return Nothing<bool>();
298
  }
299
300
684
  Maybe<uint32_t> GetSharedArrayBufferId(
301
      Isolate* isolate,
302
      Local<SharedArrayBuffer> shared_array_buffer) override {
303
    uint32_t i;
304
709
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305
50
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306
          shared_array_buffer) {
307
        return Just(i);
308
      }
309
    }
310
311
684
    seen_shared_array_buffers_.emplace_back(
312
1368
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313
684
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314
684
    return Just(i);
315
  }
316
317
2
  Maybe<uint32_t> GetWasmModuleTransferId(
318
      Isolate* isolate, Local<WasmModuleObject> module) override {
319
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320
  }
321
322
49225
  Maybe<bool> Finish(Local<Context> context) {
323
59935
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
324
21424
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325
21424
      std::unique_ptr<TransferData> data;
326
10714
      if (i < first_cloned_object_index_)
327
10689
        data = host_object->TransferForMessaging();
328
10714
      if (!data)
329
28
        data = host_object->CloneForMessaging();
330
10714
      if (!data) return Nothing<bool>();
331
21422
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332
1
        return Nothing<bool>();
333
10710
      msg_->AddTransferable(std::move(data));
334
    }
335
49222
    return Just(true);
336
  }
337
338
10696
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339
    // Make sure we have not started serializing the value itself yet.
340
10696
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
341
10696
    host_objects_.emplace_back(std::move(host_object));
342
10696
  }
343
344
  // Some objects in the transfer list may register sub-objects that can be
345
  // transferred. This could e.g. be a public JS wrapper object, such as a
346
  // FileHandle, that is registering its C++ handle for transfer.
347
49227
  inline Maybe<bool> AddNestedHostObjects() {
348
59922
    for (size_t i = 0; i < host_objects_.size(); i++) {
349
21390
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350
21390
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351
        return Nothing<bool>();
352

10705
      for (auto nested_transferable : nested_transferables) {
353
20
        if (std::find(host_objects_.begin(),
354
                      host_objects_.end(),
355
20
                      nested_transferable) == host_objects_.end()) {
356
10
          AddHostObject(nested_transferable);
357
        }
358
      }
359
    }
360
49227
    return Just(true);
361
  }
362
363
  ValueSerializer* serializer = nullptr;
364
365
 private:
366
10717
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367
10717
    BaseObject::TransferMode mode = host_object->GetTransferMode();
368
10717
    if (mode == BaseObject::TransferMode::kUntransferable) {
369
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
370
2
      return Nothing<bool>();
371
    }
372
373
10735
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
374
10706
      if (host_objects_[i] == host_object) {
375
10686
        serializer->WriteUint32(i);
376
10686
        return Just(true);
377
      }
378
    }
379
380
29
    if (mode == BaseObject::TransferMode::kTransferable) {
381
4
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382
4
      return Nothing<bool>();
383
    }
384
385
25
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
386
25
    uint32_t index = host_objects_.size();
387
25
    if (first_cloned_object_index_ == SIZE_MAX)
388
18
      first_cloned_object_index_ = index;
389
25
    serializer->WriteUint32(index);
390
25
    host_objects_.push_back(host_object);
391
25
    return Just(true);
392
  }
393
394
  Environment* env_;
395
  Local<Context> context_;
396
  Message* msg_;
397
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399
  size_t first_cloned_object_index_ = SIZE_MAX;
400
401
  friend class worker::Message;
402
};
403
404
}  // anonymous namespace
405
406
49231
Maybe<bool> Message::Serialize(Environment* env,
407
                               Local<Context> context,
408
                               Local<Value> input,
409
                               const TransferList& transfer_list_v,
410
                               Local<Object> source_port) {
411
98471
  HandleScope handle_scope(env->isolate());
412
  Context::Scope context_scope(context);
413
414
  // Verify that we're not silently overwriting an existing message.
415
49234
  CHECK(main_message_buf_.is_empty());
416
417
98480
  SerializerDelegate delegate(env, context, this);
418
98477
  ValueSerializer serializer(env->isolate(), &delegate);
419
49242
  delegate.serializer = &serializer;
420
421
98487
  std::vector<Local<ArrayBuffer>> array_buffers;
422
59948
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
423
10721
    Local<Value> entry = transfer_list_v[i];
424
10721
    if (entry->IsObject()) {
425
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
426
      // for details.
427
      bool untransferable;
428
32163
      if (!entry.As<Object>()->HasPrivate(
429
              context,
430
21442
              env->untransferable_object_private_symbol())
431
10721
              .To(&untransferable)) {
432
        return Nothing<bool>();
433
      }
434
10721
      if (untransferable) continue;
435
    }
436
437
    // Currently, we support ArrayBuffers and BaseObjects for which
438
    // GetTransferMode() does not return kUntransferable.
439
10716
    if (entry->IsArrayBuffer()) {
440
21
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
441
      // If we cannot render the ArrayBuffer unusable in this Isolate,
442
      // copying the buffer will have to do.
443
      // Note that we can currently transfer ArrayBuffers even if they were
444
      // not allocated by Node’s ArrayBufferAllocator in the first place,
445
      // because we pass the underlying v8::BackingStore around rather than
446
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
447
      // is always going to outlive any Workers it creates, and so will its
448
      // allocator along with it.
449
41
      if (!ab->IsDetachable()) continue;
450
42
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
451
42
          array_buffers.end()) {
452
1
        ThrowDataCloneException(
453
            context,
454
            FIXED_ONE_BYTE_STRING(
455
                env->isolate(),
456
1
                "Transfer list contains duplicate ArrayBuffer"));
457
1
        return Nothing<bool>();
458
      }
459
      // We simply use the array index in the `array_buffers` list as the
460
      // ID that we write into the serialized buffer.
461
20
      uint32_t id = array_buffers.size();
462
20
      array_buffers.push_back(ab);
463
20
      serializer.TransferArrayBuffer(id, ab);
464
20
      continue;
465
21390
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
466
      // Check if the source MessagePort is being transferred.
467

21390
      if (!source_port.IsEmpty() && entry == source_port) {
468
1
        ThrowDataCloneException(
469
            context,
470
            FIXED_ONE_BYTE_STRING(env->isolate(),
471
1
                                  "Transfer list contains source port"));
472
10
        return Nothing<bool>();
473
      }
474
      BaseObjectPtr<BaseObject> host_object {
475
10694
          Unwrap<BaseObject>(entry.As<Object>()) };
476

32079
      if (env->message_port_constructor_template()->HasInstance(entry) &&
477
21367
          (!host_object ||
478
10683
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
479
7
        ThrowDataCloneException(
480
            context,
481
            FIXED_ONE_BYTE_STRING(
482
                env->isolate(),
483
7
                "MessagePort in transfer list is already detached"));
484
7
        return Nothing<bool>();
485
      }
486
21374
      if (std::find(delegate.host_objects_.begin(),
487
                    delegate.host_objects_.end(),
488
21374
                    host_object) != delegate.host_objects_.end()) {
489
1
        ThrowDataCloneException(
490
            context,
491
            String::Concat(env->isolate(),
492
                FIXED_ONE_BYTE_STRING(
493
                  env->isolate(),
494
                  "Transfer list contains duplicate "),
495
2
                entry.As<Object>()->GetConstructorName()));
496
1
        return Nothing<bool>();
497
      }
498

10686
      if (host_object && host_object->GetTransferMode() !=
499
10686
              BaseObject::TransferMode::kUntransferable) {
500
10686
        delegate.AddHostObject(host_object);
501
10686
        continue;
502
      }
503
    }
504
505
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
506
    return Nothing<bool>();
507
  }
508
98462
  if (delegate.AddNestedHostObjects().IsNothing())
509
    return Nothing<bool>();
510
511
49235
  serializer.WriteHeader();
512
98471
  if (serializer.WriteValue(context, input).IsNothing()) {
513
9
    return Nothing<bool>();
514
  }
515
516
49239
  for (Local<ArrayBuffer> ab : array_buffers) {
517
    // If serialization succeeded, we render it inaccessible in this Isolate.
518
24
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
519
12
    ab->Detach();
520
521
12
    array_buffers_.emplace_back(std::move(backing_store));
522
  }
523
524
98452
  if (delegate.Finish(context).IsNothing())
525
4
    return Nothing<bool>();
526
527
  // The serializer gave us a buffer allocated using `malloc()`.
528
49223
  std::pair<uint8_t*, size_t> data = serializer.Release();
529
49223
  CHECK_NOT_NULL(data.first);
530
  main_message_buf_ =
531
49223
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
532
49222
  return Just(true);
533
}
534
535
void Message::MemoryInfo(MemoryTracker* tracker) const {
536
  tracker->TrackField("array_buffers_", array_buffers_);
537
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
538
  tracker->TrackField("transferables", transferables_);
539
}
540
541
33656
MessagePortData::MessagePortData(MessagePort* owner)
542
33656
    : owner_(owner) {
543
33656
}
544
545
100742
MessagePortData::~MessagePortData() {
546
33581
  CHECK_NULL(owner_);
547
33581
  Disentangle();
548
67163
}
549
550
2
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
551
4
  Mutex::ScopedLock lock(mutex_);
552
2
  tracker->TrackField("incoming_messages", incoming_messages_);
553
2
}
554
555
83347
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556
  // This function will be called by other threads.
557
166675
  Mutex::ScopedLock lock(mutex_);
558
83355
  incoming_messages_.emplace_back(std::move(message));
559
560
83330
  if (owner_ != nullptr) {
561
58656
    Debug(owner_, "Adding message to incoming queue");
562
58653
    owner_->TriggerAsync();
563
  }
564
83354
}
565
566
11396
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
567
22792
  auto group = std::make_shared<SiblingGroup>();
568
11396
  group->Entangle({a, b});
569
11396
}
570
571
55878
void MessagePortData::Disentangle() {
572
55878
  if (group_) {
573
22808
    group_->Disentangle(this);
574
  }
575
55877
}
576
577
131903
MessagePort::~MessagePort() {
578
32975
  if (data_) Detach();
579
65952
}
580
581
33001
MessagePort::MessagePort(Environment* env,
582
                         Local<Context> context,
583
33001
                         Local<Object> wrap)
584
  : HandleWrap(env,
585
               wrap,
586
33001
               reinterpret_cast<uv_handle_t*>(&async_),
587
               AsyncWrap::PROVIDER_MESSAGEPORT),
588
33001
    data_(new MessagePortData(this)) {
589
102281
  auto onmessage = [](uv_async_t* handle) {
590
    // Called when data has been put into the queue.
591
34640
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
592
34640
    channel->OnMessage(MessageProcessingMode::kNormalOperation);
593
102278
  };
594
595
33001
  CHECK_EQ(uv_async_init(env->event_loop(),
596
                         &async_,
597
                         onmessage), 0);
598
  // Reset later to indicate success of the constructor.
599
33001
  bool succeeded = false;
600
99003
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
601
602
  Local<Value> fn;
603
99003
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
604
    return;
605
606
33001
  if (fn->IsFunction()) {
607
32995
    Local<Function> init = fn.As<Function>();
608
65990
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
609
      return;
610
  }
611
612
  Local<Function> emit_message_fn;
613
66002
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
614
    return;
615
33001
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
616
617
33001
  succeeded = true;
618
33001
  Debug(this, "Created message port");
619
}
620
621
32040
bool MessagePort::IsDetached() const {
622

32040
  return data_ == nullptr || IsHandleClosing();
623
}
624
625
80414
void MessagePort::TriggerAsync() {
626
80414
  if (IsHandleClosing()) return;
627
80377
  CHECK_EQ(uv_async_send(&async_), 0);
628
}
629
630
32978
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
631
65956
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
632
633
32978
  if (data_) {
634
    // Wrap this call with accessing the mutex, so that TriggerAsync()
635
    // can check IsHandleClosing() without race conditions.
636
65946
    Mutex::ScopedLock sibling_lock(data_->mutex_);
637
32973
    HandleWrap::Close(close_callback);
638
  } else {
639
5
    HandleWrap::Close(close_callback);
640
  }
641
32978
}
642
643
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
645
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
646
  // class (i.e. makes it behave like an arrow function).
647
2
  Environment* env = Environment::GetCurrent(args);
648
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649
2
}
650
651
33001
MessagePort* MessagePort::New(
652
    Environment* env,
653
    Local<Context> context,
654
    std::unique_ptr<MessagePortData> data,
655
    std::shared_ptr<SiblingGroup> sibling_group) {
656
  Context::Scope context_scope(context);
657
33001
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
658
659
  // Construct a new instance, then assign the listener instance and possibly
660
  // the MessagePortData to it.
661
  Local<Object> instance;
662
99003
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
663
    return nullptr;
664
33001
  MessagePort* port = new MessagePort(env, context, instance);
665
33001
  CHECK_NOT_NULL(port);
666
33001
  if (port->IsHandleClosing()) {
667
    // Construction failed with an exception.
668
    return nullptr;
669
  }
670
671
33001
  if (data) {
672
10823
    CHECK(!sibling_group);
673
10823
    port->Detach();
674
10823
    port->data_ = std::move(data);
675
676
    // This lock is here to avoid race conditions with the `owner_` read
677
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
678
    // but it's better to be safe than sorry.)
679
21646
    Mutex::ScopedLock lock(port->data_->mutex_);
680
10823
    port->data_->owner_ = port;
681
    // If the existing MessagePortData object had pending messages, this is
682
    // the easiest way to run that queue.
683
10823
    port->TriggerAsync();
684
22178
  } else if (sibling_group) {
685
41
    sibling_group->Entangle(port->data_.get());
686
  }
687
33000
  return port;
688
}
689
690
83219
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
691
                                              MessageProcessingMode mode,
692
                                              Local<Value>* port_list) {
693
166574
  std::shared_ptr<Message> received;
694
  {
695
    // Get the head of the message queue.
696
142811
    Mutex::ScopedLock lock(data_->mutex_);
697
698
83345
    Debug(this, "MessagePort has message");
699
700
    bool wants_message =
701

83336
        receiving_messages_ ||
702
83336
        mode == MessageProcessingMode::kForceReadMessages;
703
    // We have nothing to do if:
704
    // - There are no pending messages
705
    // - We are not intending to receive messages, and the message we would
706
    //   receive is not the final "close" message.
707

166637
    if (data_->incoming_messages_.empty() ||
708
69809
        (!wants_message &&
709
10278
         !data_->incoming_messages_.front()->IsCloseMessage())) {
710
47538
      return env()->no_message_symbol();
711
    }
712
713
59530
    received = data_->incoming_messages_.front();
714
59591
    data_->incoming_messages_.pop_front();
715
  }
716
717
59592
  if (received->IsCloseMessage()) {
718
22630
    Close();
719
22630
    return env()->no_message_symbol();
720
  }
721
722
48282
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
723
724
48271
  return received->Deserialize(env(), context, port_list);
725
}
726
727
35151
void MessagePort::OnMessage(MessageProcessingMode mode) {
728
35151
  Debug(this, "Running MessagePort::OnMessage()");
729
70231
  HandleScope handle_scope(env()->isolate());
730
  Local<Context> context =
731
105453
      object(env()->isolate())->GetCreationContext().ToLocalChecked();
732
733
  size_t processing_limit;
734
35151
  if (mode == MessageProcessingMode::kNormalOperation) {
735
34640
    Mutex::ScopedLock(data_->mutex_);
736
69280
    processing_limit = std::max(data_->incoming_messages_.size(),
737
103920
                                static_cast<size_t>(1000));
738
  } else {
739
511
    processing_limit = std::numeric_limits<size_t>::max();
740
  }
741
742
  // data_ can only ever be modified by the owner thread, so no need to lock.
743
  // However, the message port may be transferred while it is processing
744
  // messages, so we need to check that this handle still owns its `data_` field
745
  // on every iteration.
746

131563
  while (data_) {
747
83228
    if (processing_limit-- == 0) {
748
      // Prevent event loop starvation by only processing those messages without
749
      // interruption that were already present when the OnMessage() call was
750
      // first triggered, but at least 1000 messages because otherwise the
751
      // overhead of repeatedly triggering the uv_async_t instance becomes
752
      // noticeable, at least on Windows.
753
      // (That might require more investigation by somebody more familiar with
754
      // Windows.)
755
1
      TriggerAsync();
756
1
      return;
757
    }
758
759
131433
    HandleScope handle_scope(env()->isolate());
760

48206
    Context::Scope context_scope(context);
761
83190
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
762
763
    Local<Value> payload;
764
83197
    Local<Value> port_list = Undefined(env()->isolate());
765
    Local<Value> message_error;
766
332920
    Local<Value> argv[3];
767
768
    {
769
      // Catch any exceptions from parsing the message itself (not from
770
      // emitting it) as 'messageeror' events.
771
166523
      TryCatchScope try_catch(env());
772
166505
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
773

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
774
6
          message_error = try_catch.Exception();
775
6
        goto reschedule;
776
      }
777
    }
778
166641
    if (payload == env()->no_message_symbol()) break;
779
780
48272
    if (!env()->can_call_into_js()) {
781
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
782
      // In this case there is nothing to do but to drain the current queue.
783
      continue;
784
    }
785
786
48269
    argv[0] = payload;
787
48269
    argv[1] = port_list;
788
96541
    argv[2] = env()->message_string();
789
790
96539
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
791
    reschedule:
792
67
      if (!message_error.IsEmpty()) {
793
6
        argv[0] = message_error;
794
12
        argv[1] = Undefined(env()->isolate());
795
12
        argv[2] = env()->messageerror_string();
796
6
        USE(MakeCallback(emit_message, arraysize(argv), argv));
797
      }
798
799
      // Re-schedule OnMessage() execution in case of failure.
800
67
      if (data_)
801
67
        TriggerAsync();
802
67
      return;
803
    }
804
  }
805
}
806
807
32974
void MessagePort::OnClose() {
808
32974
  Debug(this, "MessagePort::OnClose()");
809
32974
  if (data_) {
810
    // Detach() returns move(data_).
811
22297
    Detach()->Disentangle();
812
  }
813
32976
}
814
815
43790
std::unique_ptr<MessagePortData> MessagePort::Detach() {
816
43790
  CHECK(data_);
817
87589
  Mutex::ScopedLock lock(data_->mutex_);
818
43800
  data_->owner_ = nullptr;
819
87599
  return std::move(data_);
820
}
821
822
21352
BaseObject::TransferMode MessagePort::GetTransferMode() const {
823
21352
  if (IsDetached())
824
1
    return BaseObject::TransferMode::kUntransferable;
825
21351
  return BaseObject::TransferMode::kTransferable;
826
}
827
828
10673
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829
21346
  Close();
830
10673
  return Detach();
831
}
832
833
10413
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
834
    Environment* env,
835
    Local<Context> context,
836
    std::unique_ptr<TransferData> self) {
837
41652
  return BaseObjectPtr<MessagePort> { MessagePort::New(
838
      env, context,
839
41652
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
840
}
841
842
49245
Maybe<bool> MessagePort::PostMessage(Environment* env,
843
                                     Local<Context> context,
844
                                     Local<Value> message_v,
845
                                     const TransferList& transfer_v) {
846
49245
  Isolate* isolate = env->isolate();
847
49245
  Local<Object> obj = object(isolate);
848
849
98479
  std::shared_ptr<Message> msg = std::make_shared<Message>();
850
851
  // Per spec, we need to both check if transfer list has the source port, and
852
  // serialize the input message, even if the MessagePort is closed or detached.
853
854
  Maybe<bool> serialization_maybe =
855
49230
      msg->Serialize(env, context, message_v, transfer_v, obj);
856
49239
  if (data_ == nullptr) {
857
2
    return serialization_maybe;
858
  }
859
49236
  if (serialization_maybe.IsNothing()) {
860
20
    return Nothing<bool>();
861
  }
862
863
98434
  std::string error;
864
49216
  Maybe<bool> res = data_->Dispatch(msg, &error);
865
49219
  if (res.IsNothing())
866
    return res;
867
868
49219
  if (!error.empty())
869
2
    ProcessEmitWarning(env, error.c_str());
870
871
49218
  return res;
872
}
873
874
49217
Maybe<bool> MessagePortData::Dispatch(
875
    std::shared_ptr<Message> message,
876
    std::string* error) {
877
49217
  if (!group_) {
878
    if (error != nullptr)
879
      *error = "MessagePortData is not entangled.";
880
    return Nothing<bool>();
881
  }
882
49214
  return group_->Dispatch(this, message, error);
883
}
884
885
10729
static Maybe<bool> ReadIterable(Environment* env,
886
                                Local<Context> context,
887
                                // NOLINTNEXTLINE(runtime/references)
888
                                TransferList& transfer_list,
889
                                Local<Value> object) {
890
10729
  if (!object->IsObject()) return Just(false);
891
892
10726
  if (object->IsArray()) {
893
10706
    Local<Array> arr = object.As<Array>();
894
10706
    size_t length = arr->Length();
895
10706
    transfer_list.AllocateSufficientStorage(length);
896
21425
    for (size_t i = 0; i < length; i++) {
897
32157
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
898
        return Nothing<bool>();
899
    }
900
10706
    return Just(true);
901
  }
902
903
20
  Isolate* isolate = env->isolate();
904
  Local<Value> iterator_method;
905
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
906
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
907
20
  if (!iterator_method->IsFunction()) return Just(false);
908
909
  Local<Value> iterator;
910
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
911
6
      .ToLocal(&iterator)) return Nothing<bool>();
912
6
  if (!iterator->IsObject()) return Just(false);
913
914
  Local<Value> next;
915
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
916
    return Nothing<bool>();
917
6
  if (!next->IsFunction()) return Just(false);
918
919
6
  std::vector<Local<Value>> entries;
920
7
  while (env->can_call_into_js()) {
921
    Local<Value> result;
922
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
923
7
        .ToLocal(&result)) return Nothing<bool>();
924
4
    if (!result->IsObject()) return Just(false);
925
926
    Local<Value> done;
927
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
928
      return Nothing<bool>();
929
4
    if (done->BooleanValue(isolate)) break;
930
931
    Local<Value> val;
932
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
933
      return Nothing<bool>();
934
2
    entries.push_back(val);
935
  }
936
937
2
  transfer_list.AllocateSufficientStorage(entries.size());
938
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
939
2
  return Just(true);
940
}
941
942
49258
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943
49258
  Environment* env = Environment::GetCurrent(args);
944
49256
  Local<Object> obj = args.This();
945
98514
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947
49258
  if (args.Length() == 0) {
948
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949
13
                                       "MessagePort.postMessage");
950
  }
951
952

169218
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953
    // Browsers ignore null or undefined, and otherwise accept an array or an
954
    // options object.
955
    return THROW_ERR_INVALID_ARG_TYPE(env,
956
4
        "Optional transferList argument must be an iterable");
957
  }
958
959
98496
  TransferList transfer_list;
960
98508
  if (args[1]->IsObject()) {
961
    bool was_iterable;
962
21436
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963
8
      return;
964
10718
    if (!was_iterable) {
965
      Local<Value> transfer_option;
966
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
967
21
          .ToLocal(&transfer_option)) return;
968
26
      if (!transfer_option->IsUndefined()) {
969
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
970
12
            .To(&was_iterable)) return;
971
10
        if (!was_iterable) {
972
          return THROW_ERR_INVALID_ARG_TYPE(env,
973
7
              "Optional options.transfer argument must be an iterable");
974
        }
975
      }
976
    }
977
  }
978
979
49246
  MessagePort* port = Unwrap<MessagePort>(args.This());
980
  // Even if the backing MessagePort object has already been deleted, we still
981
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
982
  // transfers.
983
49246
  if (port == nullptr) {
984
2
    Message msg;
985
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986
1
    return;
987
  }
988
989
49245
  Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
990
49242
  if (res.IsJust())
991
147663
    args.GetReturnValue().Set(res.FromJust());
992
}
993
994
23364
void MessagePort::Start() {
995
23364
  Debug(this, "Start receiving messages");
996
23364
  receiving_messages_ = true;
997
46728
  Mutex::ScopedLock lock(data_->mutex_);
998
23364
  if (!data_->incoming_messages_.empty())
999
10870
    TriggerAsync();
1000
23364
}
1001
1002
20338
void MessagePort::Stop() {
1003
20338
  Debug(this, "Stop receiving messages");
1004
20338
  receiving_messages_ = false;
1005
20338
}
1006
1007
23365
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
23366
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1010
23365
  if (!port->data_) {
1011
1
    return;
1012
  }
1013
23364
  port->Start();
1014
}
1015
1016
20533
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017
  MessagePort* port;
1018
41066
  CHECK(args[0]->IsObject());
1019
41261
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1020
20340
  if (!port->data_) {
1021
2
    return;
1022
  }
1023
20338
  port->Stop();
1024
}
1025
1026
7
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027
7
  Environment* env = Environment::GetCurrent(args);
1028
21
  args.GetReturnValue().Set(
1029
21
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030
7
}
1031
1032
1252
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033
  MessagePort* port;
1034
2504
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1035
511
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036
}
1037
1038
13
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039
13
  Environment* env = Environment::GetCurrent(args);
1040

49
  if (!args[0]->IsObject() ||
1041
33
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1042
    return THROW_ERR_INVALID_ARG_TYPE(env,
1043
10
        "The \"port\" argument must be a MessagePort instance");
1044
  }
1045
16
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046
8
  if (port == nullptr) {
1047
    // Return 'no messages' for a closed port.
1048
    args.GetReturnValue().Set(
1049
        Environment::GetCurrent(args)->no_message_symbol());
1050
    return;
1051
  }
1052
1053
  MaybeLocal<Value> payload = port->ReceiveMessage(
1054
24
      port->object()->GetCreationContext().ToLocalChecked(),
1055
8
      MessageProcessingMode::kForceReadMessages);
1056
8
  if (!payload.IsEmpty())
1057
16
    args.GetReturnValue().Set(payload.ToLocalChecked());
1058
}
1059
1060
6
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061
6
  Environment* env = Environment::GetCurrent(args);
1062

24
  if (!args[0]->IsObject() ||
1063
18
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1064
    return THROW_ERR_INVALID_ARG_TYPE(env,
1065
1
        "The \"port\" argument must be a MessagePort instance");
1066
  }
1067
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068

6
  if (port == nullptr || port->IsHandleClosing()) {
1069
1
    Isolate* isolate = env->isolate();
1070
1
    THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1071
1
    return;
1072
  }
1073
1074
5
  Local<Value> context_arg = args[1];
1075
  ContextifyContext* context_wrapper;
1076

15
  if (!context_arg->IsObject() ||
1077
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1078
10
          env, context_arg.As<Object>())) == nullptr) {
1079
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1080
  }
1081
1082
10
  std::unique_ptr<MessagePortData> data;
1083
5
  if (!port->IsDetached())
1084
5
    data = port->Detach();
1085
1086
5
  Context::Scope context_scope(context_wrapper->context());
1087
  MessagePort* target =
1088
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1089
5
  if (target != nullptr)
1090
15
    args.GetReturnValue().Set(target->object());
1091
}
1092
1093
10741
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1094
10741
  MessagePortData::Entangle(a->data_.get(), b->data_.get());
1095
10741
}
1096
1097
655
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1098
655
  MessagePortData::Entangle(a->data_.get(), b);
1099
655
}
1100
1101
2
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1102
2
  tracker->TrackField("data", data_);
1103
2
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1104
2
}
1105
1106
33926
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1107
  // Factor generating the MessagePort JS constructor into its own piece
1108
  // of code, because it is needed early on in the child environment setup.
1109
33926
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1110
33926
  if (!templ.IsEmpty())
1111
33467
    return templ;
1112
1113
  {
1114
459
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1115
918
    m->SetClassName(env->message_port_constructor_string());
1116
1377
    m->InstanceTemplate()->SetInternalFieldCount(
1117
459
        MessagePort::kInternalFieldCount);
1118
918
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1119
1120
459
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1121
459
    env->SetProtoMethod(m, "start", MessagePort::Start);
1122
1123
459
    env->set_message_port_constructor_template(m);
1124
  }
1125
1126
459
  return GetMessagePortConstructorTemplate(env);
1127
}
1128
1129
2780
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1130
2780
    : BaseObject(env, obj) {
1131
2780
  MakeWeak();
1132
2780
}
1133
1134
2780
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1135
2780
  CHECK(args.IsConstructCall());
1136
5560
  new JSTransferable(Environment::GetCurrent(args), args.This());
1137
2780
}
1138
1139
29
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1140
  // Implement `kClone in this ? kCloneable : kTransferable`.
1141
58
  HandleScope handle_scope(env()->isolate());
1142
58
  errors::TryCatchScope ignore_exceptions(env());
1143
1144
  bool has_clone;
1145
116
  if (!object()->Has(env()->context(),
1146
145
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1147
    return TransferMode::kUntransferable;
1148
  }
1149
1150
29
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1151
}
1152
1153
10
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1154
10
  return TransferOrClone(TransferMode::kTransferable);
1155
}
1156
1157
11
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1158
11
  return TransferOrClone(TransferMode::kCloneable);
1159
}
1160
1161
21
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1162
    TransferMode mode) const {
1163
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1164
  // which should return an object with `data` and `deserializeInfo` properties;
1165
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1166
  // on the `TransferData` instance as a string.
1167
42
  HandleScope handle_scope(env()->isolate());
1168
21
  Local<Context> context = env()->isolate()->GetCurrentContext();
1169
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1170
21
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1171
1172
  Local<Value> method;
1173
63
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1174
    return {};
1175
  }
1176
21
  if (method->IsFunction()) {
1177
    Local<Value> result_v;
1178
72
    if (!method.As<Function>()->Call(
1179
72
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1180
21
      return {};
1181
    }
1182
1183
15
    if (result_v->IsObject()) {
1184
15
      Local<Object> result = result_v.As<Object>();
1185
      Local<Value> data;
1186
      Local<Value> deserialize_info;
1187

75
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1188
60
          !result->Get(context, env()->deserialize_info_string())
1189
15
              .ToLocal(&deserialize_info)) {
1190
        return {};
1191
      }
1192
30
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1193
15
      if (*deserialize_info_str == nullptr) return {};
1194
30
      return std::make_unique<Data>(
1195
75
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1196
    }
1197
  }
1198
1199
3
  if (mode == TransferMode::kTransferable)
1200
    return TransferOrClone(TransferMode::kCloneable);
1201
  else
1202
3
    return {};
1203
}
1204
1205
Maybe<BaseObjectList>
1206
10
JSTransferable::NestedTransferables() const {
1207
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1208
20
  HandleScope handle_scope(env()->isolate());
1209
10
  Local<Context> context = env()->isolate()->GetCurrentContext();
1210
10
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1211
1212
  Local<Value> method;
1213
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1214
    return Nothing<BaseObjectList>();
1215
  }
1216
10
  if (!method->IsFunction()) return Just(BaseObjectList {});
1217
1218
  Local<Value> list_v;
1219
40
  if (!method.As<Function>()->Call(
1220
40
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1221
    return Nothing<BaseObjectList>();
1222
  }
1223
10
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1224
10
  Local<Array> list = list_v.As<Array>();
1225
1226
20
  BaseObjectList ret;
1227
40
  for (size_t i = 0; i < list->Length(); i++) {
1228
    Local<Value> value;
1229
30
    if (!list->Get(context, i).ToLocal(&value))
1230
      return Nothing<BaseObjectList>();
1231
20
    if (env()->base_object_ctor_template()->HasInstance(value))
1232
10
      ret.emplace_back(Unwrap<BaseObject>(value));
1233
  }
1234
10
  return Just(ret);
1235
}
1236
1237
10
Maybe<bool> JSTransferable::FinalizeTransferRead(
1238
    Local<Context> context, ValueDeserializer* deserializer) {
1239
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1240
  // of `this[kTransfer]()` or `this[kClone]()`.
1241
20
  HandleScope handle_scope(env()->isolate());
1242
  Local<Value> data;
1243
20
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1244
1245
10
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1246
  Local<Value> method;
1247
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1248
    return Nothing<bool>();
1249
  }
1250
10
  if (!method->IsFunction()) return Just(true);
1251
1252
40
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1253
    return Nothing<bool>();
1254
  }
1255
10
  return Just(true);
1256
}
1257
1258
15
JSTransferable::Data::Data(std::string&& deserialize_info,
1259
15
                           v8::Global<v8::Value>&& data)
1260
15
    : deserialize_info_(std::move(deserialize_info)),
1261
45
      data_(std::move(data)) {}
1262
1263
13
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1264
    Environment* env,
1265
    Local<Context> context,
1266
    std::unique_ptr<TransferData> self) {
1267
  // Create the JS wrapper object that will later be filled with data passed to
1268
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1269
  // we need to create an object with the right prototype and internal fields,
1270
  // but the actual JS data stored in the serialized data can only be read at
1271
  // the end of the stream, after the main message has been read.
1272
1273
26
  if (context != env->context()) {
1274
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1275
1
    return {};
1276
  }
1277
24
  HandleScope handle_scope(env->isolate());
1278
  Local<Value> info;
1279
24
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1280
1281
  Local<Value> ret;
1282
24
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1283
72
  if (!env->messaging_deserialize_create_object()->Call(
1284

70
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1285
32
      !env->base_object_ctor_template()->HasInstance(ret)) {
1286
2
    return {};
1287
  }
1288
1289
10
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1290
}
1291
1292
15
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1293
    Local<Context> context, ValueSerializer* serializer) {
1294
30
  HandleScope handle_scope(context->GetIsolate());
1295
15
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1296
15
  data_.Reset();
1297
30
  return ret;
1298
}
1299
1300
41
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1301
82
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1302
41
  std::shared_ptr<SiblingGroup> group;
1303
41
  auto i = groups_.find(name);
1304

41
  if (i == groups_.end() || i->second.expired()) {
1305
20
    group = std::make_shared<SiblingGroup>(name);
1306
20
    groups_[name] = group;
1307
  } else {
1308
21
    group = i->second.lock();
1309
  }
1310
82
  return group;
1311
}
1312
1313
20
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1314
40
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1315
20
  auto i = groups_.find(name);
1316

20
  if (i != groups_.end() && i->second.expired())
1317
20
    groups_.erase(name);
1318
20
}
1319
1320
20
SiblingGroup::SiblingGroup(const std::string& name)
1321
20
    : name_(name) { }
1322
1323
22784
SiblingGroup::~SiblingGroup() {
1324
  // If this is a named group, check to see if we can remove the group
1325
11392
  if (!name_.empty())
1326
20
    CheckSiblingGroup(name_);
1327
11392
}
1328
1329
49217
Maybe<bool> SiblingGroup::Dispatch(
1330
    MessagePortData* source,
1331
    std::shared_ptr<Message> message,
1332
    std::string* error) {
1333
1334
98440
  RwLock::ScopedReadLock lock(group_mutex_);
1335
1336
  // The source MessagePortData is not part of this group.
1337
49218
  if (ports_.find(source) == ports_.end()) {
1338
    if (error != nullptr)
1339
      *error = "Source MessagePort is not entangled with this group.";
1340
    return Nothing<bool>();
1341
  }
1342
1343
  // There are no destination ports.
1344
49199
  if (size() <= 1)
1345
80
    return Just(false);
1346
1347
  // Transferables cannot be used when there is more
1348
  // than a single destination.
1349

49120
  if (size() > 2 && message->has_transferables()) {
1350
    if (error != nullptr)
1351
      *error = "Transferables cannot be used with multiple destinations.";
1352
    return Nothing<bool>();
1353
  }
1354
1355
147392
  for (MessagePortData* port : ports_) {
1356
98257
    if (port == source)
1357
49130
      continue;
1358
    // This loop should only be entered if there's only a single destination
1359
59834
    for (const auto& transferable : message->transferables()) {
1360
10709
      if (port == transferable.get()) {
1361
2
        if (error != nullptr) {
1362
          *error = "The target port was posted to itself, and the "
1363
2
                   "communication channel was lost";
1364
        }
1365
2
        return Just(true);
1366
      }
1367
    }
1368
49125
    port->AddToIncomingQueue(message);
1369
  }
1370
1371
49118
  return Just(true);
1372
}
1373
1374
41
void SiblingGroup::Entangle(MessagePortData* port) {
1375
41
  Entangle({ port });
1376
41
}
1377
1378
11437
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1379
22874
  RwLock::ScopedWriteLock lock(group_mutex_);
1380
34270
  for (MessagePortData* data : ports) {
1381
22833
    ports_.insert(data);
1382
22833
    CHECK(!data->group_);
1383
22833
    data->group_ = shared_from_this();
1384
  }
1385
11437
}
1386
1387
22808
void SiblingGroup::Disentangle(MessagePortData* data) {
1388
45617
  auto self = shared_from_this();  // Keep alive until end of function.
1389
45617
  RwLock::ScopedWriteLock lock(group_mutex_);
1390
22809
  ports_.erase(data);
1391
22808
  data->group_.reset();
1392
1393
22808
  data->AddToIncomingQueue(std::make_shared<Message>());
1394
  // If this is an anonymous group and there's another port, close it.
1395

22809
  if (size() == 1 && name_.empty())
1396
11395
    (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1397
22809
}
1398
1399
4845
SiblingGroup::Map SiblingGroup::groups_;
1400
4845
Mutex SiblingGroup::groups_mutex_;
1401
1402
namespace {
1403
1404
459
static void SetDeserializerCreateObjectFunction(
1405
    const FunctionCallbackInfo<Value>& args) {
1406
459
  Environment* env = Environment::GetCurrent(args);
1407
918
  CHECK(args[0]->IsFunction());
1408
918
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1409
459
}
1410
1411
10742
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1412
10742
  Environment* env = Environment::GetCurrent(args);
1413
10742
  if (!args.IsConstructCall()) {
1414
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1415
1
    return;
1416
  }
1417
1418
32223
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1419
10741
  Context::Scope context_scope(context);
1420
1421
10741
  MessagePort* port1 = MessagePort::New(env, context);
1422
10741
  if (port1 == nullptr) return;
1423
10741
  MessagePort* port2 = MessagePort::New(env, context);
1424
10741
  if (port2 == nullptr) {
1425
    port1->Close();
1426
    return;
1427
  }
1428
1429
10741
  MessagePort::Entangle(port1, port2);
1430
1431
53705
  args.This()->Set(context, env->port1_string(), port1->object())
1432
      .Check();
1433
53705
  args.This()->Set(context, env->port2_string(), port2->object())
1434
      .Check();
1435
}
1436
1437
41
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1438
123
  CHECK(args[0]->IsString());
1439
41
  Environment* env = Environment::GetCurrent(args);
1440
41
  Context::Scope context_scope(env->context());
1441
82
  Utf8Value name(env->isolate(), args[0]);
1442
  MessagePort* port =
1443
41
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1444
41
  if (port != nullptr) {
1445
123
    args.GetReturnValue().Set(port->object());
1446
  }
1447
41
}
1448
1449
459
static void InitMessaging(Local<Object> target,
1450
                          Local<Value> unused,
1451
                          Local<Context> context,
1452
                          void* priv) {
1453
459
  Environment* env = Environment::GetCurrent(context);
1454
1455
  {
1456
918
    env->SetConstructorFunction(
1457
        target,
1458
        "MessageChannel",
1459
459
        env->NewFunctionTemplate(MessageChannel));
1460
  }
1461
1462
  {
1463
459
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1464
918
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1465
1377
    t->InstanceTemplate()->SetInternalFieldCount(
1466
459
        JSTransferable::kInternalFieldCount);
1467
459
    env->SetConstructorFunction(target, "JSTransferable", t);
1468
  }
1469
1470
459
  env->SetConstructorFunction(
1471
      target,
1472
      env->message_port_constructor_string(),
1473
459
      GetMessagePortConstructorTemplate(env));
1474
1475
  // These are not methods on the MessagePort prototype, because
1476
  // the browser equivalents do not provide them.
1477
459
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1478
459
  env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1479
459
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1480
459
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1481
  env->SetMethod(target, "moveMessagePortToContext",
1482
459
                 MessagePort::MoveToContext);
1483
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1484
459
                 SetDeserializerCreateObjectFunction);
1485
459
  env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1486
1487
  {
1488
918
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1489
    target
1490
918
        ->Set(context,
1491
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1492
1377
              domexception)
1493
        .Check();
1494
  }
1495
459
}
1496
1497
4779
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1498
4779
  registry->Register(MessageChannel);
1499
4779
  registry->Register(BroadcastChannel);
1500
4779
  registry->Register(JSTransferable::New);
1501
4779
  registry->Register(MessagePort::New);
1502
4779
  registry->Register(MessagePort::PostMessage);
1503
4779
  registry->Register(MessagePort::Start);
1504
4779
  registry->Register(MessagePort::Stop);
1505
4779
  registry->Register(MessagePort::CheckType);
1506
4779
  registry->Register(MessagePort::Drain);
1507
4779
  registry->Register(MessagePort::ReceiveMessage);
1508
4779
  registry->Register(MessagePort::MoveToContext);
1509
4779
  registry->Register(SetDeserializerCreateObjectFunction);
1510
4779
}
1511
1512
}  // anonymous namespace
1513
1514
}  // namespace worker
1515
}  // namespace node
1516
1517
4837
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1518

19314
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1519
                               node::worker::RegisterExternalReferences)