GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 728 788 92.4 %
Date: 2020-12-12 04:11:07 Branches: 357 489 73.0 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10662
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
10662
  return Just(BaseObjectList {});
59
}
60
61
10407
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10407
  return Just(true);
64
}
65
66
namespace worker {
67
68
10667
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
10667
  return Just(true);
71
}
72
73
79010
Message::Message(MallocedBuffer<char>&& buffer)
74
79010
    : main_message_buf_(std::move(buffer)) {}
75
76
109447
bool Message::IsCloseMessage() const {
77
109447
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
43934
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
43969
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
43969
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
43969
        wasm_modules_(wasm_modules) {}
95
96
10410
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10410
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10410
    CHECK_LE(id, host_objects_.size());
102
20820
    return host_objects_[id]->object(isolate);
103
  }
104
105
416
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
416
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
832
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
44012
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context) {
130
44012
  CHECK(!IsCloseMessage());
131
132
44017
  EscapableHandleScope handle_scope(env->isolate());
133
  Context::Scope context_scope(context);
134
135
  // Create all necessary objects for transferables, e.g. MessagePort handles.
136
88016
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
137
43826
  auto cleanup = OnScopeLeave([&]() {
138
43826
    for (BaseObjectPtr<BaseObject> object : host_objects) {
139
9
      if (!object) continue;
140
141
      // If the function did not finish successfully, host_objects will contain
142
      // a list of objects that will never be passed to JS. Therefore, we
143
      // destroy them here.
144
      object->Detach();
145
    }
146
132033
  });
147
148
54428
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
149
10416
    TransferData* data = transferables_[i].get();
150
20832
    host_objects[i] = data->Deserialize(
151
20832
        env, context, std::move(transferables_[i]));
152
10422
    if (!host_objects[i]) return {};
153
  }
154
44004
  transferables_.clear();
155
156
87985
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
157
  // Attach all transferred SharedArrayBuffers to their new Isolate.
158
44427
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
159
    Local<SharedArrayBuffer> sab =
160
        SharedArrayBuffer::New(env->isolate(),
161
416
                               std::move(shared_array_buffers_[i]));
162
416
    shared_array_buffers.push_back(sab);
163
  }
164
165
  DeserializerDelegate delegate(
166
87971
      this, env, host_objects, shared_array_buffers, wasm_modules_);
167
  ValueDeserializer deserializer(
168
      env->isolate(),
169
44007
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170
      main_message_buf_.size,
171
132002
      &delegate);
172
44013
  delegate.deserializer = &deserializer;
173
174
  // Attach all transferred ArrayBuffers to their new Isolate.
175
44022
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
176
    Local<ArrayBuffer> ab =
177
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
178
9
    deserializer.TransferArrayBuffer(i, ab);
179
  }
180
181
87972
  if (deserializer.ReadHeader(context).IsNothing())
182
    return {};
183
  Local<Value> return_value;
184
87743
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
185
    return {};
186
187
54194
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
188
20820
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
189
      return {};
190
  }
191
192
43874
  host_objects.clear();
193
43988
  return handle_scope.Escape(return_value);
194
}
195
196
663
void Message::AddSharedArrayBuffer(
197
    std::shared_ptr<BackingStore> backing_store) {
198
663
  shared_array_buffers_.emplace_back(std::move(backing_store));
199
663
}
200
201
10674
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
202
10674
  transferables_.emplace_back(std::move(data));
203
10674
}
204
205
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
206
2
  wasm_modules_.emplace_back(std::move(mod));
207
2
  return wasm_modules_.size() - 1;
208
}
209
210
namespace {
211
212
32887
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
213
32887
  Isolate* isolate = context->GetIsolate();
214
  Local<Object> per_context_bindings;
215
  Local<Value> emit_message_val;
216

131548
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
217
98661
      !per_context_bindings->Get(context,
218
98661
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
219
32887
          .ToLocal(&emit_message_val)) {
220
    return MaybeLocal<Function>();
221
  }
222
32887
  CHECK(emit_message_val->IsFunction());
223
32887
  return emit_message_val.As<Function>();
224
}
225
226
460
MaybeLocal<Function> GetDOMException(Local<Context> context) {
227
460
  Isolate* isolate = context->GetIsolate();
228
  Local<Object> per_context_bindings;
229
  Local<Value> domexception_ctor_val;
230

1840
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
231
1380
      !per_context_bindings->Get(context,
232
1380
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
233
460
          .ToLocal(&domexception_ctor_val)) {
234
    return MaybeLocal<Function>();
235
  }
236
460
  CHECK(domexception_ctor_val->IsFunction());
237
460
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
238
460
  return domexception_ctor;
239
}
240
241
16
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
242
16
  Isolate* isolate = context->GetIsolate();
243
  Local<Value> argv[] = {message,
244
48
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
245
  Local<Value> exception;
246
  Local<Function> domexception_ctor;
247

64
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
248
64
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
249
16
           .ToLocal(&exception)) {
250
    return;
251
  }
252
16
  isolate->ThrowException(exception);
253
}
254
255
// This tells V8 how to serialize objects that it does not understand
256
// (e.g. C++ objects) into the output buffer, in a way that our own
257
// DeserializerDelegate understands how to unpack.
258
44962
class SerializerDelegate : public ValueSerializer::Delegate {
259
 public:
260
44962
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
261
44962
      : env_(env), context_(context), msg_(m) {}
262
263
6
  void ThrowDataCloneError(Local<String> message) override {
264
6
    ThrowDataCloneException(context_, message);
265
6
  }
266
267
10681
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
268
21362
    if (env_->base_object_ctor_template()->HasInstance(object)) {
269
      return WriteHostObject(
270
10681
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
271
    }
272
273
    ThrowDataCloneError(env_->clone_unsupported_type_str());
274
    return Nothing<bool>();
275
  }
276
277
663
  Maybe<uint32_t> GetSharedArrayBufferId(
278
      Isolate* isolate,
279
      Local<SharedArrayBuffer> shared_array_buffer) override {
280
    uint32_t i;
281
688
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
282
50
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
283
          shared_array_buffer) {
284
        return Just(i);
285
      }
286
    }
