GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: node_messaging.cc Lines: 762 820 92.9 %
Date: 2022-12-31 04:22:30 Branches: 381 518 73.6 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process-inl.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
// Hack to have WriteHostObject inform ReadHostObject that the value
46
// should be treated as a regular JS object. Used to transfer process.env.
47
static const uint32_t kNormalObject = static_cast<uint32_t>(-1);
48
49
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
50
1
  return BaseObject::TransferMode::kUntransferable;
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
54
  return CloneForMessaging();
55
}
56
57
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
58
  return {};
59
}
60
61
11035
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
62
11035
  return Just(BaseObjectList {});
63
}
64
65
10827
Maybe<bool> BaseObject::FinalizeTransferRead(
66
    Local<Context> context, ValueDeserializer* deserializer) {
67
10827
  return Just(true);
68
}
69
70
namespace worker {
71
72
11072
Maybe<bool> TransferData::FinalizeTransferWrite(
73
    Local<Context> context, ValueSerializer* serializer) {
74
11072
  return Just(true);
75
}
76
77
116295
Message::Message(MallocedBuffer<char>&& buffer)
78
116295
    : main_message_buf_(std::move(buffer)) {}
79
80
179568
bool Message::IsCloseMessage() const {
81
179568
  return main_message_buf_.data == nullptr;
82
}
83
84
namespace {
85
86
// This is used to tell V8 how to read transferred host objects, like other
87
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
88
class DeserializerDelegate : public ValueDeserializer::Delegate {
89
 public:
90
78633
  DeserializerDelegate(
91
      Message* m,
92
      Environment* env,
93
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
94
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
95
      const std::vector<CompiledWasmModule>& wasm_modules)
96
78633
      : host_objects_(host_objects),
97
        shared_array_buffers_(shared_array_buffers),
98
78633
        wasm_modules_(wasm_modules) {}
99
100
10883
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
101
    // Identifying the index in the message's BaseObject array is sufficient.
102
    uint32_t id;
103
10883
    if (!deserializer->ReadUint32(&id))
104
      return MaybeLocal<Object>();
105
10883
    if (id != kNormalObject) {
106
10882
      CHECK_LT(id, host_objects_.size());
107
21764
      return host_objects_[id]->object(isolate);
108
    }
109
1
    EscapableHandleScope scope(isolate);
110
1
    Local<Context> context = isolate->GetCurrentContext();
111
    Local<Value> object;
112
2
    if (!deserializer->ReadValue(context).ToLocal(&object))
113
      return MaybeLocal<Object>();
114
1
    CHECK(object->IsObject());
115
1
    return scope.Escape(object.As<Object>());
116
  }
117
118
785
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
119
      Isolate* isolate, uint32_t clone_id) override {
120
785
    CHECK_LT(clone_id, shared_array_buffers_.size());
121
1570
    return shared_array_buffers_[clone_id];
122
  }
123
124
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
125
      Isolate* isolate, uint32_t transfer_id) override {
126
2
    CHECK_LT(transfer_id, wasm_modules_.size());
127
    return WasmModuleObject::FromCompiledModule(
128
2
        isolate, wasm_modules_[transfer_id]);
129
  }
130
131
  ValueDeserializer* deserializer = nullptr;
132
133
 private:
134
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
135
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
136
  const std::vector<CompiledWasmModule>& wasm_modules_;
137
};
138
139
}  // anonymous namespace
140
141
78647
MaybeLocal<Value> Message::Deserialize(Environment* env,
142
                                       Local<Context> context,
143
                                       Local<Value>* port_list) {
144
78647
  Context::Scope context_scope(context);
145
146
78647
  CHECK(!IsCloseMessage());
147

78647
  if (port_list != nullptr && !transferables_.empty()) {
148
    // Need to create this outside of the EscapableHandleScope, but inside
149
    // the Context::Scope.
150
21622
    *port_list = Array::New(env->isolate());
151
  }
152
153
78647
  EscapableHandleScope handle_scope(env->isolate());
154
155
  // Create all necessary objects for transferables, e.g. MessagePort handles.
156
157294
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
157
78647
  auto cleanup = OnScopeLeave([&]() {
158
78665
    for (BaseObjectPtr<BaseObject> object : host_objects) {
159
18
      if (!object) continue;
160
161
      // If the function did not finish successfully, host_objects will contain
162
      // a list of objects that will never be passed to JS. Therefore, we
163
      // destroy them here.
164
      object->Detach();
165
    }
166
157294
  });
167
168
89530
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
169
10897
    HandleScope handle_scope(env->isolate());
170
10897
    TransferData* data = transferables_[i].get();
171
21794
    host_objects[i] = data->Deserialize(
172
21794
        env, context, std::move(transferables_[i]));
173
10897
    if (!host_objects[i]) return {};
174
10883
    if (port_list != nullptr) {
175
      // If we gather a list of all message ports, and this transferred object
176
      // is a message port, add it to that list. This is a bit of an odd case
177
      // of special handling for MessagePorts (as opposed to applying to all
178
      // transferables), but it's required for spec compliance.
179
      DCHECK((*port_list)->IsArray());
180
10840
      Local<Array> port_list_array = port_list->As<Array>();
181
10840
      Local<Object> obj = host_objects[i]->object();
182
21680
      if (env->message_port_constructor_template()->HasInstance(obj)) {
183
10784
        if (port_list_array->Set(context,
184
                                 port_list_array->Length(),
185
21568
                                 obj).IsNothing()) {
186
          return {};
187
        }
188
      }
189
    }
190
  }
191
78633
  transferables_.clear();
192
193
157266
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
194
  // Attach all transferred SharedArrayBuffers to their new Isolate.
195
79418
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
196
    Local<SharedArrayBuffer> sab =
197
785
        SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
198
785
    shared_array_buffers.push_back(sab);
199
  }
200
201
  DeserializerDelegate delegate(
202
157266
      this, env, host_objects, shared_array_buffers, wasm_modules_);
203
  ValueDeserializer deserializer(
204
      env->isolate(),
205
78633
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
206
      main_message_buf_.size,
207
157266
      &delegate);
208
78633
  delegate.deserializer = &deserializer;
209
210
  // Attach all transferred ArrayBuffers to their new Isolate.
211
78644
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
212
    Local<ArrayBuffer> ab =
213
11
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
214
11
    deserializer.TransferArrayBuffer(i, ab);
215
  }
216
217
157266
  if (deserializer.ReadHeader(context).IsNothing())
218
    return {};
219
  Local<Value> return_value;
220
157266
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
221
    return {};
222
223
89516
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
224
21766
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
225
      return {};
226
  }
227
228
78633
  host_objects.clear();
229
78633
  return handle_scope.Escape(return_value);
230
}
231
232
1011
void Message::AddSharedArrayBuffer(
233
    std::shared_ptr<BackingStore> backing_store) {
234
1011
  shared_array_buffers_.emplace_back(std::move(backing_store));
235
1011
}
236
237
11145
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
238
11145
  transferables_.emplace_back(std::move(data));
239
11145
}
240
241
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
242
2
  wasm_modules_.emplace_back(std::move(mod));
