GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_messaging.cc Lines: 746 805 92.7 %
Date: 2022-09-07 04:19:57 Branches: 368 506 72.7 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process-inl.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
11011
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
11011
  return Just(BaseObjectList {});
59
}
60
61
10763
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10763
  return Just(true);
64
}
65
66
namespace worker {
67
68
11021
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
11021
  return Just(true);
71
}
72
73
112822
Message::Message(MallocedBuffer<char>&& buffer)
74
112822
    : main_message_buf_(std::move(buffer)) {}
75
76
172789
bool Message::IsCloseMessage() const {
77
172789
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
75263
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
75263
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
75263
        wasm_modules_(wasm_modules) {}
95
96
10794
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10794
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10794
    CHECK_LT(id, host_objects_.size());
102
21588
    return host_objects_[id]->object(isolate);
103
  }
104
105
746
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
746
    CHECK_LT(clone_id, shared_array_buffers_.size());
108
1492
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LT(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
75276
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context,
130
                                       Local<Value>* port_list) {
131
75276
  Context::Scope context_scope(context);
132
133
75276
  CHECK(!IsCloseMessage());
134

75276
  if (port_list != nullptr && !transferables_.empty()) {
135
    // Need to create this outside of the EscapableHandleScope, but inside
136
    // the Context::Scope.
137
21542
    *port_list = Array::New(env->isolate());
138
  }
139
140
75276
  EscapableHandleScope handle_scope(env->isolate());
141
142
  // Create all necessary objects for transferables, e.g. MessagePort handles.
143
150552
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
144
75276
  auto cleanup = OnScopeLeave([&]() {
145
75292
    for (BaseObjectPtr<BaseObject> object : host_objects) {
146
16
      if (!object) continue;
147
148
      // If the function did not finish successfully, host_objects will contain
149
      // a list of objects that will never be passed to JS. Therefore, we
150
      // destroy them here.
151
      object->Detach();
152
    }
153
150552
  });
154
155
86071
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
156
10808
    HandleScope handle_scope(env->isolate());
157
10808
    TransferData* data = transferables_[i].get();
158
21616
    host_objects[i] = data->Deserialize(
159
21616
        env, context, std::move(transferables_[i]));
160
10808
    if (!host_objects[i]) return {};
161
10795
    if (port_list != nullptr) {
162
      // If we gather a list of all message ports, and this transferred object
163
      // is a message port, add it to that list. This is a bit of an odd case
164
      // of special handling for MessagePorts (as opposed to applying to all
165
      // transferables), but it's required for spec compliance.
166
      DCHECK((*port_list)->IsArray());
167
10795
      Local<Array> port_list_array = port_list->As<Array>();
168
10795
      Local<Object> obj = host_objects[i]->object();
169
21590
      if (env->message_port_constructor_template()->HasInstance(obj)) {
170
10747
        if (port_list_array->Set(context,
171
                                 port_list_array->Length(),
172
21494
                                 obj).IsNothing()) {
173
          return {};
174
        }
175
      }
176
    }
177
  }
178
75263
  transferables_.clear();
179
180
150526
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
181
  // Attach all transferred SharedArrayBuffers to their new Isolate.
182
76009
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
183
    Local<SharedArrayBuffer> sab =
184
746
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
185
746
    shared_array_buffers.push_back(sab);
186
  }
187
188
  DeserializerDelegate delegate(
189
150526
      this, env, host_objects, shared_array_buffers, wasm_modules_);
190
  ValueDeserializer deserializer(
191
      env->isolate(),
192
75263
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
193
      main_message_buf_.size,
194
150526
      &delegate);
195
75263
  delegate.deserializer = &deserializer;
196
197
  // Attach all transferred ArrayBuffers to their new Isolate.
198
75272
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
199
    Local<ArrayBuffer> ab =
200
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
201
9
    deserializer.TransferArrayBuffer(i, ab);
202
  }
203
204
150526
  if (deserializer.ReadHeader(context).IsNothing())
205
    return {};
206
  Local<Value> return_value;
207
150526
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
208
    return {};
209
210
86058
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
211
21590
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
212
      return {};
213
  }
214
215
75263
  host_objects.clear();
216
75263
  return handle_scope.Escape(return_value);
217
}
218
219
986
void Message::AddSharedArrayBuffer(
220
    std::shared_ptr<BackingStore> backing_store) {
221
986
  shared_array_buffers_.emplace_back(std::move(backing_store));
222
986
}
223
224
11069
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225
11069
  transferables_.emplace_back(std::move(data));
226
11069
}
227
228
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
229
2
  wasm_modules_.emplace_back(std::move(mod));
230
2
  return wasm_modules_.size() - 1;
231
}
232
233
namespace {
234
235
34699
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
236
34699
  Isolate* isolate = context->GetIsolate();
237
  Local<Object> per_context_bindings;
238
  Local<Value> emit_message_val;
239
104097
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
240
69398
      !per_context_bindings->Get(context,
241
104097
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
242
34699
          .ToLocal(&emit_message_val)) {
243
    return MaybeLocal<Function>();
244
  }
245
34699
  CHECK(emit_message_val->IsFunction());
246
34699
  return emit_message_val.As<Function>();
247
}
248
249
802
MaybeLocal<Function> GetDOMException(Local<Context> context) {
250
802
  Isolate* isolate = context->GetIsolate();
251
  Local<Object> per_context_bindings;
252
  Local<Value> domexception_ctor_val;
253
2406
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
254
1604
      !per_context_bindings->Get(context,
255
2406
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
256
802
          .ToLocal(&domexception_ctor_val)) {
257
    return MaybeLocal<Function>();
258
  }
259
802
  CHECK(domexception_ctor_val->IsFunction());
260
802
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
261
802
  return domexception_ctor;
262
}
263
264
22
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
265
22
  Isolate* isolate = context->GetIsolate();
266
  Local<Value> argv[] = {message,
267
44
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
268
  Local<Value> exception;
269
  Local<Function> domexception_ctor;
270
66
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
271
44
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
272
22
           .ToLocal(&exception)) {
273
    return;
274
  }
275
22
  isolate->ThrowException(exception);
