GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 657 713 92.1 %
Date: 2020-09-03 22:13:26 Branches: 326 443 73.6 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
// Base implementation: a plain BaseObject reports itself as not transferable.
BaseObject::TransferMode BaseObject::GetTransferMode() const {
  const BaseObject::TransferMode mode =
      BaseObject::TransferMode::kUntransferable;
  return mode;
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10652
// Base implementation: reports an empty list of nested transferables.
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
  return Just(BaseObjectList());
}
60
61
10419
// Base implementation: nothing extra is read from the deserializer; neither
// argument is consulted and success is always reported.
Maybe<bool> BaseObject::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  return Just<bool>(true);
}
65
66
namespace worker {
67
68
10656
// Base implementation: nothing extra is written to the serializer; neither
// argument is consulted and success is always reported.
Maybe<bool> TransferData::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  return Just<bool>(true);
}
72
73
206124
// Builds a message that takes over `buffer` as its main (serialized) payload.
// A message whose payload pointer is null is treated as a "close" message
// (see IsCloseMessage()).
Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {
}
75
76
128724
bool Message::IsCloseMessage() const {
77
128724
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
53436
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
53615
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
53615
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
53615
        wasm_modules_(wasm_modules) {}
95
96
10420
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10420
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10420
    CHECK_LE(id, host_objects_.size());
102
20840
    return host_objects_[id]->object(isolate);
103
  }
104
105
430
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
430
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
860
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
53583
// Turns this message back into a JS value inside `context`.
//
// Must not be called on a close message. The stored transferables,
// SharedArrayBuffer backing stores and ArrayBuffer backing stores are moved
// out of the message as part of this call, and the corresponding member
// vectors are cleared.
//
// Returns an empty MaybeLocal if creating a host object, reading the header,
// reading the value, or finalizing a host object fails. On any such early
// return, every host object that was already created is Detach()ed.
MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  CHECK(!IsCloseMessage());

  EscapableHandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Create all necessary objects for transferables, e.g. MessagePort handles.
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
  auto cleanup = OnScopeLeave([&]() {
    // Iterate by reference: copying a BaseObjectPtr only to call Detach()
    // would add and drop a reference for every element.
    for (const BaseObjectPtr<BaseObject>& object : host_objects) {
      if (!object) continue;

      // If the function did not finish successfully, host_objects will contain
      // a list of objects that will never be passed to JS. Therefore, we
      // destroy them here.
      object->Detach();
    }
  });

  for (uint32_t i = 0; i < transferables_.size(); ++i) {
    // Grab the raw pointer first; ownership of the same object is passed to
    // Deserialize() as its last argument.
    TransferData* data = transferables_[i].get();
    host_objects[i] = data->Deserialize(
        env, context, std::move(transferables_[i]));
    if (!host_objects[i]) return {};
  }
  transferables_.clear();

  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  shared_array_buffers.reserve(shared_array_buffers_.size());
  // Attach all transferred SharedArrayBuffers to their new Isolate.
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
    Local<SharedArrayBuffer> sab =
        SharedArrayBuffer::New(env->isolate(),
                               std::move(shared_array_buffers_[i]));
    shared_array_buffers.push_back(sab);
  }
  shared_array_buffers_.clear();

  DeserializerDelegate delegate(
      this, env, host_objects, shared_array_buffers, wasm_modules_);
  ValueDeserializer deserializer(
      env->isolate(),
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate.
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
    Local<ArrayBuffer> ab =
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
    deserializer.TransferArrayBuffer(i, ab);
  }
  array_buffers_.clear();

  if (deserializer.ReadHeader(context).IsNothing())
    return {};
  Local<Value> return_value;
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
    return {};

  for (const BaseObjectPtr<BaseObject>& base_object : host_objects) {
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
      return {};
  }

  // Success: empty the list so that the cleanup handler above has nothing
  // left to detach.
  host_objects.clear();
  return handle_scope.Escape(return_value);
}
197
198
654
void Message::AddSharedArrayBuffer(
199
    std::shared_ptr<BackingStore> backing_store) {
200
654
  shared_array_buffers_.emplace_back(std::move(backing_store));
201
654
}
202
203
10661
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
204
10661
  transferables_.emplace_back(std::move(data));
205
10661
}
206
207
2
// Appends a compiled WASM module to this message and returns the index at
// which it was stored.
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
  // The new element lands at the current size, i.e. size() - 1 afterwards.
  const size_t new_index = wasm_modules_.size();
  wasm_modules_.emplace_back(std::move(mod));
  return new_index;
}
211
212
namespace {
213
214
32847
// Looks up the `emitMessage` property on the per-context exports object of
// `context`. Returns an empty MaybeLocal if either lookup fails; aborts (via
// CHECK) if the property is present but is not a function.
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();

  Local<Object> exports;
  if (!GetPerContextExports(context).ToLocal(&exports))
    return MaybeLocal<Function>();

