GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_messaging.cc Lines: 747 805 92.8 %
Date: 2022-09-19 04:21:54 Branches: 371 506 73.3 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process-inl.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
11012
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
11012
  return Just(BaseObjectList {});
59
}
60
61
10770
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10770
  return Just(true);
64
}
65
66
namespace worker {
67
68
11022
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
11022
  return Just(true);
71
}
72
73
112928
Message::Message(MallocedBuffer<char>&& buffer)
74
112928
    : main_message_buf_(std::move(buffer)) {}
75
76
172993
bool Message::IsCloseMessage() const {
77
172993
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
75377
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
75377
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
75377
        wasm_modules_(wasm_modules) {}
95
96
10801
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10801
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10801
    CHECK_LT(id, host_objects_.size());
102
21602
    return host_objects_[id]->object(isolate);
103
  }
104
105
753
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
753
    CHECK_LT(clone_id, shared_array_buffers_.size());
108
1506
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LT(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
75390
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context,
130
                                       Local<Value>* port_list) {
131
75390
  Context::Scope context_scope(context);
132
133
75390
  CHECK(!IsCloseMessage());
134

75390
  if (port_list != nullptr && !transferables_.empty()) {
135
    // Need to create this outside of the EscapableHandleScope, but inside
136
    // the Context::Scope.
137
21556
    *port_list = Array::New(env->isolate());
138
  }
139
140
75390
  EscapableHandleScope handle_scope(env->isolate());
141
142
  // Create all necessary objects for transferables, e.g. MessagePort handles.
143
150780
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
144
75390
  auto cleanup = OnScopeLeave([&]() {
145
75406
    for (BaseObjectPtr<BaseObject> object : host_objects) {
146
16
      if (!object) continue;
147
148
      // If the function did not finish successfully, host_objects will contain
149
      // a list of objects that will never be passed to JS. Therefore, we
150
      // destroy them here.
151
      object->Detach();
152
    }
153
150780
  });
154
155
86192
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
156
10815
    HandleScope handle_scope(env->isolate());
157
10815
    TransferData* data = transferables_[i].get();
158
21630
    host_objects[i] = data->Deserialize(
159
21630
        env, context, std::move(transferables_[i]));
160
10815
    if (!host_objects[i]) return {};
161
10802
    if (port_list != nullptr) {
162
      // If we gather a list of all message ports, and this transferred object
163
      // is a message port, add it to that list. This is a bit of an odd case
164
      // of special handling for MessagePorts (as opposed to applying to all
165
      // transferables), but it's required for spec compliance.
166
      DCHECK((*port_list)->IsArray());
167
10802
      Local<Array> port_list_array = port_list->As<Array>();
168
10802
      Local<Object> obj = host_objects[i]->object();
169
21604
      if (env->message_port_constructor_template()->HasInstance(obj)) {
170
10754
        if (port_list_array->Set(context,
171
                                 port_list_array->Length(),
172
21508
                                 obj).IsNothing()) {
173
          return {};
174
        }
175
      }
176
    }
177
  }
178
75377
  transferables_.clear();
179
180
150754
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
181
  // Attach all transferred SharedArrayBuffers to their new Isolate.
182
76130
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
183
    Local<SharedArrayBuffer> sab =
184
753
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
185
753
    shared_array_buffers.push_back(sab);
186
  }
187
188
  DeserializerDelegate delegate(
189
150754
      this, env, host_objects, shared_array_buffers, wasm_modules_);
190
  ValueDeserializer deserializer(
191
      env->isolate(),
192
75377
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
193
      main_message_buf_.size,
194
150754
      &delegate);
195
75377
  delegate.deserializer = &deserializer;
196
197
  // Attach all transferred ArrayBuffers to their new Isolate.
198
75386
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
199
    Local<ArrayBuffer> ab =
200
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
201
9
    deserializer.TransferArrayBuffer(i, ab);
202
  }
203
204
150754
  if (deserializer.ReadHeader(context).IsNothing())
205
    return {};
206
  Local<Value> return_value;
207
150754
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
208
3
    return {};
209
210
86176
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
211
21604
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
212
      return {};
213
  }
214
215
75374
  host_objects.clear();
216
75374
  return handle_scope.Escape(return_value);
217
}
218
219
987
void Message::AddSharedArrayBuffer(
220
    std::shared_ptr<BackingStore> backing_store) {
221
987
  shared_array_buffers_.emplace_back(std::move(backing_store));
222
987
}
223
224
11070
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
225
11070
  transferables_.emplace_back(std::move(data));
226
11070
}
227
228
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
229
2
  wasm_modules_.emplace_back(std::move(mod));
230
2
  return wasm_modules_.size() - 1;
231
}
232
233
namespace {
234
235
34716
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
236
34716
  Isolate* isolate = context->GetIsolate();
237
  Local<Object> per_context_bindings;
238
  Local<Value> emit_message_val;
239
104148
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
240
69432
      !per_context_bindings->Get(context,
241
104148
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
242
34716
          .ToLocal(&emit_message_val)) {
243
    return MaybeLocal<Function>();
244
  }
245
34716
  CHECK(emit_message_val->IsFunction());
246
34716
  return emit_message_val.As<Function>();
247
}
248
249
810
MaybeLocal<Function> GetDOMException(Local<Context> context) {
250
810
  Isolate* isolate = context->GetIsolate();
251
  Local<Object> per_context_bindings;
252
  Local<Value> domexception_ctor_val;
253
2430
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
254
1620
      !per_context_bindings->Get(context,
255
2430
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
256
810
          .ToLocal(&domexception_ctor_val)) {
257
    return MaybeLocal<Function>();
258
  }
259
810
  CHECK(domexception_ctor_val->IsFunction());
260
810
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
261
810
  return domexception_ctor;
262
}
263
264
22
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
265
22
  Isolate* isolate = context->GetIsolate();
266
  Local<Value> argv[] = {message,
267
44
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
268
  Local<Value> exception;
269
  Local<Function> domexception_ctor;
270
66
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
271
44
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
272
22
           .ToLocal(&exception)) {
273
    return;
274
  }
275
22
  isolate->ThrowException(exception);