243
2
  return wasm_modules_.size() - 1;
244
}
245
246
namespace {
247
248
34860
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
249
34860
  Isolate* isolate = context->GetIsolate();
250
  Local<Object> per_context_bindings;
251
  Local<Value> emit_message_val;
252
104580
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
253
69720
      !per_context_bindings->Get(context,
254
104580
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
255
34860
          .ToLocal(&emit_message_val)) {
256
    return MaybeLocal<Function>();
257
  }
258
34860
  CHECK(emit_message_val->IsFunction());
259
34860
  return emit_message_val.As<Function>();
260
}
261
262
849
MaybeLocal<Function> GetDOMException(Local<Context> context) {
263
849
  Isolate* isolate = context->GetIsolate();
264
  Local<Object> per_context_bindings;
265
  Local<Value> domexception_ctor_val;
266
2547
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
267
1698
      !per_context_bindings->Get(context,
268
2547
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
269
849
          .ToLocal(&domexception_ctor_val)) {
270
    return MaybeLocal<Function>();
271
  }
272
849
  CHECK(domexception_ctor_val->IsFunction());
273
849
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
274
849
  return domexception_ctor;
275
}
276
277
22
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
278
22
  Isolate* isolate = context->GetIsolate();
279
  Local<Value> argv[] = {message,
280
44
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
281
  Local<Value> exception;
282
  Local<Function> domexception_ctor;
283
66
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
284
44
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
285
22
           .ToLocal(&exception)) {
286
    return;
287
  }
288
22
  isolate->ThrowException(exception);
289
}
290
291
// This tells V8 how to serialize objects that it does not understand
292
// (e.g. C++ objects) into the output buffer, in a way that our own
293
// DeserializerDelegate understands how to unpack.
294
class SerializerDelegate : public ValueSerializer::Delegate {
295
 public:
296
79913
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
297
79913
      : env_(env), context_(context), msg_(m) {}
298
299
12
  void ThrowDataCloneError(Local<String> message) override {
300
12
    ThrowDataCloneException(context_, message);
301
12
  }
302
303
11156
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
304
22312
    if (env_->base_object_ctor_template()->HasInstance(object)) {
305
      return WriteHostObject(
306
11155
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
307
    }
308
309
    // Convert process.env to a regular object.
310
1
    auto env_proxy_ctor_template = env_->env_proxy_ctor_template();
311

2
    if (!env_proxy_ctor_template.IsEmpty() &&
312
2
        env_proxy_ctor_template->HasInstance(object)) {
313
1
      HandleScope scope(isolate);
314
      // TODO(bnoordhuis) Prototype-less object in case process.env contains
315
      // a "__proto__" key? process.env has a prototype with concomitant
316
      // methods like toString(). It's probably confusing if that gets lost
317
      // in transmission.
318
1
      Local<Object> normal_object = Object::New(isolate);
319
1
      env_->env_vars()->AssignToObject(isolate, env_->context(), normal_object);
320
1
      serializer->WriteUint32(kNormalObject);  // Instead of a BaseObject.
321
1
      return serializer->WriteValue(env_->context(), normal_object);
322
    }
323
324
    ThrowDataCloneError(env_->clone_unsupported_type_str());
325
    return Nothing<bool>();
326
  }
327
328
1011
  Maybe<uint32_t> GetSharedArrayBufferId(
329
      Isolate* isolate,
330
      Local<SharedArrayBuffer> shared_array_buffer) override {
331
    uint32_t i;
332
1035
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
333
48
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
334
          shared_array_buffer) {
335
        return Just(i);
336
      }
337
    }
338
339
    seen_shared_array_buffers_.emplace_back(
340
2022
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
341
1011
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
342
1011
    return Just(i);
343
  }
344
345
2
  Maybe<uint32_t> GetWasmModuleTransferId(
346
      Isolate* isolate, Local<WasmModuleObject> module) override {
347
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
348
  }
349
350
79884
  Maybe<bool> Finish(Local<Context> context) {
351
91029
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
352
11150
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
353
11150
      std::unique_ptr<TransferData> data;
354
11150
      if (i < first_cloned_object_index_)
355
11061
        data = host_object->TransferForMessaging();
356
11150
      if (!data)
357
93
        data = host_object->CloneForMessaging();
358
11150
      if (!data) return Nothing<bool>();
359
22292
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
360
1
        return Nothing<bool>();
361
11145
      msg_->AddTransferable(std::move(data));
362
    }
363
79879
    return Just(true);
364
  }
365
366
11069
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
367
    // Make sure we have not started serializing the value itself yet.
368
11069
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
369
11069
    host_objects_.emplace_back(std::move(host_object));
370
11069
  }
371
372
  // Some objects in the transfer list may register sub-objects that can be
373
  // transferred. This could e.g. be a public JS wrapper object, such as a
374
  // FileHandle, that is registering its C++ handle for transfer.
375
79903
  inline Maybe<bool> AddNestedHostObjects() {
376
90971
    for (size_t i = 0; i < host_objects_.size(); i++) {
377
11068
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
378
22136
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
379
        return Nothing<bool>();
380
11102
      for (auto& nested_transferable : nested_transferables) {
381
68
        if (std::find(host_objects_.begin(),
382
                      host_objects_.end(),
383
34
                      nested_transferable) == host_objects_.end()) {
384
34
          AddHostObject(nested_transferable);
385
        }
386
      }
387
    }
388
79903
    return Just(true);
389
  }
390
391
  ValueSerializer* serializer = nullptr;
392
393
 private:
394
11155
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
395
11155
    BaseObject::TransferMode mode = host_object->GetTransferMode();
396
11155
    if (mode == BaseObject::TransferMode::kUntransferable) {
397
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
398
2
      return Nothing<bool>();
399
    }
400
401
11241
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
402
11145
      if (host_objects_[i] == host_object) {
403
11057
        serializer->WriteUint32(i);
404
11057
        return Just(true);
405
      }
406
    }
407
408
96
    if (mode == BaseObject::TransferMode::kTransferable) {
409
7
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
410
7
      return Nothing<bool>();
411
    }
412
413
89
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
414
89
    uint32_t index = host_objects_.size();
415
89
    if (first_cloned_object_index_ == SIZE_MAX)
416
56
      first_cloned_object_index_ = index;
417
89
    serializer->WriteUint32(index);
418
89
    host_objects_.push_back(host_object);
419
89
    return Just(true);
420
  }
421
422
  Environment* env_;
423
  Local<Context> context_;
424
  Message* msg_;
425
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
426
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
427
  size_t first_cloned_object_index_ = SIZE_MAX;
428
429
  friend class worker::Message;
