GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 633 695 91.1 %
Date: 2020-06-24 22:13:30 Branches: 311 435 71.5 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_contextify.h"
7
#include "node_buffer.h"
8
#include "node_errors.h"
9
#include "node_process.h"
10
#include "util-inl.h"
11
12
using node::contextify::ContextifyContext;
13
using node::errors::TryCatchScope;
14
using v8::Array;
15
using v8::ArrayBuffer;
16
using v8::BackingStore;
17
using v8::CompiledWasmModule;
18
using v8::Context;
19
using v8::EscapableHandleScope;
20
using v8::Function;
21
using v8::FunctionCallbackInfo;
22
using v8::FunctionTemplate;
23
using v8::Global;
24
using v8::HandleScope;
25
using v8::Isolate;
26
using v8::Just;
27
using v8::Local;
28
using v8::Maybe;
29
using v8::MaybeLocal;
30
using v8::Nothing;
31
using v8::Object;
32
using v8::SharedArrayBuffer;
33
using v8::String;
34
using v8::Symbol;
35
using v8::Value;
36
using v8::ValueDeserializer;
37
using v8::ValueSerializer;
38
using v8::WasmModuleObject;
39
40
namespace node {
41
42
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
43
44
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
45
1
  return BaseObject::TransferMode::kUntransferable;
46
}
47
48
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
49
  return CloneForMessaging();
50
}
51
52
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
53
  return {};
54
}
55
56
10575
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
57
10575
  return Just(BaseObjectList {});
58
}
59
60
10347
Maybe<bool> BaseObject::FinalizeTransferRead(
61
    Local<Context> context, ValueDeserializer* deserializer) {
62
10347
  return Just(true);
63
}
64
65
namespace worker {
66
67
10582
Maybe<bool> TransferData::FinalizeTransferWrite(
68
    Local<Context> context, ValueSerializer* serializer) {
69
10582
  return Just(true);
70
}
71
72
181008
Message::Message(MallocedBuffer<char>&& buffer)
73
181008
    : main_message_buf_(std::move(buffer)) {}
74
75
104939
bool Message::IsCloseMessage() const {
76
104939
  return main_message_buf_.data == nullptr;
77
}
78
79
namespace {
80
81
// This is used to tell V8 how to read transferred host objects, like other
82
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
83
41724
class DeserializerDelegate : public ValueDeserializer::Delegate {
84
 public:
85
41808
  DeserializerDelegate(
86
      Message* m,
87
      Environment* env,
88
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
89
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
90
      const std::vector<CompiledWasmModule>& wasm_modules)
91
41808
      : host_objects_(host_objects),
92
        shared_array_buffers_(shared_array_buffers),
93
41808
        wasm_modules_(wasm_modules) {}
94
95
10348
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
96
    // Identifying the index in the message's BaseObject array is sufficient.
97
    uint32_t id;
98
10348
    if (!deserializer->ReadUint32(&id))
99
      return MaybeLocal<Object>();
100
10348
    CHECK_LE(id, host_objects_.size());
101
20696
    return host_objects_[id]->object(isolate);
102
  }
103
104
358
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
105
      Isolate* isolate, uint32_t clone_id) override {
106
358
    CHECK_LE(clone_id, shared_array_buffers_.size());
107
716
    return shared_array_buffers_[clone_id];
108
  }
109
110
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
111
      Isolate* isolate, uint32_t transfer_id) override {
112
2
    CHECK_LE(transfer_id, wasm_modules_.size());
113
    return WasmModuleObject::FromCompiledModule(
114
2
        isolate, wasm_modules_[transfer_id]);
115
  }
116
117
  ValueDeserializer* deserializer = nullptr;
118
119
 private:
120
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
121
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
122
  const std::vector<CompiledWasmModule>& wasm_modules_;
123
};
124
125
}  // anonymous namespace
126
127
41813
MaybeLocal<Value> Message::Deserialize(Environment* env,
128
                                       Local<Context> context) {
129
41813
  CHECK(!IsCloseMessage());
130
131
41825
  EscapableHandleScope handle_scope(env->isolate());
132
  Context::Scope context_scope(context);
133
134
  // Create all necessary objects for transferables, e.g. MessagePort handles.
135
83626
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
136
41679
  auto cleanup = OnScopeLeave([&]() {
137
41679
    for (BaseObjectPtr<BaseObject> object : host_objects) {
138
9
      if (!object) continue;
139
140
      // If the function did not finish successfully, host_objects will contain
141
      // a list of objects that will never be passed to JS. Therefore, we
142
      // destroy them here.
143
      object->Detach();
144
    }
145
125312
  });
146
147
52169
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
148
10354
    TransferData* data = transferables_[i].get();
149
20708
    host_objects[i] = data->Deserialize(
150
20708
        env, context, std::move(transferables_[i]));
151
10360
    if (!host_objects[i]) return {};
152
  }
153
41806
  transferables_.clear();
154
155
83607
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
156
  // Attach all transferred SharedArrayBuffers to their new Isolate.
157
42160
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
158
    Local<SharedArrayBuffer> sab =
159
        SharedArrayBuffer::New(env->isolate(),
160
358
                               std::move(shared_array_buffers_[i]));
161
358
    shared_array_buffers.push_back(sab);
162
  }
163
41764
  shared_array_buffers_.clear();
164
165
  DeserializerDelegate delegate(
166
83615
      this, env, host_objects, shared_array_buffers, wasm_modules_);
167
  ValueDeserializer deserializer(
168
      env->isolate(),
169
41818
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170
      main_message_buf_.size,
171
125441
      &delegate);
172
41824
  delegate.deserializer = &deserializer;
173
174
  // Attach all transferred ArrayBuffers to their new Isolate.
175
41832
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
176
    Local<ArrayBuffer> ab =
