GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 741 802 92.4 %
Date: 2021-04-24 04:15:50 Branches: 372 507 73.4 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10683
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
10683
  return Just(BaseObjectList {});
59
}
60
61
10433
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10433
  return Just(true);
64
}
65
66
namespace worker {
67
68
10694
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
10694
  return Just(true);
71
}
72
73
83177
Message::Message(MallocedBuffer<char>&& buffer)
74
83177
    : main_message_buf_(std::move(buffer)) {}
75
76
117494
bool Message::IsCloseMessage() const {
77
117494
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
47950
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
48017
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
48017
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
48017
        wasm_modules_(wasm_modules) {}
95
96
10442
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10442
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10442
    CHECK_LE(id, host_objects_.size());
102
20884
    return host_objects_[id]->object(isolate);
103
  }
104
105
435
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
435
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
870
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
48035
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context,
130
                                       Local<Value>* port_list) {
131
  Context::Scope context_scope(context);
132
133
48032
  CHECK(!IsCloseMessage());
134

48033
  if (port_list != nullptr && !transferables_.empty()) {
135
    // Need to create this outside of the EscapableHandleScope, but inside
136
    // the Context::Scope.
137
20868
    *port_list = Array::New(env->isolate());
138
  }
139
140
48025
  EscapableHandleScope handle_scope(env->isolate());
141
142
  // Create all necessary objects for transferables, e.g. MessagePort handles.
143
96051
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
144
47915
  auto cleanup = OnScopeLeave([&]() {
145
47915
    for (BaseObjectPtr<BaseObject> object : host_objects) {
146
9
      if (!object) continue;
147
148
      // If the function did not finish successfully, host_objects will contain
149
      // a list of objects that will never be passed to JS. Therefore, we
150
      // destroy them here.
151
      object->Detach();
152
    }
153
144068
  });
154
155
58470
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
156
20890
    HandleScope handle_scope(env->isolate());
157
10448
    TransferData* data = transferables_[i].get();
158
20896
    host_objects[i] = data->Deserialize(
159
20896
        env, context, std::move(transferables_[i]));
160
10454
    if (!host_objects[i]) return {};
161
10442
    if (port_list != nullptr) {
162
      // If we gather a list of all message ports, and this transferred object
163
      // is a message port, add it to that list. This is a bit of an odd case
164
      // of special handling for MessagePorts (as opposed to applying to all
165
      // transferables), but it's required for spec compliancy.
166
      DCHECK((*port_list)->IsArray());
167
10442
      Local<Array> port_list_array = port_list->As<Array>();
168
10442
      Local<Object> obj = host_objects[i]->object();
169
20884
      if (env->message_port_constructor_template()->HasInstance(obj)) {
170
31251
        if (port_list_array->Set(context,
171
                                 port_list_array->Length(),
172
20834
                                 obj).IsNothing()) {
173
          return {};
174
        }
175
      }
176
    }
177
  }
178
48021
  transferables_.clear();
179
180
96012
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
181
  // Attach all transferred SharedArrayBuffers to their new Isolate.
182
48457
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
183
    Local<SharedArrayBuffer> sab =
184
435
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
185
435
    shared_array_buffers.push_back(sab);
186
  }
187
188
  DeserializerDelegate delegate(
189
96046
      this, env, host_objects, shared_array_buffers, wasm_modules_);
190
  ValueDeserializer deserializer(
191
      env->isolate(),
192
48021
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
193
      main_message_buf_.size,
194
144034
      &delegate);
195
48027
  delegate.deserializer = &deserializer;
196
197
  // Attach all transferred ArrayBuffers to their new Isolate.
198
48036
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
199
    Local<ArrayBuffer> ab =
200
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
201
9
    deserializer.TransferArrayBuffer(i, ab);
202
  }
203
204
96056
  if (deserializer.ReadHeader(context).IsNothing())
205
    return {};
206
  Local<Value> return_value;
207
96054
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
208
    return {};
209
210
58467
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
211
20884
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
212
      return {};
213
  }
214
215
47958
  host_objects.clear();
216
47992
  return handle_scope.Escape(return_value);
217
}
218
219
682
void Message::AddSharedArrayBuffer(
220
    std::shared_ptr<BackingStore> backing_store) {
221
682
  shared_array_buffers_.emplace_back(std::move(backing_store));
222
682
}
223
224
10707
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225
10707
  transferables_.emplace_back(std::move(data));
226
10707
}
227
228
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
229
2
  wasm_modules_.emplace_back(std::move(mod));
230
2
  return wasm_modules_.size() - 1;
231
}
232
233
namespace {
234
235
32999
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
236
32999
  Isolate* isolate = context->GetIsolate();
237
  Local<Object> per_context_bindings;
238
  Local<Value> emit_message_val;
239

131996
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
240
98997
      !per_context_bindings->Get(context,
241
98997
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
242
32999
          .ToLocal(&emit_message_val)) {
243
    return MaybeLocal<Function>();
244
  }
245
32999
  CHECK(emit_message_val->IsFunction());
246
32999
  return emit_message_val.As<Function>();
247
}
248
249
479
MaybeLocal<Function> GetDOMException(Local<Context> context) {
250
479
  Isolate* isolate = context->GetIsolate();
251
  Local<Object> per_context_bindings;
252
  Local<Value> domexception_ctor_val;
253

1916
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
254
1437
      !per_context_bindings->Get(context,
255
1437
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
256
479
          .ToLocal(&domexception_ctor_val)) {
257
    return MaybeLocal<Function>();
258
  }
259
479
  CHECK(domexception_ctor_val->IsFunction());
260
479
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
261
479
  return domexception_ctor;
262
}
263
264
16
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
265
16
  Isolate* isolate = context->GetIsolate();
266
  Local<Value> argv[] = {message,
267
48
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
268
  Local<Value> exception;
269
  Local<Function> domexception_ctor;
270

64
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
271
64
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
272
16
           .ToLocal(&exception)) {
273
    return;
274
  }
275
16
  isolate->ThrowException(exception);
