GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_messaging.cc Lines: 747 803 93.0 %
Date: 2022-05-21 04:15:56 Branches: 374 506 73.9 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process-inl.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
// Default transfer policy of a BaseObject: the base implementation always
// reports kUntransferable.
BaseObject::TransferMode BaseObject::GetTransferMode() const {
  return TransferMode::kUntransferable;
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
11082
// Default implementation: reports an empty list of nested transferables.
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
  BaseObjectList none;
  return Just(std::move(none));
}
60
61
10840
// Default post-deserialization hook: does nothing with `context` or
// `deserializer` and reports success.
Maybe<bool> BaseObject::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  const bool ok = true;
  return Just(ok);
}
65
66
namespace worker {
67
68
11092
// Default post-serialization hook: does nothing with `context` or
// `serializer` and reports success.
Maybe<bool> TransferData::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  const bool ok = true;
  return Just(ok);
}
72
73
98437
// Builds a message that takes over `buffer` as its main payload.
// IsCloseMessage() reports true for a message whose buffer has no data.
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
75
76
143704
bool Message::IsCloseMessage() const {
77
143704
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
60650
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
60650
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
60650
        wasm_modules_(wasm_modules) {}
95
96
10872
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10872
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10872
    CHECK_LT(id, host_objects_.size());
102
21744
    return host_objects_[id]->object(isolate);
103
  }
104
105
824
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
824
    CHECK_LT(clone_id, shared_array_buffers_.size());
108
1648
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LT(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
60663
// Reconstructs the JS value stored in this message inside `context`.
//
// In order: creates a host object for every entry of `transferables_`, wraps
// the shared backing stores in SharedArrayBuffers, hands the transferred
// ArrayBuffers to the V8 deserializer, reads the value, and finally lets every
// host object run FinalizeTransferRead().
//
// If `port_list` is non-null and there are transferables, `*port_list` is set
// to a new Array that receives every deserialized object that is an instance
// of the MessagePort constructor template.
//
// Returns an empty MaybeLocal on failure; host objects created up to that
// point are detached by the scope-exit handler. Must not be called on a close
// message (enforced with CHECK).
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context,
                                       Local<Value>* port_list) {
  Context::Scope context_scope(context);

  CHECK(!IsCloseMessage());

  Isolate* const isolate = env->isolate();
  const bool collect_ports = port_list != nullptr;
  if (collect_ports && !transferables_.empty()) {
    // Need to create this outside of the EscapableHandleScope, but inside
    // the Context::Scope.
    *port_list = Array::New(isolate);
  }

  EscapableHandleScope handle_scope(isolate);

  // Create all necessary objects for transferables, e.g. MessagePort handles.
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
  auto cleanup = OnScopeLeave([&]() {
    // Anything still in host_objects at this point will never be passed to
    // JS (the function did not finish successfully), so destroy it here.
    for (BaseObjectPtr<BaseObject> leftover : host_objects) {
      if (leftover) leftover->Detach();
    }
  });

  for (uint32_t idx = 0; idx < transferables_.size(); ++idx) {
    HandleScope per_object_scope(isolate);
    // Grab the raw pointer first: ownership of the entry is handed to
    // Deserialize() in the same call.
    TransferData* raw = transferables_[idx].get();
    host_objects[idx] = raw->Deserialize(
        env, context, std::move(transferables_[idx]));
    if (!host_objects[idx]) return {};
    if (!collect_ports) continue;

    // If we gather a list of all message ports, and this transferred object
    // is a message port, add it to that list. This special handling of
    // MessagePorts (as opposed to all transferables) is required for spec
    // compliance.
    DCHECK((*port_list)->IsArray());
    Local<Array> ports = port_list->As<Array>();
    Local<Object> wrapper = host_objects[idx]->object();
    if (!env->message_port_constructor_template()->HasInstance(wrapper))
      continue;
    if (ports->Set(context, ports->Length(), wrapper).IsNothing())
      return {};
  }
  transferables_.clear();

  // Attach all transferred SharedArrayBuffers to their new Isolate.
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  for (uint32_t idx = 0; idx < shared_array_buffers_.size(); ++idx) {
    shared_array_buffers.push_back(
        SharedArrayBuffer::New(isolate, shared_array_buffers_[idx]));
  }

  DeserializerDelegate delegate(
      this, env, host_objects, shared_array_buffers, wasm_modules_);
  const uint8_t* payload =
      reinterpret_cast<const uint8_t*>(main_message_buf_.data);
  ValueDeserializer deserializer(
      isolate, payload, main_message_buf_.size, &delegate);
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate.
  for (uint32_t idx = 0; idx < array_buffers_.size(); ++idx) {
    Local<ArrayBuffer> buffer =
        ArrayBuffer::New(isolate, std::move(array_buffers_[idx]));
    deserializer.TransferArrayBuffer(idx, buffer);
  }

  if (deserializer.ReadHeader(context).IsNothing())
    return {};

  Local<Value> result;
  if (!deserializer.ReadValue(context).ToLocal(&result))
    return {};

  for (BaseObjectPtr<BaseObject> host_object : host_objects) {
    if (host_object->FinalizeTransferRead(context, &deserializer).IsNothing())
      return {};
  }

  // Success: empty the list so the scope-exit handler has nothing to detach.
  host_objects.clear();
  return handle_scope.Escape(result);
}
218
219
1058
void Message::AddSharedArrayBuffer(
220
    std::shared_ptr<BackingStore> backing_store) {
221
1058
  shared_array_buffers_.emplace_back(std::move(backing_store));
222
1058
}
223
224
11140
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225
11140
  transferables_.emplace_back(std::move(data));
226
11140
}
227
228
2
// Stores a compiled WebAssembly module in this message and returns the index
// under which it was stored.
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
  // The new element lands at the current end of the list.
  const uint32_t id = wasm_modules_.size();
  wasm_modules_.emplace_back(std::move(mod));
  return id;
}
232
233
namespace {
234
235
35062
// Fetches the `emitMessage` function from the per-context exports object of
// `context`. Returns an empty handle if the exports object or the property
// cannot be obtained; the property is CHECKed to be a function.
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();

  Local<Object> exports;
  if (!GetPerContextExports(context).ToLocal(&exports))
    return MaybeLocal<Function>();

  Local<String> key = FIXED_ONE_BYTE_STRING(isolate, "emitMessage");
  Local<Value> emit_message_val;
  if (!exports->Get(context, key).ToLocal(&emit_message_val))
    return MaybeLocal<Function>();