177
8
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
178
8
    deserializer.TransferArrayBuffer(i, ab);
179
  }
180
41824
  array_buffers_.clear();
181
182
83580
  if (deserializer.ReadHeader(context).IsNothing())
183
    return {};
184
  Local<Value> return_value;
185
83510
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
186
    return {};
187
188
52102
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
189
20696
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
190
      return {};
191
  }
192
193
41794
  host_objects.clear();
194
41805
  return handle_scope.Escape(return_value);
195
}
196
197
581
void Message::AddSharedArrayBuffer(
198
    std::shared_ptr<BackingStore> backing_store) {
199
581
  shared_array_buffers_.emplace_back(std::move(backing_store));
200
581
}
201
202
10587
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
203
10587
  transferables_.emplace_back(std::move(data));
204
10587
}
205
206
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
207
2
  wasm_modules_.emplace_back(std::move(mod));
208
2
  return wasm_modules_.size() - 1;
209
}
210
211
namespace {
212
213
32474
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
214
32474
  Isolate* isolate = context->GetIsolate();
215
  Local<Object> per_context_bindings;
216
  Local<Value> emit_message_val;
217

129896
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
218
97422
      !per_context_bindings->Get(context,
219
97422
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
220
32474
          .ToLocal(&emit_message_val)) {
221
    return MaybeLocal<Function>();
222
  }
223
32474
  CHECK(emit_message_val->IsFunction());
224
32474
  return emit_message_val.As<Function>();
225
}
226
227
4717
MaybeLocal<Function> GetDOMException(Local<Context> context) {
228
4717
  Isolate* isolate = context->GetIsolate();
229
  Local<Object> per_context_bindings;
230
  Local<Value> domexception_ctor_val;
231

18868
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
232
14151
      !per_context_bindings->Get(context,
233
14151
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
234
4717
          .ToLocal(&domexception_ctor_val)) {
235
    return MaybeLocal<Function>();
236
  }
237
4717
  CHECK(domexception_ctor_val->IsFunction());
238
4717
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
239
4717
  return domexception_ctor;
240
}
241
242
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
243
14
  Isolate* isolate = context->GetIsolate();
244
  Local<Value> argv[] = {message,
245
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
246
  Local<Value> exception;
247
  Local<Function> domexception_ctor;
248

56
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
249
56
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
250
14
           .ToLocal(&exception)) {
251
    return;
252
  }
253
14
  isolate->ThrowException(exception);
254
}
255
256
// This tells V8 how to serialize objects that it does not understand
257
// (e.g. C++ objects) into the output buffer, in a way that our own
258
// DeserializerDelegate understands how to unpack.
259
42683
class SerializerDelegate : public ValueSerializer::Delegate {
260
 public:
261
42683
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
262
42683
      : env_(env), context_(context), msg_(m) {}
263
264
4
  void ThrowDataCloneError(Local<String> message) override {
265
4
    ThrowDataCloneException(context_, message);
266
4
  }
267
268
10587
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
269
21174
    if (env_->base_object_ctor_template()->HasInstance(object)) {
270
      return WriteHostObject(
271
10587
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
272
    }
273
274
    ThrowDataCloneError(env_->clone_unsupported_type_str());
275
    return Nothing<bool>();
276
  }
277
278
581
  Maybe<uint32_t> GetSharedArrayBufferId(
279
      Isolate* isolate,
280
      Local<SharedArrayBuffer> shared_array_buffer) override {
281
    uint32_t i;
282
605
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
283
48
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
284
          shared_array_buffer) {
285
        return Just(i);
286
      }
287
    }
288
289
581
    seen_shared_array_buffers_.emplace_back(
290
1162
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
291
581
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
292
581
    return Just(i);
293
  }
294
295
2
  Maybe<uint32_t> GetWasmModuleTransferId(
296
      Isolate* isolate, Local<WasmModuleObject> module) override {
297
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
298
  }
299
300
42667
  Maybe<bool> Finish(Local<Context> context) {
301
53254
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
302
21174
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
303
21174
      std::unique_ptr<TransferData> data;
304
10587
      if (i < first_cloned_object_index_)
305
10578
        data = host_object->TransferForMessaging();
306
10587
      if (!data)
307
9
        data = host_object->CloneForMessaging();
308
10587
      if (!data) return Nothing<bool>();
309
21174
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
310
        return Nothing<bool>();
311
10587
      msg_->AddTransferable(std::move(data));
312
    }
313
42667
    return Just(true);
314
  }
315
316
10581
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
317
    // Make sure we have not started serializing the value itself yet.
318
10581
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
319
10581
    host_objects_.emplace_back(std::move(host_object));
320
10581
  }
321
322
  // Some objects in the transfer list may register sub-objects that can be
323
  // transferred. This could e.g. be a public JS wrapper object, such as a
324
  // FileHandle, that is registering its C++ handle for transfer.
325
42672
  inline Maybe<bool> AddNestedHostObjects() {
326
53252
    for (size_t i = 0; i < host_objects_.size(); i++) {
327
21160
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
328
21160
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
329
        return Nothing<bool>();
330

10585
      for (auto nested_transferable : nested_transferables) {
331
10
        if (std::find(host_objects_.begin(),
332
                      host_objects_.end(),
333
10
                      nested_transferable) == host_objects_.end()) {
334
5
          AddHostObject(nested_transferable);
335
        }
336
      }
337
    }
338
42672
    return Just(true);
339
  }
340
341
  ValueSerializer* serializer = nullptr;
342
343
 private:
344
10587
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
345
10595
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
346
10583
      if (host_objects_[i] == host_object) {
347
10575
        serializer->WriteUint32(i);
348
10575
        return Just(true);
349
      }
350
    }