276
}
277
278
// This tells V8 how to serialize objects that it does not understand
279
// (e.g. C++ objects) into the output buffer, in a way that our own
280
// DeserializerDelegate understands how to unpack.
281
48999
class SerializerDelegate : public ValueSerializer::Delegate {
282
 public:
283
48997
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284
48997
      : env_(env), context_(context), msg_(m) {}
285
286
6
  void ThrowDataCloneError(Local<String> message) override {
287
6
    ThrowDataCloneException(context_, message);
288
6
  }
289
290
10714
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291
21428
    if (env_->base_object_ctor_template()->HasInstance(object)) {
292
      return WriteHostObject(
293
10714
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294
    }
295
296
    ThrowDataCloneError(env_->clone_unsupported_type_str());
297
    return Nothing<bool>();
298
  }
299
300
682
  Maybe<uint32_t> GetSharedArrayBufferId(
301
      Isolate* isolate,
302
      Local<SharedArrayBuffer> shared_array_buffer) override {
303
    uint32_t i;
304
707
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305
50
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306
          shared_array_buffer) {
307
        return Just(i);
308
      }
309
    }
310
311
682
    seen_shared_array_buffers_.emplace_back(
312
1364
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313
682
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314
682
    return Just(i);
315
  }
316
317
2
  Maybe<uint32_t> GetWasmModuleTransferId(
318
      Isolate* isolate, Local<WasmModuleObject> module) override {
319
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320
  }
321
322
48977
  Maybe<bool> Finish(Local<Context> context) {
323
59684
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
324
21418
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325
21418
      std::unique_ptr<TransferData> data;
326
10711
      if (i < first_cloned_object_index_)
327
10687
        data = host_object->TransferForMessaging();
328
10711
      if (!data)
329
27
        data = host_object->CloneForMessaging();
330
10711
      if (!data) return Nothing<bool>();
331
21416
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332
1
        return Nothing<bool>();
333
10707
      msg_->AddTransferable(std::move(data));
334
    }
335
48973
    return Just(true);
336
  }
337
338
10694
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339
    // Make sure we have not started serializing the value itself yet.
340
10694
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
341
10694
    host_objects_.emplace_back(std::move(host_object));
342
10694
  }
343
344
  // Some objects in the transfer list may register sub-objects that can be
345
  // transferred. This could e.g. be a public JS wrapper object, such as a
346
  // FileHandle, that is registering its C++ handle for transfer.
347
48984
  inline Maybe<bool> AddNestedHostObjects() {
348
59677
    for (size_t i = 0; i < host_objects_.size(); i++) {
349
21386
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350
21386
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351
        return Nothing<bool>();
352

10703
      for (auto nested_transferable : nested_transferables) {
353
20
        if (std::find(host_objects_.begin(),
354
                      host_objects_.end(),
355
20
                      nested_transferable) == host_objects_.end()) {
356
10
          AddHostObject(nested_transferable);
357
        }
358
      }
359
    }
360
48984
    return Just(true);
361
  }
362
363
  ValueSerializer* serializer = nullptr;
364
365
 private:
366
10714
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367
10714
    BaseObject::TransferMode mode = host_object->GetTransferMode();
368
10714
    if (mode == BaseObject::TransferMode::kUntransferable) {
369
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
370
2
      return Nothing<bool>();
371
    }
372
373
10732
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
374
10704
      if (host_objects_[i] == host_object) {
375
10684
        serializer->WriteUint32(i);
376
10684
        return Just(true);
377
      }
378
    }
379
380
28
    if (mode == BaseObject::TransferMode::kTransferable) {
381
4
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382
4
      return Nothing<bool>();
383
    }
384
385
24
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
386
24
    uint32_t index = host_objects_.size();
387
24
    if (first_cloned_object_index_ == SIZE_MAX)
388
17
      first_cloned_object_index_ = index;
389
24
    serializer->WriteUint32(index);
390
24
    host_objects_.push_back(host_object);
391
24
    return Just(true);
392
  }
393
394
  Environment* env_;
395
  Local<Context> context_;
396
  Message* msg_;
397
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399
  size_t first_cloned_object_index_ = SIZE_MAX;
400
401
  friend class worker::Message;
