GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/node_messaging.cc Lines: 646 709 91.1 %
Date: 2020-07-19 22:14:24 Branches: 314 441 71.2 %

Line Branch Exec Source
1
#include "node_messaging.h"
2
3
#include "async_wrap-inl.h"
4
#include "debug_utils-inl.h"
5
#include "memory_tracker-inl.h"
6
#include "node_buffer.h"
7
#include "node_contextify.h"
8
#include "node_errors.h"
9
#include "node_external_reference.h"
10
#include "node_process.h"
11
#include "util-inl.h"
12
13
using node::contextify::ContextifyContext;
14
using node::errors::TryCatchScope;
15
using v8::Array;
16
using v8::ArrayBuffer;
17
using v8::BackingStore;
18
using v8::CompiledWasmModule;
19
using v8::Context;
20
using v8::EscapableHandleScope;
21
using v8::Function;
22
using v8::FunctionCallbackInfo;
23
using v8::FunctionTemplate;
24
using v8::Global;
25
using v8::HandleScope;
26
using v8::Isolate;
27
using v8::Just;
28
using v8::Local;
29
using v8::Maybe;
30
using v8::MaybeLocal;
31
using v8::Nothing;
32
using v8::Object;
33
using v8::SharedArrayBuffer;
34
using v8::String;
35
using v8::Symbol;
36
using v8::Value;
37
using v8::ValueDeserializer;
38
using v8::ValueSerializer;
39
using v8::WasmModuleObject;
40
41
namespace node {
42
43
using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45
1
BaseObject::TransferMode BaseObject::GetTransferMode() const {
46
1
  return BaseObject::TransferMode::kUntransferable;
47
}
48
49
std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
50
  return CloneForMessaging();
51
}
52
53
std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
54
  return {};
55
}
56
57
10583
Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
58
10583
  return Just(BaseObjectList {});
59
}
60
61
10352
Maybe<bool> BaseObject::FinalizeTransferRead(
62
    Local<Context> context, ValueDeserializer* deserializer) {
63
10352
  return Just(true);
64
}
65
66
namespace worker {
67
68
10590
Maybe<bool> TransferData::FinalizeTransferWrite(
69
    Local<Context> context, ValueSerializer* serializer) {
70
10590
  return Just(true);
71
}
72
73
181950
Message::Message(MallocedBuffer<char>&& buffer)
74
181950
    : main_message_buf_(std::move(buffer)) {}
75
76
105248
bool Message::IsCloseMessage() const {
77
105248
  return main_message_buf_.data == nullptr;
78
}
79
80
namespace {
81
82
// This is used to tell V8 how to read transferred host objects, like other
83
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
84
41902
class DeserializerDelegate : public ValueDeserializer::Delegate {
85
 public:
86
41907
  DeserializerDelegate(
87
      Message* m,
88
      Environment* env,
89
      const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
90
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
91
      const std::vector<CompiledWasmModule>& wasm_modules)
92
41907
      : host_objects_(host_objects),
93
        shared_array_buffers_(shared_array_buffers),
94
41907
        wasm_modules_(wasm_modules) {}
95
96
10353
  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
97
    // Identifying the index in the message's BaseObject array is sufficient.
98
    uint32_t id;
99
10353
    if (!deserializer->ReadUint32(&id))
100
      return MaybeLocal<Object>();
101
10353
    CHECK_LE(id, host_objects_.size());
102
20706
    return host_objects_[id]->object(isolate);
103
  }
104
105
363
  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
106
      Isolate* isolate, uint32_t clone_id) override {
107
363
    CHECK_LE(clone_id, shared_array_buffers_.size());
108
726
    return shared_array_buffers_[clone_id];
109
  }
110
111
2
  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
112
      Isolate* isolate, uint32_t transfer_id) override {
113
2
    CHECK_LE(transfer_id, wasm_modules_.size());
114
    return WasmModuleObject::FromCompiledModule(
115
2
        isolate, wasm_modules_[transfer_id]);
116
  }
117
118
  ValueDeserializer* deserializer = nullptr;
119
120
 private:
121
  const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
122
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
123
  const std::vector<CompiledWasmModule>& wasm_modules_;
124
};
125
126
}  // anonymous namespace
127
128
41909
MaybeLocal<Value> Message::Deserialize(Environment* env,
129
                                       Local<Context> context) {
130
41909
  CHECK(!IsCloseMessage());
131
132
41949
  EscapableHandleScope handle_scope(env->isolate());
133
  Context::Scope context_scope(context);
134
135
  // Create all necessary objects for transferables, e.g. MessagePort handles.
136
83935
  std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
137
41891
  auto cleanup = OnScopeLeave([&]() {
138
41891
    for (BaseObjectPtr<BaseObject> object : host_objects) {
139
9
      if (!object) continue;
140
141
      // If the function did not finish successfully, host_objects will contain
142
      // a list of objects that will never be passed to JS. Therefore, we
143
      // destroy them here.
144
      object->Detach();
145
    }
146
125971
  });
147
148
52347
  for (uint32_t i = 0; i < transferables_.size(); ++i) {
149
10359
    TransferData* data = transferables_[i].get();
150
20718
    host_objects[i] = data->Deserialize(
151
20718
        env, context, std::move(transferables_[i]));
152
10365
    if (!host_objects[i]) return {};
153
  }
154
41920
  transferables_.clear();
155
156
83912
  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
157
  // Attach all transferred SharedArrayBuffers to their new Isolate.
158
42355
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
159
    Local<SharedArrayBuffer> sab =
160
        SharedArrayBuffer::New(env->isolate(),
161
363
                               std::move(shared_array_buffers_[i]));
162
363
    shared_array_buffers.push_back(sab);
163
  }
164
41878
  shared_array_buffers_.clear();
165
166
  DeserializerDelegate delegate(
167
83938
      this, env, host_objects, shared_array_buffers, wasm_modules_);
168
  ValueDeserializer deserializer(
169
      env->isolate(),
170
41928
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
171
      main_message_buf_.size,
172
125792
      &delegate);
173
41993
  delegate.deserializer = &deserializer;
174
175
  // Attach all transferred ArrayBuffers to their new Isolate.
176
42002
  for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