351
352
12
    BaseObject::TransferMode mode = host_object->GetTransferMode();
353
12
    if (mode == BaseObject::TransferMode::kUntransferable) {
354
1
      ThrowDataCloneError(env_->clone_unsupported_type_str());
355
1
      return Nothing<bool>();
356
11
    } else if (mode == BaseObject::TransferMode::kTransferable) {
357
2
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
358
2
      return Nothing<bool>();
359
    }
360
361
9
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
362
9
    uint32_t index = host_objects_.size();
363
9
    if (first_cloned_object_index_ == SIZE_MAX)
364
9
      first_cloned_object_index_ = index;
365
9
    serializer->WriteUint32(index);
366
9
    host_objects_.push_back(host_object);
367
9
    return Just(true);
368
  }
369
370
  Environment* env_;
371
  Local<Context> context_;
372
  Message* msg_;
373
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
374
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
375
  size_t first_cloned_object_index_ = SIZE_MAX;
376
377
  friend class worker::Message;
378
};
379
380
}  // anonymous namespace
381
382
42683
Maybe<bool> Message::Serialize(Environment* env,
383
                               Local<Context> context,
384
                               Local<Value> input,
385
                               const TransferList& transfer_list_v,
386
                               Local<Object> source_port) {
387
85366
  HandleScope handle_scope(env->isolate());
388
  Context::Scope context_scope(context);
389
390
  // Verify that we're not silently overwriting an existing message.
391
42683
  CHECK(main_message_buf_.is_empty());
392
393
85366
  SerializerDelegate delegate(env, context, this);
394
85366
  ValueSerializer serializer(env->isolate(), &delegate);
395
42683
  delegate.serializer = &serializer;
396
397
85366
  std::vector<Local<ArrayBuffer>> array_buffers;
398
53283
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
399
10610
    Local<Value> entry = transfer_list_v[i];
400
    // Currently, we support ArrayBuffers and MessagePorts.
401
10610
    if (entry->IsArrayBuffer()) {
402
25
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
403
      // If we cannot render the ArrayBuffer unusable in this Isolate,
404
      // copying the buffer will have to do.
405
      // Note that we can currently transfer ArrayBuffers even if they were
406
      // not allocated by Node’s ArrayBufferAllocator in the first place,
407
      // because we pass the underlying v8::BackingStore around rather than
408
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
409
      // is always going to outlive any Workers it creates, and so will its
410
      // allocator along with it.
411
49
      if (!ab->IsDetachable()) continue;
412
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
413
      // for details.
414
      bool untransferrable;
415
50
      if (!ab->HasPrivate(
416
              context,
417
25
              env->arraybuffer_untransferable_private_symbol())
418
25
              .To(&untransferrable)) {
419
1
        return Nothing<bool>();
420
      }
421
25
      if (untransferrable) continue;
422
44
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
423
44
          array_buffers.end()) {
424
1
        ThrowDataCloneException(
425
            context,
426
            FIXED_ONE_BYTE_STRING(
427
                env->isolate(),
428
1
                "Transfer list contains duplicate ArrayBuffer"));
429
1
        return Nothing<bool>();
430
      }
431
      // We simply use the array index in the `array_buffers` list as the
432
      // ID that we write into the serialized buffer.
433
21
      uint32_t id = array_buffers.size();
434
21
      array_buffers.push_back(ab);
435
21
      serializer.TransferArrayBuffer(id, ab);
436
21
      continue;
437
21170
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
438
      // Check if the source MessagePort is being transferred.
439

21170
      if (!source_port.IsEmpty() && entry == source_port) {
440
1
        ThrowDataCloneException(
441
            context,
442
            FIXED_ONE_BYTE_STRING(env->isolate(),
443
1
                                  "Transfer list contains source port"));
444
10
        return Nothing<bool>();
445
      }
446
      BaseObjectPtr<BaseObject> host_object {
447
10584
          Unwrap<BaseObject>(entry.As<Object>()) };
448

31754
      if (env->message_port_constructor_template()->HasInstance(entry) &&
449
21157
          (!host_object ||
450
10578
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
451
7
        ThrowDataCloneException(
452
            context,
453
            FIXED_ONE_BYTE_STRING(
454
                env->isolate(),
455
7
                "MessagePort in transfer list is already detached"));
456
7
        return Nothing<bool>();
457
      }
458
21154
      if (std::find(delegate.host_objects_.begin(),
459
                    delegate.host_objects_.end(),
460
21154
                    host_object) != delegate.host_objects_.end()) {
461
1
        ThrowDataCloneException(
462
            context,
463
            String::Concat(env->isolate(),
464
                FIXED_ONE_BYTE_STRING(
465
                  env->isolate(),
466
                  "Transfer list contains duplicate "),
467
2
                entry.As<Object>()->GetConstructorName()));
468
1
        return Nothing<bool>();
469
      }
470

10576
      if (host_object && host_object->GetTransferMode() !=
471
10576
              BaseObject::TransferMode::kUntransferable) {
472
10576
        delegate.AddHostObject(host_object);
473
10576
        continue;
474
      }
475
    }
476
477
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
478
    return Nothing<bool>();
479
  }
480
85346
  if (delegate.AddNestedHostObjects().IsNothing())
481
    return Nothing<bool>();
482
483
42673
  serializer.WriteHeader();
484
85345
  if (serializer.WriteValue(context, input).IsNothing()) {
485
6
    return Nothing<bool>();
486
  }
487
488
42680
  for (Local<ArrayBuffer> ab : array_buffers) {
489
    // If serialization succeeded, we render it inaccessible in this Isolate.
490
26
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
491
13
    ab->Detach();
492
493
13
    array_buffers_.emplace_back(std::move(backing_store));
494
  }
495
496
85334
  if (delegate.Finish(context).IsNothing())
