1 |
|
|
#ifndef SRC_NODE_MESSAGING_H_ |
2 |
|
|
#define SRC_NODE_MESSAGING_H_ |
3 |
|
|
|
4 |
|
|
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
5 |
|
|
|
6 |
|
|
#include "env.h" |
7 |
|
|
#include "node_mutex.h" |
8 |
|
|
#include "v8.h" |
9 |
|
|
#include <deque> |
10 |
|
|
#include <string> |
11 |
|
|
#include <unordered_map> |
12 |
|
|
#include <set> |
13 |
|
|
|
14 |
|
|
namespace node { |
15 |
|
|
namespace worker { |
16 |
|
|
|
17 |
|
|
class MessagePortData; |
18 |
|
|
class MessagePort; |
19 |
|
|
|
20 |
|
|
// List of JS values passed as the transfer list of a postMessage() call.
// NOTE(review): the `8` is presumably the inline (stack) capacity of
// MaybeStackBuffer -- confirm against its definition.
using TransferList = MaybeStackBuffer<v8::Local<v8::Value>, 8>;
21 |
|
|
|
22 |
|
|
// Used to represent the in-flight structure of an object that is being |
23 |
|
|
// transferred or cloned using postMessage(). |
24 |
|
|
class TransferData : public MemoryRetainer { |
25 |
|
|
public: |
26 |
|
|
// Deserialize this object on the receiving end after a .postMessage() call. |
27 |
|
|
// - `context` may not be the same as `env->context()`. This method should |
28 |
|
|
// not produce JS objects coming from Contexts other than `context`. |
29 |
|
|
// - `self` is a unique_ptr for the object that this is being called on. |
30 |
|
|
// - The return value is treated like a `Maybe`, i.e. if `nullptr` is |
31 |
|
|
// returned, any further deserialization of the message is stopped and |
32 |
|
|
// control is returned to the event loop or JS as soon as possible. |
33 |
|
|
virtual BaseObjectPtr<BaseObject> Deserialize( |
34 |
|
|
Environment* env, |
35 |
|
|
v8::Local<v8::Context> context, |
36 |
|
|
std::unique_ptr<TransferData> self) = 0; |
37 |
|
|
// FinalizeTransferWrite() is the counterpart to |
38 |
|
|
// BaseObject::FinalizeTransferRead(). It is called right after the transfer |
39 |
|
|
// data was created, and defaults to doing nothing. After this function, |
40 |
|
|
// this object should not hold any more Isolate-specific data. |
41 |
|
|
virtual v8::Maybe<bool> FinalizeTransferWrite( |
42 |
|
|
v8::Local<v8::Context> context, v8::ValueSerializer* serializer); |
43 |
|
|
}; |
44 |
|
|
|
45 |
|
|
// Represents a single communication message. |
46 |
|
|
class Message : public MemoryRetainer { |
47 |
|
|
public: |
48 |
|
|
// Create a Message with a specific underlying payload, in the format of the |
49 |
|
|
// V8 ValueSerializer API. If `payload` is empty, this message indicates |
50 |
|
|
// that the receiving message port should close itself. |
51 |
|
|
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); |
52 |
|
196814 |
~Message() = default; |
53 |
|
|
|
54 |
|
|
Message(Message&& other) = default; |
55 |
|
|
Message& operator=(Message&& other) = default; |
56 |
|
|
Message& operator=(const Message&) = delete; |
57 |
|
|
Message(const Message&) = delete; |
58 |
|
|
|
59 |
|
|
// Whether this is a message indicating that the port is to be closed. |
60 |
|
|
// This is the last message to be received by a MessagePort. |
61 |
|
|
bool IsCloseMessage() const; |
62 |
|
|
|
63 |
|
|
// Deserialize the contained JS value. May only be called once, and only |
64 |
|
|
// after Serialize() has been called (e.g. by another thread). |
65 |
|
|
v8::MaybeLocal<v8::Value> Deserialize( |
66 |
|
|
Environment* env, |
67 |
|
|
v8::Local<v8::Context> context, |
68 |
|
|
v8::Local<v8::Value>* port_list = nullptr); |
69 |
|
|
|
70 |
|
|
// Serialize a JS value, and optionally transfer objects, into this message. |
71 |
|
|
// The Message object retains ownership of all transferred objects until |
72 |
|
|
// deserialization. |
73 |
|
|
// The source_port parameter, if provided, will make Serialize() throw a |
74 |
|
|
// "DataCloneError" DOMException if source_port is found in transfer_list. |
75 |
|
|
v8::Maybe<bool> Serialize(Environment* env, |
76 |
|
|
v8::Local<v8::Context> context, |
77 |
|
|
v8::Local<v8::Value> input, |
78 |
|
|
const TransferList& transfer_list, |
79 |
|
|
v8::Local<v8::Object> source_port = |
80 |
|
|
v8::Local<v8::Object>()); |
81 |
|
|
|
82 |
|
|
// Internal method of Message that is called when a new SharedArrayBuffer |
83 |
|
|
// object is encountered in the incoming value's structure. |
84 |
|
|
void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store); |
85 |
|
|
// Internal method of Message that is called once serialization finishes |
86 |
|
|
// and that transfers ownership of `data` to this message. |
87 |
|
|
void AddTransferable(std::unique_ptr<TransferData>&& data); |
88 |
|
|
// Internal method of Message that is called when a new WebAssembly.Module |
89 |
|
|
// object is encountered in the incoming value's structure. |
90 |
|
|
uint32_t AddWASMModule(v8::CompiledWasmModule&& mod); |
91 |
|
|
|
92 |
|
|
// The host objects that will be transferred, as recorded by Serialize() |
93 |
|
|
// (e.g. MessagePorts). |
94 |
|
|
// Used for warning user about posting the target MessagePort to itself, |
95 |
|
|
// which will as a side effect destroy the communication channel. |
96 |
|
61636 |
const std::vector<std::unique_ptr<TransferData>>& transferables() const { |
97 |
|
61636 |
return transferables_; |
98 |
|
|
} |
99 |
|
19 |
bool has_transferables() const { |
100 |
✓✗✗✓
|
19 |
return !transferables_.empty() || !array_buffers_.empty(); |
101 |
|
|
} |
102 |
|
|
|
103 |
|
|
void MemoryInfo(MemoryTracker* tracker) const override; |
104 |
|
|
|
105 |
|
2 |
SET_MEMORY_INFO_NAME(Message) |
106 |
|
2 |
SET_SELF_SIZE(Message) |
107 |
|
|
|
108 |
|
|
private: |
109 |
|
|
MallocedBuffer<char> main_message_buf_; |
110 |
|
|
// TODO(addaleax): Make this a std::variant to save storage size in the common |
111 |
|
|
// case (which is that all of these vectors are empty) once that is available |
112 |
|
|
// with C++17. |
113 |
|
|
std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_; |
114 |
|
|
std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_; |
115 |
|
|
std::vector<std::unique_ptr<TransferData>> transferables_; |
116 |
|
|
std::vector<v8::CompiledWasmModule> wasm_modules_; |
117 |
|
|
|
118 |
|
|
friend class MessagePort; |
119 |
|
|
}; |
120 |
|
|
|
121 |
|
|
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> { |
122 |
|
|
public: |
123 |
|
|
// Named SiblingGroup, Used for one-to-many BroadcastChannels. |
124 |
|
|
static std::shared_ptr<SiblingGroup> Get(const std::string& name); |
125 |
|
|
|
126 |
|
|
// Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. |
127 |
|
12197 |
SiblingGroup() = default; |
128 |
|
|
explicit SiblingGroup(const std::string& name); |
129 |
|
|
~SiblingGroup(); |
130 |
|
|
|
131 |
|
|
// Dispatches the Message to the collection of associated |
132 |
|
|
// ports. If there is more than one destination port and |
133 |
|
|
// the Message contains transferables, Dispatch will fail. |
134 |
|
|
// Returns Just(true) if successful and the message was |
135 |
|
|
// dispatched to at least one destination. Returns Just(false) |
136 |
|
|
// if there were no destinations. Returns Nothing<bool>() |
137 |
|
|
// if there was an error. If error is not nullptr, it will |
138 |
|
|
// be set to an error message or warning message as appropriate. |
139 |
|
|
v8::Maybe<bool> Dispatch( |
140 |
|
|
MessagePortData* source, |
141 |
|
|
std::shared_ptr<Message> message, |
142 |
|
|
std::string* error = nullptr); |
143 |
|
|
|
144 |
|
|
void Entangle(MessagePortData* data); |
145 |
|
|
void Entangle(std::initializer_list<MessagePortData*> data); |
146 |
|
|
void Disentangle(MessagePortData* data); |
147 |
|
|
|
148 |
|
|
const std::string& name() const { return name_; } |
149 |
|
|
|
150 |
|
147818 |
size_t size() const { return ports_.size(); } |
151 |
|
|
|
152 |
|
|
private: |
153 |
|
|
const std::string name_; |
154 |
|
|
RwLock group_mutex_; // Protects ports_. |
155 |
|
|
std::set<MessagePortData*> ports_; |
156 |
|
|
|
157 |
|
|
static void CheckSiblingGroup(const std::string& name); |
158 |
|
|
|
159 |
|
|
using Map = |
160 |
|
|
std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; |
161 |
|
|
|
162 |
|
|
static Mutex groups_mutex_; |
163 |
|
|
static Map groups_; |
164 |
|
|
}; |
165 |
|
|
|
166 |
|
|
// This contains all data for a `MessagePort` instance that is not tied to |
167 |
|
|
// a specific Environment/Isolate/event loop, for easier transfer between those. |
168 |
|
|
class MessagePortData : public TransferData { |
169 |
|
|
public: |
170 |
|
|
explicit MessagePortData(MessagePort* owner); |
171 |
|
|
~MessagePortData() override; |
172 |
|
|
|
173 |
|
|
MessagePortData(MessagePortData&& other) = delete; |
174 |
|
|
MessagePortData& operator=(MessagePortData&& other) = delete; |
175 |
|
|
MessagePortData(const MessagePortData& other) = delete; |
176 |
|
|
MessagePortData& operator=(const MessagePortData& other) = delete; |
177 |
|
|
|
178 |
|
|
// Add a message to the incoming queue and notify the receiver. |
179 |
|
|
// This may be called from any thread. |
180 |
|
|
void AddToIncomingQueue(std::shared_ptr<Message> message); |
181 |
|
|
v8::Maybe<bool> Dispatch( |
182 |
|
|
std::shared_ptr<Message> message, |
183 |
|
|
std::string* error = nullptr); |
184 |
|
|
|
185 |
|
|
// Turns `a` and `b` into siblings, i.e. connects the sending side of one |
186 |
|
|
// to the receiving side of the other. This is not thread-safe. |
187 |
|
|
static void Entangle(MessagePortData* a, MessagePortData* b); |
188 |
|
|
|
189 |
|
|
// Removes any possible sibling. This is thread-safe (it acquires both |
190 |
|
|
// `sibling_mutex_` and `mutex_`), and has to be because it is called once |
191 |
|
|
// the corresponding JS handle handle wants to close |
192 |
|
|
// which can happen on either side of a worker. |
193 |
|
|
void Disentangle(); |
194 |
|
|
|
195 |
|
|
void MemoryInfo(MemoryTracker* tracker) const override; |
196 |
|
|
BaseObjectPtr<BaseObject> Deserialize( |
197 |
|
|
Environment* env, |
198 |
|
|
v8::Local<v8::Context> context, |
199 |
|
|
std::unique_ptr<TransferData> self) override; |
200 |
|
|
|
201 |
|
4 |
SET_MEMORY_INFO_NAME(MessagePortData) |
202 |
|
4 |
SET_SELF_SIZE(MessagePortData) |
203 |
|
|
|
204 |
|
|
private: |
205 |
|
|
// This mutex protects all fields below it, with the exception of |
206 |
|
|
// sibling_. |
207 |
|
|
mutable Mutex mutex_; |
208 |
|
|
// TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr> |
209 |
|
|
// once that is available with C++17, because std::shared_ptr comes with |
210 |
|
|
// overhead that is only necessary for BroadcastChannel. |
211 |
|
|
std::deque<std::shared_ptr<Message>> incoming_messages_; |
212 |
|
|
MessagePort* owner_ = nullptr; |
213 |
|
|
std::shared_ptr<SiblingGroup> group_; |
214 |
|
|
friend class MessagePort; |
215 |
|
|
friend class SiblingGroup; |
216 |
|
|
}; |
217 |
|
|
|
218 |
|
|
// A message port that receives messages from other threads, including |
219 |
|
|
// the uv_async_t handle that is used to notify the current event loop of |
220 |
|
|
// new incoming messages. |
221 |
|
|
class MessagePort : public HandleWrap { |
222 |
|
|
private: |
223 |
|
|
// Create a new MessagePort. The `context` argument specifies the Context |
224 |
|
|
// instance that is used for creating the values emitted from this port. |
225 |
|
|
// This is called by MessagePort::New(), which is the public API used for |
226 |
|
|
// creating MessagePort instances. |
227 |
|
|
MessagePort(Environment* env, |
228 |
|
|
v8::Local<v8::Context> context, |
229 |
|
|
v8::Local<v8::Object> wrap); |
230 |
|
|
|
231 |
|
|
public: |
232 |
|
|
~MessagePort() override; |
233 |
|
|
|
234 |
|
|
// Create a new message port instance, optionally over an existing |
235 |
|
|
// `MessagePortData` object. |
236 |
|
|
static MessagePort* New(Environment* env, |
237 |
|
|
v8::Local<v8::Context> context, |
238 |
|
|
std::unique_ptr<MessagePortData> data = {}, |
239 |
|
|
std::shared_ptr<SiblingGroup> sibling_group = {}); |
240 |
|
|
|
241 |
|
|
// Send a message, i.e. deliver it into the sibling's incoming queue. |
242 |
|
|
// If this port is closed, or if there is no sibling, this message is |
243 |
|
|
// serialized with transfers, then silently discarded. |
244 |
|
|
v8::Maybe<bool> PostMessage(Environment* env, |
245 |
|
|
v8::Local<v8::Context> context, |
246 |
|
|
v8::Local<v8::Value> message, |
247 |
|
|
const TransferList& transfer); |
248 |
|
|
|
249 |
|
|
// Start processing messages on this port as a receiving end. |
250 |
|
|
void Start(); |
251 |
|
|
// Stop processing messages on this port as a receiving end. |
252 |
|
|
void Stop(); |
253 |
|
|
|
254 |
|
|
/* constructor */ |
255 |
|
|
static void New(const v8::FunctionCallbackInfo<v8::Value>& args); |
256 |
|
|
/* prototype methods */ |
257 |
|
|
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
258 |
|
|
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); |
259 |
|
|
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); |
260 |
|
|
static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args); |
261 |
|
|
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); |
262 |
|
|
static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
263 |
|
|
|
264 |
|
|
/* static */ |
265 |
|
|
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); |
266 |
|
|
|
267 |
|
|
// Turns `a` and `b` into siblings, i.e. connects the sending side of one |
268 |
|
|
// to the receiving side of the other. This is not thread-safe. |
269 |
|
|
static void Entangle(MessagePort* a, MessagePort* b); |
270 |
|
|
static void Entangle(MessagePort* a, MessagePortData* b); |
271 |
|
|
|
272 |
|
|
// Detach this port's data for transferring. After this, the MessagePortData |
273 |
|
|
// is no longer associated with this handle, although it can still receive |
274 |
|
|
// messages. |
275 |
|
|
std::unique_ptr<MessagePortData> Detach(); |
276 |
|
|
|
277 |
|
|
void Close( |
278 |
|
|
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override; |
279 |
|
|
|
280 |
|
|
// Returns true if either data_ has been freed, or if the handle is being |
281 |
|
|
// closed. Equivalent to the [[Detached]] internal slot in the HTML Standard. |
282 |
|
|
// |
283 |
|
|
// If checking if a JavaScript MessagePort object is detached, this method |
284 |
|
|
// alone is often not enough, since the backing C++ MessagePort object may |
285 |
|
|
// have been deleted already. For all intents and purposes, an object with a |
286 |
|
|
// NULL pointer to the C++ MessagePort object is also detached. |
287 |
|
|
inline bool IsDetached() const; |
288 |
|
|
|
289 |
|
|
TransferMode GetTransferMode() const override; |
290 |
|
|
std::unique_ptr<TransferData> TransferForMessaging() override; |
291 |
|
|
|
292 |
|
|
void MemoryInfo(MemoryTracker* tracker) const override; |
293 |
|
4 |
SET_MEMORY_INFO_NAME(MessagePort) |
294 |
|
4 |
SET_SELF_SIZE(MessagePort) |
295 |
|
|
|
296 |
|
|
private: |
297 |
|
|
enum class MessageProcessingMode { |
298 |
|
|
kNormalOperation, |
299 |
|
|
kForceReadMessages |
300 |
|
|
}; |
301 |
|
|
|
302 |
|
|
void OnClose() override; |
303 |
|
|
void OnMessage(MessageProcessingMode mode); |
304 |
|
|
void TriggerAsync(); |
305 |
|
|
v8::MaybeLocal<v8::Value> ReceiveMessage( |
306 |
|
|
v8::Local<v8::Context> context, |
307 |
|
|
MessageProcessingMode mode, |
308 |
|
|
v8::Local<v8::Value>* port_list = nullptr); |
309 |
|
|
|
310 |
|
|
std::unique_ptr<MessagePortData> data_ = nullptr; |
311 |
|
|
bool receiving_messages_ = false; |
312 |
|
|
uv_async_t async_; |
313 |
|
|
v8::Global<v8::Function> emit_message_fn_; |
314 |
|
|
|
315 |
|
|
friend class MessagePortData; |
316 |
|
|
}; |
317 |
|
|
|
318 |
|
|
// Provide a base class from which JS classes that should be transferable or |
319 |
|
|
// cloneable by postMesssage() can inherit. |
320 |
|
|
// See e.g. FileHandle in internal/fs/promises.js for an example. |
321 |
|
|
class JSTransferable : public BaseObject { |
322 |
|
|
public: |
323 |
|
|
JSTransferable(Environment* env, v8::Local<v8::Object> obj); |
324 |
|
|
static void New(const v8::FunctionCallbackInfo<v8::Value>& args); |
325 |
|
|
|
326 |
|
|
TransferMode GetTransferMode() const override; |
327 |
|
|
std::unique_ptr<TransferData> TransferForMessaging() override; |
328 |
|
|
std::unique_ptr<TransferData> CloneForMessaging() const override; |
329 |
|
|
v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>> |
330 |
|
|
NestedTransferables() const override; |
331 |
|
|
v8::Maybe<bool> FinalizeTransferRead( |
332 |
|
|
v8::Local<v8::Context> context, |
333 |
|
|
v8::ValueDeserializer* deserializer) override; |
334 |
|
|
|
335 |
|
24 |
SET_NO_MEMORY_INFO() |
336 |
|
24 |
SET_MEMORY_INFO_NAME(JSTransferable) |
337 |
|
24 |
SET_SELF_SIZE(JSTransferable) |
338 |
|
|
|
339 |
|
|
private: |
340 |
|
|
std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const; |
341 |
|
|
|
342 |
|
|
class Data : public TransferData { |
343 |
|
|
public: |
344 |
|
|
Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data); |
345 |
|
|
|
346 |
|
|
BaseObjectPtr<BaseObject> Deserialize( |
347 |
|
|
Environment* env, |
348 |
|
|
v8::Local<v8::Context> context, |
349 |
|
|
std::unique_ptr<TransferData> self) override; |
350 |
|
|
v8::Maybe<bool> FinalizeTransferWrite( |
351 |
|
|
v8::Local<v8::Context> context, |
352 |
|
|
v8::ValueSerializer* serializer) override; |
353 |
|
|
|
354 |
|
|
SET_NO_MEMORY_INFO() |
355 |
|
|
SET_MEMORY_INFO_NAME(JSTransferableTransferData) |
356 |
|
|
SET_SELF_SIZE(Data) |
357 |
|
|
|
358 |
|
|
private: |
359 |
|
|
std::string deserialize_info_; |
360 |
|
|
v8::Global<v8::Value> data_; |
361 |
|
|
}; |
362 |
|
|
}; |
363 |
|
|
|
364 |
|
|
// NOTE(review): judging by the name, this yields the FunctionTemplate used to
// construct MessagePort objects for `env` -- confirm in the implementation.
v8::Local<v8::FunctionTemplate>
GetMessagePortConstructorTemplate(Environment* env);
366 |
|
|
|
367 |
|
|
} // namespace worker |
368 |
|
|
} // namespace node |
369 |
|
|
|
370 |
|
|
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
371 |
|
|
|
372 |
|
|
|
373 |
|
|
#endif // SRC_NODE_MESSAGING_H_ |