177
    Local<ArrayBuffer> ab =
178
9
        ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
179
9
    deserializer.TransferArrayBuffer(i, ab);
180
  }
181
41993
  array_buffers_.clear();
182
183
83989
  if (deserializer.ReadHeader(context).IsNothing())
184
    return {};
185
  Local<Value> return_value;
186
83965
  if (!deserializer.ReadValue(context).ToLocal(&return_value))
187
    return {};
188
189
52324
  for (BaseObjectPtr<BaseObject> base_object : host_objects) {
190
20706
    if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
191
      return {};
192
  }
193
194
41959
  host_objects.clear();
195
41936
  return handle_scope.Escape(return_value);
196
}
197
198
589
void Message::AddSharedArrayBuffer(
199
    std::shared_ptr<BackingStore> backing_store) {
200
589
  shared_array_buffers_.emplace_back(std::move(backing_store));
201
589
}
202
203
10595
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
204
10595
  transferables_.emplace_back(std::move(data));
205
10595
}
206
207
2
uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
208
2
  wasm_modules_.emplace_back(std::move(mod));
209
2
  return wasm_modules_.size() - 1;
210
}
211
212
namespace {
213
214
32516
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
215
32516
  Isolate* isolate = context->GetIsolate();
216
  Local<Object> per_context_bindings;
217
  Local<Value> emit_message_val;
218

130064
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
219
97548
      !per_context_bindings->Get(context,
220
97548
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
221
32516
          .ToLocal(&emit_message_val)) {
222
    return MaybeLocal<Function>();
223
  }
224
32516
  CHECK(emit_message_val->IsFunction());
225
32516
  return emit_message_val.As<Function>();
226
}
227
228
398
MaybeLocal<Function> GetDOMException(Local<Context> context) {
229
398
  Isolate* isolate = context->GetIsolate();
230
  Local<Object> per_context_bindings;
231
  Local<Value> domexception_ctor_val;
232

1592
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
233
1194
      !per_context_bindings->Get(context,
234
1194
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
235
398
          .ToLocal(&domexception_ctor_val)) {
236
    return MaybeLocal<Function>();
237
  }
238
398
  CHECK(domexception_ctor_val->IsFunction());
239
398
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
240
398
  return domexception_ctor;
241
}
242
243
14
void ThrowDataCloneException(Local<Context> context, Local<String> message) {
244
14
  Isolate* isolate = context->GetIsolate();
245
  Local<Value> argv[] = {message,
246
42
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
247
  Local<Value> exception;
248
  Local<Function> domexception_ctor;
249

56
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
250
56
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
251
14
           .ToLocal(&exception)) {
252
    return;
253
  }
254
14
  isolate->ThrowException(exception);
255
}
256
257
// This tells V8 how to serialize objects that it does not understand
258
// (e.g. C++ objects) into the output buffer, in a way that our own
259
// DeserializerDelegate understands how to unpack.
260
42865
class SerializerDelegate : public ValueSerializer::Delegate {
261
 public:
262
42865
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
263
42865
      : env_(env), context_(context), msg_(m) {}
264
265
4
  void ThrowDataCloneError(Local<String> message) override {
266
4
    ThrowDataCloneException(context_, message);
267
4
  }
268
269
10596
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
270
21192
    if (env_->base_object_ctor_template()->HasInstance(object)) {
271
      return WriteHostObject(
272
10596
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
273
    }
274
275
    ThrowDataCloneError(env_->clone_unsupported_type_str());
276
    return Nothing<bool>();
277
  }
278
279
589
  Maybe<uint32_t> GetSharedArrayBufferId(
280
      Isolate* isolate,
281
      Local<SharedArrayBuffer> shared_array_buffer) override {
282
    uint32_t i;
283
613
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
284
48
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
285
          shared_array_buffer) {
286
        return Just(i);
287
      }
288
    }
289
290
589
    seen_shared_array_buffers_.emplace_back(
291
1178
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
292
589
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
293
589
    return Just(i);
294
  }
295
296
2
  Maybe<uint32_t> GetWasmModuleTransferId(
297
      Isolate* isolate, Local<WasmModuleObject> module) override {
298
2
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
299
  }
300
301
42848
  Maybe<bool> Finish(Local<Context> context) {
302
53443
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
303
21190
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
304
21190
      std::unique_ptr<TransferData> data;
305
10595
      if (i < first_cloned_object_index_)
306
10586
        data = host_object->TransferForMessaging();
307
10595
      if (!data)
308
9
        data = host_object->CloneForMessaging();
309
10595
      if (!data) return Nothing<bool>();
310
21190
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
311
        return Nothing<bool>();
312
10595
      msg_->AddTransferable(std::move(data));
313
    }
314
42848
    return Just(true);
315
  }
316
317
10589
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
318
    // Make sure we have not started serializing the value itself yet.
319
10589
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
320
10589
    host_objects_.emplace_back(std::move(host_object));
321
10589
  }
322
323
  // Some objects in the transfer list may register sub-objects that can be
324
  // transferred. This could e.g. be a public JS wrapper object, such as a
325
  // FileHandle, that is registering its C++ handle for transfer.
326
42855
  inline Maybe<bool> AddNestedHostObjects() {
327
53443
    for (size_t i = 0; i < host_objects_.size(); i++) {
328
21176
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
329
21176
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
330
        return Nothing<bool>();
331

10593
      for (auto nested_transferable : nested_transferables) {
332
10
        if (std::find(host_objects_.begin(),
333
                      host_objects_.end(),
334
10
                      nested_transferable) == host_objects_.end()) {
335
5
          AddHostObject(nested_transferable);
336
        }
337
      }
338
    }
339
42855
    return Just(true);
340
  }
341
342
  ValueSerializer* serializer = nullptr;
343
344
 private:
345
10596
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
346
10604
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
347
10591
      if (host_objects_[i] == host_object) {
348
10583
        serializer->WriteUint32(i);
349
10583
        return Just(true);
350
      }
351
    }
352
353
13
    BaseObject::TransferMode mode = host_object->GetTransferMode();