497
    return Nothing<bool>();
498
499
  // The serializer gave us a buffer allocated using `malloc()`.
500
42667
  std::pair<uint8_t*, size_t> data = serializer.Release();
501
42667
  CHECK_NOT_NULL(data.first);
502
  main_message_buf_ =
503
42667
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
504
42667
  return Just(true);
505
}
506
507
void Message::MemoryInfo(MemoryTracker* tracker) const {
508
  tracker->TrackField("array_buffers_", array_buffers_);
509
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
510
  tracker->TrackField("transferables", transferables_);
511
}
512
513
33028
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
514
515
98949
MessagePortData::~MessagePortData() {
516
32983
  CHECK_NULL(owner_);
517
32983
  Disentangle();
518
65966
}
519
520
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
521
  Mutex::ScopedLock lock(mutex_);
522
  tracker->TrackField("incoming_messages", incoming_messages_);
523
}
524
525
108609
void MessagePortData::AddToIncomingQueue(Message&& message) {
526
  // This function will be called by other threads.
527
217220
  Mutex::ScopedLock lock(mutex_);
528
108612
  incoming_messages_.emplace_back(std::move(message));
529
530
108611
  if (owner_ != nullptr) {
531
52191
    Debug(owner_, "Adding message to incoming queue");
532
52191
    owner_->TriggerAsync();
533
  }
534
108612
}
535
536
11174
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
537
11174
  CHECK_NULL(a->sibling_);
538
11174
  CHECK_NULL(b->sibling_);
539
11174
  a->sibling_ = b;
540
11174
  b->sibling_ = a;
541
11174
  a->sibling_mutex_ = b->sibling_mutex_;
542
11174
}
543
544
54837
void MessagePortData::Disentangle() {
545
  // Grab a copy of the sibling mutex, then replace it so that each sibling
546
  // has its own sibling_mutex_ now.
547
109678
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
548
109675
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
549
54841
  sibling_mutex_ = std::make_shared<Mutex>();
550
551
54840
  MessagePortData* sibling = sibling_;
552
54840
  if (sibling_ != nullptr) {
553
11171
    sibling_->sibling_ = nullptr;
554
11171
    sibling_ = nullptr;
555
  }
556
557
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
558
  // message and trigger the corresponding uv_async_t to let them know that
559
  // this happened.
560
54840
  AddToIncomingQueue(Message());
561
54836
  if (sibling != nullptr) {
562
11173
    sibling->AddToIncomingQueue(Message());
563
  }
564
54839
}
565
566
129718
MessagePort::~MessagePort() {
567
32429
  if (data_) Detach();
568
64858
}
569
570
32474
MessagePort::MessagePort(Environment* env,
571
                         Local<Context> context,
572
32474
                         Local<Object> wrap)
573
  : HandleWrap(env,
574
               wrap,
575
32474
               reinterpret_cast<uv_handle_t*>(&async_),
576
               AsyncWrap::PROVIDER_MESSAGEPORT),
577
32474
    data_(new MessagePortData(this)) {
578
92552
  auto onmessage = [](uv_async_t* handle) {
579
    // Called when data has been put into the queue.
580
30039
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
581
30039
    channel->OnMessage();
582
92547
  };
583
584
32474
  CHECK_EQ(uv_async_init(env->event_loop(),
585
                         &async_,
586
                         onmessage), 0);
587
  // Reset later to indicate success of the constructor.
588
32474
  bool succeeded = false;
589
97422
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
590
591
  Local<Value> fn;
592
97422
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
593
    return;
594
595
32474
  if (fn->IsFunction()) {
596
32469
    Local<Function> init = fn.As<Function>();
597
64938
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
598
      return;
599
  }
600
601
  Local<Function> emit_message_fn;
602
64948
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
603
    return;
604
32474
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
605
606
32474
  succeeded = true;
607
32474
  Debug(this, "Created message port");
608
}
609
610
21155
bool MessagePort::IsDetached() const {
611

21155
  return data_ == nullptr || IsHandleClosing();
612
}
613
614
73627
void MessagePort::TriggerAsync() {
615
73627
  if (IsHandleClosing()) return;
616
73601
  CHECK_EQ(uv_async_send(&async_), 0);
617
}
618
619
32430
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
620
64861
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
621
622
32431
  if (data_) {
623
    // Wrap this call with accessing the mutex, so that TriggerAsync()
624
    // can check IsHandleClosing() without race conditions.
625
64851
    Mutex::ScopedLock sibling_lock(data_->mutex_);
626
32426
    HandleWrap::Close(close_callback);
627
  } else {
628
5
    HandleWrap::Close(close_callback);
629
  }
630
32431
}
631
632
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
633
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
634
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
635
  // class (i.e. makes it behave like an arrow function).
636
2
  Environment* env = Environment::GetCurrent(args);