276
}
277
278
// This tells V8 how to serialize objects that it does not understand
279
// (e.g. C++ objects) into the output buffer, in a way that our own
280
// DeserializerDelegate understands how to unpack.
281
class SerializerDelegate : public ValueSerializer::Delegate {
282
 public:
283
76722
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
284
76722
      : env_(env), context_(context), msg_(m) {}
285
286
12
  void ThrowDataCloneError(Local<String> message) override {
287
12
    ThrowDataCloneException(context_, message);
288
12
  }
289
290
11080
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
291
22160
    if (env_->base_object_ctor_template()->HasInstance(object)) {
292
      return WriteHostObject(
293
11080
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
294
    }
295
296
    ThrowDataCloneError(env_->clone_unsupported_type_str());
297
    return Nothing<bool>();
298
  }
299
300
987
  Maybe<uint32_t> GetSharedArrayBufferId(
301
      Isolate* isolate,
302
      Local<SharedArrayBuffer> shared_array_buffer) override {
303
    uint32_t i;
304
1009
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
305
44
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
306
          shared_array_buffer) {
307
        return Just(i);
308
      }
309
    }
310
311
    seen_shared_array_buffers_.emplace_back(
312
1974
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
313
987
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
314
987
    return Just(i);
315
  }
316
317
2
  Maybe<uint32_t> GetWasmModuleTransferId(
318
      Isolate* isolate, Local<WasmModuleObject> module) override {
319
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
320
  }
321
322
76693
  Maybe<bool> Finish(Local<Context> context) {
323
87763
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
324
11075
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
325
11075
      std::unique_ptr<TransferData> data;
326
11075
      if (i < first_cloned_object_index_)
327
11038
        data = host_object->TransferForMessaging();
328
11075
      if (!data)
329
41
        data = host_object->CloneForMessaging();
330
11075
      if (!data) return Nothing<bool>();
331
22142
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
332
1
        return Nothing<bool>();
333
11070
      msg_->AddTransferable(std::move(data));
334
    }
335
76688
    return Just(true);
336
  }
337
338
11046
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
339
    // Make sure we have not started serializing the value itself yet.
340
11046
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
341
11046
    host_objects_.emplace_back(std::move(host_object));
342
11046
  }
343
344
  // Some objects in the transfer list may register sub-objects that can be
345
  // transferred. This could e.g. be a public JS wrapper object, such as a
346
  // FileHandle, that is registering its C++ handle for transfer.
347
76712
  inline Maybe<bool> AddNestedHostObjects() {
348
87757
    for (size_t i = 0; i < host_objects_.size(); i++) {
349
11045
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
350
22090
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
351
        return Nothing<bool>();
352
11079
      for (auto& nested_transferable : nested_transferables) {
353
68
        if (std::find(host_objects_.begin(),
354
                      host_objects_.end(),
355
34
                      nested_transferable) == host_objects_.end()) {
356
34
          AddHostObject(nested_transferable);
357
        }
358
      }
359
    }
360
76712
    return Just(true);
361
  }
362
363
  ValueSerializer* serializer = nullptr;
364
365
 private:
366
11080
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
367
11080
    BaseObject::TransferMode mode = host_object->GetTransferMode();
368
11080
    if (mode == BaseObject::TransferMode::kUntransferable) {
369
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
370
2
      return Nothing<bool>();
371
    }
372
373
11133
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
374
11089
      if (host_objects_[i] == host_object) {
375
11034
        serializer->WriteUint32(i);
376
11034
        return Just(true);
377
      }
378
    }
379
380
44
    if (mode == BaseObject::TransferMode::kTransferable) {
381
7
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
382
7
      return Nothing<bool>();
383
    }
384
385
37
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
386
37
    uint32_t index = host_objects_.size();
387
37
    if (first_cloned_object_index_ == SIZE_MAX)
388
30
      first_cloned_object_index_ = index;
389
37
    serializer->WriteUint32(index);
390
37
    host_objects_.push_back(host_object);
391
37
    return Just(true);
392
  }
393
394
  Environment* env_;
395
  Local<Context> context_;
396
  Message* msg_;
397
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
398
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
399
  size_t first_cloned_object_index_ = SIZE_MAX;
400
401
  friend class worker::Message;
402
};
403
404
}  // anonymous namespace
405
406
76722
Maybe<bool> Message::Serialize(Environment* env,
407
                               Local<Context> context,
408
                               Local<Value> input,
409
                               const TransferList& transfer_list_v,
410
                               Local<Object> source_port) {
411
153444
  HandleScope handle_scope(env->isolate());
412
76722
  Context::Scope context_scope(context);
413
414
  // Verify that we're not silently overwriting an existing message.
415
76722
  CHECK(main_message_buf_.is_empty());
416
417
153444
  SerializerDelegate delegate(env, context, this);
418
153444
  ValueSerializer serializer(env->isolate(), &delegate);
419
76722
  delegate.serializer = &serializer;
420
421
153444
  std::vector<Local<ArrayBuffer>> array_buffers;
422
87762
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
423
11050
    Local<Value> entry = transfer_list_v[i];
424
11050
    if (entry->IsObject()) {
425
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
426
      // for details.
427
      bool untransferable;
428
11050
      if (!entry.As<Object>()->HasPrivate(
429
              context,
430
11050
              env->untransferable_object_private_symbol())
431
11050
              .To(&untransferable)) {
432
        return Nothing<bool>();
433
      }
434
11050
      if (untransferable) continue;
435
    }
436
437
    // Currently, we support ArrayBuffers and BaseObjects for which
438
    // GetTransferMode() does not return kUntransferable.
439
11045
    if (entry->IsArrayBuffer()) {
440
24
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
441
      // If we cannot render the ArrayBuffer unusable in this Isolate,
442
      // copying the buffer will have to do.
443
      // Note that we can currently transfer ArrayBuffers even if they were
444
      // not allocated by Node’s ArrayBufferAllocator in the first place,
445
      // because we pass the underlying v8::BackingStore around rather than
446
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
447
      // is always going to outlive any Workers it creates, and so will its
448
      // allocator along with it.
449
47
      if (!ab->IsDetachable()) continue;
450
24
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
451
48
          array_buffers.end()) {
452
1
        ThrowDataCloneException(
453
            context,
454
            FIXED_ONE_BYTE_STRING(
455
                env->isolate(),
456
                "Transfer list contains duplicate ArrayBuffer"));
457
1
        return Nothing<bool>();
458
      }
459
      // We simply use the array index in the `array_buffers` list as the
460
      // ID that we write into the serialized buffer.
461
23
      uint32_t id = array_buffers.size();
462
23
      array_buffers.push_back(ab);
463
23
      serializer.TransferArrayBuffer(id, ab);
464
23
      continue;
465
22042
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
466
      // Check if the source MessagePort is being transferred.
467

22042
      if (!source_port.IsEmpty() && entry == source_port) {
468
1
        ThrowDataCloneException(
469
            context,
470
            FIXED_ONE_BYTE_STRING(env->isolate(),
471
                                  "Transfer list contains source port"));
472
9
        return Nothing<bool>();
473
      }
474
      BaseObjectPtr<BaseObject> host_object {
475
11020
          Unwrap<BaseObject>(entry.As<Object>()) };
476

33031
      if (env->message_port_constructor_template()->HasInstance(entry) &&
477

21981
          (!host_object ||
478
10990
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
479
7
        ThrowDataCloneException(
480
            context,
481
            FIXED_ONE_BYTE_STRING(
482
                env->isolate(),
483
                "MessagePort in transfer list is already detached"));
484
7
        return Nothing<bool>();
485
      }
486
22026
      if (std::find(delegate.host_objects_.begin(),
487
                    delegate.host_objects_.end(),
488
11013
                    host_object) != delegate.host_objects_.end()) {
489
1
        ThrowDataCloneException(
490
            context,
491
            String::Concat(env->isolate(),
492
                FIXED_ONE_BYTE_STRING(
493
                  env->isolate(),
494
                  "Transfer list contains duplicate "),
495
1
                entry.As<Object>()->GetConstructorName()));
496
1
        return Nothing<bool>();
497
      }
498

11012
      if (host_object && host_object->GetTransferMode() !=
499
11012
              BaseObject::TransferMode::kUntransferable) {
500
11012
        delegate.AddHostObject(host_object);
501
11012
        continue;
502
      }
503
    }
504
505
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
506
    return Nothing<bool>();
507
  }