287
288
663
    seen_shared_array_buffers_.emplace_back(
289
1326
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
290
663
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
291
663
    return Just(i);
292
  }
293
294
2
  Maybe<uint32_t> GetWasmModuleTransferId(
295
      Isolate* isolate, Local<WasmModuleObject> module) override {
296
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
297
  }
298
299
44943
  Maybe<bool> Finish(Local<Context> context) {
300
55617
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
301
21352
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
302
21352
      std::unique_ptr<TransferData> data;
303
10678
      if (i < first_cloned_object_index_)
304
10666
        data = host_object->TransferForMessaging();
305
10678
      if (!data)
306
15
        data = host_object->CloneForMessaging();
307
10678
      if (!data) return Nothing<bool>();
308
21350
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
309
1
        return Nothing<bool>();
310
10674
      msg_->AddTransferable(std::move(data));
311
    }
312
44938
    return Just(true);
313
  }
314
315
10673
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
316
    // Make sure we have not started serializing the value itself yet.
317
10673
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
318
10673
    host_objects_.emplace_back(std::move(host_object));
319
10673
  }
320
321
  // Some objects in the transfer list may register sub-objects that can be
322
  // transferred. This could e.g. be a public JS wrapper object, such as a
323
  // FileHandle, that is registering its C++ handle for transfer.
324
44951
  inline Maybe<bool> AddNestedHostObjects() {
325
55623
    for (size_t i = 0; i < host_objects_.size(); i++) {
326
21344
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
327
21344
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
328
        return Nothing<bool>();
329

10682
      for (auto nested_transferable : nested_transferables) {
330
20
        if (std::find(host_objects_.begin(),
331
                      host_objects_.end(),
332
20
                      nested_transferable) == host_objects_.end()) {
333
10
          AddHostObject(nested_transferable);
334
        }
335
      }
336
    }
337
44951
    return Just(true);
338
  }
339
340
  ValueSerializer* serializer = nullptr;
341
342
 private:
343
10681
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
344
10681
    BaseObject::TransferMode mode = host_object->GetTransferMode();
345
10681
    if (mode == BaseObject::TransferMode::kUntransferable) {
346
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
347
2
      return Nothing<bool>();
348
    }
349
350
10693
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
351
10677
      if (host_objects_[i] == host_object) {
352
10663
        serializer->WriteUint32(i);
353
10663
        return Just(true);
354
      }
355
    }
356
357
16
    if (mode == BaseObject::TransferMode::kTransferable) {
358
4
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
359
4
      return Nothing<bool>();
360
    }
361
362
12
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
363
12
    uint32_t index = host_objects_.size();
364
12
    if (first_cloned_object_index_ == SIZE_MAX)
365
11
      first_cloned_object_index_ = index;
366
12
    serializer->WriteUint32(index);
367
12
    host_objects_.push_back(host_object);
368
12
    return Just(true);
369
  }
370
371
  Environment* env_;
372
  Local<Context> context_;
373
  Message* msg_;
374
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
375
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
376
  size_t first_cloned_object_index_ = SIZE_MAX;
377
378
  friend class worker::Message;
379
};
380
381
}  // anonymous namespace
382
383
44962
Maybe<bool> Message::Serialize(Environment* env,
384
                               Local<Context> context,
385
                               Local<Value> input,
386
                               const TransferList& transfer_list_v,
387
                               Local<Object> source_port) {
388
89923
  HandleScope handle_scope(env->isolate());
389
  Context::Scope context_scope(context);
390
391
  // Verify that we're not silently overwriting an existing message.
392
44962
  CHECK(main_message_buf_.is_empty());
393
394
89924
  SerializerDelegate delegate(env, context, this);
395
89924
  ValueSerializer serializer(env->isolate(), &delegate);
396
44961
  delegate.serializer = &serializer;
397
398
89923
  std::vector<Local<ArrayBuffer>> array_buffers;
399
55651
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
400
10700
    Local<Value> entry = transfer_list_v[i];
401
10700
    if (entry->IsObject()) {
402
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
403
      // for details.
404
      bool untransferable;
405
32100
      if (!entry.As<Object>()->HasPrivate(
406
              context,
407
21400
              env->untransferable_object_private_symbol())
408
10700
              .To(&untransferable)) {
409
        return Nothing<bool>();
410
      }
411
10700
      if (untransferable) continue;
412
    }
413
414
    // Currently, we support ArrayBuffers and BaseObjects for which
415
    // GetTransferMode() does not return kUntransferable.
416
10695
    if (entry->IsArrayBuffer()) {
417
23
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
418
      // If we cannot render the ArrayBuffer unusable in this Isolate,
419
      // copying the buffer will have to do.
420
      // Note that we can currently transfer ArrayBuffers even if they were
421
      // not allocated by Node’s ArrayBufferAllocator in the first place,
422
      // because we pass the underlying v8::BackingStore around rather than
423
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
424
      // is always going to outlive any Workers it creates, and so will its
425
      // allocator along with it.
426
45
      if (!ab->IsDetachable()) continue;
427
46
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
428
46
          array_buffers.end()) {
429
1
        ThrowDataCloneException(
430
            context,
431
            FIXED_ONE_BYTE_STRING(
432
                env->isolate(),
433
1
                "Transfer list contains duplicate ArrayBuffer"));
434
1
        return Nothing<bool>();
435
      }
436
      // We simply use the array index in the `array_buffers` list as the
437
      // ID that we write into the serialized buffer.
438
22
      uint32_t id = array_buffers.size();
439
22
      array_buffers.push_back(ab);
440
22
      serializer.TransferArrayBuffer(id, ab);
441
22
      continue;
442
21344
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
443
      // Check if the source MessagePort is being transferred.
444

21344
      if (!source_port.IsEmpty() && entry == source_port) {
445
1
        ThrowDataCloneException(
446
            context,
447
            FIXED_ONE_BYTE_STRING(env->isolate(),
448
1
                                  "Transfer list contains source port"));
449
10
        return Nothing<bool>();
450
      }
451
      BaseObjectPtr<BaseObject> host_object {
452
10671
          Unwrap<BaseObject>(entry.As<Object>()) };
453

32010
      if (env->message_port_constructor_template()->HasInstance(entry) &&
454
21321
          (!host_object ||
455
10660
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
456
7
        ThrowDataCloneException(
457
            context,
458
            FIXED_ONE_BYTE_STRING(
459
                env->isolate(),
460
7
                "MessagePort in transfer list is already detached"));
461
7
        return Nothing<bool>();
462
      }
463
21328
      if (std::find(delegate.host_objects_.begin(),
464
                    delegate.host_objects_.end(),
465
21328
                    host_object) != delegate.host_objects_.end()) {
466
1
        ThrowDataCloneException(
467
            context,
468
            String::Concat(env->isolate(),
469
                FIXED_ONE_BYTE_STRING(
470
                  env->isolate(),
471
                  "Transfer list contains duplicate "),
472
2
                entry.As<Object>()->GetConstructorName()));
473
1
        return Nothing<bool>();
474
      }
475

10663
      if (host_object && host_object->GetTransferMode() !=
476
10663
              BaseObject::TransferMode::kUntransferable) {
477
10663
        delegate.AddHostObject(host_object);
478
10663
        continue;
479
      }
480
    }
481
482
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
483
    return Nothing<bool>();
484
  }