  CHECK(emit_message_val->IsFunction());
  return emit_message_val.As<Function>();
}
248
249
878
// Fetches the `DOMException` constructor from the per-context exports object
// of `context`. Returns an empty handle if the exports object or the property
// cannot be obtained; the property is CHECKed to be a function.
MaybeLocal<Function> GetDOMException(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();

  Local<Object> exports;
  if (!GetPerContextExports(context).ToLocal(&exports))
    return MaybeLocal<Function>();

  Local<String> key = FIXED_ONE_BYTE_STRING(isolate, "DOMException");
  Local<Value> domexception_ctor_val;
  if (!exports->Get(context, key).ToLocal(&domexception_ctor_val))
    return MaybeLocal<Function>();

  CHECK(domexception_ctor_val->IsFunction());
  return domexception_ctor_val.As<Function>();
}
263
264
22
// Throws `new DOMException(message, "DataCloneError")` in `context`.
// If the DOMException constructor cannot be obtained or the construction
// itself fails, this function returns without throwing the exception itself.
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
  Isolate* isolate = context->GetIsolate();
  Local<Value> ctor_args[] = {
      message, FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};

  Local<Function> domexception_ctor;
  if (!GetDOMException(context).ToLocal(&domexception_ctor))
    return;

  Local<Value> exception;
  if (!domexception_ctor->NewInstance(context, arraysize(ctor_args), ctor_args)
           .ToLocal(&exception)) {
    return;
  }

  isolate->ThrowException(exception);
}
277
278
// This tells V8 how to serialize objects that it does not understand
279
// (e.g. C++ objects) into the output buffer, in a way that our own
280
// DeserializerDelegate understands how to unpack.
281
class SerializerDelegate : public ValueSerializer::Delegate {
282
 public:
283
61795
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284
61795
      : env_(env), context_(context), msg_(m) {}
285
286
12
  void ThrowDataCloneError(Local<String> message) override {
287
12
    ThrowDataCloneException(context_, message);
288
12
  }
289
290
11151
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291
22302
    if (env_->base_object_ctor_template()->HasInstance(object)) {
292
      return WriteHostObject(
293
11151
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294
    }
295
296
    ThrowDataCloneError(env_->clone_unsupported_type_str());
297
    return Nothing<bool>();
298
  }
299
300
1058
  Maybe<uint32_t> GetSharedArrayBufferId(
301
      Isolate* isolate,
302
      Local<SharedArrayBuffer> shared_array_buffer) override {
303
    uint32_t i;
304
1080
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305
44
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306
          shared_array_buffer) {
307
        return Just(i);
308
      }
309
    }
310
311
    seen_shared_array_buffers_.emplace_back(
312
2116
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313
1058
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314
1058
    return Just(i);
315
  }
316
317
2
  Maybe<uint32_t> GetWasmModuleTransferId(
318
      Isolate* isolate, Local<WasmModuleObject> module) override {
319
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320
  }
321
322
61766
  Maybe<bool> Finish(Local<Context> context) {
323
72906
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
324
11145
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325
11145
      std::unique_ptr<TransferData> data;
326
11145
      if (i < first_cloned_object_index_)
327
11108
        data = host_object->TransferForMessaging();
328
11145
      if (!data)
329
41
        data = host_object->CloneForMessaging();
330
11145
      if (!data) return Nothing<bool>();
331
22282
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332
1
        return Nothing<bool>();
333
11140
      msg_->AddTransferable(std::move(data));
334
    }
335
61761
    return Just(true);
336
  }
337
338
11116
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339
    // Make sure we have not started serializing the value itself yet.
340
11116
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
341
11116
    host_objects_.emplace_back(std::move(host_object));
342
11116
  }
343
344
  // Some objects in the transfer list may register sub-objects that can be
345
  // transferred. This could e.g. be a public JS wrapper object, such as a
346
  // FileHandle, that is registering its C++ handle for transfer.
347
61785
  inline Maybe<bool> AddNestedHostObjects() {
348
72900
    for (size_t i = 0; i < host_objects_.size(); i++) {
349
11115
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350
22230
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351
        return Nothing<bool>();
352
11149
      for (auto nested_transferable : nested_transferables) {
353
68
        if (std::find(host_objects_.begin(),
354
                      host_objects_.end(),
355
34
                      nested_transferable) == host_objects_.end()) {
356
34
          AddHostObject(nested_transferable);
357
        }
358
      }
359
    }
360
61785
    return Just(true);
361
  }
362
363
  ValueSerializer* serializer = nullptr;
364
365
 private:
366
11151
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367
11151
    BaseObject::TransferMode mode = host_object->GetTransferMode();
368
11151
    if (mode == BaseObject::TransferMode::kUntransferable) {
369
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
370
2
      return Nothing<bool>();
371
    }
372
373
11204
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
374
11160
      if (host_objects_[i] == host_object) {
375
11105
        serializer->WriteUint32(i);
376
11105
        return Just(true);
377
      }
378
    }
379
380
44
    if (mode == BaseObject::TransferMode::kTransferable) {
381
7
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382
7
      return Nothing<bool>();
383
    }
384
385
37
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
386
37
    uint32_t index = host_objects_.size();
387
37
    if (first_cloned_object_index_ == SIZE_MAX)
388
30
      first_cloned_object_index_ = index;
389
37
    serializer->WriteUint32(index);
390
37
    host_objects_.push_back(host_object);
391
37
    return Just(true);
392
  }
393
394
  Environment* env_;
395
  Local<Context> context_;
396
  Message* msg_;
397
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399
  size_t first_cloned_object_index_ = SIZE_MAX;
400
401
  friend class worker::Message;