430
};
431
432
}  // anonymous namespace
433
434
79913
Maybe<bool> Message::Serialize(Environment* env,
435
                               Local<Context> context,
436
                               Local<Value> input,
437
                               const TransferList& transfer_list_v,
438
                               Local<Object> source_port) {
439
159826
  HandleScope handle_scope(env->isolate());
440
79913
  Context::Scope context_scope(context);
441
442
  // Verify that we're not silently overwriting an existing message.
443
79913
  CHECK(main_message_buf_.is_empty());
444
445
159826
  SerializerDelegate delegate(env, context, this);
446
159826
  ValueSerializer serializer(env->isolate(), &delegate);
447
79913
  delegate.serializer = &serializer;
448
449
159826
  std::vector<Local<ArrayBuffer>> array_buffers;
450
90979
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
451
11076
    Local<Value> entry = transfer_list_v[i];
452
11076
    if (entry->IsObject()) {
453
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
454
      // for details.
455
      bool untransferable;
456
11076
      if (!entry.As<Object>()->HasPrivate(
457
              context,
458
11076
              env->untransferable_object_private_symbol())
459
11076
              .To(&untransferable)) {
460
        return Nothing<bool>();
461
      }
462
11076
      if (untransferable) continue;
463
    }
464
465
    // Currently, we support ArrayBuffers and BaseObjects for which
466
    // GetTransferMode() does not return kUntransferable.
467
11071
    if (entry->IsArrayBuffer()) {
468
27
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
469
      // If we cannot render the ArrayBuffer unusable in this Isolate,
470
      // copying the buffer will have to do.
471
      // Note that we can currently transfer ArrayBuffers even if they were
472
      // not allocated by Node’s ArrayBufferAllocator in the first place,
473
      // because we pass the underlying v8::BackingStore around rather than
474
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
475
      // is always going to outlive any Workers it creates, and so will its
476
      // allocator along with it.
477
53
      if (!ab->IsDetachable()) continue;
478
27
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
479
54
          array_buffers.end()) {
480
1
        ThrowDataCloneException(
481
            context,
482
            FIXED_ONE_BYTE_STRING(
483
                env->isolate(),
484
                "Transfer list contains duplicate ArrayBuffer"));
485
1
        return Nothing<bool>();
486
      }
487
      // We simply use the array index in the `array_buffers` list as the
488
      // ID that we write into the serialized buffer.
489
26
      uint32_t id = array_buffers.size();
490
26
      array_buffers.push_back(ab);
491
26
      serializer.TransferArrayBuffer(id, ab);
492
26
      continue;
493
22088
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
494
      // Check if the source MessagePort is being transferred.
495

22088
      if (!source_port.IsEmpty() && entry == source_port) {
496
1
        ThrowDataCloneException(
497
            context,
498
            FIXED_ONE_BYTE_STRING(env->isolate(),
499
                                  "Transfer list contains source port"));
500
9
        return Nothing<bool>();
501
      }
502
      BaseObjectPtr<BaseObject> host_object {
503
11043
          Unwrap<BaseObject>(entry.As<Object>()) };
504

33100
      if (env->message_port_constructor_template()->HasInstance(entry) &&
505

22027
          (!host_object ||
506
11013
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
507
7
        ThrowDataCloneException(
508
            context,
509
            FIXED_ONE_BYTE_STRING(
510
                env->isolate(),
511
                "MessagePort in transfer list is already detached"));
512
7
        return Nothing<bool>();
513
      }
514
22072
      if (std::find(delegate.host_objects_.begin(),
515
                    delegate.host_objects_.end(),
516
11036
                    host_object) != delegate.host_objects_.end()) {
517
1
        ThrowDataCloneException(
518
            context,
519
            String::Concat(env->isolate(),
520
                FIXED_ONE_BYTE_STRING(
521
                  env->isolate(),
522
                  "Transfer list contains duplicate "),
523
1
                entry.As<Object>()->GetConstructorName()));
524
1
        return Nothing<bool>();
525
      }
526

11035
      if (host_object && host_object->GetTransferMode() !=
527
11035
              BaseObject::TransferMode::kUntransferable) {
528
11035
        delegate.AddHostObject(host_object);
529
11035
        continue;
530
      }
531
    }
532
533
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
534
    return Nothing<bool>();
535
  }
536
159806
  if (delegate.AddNestedHostObjects().IsNothing())
537
    return Nothing<bool>();
538
539
79903
  serializer.WriteHeader();
540
159806
  if (serializer.WriteValue(context, input).IsNothing()) {
541
19
    return Nothing<bool>();
542
  }
543
544
79902
  for (Local<ArrayBuffer> ab : array_buffers) {
545
    // If serialization succeeded, we render it inaccessible in this Isolate.
546
36
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
547
36
    ab->Detach(Local<Value>()).Check();
548
549
18
    array_buffers_.emplace_back(std::move(backing_store));
550
  }
551
552
159768
  if (delegate.Finish(context).IsNothing())
553
5
    return Nothing<bool>();
554
555
  // The serializer gave us a buffer allocated using `malloc()`.
556
79879
  std::pair<uint8_t*, size_t> data = serializer.Release();
557
79879
  CHECK_NOT_NULL(data.first);
558
  main_message_buf_ =
559
79879
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
560
79879
  return Just(true);
561
}
562
563
4
void Message::MemoryInfo(MemoryTracker* tracker) const {
564
4
  tracker->TrackField("array_buffers_", array_buffers_);
565
4
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
566
4
  tracker->TrackField("transferables", transferables_);
567
4
}
568
569
35843
MessagePortData::MessagePortData(MessagePort* owner)
570
35843
    : owner_(owner) {
571
35843
}
572
573
143268
MessagePortData::~MessagePortData() {
574
71634
  CHECK_NULL(owner_);
575
71634
  Disentangle();
576
143268
}
577
578
14
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
579
28
  Mutex::ScopedLock lock(mutex_);
580
14
  tracker->TrackField("incoming_messages", incoming_messages_);
581
14
}
582
583
116177
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
584
  // This function will be called by other threads.
585
232354
  Mutex::ScopedLock lock(mutex_);
586
116177
  incoming_messages_.emplace_back(std::move(message));
587
588
116177
  if (owner_ != nullptr) {
589
89057
    Debug(owner_, "Adding message to incoming queue");
590
89057
    owner_->TriggerAsync();
591
  }
592
116177
}
593
594
12111
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
595
12111
  auto group = std::make_shared<SiblingGroup>();
596
12111
  group->Entangle({a, b});
597
12111
}
598
599
59623
void MessagePortData::Disentangle() {
600
59623
  if (group_) {
601
24270
    group_->Disentangle(this);
602
  }
603
59623
}
604
605
209004
MessagePort::~MessagePort() {
606
69668
  if (data_) Detach();
607
139336
}
608
609
34860
MessagePort::MessagePort(Environment* env,
610
                         Local<Context> context,
611
34860
                         Local<Object> wrap)
612
  : HandleWrap(env,
613
               wrap,
614
34860
               reinterpret_cast<uv_handle_t*>(&async_),
615
               AsyncWrap::PROVIDER_MESSAGEPORT),