485
89902
  if (delegate.AddNestedHostObjects().IsNothing())
486
    return Nothing<bool>();
487
488
44951
  serializer.WriteHeader();
489
89903
  if (serializer.WriteValue(context, input).IsNothing()) {
490
9
    return Nothing<bool>();
491
  }
492
493
44957
  for (Local<ArrayBuffer> ab : array_buffers) {
494
    // If serialization succeeded, we render it inaccessible in this Isolate.
495
28
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
496
14
    ab->Detach();
497
498
14
    array_buffers_.emplace_back(std::move(backing_store));
499
  }
500
501
89886
  if (delegate.Finish(context).IsNothing())
502
4
    return Nothing<bool>();
503
504
  // The serializer gave us a buffer allocated using `malloc()`.
505
44939
  std::pair<uint8_t*, size_t> data = serializer.Release();
506
44939
  CHECK_NOT_NULL(data.first);
507
  main_message_buf_ =
508
44939
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
509
44937
  return Just(true);
510
}
511
512
void Message::MemoryInfo(MemoryTracker* tracker) const {
513
  tracker->TrackField("array_buffers_", array_buffers_);
514
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
515
  tracker->TrackField("transferables", transferables_);
516
}
517
518
// TODO(@jasnell): The name here will be an empty string if the
519
// one-to-one MessageChannel is used. In such cases,
520
// SiblingGroup::Get() will return nothing and group_ will be
521
// an empty pointer. @addaleax suggests that the code here
522
// could be clearer if attaching the SiblingGroup were a
523
// separate step rather than part of the constructor here.
524
33521
MessagePortData::MessagePortData(
525
    MessagePort* owner,
526
33521
    const std::string& name)
527
    : owner_(owner),
528
33521
      group_(SiblingGroup::Get(name)) {
529
33521
  if (group_)
530
38
    group_->Entangle(this);
531
33521
}
532
533
100190
MessagePortData::~MessagePortData() {
534
33396
  CHECK_NULL(owner_);
535
33396
  Disentangle();
536
66828
}
537
538
2
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
539
4
  Mutex::ScopedLock lock(mutex_);
540
2
  tracker->TrackField("incoming_messages", incoming_messages_);
541
2
}
542
543
78905
void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
544
  // This function will be called by other threads.
545
157813
  Mutex::ScopedLock lock(mutex_);
546
78912
  incoming_messages_.emplace_back(std::move(message));
547
548
78906
  if (owner_ != nullptr) {
549
54408
    Debug(owner_, "Adding message to incoming queue");
550
54408
    owner_->TriggerAsync();
551
  }
552
78912
}
553
554
11345
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
555
11345
  CHECK(!a->group_);
556
11345
  CHECK(!b->group_);
557
11345
  b->group_ = a->group_ = std::make_shared<SiblingGroup>();
558
11345
  a->group_->Entangle(a);
559
11345
  a->group_->Entangle(b);
560
11345
}
561
562
55596
void MessagePortData::Disentangle() {
563
55596
  if (group_) {
564
22704
    group_->Disentangle(this);
565
22704
    group_.reset();
566
  }
567
55595
}
568
569
131434
MessagePort::~MessagePort() {
570
32858
  if (data_) Detach();
571
65720
}
572
573
32887
MessagePort::MessagePort(Environment* env,
574
                         Local<Context> context,
575
                         Local<Object> wrap,
576
32887
                         const std::string& name)
577
  : HandleWrap(env,
578
               wrap,
579
32887
               reinterpret_cast<uv_handle_t*>(&async_),
580
               AsyncWrap::PROVIDER_MESSAGEPORT),
581
32887
    data_(new MessagePortData(this, name)) {
582
96127
  auto onmessage = [](uv_async_t* handle) {
583
    // Called when data has been put into the queue.
584
31620
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
585
31620
    channel->OnMessage();
586
96124
  };
587
588
32887
  CHECK_EQ(uv_async_init(env->event_loop(),
589
                         &async_,
590
                         onmessage), 0);
591
  // Reset later to indicate success of the constructor.
592
32887
  bool succeeded = false;
593
98661
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
594
595
  Local<Value> fn;
596
98661
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
597
    return;
598
599
32887
  if (fn->IsFunction()) {
600
32882
    Local<Function> init = fn.As<Function>();
601
65764
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
602
      return;
603
  }
604
605
  Local<Function> emit_message_fn;
606
65774
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
607
    return;
608
32887
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
609
610
32887
  succeeded = true;
611
32887
  Debug(this, "Created message port");
612
}
613
614
31971
bool MessagePort::IsDetached() const {
615

31971
  return data_ == nullptr || IsHandleClosing();
616
}
617
618
76093
void MessagePort::TriggerAsync() {
619
76093
  if (IsHandleClosing()) return;
620
76056
  CHECK_EQ(uv_async_send(&async_), 0);
621
}
622
623
32863
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
624
65727
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
625
626
32864
  if (data_) {
627
    // Wrap this call with accessing the mutex, so that TriggerAsync()
628
    // can check IsHandleClosing() without race conditions.
629
65715
    Mutex::ScopedLock sibling_lock(data_->mutex_);
630
32859
    HandleWrap::Close(close_callback);
631
  } else {
632
5
    HandleWrap::Close(close_callback);
633
  }