508
153424
  if (delegate.AddNestedHostObjects().IsNothing())
509
    return Nothing<bool>();
510
511
76712
  serializer.WriteHeader();
512
153424
  if (serializer.WriteValue(context, input).IsNothing()) {
513
19
    return Nothing<bool>();
514
  }
515
516
76708
  for (Local<ArrayBuffer> ab : array_buffers) {
517
    // If serialization succeeded, we render it inaccessible in this Isolate.
518
30
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
519
15
    ab->Detach();
520
521
15
    array_buffers_.emplace_back(std::move(backing_store));
522
  }
523
524
153386
  if (delegate.Finish(context).IsNothing())
525
5
    return Nothing<bool>();
526
527
  // The serializer gave us a buffer allocated using `malloc()`.
528
76688
  std::pair<uint8_t*, size_t> data = serializer.Release();
529
76688
  CHECK_NOT_NULL(data.first);
530
  main_message_buf_ =
531
76688
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
532
76688
  return Just(true);
533
}
534
535
2
void Message::MemoryInfo(MemoryTracker* tracker) const {
536
2
  tracker->TrackField("array_buffers_", array_buffers_);
537
2
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
538
2
  tracker->TrackField("transferables", transferables_);
539
2
}
540
541
35677
MessagePortData::MessagePortData(MessagePort* owner)
542
35677
    : owner_(owner) {
543
35677
}
544
545
142524
MessagePortData::~MessagePortData() {
546
71262
  CHECK_NULL(owner_);
547
71262
  Disentangle();
548
142524
}
549
550
6
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
551
12
  Mutex::ScopedLock lock(mutex_);
552
6
  tracker->TrackField("incoming_messages", incoming_messages_);
553
6
}
554
555
112819
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
556
  // This function will be called by other threads.
557
225638
  Mutex::ScopedLock lock(mutex_);
558
112819
  incoming_messages_.emplace_back(std::move(message));
559
560
112819
  if (owner_ != nullptr) {
561
85860
    Debug(owner_, "Adding message to incoming queue");
562
85860
    owner_->TriggerAsync();
563
  }
564
112819
}
565
566
12059
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
567
12059
  auto group = std::make_shared<SiblingGroup>();
568
12059
  group->Entangle({a, b});
569
12059
}
570
571
59297
void MessagePortData::Disentangle() {
572
59297
  if (group_) {
573
24146
    group_->Disentangle(this);
574
  }
575
59297
}
576
577
208020
MessagePort::~MessagePort() {
578
69340
  if (data_) Detach();
579
138680
}
580
581
34716
MessagePort::MessagePort(Environment* env,
582
                         Local<Context> context,
583
34716
                         Local<Object> wrap)
584
  : HandleWrap(env,
585
               wrap,
586
34716
               reinterpret_cast<uv_handle_t*>(&async_),
587
               AsyncWrap::PROVIDER_MESSAGEPORT),
588
34716
    data_(new MessagePortData(this)) {
589
49807
  auto onmessage = [](uv_async_t* handle) {
590
    // Called when data has been put into the queue.
591
49807
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
592
49807
    channel->OnMessage(MessageProcessingMode::kNormalOperation);
593
49803
  };
594
595
34716
  CHECK_EQ(uv_async_init(env->event_loop(),
596
                         &async_,
597
                         onmessage), 0);
598
  // Reset later to indicate success of the constructor.
599
34716
  bool succeeded = false;
600
69432
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
601
602
  Local<Value> fn;
603
104148
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
604
    return;
605
606
34716
  if (fn->IsFunction()) {
607
34710
    Local<Function> init = fn.As<Function>();
608
69420
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
609
      return;
610
  }
611
612
  Local<Function> emit_message_fn;
613
69432
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
614
    return;
615
34716
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
616
617
34716
  succeeded = true;
618
34716
  Debug(this, "Created message port");
619
}
620
621
32979
bool MessagePort::IsDetached() const {
622

32979
  return data_ == nullptr || IsHandleClosing();
623
}
624
625
108545
void MessagePort::TriggerAsync() {
626
108545
  if (IsHandleClosing()) return;
627
108463
  CHECK_EQ(uv_async_send(&async_), 0);
628
}
629
630
34678
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
631
34678
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
632
633
34678
  if (data_) {
634
    // Wrap this call with accessing the mutex, so that TriggerAsync()
635
    // can check IsHandleClosing() without race conditions.
636
69346
    Mutex::ScopedLock sibling_lock(data_->mutex_);
637
34673
    HandleWrap::Close(close_callback);
638
  } else {
639
5
    HandleWrap::Close(close_callback);
640
  }