  Local<Value> emit_fn;
  if (!exports->Get(context, FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
           .ToLocal(&emit_fn)) {
    return MaybeLocal<Function>();
  }

  CHECK(emit_fn->IsFunction());
  return emit_fn.As<Function>();
}
227
228
467
// Looks up the `DOMException` property on the per-context exports object of
// `context`. Returns an empty MaybeLocal if either lookup fails; aborts (via
// CHECK) if the property is present but is not a function.
MaybeLocal<Function> GetDOMException(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();

  Local<Object> exports;
  if (!GetPerContextExports(context).ToLocal(&exports))
    return MaybeLocal<Function>();

  Local<Value> ctor_val;
  if (!exports->Get(context, FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
           .ToLocal(&ctor_val)) {
    return MaybeLocal<Function>();
  }

  CHECK(ctor_val->IsFunction());
  return ctor_val.As<Function>();
}
242
243
15
// Throws `new DOMException(message, "DataCloneError")` on the isolate of
// `context`. If the DOMException constructor cannot be obtained, or
// constructing the instance fails, this function returns without calling
// ThrowException() itself.
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
  Isolate* isolate = context->GetIsolate();
  Local<Value> ctor_args[] = {message,
                              FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};

  Local<Function> ctor;
  if (!GetDOMException(context).ToLocal(&ctor))
    return;

  Local<Value> exception;
  if (!ctor->NewInstance(context, arraysize(ctor_args), ctor_args)
           .ToLocal(&exception)) {
    return;
  }

  isolate->ThrowException(exception);
}
256
257
// This tells V8 how to serialize objects that it does not understand
258
// (e.g. C++ objects) into the output buffer, in a way that our own
259
// DeserializerDelegate understands how to unpack.
260
54547
class SerializerDelegate : public ValueSerializer::Delegate {
261
 public:
262
54548
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
263
54548
      : env_(env), context_(context), msg_(m) {}
264
265
5
  void ThrowDataCloneError(Local<String> message) override {
266
5
    ThrowDataCloneException(context_, message);
267
5
  }
268
269
10666
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
270
21332
    if (env_->base_object_ctor_template()->HasInstance(object)) {
271
      return WriteHostObject(
272
10666
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
273
    }
274
275
    ThrowDataCloneError(env_->clone_unsupported_type_str());
276
    return Nothing<bool>();
277
  }
278
279
654
  Maybe<uint32_t> GetSharedArrayBufferId(
280
      Isolate* isolate,
281
      Local<SharedArrayBuffer> shared_array_buffer) override {
282
    uint32_t i;
283
678
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
284
48
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
285
          shared_array_buffer) {
286
        return Just(i);
287
      }
288
    }
289
290
654
    seen_shared_array_buffers_.emplace_back(
291
1308
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
292
654
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
293
654
    return Just(i);
294
  }
295
296
2
  Maybe<uint32_t> GetWasmModuleTransferId(
297
      Isolate* isolate, Local<WasmModuleObject> module) override {
298
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
299
  }
300
301
54531
  Maybe<bool> Finish(Local<Context> context) {
302
65192
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
303
21325
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
304
21325
      std::unique_ptr<TransferData> data;
305
10664
      if (i < first_cloned_object_index_)
306
10655
        data = host_object->TransferForMessaging();
307
10664
      if (!data)
308
11
        data = host_object->CloneForMessaging();
309
10664
      if (!data) return Nothing<bool>();
310
21324
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
311
1
        return Nothing<bool>();
312
10661
      msg_->AddTransferable(std::move(data));
313
    }
314
54528
    return Just(true);
315
  }
316
317
10661
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
318
    // Make sure we have not started serializing the value itself yet.
319
10661
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
320
10661
    host_objects_.emplace_back(std::move(host_object));
321
10661
  }
322
323
  // Some objects in the transfer list may register sub-objects that can be
324
  // transferred. This could e.g. be a public JS wrapper object, such as a
325
  // FileHandle, that is registering its C++ handle for transfer.
326
54537
  inline Maybe<bool> AddNestedHostObjects() {
327
65197
    for (size_t i = 0; i < host_objects_.size(); i++) {
328
21320
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
329
21320
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
330
        return Nothing<bool>();
331

10668
      for (auto nested_transferable : nested_transferables) {
332
16
        if (std::find(host_objects_.begin(),
333
                      host_objects_.end(),
334
16
                      nested_transferable) == host_objects_.end()) {
335
8
          AddHostObject(nested_transferable);
336
        }
337
      }
338
    }
339
54537
    return Just(true);
340
  }
341
342
  ValueSerializer* serializer = nullptr;
343
344
 private:
345
10666
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
346
10666
    BaseObject::TransferMode mode = host_object->GetTransferMode();
347
10666
    if (mode == BaseObject::TransferMode::kUntransferable) {
348
2
      ThrowDataCloneError(env_->clone_unsupported_type_str());
349
2
      return Nothing<bool>();
350
    }
351
352
10672
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
353
10660
      if (host_objects_[i] == host_object) {
354
10652
        serializer->WriteUint32(i);
355
10652
        return Just(true);
356
      }
357
    }
358
359
12
    if (mode == BaseObject::TransferMode::kTransferable) {
360
3
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
361
3
      return Nothing<bool>();
362
    }
363
364
9
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
365
9
    uint32_t index = host_objects_.size();
366
9
    if (first_cloned_object_index_ == SIZE_MAX)
367
9
      first_cloned_object_index_ = index;
368
9
    serializer->WriteUint32(index);
369
9
    host_objects_.push_back(host_object);
370
9
    return Just(true);
371
  }
372
373
  Environment* env_;
374
  Local<Context> context_;
375
  Message* msg_;
376
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
377
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
378
  size_t first_cloned_object_index_ = SIZE_MAX;
379
380
  friend class worker::Message;