634
32864
}
635
636
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
637
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
638
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
639
  // class (i.e. makes it behave like an arrow function).
640
2
  Environment* env = Environment::GetCurrent(args);
641
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
642
2
}
643
644
32887
MessagePort* MessagePort::New(
645
    Environment* env,
646
    Local<Context> context,
647
    std::unique_ptr<MessagePortData> data,
648
    const std::string& name) {
649
  Context::Scope context_scope(context);
650
32887
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
651
652
  // Construct a new instance, then assign the listener instance and possibly
653
  // the MessagePortData to it.
654
  Local<Object> instance;
655
98661
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
656
    return nullptr;
657
32887
  MessagePort* port = new MessagePort(env, context, instance, name);
658
32887
  CHECK_NOT_NULL(port);
659
32887
  if (port->IsHandleClosing()) {
660
    // Construction failed with an exception.
661
    return nullptr;
662
  }
663
664
32887
  if (data) {
665
10793
    port->Detach();
666
10793
    port->data_ = std::move(data);
667
668
    // This lock is here to avoid race conditions with the `owner_` read
669
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
670
    // but it's better to be safe than sorry.)
671
21586
    Mutex::ScopedLock lock(port->data_->mutex_);
672
10793
    port->data_->owner_ = port;
673
    // If the existing MessagePortData object had pending messages, this is
674
    // the easiest way to run that queue.
675
10793
    port->TriggerAsync();
676
  }
677
32887
  return port;
678
}
679
680
76012
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
681
                                              bool only_if_receiving) {
682
152075
  std::shared_ptr<Message> received;
683
  {
684
    // Get the head of the message queue.
685
131302
    Mutex::ScopedLock lock(data_->mutex_);
686
687
76063
    Debug(this, "MessagePort has message");
688
689

76055
    bool wants_message = receiving_messages_ || !only_if_receiving;
690
    // We have nothing to do if:
691
    // - There are no pending messages
692
    // - We are not intending to receive messages, and the message we would
693
    //   receive is not the final "close" message.
694

152107
    if (data_->incoming_messages_.empty() ||
695
65533
        (!wants_message &&
696
10266
         !data_->incoming_messages_.front()->IsCloseMessage())) {
697
41572
      return env()->no_message_symbol();
698
    }
699
700
55239
    received = data_->incoming_messages_.front();
701
55289
    data_->incoming_messages_.pop_front();
702
  }
703
704
55290
  if (received->IsCloseMessage()) {
705
22538
    Close();
706
22538
    return env()->no_message_symbol();
707
  }
708
709
44025
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
710
711
44018
  return received->Deserialize(env(), context);
712
}
713
714
32114
void MessagePort::OnMessage() {
715
32114
  Debug(this, "Running MessagePort::OnMessage()");
716
64166
  HandleScope handle_scope(env()->isolate());
717
64228
  Local<Context> context = object(env()->isolate())->CreationContext();
718
719
  size_t processing_limit;
720
  {
721
32114
    Mutex::ScopedLock(data_->mutex_);
722
64228
    processing_limit = std::max(data_->incoming_messages_.size(),
723
96342
                                static_cast<size_t>(1000));
724
  }
725
726
  // data_ can only ever be modified by the owner thread, so no need to lock.
727
  // However, the message port may be transferred while it is processing
728
  // messages, so we need to check that this handle still owns its `data_` field
729
  // on every iteration.
730

120036
  while (data_) {
731
75998
    if (processing_limit-- == 0) {
732
      // Prevent event loop starvation by only processing those messages without
733
      // interruption that were already present when the OnMessage() call was
734
      // first triggered, but at least 1000 messages because otherwise the
735
      // overhead of repeatedly triggering the uv_async_t instance becomes
736
      // noticable, at least on Windows.
737
      // (That might require more investigation by somebody more familiar with
738
      // Windows.)
739
1
      TriggerAsync();
740
1
      return;
741
    }
742
743
119958
    HandleScope handle_scope(env()->isolate());
744

43961
    Context::Scope context_scope(context);
745
76034
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
746
747
    Local<Value> payload;
748
    Local<Value> message_error;
749
228055
    Local<Value> argv[2];
750
751
    {
752
      // Catch any exceptions from parsing the message itself (not from
753
      // emitting it) as 'messageeror' events.
754
151921
      TryCatchScope try_catch(env());
755
151898
      if (!ReceiveMessage(context, true).ToLocal(&payload)) {
756

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
757
6
          message_error = try_catch.Exception();
758
6
        goto reschedule;
759
      }
760
    }
761
151960
    if (payload == env()->no_message_symbol()) break;
762
763
44006
    if (!env()->can_call_into_js()) {
764
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
765
      // In this case there is nothing to do but to drain the current queue.
766
      continue;
767
    }
768
769
43996
    argv[0] = payload;
770
87984
    argv[1] = env()->message_string();
771
772
88001
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
773
    reschedule:
774
58
      if (!message_error.IsEmpty()) {
775
6
        argv[0] = message_error;
776
12
        argv[1] = env()->messageerror_string();
777
6
        USE(MakeCallback(emit_message, arraysize(argv), argv));
778
      }
779
780
      // Re-schedule OnMessage() execution in case of failure.
781
58
      if (data_)
782
58
        TriggerAsync();
783
58
      return;
784
    }
785
  }