637
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
638
2
}
639
640
32474
MessagePort* MessagePort::New(
641
    Environment* env,
642
    Local<Context> context,
643
    std::unique_ptr<MessagePortData> data) {
644
  Context::Scope context_scope(context);
645
32474
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
646
647
  // Construct a new instance, then assign the listener instance and possibly
648
  // the MessagePortData to it.
649
  Local<Object> instance;
650
97422
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
651
    return nullptr;
652
32474
  MessagePort* port = new MessagePort(env, context, instance);
653
32474
  CHECK_NOT_NULL(port);
654
32474
  if (port->IsHandleClosing()) {
655
    // Construction failed with an exception.
656
    return nullptr;
657
  }
658
659
32474
  if (data) {
660
10680
    port->Detach();
661
10680
    port->data_ = std::move(data);
662
663
    // This lock is here to avoid race conditions with the `owner_` read
664
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
665
    // but it's better to be safe than sorry.)
666
21360
    Mutex::ScopedLock lock(port->data_->mutex_);
667
10680
    port->data_->owner_ = port;
668
    // If the existing MessagePortData object had pending messages, this is
669
    // the easiest way to run that queue.
670
10680
    port->TriggerAsync();
671
  }
672
32474
  return port;
673
}
674
675
72320
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
676
                                              bool only_if_receiving) {
677
144652
  Message received;
678
  {
679
    // Get the head of the message queue.
680
125238
    Mutex::ScopedLock lock(data_->mutex_);
681
682
72333
    Debug(this, "MessagePort has message");
683
684

72331
    bool wants_message = receiving_messages_ || !only_if_receiving;
685
    // We have nothing to do if:
686
    // - There are no pending messages
687
    // - We are not intending to receive messages, and the message we would
688
    //   receive is not the final "close" message.
689

144667
    if (data_->incoming_messages_.empty() ||
690
63163
        (!wants_message &&
691
10237
         !data_->incoming_messages_.front().IsCloseMessage())) {
692
38826
      return env()->no_message_symbol();
693
    }
694
695
52915
    received = std::move(data_->incoming_messages_.front());
696
52907
    data_->incoming_messages_.pop_front();
697
  }
698
699
52920
  if (received.IsCloseMessage()) {
700
22178
    Close();
701
22178
    return env()->no_message_symbol();
702
  }
703
704
41821
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
705
706
41829
  return received.Deserialize(env(), context);
707
}
708
709
30553
void MessagePort::OnMessage() {
710
30553
  Debug(this, "Running MessagePort::OnMessage()");
711
61051
  HandleScope handle_scope(env()->isolate());
712
61106
  Local<Context> context = object(env()->isolate())->CreationContext();
713
714
  size_t processing_limit;
715
  {
716
30553
    Mutex::ScopedLock(data_->mutex_);
717
61106
    processing_limit = std::max(data_->incoming_messages_.size(),
718
91659
                                static_cast<size_t>(1000));
719
  }
720
721
  // data_ can only ever be modified by the owner thread, so no need to lock.
722
  // However, the message port may be transferred while it is processing
723
  // messages, so we need to check that this handle still owns its `data_` field
724
  // on every iteration.
725

114107
  while (data_) {
726
72320
    if (processing_limit-- == 0) {
727
      // Prevent event loop starvation by only processing those messages without
728
      // interruption that were already present when the OnMessage() call was
729
      // first triggered, but at least 1000 messages because otherwise the
730
      // overhead of repeatedly triggering the uv_async_t instance becomes
731
      // noticable, at least on Windows.
732
      // (That might require more investigation by somebody more familiar with
733
      // Windows.)
734
1
      TriggerAsync();
735
1
      return;
736
    }
737
738
114096
    HandleScope handle_scope(env()->isolate());
739

41777
    Context::Scope context_scope(context);
740
72315
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
741
742
    Local<Value> payload;
743
    Local<Value> message_error;
744
    {
745
      // Catch any exceptions from parsing the message itself (not from
746
      // emitting it) as 'messageeror' events.
747
144473
      TryCatchScope try_catch(env());
748
144542
      if (!ReceiveMessage(context, true).ToLocal(&payload)) {
749

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
750
6
          message_error = try_catch.Exception();
751
6
        goto reschedule;
752
      }
753
    }
754
144479
    if (payload == env()->no_message_symbol()) break;
755
756
41816
    if (!env()->can_call_into_js()) {
757
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
758
      // In this case there is nothing to do but to drain the current queue.
759
      continue;
760
    }
761
762
83624
    if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
763
    reschedule:
764
49
      if (!message_error.IsEmpty()) {
765
        // This should become a `messageerror` event in the sense of the
766
        // EventTarget API at some point.
767
        Local<Value> argv[] = {
768
          env()->messageerror_string(),
769
          message_error
770
12
        };
771
6
        USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
772
      }
773
774
      // Re-schedule OnMessage() execution in case of failure.
775
49
      if (data_)
776
49
        TriggerAsync();
777
49
      return;
778
    }
779
  }
780
}
781
782
32428
void MessagePort::OnClose() {
783
32428
  Debug(this, "MessagePort::OnClose()");
784
32430
  if (data_) {
785
    // Detach() returns move(data_).
786
21857
    Detach()->Disentangle();
787
  }
788
32430
}
789
790
43109
std::unique_ptr<MessagePortData> MessagePort::Detach() {
791
43109
  CHECK(data_);
792
86219
  Mutex::ScopedLock lock(data_->mutex_);
793
43109
  data_->owner_ = nullptr;
794
86219
  return std::move(data_);
795
}
796
797
10572
BaseObject::TransferMode MessagePort::GetTransferMode() const {
798
10572
  if (IsDetached())
799
    return BaseObject::TransferMode::kUntransferable;
800
10572
  return BaseObject::TransferMode::kTransferable;
801
}
802
803
10568
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
804
21136
  Close();
805
10568
  return Detach();
806
}
807
808
10340
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
809
    Environment* env,
810
    Local<Context> context,