641
34678
}
642
643
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
644
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
645
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
646
  // class (i.e. makes it behave like an arrow function).
647
2
  Environment* env = Environment::GetCurrent(args);
648
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
649
2
}
650
651
34716
MessagePort* MessagePort::New(
652
    Environment* env,
653
    Local<Context> context,
654
    std::unique_ptr<MessagePortData> data,
655
    std::shared_ptr<SiblingGroup> sibling_group) {
656
34716
  Context::Scope context_scope(context);
657
34716
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
658
659
  // Construct a new instance, then assign the listener instance and possibly
660
  // the MessagePortData to it.
661
  Local<Object> instance;
662
104148
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
663
    return nullptr;
664
34716
  MessagePort* port = new MessagePort(env, context, instance);
665
34716
  CHECK_NOT_NULL(port);
666
34716
  if (port->IsHandleClosing()) {
667
    // Construction failed with an exception.
668
    return nullptr;
669
  }
670
671
34716
  if (data) {
672
11485
    CHECK(!sibling_group);
673
11485
    port->Detach();
674
11485
    port->data_ = std::move(data);
675
676
    // This lock is here to avoid race conditions with the `owner_` read
677
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
678
    // but it's better to be safe than sorry.)
679
22970
    Mutex::ScopedLock lock(port->data_->mutex_);
680
11485
    port->data_->owner_ = port;
681
    // If the existing MessagePortData object had pending messages, this is
682
    // the easiest way to run that queue.
683
11485
    port->TriggerAsync();
684
23231
  } else if (sibling_group) {
685
74
    sibling_group->Entangle(port->data_.get());
686
  }
687
34716
  return port;
688
}
689
690
125444
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
691
                                              MessageProcessingMode mode,
692
                                              Local<Value>* port_list) {
693
125444
  std::shared_ptr<Message> received;
694
  {
695
    // Get the head of the message queue.
696
125444
    Mutex::ScopedLock lock(data_->mutex_);
697
698
125444
    Debug(this, "MessagePort has message");
699
700
125444
    bool wants_message =
701

125444
        receiving_messages_ ||
702
        mode == MessageProcessingMode::kForceReadMessages;
703
    // We have nothing to do if:
704
    // - There are no pending messages
705
    // - We are not intending to receive messages, and the message we would
706
    //   receive is not the final "close" message.
707

212775
    if (data_->incoming_messages_.empty() ||
708
87331
        (!wants_message &&
709
10283
         !data_->incoming_messages_.front()->IsCloseMessage())) {
710
76248
      return env()->no_message_symbol();
711
    }
712
713
87320
    received = data_->incoming_messages_.front();
714
87320
    data_->incoming_messages_.pop_front();
715
  }
716
717
87320
  if (received->IsCloseMessage()) {
718
23860
    Close();
719
23860
    return env()->no_message_symbol();
720
  }
721
722
75390
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
723
724
75390
  return received->Deserialize(env(), context, port_list);
725
}
726
727
50296
void MessagePort::OnMessage(MessageProcessingMode mode) {
728
50296
  Debug(this, "Running MessagePort::OnMessage()");
729
50296
  HandleScope handle_scope(env()->isolate());
730
  Local<Context> context =
731
100592
      object(env()->isolate())->GetCreationContext().ToLocalChecked();
732
733
  size_t processing_limit;
734
50296
  if (mode == MessageProcessingMode::kNormalOperation) {
735
49807
    Mutex::ScopedLock(data_->mutex_);
736
49807
    processing_limit = std::max(data_->incoming_messages_.size(),
737
99614
                                static_cast<size_t>(1000));
738
  } else {
739
489
    processing_limit = std::numeric_limits<size_t>::max();
740
  }
741
742
  // data_ can only ever be modified by the owner thread, so no need to lock.
743
  // However, the message port may be transferred while it is processing
744
  // messages, so we need to check that this handle still owns its `data_` field
745
  // on every iteration.
746
125404
  while (data_) {
747
125404
    if (processing_limit-- == 0) {
748
      // Prevent event loop starvation by only processing those messages without
749
      // interruption that were already present when the OnMessage() call was
750
      // first triggered, but at least 1000 messages because otherwise the
751
      // overhead of repeatedly triggering the uv_async_t instance becomes
752
      // noticeable, at least on Windows.
753
      // (That might require more investigation by somebody more familiar with
754
      // Windows.)
755
1
      TriggerAsync();
756
255
      return;
757
    }
758
759
125403
    HandleScope handle_scope(env()->isolate());
760
125403
    Context::Scope context_scope(context);
761
125403
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
762
763
    Local<Value> payload;
764
250806
    Local<Value> port_list = Undefined(env()->isolate());
765
    Local<Value> message_error;
766
501612
    Local<Value> argv[3];
767
768
    {
769
      // Catch any exceptions from parsing the message itself (not from
770
      // emitting it) as 'messageeror' events.
771
125403
      TryCatchScope try_catch(env());
772
250806
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
773

16
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
774
13
          message_error = try_catch.Exception();
775
16
        goto reschedule;
776
      }
777
    }
778
250774
    if (payload == env()->no_message_symbol()) break;
779
780
75350
    if (!env()->can_call_into_js()) {
781
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
782
      // In this case there is nothing to do but to drain the current queue.
783
      continue;
784
    }
785
786
75350
    argv[0] = payload;
787
75350
    argv[1] = port_list;
788
75350
    argv[2] = env()->message_string();
789
790
150696
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
791
254
    reschedule:
792
254
      if (!message_error.IsEmpty()) {
793
13
        argv[0] = message_error;
794
13
        argv[1] = Undefined(env()->isolate());
795
13
        argv[2] = env()->messageerror_string();
796
13
        USE(MakeCallback(emit_message, arraysize(argv), argv));
797
      }
798
799
      // Re-schedule OnMessage() execution in case of failure.
800
254
      if (data_)
801
254
        TriggerAsync();
802
254
      return;
803
    }
804
  }