616
34860
    data_(new MessagePortData(this)) {
617
49363
  auto onmessage = [](uv_async_t* handle) {
618
    // Called when data has been put into the queue.
619
49363
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
620
49363
    channel->OnMessage(MessageProcessingMode::kNormalOperation);
621
49359
  };
622
623
34860
  CHECK_EQ(uv_async_init(env->event_loop(),
624
                         &async_,
625
                         onmessage), 0);
626
  // Reset later to indicate success of the constructor.
627
34860
  bool succeeded = false;
628
69720
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
629
630
  Local<Value> fn;
631
104580
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
632
    return;
633
634
34860
  if (fn->IsFunction()) {
635
34853
    Local<Function> init = fn.As<Function>();
636
69706
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
637
      return;
638
  }
639
640
  Local<Function> emit_message_fn;
641
69720
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
642
    return;
643
34860
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
644
645
34860
  succeeded = true;
646
34860
  Debug(this, "Created message port");
647
}
648
649
33049
bool MessagePort::IsDetached() const {
650

33049
  return data_ == nullptr || IsHandleClosing();
651
}
652
653
112227
void MessagePort::TriggerAsync() {
654
112227
  if (IsHandleClosing()) return;
655
112138
  CHECK_EQ(uv_async_send(&async_), 0);
656
}
657
658
34842
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
659
34842
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
660
661
34842
  if (data_) {
662
    // Wrap this call with accessing the mutex, so that TriggerAsync()
663
    // can check IsHandleClosing() without race conditions.
664
69672
    Mutex::ScopedLock sibling_lock(data_->mutex_);
665
34836
    HandleWrap::Close(close_callback);
666
  } else {
667
6
    HandleWrap::Close(close_callback);
668
  }
669
34842
}
670
671
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
672
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
673
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
674
  // class (i.e. makes it behave like an arrow function).
675
2
  Environment* env = Environment::GetCurrent(args);
676
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
677
2
}
678
679
34860
MessagePort* MessagePort::New(
680
    Environment* env,
681
    Local<Context> context,
682
    std::unique_ptr<MessagePortData> data,
683
    std::shared_ptr<SiblingGroup> sibling_group) {
684
34860
  Context::Scope context_scope(context);
685
34860
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
686
687
  // Construct a new instance, then assign the listener instance and possibly
688
  // the MessagePortData to it.
689
  Local<Object> instance;
690
104580
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
691
    return nullptr;
692
34860
  MessagePort* port = new MessagePort(env, context, instance);
693
34860
  CHECK_NOT_NULL(port);
694
34860
  if (port->IsHandleClosing()) {
695
    // Construction failed with an exception.
696
    return nullptr;
697
  }
698
699
34860
  if (data) {
700
11547
    CHECK(!sibling_group);
701
11547
    port->Detach();
702
11547
    port->data_ = std::move(data);
703
704
    // This lock is here to avoid race conditions with the `owner_` read
705
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
706
    // but it's better to be safe than sorry.)
707
23094
    Mutex::ScopedLock lock(port->data_->mutex_);
708
11547
    port->data_->owner_ = port;
709
    // If the existing MessagePortData object had pending messages, this is
710
    // the easiest way to run that queue.
711
11547
    port->TriggerAsync();
712
23313
  } else if (sibling_group) {
713
74
    sibling_group->Entangle(port->data_.get());
714
  }
715
34860
  return port;
716
}
717
718
128290
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
719
                                              MessageProcessingMode mode,
720
                                              Local<Value>* port_list) {
721
128290
  std::shared_ptr<Message> received;
722
  {
723
    // Get the head of the message queue.
724
128290
    Mutex::ScopedLock lock(data_->mutex_);
725
726
128290
    Debug(this, "MessagePort has message");
727
728
128290
    bool wants_message =
729

128290
        receiving_messages_ ||
730
        mode == MessageProcessingMode::kForceReadMessages;
731
    // We have nothing to do if:
732
    // - There are no pending messages
733
    // - We are not intending to receive messages, and the message we would
734
    //   receive is not the final "close" message.
735

218945
    if (data_->incoming_messages_.empty() ||
736
90655
        (!wants_message &&
737
10276
         !data_->incoming_messages_.front()->IsCloseMessage())) {
738
75290
      return env()->no_message_symbol();
739
    }
740
741
90645
    received = data_->incoming_messages_.front();
742
90645
    data_->incoming_messages_.pop_front();
743
  }
744
745
90645
  if (received->IsCloseMessage()) {
746
23990
    Close();
747
23990
    return env()->no_message_symbol();
748
  }
749
750
78650
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
751
752
78647
  return received->Deserialize(env(), context, port_list);
753
}
754
755
49867
// Processes pending incoming messages and emits each one to JS through
// `emit_message_fn_`.
//
// In kNormalOperation mode the number of messages handled per call is capped
// (see the comment inside the loop); in any other mode the queue is processed
// without a cap. Deserialization failures are reported as 'messageerror'
// events, and processing is re-scheduled through TriggerAsync() whenever the
// loop is left early because of a failure or because the cap was reached.
void MessagePort::OnMessage(MessageProcessingMode mode) {
  Debug(this, "Running MessagePort::OnMessage()");
  HandleScope handle_scope(env()->isolate());
  Local<Context> context =
      object(env()->isolate())->GetCreationContext().ToLocalChecked();

  size_t processing_limit;
  if (mode == MessageProcessingMode::kNormalOperation) {
    // The guard has to be a named object. An unnamed
    // `Mutex::ScopedLock(data_->mutex_);` temporary is destroyed at the end of
    // its own statement, which would leave the queue size read below
    // unprotected against concurrent senders.
    Mutex::ScopedLock lock(data_->mutex_);
    processing_limit = std::max(data_->incoming_messages_.size(),
                                static_cast<size_t>(1000));
  } else {
    processing_limit = std::numeric_limits<size_t>::max();
  }

  // data_ can only ever be modified by the owner thread, so no need to lock.
  // However, the message port may be transferred while it is processing
  // messages, so we need to check that this handle still owns its `data_` field
  // on every iteration.
  while (data_) {
    if (processing_limit-- == 0) {
      // Prevent event loop starvation by only processing those messages without
      // interruption that were already present when the OnMessage() call was
      // first triggered, but at least 1000 messages because otherwise the
      // overhead of repeatedly triggering the uv_async_t instance becomes
      // noticeable, at least on Windows.
      // (That might require more investigation by somebody more familiar with
      // Windows.)
      TriggerAsync();
      return;
    }

    HandleScope handle_scope(env()->isolate());
    Context::Scope context_scope(context);
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);

    Local<Value> payload;
    Local<Value> port_list = Undefined(env()->isolate());
    Local<Value> message_error;
    Local<Value> argv[3];

    {
      // Catch any exceptions from parsing the message itself (not from
      // emitting it) as 'messageerror' events.
      TryCatchScope try_catch(env());
      if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
          message_error = try_catch.Exception();
        goto reschedule;
      }
    }
    // The queue is drained (or the port is not accepting messages).
    if (payload == env()->no_message_symbol()) break;

    if (!env()->can_call_into_js()) {
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
      // In this case there is nothing to do but to drain the current queue.
      continue;
    }

    argv[0] = payload;
    argv[1] = port_list;
    argv[2] = env()->message_string();

    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
    reschedule:
      // Reached either because emitting the message failed or, via the goto
      // above, because receiving it failed.
      if (!message_error.IsEmpty()) {
        argv[0] = message_error;
        argv[1] = Undefined(env()->isolate());
        argv[2] = env()->messageerror_string();
        USE(MakeCallback(emit_message, arraysize(argv), argv));
      }

      // Re-schedule OnMessage() execution in case of failure.
      if (data_)
        TriggerAsync();
      return;
    }
  }
}
834
835
34834
void MessagePort::OnClose() {
836
34834
  Debug(this, "MessagePort::OnClose()");
837
34834
  if (data_) {
838
    // Detach() returns move(data_).
839
23806
    Detach()->Disentangle();
840
  }
841
34834
}
842
843
46381
std::unique_ptr<MessagePortData> MessagePort::Detach() {
844
46381
  CHECK(data_);
845
92762
  Mutex::ScopedLock lock(data_->mutex_);
846
46381
  data_->owner_ = nullptr;
847
46381
  return std::move(data_);
848
}
849
850
22030
// A port can be transferred unless it has already been detached.
BaseObject::TransferMode MessagePort::GetTransferMode() const {
  return IsDetached() ? BaseObject::TransferMode::kUntransferable
                      : BaseObject::TransferMode::kTransferable;
}
855
856
11022
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
857
22044
  Close();