354
13
    if (mode == BaseObject::TransferMode::kUntransferable) {
355
1
      ThrowDataCloneError(env_->clone_unsupported_type_str());
356
1
      return Nothing<bool>();
357
12
    } else if (mode == BaseObject::TransferMode::kTransferable) {
358
3
      THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
359
3
      return Nothing<bool>();
360
    }
361
362
9
    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
363
9
    uint32_t index = host_objects_.size();
364
9
    if (first_cloned_object_index_ == SIZE_MAX)
365
9
      first_cloned_object_index_ = index;
366
9
    serializer->WriteUint32(index);
367
9
    host_objects_.push_back(host_object);
368
9
    return Just(true);
369
  }
370
371
  Environment* env_;
372
  Local<Context> context_;
373
  Message* msg_;
374
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
375
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
376
  size_t first_cloned_object_index_ = SIZE_MAX;
377
378
  friend class worker::Message;
379
};
380
381
}  // anonymous namespace
382
383
42865
Maybe<bool> Message::Serialize(Environment* env,
384
                               Local<Context> context,
385
                               Local<Value> input,
386
                               const TransferList& transfer_list_v,
387
                               Local<Object> source_port) {
388
85730
  HandleScope handle_scope(env->isolate());
389
  Context::Scope context_scope(context);
390
391
  // Verify that we're not silently overwriting an existing message.
392
42865
  CHECK(main_message_buf_.is_empty());
393
394
85730
  SerializerDelegate delegate(env, context, this);
395
85730
  ValueSerializer serializer(env->isolate(), &delegate);
396
42865
  delegate.serializer = &serializer;
397
398
85730
  std::vector<Local<ArrayBuffer>> array_buffers;
399
53476
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
400
10621
    Local<Value> entry = transfer_list_v[i];
401
10621
    if (entry->IsObject()) {
402
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
403
      // for details.
404
      bool untransferable;
405
31863
      if (!entry.As<Object>()->HasPrivate(
406
              context,
407
21242
              env->untransferable_object_private_symbol())
408
10621
              .To(&untransferable)) {
409
        return Nothing<bool>();
410
      }
411
10621
      if (untransferable) continue;
412
    }
413
414
    // Currently, we support ArrayBuffers and BaseObjects for which
415
    // GetTransferMode() does not return kUntransferable.
416
10616
    if (entry->IsArrayBuffer()) {
417
23
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
418
      // If we cannot render the ArrayBuffer unusable in this Isolate,
419
      // copying the buffer will have to do.
420
      // Note that we can currently transfer ArrayBuffers even if they were
421
      // not allocated by Node’s ArrayBufferAllocator in the first place,
422
      // because we pass the underlying v8::BackingStore around rather than
423
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
424
      // is always going to outlive any Workers it creates, and so will its
425
      // allocator along with it.
426
45
      if (!ab->IsDetachable()) continue;
427
46
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
428
46
          array_buffers.end()) {
429
1
        ThrowDataCloneException(
430
            context,
431
            FIXED_ONE_BYTE_STRING(
432
                env->isolate(),
433
1
                "Transfer list contains duplicate ArrayBuffer"));
434
1
        return Nothing<bool>();
435
      }
436
      // We simply use the array index in the `array_buffers` list as the
437
      // ID that we write into the serialized buffer.
438
22
      uint32_t id = array_buffers.size();
439
22
      array_buffers.push_back(ab);
440
22
      serializer.TransferArrayBuffer(id, ab);
441
22
      continue;
442
21186
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
443
      // Check if the source MessagePort is being transferred.
444

21186
      if (!source_port.IsEmpty() && entry == source_port) {
445
1
        ThrowDataCloneException(
446
            context,
447
            FIXED_ONE_BYTE_STRING(env->isolate(),
448
1
                                  "Transfer list contains source port"));
449
10
        return Nothing<bool>();
450
      }
451
      BaseObjectPtr<BaseObject> host_object {
452
10592
          Unwrap<BaseObject>(entry.As<Object>()) };
453

31778
      if (env->message_port_constructor_template()->HasInstance(entry) &&
454
21173
          (!host_object ||
455
10586
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
456
7
        ThrowDataCloneException(
457
            context,
458
            FIXED_ONE_BYTE_STRING(
459
                env->isolate(),
460
7
                "MessagePort in transfer list is already detached"));
461
7
        return Nothing<bool>();
462
      }
463
21170
      if (std::find(delegate.host_objects_.begin(),
464
                    delegate.host_objects_.end(),
465
21170
                    host_object) != delegate.host_objects_.end()) {
466
1
        ThrowDataCloneException(
467
            context,
468
            String::Concat(env->isolate(),
469
                FIXED_ONE_BYTE_STRING(
470
                  env->isolate(),
471
                  "Transfer list contains duplicate "),
472
2
                entry.As<Object>()->GetConstructorName()));
473
1
        return Nothing<bool>();
474
      }
475

10584
      if (host_object && host_object->GetTransferMode() !=
476
10584
              BaseObject::TransferMode::kUntransferable) {
477
10584
        delegate.AddHostObject(host_object);
478
10584
        continue;
479
      }
480
    }
481
482
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
483
    return Nothing<bool>();
484
  }
485
85710
  if (delegate.AddNestedHostObjects().IsNothing())
486
    return Nothing<bool>();
487
488
42855
  serializer.WriteHeader();
489
85710
  if (serializer.WriteValue(context, input).IsNothing()) {
490
7
    return Nothing<bool>();
491
  }
492
493
42862
  for (Local<ArrayBuffer> ab : array_buffers) {
494
    // If serialization succeeded, we render it inaccessible in this Isolate.
495
28
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
496
14
    ab->Detach();
497
498
14
    array_buffers_.emplace_back(std::move(backing_store));
499
  }
500
501
85696
  if (delegate.Finish(context).IsNothing())
502
    return Nothing<bool>();
503
504
  // The serializer gave us a buffer allocated using `malloc()`.
505
42848
  std::pair<uint8_t*, size_t> data = serializer.Release();
506
42848
  CHECK_NOT_NULL(data.first);
507
  main_message_buf_ =
508
42848
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
509
42848
  return Just(true);