402
};
403
404
}  // anonymous namespace
405
406
61795
// Serializes `input` into this (still empty) message.
//
// Every entry of `transfer_list_v` is inspected first:
//  - objects carrying the "untransferable" private symbol are skipped,
//  - detachable ArrayBuffers are registered for transfer (duplicates raise a
//    DataCloneError),
//  - BaseObjects whose transfer mode is not kUntransferable are registered
//    as host objects; `source_port`, detached MessagePorts and duplicates
//    raise a DataCloneError,
//  - anything else raises ERR_INVALID_TRANSFER_OBJECT.
// After the value has been written, the transferred ArrayBuffers are detached
// in this Isolate and the host objects are converted into TransferData.
//
// Returns Just(true) on success and Nothing<bool>() on failure.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               const TransferList& transfer_list_v,
                               Local<Object> source_port) {
  Isolate* const isolate = env->isolate();
  HandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(isolate, &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  for (uint32_t idx = 0; idx < transfer_list_v.length(); ++idx) {
    Local<Value> entry = transfer_list_v[idx];
    if (entry->IsObject()) {
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
      // for details.
      bool untransferable;
      if (!entry.As<Object>()->HasPrivate(
              context,
              env->untransferable_object_private_symbol())
              .To(&untransferable)) {
        return Nothing<bool>();
      }
      if (untransferable) continue;
    }

    // Currently, we support ArrayBuffers and BaseObjects for which
    // GetTransferMode() does not return kUntransferable.
    if (entry->IsArrayBuffer()) {
      Local<ArrayBuffer> buffer = entry.As<ArrayBuffer>();
      // If we cannot render the ArrayBuffer unusable in this Isolate,
      // copying the buffer will have to do.
      // Note that we can currently transfer ArrayBuffers even if they were
      // not allocated by Node’s ArrayBufferAllocator in the first place,
      // because we pass the underlying v8::BackingStore around rather than
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
      // is always going to outlive any Workers it creates, and so will its
      // allocator along with it.
      if (!buffer->IsDetachable()) continue;

      const bool duplicate_buffer =
          std::find(array_buffers.begin(), array_buffers.end(), buffer) !=
          array_buffers.end();
      if (duplicate_buffer) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "Transfer list contains duplicate ArrayBuffer"));
        return Nothing<bool>();
      }

      // We simply use the array index in the `array_buffers` list as the
      // ID that we write into the serialized buffer.
      const uint32_t id = array_buffers.size();
      array_buffers.push_back(buffer);
      serializer.TransferArrayBuffer(id, buffer);
      continue;
    }

    if (env->base_object_ctor_template()->HasInstance(entry)) {
      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && entry == source_port) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(isolate,
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }

      BaseObjectPtr<BaseObject> host_object {
          Unwrap<BaseObject>(entry.As<Object>()) };

      const bool is_message_port =
          env->message_port_constructor_template()->HasInstance(entry);
      if (is_message_port &&
          (!host_object ||
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }

      const bool duplicate_host_object =
          std::find(delegate.host_objects_.begin(),
                    delegate.host_objects_.end(),
                    host_object) != delegate.host_objects_.end();
      if (duplicate_host_object) {
        ThrowDataCloneException(
            context,
            String::Concat(isolate,
                FIXED_ONE_BYTE_STRING(
                  isolate,
                  "Transfer list contains duplicate "),
                entry.As<Object>()->GetConstructorName()));
        return Nothing<bool>();
      }

      if (host_object && host_object->GetTransferMode() !=
              BaseObject::TransferMode::kUntransferable) {
        delegate.AddHostObject(host_object);
        continue;
      }
    }

    // Neither a supported ArrayBuffer nor a transferable BaseObject.
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
    return Nothing<bool>();
  }

  if (delegate.AddNestedHostObjects().IsNothing())
    return Nothing<bool>();

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  for (Local<ArrayBuffer> buffer : array_buffers) {
    // If serialization succeeded, we render it inaccessible in this Isolate.
    std::shared_ptr<BackingStore> store = buffer->GetBackingStore();
    buffer->Detach();

    array_buffers_.emplace_back(std::move(store));
  }

  if (delegate.Finish(context).IsNothing())
    return Nothing<bool>();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> released = serializer.Release();
  CHECK_NOT_NULL(released.first);
  main_message_buf_ = MallocedBuffer<char>(
      reinterpret_cast<char*>(released.first), released.second);
  return Just(true);
}
534
535
2
// Reports the containers owned by this message to the memory tracker:
// transferred ArrayBuffer backing stores, SharedArrayBuffer backing stores
// and the list of transferables.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("array_buffers_", array_buffers_);
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
  tracker->TrackField("transferables", transferables_);
}
540
541
36094
// Creates port data that is initially owned by `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {}
544
545
144280
// The data must no longer have an owning MessagePort when it is destroyed
// (enforced with CHECK). Leaves its sibling group, if it is still in one.
MessagePortData::~MessagePortData() {
  CHECK_NULL(owner_);

  Disentangle();
}
549
550
4
// Reports the queue of pending incoming messages to the memory tracker.
// The queue is read while holding `mutex_`.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);
  tracker->TrackField("incoming_messages", incoming_messages_);
}
554
555
98276
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556
  // This function will be called by other threads.
557
196552
  Mutex::ScopedLock lock(mutex_);
558
98276
  incoming_messages_.emplace_back(std::move(message));
559
560
98276
  if (owner_ != nullptr) {
561
70956
    Debug(owner_, "Adding message to incoming queue");
562
70956
    owner_->TriggerAsync();
563
  }
564
98276
}
565
566
12197
// Entangles `a` and `b` by putting both into a freshly created SiblingGroup.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  std::shared_ptr<SiblingGroup> pair_group = std::make_shared<SiblingGroup>();
  pair_group->Entangle({a, b});
}
570
571
60034
void MessagePortData::Disentangle() {
572
60034
  if (group_) {
573
24444
    group_->Disentangle(this);
574
  }
575
60034
}
576
577
210228
// If the port still holds its data, detach it first so that the data no
// longer points back at this port.
MessagePort::~MessagePort() {
  if (!data_) return;
  Detach();
}
580
581
35062
// Sets up a MessagePort around the JS object `wrap`:
//  - creates fresh MessagePortData owned by this port,
//  - initializes the uv_async_t that runs OnMessage() when signalled,
//  - calls the `oninit` hook of `wrap` if it is a function,
//  - caches the per-context `emitMessage` function.
// If any of the JS-facing steps fails, the handle is closed on scope exit;
// callers can detect this through IsHandleClosing().
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(&async_),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  auto on_async = [](uv_async_t* handle) {
    // Called when data has been put into the queue.
    MessagePort* port = ContainerOf(&MessagePort::async_, handle);
    port->OnMessage(MessageProcessingMode::kNormalOperation);
  };

  CHECK_EQ(uv_async_init(env->event_loop(),
                         &async_,
                         on_async), 0);

  // Flipped to true at the very end to indicate success of the constructor.
  bool succeeded = false;
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });

  Local<Value> init_hook;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_hook))
    return;

  if (init_hook->IsFunction() &&
      init_hook.As<Function>()->Call(context, wrap, 0, nullptr).IsEmpty()) {
    return;
  }

  Local<Function> emit_message_fn;
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
    return;
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);

  succeeded = true;
  Debug(this, "Created message port");
}
620
621
33190
bool MessagePort::IsDetached() const {
622

33190
  return data_ == nullptr || IsHandleClosing();
623
}
624
625
94186
void MessagePort::TriggerAsync() {
626
94186
  if (IsHandleClosing()) return;
627
94109
  CHECK_EQ(uv_async_send(&async_), 0);
628
}
629
630
35046
// Closes the underlying handle, forwarding `close_callback` to
// HandleWrap::Close(). While the port still holds its data, the close is
// performed under the data's mutex.
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock sibling_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
642
643
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
645
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
646
  // class (i.e. makes it behave like an arrow function).