858
11022
  return Detach();
859
}
860
861
10785
// Receiving side of a port transfer: wraps the transferred data (`self`) in a
// new MessagePort created for `env` / `context`.
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  // `self` is this very object, so the downcast is to our own type.
  std::unique_ptr<MessagePortData> port_data =
      static_unique_pointer_cast<MessagePortData>(std::move(self));

  MessagePort* port = MessagePort::New(env, context, std::move(port_data));
  return BaseObjectPtr<MessagePort> { port };
}
869
870
79906
// Serializes `message_v` (with `transfer_v`) and dispatches it to the
// entangled peers.
//
// Returns Nothing if serialization or dispatch failed; otherwise the result
// reported by the dispatch. A non-empty error text produced by a successful
// dispatch is surfaced as a process warning.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Context> context,
                                     Local<Value> message_v,
                                     const TransferList& transfer_v) {
  Local<Object> self = object(env->isolate());
  auto msg = std::make_shared<Message>();

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.
  Maybe<bool> serialized =
      msg->Serialize(env, context, message_v, transfer_v, self);

  // Closed or detached port: only the serialization outcome is reported.
  if (!data_) return serialized;

  if (serialized.IsNothing()) return Nothing<bool>();

  std::string warning;
  Maybe<bool> dispatched = data_->Dispatch(msg, &warning);
  if (dispatched.IsJust() && !warning.empty())
    ProcessEmitWarning(env, warning.c_str());

  return dispatched;
}
901
902
79878
// Forwards `message` to this port's sibling group. Without a group there is
// nowhere to send to: `*error` (if provided) is filled in and Nothing is
// returned.
Maybe<bool> MessagePortData::Dispatch(
    std::shared_ptr<Message> message,
    std::string* error) {
  if (group_)
    return group_->Dispatch(this, message, error);

  if (error != nullptr)
    *error = "MessagePortData is not entangled.";
  return Nothing<bool>();
}
912
913
11084
// Reads the entries of `object` into `transfer_list`.
//
// Returns:
//   - Just(true)  if `object` was an array or an iterable and its entries
//                 were stored in `transfer_list`;
//   - Just(false) if `object` is not iterable (not an object, no callable
//                 @@iterator, or the iterator protocol yields non-objects);
//   - Nothing     if a JS exception was thrown while reading.
static Maybe<bool> ReadIterable(Environment* env,
                                Local<Context> context,
                                // NOLINTNEXTLINE(runtime/references)
                                TransferList& transfer_list,
                                Local<Value> object) {
  if (!object->IsObject()) return Just(false);

  // Arrays are read by index, without going through the iterator protocol.
  if (object->IsArray()) {
    Local<Array> arr = object.As<Array>();
    size_t length = arr->Length();
    transfer_list.AllocateSufficientStorage(length);
    for (size_t i = 0; i < length; i++) {
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
        return Nothing<bool>();
    }
    return Just(true);
  }

  // Generic iterable: obtain object[Symbol.iterator]().
  Isolate* isolate = env->isolate();
  Local<Value> iterator_method;
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
      .ToLocal(&iterator_method)) return Nothing<bool>();
  if (!iterator_method->IsFunction()) return Just(false);

  Local<Value> iterator;
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
      .ToLocal(&iterator)) return Nothing<bool>();
  if (!iterator->IsObject()) return Just(false);

  Local<Value> next;
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
    return Nothing<bool>();
  if (!next->IsFunction()) return Just(false);

  // Drive iterator.next() until it reports `done`, collecting each `value`.
  std::vector<Local<Value>> entries;
  while (env->can_call_into_js()) {
    Local<Value> result;
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
        .ToLocal(&result)) return Nothing<bool>();
    if (!result->IsObject()) return Just(false);

    Local<Value> done;
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
      return Nothing<bool>();
    if (done->BooleanValue(isolate)) break;

    Local<Value> val;
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
      return Nothing<bool>();
    entries.push_back(val);
  }

  transfer_list.AllocateSufficientStorage(entries.size());
  // An iterable may legitimately yield nothing. In that case the buffer has
  // length zero and `transfer_list[0]` would be an out-of-range access, so
  // only take the address of the first slot when there is one.
  if (!entries.empty())
    std::copy(entries.begin(), entries.end(), &transfer_list[0]);
  return Just(true);
}
969
970
79925
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
971
79925
  Environment* env = Environment::GetCurrent(args);
972
79925
  Local<Object> obj = args.This();
973
159850
  Local<Context> context = obj->GetCreationContext().ToLocalChecked();
974
975
79925
  if (args.Length() == 0) {
976
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
977
19
                                       "MessagePort.postMessage");
978
  }
979
980

170927
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
981
    // Browsers ignore null or undefined, and otherwise accept an array or an
982
    // options object.
983
4
    return THROW_ERR_INVALID_ARG_TYPE(env,
984
4
        "Optional transferList argument must be an iterable");
985
  }
986
987
79921
  TransferList transfer_list;
988
79921
  if (args[1]->IsObject()) {
989
    bool was_iterable;
990
22146
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
991
8
      return;
992
11073
    if (!was_iterable) {
993
      Local<Value> transfer_option;
994
39
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
995
21
          .ToLocal(&transfer_option)) return;
996
26
      if (!transfer_option->IsUndefined()) {
997
11
        if (!ReadIterable(env, context, transfer_list, transfer_option)
998
11
            .To(&was_iterable)) return;
999
10
        if (!was_iterable) {
1000
7
          return THROW_ERR_INVALID_ARG_TYPE(env,
1001
7
              "Optional options.transfer argument must be an iterable");
1002
        }
1003
      }
1004
    }
1005
  }
1006
1007
79913
  MessagePort* port = Unwrap<MessagePort>(args.This());
1008
  // Even if the backing MessagePort object has already been deleted, we still
1009
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
1010
  // transfers.
1011

79913
  if (port == nullptr || port->IsHandleClosing()) {
1012
14
    Message msg;
1013
7
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
1014
7
    return;
1015
  }
1016
1017
79906
  Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
1018
79906
  if (res.IsJust())