510
}
511
512
void Message::MemoryInfo(MemoryTracker* tracker) const {
513
  tracker->TrackField("array_buffers_", array_buffers_);
514
  tracker->TrackField("shared_array_buffers", shared_array_buffers_);
515
  tracker->TrackField("transferables", transferables_);
516
}
517
518
33078
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
519
520
99088
MessagePortData::~MessagePortData() {
521
33027
  CHECK_NULL(owner_);
522
33027
  Disentangle();
523
66061
}
524
525
void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
526
  Mutex::ScopedLock lock(mutex_);
527
  tracker->TrackField("incoming_messages", incoming_messages_);
528
}
529
530
108883
void MessagePortData::AddToIncomingQueue(Message&& message) {
531
  // This function will be called by other threads.
532
217764
  Mutex::ScopedLock lock(mutex_);
533
108894
  incoming_messages_.emplace_back(std::move(message));
534
535
108882
  if (owner_ != nullptr) {
536
52368
    Debug(owner_, "Adding message to incoming queue");
537
52368
    owner_->TriggerAsync();
538
  }
539
108892
}
540
541
11194
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
542
11194
  CHECK_NULL(a->sibling_);
543
11194
  CHECK_NULL(b->sibling_);
544
11194
  a->sibling_ = b;
545
11194
  b->sibling_ = a;
546
11194
  a->sibling_mutex_ = b->sibling_mutex_;
547
11194
}
548
549
54912
void MessagePortData::Disentangle() {
550
  // Grab a copy of the sibling mutex, then replace it so that each sibling
551
  // has its own sibling_mutex_ now.
552
109835
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
553
109833
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
554
54922
  sibling_mutex_ = std::make_shared<Mutex>();
555
556
54921
  MessagePortData* sibling = sibling_;
557
54921
  if (sibling_ != nullptr) {
558
11192
    sibling_->sibling_ = nullptr;
559
11192
    sibling_ = nullptr;
560
  }
561
562
  // We close MessagePorts after disentanglement, so we enqueue a corresponding
563
  // message and trigger the corresponding uv_async_t to let them know that
564
  // this happened.
565
54921
  AddToIncomingQueue(Message());
566
54908
  if (sibling != nullptr) {
567
11188
    sibling->AddToIncomingQueue(Message());
568
  }
569
54922
}
570
571
129884
MessagePort::~MessagePort() {
572
32469
  if (data_) Detach();
573
64940
}
574
575
32516
MessagePort::MessagePort(Environment* env,
576
                         Local<Context> context,
577
32516
                         Local<Object> wrap)
578
  : HandleWrap(env,
579
               wrap,
580
32516
               reinterpret_cast<uv_handle_t*>(&async_),
581
               AsyncWrap::PROVIDER_MESSAGEPORT),
582
32516
    data_(new MessagePortData(this)) {
583
93568
  auto onmessage = [](uv_async_t* handle) {
584
    // Called when data has been put into the queue.
585
30526
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
586
30526
    channel->OnMessage();
587
93563
  };
588
589
32516
  CHECK_EQ(uv_async_init(env->event_loop(),
590
                         &async_,
591
                         onmessage), 0);
592
  // Reset later to indicate success of the constructor.
593
32516
  bool succeeded = false;
594
97548
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
595
596
  Local<Value> fn;
597
97548
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
598
    return;
599
600
32516
  if (fn->IsFunction()) {
601
32511
    Local<Function> init = fn.As<Function>();
602
65022
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
603
      return;
604
  }
605
606
  Local<Function> emit_message_fn;
607
65032
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
608
    return;
609
32516
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);
610
611
32516
  succeeded = true;
612
32516
  Debug(this, "Created message port");
613
}
614
615
21172
bool MessagePort::IsDetached() const {
616

21172
  return data_ == nullptr || IsHandleClosing();
617
}
618
619
73836
void MessagePort::TriggerAsync() {
620
73836
  if (IsHandleClosing()) return;
621
73807
  CHECK_EQ(uv_async_send(&async_), 0);
622
}
623
624
32470
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
625
64942
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
626
627
32472
  if (data_) {
628
    // Wrap this call with accessing the mutex, so that TriggerAsync()
629
    // can check IsHandleClosing() without race conditions.
630
64934
    Mutex::ScopedLock sibling_lock(data_->mutex_);
631
32467
    HandleWrap::Close(close_callback);
632
  } else {
633
5
    HandleWrap::Close(close_callback);
634
  }
635
32473
}
636
637
2
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
638
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
639
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
640
  // class (i.e. makes it behave like an arrow function).
641
2
  Environment* env = Environment::GetCurrent(args);