381
};
382
383
}  // anonymous namespace
384
385
54548
// Serializes `input` into this (so far empty) message.
//
// `transfer_list_v` is processed first:
//   - entries carrying the "untransferable" private symbol are skipped;
//   - detachable ArrayBuffers are registered for transfer (a duplicate is a
//     DataCloneError), non-detachable ones are skipped;
//   - BaseObject instances are registered as host objects, unless the entry
//     is `source_port`, is a detached MessagePort, or is a duplicate, each of
//     which is a DataCloneError;
//   - anything else throws ERR_INVALID_TRANSFER_OBJECT.
// After the value has been written, every registered ArrayBuffer is detached
// and its backing store is moved into the message.
//
// Returns Nothing<bool>() on every failure path and Just(true) on success.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               const TransferList& transfer_list_v,
                               Local<Object> source_port) {
  Isolate* const isolate = env->isolate();
  HandleScope handle_scope(isolate);
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(isolate, &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
    Local<Value> entry = transfer_list_v[i];

    if (entry->IsObject()) {
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
      // for details.
      bool untransferable;
      if (!entry.As<Object>()->HasPrivate(
              context,
              env->untransferable_object_private_symbol())
              .To(&untransferable)) {
        return Nothing<bool>();
      }
      if (untransferable) continue;
    }

    // Currently, we support ArrayBuffers and BaseObjects for which
    // GetTransferMode() does not return kUntransferable.
    if (entry->IsArrayBuffer()) {
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
      // If we cannot render the ArrayBuffer unusable in this Isolate,
      // copying the buffer will have to do.
      // Note that we can currently transfer ArrayBuffers even if they were
      // not allocated by Node's ArrayBufferAllocator in the first place,
      // because we pass the underlying v8::BackingStore around rather than
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
      // is always going to outlive any Workers it creates, and so will its
      // allocator along with it.
      if (!ab->IsDetachable()) continue;

      const bool already_listed =
          std::find(array_buffers.begin(), array_buffers.end(), ab) !=
          array_buffers.end();
      if (already_listed) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "Transfer list contains duplicate ArrayBuffer"));
        return Nothing<bool>();
      }

      // We simply use the array index in the `array_buffers` list as the
      // ID that we write into the serialized buffer.
      uint32_t id = array_buffers.size();
      array_buffers.push_back(ab);
      serializer.TransferArrayBuffer(id, ab);
      continue;
    }

    // Every path through the ArrayBuffer branch above leaves the iteration,
    // so a plain `if` is sufficient here.
    if (env->base_object_ctor_template()->HasInstance(entry)) {
      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && entry == source_port) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(isolate,
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }

      BaseObjectPtr<BaseObject> host_object {
          Unwrap<BaseObject>(entry.As<Object>()) };

      const bool is_message_port =
          env->message_port_constructor_template()->HasInstance(entry);
      if (is_message_port &&
          (!host_object ||
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                isolate,
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }

      if (std::find(delegate.host_objects_.begin(),
                    delegate.host_objects_.end(),
                    host_object) != delegate.host_objects_.end()) {
        ThrowDataCloneException(
            context,
            String::Concat(isolate,
                FIXED_ONE_BYTE_STRING(
                  isolate,
                  "Transfer list contains duplicate "),
                entry.As<Object>()->GetConstructorName()));
        return Nothing<bool>();
      }

      if (host_object && host_object->GetTransferMode() !=
              BaseObject::TransferMode::kUntransferable) {
        delegate.AddHostObject(host_object);
        continue;
      }
      // Otherwise fall through to the "invalid transfer object" error below.
    }

    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
    return Nothing<bool>();
  }

  if (delegate.AddNestedHostObjects().IsNothing())
    return Nothing<bool>();

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing())
    return Nothing<bool>();

  for (Local<ArrayBuffer> ab : array_buffers) {
    // If serialization succeeded, we render it inaccessible in this Isolate.
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
    ab->Detach();

    array_buffers_.emplace_back(std::move(backing_store));
  }

  if (delegate.Finish(context).IsNothing())
    return Nothing<bool>();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> released = serializer.Release();
  CHECK_NOT_NULL(released.first);
  main_message_buf_ = MallocedBuffer<char>(
      reinterpret_cast<char*>(released.first), released.second);
  return Just(true);
}
513
514
// Reports the containers held by this message to the memory tracker.
void Message::MemoryInfo(MemoryTracker* tracker) const {
  // ArrayBuffer backing stores.
  tracker->TrackField("array_buffers_", array_buffers_);
  // SharedArrayBuffer backing stores.
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
  // Data of transferred/cloned host objects.
  tracker->TrackField("transferables", transferables_);
}
519
520
33475
// Creates port data owned by `owner`.
MessagePortData::MessagePortData(MessagePort* owner)
    : owner_(owner) {
}
521
522
100284
// The data must no longer have an owning MessagePort when it is destroyed.
// Destruction also disentangles it from its sibling, if any.
MessagePortData::~MessagePortData() {
  CHECK_NULL(owner_);

  Disentangle();
}
526
527
// Reports the queue of pending messages to the memory tracker. The queue is
// read while holding `mutex_`.
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock queue_lock(mutex_);

  tracker->TrackField("incoming_messages", incoming_messages_);
}
531
532
121350
// Appends `message` to the incoming queue and, if a MessagePort currently
// owns this data, wakes that port up.
void MessagePortData::AddToIncomingQueue(Message&& message) {
  // This function will be called by other threads.
  Mutex::ScopedLock queue_lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  // Without an owner there is nobody to notify.
  if (owner_ == nullptr) return;

  Debug(owner_, "Adding message to incoming queue");
  owner_->TriggerAsync();
}
542
543
11325
// Links `a` and `b` as each other's sibling. Neither may already have a
// sibling. Afterwards both share the sibling mutex that `b` had.
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  CHECK_NULL(a->sibling_);
  CHECK_NULL(b->sibling_);

  // Cross-link the two ends.
  a->sibling_ = b;
  b->sibling_ = a;

  // Both ends use one common sibling mutex from now on.
  a->sibling_mutex_ = b->sibling_mutex_;
}
550
551
55579
void MessagePortData::Disentangle() {
552
  // Grab a copy of the sibling mutex, then replace it so that each sibling
553
  // has its own sibling_mutex_ now.
554
111166
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
555
111174
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
556
55588
  sibling_mutex_ = std::make_shared<Mutex>();
557
558
55588
  MessagePortData* sibling = sibling_;
559
55588
  if (sibling_ != nullptr) {
560
11324
    sibling_->sibling_ = nullptr;
561
11324
    sibling_ = nullptr;
562
  }
563
564
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
565
  // message and trigger the corresponding uv_async_t to let them know that
566
  // this happened.
567
55588
  AddToIncomingQueue(Message());
568
55586
  if (sibling != nullptr) {
569
11324
    sibling->AddToIncomingQueue(Message());
570
  }
571
55588
}
572
573
131204
// If the port still holds its data when destroyed, detach it first so that
// the data no longer points back at this port.
MessagePort::~MessagePort() {
  if (data_) {
    Detach();
  }
}
576
577
32848
// Sets up a MessagePort around the JS object `wrap`:
//   1. initializes the uv_async_t whose callback runs OnMessage();
//   2. calls the function stored under the `oninit` symbol on `wrap`, if that
//      property is a function;
//   3. looks up and stores the per-context `emitMessage` function.
// If step 2 or 3 fails, the constructor returns early and the scope-exit
// handler closes the handle.
MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(&async_),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  auto on_async = [](uv_async_t* handle) {
    // Called when data has been put into the queue.
    MessagePort* port = ContainerOf(&MessagePort::async_, handle);
    port->OnMessage();
  };

  CHECK_EQ(uv_async_init(env->event_loop(),
                         &async_,
                         on_async), 0);

  // Reset later to indicate success of the constructor.
  bool succeeded = false;
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });

  Local<Value> init_val;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&init_val))
    return;

  if (init_val->IsFunction() &&
      init_val.As<Function>()->Call(context, wrap, 0, nullptr).IsEmpty()) {
    return;
  }

  Local<Function> emit_message_fn;
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
    return;
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);

  succeeded = true;
  Debug(this, "Created message port");
}
616
617
31946
bool MessagePort::IsDetached() const {
618

31946
  return data_ == nullptr || IsHandleClosing();
619
}
620
621
85727
void MessagePort::TriggerAsync() {
622
85727
  if (IsHandleClosing()) return;
623
85695
  CHECK_EQ(uv_async_send(&async_), 0);
624
}
625
626
32801
// Closes the underlying handle. While the port still holds its data, the
// close is performed under the data's mutex.
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (!data_) {
    HandleWrap::Close(close_callback);
    return;
  }

  // Wrap this call with accessing the mutex, so that TriggerAsync()
  // can check IsHandleClosing() without race conditions.
  Mutex::ScopedLock sibling_lock(data_->mutex_);
  HandleWrap::Close(close_callback);
}
638
639
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
640
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
641
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
642
  // class (i.e. makes it behave like an arrow function).