1019
239634
    args.GetReturnValue().Set(res.FromJust());
1020
}
1021
1022
25439
void MessagePort::Start() {
1023
25439
  Debug(this, "Start receiving messages");
1024
25439
  receiving_messages_ = true;
1025
50878
  Mutex::ScopedLock lock(data_->mutex_);
1026
25439
  if (!data_->incoming_messages_.empty())
1027
11383
    TriggerAsync();
1028
25439
}
1029
1030
20356
void MessagePort::Stop() {
1031
20356
  Debug(this, "Stop receiving messages");
1032
20356
  receiving_messages_ = false;
1033
20356
}
1034
1035
25440
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1036
  MessagePort* port;
1037
25441
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1038
25440
  if (!port->data_) {
1039
1
    return;
1040
  }
1041
25439
  port->Start();
1042
}
1043
1044
20843
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1045
  MessagePort* port;
1046
20843
  CHECK(args[0]->IsObject());
1047
41688
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1048
20358
  if (!port->data_) {
1049
2
    return;
1050
  }
1051
20356
  port->Stop();
1052
}
1053
1054
27
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1055
27
  Environment* env = Environment::GetCurrent(args);
1056
27
  args.GetReturnValue().Set(
1057
81
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1058
27
}
1059
1060
1904
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1061
  MessagePort* port;
1062
3808
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1063
504
  port->OnMessage(MessageProcessingMode::kForceReadMessages);
1064
}
1065
1066
156
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1067
156
  Environment* env = Environment::GetCurrent(args);
1068
309
  if (!args[0]->IsObject() ||
1069

615
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1070
5
    return THROW_ERR_INVALID_ARG_TYPE(env,
1071
5
        "The \"port\" argument must be a MessagePort instance");
1072
  }
1073
302
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1074
151
  if (port == nullptr) {
1075
    // Return 'no messages' for a closed port.
1076
    args.GetReturnValue().Set(
1077
        Environment::GetCurrent(args)->no_message_symbol());
1078
    return;
1079
  }
1080
1081
  MaybeLocal<Value> payload = port->ReceiveMessage(
1082
453
      port->object()->GetCreationContext().ToLocalChecked(),
1083
151
      MessageProcessingMode::kForceReadMessages);
1084
151
  if (!payload.IsEmpty())
1085
302
    args.GetReturnValue().Set(payload.ToLocalChecked());
1086
}
1087
1088
7
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1089
7
  Environment* env = Environment::GetCurrent(args);
1090
14
  if (!args[0]->IsObject() ||
1091

28
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1092
    return THROW_ERR_INVALID_ARG_TYPE(env,
1093
1
        "The \"port\" argument must be a MessagePort instance");
1094
  }
1095
14
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1096

7
  if (port == nullptr || port->IsHandleClosing()) {
1097
1
    Isolate* isolate = env->isolate();
1098
1
    THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1099
1
    return;
1100
  }
1101
1102
6
  Local<Value> context_arg = args[1];
1103
  ContextifyContext* context_wrapper;
1104

12
  if (!context_arg->IsObject() ||
1105
6
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1106
12
          env, context_arg.As<Object>())) == nullptr) {
1107
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1108
  }
1109
1110
6
  std::unique_ptr<MessagePortData> data;
1111
6
  if (!port->IsDetached())
1112
6
    data = port->Detach();
1113
1114
6
  Context::Scope context_scope(context_wrapper->context());
1115
  MessagePort* target =
1116
6
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1117
6
  if (target != nullptr)
1118
12
    args.GetReturnValue().Set(target->object());
1119
}
1120
1121
11128
// Entangles two ports by entangling the data objects they own.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* const first = a->data_.get();
  MessagePortData* const second = b->data_.get();
  MessagePortData::Entangle(first, second);
}
1124
1125
983
// Entangles the data owned by port `a` with the free-standing data object `b`.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* const own_data = a->data_.get();
  MessagePortData::Entangle(own_data, b);
}
1128
1129
14
// Reports the fields retained by this port to the memory tracker.
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
  // The port data owned by this handle.
  tracker->TrackField("data", data_);

  // The JS function used to emit messages.
  tracker->TrackField("emit_message_fn", emit_message_fn_);
}
1133
1134
36541
// Returns the per-Environment MessagePort constructor template, creating and
// caching it on first use.
//
// This lives in its own function because the template is needed early on in
// the child environment setup.
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();

  if (cached.IsEmpty()) {
    Isolate* isolate = env->isolate();

    Local<FunctionTemplate> ctor =
        NewFunctionTemplate(isolate, MessagePort::New);
    ctor->SetClassName(env->message_port_constructor_string());
    ctor->InstanceTemplate()->SetInternalFieldCount(
        MessagePort::kInternalFieldCount);
    ctor->Inherit(HandleWrap::GetConstructorTemplate(env));

    // Methods exposed on the MessagePort prototype.
    SetProtoMethod(isolate, ctor, "postMessage", MessagePort::PostMessage);
    SetProtoMethod(isolate, ctor, "start", MessagePort::Start);

    env->set_message_port_constructor_template(ctor);

    // Read the template back from the Environment so that callers always get
    // the cached value.
    cached = env->message_port_constructor_template();
  }

  return cached;
}
1157
1158
13708
// Wraps `obj` as a BaseObject and makes the wrapper weak.
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
    : BaseObject(env, obj)
{
  MakeWeak();
}
1162
1163
13708
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1164
13708
  CHECK(args.IsConstructCall());
1165
27416
  new JSTransferable(Environment::GetCurrent(args), args.This());
1166
13708
}
1167
1168
111
// Implements `kClone in this ? kCloneable : kTransferable`. If the property
// lookup itself fails, the object is reported as untransferable.
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
  HandleScope handle_scope(env()->isolate());
  // Exceptions raised by the lookup are deliberately not propagated.
  errors::TryCatchScope ignore_exceptions(env());

  Maybe<bool> clone_lookup =
      object()->Has(env()->context(), env()->messaging_clone_symbol());
  if (clone_lookup.IsNothing())
    return TransferMode::kUntransferable;

  if (clone_lookup.FromJust())
    return TransferMode::kCloneable;
  return TransferMode::kTransferable;
}
1181
1182
33
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1183
33
  return TransferOrClone(TransferMode::kTransferable);
1184
}
1185
1186
49
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1187
49
  return TransferOrClone(TransferMode::kCloneable);
1188
}
1189
1190
82
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1191
    TransferMode mode) const {
1192
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1193
  // which should return an object with `data` and `deserializeInfo` properties;
1194
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1195
  // on the `TransferData` instance as a string.
1196
164
  HandleScope handle_scope(env()->isolate());
1197
82
  Local<Context> context = env()->isolate()->GetCurrentContext();
1198
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1199
82
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1200
1201
  Local<Value> method;
1202
246
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1203
    return {};
1204
  }