642
2
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
643
2
}
644
645
32516
MessagePort* MessagePort::New(
646
    Environment* env,
647
    Local<Context> context,
648
    std::unique_ptr<MessagePortData> data) {
649
  Context::Scope context_scope(context);
650
32516
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
651
652
  // Construct a new instance, then assign the listener instance and possibly
653
  // the MessagePortData to it.
654
  Local<Object> instance;
655
97548
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
656
    return nullptr;
657
32516
  MessagePort* port = new MessagePort(env, context, instance);
658
32516
  CHECK_NOT_NULL(port);
659
32516
  if (port->IsHandleClosing()) {
660
    // Construction failed with an exception.
661
    return nullptr;
662
  }
663
664
32516
  if (data) {
665
10690
    port->Detach();
666
10690
    port->data_ = std::move(data);
667
668
    // This lock is here to avoid race conditions with the `owner_` read
669
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
670
    // but it's better to be safe than sorry.)
671
21380
    Mutex::ScopedLock lock(port->data_->mutex_);
672
10690
    port->data_->owner_ = port;
673
    // If the existing MessagePortData object had pending messages, this is
674
    // the easiest way to run that queue.
675
10690
    port->TriggerAsync();
676
  }
677
32516
  return port;
678
}
679
680
72989
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
681
                                              bool only_if_receiving) {
682
145987
  Message received;
683
  {
684
    // Get the head of the message queue.
685
126057
    Mutex::ScopedLock lock(data_->mutex_);
686
687
73005
    Debug(this, "MessagePort has message");
688
689

72999
    bool wants_message = receiving_messages_ || !only_if_receiving;
690
    // We have nothing to do if:
691
    // - There are no pending messages
692
    // - We are not intending to receive messages, and the message we would
693
    //   receive is not the final "close" message.
694

146010
    if (data_->incoming_messages_.empty() ||
695
63352
        (!wants_message &&
696
10243
         !data_->incoming_messages_.front().IsCloseMessage())) {
697
39801
      return env()->no_message_symbol();
698
    }
699
700
53097
    received = std::move(data_->incoming_messages_.front());
701
53062
    data_->incoming_messages_.pop_front();
702
  }
703
704
53103
  if (received.IsCloseMessage()) {
705
22204
    Close();
706
22204
    return env()->no_message_symbol();
707
  }
708
709
41991
  if (!env()->can_call_into_js()) return MaybeLocal<Value>();
710
711
41972
  return received.Deserialize(env(), context);
712
}
713
714
31056
void MessagePort::OnMessage() {
715
31056
  Debug(this, "Running MessagePort::OnMessage()");
716
62056
  HandleScope handle_scope(env()->isolate());
717
62112
  Local<Context> context = object(env()->isolate())->CreationContext();
718
719
  size_t processing_limit;
720
  {
721
31056
    Mutex::ScopedLock(data_->mutex_);
722
62112
    processing_limit = std::max(data_->incoming_messages_.size(),
723
93168
                                static_cast<size_t>(1000));
724
  }
725
726
  // data_ can only ever be modified by the owner thread, so no need to lock.
727
  // However, the message port may be transferred while it is processing
728
  // messages, so we need to check that this handle still owns its `data_` field
729
  // on every iteration.
730

114946
  while (data_) {
731
72990
    if (processing_limit-- == 0) {
732
      // Prevent event loop starvation by only processing those messages without
733
      // interruption that were already present when the OnMessage() call was
734
      // first triggered, but at least 1000 messages because otherwise the
735
      // overhead of repeatedly triggering the uv_async_t instance becomes
736
      // noticable, at least on Windows.
737
      // (That might require more investigation by somebody more familiar with
738
      // Windows.)
739
1
      TriggerAsync();
740
1
      return;
741
    }
742
743
114934
    HandleScope handle_scope(env()->isolate());
744

41945
    Context::Scope context_scope(context);
745
72974
    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
746
747
    Local<Value> payload;
748
    Local<Value> message_error;
749
    {
750
      // Catch any exceptions from parsing the message itself (not from
751
      // emitting it) as 'messageeror' events.
752
145912
      TryCatchScope try_catch(env());
753
145886
      if (!ReceiveMessage(context, true).ToLocal(&payload)) {
754

6
        if (try_catch.HasCaught() && !try_catch.HasTerminated())
755
6
          message_error = try_catch.Exception();
756
6
        goto reschedule;
757
      }
758
    }
759
145918
    if (payload == env()->no_message_symbol()) break;
760
761
41989
    if (!env()->can_call_into_js()) {
762
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
763
      // In this case there is nothing to do but to drain the current queue.
764
      continue;
765
    }
766
767
83972
    if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
768
    reschedule:
769
52
      if (!message_error.IsEmpty()) {
770
        // This should become a `messageerror` event in the sense of the
771
        // EventTarget API at some point.
772
        Local<Value> argv[] = {
773
          env()->messageerror_string(),
774
          message_error
775
12
        };
776
6
        USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
777
      }
778
779
      // Re-schedule OnMessage() execution in case of failure.
780
52
      if (data_)
781
52
        TriggerAsync();
782
52
      return;
783
    }
784
  }
785
}
786
787
32469
void MessagePort::OnClose() {
788
32469
  Debug(this, "MessagePort::OnClose()");
789
32470
  if (data_) {
790
    // Detach() returns move(data_).
791
21888
    Detach()->Disentangle();
792
  }
793
32473
}
794
795
43156
std::unique_ptr<MessagePortData> MessagePort::Detach() {
796
43156
  CHECK(data_);
797
86319
  Mutex::ScopedLock lock(data_->mutex_);
798
43159
  data_->owner_ = nullptr;
799
86324
  return std::move(data_);
800
}
801
802
10581
BaseObject::TransferMode MessagePort::GetTransferMode() const {
803
10581
  if (IsDetached())
804
    return BaseObject::TransferMode::kUntransferable;
805
10581
  return BaseObject::TransferMode::kTransferable;
806
}
807
808
10576
std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
809
21152
  Close();
810
10576
  return Detach();
811
}
812
813
10345
BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
814
    Environment* env,
815
    Local<Context> context,
816
    std::unique_ptr<TransferData> self) {
817
20690
  return BaseObjectPtr<MessagePort> { MessagePort::New(
818
      env, context,
819
31035
      static_unique_pointer_cast<MessagePortData>(std::move(self))) };
820
}
821
822
42864
Maybe<bool> MessagePort::PostMessage(Environment* env,
823
                                     Local<Value> message_v,
824
                                     const TransferList& transfer_v) {
825
42864
  Isolate* isolate = env->isolate();
826
42864
  Local<Object> obj = object(isolate);
827
42864
  Local<Context> context = obj->CreationContext();
828
829
85728
  Message msg;
830
831
  // Per spec, we need to both check if transfer list has the source port, and
832
  // serialize the input message, even if the MessagePort is closed or detached.
833
834
  Maybe<bool> serialization_maybe =
835
42864
      msg.Serialize(env, context, message_v, transfer_v, obj);
836
42864
  if (data_ == nullptr) {
837
2
    return serialization_maybe;
838
  }
839
42862
  if (serialization_maybe.IsNothing()) {
840
14
    return Nothing<bool>();
841
  }
842
843
85696
  Mutex::ScopedLock lock(*data_->sibling_mutex_);
844
42848
  bool doomed = false;
845
846
  // Check if the target port is posted to itself.
847
42848
  if (data_->sibling_ != nullptr) {
848
53376
    for (const auto& transferable : msg.transferables()) {
849
10595
      if (data_->sibling_ == transferable.get()) {
850
1
        doomed = true;
851
        ProcessEmitWarning(env, "The target port was posted to itself, and "
852
1
                                "the communication channel was lost");
853
1
        break;
854
      }
855
    }
856
  }
857
858

42848
  if (data_->sibling_ == nullptr || doomed)
859
67
    return Just(true);
860
861
42781
  data_->sibling_->AddToIncomingQueue(std::move(msg));
862
42779
  return Just(true);
863
}
864
865
10631
static Maybe<bool> ReadIterable(Environment* env,
866
                                Local<Context> context,
867
                                // NOLINTNEXTLINE(runtime/references)
868
                                TransferList& transfer_list,
869
                                Local<Value> object) {
870
10631
  if (!object->IsObject()) return Just(false);
871
872
10628
  if (object->IsArray()) {
873
10608
    Local<Array> arr = object.As<Array>();
874
10608
    size_t length = arr->Length();
875
10608
    transfer_list.AllocateSufficientStorage(length);
876
21227
    for (size_t i = 0; i < length; i++) {
877
31857
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
878
        return Nothing<bool>();
879
    }
880
10608
    return Just(true);
881
  }
882
883
20
  Isolate* isolate = env->isolate();
884
  Local<Value> iterator_method;
885
80
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
886
20
      .ToLocal(&iterator_method)) return Nothing<bool>();