647
2
  Environment* env = Environment::GetCurrent(args);
648
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649
2
}
650
651
35062
// Creates a MessagePort (and its JS wrapper object) in `context`.
//
// - With `data`: the new port drops its freshly created data and adopts
//   `data` instead, then triggers itself so that messages already queued in
//   `data` get processed. `sibling_group` must be empty in this case.
// - With `sibling_group` only: the port's own data joins that group.
//
// Returns nullptr if the wrapper object cannot be instantiated or if the
// MessagePort constructor failed (its handle is then already closing).
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data,
    std::shared_ptr<SiblingGroup> sibling_group) {
  Context::Scope context_scope(context);
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> wrapper;
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&wrapper))
    return nullptr;

  MessagePort* port = new MessagePort(env, context, wrapper);
  CHECK_NOT_NULL(port);
  // Construction failed with an exception.
  if (port->IsHandleClosing()) return nullptr;

  if (data) {
    CHECK(!sibling_group);
    port->Detach();
    port->data_ = std::move(data);

    // This lock is here to avoid race conditions with the `owner_` read
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
    // but it's better to be safe than sorry.)
    Mutex::ScopedLock owner_lock(port->data_->mutex_);
    port->data_->owner_ = port;
    // If the existing MessagePortData object had pending messages, this is
    // the easiest way to run that queue.
    port->TriggerAsync();
  } else if (sibling_group) {
    sibling_group->Entangle(port->data_.get());
  }
  return port;
}
689
690
98432
// Takes the next message off the incoming queue and deserializes it.
//
// Returns the `no_message` symbol when the queue is empty, when the port is
// not receiving (and `mode` does not force reading) unless the head is the
// close message, or after handling the close message (which closes the
// port). Returns an empty handle if JS cannot be called or if
// deserialization fails. `port_list` is passed through to
// Message::Deserialize().
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
                                              MessageProcessingMode mode,
                                              Local<Value>* port_list) {
  std::shared_ptr<Message> received;
  {
    // Get the head of the message queue.
    Mutex::ScopedLock queue_lock(data_->mutex_);

    Debug(this, "MessagePort has message");

    const bool wants_message =
        receiving_messages_ ||
        mode == MessageProcessingMode::kForceReadMessages;

    // We have nothing to do if there are no pending messages ...
    if (data_->incoming_messages_.empty())
      return env()->no_message_symbol();
    // ... or if we are not intending to receive messages, and the message we
    // would receive is not the final "close" message.
    if (!wants_message &&
        !data_->incoming_messages_.front()->IsCloseMessage()) {
      return env()->no_message_symbol();
    }

    received = data_->incoming_messages_.front();
    data_->incoming_messages_.pop_front();
  }

  if (received->IsCloseMessage()) {
    Close();
    return env()->no_message_symbol();
  }

  if (!env()->can_call_into_js())
    return MaybeLocal<Value>();

  return received->Deserialize(env(), context, port_list);
}
726
727
37885
// Drains the incoming queue and emits each message to JS through the cached
// `emitMessage` function.
//
// In kNormalOperation mode at most max(queue size at entry, 1000) messages
// are processed per call; when the limit is hit the port re-triggers itself
// and returns. In any other mode there is no limit.
//
// A message that fails to deserialize with a caught, non-terminating
// exception is reported to JS as a 'messageerror' event. After a
// deserialization failure, or when the emit callback returns an empty
// result, the port re-triggers itself (if it still owns its data) and
// returns.
void MessagePort::OnMessage(MessageProcessingMode mode) {
  Debug(this, "Running MessagePort::OnMessage()");
  HandleScope handle_scope(env()->isolate());
  Local<Context> context =
      object(env()->isolate())->GetCreationContext().ToLocalChecked();

  size_t processing_limit;
  if (mode == MessageProcessingMode::kNormalOperation) {
    // The lock has to be a *named* object: an unnamed
    // `Mutex::ScopedLock(...)` temporary is destroyed at the end of its own
    // statement and would leave the queue size read below unprotected
    // against AddToIncomingQueue(), which runs on other threads.
    Mutex::ScopedLock lock(data_->mutex_);
    processing_limit = std::max(data_->incoming_messages_.size(),
                                static_cast<size_t>(1000));
  } else {
    processing_limit = std::numeric_limits<size_t>::max();
  }

  // data_ can only ever be modified by the owner thread, so no need to lock.
  // However, the message port may be transferred while it is processing
  // messages, so we need to check that this handle still owns its `data_` field
  // on every iteration.
  while (data_) {
    if (processing_limit-- == 0) {
      // Prevent event loop starvation by only processing those messages without
      // interruption that were already present when the OnMessage() call was
      // first triggered, but at least 1000 messages because otherwise the
      // overhead of repeatedly triggering the uv_async_t instance becomes
      // noticeable, at least on Windows.
      // (That might require more investigation by somebody more familiar with
      // Windows.)
      TriggerAsync();
      return;
    }

    HandleScope handle_scope(env()->isolate());
    Context::Scope context_scope(context);
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);

    Local<Value> payload;
    Local<Value> port_list = Undefined(env()->isolate());
    Local<Value> message_error;
    Local<Value> argv[3];

    {
      // Catch any exceptions from parsing the message itself (not from
      // emitting it) as 'messageerror' events.
      TryCatchScope try_catch(env());
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
          message_error = try_catch.Exception();
        goto reschedule;
      }
    }
    // The `no_message` symbol means there is nothing (more) to deliver.
    if (payload == env()->no_message_symbol()) break;

    if (!env()->can_call_into_js()) {
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
      // In this case there is nothing to do but to drain the current queue.
      continue;
    }

    argv[0] = payload;
    argv[1] = port_list;
    argv[2] = env()->message_string();

    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
    reschedule:
      if (!message_error.IsEmpty()) {
        argv[0] = message_error;
        argv[1] = Undefined(env()->isolate());
        argv[2] = env()->messageerror_string();
        USE(MakeCallback(emit_message, arraysize(argv), argv));
      }

      // Re-schedule OnMessage() execution in case of failure.
      if (data_)
        TriggerAsync();
      return;
    }
  }
}
806
807
35038
void MessagePort::OnClose() {
808
35038
  Debug(this, "MessagePort::OnClose()");
809
35038
  if (data_) {
810
    // Detach() returns move(data_).
811
23964
    Detach()->Disentangle();
812
  }
813
35038
}
814
815
46664
std::unique_ptr<MessagePortData> MessagePort::Detach() {
816
46664
  CHECK(data_);
817
93328
  Mutex::ScopedLock lock(data_->mutex_);
818
46664
  data_->owner_ = nullptr;
819
46664
  return std::move(data_);
820
}
821
822
22125
// A port is transferable unless it is detached, in which case it is reported
// as untransferable.
BaseObject::TransferMode MessagePort::GetTransferMode() const {
  return IsDetached() ? BaseObject::TransferMode::kUntransferable
                      : BaseObject::TransferMode::kTransferable;
}
827
828
11069
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829
22138
  Close();