1205
82
  if (method->IsFunction()) {
1206
    Local<Value> result_v;
1207
78
    if (!method.As<Function>()->Call(
1208
234
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1209
78
      return {};
1210
    }
1211
1212
74
    if (result_v->IsObject()) {
1213
74
      Local<Object> result = result_v.As<Object>();
1214
      Local<Value> data;
1215
      Local<Value> deserialize_info;
1216
296
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1217
222
          !result->Get(context, env()->deserialize_info_string())
1218
74
              .ToLocal(&deserialize_info)) {
1219
        return {};
1220
      }
1221
148
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1222
74
      if (*deserialize_info_str == nullptr) return {};
1223
74
      return std::make_unique<Data>(
1224
296
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1225
    }
1226
  }
1227
1228
4
  if (mode == TransferMode::kTransferable)
1229
    return TransferOrClone(TransferMode::kCloneable);
1230
  else
1231
4
    return {};
1232
}
1233
1234
// Calls `this[kTransferList]()` and returns the BaseObjects found in the
// resulting array. A missing hook or a non-array result yields an empty list;
// a JS exception yields Nothing.
Maybe<BaseObjectList>
JSTransferable::NestedTransferables() const {
  Isolate* isolate = env()->isolate();
  HandleScope handle_scope(isolate);
  Local<Context> context = isolate->GetCurrentContext();

  Local<Value> hook;
  if (!object()->Get(context, env()->messaging_transfer_list_symbol())
           .ToLocal(&hook)) {
    return Nothing<BaseObjectList>();
  }
  if (!hook->IsFunction()) return Just(BaseObjectList {});

  Local<Value> returned;
  if (!hook.As<Function>()
           ->Call(context, object(), 0, nullptr)
           .ToLocal(&returned)) {
    return Nothing<BaseObjectList>();
  }
  if (!returned->IsArray()) return Just(BaseObjectList {});

  Local<Array> items = returned.As<Array>();
  BaseObjectList nested;
  // The length is re-read on every iteration because reading an element can
  // run JS.
  for (size_t index = 0; index < items->Length(); index++) {
    Local<Value> item;
    if (!items->Get(context, index).ToLocal(&item))
      return Nothing<BaseObjectList>();

    // Entries that are not BaseObject instances are skipped.
    if (!env()->base_object_ctor_template()->HasInstance(item))
      continue;
    nested.emplace_back(Unwrap<BaseObject>(item));
  }
  return Just(nested);
}
1265
1266
56
// Calls `this[kDeserialize](data)` where `data` comes from the return value
// of `this[kTransfer]()` or `this[kClone]()` and is read from `deserializer`.
// A non-callable hook is not an error.
Maybe<bool> JSTransferable::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  HandleScope handle_scope(env()->isolate());

  Local<Value> data;
  if (!deserializer->ReadValue(context).ToLocal(&data))
    return Nothing<bool>();

  Local<Value> hook;
  if (!object()->Get(context, env()->messaging_deserialize_symbol())
           .ToLocal(&hook)) {
    return Nothing<bool>();
  }
  if (!hook->IsFunction()) return Just(true);

  MaybeLocal<Value> outcome =
      hook.As<Function>()->Call(context, object(), 1, &data);
  if (outcome.IsEmpty())
    return Nothing<bool>();
  return Just(true);
}
1286
1287
74
// Takes ownership of the deserialization info string and of the global handle
// to the JS data.
JSTransferable::Data::Data(std::string&& deserialize_info,
                           v8::Global<v8::Value>&& data)
    : deserialize_info_(std::move(deserialize_info)),
      data_(std::move(data))
{
}
1291
1292
67
// Creates the JS wrapper object that will later be filled with data passed to
// the `[kDeserialize]()` method on it. This split is necessary, because here
// we need to create an object with the right prototype and internal fields,
// but the actual JS data stored in the serialized data can only be read at
// the end of the stream, after the main message has been read.
//
// Returns an empty pointer if the target context is not the Environment's
// context, or if the object could not be created as a BaseObject instance.
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  if (context != env->context()) {
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
    return {};
  }

  Isolate* isolate = env->isolate();
  HandleScope handle_scope(isolate);

  Local<Value> info;
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info))
    return {};

  // The factory function is registered from JS; it must be set by now.
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());

  Local<Value> created;
  if (!env->messaging_deserialize_create_object()
           ->Call(context, Null(isolate), 1, &info)
           .ToLocal(&created)) {
    return {};
  }
  if (!env->base_object_ctor_template()->HasInstance(created))
    return {};

  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(created) };
}
1320
1321
74
// Writes the held JS data to `serializer` and then drops the global handle,
// whether or not the write succeeded.
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  HandleScope handle_scope(context->GetIsolate());

  Local<Value> payload = PersistentToLocal::Strong(data_);
  Maybe<bool> written = serializer->WriteValue(context, payload);

  data_.Reset();
  return written;
}
1328
1329
74
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1330
148
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1331
74
  std::shared_ptr<SiblingGroup> group;
1332
74
  auto i = groups_.find(name);
1333

74
  if (i == groups_.end() || i->second.expired()) {
1334
31
    group = std::make_shared<SiblingGroup>(name);
1335
31
    groups_[name] = group;
1336
  } else {
1337
43
    group = i->second.lock();
1338
  }
1339
74
  return group;
1340
}
1341
1342
30
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1343
60
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1344
30
  auto i = groups_.find(name);
1345

30
  if (i != groups_.end() && i->second.expired())
1346
30
    groups_.erase(name);