786
}
787
788
32858
void MessagePort::OnClose() {
789
32858
  Debug(this, "MessagePort::OnClose()");
790
32858
  if (data_) {
791
    // Detach() returns move(data_).
792
22207
    Detach()->Disentangle();
793
  }
794
32861
}
795
796
43644
std::unique_ptr<MessagePortData> MessagePort::Detach() {
797
43644
  CHECK(data_);
798
87300
  Mutex::ScopedLock lock(data_->mutex_);
799
43653
  data_->owner_ = nullptr;
800
87310
  return std::move(data_);
801
}
802
803
21306
BaseObject::TransferMode MessagePort::GetTransferMode() const {
804
21306
  if (IsDetached())
805
1
    return BaseObject::TransferMode::kUntransferable;
806
21305
  return BaseObject::TransferMode::kTransferable;
807
}
808
809
10650
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
810
21300
  Close();
811
10650
  return Detach();
812
}
813
814
10397
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
815
    Environment* env,
816
    Local<Context> context,
817
    std::unique_ptr<TransferData> self) {
818
41588
  return BaseObjectPtr<MessagePort> { MessagePort::New(
819
      env, context,
820
41588
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
821
}
822
823
44961
Maybe<bool> MessagePort::PostMessage(Environment* env,
824
                                     Local<Value> message_v,
825
                                     const TransferList& transfer_v) {
826
44961
  Isolate* isolate = env->isolate();
827
44961
  Local<Object> obj = object(isolate);
828
44961
  Local<Context> context = obj->CreationContext();
829
830
89922
  std::shared_ptr<Message> msg = std::make_shared<Message>();
831
832
  // Per spec, we need to both check if transfer list has the source port, and
833
  // serialize the input message, even if the MessagePort is closed or detached.
834
835
  Maybe<bool> serialization_maybe =
836
44961
      msg->Serialize(env, context, message_v, transfer_v, obj);
837
44960
  if (data_ == nullptr) {
838
2
    return serialization_maybe;
839
  }
840
44958
  if (serialization_maybe.IsNothing()) {
841
20
    return Nothing<bool>();
842
  }
843
844
89877
  std::string error;
845
44938
  Maybe<bool> res = data_->Dispatch(msg, &error);
846
44939
  if (res.IsNothing())
847
    return res;
848
849
44939
  if (!error.empty())
850
2
    ProcessEmitWarning(env, error.c_str());
851
852
44939
  return res;
853
}
854
855
44939
Maybe<bool> MessagePortData::Dispatch(
856
    std::shared_ptr<Message> message,
857
    std::string* error) {
858
44939
  if (!group_) {
859
    if (error != nullptr)
860
      *error = "MessagePortData is not entangled.";
861
    return Nothing<bool>();
862
  }
863
44939
  return group_->Dispatch(this, message, error);
864
}
865
866
10708
static Maybe<bool> ReadIterable(Environment* env,
867
                                Local<Context> context,
868
                                // NOLINTNEXTLINE(runtime/references)
869
                                TransferList& transfer_list,
870
                                Local<Value> object) {
871
10708
  if (!object->IsObject()) return Just(false);
872
873
10705
  if (object->IsArray()) {
874
10685
    Local<Array> arr = object.As<Array>();
875
10685
    size_t length = arr->Length();
876
10685
    transfer_list.AllocateSufficientStorage(length);
877
21383
    for (size_t i = 0; i < length; i++) {
878
32094
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
879
        return Nothing<bool>();
880
    }
881
10685
    return Just(true);
882
  }
883
884
20
  Isolate* isolate = env->isolate();
885
  Local<Value> iterator_method;
886
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
887
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
888
20
  if (!iterator_method->IsFunction()) return Just(false);
889
890
  Local<Value> iterator;
891
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
892
6
      .ToLocal(&iterator)) return Nothing<bool>();
893
6
  if (!iterator->IsObject()) return Just(false);
894
895
  Local<Value> next;
896
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
897
    return Nothing<bool>();
898
6
  if (!next->IsFunction()) return Just(false);
899
900
6
  std::vector<Local<Value>> entries;
901
7
  while (env->can_call_into_js()) {
902
    Local<Value> result;
903
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
904
7
        .ToLocal(&result)) return Nothing<bool>();
905
4
    if (!result->IsObject()) return Just(false);
906
907
    Local<Value> done;
908
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
909
      return Nothing<bool>();
910
4
    if (done->BooleanValue(isolate)) break;
911
912
    Local<Value> val;
913
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
914
      return Nothing<bool>();
915
2
    entries.push_back(val);
916
  }
917
918
2
  transfer_list.AllocateSufficientStorage(entries.size());
919
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
920
2
  return Just(true);
921
}
922
923
44974
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
924
44974
  Environment* env = Environment::GetCurrent(args);
925
44974
  Local<Object> obj = args.This();
926
44974
  Local<Context> context = obj->CreationContext();
927
928
44974
  if (args.Length() == 0) {
929
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
930
13
                                       "MessagePort.postMessage");
931
  }
932
933

156324
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
934
    // Browsers ignore null or undefined, and otherwise accept an array or an
935
    // options object.
936
    return THROW_ERR_INVALID_ARG_TYPE(env,
937
4
        "Optional transferList argument must be an iterable");
938
  }
939
940
89931
  TransferList transfer_list;
941
89940
  if (args[1]->IsObject()) {
942
    bool was_iterable;
943
21394
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
944
8
      return;
945
10697
    if (!was_iterable) {
946
      Local<Value> transfer_option;
947
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
948
21
          .ToLocal(&transfer_option)) return;
949
26
      if (!transfer_option->IsUndefined()) {
950
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
951
12
            .To(&was_iterable)) return;
952
10
        if (!was_iterable) {
953
          return THROW_ERR_INVALID_ARG_TYPE(env,
954
7
              "Optional options.transfer argument must be an iterable");
955
        }
956
      }
957
    }
958
  }
959
960
44962
  MessagePort* port = Unwrap<MessagePort>(args.This());
961
  // Even if the backing MessagePort object has already been deleted, we still
962
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
963
  // transfers.