276
}
277
278
// This tells V8 how to serialize objects that it does not understand
279
// (e.g. C++ objects) into the output buffer, in a way that our own
280
// DeserializerDelegate understands how to unpack.
281
class SerializerDelegate : public ValueSerializer::Delegate {
282
 public:
283
76602
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284
76602
      : env_(env), context_(context), msg_(m) {}
285
286
12
  void ThrowDataCloneError(Local<String> message) override {
287
12
    ThrowDataCloneException(context_, message);
288
12
  }
289
290
11079
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291
22158
    if (env_->base_object_ctor_template()->HasInstance(object)) {
292
      return WriteHostObject(
293
11079
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294
    }
295
296
    ThrowDataCloneError(env_->clone_unsupported_type_str());
297
    return Nothing<bool>();
298
  }
299
300
986
  Maybe<uint32_t> GetSharedArrayBufferId(
301
      Isolate* isolate,
302
      Local<SharedArrayBuffer> shared_array_buffer) override {
303
    uint32_t i;
304
1008
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305
44
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306
          shared_array_buffer) {
307
        return Just(i);
308
      }
309
    }
310
311
    seen_shared_array_buffers_.emplace_back(
312
1972
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313
986
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314
986
    return Just(i);
315
  }
316
317
2
  Maybe<uint32_t> GetWasmModuleTransferId(
318
      Isolate* isolate, Local<WasmModuleObject> module) override {
319
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320
  }
321
322
76573
  Maybe<bool> Finish(Local<Context> context) {
323
87642
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
324
11074
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325
11074
      std::unique_ptr<TransferData> data;
326
11074
      if (i < first_cloned_object_index_)
327
11037
        data = host_object->TransferForMessaging();
328
11074
      if (!data)
329
41
        data = host_object->CloneForMessaging();
330
11074
      if (!data) return Nothing<bool>();
331
22140
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332
1
        return Nothing<bool>();
333
11069
      msg_->AddTransferable(std::move(data));
334
    }
335
76568
    return Just(true);
336
  }
337
338
11045
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339
    // Make sure we have not started serializing the value itself yet.
340
11045
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
341
11045
    host_objects_.emplace_back(std::move(host_object));
342
11045
  }
343
344
  // Some objects in the transfer list may register sub-objects that can be
345
  // transferred. This could e.g. be a public JS wrapper object, such as a
346
  // FileHandle, that is registering its C++ handle for transfer.
347
76592
  inline Maybe<bool> AddNestedHostObjects() {
348
87636
    for (size_t i = 0; i < host_objects_.size(); i++) {
349
11044
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350
22088
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351
        return Nothing<bool>();
352
11078
      for (auto& nested_transferable : nested_transferables) {
353
68
        if (std::find(host_objects_.begin(),
354
                      host_objects_.end(),
355
34
                      nested_transferable) == host_objects_.end()) {
356
34
          AddHostObject(nested_transferable);
357
        }
358
      }
359
    }
360
76592
    return Just(true);
361
  }
362
363
  ValueSerializer* serializer = nullptr;
364
365
 private:
366
11079
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367
11079
    BaseObject::TransferMode mode = host_object->GetTransferMode();
368
11079
    if (mode == BaseObject::TransferMode::kUntransferable) {
369
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
370
2
      return Nothing<bool>();
371
    }
372
373
11132
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
374
11088
      if (host_objects_[i] == host_object) {
375
11033
        serializer->WriteUint32(i);
376
11033
        return Just(true);
377
      }
378
    }
379
380
44
    if (mode == BaseObject::TransferMode::kTransferable) {
381
7
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382
7
      return Nothing<bool>();
383
    }
384
385
37
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
386
37
    uint32_t index = host_objects_.size();
387
37
    if (first_cloned_object_index_ == SIZE_MAX)
388
30
      first_cloned_object_index_ = index;
389
37
    serializer->WriteUint32(index);
390
37
    host_objects_.push_back(host_object);
391
37
    return Just(true);
392
  }
393
394
  Environment* env_;
395
  Local<Context> context_;
396
  Message* msg_;
397
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399
  size_t first_cloned_object_index_ = SIZE_MAX;
400
401
  friend class worker::Message;
402
};
403
404
}  // anonymous namespace
405
406
76602
Maybe<bool> Message::Serialize(Environment* env,
407
                               Local<Context> context,
408
                               Local<Value> input,
409
                               const TransferList& transfer_list_v,
410
                               Local<Object> source_port) {
411
153204
  HandleScope handle_scope(env->isolate());
412
76602
  Context::Scope context_scope(context);
413
414
  // Verify that we're not silently overwriting an existing message.
415
76602
  CHECK(main_message_buf_.is_empty());
416
417
153204
  SerializerDelegate delegate(env, context, this);
418
153204
  ValueSerializer serializer(env->isolate(), &delegate);
419
76602
  delegate.serializer = &serializer;
420
421
153204
  std::vector<Local<ArrayBuffer>> array_buffers;
422
87641
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
423
11049
    Local<Value> entry = transfer_list_v[i];
424
11049
    if (entry->IsObject()) {
425
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
426
      // for details.
427
      bool untransferable;
428
11049
      if (!entry.As<Object>()->HasPrivate(
429
              context,
430
11049
              env->untransferable_object_private_symbol())
431
11049
              .To(&untransferable)) {
432
        return Nothing<bool>();
433
      }
434
11049
      if (untransferable) continue;
435
    }
436
437
    // Currently, we support ArrayBuffers and BaseObjects for which
438
    // GetTransferMode() does not return kUntransferable.
439
11044
    if (entry->IsArrayBuffer()) {
440
24
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
441
      // If we cannot render the ArrayBuffer unusable in this Isolate,
442
      // copying the buffer will have to do.
443
      // Note that we can currently transfer ArrayBuffers even if they were
444
      // not allocated by Node’s ArrayBufferAllocator in the first place,
445
      // because we pass the underlying v8::BackingStore around rather than
446
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
447
      // is always going to outlive any Workers it creates, and so will its
448
      // allocator along with it.
449
47
      if (!ab->IsDetachable()) continue;
450
24
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
451
48
          array_buffers.end()) {
452
1
        ThrowDataCloneException(
453
            context,
454
            FIXED_ONE_BYTE_STRING(
455
                env->isolate(),
456
                "Transfer list contains duplicate ArrayBuffer"));
457
1
        return Nothing<bool>();
458
      }
459
      // We simply use the array index in the `array_buffers` list as the
460
      // ID that we write into the serialized buffer.
461
23
      uint32_t id = array_buffers.size();
462
23
      array_buffers.push_back(ab);
463
23
      serializer.TransferArrayBuffer(id, ab);
464
23
      continue;
465
22040
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
466
      // Check if the source MessagePort is being transferred.
467

22040
      if (!source_port.IsEmpty() && entry == source_port) {
468
1
        ThrowDataCloneException(
469
            context,
470
            FIXED_ONE_BYTE_STRING(env->isolate(),
471
                                  "Transfer list contains source port"));
472
9
        return Nothing<bool>();
473
      }
474
      BaseObjectPtr<BaseObject> host_object {
475
11019
          Unwrap<BaseObject>(entry.As<Object>()) };
476

33028
      if (env->message_port_constructor_template()->HasInstance(entry) &&
477

21979
          (!host_object ||
478
10989
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
479
7
        ThrowDataCloneException(
480
            context,
481
            FIXED_ONE_BYTE_STRING(
482
                env->isolate(),
483
                "MessagePort in transfer list is already detached"));
484
7
        return Nothing<bool>();
485
      }
486
22024
      if (std::find(delegate.host_objects_.begin(),
487
                    delegate.host_objects_.end(),
488
11012
                    host_object) != delegate.host_objects_.end()) {
489
1
        ThrowDataCloneException(
490
            context,
491
            String::Concat(env->isolate(),
492
                FIXED_ONE_BYTE_STRING(
493
                  env->isolate(),
494
                  "Transfer list contains duplicate "),
495
1
                entry.As<Object>()->GetConstructorName()));
496
1
        return Nothing<bool>();
497
      }
498

11011
      if (host_object && host_object->GetTransferMode() !=
499
11011
              BaseObject::TransferMode::kUntransferable) {
500
11011
        delegate.AddHostObject(host_object);
501
11011
        continue;
502
      }
503
    }
504
505
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
506
    return Nothing<bool>();
507
  }