402
};
403
404
}  // anonymous namespace
405
406
48991
Maybe<bool> Message::Serialize(Environment* env,
407
                               Local<Context> context,
408
                               Local<Value> input,
409
                               const TransferList& transfer_list_v,
410
                               Local<Object> source_port) {
411
97986
  HandleScope handle_scope(env->isolate());
412
  Context::Scope context_scope(context);
413
414
  // Verify that we're not silently overwriting an existing message.
415
48994
  CHECK(main_message_buf_.is_empty());
416
417
97993
  SerializerDelegate delegate(env, context, this);
418
97987
  ValueSerializer serializer(env->isolate(), &delegate);
419
48996
  delegate.serializer = &serializer;
420
421
97993
  std::vector<Local<ArrayBuffer>> array_buffers;
422
59704
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
423
10719
    Local<Value> entry = transfer_list_v[i];
424
10719
    if (entry->IsObject()) {
425
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
426
      // for details.
427
      bool untransferable;
428
32157
      if (!entry.As<Object>()->HasPrivate(
429
              context,
430
21438
              env->untransferable_object_private_symbol())
431
10719
              .To(&untransferable)) {
432
        return Nothing<bool>();
433
      }
434
10719
      if (untransferable) continue;
435
    }
436
437
    // Currently, we support ArrayBuffers and BaseObjects for which
438
    // GetTransferMode() does not return kUntransferable.
439
10714
    if (entry->IsArrayBuffer()) {
440
21
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
441
      // If we cannot render the ArrayBuffer unusable in this Isolate,
442
      // copying the buffer will have to do.
443
      // Note that we can currently transfer ArrayBuffers even if they were
444
      // not allocated by Node’s ArrayBufferAllocator in the first place,
445
      // because we pass the underlying v8::BackingStore around rather than
446
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
447
      // is always going to outlive any Workers it creates, and so will its
448
      // allocator along with it.
449
41
      if (!ab->IsDetachable()) continue;
450
42
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
451
42
          array_buffers.end()) {
452
1
        ThrowDataCloneException(
453
            context,
454
            FIXED_ONE_BYTE_STRING(
455
                env->isolate(),
456
1
                "Transfer list contains duplicate ArrayBuffer"));
457
1
        return Nothing<bool>();
458
      }
459
      // We simply use the array index in the `array_buffers` list as the
460
      // ID that we write into the serialized buffer.
461
20
      uint32_t id = array_buffers.size();
462
20
      array_buffers.push_back(ab);
463
20
      serializer.TransferArrayBuffer(id, ab);
464
20
      continue;
465
21386
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
466
      // Check if the source MessagePort is being transferred.
467

21386
      if (!source_port.IsEmpty() && entry == source_port) {
468
1
        ThrowDataCloneException(
469
            context,
470
            FIXED_ONE_BYTE_STRING(env->isolate(),
471
1
                                  "Transfer list contains source port"));
472
10
        return Nothing<bool>();
473
      }
474
      BaseObjectPtr<BaseObject> host_object {
475
10692
          Unwrap<BaseObject>(entry.As<Object>()) };
476

32073
      if (env->message_port_constructor_template()->HasInstance(entry) &&
477
21363
          (!host_object ||
478
10681
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
479
7
        ThrowDataCloneException(
480
            context,
481
            FIXED_ONE_BYTE_STRING(
482
                env->isolate(),
483
7
                "MessagePort in transfer list is already detached"));
484
7
        return Nothing<bool>();
485
      }
486
21370
      if (std::find(delegate.host_objects_.begin(),
487
                    delegate.host_objects_.end(),
488
21370
                    host_object) != delegate.host_objects_.end()) {
489
1
        ThrowDataCloneException(
490
            context,
491
            String::Concat(env->isolate(),
492
                FIXED_ONE_BYTE_STRING(
493
                  env->isolate(),
494
                  "Transfer list contains duplicate "),
495
2
                entry.As<Object>()->GetConstructorName()));
496
1
        return Nothing<bool>();
497
      }
498

10684
      if (host_object && host_object->GetTransferMode() !=
499
10684
              BaseObject::TransferMode::kUntransferable) {
500
10684
        delegate.AddHostObject(host_object);
501
10684
        continue;
502
      }
503
    }
504
505
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
506
    return Nothing<bool>();
507
  }
508
97973
  if (delegate.AddNestedHostObjects().IsNothing())
509
    return Nothing<bool>();
510
511
48989
  serializer.WriteHeader();
512
97975
  if (serializer.WriteValue(context, input).IsNothing()) {
513
9
    return Nothing<bool>();
514
  }
515
516
48990
  for (Local<ArrayBuffer> ab : array_buffers) {
517
    // If serialization succeeded, we render it inaccessible in this Isolate.
518
24
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
519
12
    ab->Detach();
520
521
12
    array_buffers_.emplace_back(std::move(backing_store));
522
  }
523
524
97956
  if (delegate.Finish(context).IsNothing())
525
4
    return Nothing<bool>();
526
527
  // The serializer gave us a buffer allocated using `malloc()`.
528
48975
  std::pair<uint8_t*, size_t> data = serializer.Release();
529
48975
  CHECK_NOT_NULL(data.first);
530
  main_message_buf_ =
531
48975
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
532
48970
  return Just(true);
533
}
534
535
void Message::MemoryInfo(MemoryTracker* tracker) const {
536
  tracker->TrackField("array_buffers_", array_buffers_);
537
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
538
  tracker->TrackField("transferables", transferables_);
539
}
540
541
33652
MessagePortData::MessagePortData(MessagePort* owner)
542
33652
    : owner_(owner) {
543
33652
}
544
545
100797
MessagePortData::~MessagePortData() {
546
33581
  CHECK_NULL(owner_);
547
33581
  Disentangle();
548
67199
}
549
550
2
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
551
4
  Mutex::ScopedLock lock(mutex_);
552
2
  tracker->TrackField("incoming_messages", incoming_messages_);
553
2
}
554
555
83074
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556
  // This function will be called by other threads.
557
166145
  Mutex::ScopedLock lock(mutex_);
558
83087
  incoming_messages_.emplace_back(std::move(message));
559
560
83070
  if (owner_ != nullptr) {
561
58419
    Debug(owner_, "Adding message to incoming queue");
562
58417
    owner_->TriggerAsync();
563
  }
564
83087
}
565
566
11390
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
567
22780
  auto group = std::make_shared<SiblingGroup>();
568
11390
  group->Entangle({a, b});
569
11390
}
570
571
55882
void MessagePortData::Disentangle() {
572
55882
  if (group_) {
573
22796
    group_->Disentangle(this);
574
  }
575
55908
}
576
577
131891
MessagePort::~MessagePort() {
578
32971
  if (data_) Detach();
579
65946
}
580
581
32999
MessagePort::MessagePort(Environment* env,
582
                         Local<Context> context,
583
32999
                         Local<Object> wrap)
584
  : HandleWrap(env,
585
               wrap,
586
32999
               reinterpret_cast<uv_handle_t*>(&async_),
587
               AsyncWrap::PROVIDER_MESSAGEPORT),
588
32999
    data_(new MessagePortData(this)) {
589
101351
  auto onmessage = [](uv_async_t* handle) {
590
    // Called when data has been put into the queue.
591
34176
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
592
34176
    channel->OnMessage(MessageProcessingMode::kNormalOperation);
593
101345
  };
594
595
32999
  CHECK_EQ(uv_async_init(env->event_loop(),
596
                         &async_,
597
                         onmessage), 0);
598
  // Reset later to indicate success of the constructor.
599
32999
  bool succeeded = false;
600
98997
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
601
602
  Local<Value> fn;
603
98997
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
604
    return;
605
606
32999
  if (fn->IsFunction()) {
607
32993
    Local<Function> init = fn.As<Function>();
608
65986
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
609
      return;
610
  }
611
612
  Local<Function> emit_message_fn;
613
65998
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
614
    return;
615
32999
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
616
617
32999
  succeeded = true;
618
32999
  Debug(this, "Created message port");
619
}
620
621
32034
bool MessagePort::IsDetached() const {
622

32034
  return data_ == nullptr || IsHandleClosing();
623
}
624
625
80195
void MessagePort::TriggerAsync() {
626
80195
  if (IsHandleClosing()) return;
627
80159
  CHECK_EQ(uv_async_send(&async_), 0);
628
}
629
630
32976
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
631
65952
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
632
633
32976
  if (data_) {
634
    // Wrap this call with accessing the mutex, so that TriggerAsync()
635
    // can check IsHandleClosing() without race conditions.
636
65940
    Mutex::ScopedLock sibling_lock(data_->mutex_);
637
32971
    HandleWrap::Close(close_callback);
638
  } else {
639
5
    HandleWrap::Close(close_callback);
640
  }