805
}
806
807
34670
void MessagePort::OnClose() {
808
34670
  Debug(this, "MessagePort::OnClose()");
809
34670
  if (data_) {
810
    // Detach() returns move(data_).
811
23666
    Detach()->Disentangle();
812
  }
813
34670
}
814
815
46155
std::unique_ptr<MessagePortData> MessagePort::Detach() {
816
46155
  CHECK(data_);
817
92310
  Mutex::ScopedLock lock(data_->mutex_);
818
46155
  data_->owner_ = nullptr;
819
46155
  return std::move(data_);
820
}
821
822
21984
BaseObject::TransferMode MessagePort::GetTransferMode() const {
823
21984
  if (IsDetached())
824
1
    return BaseObject::TransferMode::kUntransferable;
825
21983
  return BaseObject::TransferMode::kTransferable;
826
}
827
828
10999
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
829
21998
  Close();
830
10999
  return Detach();
831
}
832
833
10754
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
834
    Environment* env,
835
    Local<Context> context,
836
    std::unique_ptr<TransferData> self) {
837
43016
  return BaseObjectPtr<MessagePort> { MessagePort::New(
838
      env, context,
839
32262
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
840
}
841
842
76715
Maybe<bool> MessagePort::PostMessage(Environment* env,
843
                                     Local<Context> context,
844
                                     Local<Value> message_v,
845
                                     const TransferList& transfer_v) {
846
76715
  Isolate* isolate = env->isolate();
847
76715
  Local<Object> obj = object(isolate);
848
849
153430
  std::shared_ptr<Message> msg = std::make_shared<Message>();
850
851
  // Per spec, we need to both check if transfer list has the source port, and
852
  // serialize the input message, even if the MessagePort is closed or detached.
853
854
  Maybe<bool> serialization_maybe =
855
76715
      msg->Serialize(env, context, message_v, transfer_v, obj);
856
76715
  if (data_ == nullptr) {
857
    return serialization_maybe;
858
  }
859
76715
  if (serialization_maybe.IsNothing()) {
860
28
    return Nothing<bool>();
861
  }
862
863
153374
  std::string error;
864
76687
  Maybe<bool> res = data_->Dispatch(msg, &error);
865
76687
  if (res.IsNothing())
866
    return res;
867
868
76687
  if (!error.empty())
869
2
    ProcessEmitWarning(env, error.c_str());
870
871
76687
  return res;
872
}
873
874
76687
Maybe<bool> MessagePortData::Dispatch(
875
    std::shared_ptr<Message> message,
876
    std::string* error) {
877
76687
  if (!group_) {
878
    if (error != nullptr)
879
      *error = "MessagePortData is not entangled.";
880
    return Nothing<bool>();
881
  }
882
76687
  return group_->Dispatch(this, message, error);
883
}
884
885
11058
static Maybe<bool> ReadIterable(Environment* env,
886
                                Local<Context> context,
887
                                // NOLINTNEXTLINE(runtime/references)
888
                                TransferList& transfer_list,
889
                                Local<Value> object) {
890
11058
  if (!object->IsObject()) return Just(false);
891
892
11055
  if (object->IsArray()) {
893
11035
    Local<Array> arr = object.As<Array>();
894
11035
    size_t length = arr->Length();
895
11035
    transfer_list.AllocateSufficientStorage(length);
896
22083
    for (size_t i = 0; i < length; i++) {
897
22096
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
898
        return Nothing<bool>();
899
    }
900
11035
    return Just(true);
901
  }
902
903
20
  Isolate* isolate = env->isolate();
904
  Local<Value> iterator_method;
905
40
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
906
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
907
20
  if (!iterator_method->IsFunction()) return Just(false);
908
909
  Local<Value> iterator;
910
6
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
911
6
      .ToLocal(&iterator)) return Nothing<bool>();
912
6
  if (!iterator->IsObject()) return Just(false);
913
914
  Local<Value> next;
915
18
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
916
    return Nothing<bool>();
917
6
  if (!next->IsFunction()) return Just(false);
918
919
6
  std::vector<Local<Value>> entries;
920
5
  while (env->can_call_into_js()) {
921
    Local<Value> result;
922
5
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
923
5
        .ToLocal(&result)) return Nothing<bool>();
924
4
    if (!result->IsObject()) return Just(false);
925
926
    Local<Value> done;
927
12
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
928
      return Nothing<bool>();
929
4
    if (done->BooleanValue(isolate)) break;
930
931
    Local<Value> val;
932
6
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
933
      return Nothing<bool>();
934
2
    entries.push_back(val);
935
  }
936
937
2
  transfer_list.AllocateSufficientStorage(entries.size());
938
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
939
2
  return Just(true);
940
}
941
942
76734
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
943
76734
  Environment* env = Environment::GetCurrent(args);
944
76734
  Local<Object> obj = args.This();
945
153468
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
946
947
76734
  if (args.Length() == 0) {
948
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
949
19
                                       "MessagePort.postMessage");
950
  }
951
952

164519
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
953
    // Browsers ignore null or undefined, and otherwise accept an array or an
954
    // options object.
955
4
    return THROW_ERR_INVALID_ARG_TYPE(env,
956
4
        "Optional transferList argument must be an iterable");
957
  }
958
959
76730
  TransferList transfer_list;
960
76730
  if (args[1]->IsObject()) {
961
    bool was_iterable;
962
22094
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
963
8
      return;
964
11047
    if (!was_iterable) {
965
      Local<Value> transfer_option;
966
39
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
967
21
          .ToLocal(&transfer_option)) return;
968
26
      if (!transfer_option->IsUndefined()) {
969
11
        if (!ReadIterable(env, context, transfer_list, transfer_option)
970
11
            .To(&was_iterable)) return;
971
10
        if (!was_iterable) {
972
7
          return THROW_ERR_INVALID_ARG_TYPE(env,
973
7
              "Optional options.transfer argument must be an iterable");
974
        }
975
      }
976
    }
977
  }
978
979
76722
  MessagePort* port = Unwrap<MessagePort>(args.This());
980
  // Even if the backing MessagePort object has already been deleted, we still
981
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
982
  // transfers.
983

76722
  if (port == nullptr || port->IsHandleClosing()) {
984
14
    Message msg;
985
7
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
986
7
    return;
987
  }
988
989
76715
  Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
990
76715
  if (res.IsJust())