643
2
  Environment* env = Environment::GetCurrent(args);
644
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
645
2
}
646
647
32848
// Creates a new MessagePort in `context`. If `data` is non-null, the new
// port's freshly created data is replaced by `data`.
//
// Returns nullptr if the JS instance cannot be created, or if the handle of
// the new port is already closing after construction.
MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> instance;
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
    return nullptr;

  MessagePort* port = new MessagePort(env, context, instance);
  CHECK_NOT_NULL(port);
  if (port->IsHandleClosing()) {
    // Construction failed with an exception.
    return nullptr;
  }

  // Nothing to adopt: keep the data the constructor created.
  if (!data) return port;

  port->Detach();
  port->data_ = std::move(data);

  // This lock is here to avoid race conditions with the `owner_` read
  // in AddToIncomingQueue(). (This would likely be unproblematic without it,
  // but it's better to be safe than sorry.)
  Mutex::ScopedLock lock(port->data_->mutex_);
  port->data_->owner_ = port;
  // If the existing MessagePortData object had pending messages, this is
  // the easiest way to run that queue.
  port->TriggerAsync();
  return port;
}
681
682
84689
// Takes the message at the head of the incoming queue and deserializes it.
//
// Returns the "no message" symbol when the queue is empty, when the port does
// not want a message right now and the head is not a close message, or when
// the head is a close message (in which case the port is closed as well).
// Returns an empty MaybeLocal when JS may not be called, or when
// deserialization fails.
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
                                              bool only_if_receiving) {
  Message received;
  {
    // Get the head of the message queue.
    Mutex::ScopedLock lock(data_->mutex_);

    Debug(this, "MessagePort has message");

    const bool wants_message = receiving_messages_ || !only_if_receiving;
    auto& queue = data_->incoming_messages_;

    // We have nothing to do if:
    // - There are no pending messages
    if (queue.empty())
      return env()->no_message_symbol();
    // - We are not intending to receive messages, and the message we would
    //   receive is not the final "close" message.
    if (!wants_message && !queue.front().IsCloseMessage())
      return env()->no_message_symbol();

    received = std::move(queue.front());
    queue.pop_front();
  }

  if (received.IsCloseMessage()) {
    Close();
    return env()->no_message_symbol();
  }

  if (!env()->can_call_into_js()) return MaybeLocal<Value>();

  return received.Deserialize(env(), context);
}
715
716
31096
void MessagePort::OnMessage() {
717
31096
  Debug(this, "Running MessagePort::OnMessage()");
718
62129
  HandleScope handle_scope(env()->isolate());
719
62192
  Local<Context> context = object(env()->isolate())->CreationContext();
720
721
  size_t processing_limit;
722
  {
723
31096
    Mutex::ScopedLock(data_->mutex_);
724
62192
    processing_limit = std::max(data_->incoming_messages_.size(),
725
93288
                                static_cast<size_t>(1000));
726
  }
727
728
  // data_ can only ever be modified by the owner thread, so no need to lock.
729
  // However, the message port may be transferred while it is processing
730
  // messages, so we need to check that this handle still owns its `data_` field
731
  // on every iteration.
732

138310
  while (data_) {
733
84698
    if (processing_limit-- == 0) {
734
      // Prevent event loop starvation by only processing those messages without
735
      // interruption that were already present when the OnMessage() call was
736
      // first triggered, but at least 1000 messages because otherwise the
737
      // overhead of repeatedly triggering the uv_async_t instance becomes
738
      // noticable, at least on Windows.
739
      // (That might require more investigation by somebody more familiar with
740
      // Windows.)
741
3
      TriggerAsync();
742
3
      return;
743
    }
744
745
138302
    HandleScope handle_scope(env()->isolate());
746

53610
    Context::Scope context_scope(context);
747
84673
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
748
749
    Local<Value> payload;
750
    Local<Value> message_error;
751
254021
    Local<Value> argv[2];
752
753
    {
754
      // Catch any exceptions from parsing the message itself (not from
755
      // emitting it) as 'messageeror' events.
756
169195
      TryCatchScope try_catch(env());
757
169279
      if (!ReceiveMessage(context, true).ToLocal(&payload)) {
758

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
759
6
          message_error = try_catch.Exception();
760
6
        goto reschedule;
761
      }
762
    }
763
169207
    if (payload == env()->no_message_symbol()) break;
764
765
53657
    if (!env()->can_call_into_js()) {
766
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
767
      // In this case there is nothing to do but to drain the current queue.
768
      continue;
769
    }
770
771
53641
    argv[0] = payload;
772
107290
    argv[1] = env()->message_string();
773
774
107309
    if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
775
    reschedule:
776
57
      if (!message_error.IsEmpty()) {
777
6
        argv[0] = message_error;
778
12
        argv[1] = env()->messageerror_string();
779
6
        USE(MakeCallback(emit_message, arraysize(argv), argv));
780
      }
781
782
      // Re-schedule OnMessage() execution in case of failure.
783
57
      if (data_)
784
57
        TriggerAsync();
785
57
      return;
786
    }
787
  }