508
153184
  if (delegate.AddNestedHostObjects().IsNothing())
509
    return Nothing<bool>();
510
511
76592
  serializer.WriteHeader();
512
153184
  if (serializer.WriteValue(context, input).IsNothing()) {
513
19
    return Nothing<bool>();
514
  }
515
516
76588
  for (Local<ArrayBuffer> ab : array_buffers) {
517
    // If serialization succeeded, we render it inaccessible in this Isolate.
518
30
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
519
15
    ab->Detach();
520
521
15
    array_buffers_.emplace_back(std::move(backing_store));
522
  }
523
524
153146
  if (delegate.Finish(context).IsNothing())
525
5
    return Nothing<bool>();
526
527
  // The serializer gave us a buffer allocated using `malloc()`.
528
76568
  std::pair<uint8_t*, size_t> data = serializer.Release();
529
76568
  CHECK_NOT_NULL(data.first);
530
  main_message_buf_ =
531
76568
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
532
76568
  return Just(true);
533
}
534
535
2
void Message::MemoryInfo(MemoryTracker* tracker) const {
536
2
  tracker->TrackField("array_buffers_", array_buffers_);
537
2
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
538
2
  tracker->TrackField("transferables", transferables_);
539
2
}
540
541
35659
MessagePortData::MessagePortData(MessagePort* owner)
542
35659
    : owner_(owner) {
543
35659
}
544
545
142532
MessagePortData::~MessagePortData() {
546
71266
  CHECK_NULL(owner_);
547
71266
  Disentangle();
548
142532
}
549
550
4
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
551
8
  Mutex::ScopedLock lock(mutex_);
552
4
  tracker->TrackField("incoming_messages", incoming_messages_);
553
4
}
554
555
112724
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556
  // This function will be called by other threads.
557
225448
  Mutex::ScopedLock lock(mutex_);
558
112724
  incoming_messages_.emplace_back(std::move(message));
559
560
112724
  if (owner_ != nullptr) {
561
85740
    Debug(owner_, "Adding message to incoming queue");
562
85740
    owner_->TriggerAsync();
563
  }
564
112724
}
565
566
12057
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
567
12057
  auto group = std::make_shared<SiblingGroup>();
568
12057
  group->Entangle({a, b});
569
12057
}
570
571
59303
void MessagePortData::Disentangle() {
572
59303
  if (group_) {
573
24162
    group_->Disentangle(this);
574
  }
575
59303
}
576
577
208038
MessagePort::~MessagePort() {
578
69346
  if (data_) Detach();
579
138692
}
580
581
34699
MessagePort::MessagePort(Environment* env,
582
                         Local<Context> context,
583
34699
                         Local<Object> wrap)
584
  : HandleWrap(env,
585
               wrap,
586
34699
               reinterpret_cast<uv_handle_t*>(&async_),
587
               AsyncWrap::PROVIDER_MESSAGEPORT),
588
34699
    data_(new MessagePortData(this)) {
589
50116
  auto onmessage = [](uv_async_t* handle) {
590
    // Called when data has been put into the queue.
591
50116
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
592
50116
    channel->OnMessage(MessageProcessingMode::kNormalOperation);
593
50112
  };
594
595
34699
  CHECK_EQ(uv_async_init(env->event_loop(),
596
                         &async_,
597
                         onmessage), 0);
598
  // Reset later to indicate success of the constructor.
599
34699
  bool succeeded = false;
600
69398
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
601
602
  Local<Value> fn;
603
104097
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
604
    return;
605
606
34699
  if (fn->IsFunction()) {
607
34693
    Local<Function> init = fn.As<Function>();
608
69386
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
609
      return;
610
  }
611
612
  Local<Function> emit_message_fn;
613
69398
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
614
    return;
615
34699
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
616
617
34699
  succeeded = true;
618
34699
  Debug(this, "Created message port");
619
}
620
621
32976
bool MessagePort::IsDetached() const {
622

32976
  return data_ == nullptr || IsHandleClosing();
623
}
624
625
108752
void MessagePort::TriggerAsync() {
626
108752
  if (IsHandleClosing()) return;
627
108670
  CHECK_EQ(uv_async_send(&async_), 0);
628
}
629
630
34681
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
631
34681
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
632
633
34681
  if (data_) {
634
    // Wrap this call with accessing the mutex, so that TriggerAsync()
635
    // can check IsHandleClosing() without race conditions.
636
69352
    Mutex::ScopedLock sibling_lock(data_->mutex_);
637
34676
    HandleWrap::Close(close_callback);
638
  } else {
639
5
    HandleWrap::Close(close_callback);
640
  }
641
34681
}
642
643
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
645
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
646
  // class (i.e. makes it behave like an arrow function).
647
2
  Environment* env = Environment::GetCurrent(args);