991
230061
    args.GetReturnValue().Set(res.FromJust());
992
}
993
994
25295
void MessagePort::Start() {
995
25295
  Debug(this, "Start receiving messages");
996
25295
  receiving_messages_ = true;
997
50590
  Mutex::ScopedLock lock(data_->mutex_);
998
25295
  if (!data_->incoming_messages_.empty())
999
10945
    TriggerAsync();
1000
25295
}
1001
1002
20343
void MessagePort::Stop() {
1003
20343
  Debug(this, "Stop receiving messages");
1004
20343
  receiving_messages_ = false;
1005
20343
}
1006
1007
25296
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
25297
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1010
25296
  if (!port->data_) {
1011
1
    return;
1012
  }
1013
25295
  port->Start();
1014
}
1015
1016
20827
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1017
  MessagePort* port;
1018
20827
  CHECK(args[0]->IsObject());
1019
41656
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1020
20345
  if (!port->data_) {
1021
2
    return;
1022
  }
1023
20343
  port->Stop();
1024
}
1025
1026
27
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1027
27
  Environment* env = Environment::GetCurrent(args);
1028
27
  args.GetReturnValue().Set(
1029
81
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1030
27
}
1031
1032
1860
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1033
  MessagePort* port;
1034
3720
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1035
489
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1036
}
1037
1038
46
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1039
46
  Environment* env = Environment::GetCurrent(args);
1040
89
  if (!args[0]->IsObject() ||
1041

175
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1042
5
    return THROW_ERR_INVALID_ARG_TYPE(env,
1043
5
        "The \"port\" argument must be a MessagePort instance");
1044
  }
1045
82
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1046
41
  if (port == nullptr) {
1047
    // Return 'no messages' for a closed port.
1048
    args.GetReturnValue().Set(
1049
        Environment::GetCurrent(args)->no_message_symbol());
1050
    return;
1051
  }
1052
1053
  MaybeLocal<Value> payload = port->ReceiveMessage(
1054
123
      port->object()->GetCreationContext().ToLocalChecked(),
1055
41
      MessageProcessingMode::kForceReadMessages);
1056
41
  if (!payload.IsEmpty())
1057
82
    args.GetReturnValue().Set(payload.ToLocalChecked());
1058
}
1059
1060
6
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1061
6
  Environment* env = Environment::GetCurrent(args);
1062
12
  if (!args[0]->IsObject() ||
1063

24
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1064
    return THROW_ERR_INVALID_ARG_TYPE(env,
1065
1
        "The \"port\" argument must be a MessagePort instance");
1066
  }
1067
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1068

6
  if (port == nullptr || port->IsHandleClosing()) {
1069
1
    Isolate* isolate = env->isolate();
1070
1
    THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1071
1
    return;
1072
  }
1073
1074
5
  Local<Value> context_arg = args[1];
1075
  ContextifyContext* context_wrapper;
1076

10
  if (!context_arg->IsObject() ||
1077
5
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1078
10
          env, context_arg.As<Object>())) == nullptr) {
1079
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1080
  }
1081
1082
5
  std::unique_ptr<MessagePortData> data;
1083
5
  if (!port->IsDetached())
1084
5
    data = port->Detach();
1085
1086
5
  Context::Scope context_scope(context_wrapper->context());
1087
  MessagePort* target =
1088
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1089
5
  if (target != nullptr)
1090
10
    args.GetReturnValue().Set(target->object());
1091
}
1092
1093
11098
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1094
11098
  MessagePortData::Entangle(a->data_.get(), b->data_.get());
1095
11098
}
1096
1097
961
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1098
961
  MessagePortData::Entangle(a->data_.get(), b);
1099
961
}
1100
1101
6
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1102
6
  tracker->TrackField("data", data_);
1103
6
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1104
6
}
1105
1106
36319
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1107
  // Factor generating the MessagePort JS constructor into its own piece
1108
  // of code, because it is needed early on in the child environment setup.
1109
36319
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1110
36319
  if (!templ.IsEmpty())
1111
35531
    return templ;
1112
1113
  {
1114
788
    Isolate* isolate = env->isolate();
1115
788
    Local<FunctionTemplate> m = NewFunctionTemplate(isolate, MessagePort::New);
1116
788
    m->SetClassName(env->message_port_constructor_string());
1117
1576
    m->InstanceTemplate()->SetInternalFieldCount(
1118
        MessagePort::kInternalFieldCount);
1119
788
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1120
1121
788
    SetProtoMethod(isolate, m, "postMessage", MessagePort::PostMessage);
1122
788
    SetProtoMethod(isolate, m, "start", MessagePort::Start);
1123
1124
788
    env->set_message_port_constructor_template(m);
1125
  }
1126
1127
788
  return GetMessagePortConstructorTemplate(env);
1128
}
1129
1130
38307
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1131
38307
    : BaseObject(env, obj) {
1132
38307
  MakeWeak();
1133
38307
}
1134
1135
38307
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1136
38307
  CHECK(args.IsConstructCall());
1137
76614
  new JSTransferable(Environment::GetCurrent(args), args.This());
1138
38307
}
1139
1140
86
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1141
  // Implement `kClone in this ? kCloneable : kTransferable`.
1142
172
  HandleScope handle_scope(env()->isolate());
1143
172
  errors::TryCatchScope ignore_exceptions(env());
1144
1145
  bool has_clone;
1146
86
  if (!object()->Has(env()->context(),
1147
258
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1148
    return TransferMode::kUntransferable;
1149
  }
1150
1151
86
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1152
}
1153
1154
33
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1155
33
  return TransferOrClone(TransferMode::kTransferable);
1156
}
1157
1158
24
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1159
24
  return TransferOrClone(TransferMode::kCloneable);
1160
}
1161
1162
57
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1163
    TransferMode mode) const {
1164
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1165
  // which should return an object with `data` and `deserializeInfo` properties;
1166
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1167
  // on the `TransferData` instance as a string.
1168
114
  HandleScope handle_scope(env()->isolate());
1169
57
  Local<Context> context = env()->isolate()->GetCurrentContext();
1170
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1171
57
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1172
1173
  Local<Value> method;
1174
171
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1175
    return {};
1176
  }