830
11069
  return Detach();
831
}
832
833
10824
// Receiving side of a port transfer: creates a new MessagePort in `context`
// that takes ownership of `self`, which is cast back to MessagePortData.
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  std::unique_ptr<MessagePortData> port_data =
      static_unique_pointer_cast<MessagePortData>(std::move(self));
  MessagePort* port = MessagePort::New(env, context, std::move(port_data));
  return BaseObjectPtr<MessagePort> { port };
}
841
842
61788
// Serializes `message_v` (with `transfer_v` as the transfer list) and
// dispatches it through this port's data.
//
// Returns:
//   - the serialization result unchanged if the port has no data,
//   - Nothing<bool>() if serialization failed,
//   - otherwise the result of MessagePortData::Dispatch(). If dispatching
//     produced a result and an error string, the string is emitted as a
//     process warning.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Context> context,
                                     Local<Value> message_v,
                                     const TransferList& transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> self = object(isolate);

  auto message = std::make_shared<Message>();

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or
  // detached. Hence serialization happens before looking at data_.
  Maybe<bool> serialized =
      message->Serialize(env, context, message_v, transfer_v, self);
  if (data_ == nullptr) return serialized;
  if (serialized.IsNothing()) return Nothing<bool>();

  std::string warning;
  Maybe<bool> dispatched = data_->Dispatch(message, &warning);
  if (dispatched.IsJust() && !warning.empty())
    ProcessEmitWarning(env, warning.c_str());

  return dispatched;
}
873
874
61760
// Forwards `message` to the sibling group this data belongs to.
// If the data is not part of any group, stores a description in `*error`
// (when `error` is non-null) and returns Nothing<bool>().
Maybe<bool> MessagePortData::Dispatch(
    std::shared_ptr<Message> message,
    std::string* error) {
  if (group_)
    return group_->Dispatch(this, message, error);

  if (error != nullptr)
    *error = "MessagePortData is not entangled.";
  return Nothing<bool>();
}
884
885
11128
// Reads the entries of `object` into `transfer_list`.
//
// Returns:
//   - Just(true) if `object` is an Array, or an object that follows the
//     iterator protocol, and its entries were stored in `transfer_list`;
//   - Just(false) if `object` cannot be treated as an iterable (not an object,
//     no callable @@iterator, non-object iterator/result, no callable `next`);
//   - Nothing<bool>() if a property lookup or a call into JS came back empty.
//
// Note that the iteration loop also stops when env->can_call_into_js()
// becomes false; the entries collected up to that point are still stored and
// Just(true) is returned.
static Maybe<bool> ReadIterable(Environment* env,
                                Local<Context> context,
                                // NOLINTNEXTLINE(runtime/references)
                                TransferList& transfer_list,
                                Local<Value> object) {
  if (!object->IsObject()) return Just(false);

  // Fast path: plain arrays are read by index, without running the iterator.
  if (object->IsArray()) {
    Local<Array> arr = object.As<Array>();
    size_t length = arr->Length();
    transfer_list.AllocateSufficientStorage(length);
    for (size_t i = 0; i < length; i++) {
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
        return Nothing<bool>();
    }
    return Just(true);
  }

  Isolate* isolate = env->isolate();
  Local<Value> iterator_method;
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
      .ToLocal(&iterator_method)) return Nothing<bool>();
  if (!iterator_method->IsFunction()) return Just(false);

  Local<Value> iterator;
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
      .ToLocal(&iterator)) return Nothing<bool>();
  if (!iterator->IsObject()) return Just(false);

  Local<Value> next;
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
    return Nothing<bool>();
  if (!next->IsFunction()) return Just(false);

  // Drive the iterator manually: call next() until `done` is truthy.
  std::vector<Local<Value>> entries;
  while (env->can_call_into_js()) {
    Local<Value> result;
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
        .ToLocal(&result)) return Nothing<bool>();
    if (!result->IsObject()) return Just(false);

    Local<Value> done;
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
      return Nothing<bool>();
    if (done->BooleanValue(isolate)) break;

    Local<Value> val;
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
      return Nothing<bool>();
    entries.push_back(val);
  }

  transfer_list.AllocateSufficientStorage(entries.size());
  // Copy entry by entry rather than through `&transfer_list[0]`: taking the
  // address of element 0 would index into zero-length storage whenever the
  // iterator produced no entries.
  for (size_t i = 0; i < entries.size(); i++)
    transfer_list[i] = entries[i];
  return Just(true);
}
941
942
61807
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943
61807
  Environment* env = Environment::GetCurrent(args);
944
61807
  Local<Object> obj = args.This();
945
123614
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947
61807
  if (args.Length() == 0) {
948
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949
19
                                       "MessagePort.postMessage");
950
  }
951
952

134735
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953
    // Browsers ignore null or undefined, and otherwise accept an array or an
954
    // options object.
955
4
    return THROW_ERR_INVALID_ARG_TYPE(env,
956
4
        "Optional transferList argument must be an iterable");
957
  }
958
959
61803
  TransferList transfer_list;
960
61803
  if (args[1]->IsObject()) {
961
    bool was_iterable;
962
22234
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963
8
      return;
964
11117
    if (!was_iterable) {
965
      Local<Value> transfer_option;
966
39
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
967
21
          .ToLocal(&transfer_option)) return;
968
26
      if (!transfer_option->IsUndefined()) {
969
11
        if (!ReadIterable(env, context, transfer_list, transfer_option)
970
11
            .To(&was_iterable)) return;
971
10
        if (!was_iterable) {
972
7
          return THROW_ERR_INVALID_ARG_TYPE(env,
973
7
              "Optional options.transfer argument must be an iterable");
974
        }
975
      }
976
    }
977
  }
978
979
61795
  MessagePort* port = Unwrap<MessagePort>(args.This());
980
  // Even if the backing MessagePort object has already been deleted, we still
981
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
982
  // transfers.
983