811
    std::unique_ptr<TransferData> self) {
812
20680
  return BaseObjectPtr<MessagePort> { MessagePort::New(
813
      env, context,
814
31020
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
815
}
816
817
42682
Maybe<bool> MessagePort::PostMessage(Environment* env,
818
                                     Local<Value> message_v,
819
                                     const TransferList& transfer_v) {
820
42682
  Isolate* isolate = env->isolate();
821
42682
  Local<Object> obj = object(isolate);
822
42682
  Local<Context> context = obj->CreationContext();
823
824
85364
  Message msg;
825
826
  // Per spec, we need to both check if transfer list has the source port, and
827
  // serialize the input message, even if the MessagePort is closed or detached.
828
829
  Maybe<bool> serialization_maybe =
830
42682
      msg.Serialize(env, context, message_v, transfer_v, obj);
831
42682
  if (data_ == nullptr) {
832
2
    return serialization_maybe;
833
  }
834
42680
  if (serialization_maybe.IsNothing()) {
835
13
    return Nothing<bool>();
836
  }
837
838
85334
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
839
42667
  bool doomed = false;
840
841
  // Check if the target port is posted to itself.
842
42667
  if (data_->sibling_ != nullptr) {
843
53186
    for (const auto& transferable : msg.transferables()) {
844
10587
      if (data_->sibling_ == transferable.get()) {
845
1
        doomed = true;
846
        ProcessEmitWarning(env, "The target port was posted to itself, and "
847
1
                                "the communication channel was lost");
848
1
        break;
849
      }
850
    }
851
  }
852
853

42667
  if (data_->sibling_ == nullptr || doomed)
854
68
    return Just(true);
855
856
42599
  data_->sibling_->AddToIncomingQueue(std::move(msg));
857
42598
  return Just(true);
858
}
859
860
10620
static Maybe<bool> ReadIterable(Environment* env,
861
                                Local<Context> context,
862
                                // NOLINTNEXTLINE(runtime/references)
863
                                TransferList& transfer_list,
864
                                Local<Value> object) {
865
10620
  if (!object->IsObject()) return Just(false);
866
867
10617
  if (object->IsArray()) {
868
10597
    Local<Array> arr = object.As<Array>();
869
10597
    size_t length = arr->Length();
870
10597
    transfer_list.AllocateSufficientStorage(length);
871
21205
    for (size_t i = 0; i < length; i++) {
872
31824
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
873
        return Nothing<bool>();
874
    }
875
10597
    return Just(true);
876
  }
877
878
20
  Isolate* isolate = env->isolate();
879
  Local<Value> iterator_method;
880
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
881
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
882
20
  if (!iterator_method->IsFunction()) return Just(false);
883
884
  Local<Value> iterator;
885
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
886
6
      .ToLocal(&iterator)) return Nothing<bool>();
887
6
  if (!iterator->IsObject()) return Just(false);
888
889
  Local<Value> next;
890
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
891
    return Nothing<bool>();
892
6
  if (!next->IsFunction()) return Just(false);
893
894
6
  std::vector<Local<Value>> entries;
895
7
  while (env->can_call_into_js()) {
896
    Local<Value> result;
897
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
898
7
        .ToLocal(&result)) return Nothing<bool>();
899
4
    if (!result->IsObject()) return Just(false);
900
901
    Local<Value> done;
902
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
903
      return Nothing<bool>();
904
4
    if (done->BooleanValue(isolate)) break;
905
906
    Local<Value> val;
907
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
908
      return Nothing<bool>();
909
2
    entries.push_back(val);
910
  }
911
912
2
  transfer_list.AllocateSufficientStorage(entries.size());
913
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
914
2
  return Just(true);
915
}
916
917
42695
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
918
42695
  Environment* env = Environment::GetCurrent(args);
919
42695
  Local<Object> obj = args.This();
920
42695
  Local<Context> context = obj->CreationContext();
921
922
42695
  if (args.Length() == 0) {
923
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
924
13
                                       "MessagePort.postMessage");
925
  }
926
927

149311
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
928
    // Browsers ignore null or undefined, and otherwise accept an array or an
929
    // options object.
930
    return THROW_ERR_INVALID_ARG_TYPE(env,
931
4
        "Optional transferList argument must be an iterable");
932
  }
933
934
85373
  TransferList transfer_list;
935
85382
  if (args[1]->IsObject()) {
936
    bool was_iterable;
937
21218
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
938
8
      return;
939
10609
    if (!was_iterable) {
940
      Local<Value> transfer_option;
941
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
942
21
          .ToLocal(&transfer_option)) return;
943
26
      if (!transfer_option->IsUndefined()) {
944
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
945
12
            .To(&was_iterable)) return;
946
10
        if (!was_iterable) {
947
          return THROW_ERR_INVALID_ARG_TYPE(env,
948
7
              "Optional options.transfer argument must be an iterable");
949
        }
950
      }
951
    }
952
  }
953
954
42683
  MessagePort* port = Unwrap<MessagePort>(args.This());
955
  // Even if the backing MessagePort object has already been deleted, we still
956
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
957
  // transfers.
958
42683
  if (port == nullptr) {
959
2
    Message msg;
960
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
961
1
    return;
962
  }
963
964
42682
  port->PostMessage(env, args[0], transfer_list);
965
}
966
967
22787
void MessagePort::Start() {
968
22787
  Debug(this, "Start receiving messages");
969
22787
  receiving_messages_ = true;
970
45574
  Mutex::ScopedLock lock(data_->mutex_);
971
22787
  if (!data_->incoming_messages_.empty())
972
10707
    TriggerAsync();
973
22787
}
974
975
20282
void MessagePort::Stop() {
976
20282
  Debug(this, "Stop receiving messages");
977
20282
  receiving_messages_ = false;
978
20282
}
979
980
22788
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
981
  MessagePort* port;
982
22789
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
983
22788
  if (!port->data_) {
984
1
    return;
985
  }
986
22787
  port->Start();
987
}
988
989
20410
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
990
  MessagePort* port;
991
40820
  CHECK(args[0]->IsObject());
992
40948
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
993
20284
  if (!port->data_) {
994
2
    return;
995
  }
996
20282
  port->Stop();
997
}
998
999
1052
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1000
  MessagePort* port;
1001
2104
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1002
514
  port->OnMessage();
1003
}
1004
1005
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1006
11
  Environment* env = Environment::GetCurrent(args);
1007