964
44962
  if (port == nullptr) {
965
2
    Message msg;
966
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
967
1
    return;
968
  }
969
970
44961
  Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
971
44961
  if (res.IsJust())
972
134817
    args.GetReturnValue().Set(res.FromJust());
973
}
974
975
23247
void MessagePort::Start() {
976
23247
  Debug(this, "Start receiving messages");
977
23247
  receiving_messages_ = true;
978
46494
  Mutex::ScopedLock lock(data_->mutex_);
979
23247
  if (!data_->incoming_messages_.empty())
980
10834
    TriggerAsync();
981
23247
}
982
983
20325
void MessagePort::Stop() {
984
20325
  Debug(this, "Stop receiving messages");
985
20325
  receiving_messages_ = false;
986
20325
}
987
988
23248
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
989
  MessagePort* port;
990
23249
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
991
23248
  if (!port->data_) {
992
1
    return;
993
  }
994
23247
  port->Start();
995
}
996
997
20514
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
998
  MessagePort* port;
999
41028
  CHECK(args[0]->IsObject());
1000
41217
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1001
20327
  if (!port->data_) {
1002
2
    return;
1003
  }
1004
20325
  port->Stop();
1005
}
1006
1007
6
void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1008
6
  Environment* env = Environment::GetCurrent(args);
1009
18
  args.GetReturnValue().Set(
1010
18
      GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1011
6
}
1012
1013
1208
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1014
  MessagePort* port;
1015
2416
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1016
494
  port->OnMessage();
1017
}
1018
1019
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1020
11
  Environment* env = Environment::GetCurrent(args);
1021

41
  if (!args[0]->IsObject() ||
1022
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1023
    return THROW_ERR_INVALID_ARG_TYPE(env,
1024
10
        "The \"port\" argument must be a MessagePort instance");
1025
  }
1026
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1027
6
  if (port == nullptr) {
1028
    // Return 'no messages' for a closed port.
1029
    args.GetReturnValue().Set(
1030
        Environment::GetCurrent(args)->no_message_symbol());
1031
    return;
1032
  }
1033
1034
  MaybeLocal<Value> payload =
1035
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
1036
6
  if (!payload.IsEmpty())
1037
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
1038
}
1039
1040
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1041
5
  Environment* env = Environment::GetCurrent(args);
1042

20
  if (!args[0]->IsObject() ||
1043
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1044
    return THROW_ERR_INVALID_ARG_TYPE(env,
1045
        "The \"port\" argument must be a MessagePort instance");
1046
  }
1047
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1048
5
  CHECK_NOT_NULL(port);
1049
1050
5
  Local<Value> context_arg = args[1];
1051
  ContextifyContext* context_wrapper;
1052

15
  if (!context_arg->IsObject() ||
1053
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1054
10
          env, context_arg.As<Object>())) == nullptr) {
1055
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1056
  }
1057
1058
10
  std::unique_ptr<MessagePortData> data;
1059
5
  if (!port->IsDetached())
1060
5
    data = port->Detach();
1061
1062
5
  Context::Scope context_scope(context_wrapper->context());
1063
  MessagePort* target =
1064
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1065
5
  if (target != nullptr)
1066
15
    args.GetReturnValue().Set(target->object());
1067
}
1068
1069
10711
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1070
10711
  Entangle(a, b->data_.get());
1071
10711
}
1072
1073
11345
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1074
11345
  MessagePortData::Entangle(a->data_.get(), b);
1075
11345
}
1076
1077
2
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1078
2
  tracker->TrackField("data", data_);
1079
2
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1080
2
}
1081
1082
33781
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1083
  // Factor generating the MessagePort JS constructor into its own piece
1084
  // of code, because it is needed early on in the child environment setup.
1085
33781
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1086
33781
  if (!templ.IsEmpty())
1087
33337
    return templ;
1088
1089
  {
1090
444
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1091
888
    m->SetClassName(env->message_port_constructor_string());
1092
1332
    m->InstanceTemplate()->SetInternalFieldCount(
1093
444
        MessagePort::kInternalFieldCount);
1094
888
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1095
1096
444
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1097
444
    env->SetProtoMethod(m, "start", MessagePort::Start);
1098
1099
444
    env->set_message_port_constructor_template(m);
1100
  }
1101
1102
444
  return GetMessagePortConstructorTemplate(env);
1103
}
1104
1105
3240
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1106
3240
    : BaseObject(env, obj) {
1107
3240
  MakeWeak();
1108
3240
}
1109
1110
3240
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1111
3240
  CHECK(args.IsConstructCall());
1112
6480
  new JSTransferable(Environment::GetCurrent(args), args.This());
1113
3240
}
1114
1115
22
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1116
  // Implement `kClone in this ? kCloneable : kTransferable`.
1117
44
  HandleScope handle_scope(env()->isolate());
1118
44
  errors::TryCatchScope ignore_exceptions(env());
1119
1120
  bool has_clone;
1121
88
  if (!object()->Has(env()->context(),
1122
110
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1123
    return TransferMode::kUntransferable;
1124
  }
1125
1126
22
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1127
}
1128
1129
10
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1130
10
  return TransferOrClone(TransferMode::kTransferable);
1131
}
1132
1133
4
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1134
4
  return TransferOrClone(TransferMode::kCloneable);
1135
}
1136
1137
14
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1138
    TransferMode mode) const {
1139
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1140
  // which should return an object with `data` and `deserializeInfo` properties;
1141
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1142
  // on the `TransferData` instance as a string.
1143
28
  HandleScope handle_scope(env()->isolate());
1144
14
  Local<Context> context = env()->isolate()->GetCurrentContext();
1145
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1146
14
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1147
1148
  Local<Value> method;
1149
42
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1150
    return {};
1151
  }