61795
  if (port == nullptr || port->IsHandleClosing()) {
984
14
    Message msg;
985
7
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986
7
    return;
987
  }
988
989
61788
  Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
990
61788
  if (res.IsJust())
991
185280
    args.GetReturnValue().Set(res.FromJust());
992
}
993
994
25734
void MessagePort::Start() {
995
25734
  Debug(this, "Start receiving messages");
996
25734
  receiving_messages_ = true;
997
51468
  Mutex::ScopedLock lock(data_->mutex_);
998
25734
  if (!data_->incoming_messages_.empty())
999
11472
    TriggerAsync();
1000
25734
}
1001
1002
20346
void MessagePort::Stop() {
1003
20346
  Debug(this, "Stop receiving messages");
1004
20346
  receiving_messages_ = false;
1005
20346
}
1006
1007
25735
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
25736
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1010
25735
  if (!port->data_) {
1011
1
    return;
1012
  }
1013
25734
  port->Start();
1014
}
1015
1016
20912
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017
  MessagePort* port;
1018
20912
  CHECK(args[0]->IsObject());
1019
41826
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1020
20348
  if (!port->data_) {
1021
2
    return;
1022
  }
1023
20346
  port->Stop();
1024
}
1025
1026
26
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027
26
  Environment* env = Environment::GetCurrent(args);
1028
26
  args.GetReturnValue().Set(
1029
78
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030
26
}
1031
1032
2004
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033
  MessagePort* port;
1034
4008
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1035
499
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036
}
1037
1038
45
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039
45
  Environment* env = Environment::GetCurrent(args);
1040
87
  if (!args[0]->IsObject() ||
1041

171
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1042
5
    return THROW_ERR_INVALID_ARG_TYPE(env,
1043
5
        "The \"port\" argument must be a MessagePort instance");
1044
  }
1045
80
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046
40
  if (port == nullptr) {
1047
    // Return 'no messages' for a closed port.
1048
    args.GetReturnValue().Set(
1049
        Environment::GetCurrent(args)->no_message_symbol());
1050
    return;
1051
  }
1052
1053
  MaybeLocal<Value> payload = port->ReceiveMessage(
1054
120
      port->object()->GetCreationContext().ToLocalChecked(),
1055
40
      MessageProcessingMode::kForceReadMessages);
1056
40
  if (!payload.IsEmpty())
1057
80
    args.GetReturnValue().Set(payload.ToLocalChecked());
1058
}
1059
1060
6
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061
6
  Environment* env = Environment::GetCurrent(args);
1062
12
  if (!args[0]->IsObject() ||
1063

24
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1064
    return THROW_ERR_INVALID_ARG_TYPE(env,
1065
1
        "The \"port\" argument must be a MessagePort instance");
1066
  }
1067
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068

6
  if (port == nullptr || port->IsHandleClosing()) {
1069
1
    Isolate* isolate = env->isolate();
1070
1
    THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1071
1
    return;
1072
  }
1073
1074
5
  Local<Value> context_arg = args[1];
1075
  ContextifyContext* context_wrapper;
1076

10
  if (!context_arg->IsObject() ||
1077
5
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1078
10
          env, context_arg.As<Object>())) == nullptr) {
1079
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1080
  }
1081
1082
5
  std::unique_ptr<MessagePortData> data;
1083
5
  if (!port->IsDetached())
1084
5
    data = port->Detach();
1085
1086
5
  Context::Scope context_scope(context_wrapper->context());
1087
  MessagePort* target =
1088
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1089
5
  if (target != nullptr)
1090
10
    args.GetReturnValue().Set(target->object());
1091
}
1092
1093
11165
// Entangles the data objects of two ports with each other.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* first = a->data_.get();
  MessagePortData* second = b->data_.get();
  MessagePortData::Entangle(first, second);
}
1096
1097
1032
// Entangles the data object of port `a` with the standalone data `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* own = a->data_.get();
  MessagePortData::Entangle(own, b);
}
1100
1101
4
// Reports the fields retained by this port to the memory tracker: its data
// and the stored emit-message function.
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("data", data_);
  tracker->TrackField("emit_message_fn", emit_message_fn_);
}
1105
1106
36800
// Returns the MessagePort constructor template for `env`, creating it and
// storing it on the environment on first use.
//
// Factor generating the MessagePort JS constructor into its own piece
// of code, because it is needed early on in the child environment setup.
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (!cached.IsEmpty())
    return cached;

  {
    Local<FunctionTemplate> ctor = env->NewFunctionTemplate(MessagePort::New);
    ctor->SetClassName(env->message_port_constructor_string());
    ctor->InstanceTemplate()->SetInternalFieldCount(
        MessagePort::kInternalFieldCount);
    ctor->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(ctor, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(ctor, "start", MessagePort::Start);

    env->set_message_port_constructor_template(ctor);
  }

  // Go through the lookup again so the value handed out is the one stored on
  // the environment.
  return GetMessagePortConstructorTemplate(env);
}
1128
1129
34130
// Wraps `obj` as a BaseObject and immediately makes the wrapper weak.
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
    : BaseObject(env, obj) {
  MakeWeak();
}
1133
1134
34130
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1135
34130
  CHECK(args.IsConstructCall());
1136
68260
  new JSTransferable(Environment::GetCurrent(args), args.This());
1137
34130
}
1138
1139
86
// Implement `kClone in this ? kCloneable : kTransferable`.
// If the property check itself fails, the object is reported as
// untransferable; exceptions raised by the check are caught by the
// TryCatchScope declared here.
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
  HandleScope handle_scope(env()->isolate());
  errors::TryCatchScope ignore_exceptions(env());

  Maybe<bool> has_clone =
      object()->Has(env()->context(), env()->messaging_clone_symbol());
  if (has_clone.IsNothing())
    return TransferMode::kUntransferable;

  if (has_clone.FromJust())
    return TransferMode::kCloneable;
  return TransferMode::kTransferable;
}
1152
1153
33
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1154
33
  return TransferOrClone(TransferMode::kTransferable);
1155
}
1156
1157
24
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1158
24
  return TransferOrClone(TransferMode::kCloneable);
1159
}
1160
1161
57
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1162
    TransferMode mode) const {
1163
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1164
  // which should return an object with `data` and `deserializeInfo` properties;
1165
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1166
  // on the `TransferData` instance as a string.
1167
114
  HandleScope handle_scope(env()->isolate());
1168
57
  Local<Context> context = env()->isolate()->GetCurrentContext();
1169
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1170
57
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1171
1172
  Local<Value> method;
1173
171
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1174
    return {};
1175
  }