641
32976
}
642
643
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
645
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
646
  // class (i.e. makes it behave like an arrow function).
647
2
  Environment* env = Environment::GetCurrent(args);
648
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649
2
}
650
651
32999
MessagePort* MessagePort::New(
652
    Environment* env,
653
    Local<Context> context,
654
    std::unique_ptr<MessagePortData> data,
655
    std::shared_ptr<SiblingGroup> sibling_group) {
656
  Context::Scope context_scope(context);
657
32999
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
658
659
  // Construct a new instance, then assign the listener instance and possibly
660
  // the MessagePortData to it.
661
  Local<Object> instance;
662
98997
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
663
    return nullptr;
664
32999
  MessagePort* port = new MessagePort(env, context, instance);
665
32999
  CHECK_NOT_NULL(port);
666
32999
  if (port->IsHandleClosing()) {
667
    // Construction failed with an exception.
668
    return nullptr;
669
  }
670
671
32999
  if (data) {
672
10831
    CHECK(!sibling_group);
673
10831
    port->Detach();
674
10831
    port->data_ = std::move(data);
675
676
    // This lock is here to avoid race conditions with the `owner_` read
677
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
678
    // but it's better to be safe than sorry.)
679
21662
    Mutex::ScopedLock lock(port->data_->mutex_);
680
10831
    port->data_->owner_ = port;
681
    // If the existing MessagePortData object had pending messages, this is
682
    // the easiest way to run that queue.
683
10831
    port->TriggerAsync();
684
22168
  } else if (sibling_group) {
685
41
    sibling_group->Entangle(port->data_.get());
686
  }
687
32999
  return port;
688
}
689
690
82532
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
691
                                              MessageProcessingMode mode,
692
                                              Local<Value>* port_list) {
693
165137
  std::shared_ptr<Message> received;
694
  {
695
    // Get the head of the message queue.
696
141829
    Mutex::ScopedLock lock(data_->mutex_);
697
698
82620
    Debug(this, "MessagePort has message");
699
700
    bool wants_message =
701

82617
        receiving_messages_ ||
702
82617
        mode == MessageProcessingMode::kForceReadMessages;
703
    // We have nothing to do if:
704
    // - There are no pending messages
705
    // - We are not intending to receive messages, and the message we would
706
    //   receive is not the final "close" message.
707

165227
    if (data_->incoming_messages_.empty() ||
708
69602
        (!wants_message &&
709
10272
         !data_->incoming_messages_.front()->IsCloseMessage())) {
710
46567
      return env()->no_message_symbol();
711
    }
712
713
59314
    received = data_->incoming_messages_.front();
714
59333
    data_->incoming_messages_.pop_front();
715
  }
716
717
59297
  if (received->IsCloseMessage()) {
718
22620
    Close();
719
22620
    return env()->no_message_symbol();
720
  }
721
722
48037
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
723
724
48036
  return received->Deserialize(env(), context, port_list);
725
}
726
727
34667
void MessagePort::OnMessage(MessageProcessingMode mode) {
728
34667
  Debug(this, "Running MessagePort::OnMessage()");
729
69257
  HandleScope handle_scope(env()->isolate());
730
  Local<Context> context =
731
104001
      object(env()->isolate())->GetCreationContext().ToLocalChecked();
732
733
  size_t processing_limit;
734
34667
  if (mode == MessageProcessingMode::kNormalOperation) {
735
34176
    Mutex::ScopedLock(data_->mutex_);
736
68352
    processing_limit = std::max(data_->incoming_messages_.size(),
737
102528
                                static_cast<size_t>(1000));
738
  } else {
739
491
    processing_limit = std::numeric_limits<size_t>::max();
740
  }
741
742
  // data_ can only ever be modified by the owner thread, so no need to lock.
743
  // However, the message port may be transferred while it is processing
744
  // messages, so we need to check that this handle still owns its `data_` field
745
  // on every iteration.
746

130575
  while (data_) {
747
82526
    if (processing_limit-- == 0) {
748
      // Prevent event loop starvation by only processing those messages without
749
      // interruption that were already present when the OnMessage() call was
750
      // first triggered, but at least 1000 messages because otherwise the
751
      // overhead of repeatedly triggering the uv_async_t instance becomes
752
      // noticable, at least on Windows.
753
      // (That might require more investigation by somebody more familiar with
754
      // Windows.)
755
1
      TriggerAsync();
756
1
      return;
757
    }
758
759
130479
    HandleScope handle_scope(env()->isolate());
760

47954
    Context::Scope context_scope(context);
761
82518
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
762
763
    Local<Value> payload;
764
82521
    Local<Value> port_list = Undefined(env()->isolate());
765
    Local<Value> message_error;
766
330140
    Local<Value> argv[3];
767
768
    {
769
      // Catch any exceptions from parsing the message itself (not from
770
      // emitting it) as 'messageeror' events.
771
164905
      TryCatchScope try_catch(env());
772
165002
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
773

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
774
6
          message_error = try_catch.Exception();
775
6
        goto reschedule;
776
      }
777
    }
778
164963
    if (payload == env()->no_message_symbol()) break;
779
780
48017
    if (!env()->can_call_into_js()) {
781
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
782
      // In this case there is nothing to do but to drain the current queue.
783
      continue;
784
    }
785
786
48007
    argv[0] = payload;
787
48007
    argv[1] = port_list;
788
96011
    argv[2] = env()->message_string();
789
790
96023
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
791
    reschedule:
792
73
      if (!message_error.IsEmpty()) {
793
6
        argv[0] = message_error;
794
12
        argv[1] = Undefined(env()->isolate());
795
12
        argv[2] = env()->messageerror_string();
796
6
        USE(MakeCallback(emit_message, arraysize(argv), argv));
797
      }
798
799
      // Re-schedule OnMessage() execution in case of failure.
800
73
      if (data_)
801
73
        TriggerAsync();
802
73
      return;
803
    }