1152
14
  if (method->IsFunction()) {
1153
    Local<Value> result_v;
1154
44
    if (!method.As<Function>()->Call(
1155
44
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1156
14
      return {};
1157
    }
1158
1159
8
    if (result_v->IsObject()) {
1160
8
      Local<Object> result = result_v.As<Object>();
1161
      Local<Value> data;
1162
      Local<Value> deserialize_info;
1163

40
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1164
32
          !result->Get(context, env()->deserialize_info_string())
1165
8
              .ToLocal(&deserialize_info)) {
1166
        return {};
1167
      }
1168
16
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1169
8
      if (*deserialize_info_str == nullptr) return {};
1170
16
      return std::make_unique<Data>(
1171
40
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1172
    }
1173
  }
1174
1175
3
  if (mode == TransferMode::kTransferable)
1176
    return TransferOrClone(TransferMode::kCloneable);
1177
  else
1178
3
    return {};
1179
}
1180
1181
Maybe<BaseObjectList>
1182
10
JSTransferable::NestedTransferables() const {
1183
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1184
20
  HandleScope handle_scope(env()->isolate());
1185
10
  Local<Context> context = env()->isolate()->GetCurrentContext();
1186
10
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1187
1188
  Local<Value> method;
1189
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1190
    return Nothing<BaseObjectList>();
1191
  }
1192
10
  if (!method->IsFunction()) return Just(BaseObjectList {});
1193
1194
  Local<Value> list_v;
1195
40
  if (!method.As<Function>()->Call(
1196
40
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1197
    return Nothing<BaseObjectList>();
1198
  }
1199
10
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1200
10
  Local<Array> list = list_v.As<Array>();
1201
1202
20
  BaseObjectList ret;
1203
40
  for (size_t i = 0; i < list->Length(); i++) {
1204
    Local<Value> value;
1205
30
    if (!list->Get(context, i).ToLocal(&value))
1206
      return Nothing<BaseObjectList>();
1207
20
    if (env()->base_object_ctor_template()->HasInstance(value))
1208
10
      ret.emplace_back(Unwrap<BaseObject>(value));
1209
  }
1210
10
  return Just(ret);
1211
}
1212
1213
3
Maybe<bool> JSTransferable::FinalizeTransferRead(
1214
    Local<Context> context, ValueDeserializer* deserializer) {
1215
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1216
  // of `this[kTransfer]()` or `this[kClone]()`.
1217
6
  HandleScope handle_scope(env()->isolate());
1218
  Local<Value> data;
1219
6
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1220
1221
3
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1222
  Local<Value> method;
1223
9
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1224
    return Nothing<bool>();
1225
  }
1226
3
  if (!method->IsFunction()) return Just(true);
1227
1228
12
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1229
    return Nothing<bool>();
1230
  }
1231
3
  return Just(true);
1232
}
1233
1234
8
JSTransferable::Data::Data(std::string&& deserialize_info,
1235
8
                           v8::Global<v8::Value>&& data)
1236
8
    : deserialize_info_(std::move(deserialize_info)),
1237
24
      data_(std::move(data)) {}
1238
1239
6
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1240
    Environment* env,
1241
    Local<Context> context,
1242
    std::unique_ptr<TransferData> self) {
1243
  // Create the JS wrapper object that will later be filled with data passed to
1244
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1245
  // we need to create an object with the right prototype and internal fields,
1246
  // but the actual JS data stored in the serialized data can only be read at
1247
  // the end of the stream, after the main message has been read.
1248
1249
12
  if (context != env->context()) {
1250
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1251
1
    return {};
1252
  }
1253
10
  HandleScope handle_scope(env->isolate());
1254
  Local<Value> info;
1255
10
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1256
1257
  Local<Value> ret;
1258
10
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1259
30
  if (!env->messaging_deserialize_create_object()->Call(
1260

28
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1261
11
      !env->base_object_ctor_template()->HasInstance(ret)) {
1262
2
    return {};
1263
  }
1264
1265
3
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1266
}
1267
1268
8
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1269
    Local<Context> context, ValueSerializer* serializer) {
1270
16
  HandleScope handle_scope(context->GetIsolate());
1271
8
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1272
8
  data_.Reset();
1273
16
  return ret;
1274
}
1275
1276
33521
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1277
33521
  if (name.empty()) return {};
1278
76
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1279
76
  std::shared_ptr<SiblingGroup> group;
1280
38
  auto i = groups_.find(name);
1281

38
  if (i == groups_.end() || i->second.expired()) {
1282
19
    group = std::make_shared<SiblingGroup>(name);
1283
19
    groups_[name] = group;
1284
  } else {
1285
19
    group = i->second.lock();
1286
  }
1287
38
  return group;
1288
}
1289
1290
19
void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1291
38
  Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1292
19
  auto i = groups_.find(name);
1293

19
  if (i != groups_.end() && i->second.expired())
1294
19
    groups_.erase(name);
1295
19
}
1296
1297
19
SiblingGroup::SiblingGroup(const std::string& name)
1298
19
    : name_(name) { }