1177
57
  if (method->IsFunction()) {
1178
    Local<Value> result_v;
1179
53
    if (!method.As<Function>()->Call(
1180
159
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1181
53
      return {};
1182
    }
1183
1184
49
    if (result_v->IsObject()) {
1185
49
      Local<Object> result = result_v.As<Object>();
1186
      Local<Value> data;
1187
      Local<Value> deserialize_info;
1188
196
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1189
147
          !result->Get(context, env()->deserialize_info_string())
1190
49
              .ToLocal(&deserialize_info)) {
1191
        return {};
1192
      }
1193
98
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1194
49
      if (*deserialize_info_str == nullptr) return {};
1195
49
      return std::make_unique<Data>(
1196
196
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1197
    }
1198
  }
1199
1200
4
  if (mode == TransferMode::kTransferable)
1201
    return TransferOrClone(TransferMode::kCloneable);
1202
  else
1203
4
    return {};
1204
}
1205
1206
Maybe<BaseObjectList>
1207
33
JSTransferable::NestedTransferables() const {
1208
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1209
66
  HandleScope handle_scope(env()->isolate());
1210
33
  Local<Context> context = env()->isolate()->GetCurrentContext();
1211
33
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1212
1213
  Local<Value> method;
1214
99
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1215
    return Nothing<BaseObjectList>();
1216
  }
1217
33
  if (!method->IsFunction()) return Just(BaseObjectList {});
1218
1219
  Local<Value> list_v;
1220
33
  if (!method.As<Function>()->Call(
1221
99
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1222
    return Nothing<BaseObjectList>();
1223
  }
1224
33
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1225
33
  Local<Array> list = list_v.As<Array>();
1226
1227
66
  BaseObjectList ret;
1228
134
  for (size_t i = 0; i < list->Length(); i++) {
1229
    Local<Value> value;
1230
68
    if (!list->Get(context, i).ToLocal(&value))
1231
      return Nothing<BaseObjectList>();
1232
68
    if (env()->base_object_ctor_template()->HasInstance(value))
1233
34
      ret.emplace_back(Unwrap<BaseObject>(value));
1234
  }
1235
33
  return Just(ret);
1236
}
1237
1238
32
Maybe<bool> JSTransferable::FinalizeTransferRead(
1239
    Local<Context> context, ValueDeserializer* deserializer) {
1240
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1241
  // of `this[kTransfer]()` or `this[kClone]()`.
1242
64
  HandleScope handle_scope(env()->isolate());
1243
  Local<Value> data;
1244
64
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1245
1246
32
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1247
  Local<Value> method;
1248
96
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1249
    return Nothing<bool>();
1250
  }
1251
32
  if (!method->IsFunction()) return Just(true);
1252
1253
96
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1254
    return Nothing<bool>();
1255
  }
1256
32
  return Just(true);
1257
}
1258
1259
49
JSTransferable::Data::Data(std::string&& deserialize_info,
1260
49
                           v8::Global<v8::Value>&& data)
1261
49
    : deserialize_info_(std::move(deserialize_info)),
1262
49
      data_(std::move(data)) {}
1263
1264
42
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1265
    Environment* env,
1266
    Local<Context> context,
1267
    std::unique_ptr<TransferData> self) {
1268
  // Create the JS wrapper object that will later be filled with data passed to
1269
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1270
  // we need to create an object with the right prototype and internal fields,
1271
  // but the actual JS data stored in the serialized data can only be read at
1272
  // the end of the stream, after the main message has been read.
1273
1274
84
  if (context != env->context()) {
1275
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1276
1
    return {};
1277
  }
1278
82
  HandleScope handle_scope(env->isolate());
1279
  Local<Value> info;
1280
82
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1281
1282
  Local<Value> ret;
1283
82
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1284
41
  if (!env->messaging_deserialize_create_object()->Call(
1285
155
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1286

105
      !env->base_object_ctor_template()->HasInstance(ret)) {
1287
9
    return {};
1288
  }
1289
1290
32
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1291
}
1292
1293
49
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1294
    Local<Context> context, ValueSerializer* serializer) {
1295
49
  HandleScope handle_scope(context->GetIsolate());
1296
49
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1297
49
  data_.Reset();
1298
49
  return ret;
1299
}
1300
1301
74
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1302
148
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1303
74
  std::shared_ptr<SiblingGroup> group;
1304
74
  auto i = groups_.find(name);
1305

74
  if (i == groups_.end() || i->second.expired()) {
1306
31
    group = std::make_shared<SiblingGroup>(name);
1307
31
    groups_[name] = group;
1308
  } else {
1309
43
    group = i->second.lock();
1310
  }
1311
74
  return group;
1312
}
1313
1314
30
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1315
60
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1316
30
  auto i = groups_.find(name);
1317

30
  if (i != groups_.end() && i->second.expired())
1318
30
    groups_.erase(name);
1319
30
}
1320
1321
31
SiblingGroup::SiblingGroup(const std::string& name)
1322
31
    : name_(name) { }
1323
1324
12044
SiblingGroup::~SiblingGroup() {
1325
  // If this is a named group, check to see if we can remove the group
1326
12044
  if (!name_.empty())
1327
30
    CheckSiblingGroup(name_);
1328
12044
}
1329
1330
76687
Maybe<bool> SiblingGroup::Dispatch(
1331
    MessagePortData* source,
1332
    std::shared_ptr<Message> message,
1333
    std::string* error) {
1334
1335
153374
  RwLock::ScopedReadLock lock(group_mutex_);
1336
1337
  // The source MessagePortData is not part of this group.
1338
76687
  if (ports_.find(source) == ports_.end()) {
1339
    if (error != nullptr)
1340
      *error = "Source MessagePort is not entangled with this group.";
1341
    return Nothing<bool>();
1342
  }
1343
1344
  // There are no destination ports.
1345
76687
  if (size() <= 1)
1346
94
    return Just(false);
1347
1348
  // Transferables cannot be used when there is more
1349
  // than a single destination.
1350

76593
  if (size() > 2 && message->has_transferables()) {
1351
    if (error != nullptr)
1352
      *error = "Transferables cannot be used with multiple destinations.";
1353
    return Nothing<bool>();
1354
  }
1355
1356
229798
  for (MessagePortData* port : ports_) {
1357
153207
    if (port == source)
1358
76592
      continue;
1359
    // This loop should only be entered if there's only a single destination
1360
87681
    for (const auto& transferable : message->transferables()) {
1361
11068
      if (port == transferable.get()) {
1362
2
        if (error != nullptr) {
1363
          *error = "The target port was posted to itself, and the "
1364
2
                   "communication channel was lost";
1365
        }
1366
2
        return Just(true);
1367
      }
1368
    }
1369
76613
    port->AddToIncomingQueue(message);
1370
  }
1371
1372
76591
  return Just(true);
1373
}
1374
1375
74
void SiblingGroup::Entangle(MessagePortData* port) {
1376
74
  Entangle({ port });
1377
74
}
1378
1379
12133
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1380
24266
  RwLock::ScopedWriteLock lock(group_mutex_);