804
  }
805
}
806
807
32970
void MessagePort::OnClose() {
808
32970
  Debug(this, "MessagePort::OnClose()");
809
32969
  if (data_) {
810
    // Detach() returns move(data_).
811
22297
    Detach()->Disentangle();
812
  }
813
32972
}
814
815
43796
std::unique_ptr<MessagePortData> MessagePort::Detach() {
816
43796
  CHECK(data_);
817
87591
  Mutex::ScopedLock lock(data_->mutex_);
818
43801
  data_->owner_ = nullptr;
819
87608
  return std::move(data_);
820
}
821
822
21348
BaseObject::TransferMode MessagePort::GetTransferMode() const {
823
21348
  if (IsDetached())
824
1
    return BaseObject::TransferMode::kUntransferable;
825
21347
  return BaseObject::TransferMode::kTransferable;
826
}
827
828
10671
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829
21342
  Close();
830
10671
  return Detach();
831
}
832
833
10417
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
834
    Environment* env,
835
    Local<Context> context,
836
    std::unique_ptr<TransferData> self) {
837
41668
  return BaseObjectPtr<MessagePort> { MessagePort::New(
838
      env, context,
839
41668
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
840
}
841
842
48998
Maybe<bool> MessagePort::PostMessage(Environment* env,
843
                                     Local<Value> message_v,
844
                                     const TransferList& transfer_v) {
845
48998
  Isolate* isolate = env->isolate();
846
48998
  Local<Object> obj = object(isolate);
847
97994
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
848
849
97990
  std::shared_ptr<Message> msg = std::make_shared<Message>();
850
851
  // Per spec, we need to both check if transfer list has the source port, and
852
  // serialize the input message, even if the MessagePort is closed or detached.
853
854
  Maybe<bool> serialization_maybe =
855
48987
      msg->Serialize(env, context, message_v, transfer_v, obj);
856
48994
  if (data_ == nullptr) {
857
2
    return serialization_maybe;
858
  }
859
48992
  if (serialization_maybe.IsNothing()) {
860
20
    return Nothing<bool>();
861
  }
862
863
97943
  std::string error;
864
48971
  Maybe<bool> res = data_->Dispatch(msg, &error);
865
48971
  if (res.IsNothing())
866
    return res;
867
868
48971
  if (!error.empty())
869
2
    ProcessEmitWarning(env, error.c_str());
870
871
48971
  return res;
872
}
873
874
48972
Maybe<bool> MessagePortData::Dispatch(
875
    std::shared_ptr<Message> message,
876
    std::string* error) {
877
48972
  if (!group_) {
878
    if (error != nullptr)
879
      *error = "MessagePortData is not entangled.";
880
    return Nothing<bool>();
881
  }
882
48971
  return group_->Dispatch(this, message, error);
883
}
884
885
10727
static Maybe<bool> ReadIterable(Environment* env,
886
                                Local<Context> context,
887
                                // NOLINTNEXTLINE(runtime/references)
888
                                TransferList& transfer_list,
889
                                Local<Value> object) {
890
10727
  if (!object->IsObject()) return Just(false);
891
892
10724
  if (object->IsArray()) {
893
10704
    Local<Array> arr = object.As<Array>();
894
10704
    size_t length = arr->Length();
895
10704
    transfer_list.AllocateSufficientStorage(length);
896
21421
    for (size_t i = 0; i < length; i++) {
897
32151
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
898
        return Nothing<bool>();
899
    }
900
10704
    return Just(true);
901
  }
902
903
20
  Isolate* isolate = env->isolate();
904
  Local<Value> iterator_method;
905
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
906
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
907
20
  if (!iterator_method->IsFunction()) return Just(false);
908
909
  Local<Value> iterator;
910
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
911
6
      .ToLocal(&iterator)) return Nothing<bool>();
912
6
  if (!iterator->IsObject()) return Just(false);
913
914
  Local<Value> next;
915
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
916
    return Nothing<bool>();
917
6
  if (!next->IsFunction()) return Just(false);
918
919
6
  std::vector<Local<Value>> entries;
920
7
  while (env->can_call_into_js()) {
921
    Local<Value> result;
922
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
923
7
        .ToLocal(&result)) return Nothing<bool>();
924
4
    if (!result->IsObject()) return Just(false);
925
926
    Local<Value> done;
927
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
928
      return Nothing<bool>();
929
4
    if (done->BooleanValue(isolate)) break;
930
931
    Local<Value> val;
932
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
933
      return Nothing<bool>();
934
2
    entries.push_back(val);
935
  }
936
937
2
  transfer_list.AllocateSufficientStorage(entries.size());
938
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
939
2
  return Just(true);
940
}
941
942
49011
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943
49011
  Environment* env = Environment::GetCurrent(args);
944
49011
  Local<Object> obj = args.This();
945
98022
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947
49011
  if (args.Length() == 0) {
948
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949
13
                                       "MessagePort.postMessage");
950
  }
951
952

168473
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953
    // Browsers ignore null or undefined, and otherwise accept an array or an
954
    // options object.
955
    return THROW_ERR_INVALID_ARG_TYPE(env,
956
4
        "Optional transferList argument must be an iterable");
957
  }
958
959
98001
  TransferList transfer_list;
960
98012
  if (args[1]->IsObject()) {
961
    bool was_iterable;
962
21432
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963
8
      return;
964
10716
    if (!was_iterable) {
965
      Local<Value> transfer_option;
966
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
967
21
          .ToLocal(&transfer_option)) return;
968
26
      if (!transfer_option->IsUndefined()) {
969
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
970
12
            .To(&was_iterable)) return;
971
10
        if (!was_iterable) {
972
          return THROW_ERR_INVALID_ARG_TYPE(env,
973
7
              "Optional options.transfer argument must be an iterable");
974
        }
975
      }