788
}
789
790
32801
void MessagePort::OnClose() {
791
32801
  Debug(this, "MessagePort::OnClose()");
792
32803
  if (data_) {
793
    // Detach() returns move(data_).
794
22157
    Detach()->Disentangle();
795
  }
796
32803
}
797
798
43624
std::unique_ptr<MessagePortData> MessagePort::Detach() {
799
43624
  CHECK(data_);
800
87255
  Mutex::ScopedLock lock(data_->mutex_);
801
43628
  data_->owner_ = nullptr;
802
87256
  return std::move(data_);
803
}
804
805
21289
// A port can be transferred unless it has already been detached.
BaseObject::TransferMode MessagePort::GetTransferMode() const {
  return IsDetached() ? BaseObject::TransferMode::kUntransferable
                      : BaseObject::TransferMode::kTransferable;
}
810
811
10642
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
812
21284
  Close();
813
10642
  return Detach();
814
}
815
816
10412
// Rebuilds a MessagePort in `context` around transferred port data.
// `self` is the transfer payload; it is downcast to MessagePortData and
// handed over to the newly created port.
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  auto port_data =
      static_unique_pointer_cast<MessagePortData>(std::move(self));
  MessagePort* port = MessagePort::New(env, context, std::move(port_data));
  return BaseObjectPtr<MessagePort> { port };
}
824
825
54547
// Serializes `message_v` (with `transfer_v` as the transfer list) and, if
// this port has a sibling, appends the message to the sibling's incoming
// queue.
//
// Returns the serialization result when the port has no data, Nothing when
// serialization failed, and Just(true) otherwise, including when the message
// was dropped because there is no sibling or because the sibling itself was
// part of the transfer list.
Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     const TransferList& transfer_v) {
  Local<Object> port_object = object(env->isolate());
  Local<Context> context = port_object->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.
  Maybe<bool> serialized =
      msg.Serialize(env, context, message_v, transfer_v, port_object);
  if (data_ == nullptr) return serialized;
  if (serialized.IsNothing()) return Nothing<bool>();

  Mutex::ScopedLock lock(*data_->sibling_mutex_);

  // Detect the case where the receiving port was itself transferred inside
  // the message; the channel is lost and the message is not delivered.
  bool target_was_transferred = false;
  if (data_->sibling_ != nullptr) {
    for (const auto& entry : msg.transferables()) {
      if (!(data_->sibling_ == entry.get())) continue;
      target_was_transferred = true;
      ProcessEmitWarning(env, "The target port was posted to itself, and "
                              "the communication channel was lost");
      break;
    }
  }

  const bool deliverable =
      !(data_->sibling_ == nullptr || target_was_transferred);
  if (deliverable)
    data_->sibling_->AddToIncomingQueue(std::move(msg));
  return Just(true);
}
867
868
10700
// Collects the elements of `object` into `transfer_list`.
//
// Arrays are read by index. Any other object is walked through the iterator
// protocol: `object[Symbol.iterator]()` is called, then `next()` repeatedly
// until a result with a truthy `done` is returned (or until the environment
// can no longer call into JS).
//
// Returns:
//   Just(true)   - `transfer_list` was filled from `object`.
//   Just(false)  - `object` is not an object, has no callable
//                  Symbol.iterator, or the iterator/`next`/result values do
//                  not have the expected types.
//   Nothing      - one of the V8 property reads or calls failed.
static Maybe<bool> ReadIterable(Environment* env,
                                Local<Context> context,
                                // NOLINTNEXTLINE(runtime/references)
                                TransferList& transfer_list,
                                Local<Value> object) {
  if (!object->IsObject()) return Just(false);

  if (object->IsArray()) {
    Local<Array> arr = object.As<Array>();
    size_t length = arr->Length();
    transfer_list.AllocateSufficientStorage(length);
    for (size_t i = 0; i < length; i++) {
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
        return Nothing<bool>();
    }
    return Just(true);
  }

  Isolate* isolate = env->isolate();
  Local<Value> iterator_method;
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
      .ToLocal(&iterator_method)) return Nothing<bool>();
  if (!iterator_method->IsFunction()) return Just(false);

  Local<Value> iterator;
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
      .ToLocal(&iterator)) return Nothing<bool>();
  if (!iterator->IsObject()) return Just(false);

  Local<Value> next;
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
    return Nothing<bool>();
  if (!next->IsFunction()) return Just(false);

  std::vector<Local<Value>> entries;
  while (env->can_call_into_js()) {
    Local<Value> result;
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
        .ToLocal(&result)) return Nothing<bool>();
    if (!result->IsObject()) return Just(false);

    Local<Value> done;
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
      return Nothing<bool>();
    if (done->BooleanValue(isolate)) break;

    Local<Value> val;
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
      return Nothing<bool>();
    entries.push_back(val);
  }

  const size_t count = entries.size();
  transfer_list.AllocateSufficientStorage(count);
  // Copy element by element instead of going through `&transfer_list[0]`:
  // that expression indexes element 0 even when the iterable produced no
  // entries at all, i.e. when the buffer has length zero.
  for (size_t i = 0; i < count; i++)
    transfer_list[i] = entries[i];
  return Just(true);
}
924
925
54561
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
926
54561
  Environment* env = Environment::GetCurrent(args);