41
  if (!args[0]->IsObject() ||
1008
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1009
    return THROW_ERR_INVALID_ARG_TYPE(env,
1010
10
        "The \"port\" argument must be a MessagePort instance");
1011
  }
1012
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1013
6
  if (port == nullptr) {
1014
    // Return 'no messages' for a closed port.
1015
    args.GetReturnValue().Set(
1016
        Environment::GetCurrent(args)->no_message_symbol());
1017
    return;
1018
  }
1019
1020
  MaybeLocal<Value> payload =
1021
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
1022
6
  if (!payload.IsEmpty())
1023
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
1024
}
1025
1026
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1027
5
  Environment* env = Environment::GetCurrent(args);
1028

20
  if (!args[0]->IsObject() ||
1029
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1030
    return THROW_ERR_INVALID_ARG_TYPE(env,
1031
        "The \"port\" argument must be a MessagePort instance");
1032
  }
1033
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1034
5
  CHECK_NOT_NULL(port);
1035
1036
5
  Local<Value> context_arg = args[1];
1037
  ContextifyContext* context_wrapper;
1038

15
  if (!context_arg->IsObject() ||
1039
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1040
10
          env, context_arg.As<Object>())) == nullptr) {
1041
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1042
  }
1043
1044
10
  std::unique_ptr<MessagePortData> data;
1045
5
  if (!port->IsDetached())
1046
5
    data = port->Detach();
1047
1048
5
  Context::Scope context_scope(context_wrapper->context());
1049
  MessagePort* target =
1050
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1051
5
  if (target != nullptr)
1052
15
    args.GetReturnValue().Set(target->object());
1053
}
1054
1055
10620
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1056
10620
  Entangle(a, b->data_.get());
1057
10620
}
1058
1059
11174
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1060
11174
  MessagePortData::Entangle(a->data_.get(), b);
1061
11174
}
1062
1063
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1064
  tracker->TrackField("data", data_);
1065
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1066
}
1067
1068
41880
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1069
  // Factor generating the MessagePort JS constructor into its own piece
1070
  // of code, because it is needed early on in the child environment setup.
1071
41880
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1072
41880
  if (!templ.IsEmpty())
1073
37177
    return templ;
1074
1075
  {
1076
4703
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1077
9406
    m->SetClassName(env->message_port_constructor_string());
1078
14109
    m->InstanceTemplate()->SetInternalFieldCount(
1079
4703
        MessagePort::kInternalFieldCount);
1080
9406
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1081
1082
4703
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1083
4703
    env->SetProtoMethod(m, "start", MessagePort::Start);
1084
1085
4703
    env->set_message_port_constructor_template(m);
1086
  }
1087
1088
4703
  return GetMessagePortConstructorTemplate(env);
1089
}
1090
1091
291
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1092
291
    : BaseObject(env, obj) {
1093
291
  MakeWeak();
1094
291
}
1095
1096
291
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1097
291
  CHECK(args.IsConstructCall());
1098
582
  new JSTransferable(Environment::GetCurrent(args), args.This());
1099
291
}
1100
1101
6
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1102
  // Implement `kClone in this ? kCloneable : kTransferable`.
1103
12
  HandleScope handle_scope(env()->isolate());
1104
12
  errors::TryCatchScope ignore_exceptions(env());
1105
1106
  bool has_clone;
1107
24
  if (!object()->Has(env()->context(),
1108
30
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1109
    return TransferMode::kUntransferable;
1110
  }
1111
1112
6
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1113
}
1114
1115
5
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1116
5
  return TransferOrClone(TransferMode::kTransferable);
1117
}
1118
1119
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1120
  return TransferOrClone(TransferMode::kCloneable);
1121
}
1122
1123
5
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1124
    TransferMode mode) const {
1125
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1126
  // which should return an object with `data` and `deserializeInfo` properties;
1127
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1128
  // on the `TransferData` instance as a string.
1129
10
  HandleScope handle_scope(env()->isolate());
1130
5
  Local<Context> context = env()->isolate()->GetCurrentContext();
1131
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1132
5
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1133
1134
  Local<Value> method;
1135
15
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1136
    return {};
1137
  }
1138
5
  if (method->IsFunction()) {
1139
    Local<Value> result_v;
1140
20
    if (!method.As<Function>()->Call(
1141
20
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1142
5
      return {};
1143
    }
1144
1145
5
    if (result_v->IsObject()) {
1146
5
      Local<Object> result = result_v.As<Object>();
1147
      Local<Value> data;
1148
      Local<Value> deserialize_info;
1149

25
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1150
20
          !result->Get(context, env()->deserialize_info_string())
1151
5
              .ToLocal(&deserialize_info)) {
1152
        return {};
1153
      }
1154
10
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1155
5
      if (*deserialize_info_str == nullptr) return {};
1156
10
      return std::make_unique<Data>(
1157
25
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1158
    }
1159
  }
1160
1161
  if (mode == TransferMode::kTransferable)
1162
    return TransferOrClone(TransferMode::kCloneable);
1163
  else
1164
    return {};
1165
}
1166
1167
Maybe<BaseObjectList>
1168
5
JSTransferable::NestedTransferables() const {
1169
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1170
10
  HandleScope handle_scope(env()->isolate());
1171
5
  Local<Context> context = env()->isolate()->GetCurrentContext();
1172
5
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1173
1174
  Local<Value> method;
1175
15
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1176
    return Nothing<BaseObjectList>();
1177
  }
1178
5
  if (!method->IsFunction()) return Just(BaseObjectList {});
1179
1180
  Local<Value> list_v;