976
    }
977
  }
978
979
48998
  MessagePort* port = Unwrap<MessagePort>(args.This());
980
  // Even if the backing MessagePort object has already been deleted, we still
981
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
982
  // transfers.
983
48998
  if (port == nullptr) {
984
2
    Message msg;
985
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986
1
    return;
987
  }
988
989
48997
  Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
990
48995
  if (res.IsJust())
991
146922
    args.GetReturnValue().Set(res.FromJust());
992
}
993
994
23364
void MessagePort::Start() {
995
23364
  Debug(this, "Start receiving messages");
996
23364
  receiving_messages_ = true;
997
46728
  Mutex::ScopedLock lock(data_->mutex_);
998
23364
  if (!data_->incoming_messages_.empty())
999
10874
    TriggerAsync();
1000
23364
}
1001
1002
20329
void MessagePort::Stop() {
1003
20329
  Debug(this, "Stop receiving messages");
1004
20329
  receiving_messages_ = false;
1005
20329
}
1006
1007
23365
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
23366
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1010
23365
  if (!port->data_) {
1011
1
    return;
1012
  }
1013
23364
  port->Start();
1014
}
1015
1016
20532
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017
  MessagePort* port;
1018
41064
  CHECK(args[0]->IsObject());
1019
41267
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1020
20331
  if (!port->data_) {
1021
2
    return;
1022
  }
1023
20329
  port->Stop();
1024
}
1025
1026
7
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027
7
  Environment* env = Environment::GetCurrent(args);
1028
21
  args.GetReturnValue().Set(
1029
21
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030
7
}
1031
1032
1246
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033
  MessagePort* port;
1034
2492
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1035
491
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036
}
1037
1038
13
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039
13
  Environment* env = Environment::GetCurrent(args);
1040

49
  if (!args[0]->IsObject() ||
1041
33
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1042
    return THROW_ERR_INVALID_ARG_TYPE(env,
1043
10
        "The \"port\" argument must be a MessagePort instance");
1044
  }
1045
16
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046
8
  if (port == nullptr) {
1047
    // Return 'no messages' for a closed port.
1048
    args.GetReturnValue().Set(
1049
        Environment::GetCurrent(args)->no_message_symbol());
1050
    return;
1051
  }
1052
1053
  MaybeLocal<Value> payload = port->ReceiveMessage(
1054
24
      port->object()->GetCreationContext().ToLocalChecked(),
1055
8
      MessageProcessingMode::kForceReadMessages);
1056
8
  if (!payload.IsEmpty())
1057
16
    args.GetReturnValue().Set(payload.ToLocalChecked());
1058
}
1059
1060
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061
5
  Environment* env = Environment::GetCurrent(args);
1062

20
  if (!args[0]->IsObject() ||
1063
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1064
    return THROW_ERR_INVALID_ARG_TYPE(env,
1065
        "The \"port\" argument must be a MessagePort instance");
1066
  }
1067
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068
5
  CHECK_NOT_NULL(port);
1069
1070
5
  Local<Value> context_arg = args[1];
1071
  ContextifyContext* context_wrapper;
1072

15
  if (!context_arg->IsObject() ||
1073
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1074
10
          env, context_arg.As<Object>())) == nullptr) {
1075
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1076
  }
1077
1078
10
  std::unique_ptr<MessagePortData> data;
1079
5
  if (!port->IsDetached())
1080
5
    data = port->Detach();
1081
1082
5
  Context::Scope context_scope(context_wrapper->context());
1083
  MessagePort* target =
1084
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1085
5
  if (target != nullptr)
1086
15
    args.GetReturnValue().Set(target->object());
1087
}
1088
1089
10737
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1090
10737
  MessagePortData::Entangle(a->data_.get(), b->data_.get());
1091
10737
}
1092
1093
653
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1094
653
  MessagePortData::Entangle(a->data_.get(), b);
1095
653
}
1096
1097
2
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1098
2
  tracker->TrackField("data", data_);
1099
2
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1100
2
}
1101
1102
33932
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1103
  // Factor generating the MessagePort JS constructor into its own piece
1104
  // of code, because it is needed early on in the child environment setup.
1105
33932
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1106
33932
  if (!templ.IsEmpty())
1107
33469
    return templ;
1108
1109
  {
1110
463
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1111
926
    m->SetClassName(env->message_port_constructor_string());
1112
1389
    m->InstanceTemplate()->SetInternalFieldCount(
1113
463
        MessagePort::kInternalFieldCount);
1114
926
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1115
1116
463
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1117
463
    env->SetProtoMethod(m, "start", MessagePort::Start);
1118
1119
463
    env->set_message_port_constructor_template(m);
1120
  }
1121
1122
463
  return GetMessagePortConstructorTemplate(env);
1123
}
1124
1125
3619
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1126
3619
    : BaseObject(env, obj) {
1127
3619
  MakeWeak();
1128
3619
}
1129
1130
3619
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1131
3619
  CHECK(args.IsConstructCall());
1132
7238
  new JSTransferable(Environment::GetCurrent(args), args.This());
1133
3619
}
1134
1135
28
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1136
  // Implement `kClone in this ? kCloneable : kTransferable`.
1137
56
  HandleScope handle_scope(env()->isolate());
1138
56
  errors::TryCatchScope ignore_exceptions(env());
1139
1140
  bool has_clone;
1141
112
  if (!object()->Has(env()->context(),
1142
140
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1143
    return TransferMode::kUntransferable;
1144
  }
1145
1146
28
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1147
}
1148
1149
10
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1150
10
  return TransferOrClone(TransferMode::kTransferable);
1151
}
1152
1153
10
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1154
10
  return TransferOrClone(TransferMode::kCloneable);
1155
}
1156
1157
20
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1158
    TransferMode mode) const {
1159
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1160
  // which should return an object with `data` and `deserializeInfo` properties;
1161
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1162
  // on the `TransferData` instance as a string.
1163
40
  HandleScope handle_scope(env()->isolate());
1164
20
  Local<Context> context = env()->isolate()->GetCurrentContext();
1165
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1166
20
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1167
1168
  Local<Value> method;
1169
60
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1170
    return {};
1171
  }