648
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649
2
}
650
651
34699
MessagePort* MessagePort::New(
652
    Environment* env,
653
    Local<Context> context,
654
    std::unique_ptr<MessagePortData> data,
655
    std::shared_ptr<SiblingGroup> sibling_group) {
656
34699
  Context::Scope context_scope(context);
657
34699
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
658
659
  // Construct a new instance, then assign the listener instance and possibly
660
  // the MessagePortData to it.
661
  Local<Object> instance;
662
104097
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
663
    return nullptr;
664
34699
  MessagePort* port = new MessagePort(env, context, instance);
665
34699
  CHECK_NOT_NULL(port);
666
34699
  if (port->IsHandleClosing()) {
667
    // Construction failed with an exception.
668
    return nullptr;
669
  }
670
671
34699
  if (data) {
672
11471
    CHECK(!sibling_group);
673
11471
    port->Detach();
674
11471
    port->data_ = std::move(data);
675
676
    // This lock is here to avoid race conditions with the `owner_` read
677
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
678
    // but it's better to be safe than sorry.)
679
22942
    Mutex::ScopedLock lock(port->data_->mutex_);
680
11471
    port->data_->owner_ = port;
681
    // If the existing MessagePortData object had pending messages, this is
682
    // the easiest way to run that queue.
683
11471
    port->TriggerAsync();
684
23228
  } else if (sibling_group) {
685
74
    sibling_group->Entangle(port->data_.get());
686
  }
687
34699
  return port;
688
}
689
690
125647
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
691
                                              MessageProcessingMode mode,
692
                                              Local<Value>* port_list) {
693
125647
  std::shared_ptr<Message> received;
694
  {
695
    // Get the head of the message queue.
696
125647
    Mutex::ScopedLock lock(data_->mutex_);
697
698
125647
    Debug(this, "MessagePort has message");
699
700
125647
    bool wants_message =
701

125647
        receiving_messages_ ||
702
        mode == MessageProcessingMode::kForceReadMessages;
703
    // We have nothing to do if:
704
    // - There are no pending messages
705
    // - We are not intending to receive messages, and the message we would
706
    //   receive is not the final "close" message.
707

212882
    if (data_->incoming_messages_.empty() ||
708
87235
        (!wants_message &&
709
10289
         !data_->incoming_messages_.front()->IsCloseMessage())) {
710
76846
      return env()->no_message_symbol();
711
    }
712
713
87224
    received = data_->incoming_messages_.front();
714
87224
    data_->incoming_messages_.pop_front();
715
  }
716
717
87224
  if (received->IsCloseMessage()) {
718
23896
    Close();
719
23896
    return env()->no_message_symbol();
720
  }
721
722
75276
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
723
724
75276
  return received->Deserialize(env(), context, port_list);
725
}
726
727
50607
void MessagePort::OnMessage(MessageProcessingMode mode) {
728
50607
  Debug(this, "Running MessagePort::OnMessage()");
729
50607
  HandleScope handle_scope(env()->isolate());
730
  Local<Context> context =
731
101214
      object(env()->isolate())->GetCreationContext().ToLocalChecked();
732
733
  size_t processing_limit;
734
50607
  if (mode == MessageProcessingMode::kNormalOperation) {
735
50116
    Mutex::ScopedLock(data_->mutex_);
736
50116
    processing_limit = std::max(data_->incoming_messages_.size(),
737
100232
                                static_cast<size_t>(1000));
738
  } else {
739
491
    processing_limit = std::numeric_limits<size_t>::max();
740
  }
741
742
  // data_ can only ever be modified by the owner thread, so no need to lock.
743
  // However, the message port may be transferred while it is processing
744
  // messages, so we need to check that this handle still owns its `data_` field
745
  // on every iteration.
746
125607
  while (data_) {
747
125607
    if (processing_limit-- == 0) {
748
      // Prevent event loop starvation by only processing those messages without
749
      // interruption that were already present when the OnMessage() call was
750
      // first triggered, but at least 1000 messages because otherwise the
751
      // overhead of repeatedly triggering the uv_async_t instance becomes
752
      // noticeable, at least on Windows.
753
      // (That might require more investigation by somebody more familiar with
754
      // Windows.)
755
1
      TriggerAsync();
756
249
      return;
757
    }
758
759
125606
    HandleScope handle_scope(env()->isolate());
760
125606
    Context::Scope context_scope(context);
761
125606
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
762
763
    Local<Value> payload;
764
251212
    Local<Value> port_list = Undefined(env()->isolate());
765
    Local<Value> message_error;
766
502424
    Local<Value> argv[3];
767
768
    {
769
      // Catch any exceptions from parsing the message itself (not from
770
      // emitting it) as 'messageeror' events.
771
125606
      TryCatchScope try_catch(env());
772
251212
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
773

13
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
774
13
          message_error = try_catch.Exception();
775
13
        goto reschedule;
776
      }
777
    }
778
251186
    if (payload == env()->no_message_symbol()) break;
779
780
75239
    if (!env()->can_call_into_js()) {
781
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
782
      // In this case there is nothing to do but to drain the current queue.
783
      continue;
784
    }
785
786
75239
    argv[0] = payload;
787
75239
    argv[1] = port_list;
788
75239
    argv[2] = env()->message_string();
789
790
150474
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
791
248
    reschedule:
792
248
      if (!message_error.IsEmpty()) {
793
13
        argv[0] = message_error;
794
13
        argv[1] = Undefined(env()->isolate());
795
13
        argv[2] = env()->messageerror_string();
796
13
        USE(MakeCallback(emit_message, arraysize(argv), argv));
797
      }
798
799
      // Re-schedule OnMessage() execution in case of failure.
800
248
      if (data_)
801
248
        TriggerAsync();
802
248
      return;
803
    }
804
  }
805
}
806
807
34673
void MessagePort::OnClose() {
808
34673
  Debug(this, "MessagePort::OnClose()");
809
34673
  if (data_) {
810
    // Detach() returns move(data_).
811
23670
    Detach()->Disentangle();
812
  }
813
34673
}
814
815
46144
std::unique_ptr<MessagePortData> MessagePort::Detach() {
816
46144
  CHECK(data_);
817
92288
  Mutex::ScopedLock lock(data_->mutex_);
818
46144
  data_->owner_ = nullptr;
819
46144
  return std::move(data_);
820
}
821
822
21982
// A port can be transferred unless it is already detached.
BaseObject::TransferMode MessagePort::GetTransferMode() const {
  return IsDetached() ? BaseObject::TransferMode::kUntransferable
                      : BaseObject::TransferMode::kTransferable;
}
827
828
10998
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829
21996
  Close();
830
10998
  return Detach();