1381
36325
  for (MessagePortData* data : ports) {
1382
24192
    ports_.insert(data);
1383
24192
    CHECK(!data->group_);
1384
24192
    data->group_ = shared_from_this();
1385
  }
1386
12133
}
1387
1388
24146
void SiblingGroup::Disentangle(MessagePortData* data) {
1389
48292
  auto self = shared_from_this();  // Keep alive until end of function.
1390
48292
  RwLock::ScopedWriteLock lock(group_mutex_);
1391
24146
  ports_.erase(data);
1392
24146
  data->group_.reset();
1393
1394
24146
  data->AddToIncomingQueue(std::make_shared<Message>());
1395
  // If this is an anonymous group and there's another port, close it.
1396

24146
  if (size() == 1 && name_.empty())
1397
12060
    (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1398
24146
}
1399
1400
SiblingGroup::Map SiblingGroup::groups_;
1401
Mutex SiblingGroup::groups_mutex_;
1402
1403
namespace {
1404
1405
788
static void SetDeserializerCreateObjectFunction(
1406
    const FunctionCallbackInfo<Value>& args) {
1407
788
  Environment* env = Environment::GetCurrent(args);
1408
788
  CHECK(args[0]->IsFunction());
1409
1576
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1410
788
}
1411
1412
11099
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1413
11099
  Environment* env = Environment::GetCurrent(args);
1414
11099
  if (!args.IsConstructCall()) {
1415
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1416
1
    return;
1417
  }
1418
1419
22196
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1420
11098
  Context::Scope context_scope(context);
1421
1422
11098
  MessagePort* port1 = MessagePort::New(env, context);
1423
11098
  if (port1 == nullptr) return;
1424
11098
  MessagePort* port2 = MessagePort::New(env, context);
1425
11098
  if (port2 == nullptr) {
1426
    port1->Close();
1427
    return;
1428
  }
1429
1430
11098
  MessagePort::Entangle(port1, port2);
1431
1432
44392
  args.This()->Set(context, env->port1_string(), port1->object())
1433
      .Check();
1434
44392
  args.This()->Set(context, env->port2_string(), port2->object())
1435
      .Check();
1436
}
1437
1438
74
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1439
148
  CHECK(args[0]->IsString());
1440
74
  Environment* env = Environment::GetCurrent(args);
1441
148
  Context::Scope context_scope(env->context());
1442
148
  Utf8Value name(env->isolate(), args[0]);
1443
  MessagePort* port =
1444
74
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1445
74
  if (port != nullptr) {
1446
148
    args.GetReturnValue().Set(port->object());
1447
  }
1448
74
}
1449
1450
788
static void InitMessaging(Local<Object> target,
1451
                          Local<Value> unused,
1452
                          Local<Context> context,
1453
                          void* priv) {
1454
788
  Environment* env = Environment::GetCurrent(context);
1455
788
  Isolate* isolate = env->isolate();
1456
1457
  {
1458
788
    SetConstructorFunction(context,
1459
                           target,
1460
                           "MessageChannel",
1461
                           NewFunctionTemplate(isolate, MessageChannel));
1462
  }
1463
1464
  {
1465
    Local<FunctionTemplate> t =
1466
788
        NewFunctionTemplate(isolate, JSTransferable::New);
1467
788
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1468
1576
    t->InstanceTemplate()->SetInternalFieldCount(
1469
        JSTransferable::kInternalFieldCount);
1470
788
    SetConstructorFunction(context, target, "JSTransferable", t);
1471
  }
1472
1473
788
  SetConstructorFunction(context,
1474
                         target,
1475
                         env->message_port_constructor_string(),
1476
                         GetMessagePortConstructorTemplate(env));
1477
1478
  // These are not methods on the MessagePort prototype, because
1479
  // the browser equivalents do not provide them.
1480
788
  SetMethod(context, target, "stopMessagePort", MessagePort::Stop);
1481
788
  SetMethod(context, target, "checkMessagePort", MessagePort::CheckType);
1482
788
  SetMethod(context, target, "drainMessagePort", MessagePort::Drain);
1483
788
  SetMethod(
1484
      context, target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1485
788
  SetMethod(
1486
      context, target, "moveMessagePortToContext", MessagePort::MoveToContext);
1487
788
  SetMethod(context,
1488
            target,
1489
            "setDeserializerCreateObjectFunction",
1490
            SetDeserializerCreateObjectFunction);
1491
788
  SetMethod(context, target, "broadcastChannel", BroadcastChannel);
1492
1493
  {
1494
1576
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1495
    target
1496
788
        ->Set(context,
1497
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1498
1576
              domexception)
1499
        .Check();
1500
  }
1501
788
}
1502
1503
5528
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1504
5528
  registry->Register(MessageChannel);
1505
5528
  registry->Register(BroadcastChannel);
1506
5528
  registry->Register(JSTransferable::New);
1507
5528
  registry->Register(MessagePort::New);
1508
5528
  registry->Register(MessagePort::PostMessage);
1509
5528
  registry->Register(MessagePort::Start);
1510
5528
  registry->Register(MessagePort::Stop);
1511
5528
  registry->Register(MessagePort::CheckType);
1512
5528
  registry->Register(MessagePort::Drain);
1513
5528
  registry->Register(MessagePort::ReceiveMessage);
1514
5528
  registry->Register(MessagePort::MoveToContext);
1515
5528
  registry->Register(SetDeserializerCreateObjectFunction);
1516
5528
}
1517
1518
}  // anonymous namespace
1519
1520
}  // namespace worker
1521
}  // namespace node
1522
1523
5598
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1524
5528
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1525
                               node::worker::RegisterExternalReferences)