1172
20
  if (method->IsFunction()) {
1173
    Local<Value> result_v;
1174
68
    if (!method.As<Function>()->Call(
1175
68
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1176
20
      return {};
1177
    }
1178
1179
14
    if (result_v->IsObject()) {
1180
14
      Local<Object> result = result_v.As<Object>();
1181
      Local<Value> data;
1182
      Local<Value> deserialize_info;
1183

70
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1184
56
          !result->Get(context, env()->deserialize_info_string())
1185
14
              .ToLocal(&deserialize_info)) {
1186
        return {};
1187
      }
1188
28
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1189
14
      if (*deserialize_info_str == nullptr) return {};
1190
28
      return std::make_unique<Data>(
1191
70
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1192
    }
1193
  }
1194
1195
3
  if (mode == TransferMode::kTransferable)
1196
    return TransferOrClone(TransferMode::kCloneable);
1197
  else
1198
3
    return {};
1199
}
1200
1201
Maybe<BaseObjectList>
1202
10
JSTransferable::NestedTransferables() const {
1203
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1204
20
  HandleScope handle_scope(env()->isolate());
1205
10
  Local<Context> context = env()->isolate()->GetCurrentContext();
1206
10
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1207
1208
  Local<Value> method;
1209
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1210
    return Nothing<BaseObjectList>();
1211
  }
1212
10
  if (!method->IsFunction()) return Just(BaseObjectList {});
1213
1214
  Local<Value> list_v;
1215
40
  if (!method.As<Function>()->Call(
1216
40
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1217
    return Nothing<BaseObjectList>();
1218
  }
1219
10
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1220
10
  Local<Array> list = list_v.As<Array>();
1221
1222
20
  BaseObjectList ret;
1223
40
  for (size_t i = 0; i < list->Length(); i++) {
1224
    Local<Value> value;
1225
30
    if (!list->Get(context, i).ToLocal(&value))
1226
      return Nothing<BaseObjectList>();
1227
20
    if (env()->base_object_ctor_template()->HasInstance(value))
1228
10
      ret.emplace_back(Unwrap<BaseObject>(value));
1229
  }
1230
10
  return Just(ret);
1231
}
1232
1233
9
Maybe<bool> JSTransferable::FinalizeTransferRead(
1234
    Local<Context> context, ValueDeserializer* deserializer) {
1235
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1236
  // of `this[kTransfer]()` or `this[kClone]()`.
1237
18
  HandleScope handle_scope(env()->isolate());
1238
  Local<Value> data;
1239
18
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1240
1241
9
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1242
  Local<Value> method;
1243
27
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1244
    return Nothing<bool>();
1245
  }
1246
9
  if (!method->IsFunction()) return Just(true);
1247
1248
36
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1249
    return Nothing<bool>();
1250
  }
1251
9
  return Just(true);
1252
}
1253
1254
14
JSTransferable::Data::Data(std::string&& deserialize_info,
1255
14
                           v8::Global<v8::Value>&& data)
1256
14
    : deserialize_info_(std::move(deserialize_info)),
1257
42
      data_(std::move(data)) {}
1258
1259
12
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1260
    Environment* env,
1261
    Local<Context> context,
1262
    std::unique_ptr<TransferData> self) {
1263
  // Create the JS wrapper object that will later be filled with data passed to
1264
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1265
  // we need to create an object with the right prototype and internal fields,
1266
  // but the actual JS data stored in the serialized data can only be read at
1267
  // the end of the stream, after the main message has been read.
1268
1269
24
  if (context != env->context()) {
1270
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1271
1
    return {};
1272
  }
1273
22
  HandleScope handle_scope(env->isolate());
1274
  Local<Value> info;
1275
22
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1276
1277
  Local<Value> ret;
1278
22
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1279
66
  if (!env->messaging_deserialize_create_object()->Call(
1280

64
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1281
29
      !env->base_object_ctor_template()->HasInstance(ret)) {
1282
2
    return {};
1283
  }
1284
1285
9
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1286
}
1287
1288
14
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1289
    Local<Context> context, ValueSerializer* serializer) {
1290
28
  HandleScope handle_scope(context->GetIsolate());
1291
14
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1292
14
  data_.Reset();
1293
28
  return ret;
1294
}
1295
1296
41
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1297
82
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1298
41
  std::shared_ptr<SiblingGroup> group;
1299
41
  auto i = groups_.find(name);
1300

41
  if (i == groups_.end() || i->second.expired()) {
1301
20
    group = std::make_shared<SiblingGroup>(name);
1302
20
    groups_[name] = group;
1303
  } else {
1304
21
    group = i->second.lock();
1305
  }
1306
82
  return group;
1307
}
1308
1309
20
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1310
40
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1311
20
  auto i = groups_.find(name);
1312

20
  if (i != groups_.end() && i->second.expired())
1313
20
    groups_.erase(name);
1314
20
}
1315
1316
20
SiblingGroup::SiblingGroup(const std::string& name)
1317
20
    : name_(name) { }