927
54561
  Local<Object> obj = args.This();
928
54561
  Local<Context> context = obj->CreationContext();
929
930
54560
  if (args.Length() == 0) {
931
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
932
13
                                       "MessagePort.postMessage");
933
  }
934
935

185066
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
936
    // Browsers ignore null or undefined, and otherwise accept an array or an
937
    // options object.
938
    return THROW_ERR_INVALID_ARG_TYPE(env,
939
4
        "Optional transferList argument must be an iterable");
940
  }
941
942
109104
  TransferList transfer_list;
943
109116
  if (args[1]->IsObject()) {
944
    bool was_iterable;
945
21378
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
946
8
      return;
947
10689
    if (!was_iterable) {
948
      Local<Value> transfer_option;
949
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
950
21
          .ToLocal(&transfer_option)) return;
951
26
      if (!transfer_option->IsUndefined()) {
952
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
953
12
            .To(&was_iterable)) return;
954
10
        if (!was_iterable) {
955
          return THROW_ERR_INVALID_ARG_TYPE(env,
956
7
              "Optional options.transfer argument must be an iterable");
957
        }
958
      }
959
    }
960
  }
961
962
54549
  MessagePort* port = Unwrap<MessagePort>(args.This());
963
  // Even if the backing MessagePort object has already been deleted, we still
964
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
965
  // transfers.
966
54550
  if (port == nullptr) {
967
2
    Message msg;
968
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
969
1
    return;
970
  }
971
972
54549
  port->PostMessage(env, args[0], transfer_list);
973
}
974
975
23216
void MessagePort::Start() {
976
23216
  Debug(this, "Start receiving messages");
977
23216
  receiving_messages_ = true;
978
46432
  Mutex::ScopedLock lock(data_->mutex_);
979
23216
  if (!data_->incoming_messages_.empty())
980
10842
    TriggerAsync();
981
23216
}
982
983
20325
void MessagePort::Stop() {
984
20325
  Debug(this, "Stop receiving messages");
985
20325
  receiving_messages_ = false;
986
20325
}
987
988
23217
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
989
  MessagePort* port;
990
23218
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
991
23217
  if (!port->data_) {
992
1
    return;
993
  }
994
23216
  port->Start();
995
}
996
997
20479
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
998
  MessagePort* port;
999
40958
  CHECK(args[0]->IsObject());
1000
41112
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1001
20327
  if (!port->data_) {
1002
2
    return;
1003
  }
1004
20325
  port->Stop();
1005
}
1006
1007
1198
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1008
  MessagePort* port;
1009
2396
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1010
574
  port->OnMessage();
1011
}
1012
1013
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1014
11
  Environment* env = Environment::GetCurrent(args);
1015

41
  if (!args[0]->IsObject() ||
1016
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1017
    return THROW_ERR_INVALID_ARG_TYPE(env,
1018
10
        "The \"port\" argument must be a MessagePort instance");
1019
  }
1020
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1021
6
  if (port == nullptr) {
1022
    // Return 'no messages' for a closed port.
1023
    args.GetReturnValue().Set(
1024
        Environment::GetCurrent(args)->no_message_symbol());
1025
    return;
1026
  }
1027
1028
  MaybeLocal<Value> payload =
1029
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
1030
6
  if (!payload.IsEmpty())
1031
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
1032
}
1033
1034
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1035
5
  Environment* env = Environment::GetCurrent(args);
1036

20
  if (!args[0]->IsObject() ||
1037
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1038
    return THROW_ERR_INVALID_ARG_TYPE(env,
1039
        "The \"port\" argument must be a MessagePort instance");
1040
  }
1041
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1042
5
  CHECK_NOT_NULL(port);
1043
1044
5
  Local<Value> context_arg = args[1];
1045
  ContextifyContext* context_wrapper;
1046

15
  if (!context_arg->IsObject() ||
1047
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1048
10
          env, context_arg.As<Object>())) == nullptr) {
1049
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1050
  }
1051
1052
10
  std::unique_ptr<MessagePortData> data;
1053
5
  if (!port->IsDetached())
1054
5
    data = port->Detach();
1055
1056
5
  Context::Scope context_scope(context_wrapper->context());
1057
  MessagePort* target =
1058
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1059
5
  if (target != nullptr)
1060
15
    args.GetReturnValue().Set(target->object());