831
}
832
833
10747
// Builds a new MessagePort in `context` that takes ownership of the
// transferred data. `self` is the TransferData being deserialized; it is
// downcast to MessagePortData and handed to the new port.
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  std::unique_ptr<MessagePortData> port_data =
      static_unique_pointer_cast<MessagePortData>(std::move(self));
  MessagePort* port = MessagePort::New(env, context, std::move(port_data));
  return BaseObjectPtr<MessagePort> { port };
}
841
842
76595
// Serializes `message_v` (with `transfer_v` as its transfer list) and
// dispatches it through this port's data.
//
// Returns the serialization result if the port has no data, Nothing if
// serialization failed, and otherwise the result of the dispatch. A non-empty
// error string produced by a successful dispatch is emitted as a process
// warning.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Context> context,
                                     Local<Value> message_v,
                                     const TransferList& transfer_v) {
  auto message = std::make_shared<Message>();

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or
  // detached. Serialization therefore happens before looking at data_.
  const Maybe<bool> serialized = message->Serialize(
      env, context, message_v, transfer_v, object(env->isolate()));

  if (!data_) return serialized;
  if (serialized.IsNothing()) return Nothing<bool>();

  std::string warning;
  const Maybe<bool> dispatched = data_->Dispatch(message, &warning);
  if (dispatched.IsJust() && !warning.empty())
    ProcessEmitWarning(env, warning.c_str());
  return dispatched;
}
873
874
76567
// Forwards `message` to the sibling group this data belongs to. When the data
// has no group, stores an explanation in `*error` (if provided) and returns
// Nothing.
Maybe<bool> MessagePortData::Dispatch(
    std::shared_ptr<Message> message,
    std::string* error) {
  if (group_)
    return group_->Dispatch(this, message, error);

  if (error != nullptr)
    *error = "MessagePortData is not entangled.";
  return Nothing<bool>();
}
884
885
11057
// Copies the elements of `object` into `transfer_list`.
//
// Returns Just(true) when `object` was consumed as an array or through its
// iterator, Just(false) when it does not look like an iterable (not an
// object, no callable Symbol.iterator, malformed iterator protocol), and
// Nothing when one of the V8 calls came back empty.
static Maybe<bool> ReadIterable(Environment* env,
                                Local<Context> context,
                                // NOLINTNEXTLINE(runtime/references)
                                TransferList& transfer_list,
                                Local<Value> object) {
  if (!object->IsObject()) return Just(false);

  // Arrays are read by index instead of going through the iterator protocol.
  if (object->IsArray()) {
    Local<Array> array = object.As<Array>();
    size_t count = array->Length();
    transfer_list.AllocateSufficientStorage(count);
    for (size_t index = 0; index < count; index++) {
      MaybeLocal<Value> element = array->Get(context, index);
      if (!element.ToLocal(&transfer_list[index]))
        return Nothing<bool>();
    }
    return Just(true);
  }

  Isolate* isolate = env->isolate();
  Local<Object> iterable = object.As<Object>();

  // iterable[Symbol.iterator]
  Local<Value> iterator_fn;
  if (!iterable->Get(context, Symbol::GetIterator(isolate))
           .ToLocal(&iterator_fn)) {
    return Nothing<bool>();
  }
  if (!iterator_fn->IsFunction()) return Just(false);

  // iterable[Symbol.iterator]()
  Local<Value> iterator_v;
  if (!iterator_fn.As<Function>()->Call(context, object, 0, nullptr)
           .ToLocal(&iterator_v)) {
    return Nothing<bool>();
  }
  if (!iterator_v->IsObject()) return Just(false);
  Local<Object> iterator = iterator_v.As<Object>();

  // iterator.next
  Local<Value> next_v;
  if (!iterator->Get(context, env->next_string()).ToLocal(&next_v))
    return Nothing<bool>();
  if (!next_v->IsFunction()) return Just(false);
  Local<Function> next_fn = next_v.As<Function>();

  // Pull values until the iterator reports `done` or JS can no longer be
  // called into.
  std::vector<Local<Value>> collected;
  while (env->can_call_into_js()) {
    Local<Value> step_v;
    if (!next_fn->Call(context, iterator, 0, nullptr).ToLocal(&step_v))
      return Nothing<bool>();
    if (!step_v->IsObject()) return Just(false);
    Local<Object> step = step_v.As<Object>();

    Local<Value> done;
    if (!step->Get(context, env->done_string()).ToLocal(&done))
      return Nothing<bool>();
    if (done->BooleanValue(isolate)) break;

    Local<Value> item;
    if (!step->Get(context, env->value_string()).ToLocal(&item))
      return Nothing<bool>();
    collected.push_back(item);
  }

  transfer_list.AllocateSufficientStorage(collected.size());
  std::copy(collected.begin(), collected.end(), &transfer_list[0]);
  return Just(true);
}
941
942
76614
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943
76614
  Environment* env = Environment::GetCurrent(args);
944
76614
  Local<Object> obj = args.This();
945
153228
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947
76614
  if (args.Length() == 0) {
948
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949
19
                                       "MessagePort.postMessage");
950
  }
951
952

164278
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953
    // Browsers ignore null or undefined, and otherwise accept an array or an
954
    // options object.
955
4
    return THROW_ERR_INVALID_ARG_TYPE(env,
956
4
        "Optional transferList argument must be an iterable");
957
  }
958
959
76610
  TransferList transfer_list;
960
76610
  if (args[1]->IsObject()) {
961
    bool was_iterable;
962
22092
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963
8
      return;
964
11046
    if (!was_iterable) {
965
      Local<Value> transfer_option;
966
39
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
967
21
          .ToLocal(&transfer_option)) return;
968
26
      if (!transfer_option->IsUndefined()) {
969
11
        if (!ReadIterable(env, context, transfer_list, transfer_option)
970
11
            .To(&was_iterable)) return;
971
10
        if (!was_iterable) {
972
7
          return THROW_ERR_INVALID_ARG_TYPE(env,
973
7
              "Optional options.transfer argument must be an iterable");
974
        }
975
      }
976
    }
977
  }
978
979
76602
  MessagePort* port = Unwrap<MessagePort>(args.This());
980
  // Even if the backing MessagePort object has already been deleted, we still
981
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
982
  // transfers.
983

76602
  if (port == nullptr || port->IsHandleClosing()) {
984
14
    Message msg;
985
7
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986
7
    return;
987
  }
988
989
76595
  Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
990
76595
  if (res.IsJust())
991
229701
    args.GetReturnValue().Set(res.FromJust());
992
}
993
994
25278
void MessagePort::Start() {
995
25278
  Debug(this, "Start receiving messages");
996
25278
  receiving_messages_ = true;
997
50556
  Mutex::ScopedLock lock(data_->mutex_);
998
25278
  if (!data_->incoming_messages_.empty())
999
11292
    TriggerAsync();
1000
25278
}
1001
1002
20346
void MessagePort::Stop() {
1003
20346
  Debug(this, "Stop receiving messages");
1004
20346
  receiving_messages_ = false;
1005
20346
}
1006
1007
25279
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
25280
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1010
25279
  if (!port->data_) {
1011
1
    return;
1012
  }
1013
25278
  port->Start();
1014
}
1015
1016
20827
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017
  MessagePort* port;
1018
20827
  CHECK(args[0]->IsObject());
1019
41656
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1020
20348
  if (!port->data_) {
1021
2
    return;
1022
  }