1176
57
  if (method->IsFunction()) {
1177
    Local<Value> result_v;
1178
53
    if (!method.As<Function>()->Call(
1179
159
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1180
53
      return {};
1181
    }
1182
1183
49
    if (result_v->IsObject()) {
1184
49
      Local<Object> result = result_v.As<Object>();
1185
      Local<Value> data;
1186
      Local<Value> deserialize_info;
1187
196
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1188
147
          !result->Get(context, env()->deserialize_info_string())
1189
49
              .ToLocal(&deserialize_info)) {
1190
        return {};
1191
      }
1192
98
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1193
49
      if (*deserialize_info_str == nullptr) return {};
1194
49
      return std::make_unique<Data>(
1195
196
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1196
    }
1197
  }
1198
1199
4
  if (mode == TransferMode::kTransferable)
1200
    return TransferOrClone(TransferMode::kCloneable);
1201
  else
1202
4
    return {};
1203
}
1204
1205
// Call `this[kTransferList]()` and return the resulting list of BaseObjects.
//
// A missing/non-callable method or a non-array result yields an empty list.
// Array entries that are not BaseObject instances are skipped. Returns
// Nothing if a property lookup, the method call or an element read fails.
Maybe<BaseObjectList>
JSTransferable::NestedTransferables() const {
  HandleScope handle_scope(env()->isolate());
  Local<Context> context = env()->isolate()->GetCurrentContext();
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();

  Local<Value> method;
  if (!object()->Get(context, method_name).ToLocal(&method))
    return Nothing<BaseObjectList>();
  if (!method->IsFunction())
    return Just(BaseObjectList {});

  Local<Value> list_v;
  if (!method.As<Function>()->Call(context, object(), 0, nullptr)
           .ToLocal(&list_v)) {
    return Nothing<BaseObjectList>();
  }
  if (!list_v->IsArray())
    return Just(BaseObjectList {});
  Local<Array> list = list_v.As<Array>();

  BaseObjectList nested;
  // The length is re-read on every iteration, as in a plain JS for-loop.
  for (size_t index = 0; index < list->Length(); index++) {
    Local<Value> entry;
    if (!list->Get(context, index).ToLocal(&entry))
      return Nothing<BaseObjectList>();
    if (!env()->base_object_ctor_template()->HasInstance(entry))
      continue;
    nested.emplace_back(Unwrap<BaseObject>(entry));
  }
  return Just(nested);
}
1236
1237
32
// Call `this[kDeserialize](data)` where `data` comes from the return value
// of `this[kTransfer]()` or `this[kClone]()`.
//
// `data` is read from `deserializer`. Returns Nothing<bool>() if reading the
// value, looking up the method or calling it fails; Just(true) otherwise,
// including when no callable deserialize method exists.
Maybe<bool> JSTransferable::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  HandleScope handle_scope(env()->isolate());

  Local<Value> data;
  if (!deserializer->ReadValue(context).ToLocal(&data))
    return Nothing<bool>();

  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
  Local<Value> method;
  if (!object()->Get(context, method_name).ToLocal(&method))
    return Nothing<bool>();
  if (!method->IsFunction())
    return Just(true);

  MaybeLocal<Value> call_result =
      method.As<Function>()->Call(context, object(), 1, &data);
  if (call_result.IsEmpty())
    return Nothing<bool>();
  return Just(true);
}
1257
1258
49
// Takes ownership of the deserialize-info string and of the global handle
// referring to the JS data value.
JSTransferable::Data::Data(std::string&& deserialize_info,
                           v8::Global<v8::Value>&& data)
    : deserialize_info_(std::move(deserialize_info)),
      data_(std::move(data)) {
}
1262
1263
42
// Create the JS wrapper object that will later be filled with data passed to
// the `[kDeserialize]()` method on it. This split is necessary, because here
// we need to create an object with the right prototype and internal fields,
// but the actual JS data stored in the serialized data can only be read at
// the end of the stream, after the main message has been read.
//
// Throws ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE and returns an empty pointer
// when `context` is not the environment's context. Also returns an empty
// pointer if the deserialize info cannot be converted to a JS value, the
// create-object function fails, or its result is not a BaseObject instance.
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  if (context != env->context()) {
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
    return {};
  }

  HandleScope handle_scope(env->isolate());
  Local<Value> info;
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info))
    return {};

  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
  Local<Value> created;
  if (!env->messaging_deserialize_create_object()
           ->Call(context, Null(env->isolate()), 1, &info)
           .ToLocal(&created)) {
    return {};
  }
  if (!env->base_object_ctor_template()->HasInstance(created))
    return {};

  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(created) };
}
1291
1292
49
// Writes the stored JS data value to `serializer`, then drops the global
// handle regardless of whether the write succeeded. Returns the result of
// the write.
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  HandleScope handle_scope(context->GetIsolate());
  Local<Value> value = PersistentToLocal::Strong(data_);
  Maybe<bool> written = serializer->WriteValue(context, value);
  data_.Reset();
  return written;
}
1299
1300
74
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1301
148
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1302
74
  std::shared_ptr<SiblingGroup> group;
1303
74
  auto i = groups_.find(name);
1304

74
  if (i == groups_.end() || i->second.expired()) {
1305
31
    group = std::make_shared<SiblingGroup>(name);
1306
31
    groups_[name] = group;
1307
  } else {
1308
43
    group = i->second.lock();
1309
  }
1310
74
  return group;
1311
}
1312
1313
30
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1314
60
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1315
30
  auto i = groups_.find(name);
1316

30
  if (i != groups_.end() && i->second.expired())
1317
30
    groups_.erase(name);