887
20
  if (!iterator_method->IsFunction()) return Just(false);
888
889
  Local<Value> iterator;
890
18
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
891
6
      .ToLocal(&iterator)) return Nothing<bool>();
892
6
  if (!iterator->IsObject()) return Just(false);
893
894
  Local<Value> next;
895
24
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
896
    return Nothing<bool>();
897
6
  if (!next->IsFunction()) return Just(false);
898
899
6
  std::vector<Local<Value>> entries;
900
7
  while (env->can_call_into_js()) {
901
    Local<Value> result;
902
15
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
903
7
        .ToLocal(&result)) return Nothing<bool>();
904
4
    if (!result->IsObject()) return Just(false);
905
906
    Local<Value> done;
907
16
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
908
      return Nothing<bool>();
909
4
    if (done->BooleanValue(isolate)) break;
910
911
    Local<Value> val;
912
8
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
913
      return Nothing<bool>();
914
2
    entries.push_back(val);
915
  }
916
917
2
  transfer_list.AllocateSufficientStorage(entries.size());
918
2
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
919
2
  return Just(true);
920
}
921
922
42877
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
923
42877
  Environment* env = Environment::GetCurrent(args);
924
42877
  Local<Object> obj = args.This();
925
42877
  Local<Context> context = obj->CreationContext();
926
927
42877
  if (args.Length() == 0) {
928
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
929
13
                                       "MessagePort.postMessage");
930
  }
931
932

149879
  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
933
    // Browsers ignore null or undefined, and otherwise accept an array or an
934
    // options object.
935
    return THROW_ERR_INVALID_ARG_TYPE(env,
936
4
        "Optional transferList argument must be an iterable");
937
  }
938
939
85737
  TransferList transfer_list;
940
85746
  if (args[1]->IsObject()) {
941
    bool was_iterable;
942
21240
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
943
8
      return;
944
10620
    if (!was_iterable) {
945
      Local<Value> transfer_option;
946
65
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
947
21
          .ToLocal(&transfer_option)) return;
948
26
      if (!transfer_option->IsUndefined()) {
949
22
        if (!ReadIterable(env, context, transfer_list, transfer_option)
950
12
            .To(&was_iterable)) return;
951
10
        if (!was_iterable) {
952
          return THROW_ERR_INVALID_ARG_TYPE(env,
953
7
              "Optional options.transfer argument must be an iterable");
954
        }
955
      }
956
    }
957
  }
958
959
42865
  MessagePort* port = Unwrap<MessagePort>(args.This());
960
  // Even if the backing MessagePort object has already been deleted, we still
961
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
962
  // transfers.
963
42865
  if (port == nullptr) {
964
2
    Message msg;
965
1
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
966
1
    return;
967
  }
968
969
42864
  port->PostMessage(env, args[0], transfer_list);
970
}
971
972
22834
void MessagePort::Start() {
973
22834
  Debug(this, "Start receiving messages");
974
22834
  receiving_messages_ = true;
975
45668
  Mutex::ScopedLock lock(data_->mutex_);
976
22834
  if (!data_->incoming_messages_.empty())
977
10726
    TriggerAsync();
978
22834
}
979
980
20303
void MessagePort::Stop() {
981
20303
  Debug(this, "Stop receiving messages");
982
20303
  receiving_messages_ = false;
983
20303
}
984
985
22835
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
986
  MessagePort* port;
987
22836
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
988
22835
  if (!port->data_) {
989
1
    return;
990
  }
991
22834
  port->Start();
992
}
993
994
20421
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
995
  MessagePort* port;
996
40842
  CHECK(args[0]->IsObject());
997
40960
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
998
20305
  if (!port->data_) {
999
2
    return;
1000
  }
1001
20303
  port->Stop();
1002
}
1003
1004
1066
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1005
  MessagePort* port;
1006
2132
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1007
530
  port->OnMessage();
1008
}
1009
1010
11
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1011
11
  Environment* env = Environment::GetCurrent(args);
1012

41
  if (!args[0]->IsObject() ||
1013
27
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1014
    return THROW_ERR_INVALID_ARG_TYPE(env,
1015
10
        "The \"port\" argument must be a MessagePort instance");
1016
  }
1017
12
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1018
6
  if (port == nullptr) {
1019
    // Return 'no messages' for a closed port.
1020
    args.GetReturnValue().Set(
1021
        Environment::GetCurrent(args)->no_message_symbol());
1022
    return;
1023
  }
1024
1025
  MaybeLocal<Value> payload =
1026
12
      port->ReceiveMessage(port->object()->CreationContext(), false);
1027
6
  if (!payload.IsEmpty())
1028
12
    args.GetReturnValue().Set(payload.ToLocalChecked());
1029
}
1030
1031
5
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1032
5
  Environment* env = Environment::GetCurrent(args);
1033

20
  if (!args[0]->IsObject() ||
1034
15
      !env->message_port_constructor_template()->HasInstance(args[0])) {
1035
    return THROW_ERR_INVALID_ARG_TYPE(env,
1036
        "The \"port\" argument must be a MessagePort instance");
1037
  }
1038
10
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1039
5
  CHECK_NOT_NULL(port);
1040
1041
5
  Local<Value> context_arg = args[1];