1023
20346
  port->Stop();
1024
}
1025
1026
27
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027
27
  Environment* env = Environment::GetCurrent(args);
1028
27
  args.GetReturnValue().Set(
1029
81
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030
27
}
1031
1032
1858
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033
  MessagePort* port;
1034
3716
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1035
491
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036
}
1037
1038
46
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039
46
  Environment* env = Environment::GetCurrent(args);
1040
89
  if (!args[0]->IsObject() ||
1041

175
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1042
5
    return THROW_ERR_INVALID_ARG_TYPE(env,
1043
5
        "The \"port\" argument must be a MessagePort instance");
1044
  }
1045
82
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046
41
  if (port == nullptr) {
1047
    // Return 'no messages' for a closed port.
1048
    args.GetReturnValue().Set(
1049
        Environment::GetCurrent(args)->no_message_symbol());
1050
    return;
1051
  }
1052
1053
  MaybeLocal<Value> payload = port->ReceiveMessage(
1054
123
      port->object()->GetCreationContext().ToLocalChecked(),
1055
41
      MessageProcessingMode::kForceReadMessages);
1056
41
  if (!payload.IsEmpty())
1057
82
    args.GetReturnValue().Set(payload.ToLocalChecked());
1058
}
1059
1060
6
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061
6
  Environment* env = Environment::GetCurrent(args);
1062
12
  if (!args[0]->IsObject() ||
1063

24
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1064
    return THROW_ERR_INVALID_ARG_TYPE(env,
1065
1
        "The \"port\" argument must be a MessagePort instance");
1066
  }
1067
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068

6
  if (port == nullptr || port->IsHandleClosing()) {
1069
1
    Isolate* isolate = env->isolate();
1070
1
    THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1071
1
    return;
1072
  }
1073
1074
5
  Local<Value> context_arg = args[1];
1075
  ContextifyContext* context_wrapper;
1076

10
  if (!context_arg->IsObject() ||
1077
5
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1078
10
          env, context_arg.As<Object>())) == nullptr) {
1079
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1080
  }
1081
1082
5
  std::unique_ptr<MessagePortData> data;
1083
5
  if (!port->IsDetached())
1084
5
    data = port->Detach();
1085
1086
5
  Context::Scope context_scope(context_wrapper->context());
1087
  MessagePort* target =
1088
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1089
5
  if (target != nullptr)
1090
10
    args.GetReturnValue().Set(target->object());
1091
}
1092
1093
11097
// Entangles the data objects currently held by the two ports.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const first = a->data_.get();
  MessagePortData* const second = b->data_.get();
  MessagePortData::Entangle(first, second);
}
1096
1097
960
// Entangles the data currently held by port `a` with the free-standing data
// object `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const own_data = a->data_.get();
  MessagePortData::Entangle(own_data, b);
}
1100
1101
4
// Reports this port's tracked fields to the memory tracker: the owned
// MessagePortData ("data") and the emit-message function handle
// ("emit_message_fn").
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("data", data_);
  tracker->TrackField("emit_message_fn", emit_message_fn_);
}
1105
1106
36286
// Returns the MessagePort constructor template stored on `env`, building and
// storing it on first use.
//
// Generating the MessagePort JS constructor is factored into its own piece of
// code, because it is needed early on in the child environment setup.
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (cached.IsEmpty()) {
    Isolate* isolate = env->isolate();
    Local<FunctionTemplate> ctor =
        NewFunctionTemplate(isolate, MessagePort::New);
    ctor->SetClassName(env->message_port_constructor_string());
    ctor->InstanceTemplate()->SetInternalFieldCount(
        MessagePort::kInternalFieldCount);
    ctor->Inherit(HandleWrap::GetConstructorTemplate(env));

    SetProtoMethod(isolate, ctor, "postMessage", MessagePort::PostMessage);
    SetProtoMethod(isolate, ctor, "start", MessagePort::Start);

    env->set_message_port_constructor_template(ctor);

    // Read the template back from the environment so that both paths return
    // the stored handle.
    cached = env->message_port_constructor_template();
  }
  return cached;
}
1129
1130
38130
// Wraps `obj` as a BaseObject and makes the wrapper weak.
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
    : BaseObject(env, obj) {
  MakeWeak();
}
1134
1135
38130
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1136
38130
  CHECK(args.IsConstructCall());
1137
76260
  new JSTransferable(Environment::GetCurrent(args), args.This());
1138
38130
}
1139
1140
86
// Implements `kClone in this ? kCloneable : kTransferable`. If the property
// check itself yields no result, the object is reported as untransferable.
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
  HandleScope handle_scope(env()->isolate());
  // Scope that catches exceptions raised by the lookup below.
  errors::TryCatchScope ignore_exceptions(env());

  const Maybe<bool> lookup =
      object()->Has(env()->context(), env()->messaging_clone_symbol());
  if (lookup.IsNothing()) return TransferMode::kUntransferable;
  if (lookup.FromJust()) return TransferMode::kCloneable;
  return TransferMode::kTransferable;
}
1153
1154
33
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1155
33
  return TransferOrClone(TransferMode::kTransferable);
1156
}
1157
1158
24
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1159
24
  return TransferOrClone(TransferMode::kCloneable);
1160
}
1161
1162
57
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1163
    TransferMode mode) const {
1164
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1165
  // which should return an object with `data` and `deserializeInfo` properties;
1166
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1167
  // on the `TransferData` instance as a string.
1168
114
  HandleScope handle_scope(env()->isolate());
1169
57
  Local<Context> context = env()->isolate()->GetCurrentContext();
1170
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1171
57
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1172
1173
  Local<Value> method;
1174
171
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1175
    return {};
1176
  }