1318
30
}
1319
1320
31
// Creates a group with the given name; an empty name denotes an anonymous
// group.
SiblingGroup::SiblingGroup(const std::string& name)
    : name_(name) {
}
1322
1323
12204
SiblingGroup::~SiblingGroup() {
  // Anonymous groups are never registered by name, so there is nothing to
  // clean up for them.
  if (name_.empty())
    return;
  // If this is a named group, check to see if we can remove the group
  CheckSiblingGroup(name_);
}
1328
1329
61760
// Delivers `message` from `source` to every other port in this group, while
// holding the group's read lock.
//
// Returns:
//   - Nothing<bool>() (with `*error` set, if non-null) when `source` is not
//     a member of the group, or when the message carries transferables and
//     there is more than one destination;
//   - Just(false) when there is no destination port;
//   - Just(true) otherwise. If a destination port is itself among the
//     message's transferables, delivery stops there and `*error` describes
//     the lost channel.
Maybe<bool> SiblingGroup::Dispatch(
    MessagePortData* source,
    std::shared_ptr<Message> message,
    std::string* error) {
  RwLock::ScopedReadLock lock(group_mutex_);

  // The source MessagePortData is not part of this group.
  const bool source_is_member = ports_.find(source) != ports_.end();
  if (!source_is_member) {
    if (error != nullptr)
      *error = "Source MessagePort is not entangled with this group.";
    return Nothing<bool>();
  }

  // There are no destination ports.
  if (size() <= 1)
    return Just(false);

  // Transferables cannot be used when there is more
  // than a single destination.
  if (size() > 2 && message->has_transferables()) {
    if (error != nullptr)
      *error = "Transferables cannot be used with multiple destinations.";
    return Nothing<bool>();
  }

  for (MessagePortData* target : ports_) {
    if (target == source)
      continue;
    // This loop should only be entered if there's only a single destination
    for (const auto& transferable : message->transferables()) {
      if (target != transferable.get())
        continue;
      if (error != nullptr) {
        *error = "The target port was posted to itself, and the "
                 "communication channel was lost";
      }
      return Just(true);
    }
    target->AddToIncomingQueue(message);
  }

  return Just(true);
}
1373
1374
74
// Convenience overload: adds a single port to this group.
void SiblingGroup::Entangle(MessagePortData* port) {
  Entangle({ port });
}
1377
1378
12271
// Adds every port in `ports` to this group under the group's write lock and
// points each port's group_ at this group. Each port must not already belong
// to a group (enforced by CHECK).
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
  RwLock::ScopedWriteLock lock(group_mutex_);
  for (MessagePortData* member : ports) {
    ports_.insert(member);
    CHECK(!member->group_);
    member->group_ = shared_from_this();
  }
}
1386
1387
24444
// Removes `data` from this group under the group's write lock and queues a
// default-constructed Message on it. If that leaves exactly one port in an
// anonymous group, the same kind of message is queued on the remaining port.
void SiblingGroup::Disentangle(MessagePortData* data) {
  // Keep alive until end of function: resetting data->group_ below may drop
  // what would otherwise be the last reference to this group.
  auto self = shared_from_this();
  RwLock::ScopedWriteLock lock(group_mutex_);
  ports_.erase(data);
  data->group_.reset();

  data->AddToIncomingQueue(std::make_shared<Message>());

  // If this is an anonymous group and there's another port, close it.
  const bool one_port_left = size() == 1;
  if (one_port_left && name_.empty()) {
    MessagePortData* remaining = *ports_.begin();
    remaining->AddToIncomingQueue(std::make_shared<Message>());
  }
}
1398
1399
SiblingGroup::Map SiblingGroup::groups_;
1400
Mutex SiblingGroup::groups_mutex_;
1401
1402
namespace {
1403
1404
856
static void SetDeserializerCreateObjectFunction(
1405
    const FunctionCallbackInfo<Value>& args) {
1406
856
  Environment* env = Environment::GetCurrent(args);
1407
856
  CHECK(args[0]->IsFunction());
1408
1712
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1409
856
}
1410
1411
11166
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1412
11166
  Environment* env = Environment::GetCurrent(args);
1413
11166
  if (!args.IsConstructCall()) {
1414
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1415
1
    return;
1416
  }
1417
1418
22330
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1419
11165
  Context::Scope context_scope(context);
1420
1421
11165
  MessagePort* port1 = MessagePort::New(env, context);
1422
11165
  if (port1 == nullptr) return;
1423
11165
  MessagePort* port2 = MessagePort::New(env, context);
1424
11165
  if (port2 == nullptr) {
1425
    port1->Close();
1426
    return;
1427
  }
1428
1429
11165
  MessagePort::Entangle(port1, port2);
1430
1431
44660
  args.This()->Set(context, env->port1_string(), port1->object())
1432
      .Check();
1433
44660
  args.This()->Set(context, env->port2_string(), port2->object())
1434
      .Check();
1435
}
1436
1437
74
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1438
148
  CHECK(args[0]->IsString());
1439
74
  Environment* env = Environment::GetCurrent(args);
1440
148
  Context::Scope context_scope(env->context());
1441
148
  Utf8Value name(env->isolate(), args[0]);
1442
  MessagePort* port =
1443
74
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1444
74
  if (port != nullptr) {
1445
148
    args.GetReturnValue().Set(port->object());
1446
  }
1447
74
}
1448
1449
856
static void InitMessaging(Local<Object> target,
1450
                          Local<Value> unused,
1451
                          Local<Context> context,
1452
                          void* priv) {
1453
856
  Environment* env = Environment::GetCurrent(context);
1454
1455
  {
1456
856
    env->SetConstructorFunction(
1457
        target,
1458
        "MessageChannel",
1459
        env->NewFunctionTemplate(MessageChannel));
1460
  }
1461
1462
  {
1463
856
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1464
856
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1465
1712
    t->InstanceTemplate()->SetInternalFieldCount(
1466
        JSTransferable::kInternalFieldCount);
1467
856
    env->SetConstructorFunction(target, "JSTransferable", t);
1468
  }
1469
1470
856
  env->SetConstructorFunction(
1471
      target,
1472
      env->message_port_constructor_string(),
1473
      GetMessagePortConstructorTemplate(env));
1474
1475
  // These are not methods on the MessagePort prototype, because
1476
  // the browser equivalents do not provide them.
1477
856
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1478
856
  env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1479
856
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1480
856
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1481
856
  env->SetMethod(target, "moveMessagePortToContext",
1482
                 MessagePort::MoveToContext);
1483
856
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1484
                 SetDeserializerCreateObjectFunction);
1485
856
  env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1486
1487
  {
1488
1712
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1489
    target
1490
856
        ->Set(context,
1491
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1492
1712
              domexception)
1493
        .Check();
1494
  }
1495
856
}
1496
1497
5184
// Registers every native callback exposed by this binding with the external
// reference registry.
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
  // Constructors.
  registry->Register(MessageChannel);
  registry->Register(BroadcastChannel);
  registry->Register(JSTransferable::New);
  registry->Register(MessagePort::New);

  // MessagePort methods and helpers.
  registry->Register(MessagePort::PostMessage);
  registry->Register(MessagePort::Start);
  registry->Register(MessagePort::Stop);
  registry->Register(MessagePort::CheckType);
  registry->Register(MessagePort::Drain);
  registry->Register(MessagePort::ReceiveMessage);
  registry->Register(MessagePort::MoveToContext);

  registry->Register(SetDeserializerCreateObjectFunction);
}
1511
1512
}  // anonymous namespace
1513
1514
}  // namespace worker
1515
}  // namespace node
1516
1517
5252
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1518
5184
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1519
                               node::worker::RegisterExternalReferences)