1042
  ContextifyContext* context_wrapper;
1043

15
  if (!context_arg->IsObject() ||
1044
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1045
10
          env, context_arg.As<Object>())) == nullptr) {
1046
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1047
  }
1048
1049
10
  std::unique_ptr<MessagePortData> data;
1050
5
  if (!port->IsDetached())
1051
5
    data = port->Detach();
1052
1053
5
  Context::Scope context_scope(context_wrapper->context());
1054
  MessagePort* target =
1055
5
      MessagePort::New(env, context_wrapper->context(), std::move(data));
1056
5
  if (target != nullptr)
1057
15
    args.GetReturnValue().Set(target->object());
1058
}
1059
1060
10632
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1061
10632
  Entangle(a, b->data_.get());
1062
10632
}
1063
1064
11194
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1065
11194
  MessagePortData::Entangle(a->data_.get(), b);
1066
11194
}
1067
1068
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1069
  tracker->TrackField("data", data_);
1070
  tracker->TrackField("emit_message_fn", emit_message_fn_);
1071
}
1072
1073
33284
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1074
  // Factor generating the MessagePort JS constructor into its own piece
1075
  // of code, because it is needed early on in the child environment setup.
1076
33284
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
1077
33284
  if (!templ.IsEmpty())
1078
32900
    return templ;
1079
1080
  {
1081
384
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1082
768
    m->SetClassName(env->message_port_constructor_string());
1083
1152
    m->InstanceTemplate()->SetInternalFieldCount(
1084
384
        MessagePort::kInternalFieldCount);
1085
768
    m->Inherit(HandleWrap::GetConstructorTemplate(env));
1086
1087
384
    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1088
384
    env->SetProtoMethod(m, "start", MessagePort::Start);
1089
1090
384
    env->set_message_port_constructor_template(m);
1091
  }
1092
1093
384
  return GetMessagePortConstructorTemplate(env);
1094
}
1095
1096
293
JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1097
293
    : BaseObject(env, obj) {
1098
293
  MakeWeak();
1099
293
}
1100
1101
293
void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1102
293
  CHECK(args.IsConstructCall());
1103
586
  new JSTransferable(Environment::GetCurrent(args), args.This());
1104
293
}
1105
1106
6
JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1107
  // Implement `kClone in this ? kCloneable : kTransferable`.
1108
12
  HandleScope handle_scope(env()->isolate());
1109
12
  errors::TryCatchScope ignore_exceptions(env());
1110
1111
  bool has_clone;
1112
24
  if (!object()->Has(env()->context(),
1113
30
                     env()->messaging_clone_symbol()).To(&has_clone)) {
1114
    return TransferMode::kUntransferable;
1115
  }
1116
1117
6
  return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1118
}
1119
1120
5
std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1121
5
  return TransferOrClone(TransferMode::kTransferable);
1122
}
1123
1124
std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1125
  return TransferOrClone(TransferMode::kCloneable);
1126
}
1127
1128
5
std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1129
    TransferMode mode) const {
1130
  // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1131
  // which should return an object with `data` and `deserializeInfo` properties;
1132
  // `data` is written to the serializer later, and `deserializeInfo` is stored
1133
  // on the `TransferData` instance as a string.
1134
10
  HandleScope handle_scope(env()->isolate());
1135
5
  Local<Context> context = env()->isolate()->GetCurrentContext();
1136
  Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1137
5
      env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1138
1139
  Local<Value> method;
1140
15
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1141
    return {};
1142
  }
1143
5
  if (method->IsFunction()) {
1144
    Local<Value> result_v;
1145
20
    if (!method.As<Function>()->Call(
1146
20
            context, object(), 0, nullptr).ToLocal(&result_v)) {
1147
5
      return {};
1148
    }
1149
1150
5
    if (result_v->IsObject()) {
1151
5
      Local<Object> result = result_v.As<Object>();
1152
      Local<Value> data;
1153
      Local<Value> deserialize_info;
1154

25
      if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1155
20
          !result->Get(context, env()->deserialize_info_string())
1156
5
              .ToLocal(&deserialize_info)) {
1157
        return {};
1158
      }
1159
10
      Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1160
5
      if (*deserialize_info_str == nullptr) return {};
1161
10
      return std::make_unique<Data>(
1162
25
          *deserialize_info_str, Global<Value>(env()->isolate(), data));
1163
    }
1164
  }
1165
1166
  if (mode == TransferMode::kTransferable)
1167
    return TransferOrClone(TransferMode::kCloneable);
1168
  else
1169
    return {};
1170
}
1171
1172
Maybe<BaseObjectList>
1173
5
JSTransferable::NestedTransferables() const {
1174
  // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1175
10
  HandleScope handle_scope(env()->isolate());
1176
5
  Local<Context> context = env()->isolate()->GetCurrentContext();
1177
5
  Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1178
1179
  Local<Value> method;
1180
15
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1181
    return Nothing<BaseObjectList>();
1182
  }
1183
5
  if (!method->IsFunction()) return Just(BaseObjectList {});
1184
1185
  Local<Value> list_v;
1186
20
  if (!method.As<Function>()->Call(
1187
20
          context, object(), 0, nullptr).ToLocal(&list_v)) {
1188
    return Nothing<BaseObjectList>();
1189
  }
1190
5
  if (!list_v->IsArray()) return Just(BaseObjectList {});
1191
5
  Local<Array> list = list_v.As<Array>();
1192
1193
10
  BaseObjectList ret;
1194
20
  for (size_t i = 0; i < list->Length(); i++) {
1195
    Local<Value> value;
1196
15
    if (!list->Get(context, i).ToLocal(&value))
1197
      return Nothing<BaseObjectList>();
1198
10
    if (env()->base_object_ctor_template()->HasInstance(value))
1199
5
      ret.emplace_back(Unwrap<BaseObject>(value));
1200
  }
1201
5
  return Just(ret);