1347
30
}
1348
1349
31
// Creates a group with the given name; the destructor treats a group with an
// empty name as unnamed.
SiblingGroup::SiblingGroup(const std::string& name)
    : name_(name)
{
}
1351
1352
12116
SiblingGroup::~SiblingGroup() {
  // Unnamed groups are never registered, so there is nothing to clean up.
  if (name_.empty()) return;

  // If this is a named group, check to see if we can remove the group
  CheckSiblingGroup(name_);
}
1357
1358
79878
// Delivers `message` from `source` to every other port in this group.
//
// Returns Nothing (with `*error` set, if provided) when `source` is not a
// member of the group or when transferables would have to be delivered to more
// than one destination. Returns Just(false) when there is no destination, and
// Just(true) otherwise.
Maybe<bool> SiblingGroup::Dispatch(
    MessagePortData* source,
    std::shared_ptr<Message> message,
    std::string* error) {

  RwLock::ScopedReadLock lock(group_mutex_);

  // The source MessagePortData is not part of this group.
  if (ports_.count(source) == 0) {
    if (error != nullptr)
      *error = "Source MessagePort is not entangled with this group.";
    return Nothing<bool>();
  }

  // There are no destination ports.
  if (size() <= 1)
    return Just(false);

  // Transferables cannot be used when there is more
  // than a single destination.
  const bool multiple_destinations = size() > 2;
  if (multiple_destinations && message->has_transferables()) {
    if (error != nullptr)
      *error = "Transferables cannot be used with multiple destinations.";
    return Nothing<bool>();
  }

  for (MessagePortData* destination : ports_) {
    if (destination == source)
      continue;

    // Only iterates when there is a single destination: with several
    // destinations the message carries no transferables (checked above).
    bool posted_to_itself = false;
    for (const auto& transferable : message->transferables()) {
      if (destination == transferable.get()) {
        posted_to_itself = true;
        break;
      }
    }

    if (posted_to_itself) {
      // The message is dropped, but the post is still reported as done.
      if (error != nullptr) {
        *error = "The target port was posted to itself, and the "
                 "communication channel was lost";
      }
      return Just(true);
    }

    destination->AddToIncomingQueue(message);
  }

  return Just(true);
}
1402
1403
74
// Convenience overload: adds a single port to this group.
void SiblingGroup::Entangle(MessagePortData* port) {
  // Delegate to the list overload with a one-element list.
  Entangle({ port });
}
1406
1407
12185
// Adds every port in `ports` to this group and points each port back at the
// group. None of the ports may already belong to a group.
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
  RwLock::ScopedWriteLock lock(group_mutex_);

  for (MessagePortData* member : ports) {
    ports_.insert(member);

    CHECK(!member->group_);
    member->group_ = shared_from_this();
  }
}
1415
1416
24270
// Removes `data` from this group and queues a default-constructed Message on
// it. If exactly one port is left afterwards and the group has no name, the
// same kind of message is queued on that remaining port as well.
void SiblingGroup::Disentangle(MessagePortData* data) {
  // Resetting data->group_ below may drop the last outside reference to this
  // group; hold one until the function is done.
  std::shared_ptr<SiblingGroup> keep_alive = shared_from_this();
  RwLock::ScopedWriteLock lock(group_mutex_);

  ports_.erase(data);
  data->group_.reset();
  data->AddToIncomingQueue(std::make_shared<Message>());

  // If this is an anonymous group and there's another port, close it.
  const bool one_port_left = size() == 1;
  if (one_port_left && name_.empty()) {
    MessagePortData* remaining = *ports_.begin();
    remaining->AddToIncomingQueue(std::make_shared<Message>());
  }
}
1427
1428
// Registry of sibling groups keyed by name. Get() looks entries up and adds
// them; CheckSiblingGroup() erases entries whose group has expired.
SiblingGroup::Map SiblingGroup::groups_;
1429
// Held by Get() and CheckSiblingGroup() while they access `groups_`.
Mutex SiblingGroup::groups_mutex_;
1430
1431
namespace {
1432
1433
827
static void SetDeserializerCreateObjectFunction(
1434
    const FunctionCallbackInfo<Value>& args) {
1435
827
  Environment* env = Environment::GetCurrent(args);
1436
827
  CHECK(args[0]->IsFunction());
1437
1654
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1438
827
}
1439
1440
11129
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1441
11129
  Environment* env = Environment::GetCurrent(args);
1442
11129
  if (!args.IsConstructCall()) {
1443
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1444
1
    return;
1445
  }
1446
1447
22256
  Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1448
11128
  Context::Scope context_scope(context);
1449
1450
11128
  MessagePort* port1 = MessagePort::New(env, context);
1451
11128
  if (port1 == nullptr) return;
1452
11128
  MessagePort* port2 = MessagePort::New(env, context);
1453
11128
  if (port2 == nullptr) {
1454
    port1->Close();
1455
    return;
1456
  }
1457
1458
11128
  MessagePort::Entangle(port1, port2);
1459
1460
44512
  args.This()->Set(context, env->port1_string(), port1->object())
1461
      .Check();
1462
44512
  args.This()->Set(context, env->port2_string(), port2->object())
1463
      .Check();
1464
}
1465
1466
74
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1467
148
  CHECK(args[0]->IsString());
1468
74
  Environment* env = Environment::GetCurrent(args);
1469
148
  Context::Scope context_scope(env->context());
1470
148
  Utf8Value name(env->isolate(), args[0]);
1471
  MessagePort* port =
1472
74
      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1473
74
  if (port != nullptr) {
1474
148
    args.GetReturnValue().Set(port->object());
1475
  }
1476
74
}
1477
1478
827
static void InitMessaging(Local<Object> target,
1479
                          Local<Value> unused,
1480
                          Local<Context> context,
1481
                          void* priv) {
1482
827
  Environment* env = Environment::GetCurrent(context);
1483
827
  Isolate* isolate = env->isolate();
1484
1485
  {
1486
827
    SetConstructorFunction(context,
1487
                           target,
1488
                           "MessageChannel",
1489
                           NewFunctionTemplate(isolate, MessageChannel));
1490
  }
1491
1492
  {
1493
    Local<FunctionTemplate> t =
1494
827
        NewFunctionTemplate(isolate, JSTransferable::New);
1495
827
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1496
1654
    t->InstanceTemplate()->SetInternalFieldCount(
1497
        JSTransferable::kInternalFieldCount);
1498
827
    SetConstructorFunction(context, target, "JSTransferable", t);
1499
  }
1500
1501
827
  SetConstructorFunction(context,
1502
                         target,
1503
                         env->message_port_constructor_string(),
1504
                         GetMessagePortConstructorTemplate(env));
1505
1506
  // These are not methods on the MessagePort prototype, because
1507
  // the browser equivalents do not provide them.
1508
827
  SetMethod(context, target, "stopMessagePort", MessagePort::Stop);
1509
827
  SetMethod(context, target, "checkMessagePort", MessagePort::CheckType);
1510
827
  SetMethod(context, target, "drainMessagePort", MessagePort::Drain);
1511
827
  SetMethod(
1512
      context, target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1513
827
  SetMethod(
1514
      context, target, "moveMessagePortToContext", MessagePort::MoveToContext);
1515
827
  SetMethod(context,
1516
            target,
1517
            "setDeserializerCreateObjectFunction",
1518
            SetDeserializerCreateObjectFunction);
1519
827
  SetMethod(context, target, "broadcastChannel", BroadcastChannel);
1520
1521
  {
1522
1654
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1523
    target
1524
827
        ->Set(context,
1525
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1526
1654
              domexception)
1527
        .Check();
1528
  }
1529
827
}
1530
1531
5717
// Registers every native callback that this binding exposes to JS with the
// external reference registry.
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1532
5717
  // Constructors.
  registry->Register(MessageChannel);
1533
5717
  registry->Register(BroadcastChannel);
1534
5717
  registry->Register(JSTransferable::New);
1535
5717
  registry->Register(MessagePort::New);
1536
5717
  // MessagePort prototype methods.
  registry->Register(MessagePort::PostMessage);
1537
5717
  registry->Register(MessagePort::Start);
1538
5717
  // Helpers installed directly on the binding object.
  registry->Register(MessagePort::Stop);
1539
5717
  registry->Register(MessagePort::CheckType);
1540
5717
  registry->Register(MessagePort::Drain);
1541
5717
  registry->Register(MessagePort::ReceiveMessage);
1542
5717
  registry->Register(MessagePort::MoveToContext);
1543
5717
  registry->Register(SetDeserializerCreateObjectFunction);
1544
5717
}
1545
1546
}  // anonymous namespace
1547
1548
}  // namespace worker
1549
}  // namespace node
1550
1551
5788
// Registers the `messaging` binding with InitMessaging as its initializer.
NODE_BINDING_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1552
5717
// Registers the binding's external references (see
// RegisterExternalReferences).
NODE_BINDING_EXTERNAL_REFERENCE(messaging,
1553
                                node::worker::RegisterExternalReferences)