1177
57
  if (method->IsFunction()) {
1178
    Local<Value> result_v;
1179
53
    if (!method.As<Function>()->Call(
1180
159
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1181
53
      return {};
1182
    }
1183
1184
49
    if (result_v->IsObject()) {
1185
49
      Local<Object> result = result_v.As<Object>();
1186
      Local<Value> data;
1187
      Local<Value> deserialize_info;
1188
196
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1189
147
          !result->Get(context, env()->deserialize_info_string())
1190
49
              .ToLocal(&deserialize_info)) {
1191
        return {};
1192
      }
1193
98
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1194
49
      if (*deserialize_info_str == nullptr) return {};
1195
49
      return std::make_unique<Data>(
1196
196
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1197
    }
1198
  }
1199
1200
4
  if (mode == TransferMode::kTransferable)
1201
    return TransferOrClone(TransferMode::kCloneable);
1202
  else
1203
4
    return {};
1204
}
1205
1206
// Calls `this[kTransferList]()` and returns the resulting list of BaseObjects.
//
// A missing method or a non-array result yields an empty list. Array entries
// that are not BaseObject instances are skipped. Nothing is returned when a
// V8 call comes back empty.
Maybe<BaseObjectList>
JSTransferable::NestedTransferables() const {
  HandleScope handle_scope(env()->isolate());
  Local<Context> context = env()->isolate()->GetCurrentContext();

  Local<Value> list_fn;
  if (!object()->Get(context, env()->messaging_transfer_list_symbol())
           .ToLocal(&list_fn)) {
    return Nothing<BaseObjectList>();
  }
  if (!list_fn->IsFunction()) return Just(BaseObjectList {});

  Local<Value> list_v;
  if (!list_fn.As<Function>()->Call(context, object(), 0, nullptr)
           .ToLocal(&list_v)) {
    return Nothing<BaseObjectList>();
  }
  if (!list_v->IsArray()) return Just(BaseObjectList {});

  Local<Array> list = list_v.As<Array>();
  BaseObjectList nested;
  // The length is re-read on every iteration, as element access may run
  // arbitrary JS.
  for (size_t index = 0; index < list->Length(); index++) {
    Local<Value> entry;
    if (!list->Get(context, index).ToLocal(&entry))
      return Nothing<BaseObjectList>();
    if (!env()->base_object_ctor_template()->HasInstance(entry))
      continue;
    nested.emplace_back(Unwrap<BaseObject>(entry));
  }
  return Just(nested);
}
1237
1238
32
// Calls `this[kDeserialize](data)` where `data` comes from the return value
// of `this[kTransfer]()` or `this[kClone]()` and is read from `deserializer`.
//
// Returns Just(true) on success or when there is no callable hook, and
// Nothing when reading the value, looking up the hook, or calling it fails.
Maybe<bool> JSTransferable::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  HandleScope handle_scope(env()->isolate());

  Local<Value> data;
  if (!deserializer->ReadValue(context).ToLocal(&data))
    return Nothing<bool>();

  Local<Value> hook;
  if (!object()->Get(context, env()->messaging_deserialize_symbol())
           .ToLocal(&hook)) {
    return Nothing<bool>();
  }
  if (!hook->IsFunction()) return Just(true);

  MaybeLocal<Value> outcome =
      hook.As<Function>()->Call(context, object(), 1, &data);
  if (outcome.IsEmpty())
    return Nothing<bool>();
  return Just(true);
}
1258
1259
49
// Takes ownership of both the deserialization info string and the global
// handle to the JS data.
JSTransferable::Data::Data(std::string&& deserialize_info,
                           v8::Global<v8::Value>&& data)
    : deserialize_info_(std::move(deserialize_info)),
      data_(std::move(data)) {
}
1263
1264
42
// Creates the JS wrapper object that will later be filled with data passed to
// the `[kDeserialize]()` method on it. This split is necessary, because here
// we need to create an object with the right prototype and internal fields,
// but the actual JS data stored in the serialized data can only be read at
// the end of the stream, after the main message has been read.
//
// Returns an empty pointer (after throwing) when `context` is not the
// environment's main context, and an empty pointer when the info string
// cannot be converted, the creation function fails, or its result is not a
// BaseObject instance.
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  if (context != env->context()) {
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
    return {};
  }

  Isolate* isolate = env->isolate();
  HandleScope handle_scope(isolate);

  Local<Value> info;
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info))
    return {};

  // The creation function must have been installed on the environment.
  Local<Function> create_object = env->messaging_deserialize_create_object();
  CHECK(!create_object.IsEmpty());

  Local<Value> wrapper;
  if (!create_object->Call(context, Null(isolate), 1, &info)
           .ToLocal(&wrapper)) {
    return {};
  }
  if (!env->base_object_ctor_template()->HasInstance(wrapper))
    return {};

  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(wrapper) };
}
1292
1293
49
// Writes the stored JS data to `serializer` and then drops the global handle,
// whether or not the write succeeded.
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  HandleScope handle_scope(context->GetIsolate());
  Local<Value> payload = PersistentToLocal::Strong(data_);
  Maybe<bool> written = serializer->WriteValue(context, payload);
  data_.Reset();
  return written;
}
1300
1301
74
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1302
148
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1303
74
  std::shared_ptr<SiblingGroup> group;
1304
74
  auto i = groups_.find(name);
1305

74
  if (i == groups_.end() || i->second.expired()) {
1306
31
    group = std::make_shared<SiblingGroup>(name);
1307
31
    groups_[name] = group;
1308
  } else {
1309
43
    group = i->second.lock();
1310
  }
1311
74
  return group;
1312
}
1313
1314
30
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1315
60
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1316
30
  auto i = groups_.find(name);
1317

30
  if (i != groups_.end() && i->second.expired())
1318
30
    groups_.erase(name);