1202
}
1203
1204
1
Maybe<bool> JSTransferable::FinalizeTransferRead(
1205
    Local<Context> context, ValueDeserializer* deserializer) {
1206
  // Call `this[kDeserialize](data)` where `data` comes from the return value
1207
  // of `this[kTransfer]()` or `this[kClone]()`.
1208
2
  HandleScope handle_scope(env()->isolate());
1209
  Local<Value> data;
1210
2
  if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1211
1212
1
  Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1213
  Local<Value> method;
1214
3
  if (!object()->Get(context, method_name).ToLocal(&method)) {
1215
    return Nothing<bool>();
1216
  }
1217
1
  if (!method->IsFunction()) return Just(true);
1218
1219
4
  if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1220
    return Nothing<bool>();
1221
  }
1222
1
  return Just(true);
1223
}
1224
1225
5
JSTransferable::Data::Data(std::string&& deserialize_info,
1226
5
                           v8::Global<v8::Value>&& data)
1227
5
    : deserialize_info_(std::move(deserialize_info)),
1228
15
      data_(std::move(data)) {}
1229
1230
4
BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1231
    Environment* env,
1232
    Local<Context> context,
1233
    std::unique_ptr<TransferData> self) {
1234
  // Create the JS wrapper object that will later be filled with data passed to
1235
  // the `[kDeserialize]()` method on it. This split is necessary, because here
1236
  // we need to create an object with the right prototype and internal fields,
1237
  // but the actual JS data stored in the serialized data can only be read at
1238
  // the end of the stream, after the main message has been read.
1239
1240
8
  if (context != env->context()) {
1241
1
    THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1242
1
    return {};
1243
  }
1244
6
  HandleScope handle_scope(env->isolate());
1245
  Local<Value> info;
1246
6
  if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1247
1248
  Local<Value> ret;
1249
6
  CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1250
18
  if (!env->messaging_deserialize_create_object()->Call(
1251

16
          context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1252
5
      !env->base_object_ctor_template()->HasInstance(ret)) {
1253
2
    return {};
1254
  }
1255
1256
1
  return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1257
}
1258
1259
5
Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1260
    Local<Context> context, ValueSerializer* serializer) {
1261
10
  HandleScope handle_scope(context->GetIsolate());
1262
5
  auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1263
5
  data_.Reset();
1264
10
  return ret;
1265
}
1266
1267
namespace {
1268
1269
384
static void SetDeserializerCreateObjectFunction(
1270
    const FunctionCallbackInfo<Value>& args) {
1271
384
  Environment* env = Environment::GetCurrent(args);
1272
768
  CHECK(args[0]->IsFunction());
1273
768
  env->set_messaging_deserialize_create_object(args[0].As<Function>());
1274
384
}
1275
1276
10633
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1277
10633
  Environment* env = Environment::GetCurrent(args);
1278
10633
  if (!args.IsConstructCall()) {
1279
1
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1280
1
    return;
1281
  }
1282
1283
21264
  Local<Context> context = args.This()->CreationContext();
1284
10632
  Context::Scope context_scope(context);
1285
1286
10632
  MessagePort* port1 = MessagePort::New(env, context);
1287
10632
  if (port1 == nullptr) return;
1288
10632
  MessagePort* port2 = MessagePort::New(env, context);
1289
10632
  if (port2 == nullptr) {
1290
    port1->Close();
1291
    return;
1292
  }
1293
1294
10632
  MessagePort::Entangle(port1, port2);
1295
1296
53160
  args.This()->Set(context, env->port1_string(), port1->object())
1297
      .Check();
1298
53160
  args.This()->Set(context, env->port2_string(), port2->object())
1299
      .Check();
1300
}
1301
1302
384
static void InitMessaging(Local<Object> target,
1303
                          Local<Value> unused,
1304
                          Local<Context> context,
1305
                          void* priv) {
1306
384
  Environment* env = Environment::GetCurrent(context);
1307
1308
  {
1309
    Local<String> message_channel_string =
1310
384
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1311
384
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1312
384
    templ->SetClassName(message_channel_string);
1313
768
    target->Set(context,
1314
                message_channel_string,
1315
1152
                templ->GetFunction(context).ToLocalChecked()).Check();
1316
  }
1317
1318
  {
1319
    Local<String> js_transferable_string =
1320
384
        FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1321
384
    Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1322
768
    t->Inherit(BaseObject::GetConstructorTemplate(env));
1323
384
    t->SetClassName(js_transferable_string);
1324
1152
    t->InstanceTemplate()->SetInternalFieldCount(
1325
384
        JSTransferable::kInternalFieldCount);
1326
768
    target->Set(context,
1327
                js_transferable_string,
1328
1152
                t->GetFunction(context).ToLocalChecked()).Check();
1329
  }
1330
1331
768
  target->Set(context,
1332
              env->message_port_constructor_string(),
1333
768
              GetMessagePortConstructorTemplate(env)
1334
1536
                  ->GetFunction(context).ToLocalChecked()).Check();
1335
1336
  // These are not methods on the MessagePort prototype, because
1337
  // the browser equivalents do not provide them.
1338
384
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1339
384
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1340
384
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1341
  env->SetMethod(target, "moveMessagePortToContext",
1342
384
                 MessagePort::MoveToContext);
1343
  env->SetMethod(target, "setDeserializerCreateObjectFunction",
1344
384
                 SetDeserializerCreateObjectFunction);
1345
1346
  {
1347
768
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1348
    target
1349
768
        ->Set(context,
1350
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1351
1152
              domexception)
1352
        .Check();
1353
  }
1354
384
}
1355
1356
4920
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1357
4920
  registry->Register(MessageChannel);
1358
4920
  registry->Register(JSTransferable::New);
1359
4920
  registry->Register(MessagePort::New);
1360
4920
  registry->Register(MessagePort::PostMessage);
1361
4920
  registry->Register(MessagePort::Start);
1362
4920
  registry->Register(MessagePort::Stop);
1363
4920
  registry->Register(MessagePort::Drain);
1364
4920
  registry->Register(MessagePort::ReceiveMessage);
1365
4920
  registry->Register(MessagePort::MoveToContext);
1366
4920
  registry->Register(SetDeserializerCreateObjectFunction);
1367
4920
}
1368
1369
}  // anonymous namespace
1370
1371
}  // namespace worker
1372
}  // namespace node
1373
1374
4989
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1375

19890
NODE_MODULE_EXTERNAL_REFERENCE(messaging,
1376
                               node::worker::RegisterExternalReferences)