1061
}
1062
1063
10698
// Entangles two live ports by forwarding to the (port, data) overload with
// `b`'s data.
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  MessagePortData* peer_data = b->data_.get();
  Entangle(a, peer_data);
}
1066
1067
11325
// Entangles port `a` with the port data `b` by delegating to
// MessagePortData::Entangle() with `a`'s own data.
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData* own_data = a->data_.get();
  MessagePortData::Entangle(own_data, b);
}
1070
1071
// Reports the fields retained by this port to the memory tracker: the port
// data and the stored emit-message function.
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("data", data_);
  tracker->TrackField("emit_message_fn", emit_message_fn_);
}
1075
1076
33751
// Returns the FunctionTemplate for the MessagePort JS constructor, creating
// and caching it on `env` on first use.
//
// Factor generating the MessagePort JS constructor into its own piece
// of code, because it is needed early on in the child environment setup.
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  Local<FunctionTemplate> cached = env->message_port_constructor_template();
  if (cached.IsEmpty()) {
    Local<FunctionTemplate> ctor = env->NewFunctionTemplate(MessagePort::New);
    ctor->SetClassName(env->message_port_constructor_string());
    ctor->InstanceTemplate()->SetInternalFieldCount(
        MessagePort::kInternalFieldCount);
    ctor->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(ctor, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(ctor, "start", MessagePort::Start);

    env->set_message_port_constructor_template(ctor);

    // Hand out the template through the environment's accessor, exactly as
    // later calls will.
    cached = env->message_port_constructor_template();
  }
  return cached;
}
1098
1099
297
// Wraps `obj` as a BaseObject and makes the wrapper weak via MakeWeak().
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
    : BaseObject(env, obj) {
  MakeWeak();
}
1103
1104
297
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1105
297
  CHECK(args.IsConstructCall());
1106
594
  new JSTransferable(Environment::GetCurrent(args), args.This());
1107
297
}
1108
1109
17
// Implement `kClone in this ? kCloneable : kTransferable`.
// If the property check itself fails, the object is reported as
// untransferable; the check runs inside a TryCatchScope.
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
  HandleScope handle_scope(env()->isolate());
  errors::TryCatchScope ignore_exceptions(env());

  Maybe<bool> has_clone =
      object()->Has(env()->context(), env()->messaging_clone_symbol());
  if (has_clone.IsNothing()) {
    return TransferMode::kUntransferable;
  }

  if (has_clone.FromJust()) {
    return TransferMode::kCloneable;
  }
  return TransferMode::kTransferable;
}
1122
1123
8
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1124
8
  return TransferOrClone(TransferMode::kTransferable);
1125
}
1126
1127
2
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1128
2
  return TransferOrClone(TransferMode::kCloneable);
1129
}
1130
1131
10
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1132
    TransferMode mode) const {
1133
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1134
  // which should return an object with `data` and `deserializeInfo` properties;
1135
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1136
  // on the `TransferData` instance as a string.
1137
20
  HandleScope handle_scope(env()->isolate());
1138
10
  Local<Context> context = env()->isolate()->GetCurrentContext();
1139
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1140
10
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1141
1142
  Local<Value> method;
1143
30
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1144
    return {};
1145
  }