1181
20
  if (!method.As<Function>()->Call(
1182
20
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1183
    return Nothing<BaseObjectList>();
1184
  }
1185
5
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1186
5
  Local<Array> list = list_v.As<Array>();
1187
1188
10
  BaseObjectList ret;
1189
20
  for (size_t i = 0; i < list->Length(); i++) {
1190
    Local<Value> value;
1191
15
    if (!list->Get(context, i).ToLocal(&value))
1192
      return Nothing<BaseObjectList>();
1193
10
    if (env()->base_object_ctor_template()->HasInstance(value))
1194
5
      ret.emplace_back(Unwrap<BaseObject>(value));
1195
  }
1196
5
  return Just(ret);
1197
}
1198
1199
1
Maybe<bool> JSTransferable::FinalizeTransferRead(
1200
    Local<Context> context, ValueDeserializer* deserializer) {
1201
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1202
  // of `this[kTransfer]()` or `this[kClone]()`.
1203
2
  HandleScope handle_scope(env()->isolate());
1204
  Local<Value> data;
1205
2
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1206
1207
1
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1208
  Local<Value> method;
1209
3
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1210
    return Nothing<bool>();
1211
  }
1212
1
  if (!method->IsFunction()) return Just(true);
1213
1214
4
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1215
    return Nothing<bool>();
1216
  }
1217
1
  return Just(true);
1218
}
1219
1220
5
JSTransferable::Data::Data(std::string&& deserialize_info,
1221
5
                           v8::Global<v8::Value>&& data)
1222
5
    : deserialize_info_(std::move(deserialize_info)),
1223
15
      data_(std::move(data)) {}
1224
1225
4
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1226
    Environment* env,
1227
    Local<Context> context,
1228
    std::unique_ptr<TransferData> self) {
1229
  // Create the JS wrapper object that will later be filled with data passed to
1230
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1231
  // we need to create an object with the right prototype and internal fields,
1232
  // but the actual JS data stored in the serialized data can only be read at
1233
  // the end of the stream, after the main message has been read.
1234
1235
8
  if (context != env->context()) {
1236
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1237
1
    return {};
1238
  }
1239
6
  HandleScope handle_scope(env->isolate());
1240
  Local<Value> info;
1241
6
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1242
1243
  Local<Value> ret;
1244
6
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1245
18
  if (!env->messaging_deserialize_create_object()->Call(
1246

16
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1247
5
      !env->base_object_ctor_template()->HasInstance(ret)) {
1248
2
    return {};
1249
  }
1250
1251
1
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1252
}
1253
1254
5
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1255
    Local<Context> context, ValueSerializer* serializer) {
1256
10
  HandleScope handle_scope(context->GetIsolate());
1257
5
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1258
5
  data_.Reset();
1259
10
  return ret;
1260
}
1261
1262
namespace {
1263
1264
4703
static void SetDeserializerCreateObjectFunction(
1265
    const FunctionCallbackInfo<Value>& args) {
1266
4703
  Environment* env = Environment::GetCurrent(args);
1267
9406
  CHECK(args[0]->IsFunction());
1268
9406
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1269
4703
}
1270
1271
10621
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1272
10621
  Environment* env = Environment::GetCurrent(args);
1273
10621
  if (!args.IsConstructCall()) {
1274
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1275
1
    return;
1276
  }
1277
1278
21240
  Local<Context> context = args.This()->CreationContext();
1279
10620
  Context::Scope context_scope(context);
1280
1281
10620
  MessagePort* port1 = MessagePort::New(env, context);
1282
10620
  if (port1 == nullptr) return;
1283
10620
  MessagePort* port2 = MessagePort::New(env, context);
1284
10620
  if (port2 == nullptr) {
1285
    port1->Close();
1286
    return;
1287
  }
1288
1289
10620
  MessagePort::Entangle(port1, port2);
1290
1291
53100
  args.This()->Set(context, env->port1_string(), port1->object())
1292
      .Check();
1293
53100
  args.This()->Set(context, env->port2_string(), port2->object())
1294
      .Check();
1295
}
1296
1297
4703
static void InitMessaging(Local<Object> target,
1298
                          Local<Value> unused,
1299
                          Local<Context> context,
1300
                          void* priv) {
1301
4703
  Environment* env = Environment::GetCurrent(context);
1302
1303
  {
1304
    Local<String> message_channel_string =
1305
4703
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1306
4703
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1307
4703
    templ->SetClassName(message_channel_string);
1308
9406
    target->Set(context,
1309
                message_channel_string,
1310
14109
                templ->GetFunction(context).ToLocalChecked()).Check();
1311
  }
1312
1313
  {
1314
    Local<String> js_transferable_string =
1315
4703
        FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1316
4703
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1317
9406
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1318
4703
    t->SetClassName(js_transferable_string);
1319
14109
    t->InstanceTemplate()->SetInternalFieldCount(
1320
4703
        JSTransferable::kInternalFieldCount);
1321
9406
    target->Set(context,
1322
                js_transferable_string,
1323
14109
                t->GetFunction(context).ToLocalChecked()).Check();
1324
  }
1325
1326
9406
  target->Set(context,
1327
              env->message_port_constructor_string(),
1328
9406
              GetMessagePortConstructorTemplate(env)
1329
18812
                  ->GetFunction(context).ToLocalChecked()).Check();
1330
1331
  // These are not methods on the MessagePort prototype, because
1332
  // the browser equivalents do not provide them.
1333
4703
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1334
4703
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1335
4703
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1336
  env->SetMethod(target, "moveMessagePortToContext",
1337
4703
                 MessagePort::MoveToContext);
1338
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1339
4703
                 SetDeserializerCreateObjectFunction);
1340
1341
  {
1342
9406
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1343
    target
1344
9406
        ->Set(context,
1345
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1346
14109
              domexception)
1347
        .Check();
1348
  }
1349
4703
}
1350
1351
}  // anonymous namespace
1352
1353
}  // namespace worker
1354
}  // namespace node
1355
1356
4398
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)