1299
1300
22680
SiblingGroup::~SiblingGroup() {
1301
  // If this is a named group, check to see if we can remove the group
1302
11340
  if (!name_.empty())
1303
19
    CheckSiblingGroup(name_);
1304
11340
}
1305
1306
44939
Maybe<bool> SiblingGroup::Dispatch(
1307
    MessagePortData* source,
1308
    std::shared_ptr<Message> message,
1309
    std::string* error) {
1310
1311
89878
  Mutex::ScopedLock lock(group_mutex_);
1312
1313
  // The source MessagePortData is not part of this group.
1314
44939
  if (ports_.find(source) == ports_.end()) {
1315
    if (error != nullptr)
1316
      *error = "Source MessagePort is not entangled with this group.";
1317
    return Nothing<bool>();
1318
  }
1319
1320
  // There are no destination ports.
1321
44938
  if (size() <= 1)
1322
83
    return Just(false);
1323
1324
  // Transferables cannot be used when there is more
1325
  // than a single destination.
1326

44854
  if (size() > 2 && message->transferables().size()) {
1327
    if (error != nullptr)
1328
      *error = "Transferables cannot be used with multiple destinations.";
1329
    return Nothing<bool>();
1330
  }
1331
1332
134574
  for (MessagePortData* port : ports_) {
1333
89720
    if (port == source)
1334
44856
      continue;
1335
    // This loop should only be entered if there's only a single destination
1336
55535
    for (const auto& transferable : message->transferables()) {
1337
10673
      if (port == transferable.get()) {
1338
2
        if (error != nullptr) {
1339
          *error = "The target port was posted to itself, and the "
1340
2
                   "communication channel was lost";
1341
        }
1342
2
        return Just(true);
1343
      }
1344
    }
1345
44862
    port->AddToIncomingQueue(message);
1346
  }
1347
1348
44854
  return Just(true);
1349
}
1350
1351
22728
void SiblingGroup::Entangle(MessagePortData* data) {
1352
45456
  Mutex::ScopedLock lock(group_mutex_);
1353
22728
  ports_.insert(data);
1354
22728
}
1355
1356
22701
void SiblingGroup::Disentangle(MessagePortData* data) {
1357
45405
  Mutex::ScopedLock lock(group_mutex_);
1358
22704
  ports_.erase(data);
1359
1360
22704
  data->AddToIncomingQueue(std::make_shared<Message>());
1361
  // If this is an anonymous group and there's another port, close it.
1362

22702
  if (size() == 1 && name_.empty())
1363
11345
    (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1364
22704
}
1365
1366
4691
SiblingGroup::Map SiblingGroup::groups_;
1367
4691
Mutex SiblingGroup::groups_mutex_;
1368
1369
namespace {
1370
1371
444
static void SetDeserializerCreateObjectFunction(
1372
    const FunctionCallbackInfo<Value>& args) {
1373
444
  Environment* env = Environment::GetCurrent(args);
1374
888
  CHECK(args[0]->IsFunction());
1375
888
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1376
444
}
1377
1378
10712
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1379
10712
  Environment* env = Environment::GetCurrent(args);
1380
10712
  if (!args.IsConstructCall()) {
1381
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1382
1
    return;
1383
  }
1384
1385
21422
  Local<Context> context = args.This()->CreationContext();
1386
10711
  Context::Scope context_scope(context);
1387
1388
10711
  MessagePort* port1 = MessagePort::New(env, context);
1389
10711
  if (port1 == nullptr) return;
1390
10711
  MessagePort* port2 = MessagePort::New(env, context);
1391
10711
  if (port2 == nullptr) {
1392
    port1->Close();
1393
    return;
1394
  }
1395
1396
10711
  MessagePort::Entangle(port1, port2);
1397
1398
53555
  args.This()->Set(context, env->port1_string(), port1->object())
1399
      .Check();
1400
53555
  args.This()->Set(context, env->port2_string(), port2->object())
1401
      .Check();
1402
}
1403
1404
38
static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1405
114
  CHECK(args[0]->IsString());
1406
38
  Environment* env = Environment::GetCurrent(args);
1407
38
  Context::Scope context_scope(env->context());
1408
76
  Utf8Value name(env->isolate(), args[0]);
1409
  MessagePort* port =
1410
38
      MessagePort::New(env, env->context(), nullptr, std::string(*name));
1411
114
  args.GetReturnValue().Set(port->object());
1412
38
}
1413
1414
444
static void InitMessaging(Local<Object> target,
1415
                          Local<Value> unused,
1416
                          Local<Context> context,
1417
                          void* priv) {
1418
444
  Environment* env = Environment::GetCurrent(context);
1419
1420
  {
1421
    Local<String> message_channel_string =
1422
444
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1423
444
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1424
444
    templ->SetClassName(message_channel_string);
1425
888
    target->Set(context,
1426
                message_channel_string,
1427
1332
                templ->GetFunction(context).ToLocalChecked()).Check();
1428
  }
1429
1430
  {
1431
    Local<String> js_transferable_string =
1432
444
        FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1433
444
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1434
888
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1435
444
    t->SetClassName(js_transferable_string);
1436
1332
    t->InstanceTemplate()->SetInternalFieldCount(
1437
444
        JSTransferable::kInternalFieldCount);
1438
888
    target->Set(context,
1439
                js_transferable_string,
1440
1332
                t->GetFunction(context).ToLocalChecked()).Check();
1441
  }
1442
1443
888
  target->Set(context,
1444
              env->message_port_constructor_string(),
1445
888
              GetMessagePortConstructorTemplate(env)
1446
1776
                  ->GetFunction(context).ToLocalChecked()).Check();
1447
1448
  // These are not methods on the MessagePort prototype, because
1449
  // the browser equivalents do not provide them.
1450
444
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1451
444
  env->SetMethod(target, "checkMessagePort", MessagePort::CheckType);
1452
444
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1453
444
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1454
  env->SetMethod(target, "moveMessagePortToContext",
1455
444
                 MessagePort::MoveToContext);
1456
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1457
444
                 SetDeserializerCreateObjectFunction);
1458
444
  env->SetMethod(target, "broadcastChannel", BroadcastChannel);
1459
1460
  {
1461
888
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1462
    target
1463
888
        ->Set(context,
1464
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1465
1332
              domexception)
1466
        .Check();
1467
  }
1468
444
}
1469
1470
4614
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1471
4614
  registry->Register(MessageChannel);
1472
4614
  registry->Register(BroadcastChannel);
1473
4614
  registry->Register(JSTransferable::New);
1474
4614
  registry->Register(MessagePort::New);
1475
4614
  registry->Register(MessagePort::PostMessage);
1476
4614
  registry->Register(MessagePort::Start);
1477
4614
  registry->Register(MessagePort::Stop);
1478
4614
  registry->Register(MessagePort::CheckType);
1479
4614
  registry->Register(MessagePort::Drain);
1480
4614
  registry->Register(MessagePort::ReceiveMessage);
1481
4614
  registry->Register(MessagePort::MoveToContext);
1482
4614
  registry->Register(SetDeserializerCreateObjectFunction);
1483
4614
}
1484
1485
}  // anonymous namespace
1486
1487
}  // namespace worker
1488
}  // namespace node
1489
1490
4683
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1491

18687
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1492
                               node::worker::RegisterExternalReferences)