1146
10
  if (method->IsFunction()) {
1147
    Local<Value> result_v;
1148
32
    if (!method.As<Function>()->Call(
1149
32
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1150
10
      return {};
1151
    }
1152
1153
6
    if (result_v->IsObject()) {
1154
6
      Local<Object> result = result_v.As<Object>();
1155
      Local<Value> data;
1156
      Local<Value> deserialize_info;
1157

30
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1158
24
          !result->Get(context, env()->deserialize_info_string())
1159
6
              .ToLocal(&deserialize_info)) {
1160
        return {};
1161
      }
1162
12
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1163
6
      if (*deserialize_info_str == nullptr) return {};
1164
12
      return std::make_unique<Data>(
1165
30
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1166
    }
1167
  }
1168
1169
2
  if (mode == TransferMode::kTransferable)
1170
    return TransferOrClone(TransferMode::kCloneable);
1171
  else
1172
2
    return {};
1173
}
1174
1175
// Call `this[kTransferList]()` and return the resulting list of BaseObjects.
//
// An empty list is returned when the method is absent or does not return an
// array. Array entries that are not BaseObject instances are skipped.
// Nothing is returned when a property read or the call fails.
Maybe<BaseObjectList>
JSTransferable::NestedTransferables() const {
  HandleScope handle_scope(env()->isolate());
  Local<Context> context = env()->isolate()->GetCurrentContext();

  Local<Value> method;
  if (!object()->Get(context, env()->messaging_transfer_list_symbol())
           .ToLocal(&method)) {
    return Nothing<BaseObjectList>();
  }
  if (!method->IsFunction()) {
    return Just(BaseObjectList {});
  }

  Local<Value> list_v;
  if (!method.As<Function>()->Call(
          context, object(), 0, nullptr).ToLocal(&list_v)) {
    return Nothing<BaseObjectList>();
  }
  if (!list_v->IsArray()) {
    return Just(BaseObjectList {});
  }

  Local<Array> list = list_v.As<Array>();
  BaseObjectList collected;
  // The length is re-read on every iteration, as in the original loop.
  for (size_t index = 0; index < list->Length(); index++) {
    Local<Value> entry;
    if (!list->Get(context, index).ToLocal(&entry))
      return Nothing<BaseObjectList>();
    if (!env()->base_object_ctor_template()->HasInstance(entry))
      continue;
    collected.emplace_back(Unwrap<BaseObject>(entry));
  }
  return Just(collected);
}
1206
1207
1
// Call `this[kDeserialize](data)` where `data` comes from the return value
// of `this[kTransfer]()` or `this[kClone]()`.
//
// `data` is read from `deserializer`. Returns Just(true) on success or when
// there is no callable deserialize method, and Nothing when reading the
// value, looking up the method, or calling it fails.
Maybe<bool> JSTransferable::FinalizeTransferRead(
    Local<Context> context, ValueDeserializer* deserializer) {
  HandleScope handle_scope(env()->isolate());

  Local<Value> data;
  if (!deserializer->ReadValue(context).ToLocal(&data)) {
    return Nothing<bool>();
  }

  Local<Value> method;
  if (!object()->Get(context, env()->messaging_deserialize_symbol())
           .ToLocal(&method)) {
    return Nothing<bool>();
  }
  if (!method->IsFunction()) {
    return Just(true);
  }

  MaybeLocal<Value> outcome =
      method.As<Function>()->Call(context, object(), 1, &data);
  return outcome.IsEmpty() ? Nothing<bool>() : Just(true);
}
1227
1228
6
// Takes ownership of both the deserialization info string and the handle to
// the JS data.
JSTransferable::Data::Data(std::string&& deserialize_info,
                           v8::Global<v8::Value>&& data)
    : deserialize_info_(std::move(deserialize_info)),
      data_(std::move(data)) {
}
1232
1233
4
// Create the JS wrapper object that will later be filled with data passed to
// the `[kDeserialize]()` method on it. This split is necessary, because here
// we need to create an object with the right prototype and internal fields,
// but the actual JS data stored in the serialized data can only be read at
// the end of the stream, after the main message has been read.
//
// Throws ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE and returns an empty pointer
// when `context` is not the environment's own context. Also returns an empty
// pointer when the create-object function fails or returns something that is
// not a BaseObject instance.
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<TransferData> self) {
  if (context != env->context()) {
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
    return {};
  }

  HandleScope handle_scope(env->isolate());
  Local<Value> info;
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};

  Local<Function> create_object = env->messaging_deserialize_create_object();
  CHECK(!create_object.IsEmpty());

  Local<Value> created;
  if (!create_object->Call(
          context, Null(env->isolate()), 1, &info).ToLocal(&created)) {
    return {};
  }
  if (!env->base_object_ctor_template()->HasInstance(created)) {
    return {};
  }

  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(created) };
}
1261
1262
6
// Writes the stored JS data to `serializer` and then drops the stored handle,
// whether or not the write succeeded. Returns the result of the write.
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
    Local<Context> context, ValueSerializer* serializer) {
  HandleScope handle_scope(context->GetIsolate());
  Local<Value> payload = PersistentToLocal::Strong(data_);
  Maybe<bool> written = serializer->WriteValue(context, payload);
  data_.Reset();
  return written;
}
1269
1270
namespace {
1271
1272
452
static void SetDeserializerCreateObjectFunction(
1273
    const FunctionCallbackInfo<Value>& args) {
1274
452
  Environment* env = Environment::GetCurrent(args);
1275
904
  CHECK(args[0]->IsFunction());
1276
904
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1277
452
}
1278
1279
10699
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1280
10699
  Environment* env = Environment::GetCurrent(args);
1281
10699
  if (!args.IsConstructCall()) {
1282
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1283
1
    return;
1284
  }
1285
1286
21396
  Local<Context> context = args.This()->CreationContext();
1287
10698
  Context::Scope context_scope(context);
1288
1289
10698
  MessagePort* port1 = MessagePort::New(env, context);
1290
10698
  if (port1 == nullptr) return;
1291
10698
  MessagePort* port2 = MessagePort::New(env, context);
1292
10698
  if (port2 == nullptr) {
1293
    port1->Close();
1294
    return;
1295
  }
1296
1297
10698
  MessagePort::Entangle(port1, port2);
1298
1299
53490
  args.This()->Set(context, env->port1_string(), port1->object())
1300
      .Check();
1301
53490
  args.This()->Set(context, env->port2_string(), port2->object())
1302
      .Check();
1303
}
1304
1305
452
static void InitMessaging(Local<Object> target,
1306
                          Local<Value> unused,
1307
                          Local<Context> context,
1308
                          void* priv) {
1309
452
  Environment* env = Environment::GetCurrent(context);
1310
1311
  {
1312
    Local<String> message_channel_string =
1313
452
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1314
452
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1315
452
    templ->SetClassName(message_channel_string);
1316
904
    target->Set(context,
1317
                message_channel_string,
1318
1356
                templ->GetFunction(context).ToLocalChecked()).Check();
1319
  }
1320
1321
  {
1322
    Local<String> js_transferable_string =
1323
452
        FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1324
452
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1325
904
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1326
452
    t->SetClassName(js_transferable_string);
1327
1356
    t->InstanceTemplate()->SetInternalFieldCount(
1328
452
        JSTransferable::kInternalFieldCount);
1329
904
    target->Set(context,
1330
                js_transferable_string,
1331
1356
                t->GetFunction(context).ToLocalChecked()).Check();
1332
  }
1333
1334
904
  target->Set(context,
1335
              env->message_port_constructor_string(),
1336
904
              GetMessagePortConstructorTemplate(env)
1337
1808
                  ->GetFunction(context).ToLocalChecked()).Check();
1338
1339
  // These are not methods on the MessagePort prototype, because
1340
  // the browser equivalents do not provide them.
1341
452
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1342
452
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1343
452
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1344
  env->SetMethod(target, "moveMessagePortToContext",
1345
452
                 MessagePort::MoveToContext);
1346
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1347
452
                 SetDeserializerCreateObjectFunction);
1348
1349
  {
1350
904
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1351
    target
1352
904
        ->Set(context,
1353
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1354
1356
              domexception)
1355
        .Check();
1356
  }
1357
452
}
1358
1359
4402
// Registers every native callback that this binding exposes to JS with the
// external reference registry.
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
  // Constructors.
  registry->Register(MessageChannel);
  registry->Register(JSTransferable::New);
  registry->Register(MessagePort::New);

  // MessagePort methods and helpers.
  registry->Register(MessagePort::PostMessage);
  registry->Register(MessagePort::Start);
  registry->Register(MessagePort::Stop);
  registry->Register(MessagePort::Drain);
  registry->Register(MessagePort::ReceiveMessage);
  registry->Register(MessagePort::MoveToContext);

  // Deserialization hook setter.
  registry->Register(SetDeserializerCreateObjectFunction);
}
1371
1372
}  // anonymous namespace
1373
1374
}  // namespace worker
1375
}  // namespace node
1376
1377
4471
// Hooks InitMessaging up under the name `messaging`.
// NOTE(review): presumably this registers the internal binding that JS loads
// as `internalBinding('messaging')` - confirm against the macro definition.
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1378

17818
// Hooks RegisterExternalReferences up under the same `messaging` name.
// NOTE(review): the macro's exact effect is defined elsewhere - confirm.
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1379
                               node::worker::RegisterExternalReferences)