1318
1319
22772
SiblingGroup::~SiblingGroup() {
1320
  // If this is a named group, check to see if we can remove the group
1321
11386
  if (!name_.empty())
1322
20
    CheckSiblingGroup(name_);
1323
11386
}
1324
1325
48973
Maybe<bool> SiblingGroup::Dispatch(
1326
    MessagePortData* source,
1327
    std::shared_ptr<Message> message,
1328
    std::string* error) {
1329
1330
97949
  Mutex::ScopedLock lock(group_mutex_);
1331
1332
  // The source MessagePortData is not part of this group.
1333
48975
  if (ports_.find(source) == ports_.end()) {
1334
    if (error != nullptr)
1335
      *error = "Source MessagePort is not entangled with this group.";
1336
    return Nothing<bool>();
1337
  }
1338
1339
  // There are no destination ports.
1340
48965
  if (size() <= 1)
1341
79
    return Just(false);
1342
1343
  // Transferables cannot be used when there is more
1344
  // than a single destination.
1345

48886
  if (size() > 2 && message->has_transferables()) {
1346
    if (error != nullptr)
1347
      *error = "Transferables cannot be used with multiple destinations.";
1348
    return Nothing<bool>();
1349
  }
1350
1351
146678
  for (MessagePortData* port : ports_) {
1352
97786
    if (port == source)
1353
48890
      continue;
1354
    // This loop should only be entered if there's only a single destination
1355
59600
    for (const auto& transferable : message->transferables()) {
1356
10706
      if (port == transferable.get()) {
1357
2
        if (error != nullptr) {
1358
          *error = "The target port was posted to itself, and the "
1359
2
                   "communication channel was lost";
1360
        }
1361
2
        return Just(true);
1362
      }
1363
    }
1364
48892
    port->AddToIncomingQueue(message);
1365
  }
1366
1367
48887
  return Just(true);
1368
}
1369
1370
41
void SiblingGroup::Entangle(MessagePortData* port) {
1371
41
  Entangle({ port });
1372
41
}
1373
1374
11431
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1375
22862
  Mutex::ScopedLock lock(group_mutex_);
1376
34252
  for (MessagePortData* data : ports) {
1377
22821
    ports_.insert(data);
1378
22821
    CHECK(!data->group_);
1379
22821
    data->group_ = shared_from_this();
1380
  }
1381
11431
}
1382
1383
22795
void SiblingGroup::Disentangle(MessagePortData* data) {
1384
45592
  auto self = shared_from_this();  // Keep alive until end of function.
1385
45594
  Mutex::ScopedLock lock(group_mutex_);
1386
22797
  ports_.erase(data);
1387
22797
  data->group_.reset();
1388
1389
22797
  data->AddToIncomingQueue(std::make_shared<Message>());
1390
  // If this is an anonymous group and there's another port, close it.
1391

22794
  if (size() == 1 && name_.empty())
1392
11386
    (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1393
22797
}
1394
1395
4772
SiblingGroup::Map SiblingGroup::groups_;
1396
4772
Mutex SiblingGroup::groups_mutex_;
1397
1398
namespace {
1399
1400
463
static void SetDeserializerCreateObjectFunction(
1401
    const FunctionCallbackInfo<Value>& args) {
1402
463
  Environment* env = Environment::GetCurrent(args);
1403
926
  CHECK(args[0]->IsFunction());
1404
926
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1405
463
}
1406
1407
10738
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1408
10738
  Environment* env = Environment::GetCurrent(args);
1409
10738
  if (!args.IsConstructCall()) {
1410
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1411
1
    return;
1412
  }
1413
1414
32211
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1415
10737
  Context::Scope context_scope(context);
1416
1417
10737
  MessagePort* port1 = MessagePort::New(env, context);
1418
10737
  if (port1 == nullptr) return;
1419
10737
  MessagePort* port2 = MessagePort::New(env, context);
1420
10737
  if (port2 == nullptr) {
1421
    port1->Close();
1422
    return;
1423
  }
1424
1425
10737
  MessagePort::Entangle(port1, port2);
1426
1427
53685
  args.This()->Set(context, env->port1_string(), port1->object())
1428
      .Check();
1429
53685
  args.This()->Set(context, env->port2_string(), port2->object())
1430
      .Check();
1431
}
1432
1433
41
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1434
123
  CHECK(args[0]->IsString());
1435
41
  Environment* env = Environment::GetCurrent(args);
1436
41
  Context::Scope context_scope(env->context());
1437
82
  Utf8Value name(env->isolate(), args[0]);
1438
  MessagePort* port =
1439
41
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1440
41
  if (port != nullptr) {
1441
123
    args.GetReturnValue().Set(port->object());
1442
  }
1443
41
}
1444
1445
463
static void InitMessaging(Local<Object> target,
1446
                          Local<Value> unused,
1447
                          Local<Context> context,
1448
                          void* priv) {
1449
463
  Environment* env = Environment::GetCurrent(context);
1450
1451
  {
1452
926
    env->SetConstructorFunction(
1453
        target,
1454
        "MessageChannel",
1455
463
        env->NewFunctionTemplate(MessageChannel));
1456
  }
1457
1458
  {
1459
463
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1460
926
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1461
1389
    t->InstanceTemplate()->SetInternalFieldCount(
1462
463
        JSTransferable::kInternalFieldCount);
1463
463
    env->SetConstructorFunction(target, "JSTransferable", t);
1464
  }
1465
1466
463
  env->SetConstructorFunction(
1467
      target,
1468
      env->message_port_constructor_string(),
1469
463
      GetMessagePortConstructorTemplate(env));
1470
1471
  // These are not methods on the MessagePort prototype, because
1472
  // the browser equivalents do not provide them.
1473
463
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1474
463
  env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1475
463
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1476
463
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1477
  env->SetMethod(target, "moveMessagePortToContext",
1478
463
                 MessagePort::MoveToContext);
1479
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1480
463
                 SetDeserializerCreateObjectFunction);
1481
463
  env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1482
1483
  {
1484
926
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1485
    target
1486
926
        ->Set(context,
1487
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1488
1389
              domexception)
1489
        .Check();
1490
  }
1491
463
}
1492
1493
4705
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1494
4705
  registry->Register(MessageChannel);
1495
4705
  registry->Register(BroadcastChannel);
1496
4705
  registry->Register(JSTransferable::New);
1497
4705
  registry->Register(MessagePort::New);
1498
4705
  registry->Register(MessagePort::PostMessage);
1499
4705
  registry->Register(MessagePort::Start);
1500
4705
  registry->Register(MessagePort::Stop);
1501
4705
  registry->Register(MessagePort::CheckType);
1502
4705
  registry->Register(MessagePort::Drain);
1503
4705
  registry->Register(MessagePort::ReceiveMessage);
1504
4705
  registry->Register(MessagePort::MoveToContext);
1505
4705
  registry->Register(SetDeserializerCreateObjectFunction);
1506
4705
}
1507
1508
}  // anonymous namespace
1509
1510
}  // namespace worker
1511
}  // namespace node
1512
1513
4764
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1514

19021
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1515
                               node::worker::RegisterExternalReferences)