1319
30
}
1320
1321
31
// Creates a group with the given name. An empty name denotes an anonymous
// group (see the name_.empty() checks elsewhere in this class).
SiblingGroup::SiblingGroup(const std::string& name)
    : name_(name) {
}
1323
1324
12062
// Named groups have a registry entry; on destruction, check whether that
// entry can be removed. Anonymous groups have nothing to clean up here.
SiblingGroup::~SiblingGroup() {
  if (name_.empty()) return;
  CheckSiblingGroup(name_);
}
1329
1330
76567
// Delivers `message` to every port in the group except `source`.
//
// Returns Nothing (with `*error` set, if provided) when `source` is not a
// member of the group or when transferables are used with more than one
// destination. Returns Just(false) when there is no destination port.
// Returns Just(true) after queueing the message, and also, with `*error`
// set, when a destination port is itself among the message's transferables,
// in which case the message is not queued on that port.
Maybe<bool> SiblingGroup::Dispatch(
    MessagePortData* source,
    std::shared_ptr<Message> message,
    std::string* error) {
  // Stores `text` in the caller's error slot, if one was provided.
  const auto report = [error](const char* text) {
    if (error != nullptr)
      *error = text;
  };

  RwLock::ScopedReadLock lock(group_mutex_);

  // The source MessagePortData is not part of this group.
  if (ports_.find(source) == ports_.end()) {
    report("Source MessagePort is not entangled with this group.");
    return Nothing<bool>();
  }

  // There are no destination ports.
  if (size() <= 1)
    return Just(false);

  // Transferables cannot be used when there is more
  // than a single destination.
  if (size() > 2 && message->has_transferables()) {
    report("Transferables cannot be used with multiple destinations.");
    return Nothing<bool>();
  }

  for (MessagePortData* destination : ports_) {
    if (destination == source)
      continue;

    // This loop should only be entered if there's only a single destination
    for (const auto& transferable : message->transferables()) {
      if (destination != transferable.get())
        continue;
      report("The target port was posted to itself, and the "
             "communication channel was lost");
      return Just(true);
    }

    destination->AddToIncomingQueue(message);
  }

  return Just(true);
}
1374
1375
74
// Convenience overload: adds a single port to this group by forwarding to the
// initializer-list overload.
void SiblingGroup::Entangle(MessagePortData* port) {
  Entangle({ port });
}
1378
1379
12131
// Adds every given port to this group, under the group's write lock, and
// points each port's group_ back at this group. Each port must not already
// belong to a group (CHECKed).
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
  RwLock::ScopedWriteLock lock(group_mutex_);
  for (MessagePortData* member : ports) {
    ports_.insert(member);
    CHECK(!member->group_);
    member->group_ = shared_from_this();
  }
}
1387
1388
24162
// Removes `data` from this group, under the group's write lock, and queues an
// empty Message on it. If this leaves exactly one port in an anonymous group,
// an empty Message is queued on that remaining port as well.
void SiblingGroup::Disentangle(MessagePortData* data) {
  // Resetting data->group_ below may drop the last other reference to this
  // group; hold one of our own until the end of the function.
  const auto keep_alive = shared_from_this();
  RwLock::ScopedWriteLock lock(group_mutex_);
  ports_.erase(data);
  data->group_.reset();

  data->AddToIncomingQueue(std::make_shared<Message>());

  // If this is an anonymous group and there's another port, close it.
  const bool is_anonymous = name_.empty();
  if (is_anonymous && size() == 1) {
    MessagePortData* remaining = *ports_.begin();
    remaining->AddToIncomingQueue(std::make_shared<Message>());
  }
}
1399
1400
// Registry of sibling groups keyed by name. Entries are weak references
// (see the expired()/lock() calls in Get() and CheckSiblingGroup()).
SiblingGroup::Map SiblingGroup::groups_;
1401
// Held by Get() and CheckSiblingGroup() while they access groups_.
Mutex SiblingGroup::groups_mutex_;
1402
1403
namespace {
1404
1405
780
static void SetDeserializerCreateObjectFunction(
1406
    const FunctionCallbackInfo<Value>& args) {
1407
780
  Environment* env = Environment::GetCurrent(args);
1408
780
  CHECK(args[0]->IsFunction());
1409
1560
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1410
780
}
1411
1412
11098
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1413
11098
  Environment* env = Environment::GetCurrent(args);
1414
11098
  if (!args.IsConstructCall()) {
1415
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1416
1
    return;
1417
  }
1418
1419
22194
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1420
11097
  Context::Scope context_scope(context);
1421
1422
11097
  MessagePort* port1 = MessagePort::New(env, context);
1423
11097
  if (port1 == nullptr) return;
1424
11097
  MessagePort* port2 = MessagePort::New(env, context);
1425
11097
  if (port2 == nullptr) {
1426
    port1->Close();
1427
    return;
1428
  }
1429
1430
11097
  MessagePort::Entangle(port1, port2);
1431
1432
44388
  args.This()->Set(context, env->port1_string(), port1->object())
1433
      .Check();
1434
44388
  args.This()->Set(context, env->port2_string(), port2->object())
1435
      .Check();
1436
}
1437
1438
74
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1439
148
  CHECK(args[0]->IsString());
1440
74
  Environment* env = Environment::GetCurrent(args);
1441
148
  Context::Scope context_scope(env->context());
1442
148
  Utf8Value name(env->isolate(), args[0]);
1443
  MessagePort* port =
1444
74
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1445
74
  if (port != nullptr) {
1446
148
    args.GetReturnValue().Set(port->object());
1447
  }
1448
74
}
1449
1450
780
static void InitMessaging(Local<Object> target,
1451
                          Local<Value> unused,
1452
                          Local<Context> context,
1453
                          void* priv) {
1454
780
  Environment* env = Environment::GetCurrent(context);
1455
780
  Isolate* isolate = env->isolate();
1456
1457
  {
1458
780
    SetConstructorFunction(context,
1459
                           target,
1460
                           "MessageChannel",
1461
                           NewFunctionTemplate(isolate, MessageChannel));
1462
  }
1463
1464
  {
1465
    Local<FunctionTemplate> t =
1466
780
        NewFunctionTemplate(isolate, JSTransferable::New);
1467
780
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1468
1560
    t->InstanceTemplate()->SetInternalFieldCount(
1469
        JSTransferable::kInternalFieldCount);
1470
780
    SetConstructorFunction(context, target, "JSTransferable", t);
1471
  }
1472
1473
780
  SetConstructorFunction(context,
1474
                         target,
1475
                         env->message_port_constructor_string(),
1476
                         GetMessagePortConstructorTemplate(env));
1477
1478
  // These are not methods on the MessagePort prototype, because
1479
  // the browser equivalents do not provide them.
1480
780
  SetMethod(context, target, "stopMessagePort", MessagePort::Stop);
1481
780
  SetMethod(context, target, "checkMessagePort", MessagePort::CheckType);
1482
780
  SetMethod(context, target, "drainMessagePort", MessagePort::Drain);
1483
780
  SetMethod(
1484
      context, target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1485
780
  SetMethod(
1486
      context, target, "moveMessagePortToContext", MessagePort::MoveToContext);
1487
780
  SetMethod(context,
1488
            target,
1489
            "setDeserializerCreateObjectFunction",
1490
            SetDeserializerCreateObjectFunction);
1491
780
  SetMethod(context, target, "broadcastChannel", BroadcastChannel);
1492
1493
  {
1494
1560
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1495
    target
1496
780
        ->Set(context,
1497
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1498
1560
              domexception)
1499
        .Check();
1500
  }
1501
780
}
1502
1503
5473
// Registers every native callback exposed by this binding with the external
// reference registry. The registration order is kept as is.
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
  // Constructors.
  registry->Register(MessageChannel);
  registry->Register(BroadcastChannel);
  registry->Register(JSTransferable::New);
  registry->Register(MessagePort::New);

  // MessagePort methods and helpers.
  registry->Register(MessagePort::PostMessage);
  registry->Register(MessagePort::Start);
  registry->Register(MessagePort::Stop);
  registry->Register(MessagePort::CheckType);
  registry->Register(MessagePort::Drain);
  registry->Register(MessagePort::ReceiveMessage);
  registry->Register(MessagePort::MoveToContext);

  // Deserialization support.
  registry->Register(SetDeserializerCreateObjectFunction);
}
1517
1518
}  // anonymous namespace
1519
1520
}  // namespace worker
1521
}  // namespace node
1522
1523
5545
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1524
5473
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1525
                               node::worker::RegisterExternalReferences)